
Spark DataFrame 01 (PySpark)

Jonghee Jeon 2020. 4. 11. 16:10

A DataFrame is a type that lets Spark represent and process an RDD as rows and columns.

 - The same concept as a DataFrame in Python's pandas package or a data frame in R.

 - With the Catalyst Optimizer introduced in Spark 2.x, processing performance is now roughly the same across all programming languages Spark supports.

 

 

(Figure: Spark's data-processing execution plan)

(Figure: Spark performance by supported language)

Spark DataFrame

 
Methods for reading data into a DataFrame:

  • spark.read.csv()
  • spark.read.json()
  • spark.read.format("csv")
  • spark.read.format("json")

Storage URI schemes that may be used in the path (a usage sketch follows this list):

  • file://
  • hdfs://
  • hbase://
  • s3://
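
The cells below assume a SparkSession named spark already exists. A minimal sketch of how such a session could be created, plus the format()-based equivalent of the CSV read in In [1] (the appName and the HDFS path are hypothetical examples):

from pyspark.sql import SparkSession

# Create (or reuse) the session the notebook refers to as `spark`
spark = SparkSession.builder.appName("dataframe-demo").getOrCreate()

# Same result as spark.read.csv("data/appl_stock.csv", inferSchema=True, header=True)
stock = (spark.read.format("csv")
         .option("header", True)
         .option("inferSchema", True)
         .load("data/appl_stock.csv"))

# The same reader works against distributed storage, e.g. a hypothetical HDFS path:
# stock = spark.read.format("csv").option("header", True).load("hdfs:///data/appl_stock.csv")
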
In [1]:
stock = spark.read.csv("data/appl_stock.csv", inferSchema=True, header=True)
In [2]:
stock.printSchema()
 
root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

In [4]:
stock.columns
Out[4]:
['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']
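
Individual columns can be pulled out by name with select(); a small sketch using the columns listed above:

# Look at just the date and closing price for the first few rows
stock.select("Date", "Close").show(5)

# head(n) returns the first n rows as a list of Row objects
stock.select("Close").head(3)
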
In [6]:
stock.describe().show()
 
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|              1762|              1762|              1762|             1762|               1762|              1762|
|   mean| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|              90.0|         90.699997|         89.470001|        90.279999|           11475900|         24.881912|
|    max|        702.409988|        705.070023|        699.569977|       702.100021|          470249500|127.96609099999999|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+

In [7]:
stock.summary().show()
 
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|              Open|              High|               Low|             Close|             Volume|         Adj Close|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+
|  count|              1762|              1762|              1762|              1762|               1762|              1762|
|   mean| 313.0763111589103| 315.9112880164581| 309.8282405079457| 312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|185.29946803981522|186.89817686485767|183.38391664371008| 185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|              90.0|         90.699997|         89.470001|         90.279999|           11475900|         24.881912|
|    25%|        115.199997|        116.349998|             114.0|        115.190002|           49161400|         50.260037|
|    50%|        317.990002|320.18001200000003|        316.340004|318.21000699999996|           80500000| 72.95419100000001|
|    75%|470.94001799999995|478.55001799999997|468.05001799999997|472.69001799999995|          121095800|        100.228673|
|    max|        702.409988|        705.070023|        699.569977|        702.100021|          470249500|127.96609099999999|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+
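
describe() reports count, mean, stddev, min and max, while summary() additionally includes the 25%/50%/75% percentiles. summary() also accepts the statistics to compute as arguments; a small sketch:

# Compute only the requested statistics, and only for selected columns
stock.select("Open", "Close").summary("count", "min", "25%", "75%", "max").show()
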

 

pandas DataFrame -> Spark DataFrame

In [9]:
import pandas as pd
In [10]:
pdf = pd.DataFrame({
    'x': [[1,2,3], [4,5,6]],
    'y': [['a','b','c'], ['d','e','f']]
})
In [11]:
pdf
Out[11]:
  x y
0 [1, 2, 3] [a, b, c]
1 [4, 5, 6] [d, e, f]
In [12]:
sdf = spark.createDataFrame(pdf)
In [13]:
sdf.show()
 
+---------+---------+
|        x|        y|
+---------+---------+
|[1, 2, 3]|[a, b, c]|
|[4, 5, 6]|[d, e, f]|
+---------+---------+
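
The conversion also works in the other direction with toPandas(); note that this collects the whole DataFrame to the driver, so it is only suitable for small data. A minimal sketch:

# Bring the Spark DataFrame back to the driver as a pandas DataFrame
pdf_back = sdf.toPandas()
print(pdf_back)
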

 

RDD -> DataFrame

In [16]:
from pyspark.sql import Row
In [17]:
rdd = spark.sparkContext.parallelize([
    Row(x=[1,2,3], y=['a','b','c']),
    Row(x=[4,5,6], y=['d','e','f'])
])
In [18]:
rdf = spark.createDataFrame(rdd)
In [19]:
rdf.show()
 
+---------+---------+
|        x|        y|
+---------+---------+
|[1, 2, 3]|[a, b, c]|
|[4, 5, 6]|[d, e, f]|
+---------+---------+
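
An RDD of Row objects can also be converted directly with toDF(), which infers the schema from the Row fields just like createDataFrame(); a small sketch using the rdd defined above:

# toDF() is available on RDDs once a SparkSession is active
rdf2 = rdd.toDF()
rdf2.printSchema()
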

 

JSON -> DataFrame

In [20]:
jdf = spark.read.json("data/people.json")
In [21]:
jdf.show()
 
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
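
spark.read.json() expects line-delimited JSON by default (one object per line). A DataFrame can be written back out with the write API; a minimal sketch (the output path is a hypothetical example):

# Write the DataFrame out as line-delimited JSON, replacing any previous output
jdf.write.mode("overwrite").json("data/people_out")
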

 

Setting a DataFrame Schema

In [24]:
from pyspark.sql.types import StructField, IntegerType, StringType, StructType
In [23]:
data_schema = [StructField('age', IntegerType(), True), \
               StructField('name', StringType(), True)]
In [25]:
struct_schema = StructType(fields=data_schema)
In [26]:
jdf = spark.read.json("data/people.json", schema=struct_schema)
In [27]:
jdf.show()
 
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

In [28]:
jdf.printSchema()
 
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
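
The same schema can also be given as a DDL string, which DataFrameReader.schema() accepts as well; a small sketch:

# Equivalent to the StructType built above, expressed as a DDL string
jdf2 = spark.read.schema("age INT, name STRING").json("data/people.json")
jdf2.printSchema()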
