Spark DataFrame 03 (PySpark)

Jonghee Jeon 2020. 4. 11. 16:18

Creating the Titanic DataFrame

In [1]:
import pandas as pd
from pyspark.sql import SparkSession

# The cells below use `spark`; it is predefined in pyspark shells and most
# notebook setups, otherwise create it explicitly like this
spark = SparkSession.builder.appName('titanic-df').getOrCreate()
In [15]:
data1 = {'PassengerId':{0:1, 1:2, 2:3, 3:4, 4:5},
         'Name' : {0:'Owen', 1:'Florence', 2:'Laina', 3:'Lily', 4:"William"},
         'sex' : {0: 'male', 1: 'female', 2:'female', 3:'female', 4:'male'},
         'Survived': {0:0, 1:1, 2:1, 3:1, 4:0}
        }

data2 = {'PassengerId':{0:1, 1:2, 2:3, 3:4, 4:5},
         'Age' : {0: 22, 1: 38, 2: 33, 3: 35, 4: 35},
         'Fare' : {0: 7.3, 1: 71.3, 2:7.9, 3:53.1, 4:8.0},
         'Pclass': {0:3, 1:1, 2:3, 3:1, 4:3}
        }
df1_pd = pd.DataFrame(data1, columns=data1.keys())
df2_pd = pd.DataFrame(data2, columns=data2.keys())
In [16]:
df1 = spark.createDataFrame(df1_pd)
df2 = spark.createDataFrame(df2_pd)
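
When converting from pandas, Spark infers the column types. createDataFrame also accepts an explicit schema if you want to pin them down; a minimal sketch (the DDL string below is an assumption that simply matches the types printSchema reports next):

# Optional: pass a DDL schema string instead of relying on type inference
df1_typed = spark.createDataFrame(
    df1_pd, schema='PassengerId long, Name string, sex string, Survived long')
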
In [5]:
df1.printSchema()
root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- Survived: long (nullable = true)

In [6]:
df1.show()
+-----------+--------+------+--------+
|PassengerId|    Name|   sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
+-----------+--------+------+--------+

count - number of rows

In [7]:
df1.count()
Out[7]:
5
In [8]:
df2.count()
Out[8]:
5
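
count is an action, so it triggers execution of whatever transformations were built up before it. For example, combined with filter (covered next), it counts only the matching rows; a quick sketch:

# Count survivors only; the filter stays lazy until count() runs
df1.filter(df1.Survived == 1).count()   # 3 in this sample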

select - choosing columns

In [9]:
cols = ["PassengerId", "Name"]
df1.select(cols).show()
+-----------+--------+
|PassengerId|    Name|
+-----------+--------+
|          1|    Owen|
|          2|Florence|
|          3|   Laina|
|          4|    Lily|
|          5| William|
+-----------+--------+
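
select is not limited to existing column names; column expressions can be computed inline. A small sketch using selectExpr (the NameUpper alias is purely illustrative):

df1.selectExpr('PassengerId', 'upper(Name) as NameUpper').show()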

filter - selecting rows by condition

In [12]:
df1.filter(df1.sex == 'female').show()
+-----------+--------+------+--------+
|PassengerId|    Name|   sex|Survived|
+-----------+--------+------+--------+
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
+-----------+--------+------+--------+
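
Conditions can be combined with & and |, each wrapped in parentheses because of Python operator precedence, or written as a single SQL-style string. A sketch of both forms:

# Boolean column expressions: note the parentheses around each condition
df1.filter((df1.sex == 'female') & (df1.Survived == 1)).show()

# Equivalent SQL-style predicate string
df1.filter("sex = 'female' AND Survived = 1").show()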

withColumn - creating a column

In [17]:
df2.withColumn('AgeFare', df2.Age*df2.Fare).show()
+-----------+---+----+------+-------+
|PassengerId|Age|Fare|Pclass|AgeFare|
+-----------+---+----+------+-------+
|          1| 22| 7.3|     3|  160.6|
|          2| 38|71.3|     1| 2709.4|
|          3| 33| 7.9|     3|  260.7|
|          4| 35|53.1|     1| 1858.5|
|          5| 35| 8.0|     3|  280.0|
+-----------+---+----+------+-------+
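
withColumn can also derive conditional columns with when/otherwise from pyspark.sql.functions. A sketch adding a hypothetical FareLevel label (the column name and the 50 threshold are illustrative, not from the data above):

from pyspark.sql import functions as F

# Label fares above 50 as 'high', everything else as 'low'
df2.withColumn('FareLevel',
               F.when(df2.Fare > 50, 'high').otherwise('low')).show()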

groupBy, avg, agg - summary aggregation

In [20]:
gdf2 = df2.groupBy('Pclass')
In [21]:
avg_Cols = ['Age', 'Fare']
gdf2.avg(*avg_Cols).show()
+------+--------+-----------------+
|Pclass|avg(Age)|        avg(Fare)|
+------+--------+-----------------+
|     1|    36.5|             62.2|
|     3|    30.0|7.733333333333333|
+------+--------+-----------------+
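
avg is a shortcut for a single aggregate; agg takes any mix of aggregate functions per group. A sketch on the same grouped DataFrame (the aliases are illustrative):

from pyspark.sql import functions as F

gdf2.agg(F.avg('Age').alias('AvgAge'),
         F.max('Fare').alias('MaxFare'),
         F.count('*').alias('n')).show()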

sort - sorting

In [23]:
df2.sort('Fare', ascending=False).show()
+-----------+---+----+------+
|PassengerId|Age|Fare|Pclass|
+-----------+---+----+------+
|          2| 38|71.3|     1|
|          4| 35|53.1|     1|
|          5| 35| 8.0|     3|
|          3| 33| 7.9|     3|
|          1| 22| 7.3|     3|
+-----------+---+----+------+
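
sort also accepts multiple keys with a per-column direction via Column.asc()/desc(). A sketch:

from pyspark.sql import functions as F

# Pclass ascending, then Fare descending within each class
df2.sort(F.col('Pclass').asc(), F.col('Fare').desc()).show()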

join - combining DataFrames

In [26]:
df1.join(df2, ['PassengerId']).show()
+-----------+--------+------+--------+---+----+------+
|PassengerId|    Name|   sex|Survived|Age|Fare|Pclass|
+-----------+--------+------+--------+---+----+------+
|          5| William|  male|       0| 35| 8.0|     3|
|          1|    Owen|  male|       0| 22| 7.3|     3|
|          3|   Laina|female|       1| 33| 7.9|     3|
|          2|Florence|female|       1| 38|71.3|     1|
|          4|    Lily|female|       1| 35|53.1|     1|
+-----------+--------+------+--------+---+----+------+
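
Joining on a list of column names, as above, keeps a single PassengerId column and defaults to an inner join. Other join types are chosen with the how argument; a sketch (with this data a left join returns the same rows, since every key matches on both sides):

df1.join(df2, on='PassengerId', how='left').show()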

The same join can be written in Spark SQL by first registering both DataFrames as temporary views:

In [27]:
df1.createOrReplaceTempView('df1_tmp')
df2.createOrReplaceTempView('df2_tmp')
In [28]:
query = """
select *
from df1_tmp a
join df2_tmp b
on a.PassengerId = b.PassengerId
"""
dfj = spark.sql(query)
In [29]:
dfj.show()
+-----------+--------+------+--------+-----------+---+----+------+
|PassengerId|    Name|   sex|Survived|PassengerId|Age|Fare|Pclass|
+-----------+--------+------+--------+-----------+---+----+------+
|          5| William|  male|       0|          5| 35| 8.0|     3|
|          1|    Owen|  male|       0|          1| 22| 7.3|     3|
|          3|   Laina|female|       1|          3| 33| 7.9|     3|
|          2|Florence|female|       1|          2| 38|71.3|     1|
|          4|    Lily|female|       1|          4| 35|53.1|     1|
+-----------+--------+------+--------+-----------+---+----+------+
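
Because select * keeps the join key from both views, PassengerId appears twice above. Listing the columns explicitly avoids the duplicate; a sketch:

query = """
select a.PassengerId, a.Name, a.sex, a.Survived,
       b.Age, b.Fare, b.Pclass
from df1_tmp a
join df2_tmp b
on a.PassengerId = b.PassengerId
"""
spark.sql(query).show()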

union - appending rows

In [31]:
df1.union(df1).show()
+-----------+--------+------+--------+
|PassengerId|    Name|   sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
+-----------+--------+------+--------+
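
union matches columns by position and keeps duplicates (SQL UNION ALL semantics); Spark also offers unionByName to match columns by name instead. To deduplicate, chain distinct(); a sketch:

# Drop the duplicated rows produced above
df1.union(df1).distinct().count()   # back to 5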

explain - inspecting the execution plan

In [32]:
df1.explain()
== Physical Plan ==
Scan ExistingRDD[PassengerId#69L,Name#70,sex#71,Survived#72L]
In [33]:
dfj.explain()
== Physical Plan ==
*(5) SortMergeJoin [PassengerId#69L], [PassengerId#77L], Inner
:- *(2) Sort [PassengerId#69L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(PassengerId#69L, 200)
:     +- *(1) Filter isnotnull(PassengerId#69L)
:        +- Scan ExistingRDD[PassengerId#69L,Name#70,sex#71,Survived#72L]
+- *(4) Sort [PassengerId#77L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(PassengerId#77L, 200)
      +- *(3) Filter isnotnull(PassengerId#77L)
         +- Scan ExistingRDD[PassengerId#77L,Age#78L,Fare#79,Pclass#80L]
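
The plan above shows a SortMergeJoin with an Exchange (shuffle) on each side. For a small table, a broadcast hint can replace the shuffle with a broadcast hash join; whether Spark does this automatically depends on spark.sql.autoBroadcastJoinThreshold. A sketch:

from pyspark.sql import functions as F

# Hint that df2 is small enough to ship whole to every executor
df1.join(F.broadcast(df2), 'PassengerId').explain()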