BigData/Spark

Spark RDD 문법

Jonghee Jeon 2020. 4. 6. 23:42
  •  대화형 콘솔

     - Spark는 기본적으로 Scala, Python, SQL, R 등 대화형 콘솔 명령 프롬프트를 제공하고 있다.

 

  • sc 객체

  - SparkContext라는 Spark에서 기존에 사용하였던 객체

sc

SparkContext

Spark UI

Version
v2.4.4
Master
local[*]
AppName
PySparkShell

 

  • sc 객체를 이용한 RDD 생성
rdd = sc.parallelize([1, 2, 3])
rdd.collect()
[1, 2, 3]

type(rdd)
pyspark.rdd.RDD

list_set = [('cat', 'dog'),(1, 2, 3)]
rdd = sc.parallelize(list_set)
rdd.collect()
[('cat', 'dog'), (1, 2, 3)]

rdd_text = sc.textFile("file:///opt/spark/current/README.md")
rdd_text.count()
105

 

  • Spark 객체

 - Spark를 입력하여 보면 아래와 출력됨을 확인 할 수 있으며, 2.x에서는 Spark라는 객체로 SparkContext를 포함한 다른 Context객채를 포함하여 Spark 객체로 제공하고 있다.

Spark

SparkSession - hive
SparkContext

Spark UI
Version
v2.4.4
Master
local[*]
AppName
PySparkShell
# Spark 객체를 이용한 RDD 생성

spark.sparkContext.parallelize([1, 2, 3])
ParallelCollectionRDD[8] at parallelize at PythonRDD.scala:195

spark.sparkContext.parallelize([1, 2, 3]).collect()
[1, 2, 3]

spark.sparkContext.parallelize([1, 2, 3]).glom()
PythonRDD[8] at RDD at PythonRDD.scala:53

 

  • Spark 기본 연산
words = "Apache Spark Bigdata Processing Simple".split(" ")
words_rdd = spark.sparkContext.parallelize(words, 2)

# Count
words_rdd.count()
5

#glom - 생성된 파티션 갯수 확인
words_rdd.glom().collect()
[['Apache', 'Spark'], ['Bigdata', 'Processing', 'Simple']]

# first
words_rdd.first()
'Apache'

# take
words_rdd.take(3)
['Apache', 'Spark', 'Bigdata']

# filter()
words_rdd.filter(lambda word: word.startswith("S")).collect()
['Spark', 'Simple']
words_rdd.filter(words_rdd > 3)

# map
words_rdd2 = words_rdd.map(lambda word: (word, word[0], word.startswith("S")))
words_rdd2.collect()
[('Apache', 'A', False),
 ('Spark', 'S', True),
 ('Bigdata', 'B', False),
 ('Processing', 'P', False),
 ('Simple', 'S', True)]
 
 
 # flatMap
 words_rdd.flatMap(lambda word:list(word)).collect()
 ['A',
 'p',
 'a',
 'c',
 'h',
 'e',
 'S',
 'p',
 'a',
 'r',
 'k',
 'B',
 'i',
 'g',
 'd',
 'a',
 't',
 'a',
 'P',
 'r',
 'o',
 'c',
 'e',
 's',
 's',
 'i',
 'n',
 'g',
 'S',
 'i',
 'm',
 'p',
 'l',
 'e']
 
 # reduce
 A = sc.parallelize([1, 2, 3, 4, 5,6 ,7, 8, 9,10])
 A.reduce(lambda x, y : x+y)
 55
 
 # distinct
 A = sc.parallelize([1, 2, 2, 2, 2, 1, 1, 3, 3, 4])
 A.distinct().collect()
 [2, 4, 1, 3]
 
 
 # randomSplit
 Split = words_rdd.randomSplit([0.5, 0.5])
 
 # WordCount
 words = spark.sparkContext.textFile("file:///opt/spark/current/README.md")\
.flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda word:(word, 1)).reduceByKey(lambda a, b: a+b)
wordCounts.take(10)
[('#', 1),
 ('Apache', 1),
 ('Spark', 16),
 ('', 72),
 ('is', 7),
 ('It', 2),
 ('provides', 1),
 ('high-level', 1),
 ('APIs', 1),
 ('in', 6)]

 

  • Spark에서 자주 사용되는 함수
* printSchema() - 데이터프레임이 스키마 정보 출력
* corr(col1, col2) - col1, col2의 상관계수 
* cov(co1, col2) - col1, col2의 공분산
* crosstab(col1, col2) - col1, col2의 교차표 
* describe() - 데이터프레임이 요약정보 출력
* summary() - 데이터프레임이 요약정보 출력
* na.drop() - 데이터프레임의 null 값 제거 (any, all)
* na.fill()  - 데이터프레임의 null 값을 해당값으로 채움
* groupby() - 컬럼을 기준으로 그룹 요약
* select() - 컬럼을 선택
* sort() - 컬럼을 기준으로 데이터프레임을 정렬
* orderby() - 컬럼을 기준으로 데이터프레임을 정렬
* withColumn - 데이터프레임에 새로운 컬럼 추가
* withColumnRenamed() - 데이터프레임의 컬럼명을 변경
* filter() - 특정 기준으로 컬럼의 데이터를 filter
* from pyspark.sql.functions import * - 그 외 max, min, avg, 문자열 함수 등 존재
* from pyspark.sql.functions import month, hour, year - 날짜 관련 함수
* from pyspark.sql.functions import format_number - 출력되는 데이터의 자릿수 설정
* toPandas() - Pandas Dataframe으로 전환
* createDataframe() - Spark Dataframe으로 전환