BigData/Spark
Spark RDD 문법
Jonghee Jeon
2020. 4. 6. 23:42
- 대화형 콘솔
- Spark는 기본적으로 Scala, Python, SQL, R 등 대화형 콘솔 명령 프롬프트를 제공하고 있다.
- sc 객체
- SparkContext라는 Spark에서 기존에 사용하였던 객체
sc
SparkContext
Spark UI
Version
v2.4.4
Master
local[*]
AppName
PySparkShell
- sc 객체를 이용한 RDD 생성
rdd = sc.parallelize([1, 2, 3])
rdd.collect()
[1, 2, 3]
type(rdd)
pyspark.rdd.RDD
list_set = [('cat', 'dog'),(1, 2, 3)]
rdd = sc.parallelize(list_set)
rdd.collect()
[('cat', 'dog'), (1, 2, 3)]
rdd_text = sc.textFile("file:///opt/spark/current/README.md")
rdd_text.count()
105
- Spark 객체
- Spark를 입력하여 보면 아래와 출력됨을 확인 할 수 있으며, 2.x에서는 Spark라는 객체로 SparkContext를 포함한 다른 Context객채를 포함하여 Spark 객체로 제공하고 있다.
Spark
SparkSession - hive
SparkContext
Spark UI
Version
v2.4.4
Master
local[*]
AppName
PySparkShell
# Spark 객체를 이용한 RDD 생성
spark.sparkContext.parallelize([1, 2, 3])
ParallelCollectionRDD[8] at parallelize at PythonRDD.scala:195
spark.sparkContext.parallelize([1, 2, 3]).collect()
[1, 2, 3]
spark.sparkContext.parallelize([1, 2, 3]).glom()
PythonRDD[8] at RDD at PythonRDD.scala:53
- Spark 기본 연산
words = "Apache Spark Bigdata Processing Simple".split(" ")
words_rdd = spark.sparkContext.parallelize(words, 2)
# Count
words_rdd.count()
5
#glom - 생성된 파티션 갯수 확인
words_rdd.glom().collect()
[['Apache', 'Spark'], ['Bigdata', 'Processing', 'Simple']]
# first
words_rdd.first()
'Apache'
# take
words_rdd.take(3)
['Apache', 'Spark', 'Bigdata']
# filter()
words_rdd.filter(lambda word: word.startswith("S")).collect()
['Spark', 'Simple']
words_rdd.filter(words_rdd > 3)
# map
words_rdd2 = words_rdd.map(lambda word: (word, word[0], word.startswith("S")))
words_rdd2.collect()
[('Apache', 'A', False),
('Spark', 'S', True),
('Bigdata', 'B', False),
('Processing', 'P', False),
('Simple', 'S', True)]
# flatMap
words_rdd.flatMap(lambda word:list(word)).collect()
['A',
'p',
'a',
'c',
'h',
'e',
'S',
'p',
'a',
'r',
'k',
'B',
'i',
'g',
'd',
'a',
't',
'a',
'P',
'r',
'o',
'c',
'e',
's',
's',
'i',
'n',
'g',
'S',
'i',
'm',
'p',
'l',
'e']
# reduce
A = sc.parallelize([1, 2, 3, 4, 5,6 ,7, 8, 9,10])
A.reduce(lambda x, y : x+y)
55
# distinct
A = sc.parallelize([1, 2, 2, 2, 2, 1, 1, 3, 3, 4])
A.distinct().collect()
[2, 4, 1, 3]
# randomSplit
Split = words_rdd.randomSplit([0.5, 0.5])
# WordCount
words = spark.sparkContext.textFile("file:///opt/spark/current/README.md")\
.flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda word:(word, 1)).reduceByKey(lambda a, b: a+b)
wordCounts.take(10)
[('#', 1),
('Apache', 1),
('Spark', 16),
('', 72),
('is', 7),
('It', 2),
('provides', 1),
('high-level', 1),
('APIs', 1),
('in', 6)]
- Spark에서 자주 사용되는 함수
* printSchema() - 데이터프레임이 스키마 정보 출력
* corr(col1, col2) - col1, col2의 상관계수
* cov(co1, col2) - col1, col2의 공분산
* crosstab(col1, col2) - col1, col2의 교차표
* describe() - 데이터프레임이 요약정보 출력
* summary() - 데이터프레임이 요약정보 출력
* na.drop() - 데이터프레임의 null 값 제거 (any, all)
* na.fill() - 데이터프레임의 null 값을 해당값으로 채움
* groupby() - 컬럼을 기준으로 그룹 요약
* select() - 컬럼을 선택
* sort() - 컬럼을 기준으로 데이터프레임을 정렬
* orderby() - 컬럼을 기준으로 데이터프레임을 정렬
* withColumn - 데이터프레임에 새로운 컬럼 추가
* withColumnRenamed() - 데이터프레임의 컬럼명을 변경
* filter() - 특정 기준으로 컬럼의 데이터를 filter
* from pyspark.sql.functions import * - 그 외 max, min, avg, 문자열 함수 등 존재
* from pyspark.sql.functions import month, hour, year - 날짜 관련 함수
* from pyspark.sql.functions import format_number - 출력되는 데이터의 자릿수 설정
* toPandas() - Pandas Dataframe으로 전환
* createDataframe() - Spark Dataframe으로 전환