Spark ML (Pyspark)
-
Spark ML은 Spark의 머신러닝 라이브러리
-
Regression/Classification/Clustering/collaborative filtering 등의 알고리즘을 제공하고 있으며, 아직까지는 딥러닝 기능은 제공되고 있지 않음
-
그 외에 다양한 Featurization과 Pipeline 등도 제공되며 Spark 2.x 버전은 두 개의 라이브러리로 제공되어 사용 할 수 있음
Spark ML에서 제공되는 하위 기능들에 대한 설명
- org.apache.spark.mllib
• 스파크 저 수준 RDD API를 위한 인터페이스 제공
• 향후 Spark 3.0 에서는 RDD기반의 API는 제거 될 예정
- org.apache.spark.ml
• 공식적으로 권장하고 있음
• DataFrame을 사용할 수 있는 인터페이스가 포함
• 선행 단계 수행 과정을 표준화 인터페이스 제공
• 머신러닝 파이프라인 구축 인터페이스 제공
- Spark의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
- ML Dataset
• DataFrame을 주요 데이터로 동작
- Transformer
• 원시 데이터를 다양한 방식으로 변환하는 함수
• DataFrame을 입출력 타입으로 사용
• transform이라는 이름의 메소드
• transform(DataFrame) = DataFrame_input + new Column
- FeatureTransformer
• 입력 데이터셋 컬럼의 변형을 적용하여 새로운 컬럼을 추가한 DF리턴
- Model
• 머신 러닝 모델을 의미
• DataFrame을 입력으로 각 특성 벡터에 의해 예측 된 새로운 DataFrame 출력
- Estimator
• 훈련용 데이터 세트에 머신 러닝 모델을 훈련시키거나 적합하도록 조정
• 기계학습 알고리즘을 의미
• DataFrame을 입력으로 받아 머신러닝 모델을 리턴하는 fit이라는 메소드 구현
- Pipeline
• 머신 러닝 워크플로를 형성하기 위해 특정 순서로 다수의 transformer와 estimator를 연결
• 머신 러닝 워크플로는 데이터 프로세싱, 특성 추출, 모델 훈련까지 구성
- Evaluator
• 모델의 예측 성능과 효과성을 평가
Spark ML 예제
- Regression
adDF = spark.read.csv("data/Advertising.csv", inferSchema=True, header=True)
adDF.show(5)
adDF.count()
from pyspark.ml.feature import RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
RFormula¶
adRF = RFormula().setFormula("Sales ~.").setFeaturesCol("features").setLabelCol("label")
adRF_fit = adRF.fit(adDF).transform(adDF)
Vectors¶
adV = adDF.rdd.map(lambda x: [Vectors.dense(x[0:3]), x[-1]]).toDF(['features', 'label'])
Model Build¶
lr = LinearRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(adV)
lr_model.save
Prediction¶
pred = lr_model.transform(adV)
pred.show(5)
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')
evaluator.setMetricName('r2').evaluate(pred)
Cross-validation¶
train, test = adV.randomSplit([0.7, 0.3])
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
param_grid = ParamGridBuilder().addGrid(lr.regParam, [0, 0.5, 1]) \
.addGrid(lr.elasticNetParam, [0, 0.5, 1]).build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, \
evaluator=evaluator, numFolds=5)
cv_model = cv.fit(train)
Prediction¶
cv_train_pred = cv_model.transform(train)
cv_test_pred = cv_model.transform(test)
cv_test_pred_summary = cv_test_pred.summary
Evaluation¶
evaluator.setMetricName('r2').evaluate(cv_train_pred)
evaluator.setMetricName('r2').evaluate(cv_test_pred)
cv_model.bestModel.coefficients
cv_model.bestModel.elasticNetParam