BigData/Spark

Spark ML (Pyspark)

Jonghee Jeon 2020. 4. 25. 18:48
  •  Spark ML은 Spark의 머신러닝 라이브러리

  •  Regression/Classification/Clustering/collaborative filtering 등의 알고리즘을 제공하고 있으며, 아직까지는 딥러닝 기능은 제공되고 있지 않음

  •  그 외에 다양한 Featurization과 Pipeline 등도 제공되며 Spark 2.x 버전은 두 개의 라이브러리로 제공되어 사용 할 수 있음


Spark ML에서 제공되는 하위  기능들에 대한 설명

 

- org.apache.spark.mllib

   • 스파크 저 수준 RDD API를 위한 인터페이스 제공

   • 향후 Spark 3.0 에서는 RDD기반의 API는 제거 될 예정

 

 - org.apache.spark.ml

   • 공식적으로 권장하고 있음

   • DataFrame을 사용할 수 있는 인터페이스가 포함

   • 선행 단계 수행 과정을 표준화 인터페이스 제공

   • 머신러닝 파이프라인 구축 인터페이스 제공

 

 - Spark의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용

 

 - ML Dataset

   • DataFrame을 주요 데이터로 동작

 

 - Transformer

   • 원시 데이터를 다양한 방식으로 변환하는 함수

   • DataFrame을 입출력 타입으로 사용

   • transform이라는 이름의 메소드

   • transform(DataFrame) = DataFrame_input + new Column

 

 - FeatureTransformer

   • 입력 데이터셋 컬럼의 변형을 적용하여 새로운 컬럼을 추가한 DF리턴

 

 - Model

   • 머신 러닝 모델을 의미

   • DataFrame을 입력으로 각 특성 벡터에 의해 예측 된 새로운 DataFrame 출력

 

 - Estimator

   • 훈련용 데이터 세트에 머신 러닝 모델을 훈련시키거나 적합하도록 조정

   • 기계학습 알고리즘을 의미

   • DataFrame을 입력으로 받아 머신러닝 모델을 리턴하는 fit이라는 메소드 구현

 

 - Pipeline

   • 머신 러닝 워크플로를 형성하기 위해 특정 순서로 다수의 transformer와 estimator를 연결

   • 머신 러닝 워크플로는 데이터 프로세싱, 특성 추출, 모델 훈련까지 구성

 

- Evaluator

   • 모델의 예측 성능과 효과성을 평가

 

Spark ML 예제

 - Regression

 

 

 

In [1]:
adDF = spark.read.csv("data/Advertising.csv", inferSchema=True, header=True)
In [2]:
adDF.show(5)
 
+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows

In [7]:
adDF.count()
Out[7]:
200
In [3]:
from pyspark.ml.feature import RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
 

RFormula

In [4]:
adRF = RFormula().setFormula("Sales ~.").setFeaturesCol("features").setLabelCol("label")
In [11]:
adRF_fit = adRF.fit(adDF).transform(adDF)
 

Vectors

In [6]:
adV = adDF.rdd.map(lambda x: [Vectors.dense(x[0:3]), x[-1]]).toDF(['features', 'label'])
 

Model Build

In [8]:
lr = LinearRegression(featuresCol='features', labelCol='label')
In [10]:
lr_model = lr.fit(adV)
In [ ]:
lr_model.save
 

Prediction

In [13]:
pred = lr_model.transform(adV)
pred.show(5)
 
+-----------------+-----+------------------+
|         features|label|        prediction|
+-----------------+-----+------------------+
|[230.1,37.8,69.2]| 22.1| 20.52397440971517|
| [44.5,39.3,45.1]| 10.4|12.337854820894362|
| [17.2,45.9,69.3]|  9.3|12.307670779994238|
|[151.5,41.3,58.5]| 18.5| 17.59782951168913|
|[180.8,10.8,58.4]| 12.9|13.188671856831299|
+-----------------+-----+------------------+
only showing top 5 rows

In [14]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')
evaluator.setMetricName('r2').evaluate(pred)
Out[14]:
0.897210638178952
 

Cross-validation

In [20]:
train, test = adV.randomSplit([0.7, 0.3])
In [21]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
In [22]:
param_grid = ParamGridBuilder().addGrid(lr.regParam, [0, 0.5, 1]) \
                               .addGrid(lr.elasticNetParam, [0, 0.5, 1]).build()
In [23]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, \
                    evaluator=evaluator, numFolds=5)
In [24]:
cv_model = cv.fit(train)
 

Prediction

In [25]:
cv_train_pred = cv_model.transform(train)
cv_test_pred = cv_model.transform(test)
In [33]:
cv_test_pred_summary = cv_test_pred.summary
 

Evaluation

In [26]:
evaluator.setMetricName('r2').evaluate(cv_train_pred)
Out[26]:
0.9024696179882863
In [27]:
evaluator.setMetricName('r2').evaluate(cv_test_pred)
Out[27]:
0.8724906273151293
In [28]:
cv_model.bestModel.coefficients
Out[28]:
DenseVector([0.0485, 0.1889, -0.0059])
In [32]:
cv_model.bestModel.elasticNetParam
Out[32]:
Param(parent='LinearRegression_7fd0e29301b4', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty')
In [ ]: