basicoperations.py
from pyspark.sql import SparkSession
# explicit imports instead of *; note that sum and max shadow the Python builtins here
from pyspark.sql.functions import avg, col, max, sum
import sys

INCOME_FILE = sys.argv[1]
SCHEMA = ("FIPS INT, State STRING, County STRING, MedHHInc BIGINT, PerCapitaInc LONG, "
          "PovertyUnder18Pct DOUBLE, PovertyAllAgesPct DOUBLE, Deep_Pov_All DOUBLE, "
          "Deep_Pov_Children DOUBLE, PovertyUnder18Num LONG, PovertyAllAgesNum LONG")
if __name__ == '__main__':
    # Usage: python basicoperations.py ..\Rural_Atlas_Update20\Income.csv
    print('running basic operations in spark')
    spark = (SparkSession
             .builder
             .appName('basicOperations')
             .getOrCreate())
    # Read data with the explicit schema
    df = spark.read.csv(INCOME_FILE, header=True, schema=SCHEMA)
    # df.show(1)
    print(df.columns)
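    # Optional sanity check (a small addition using only the standard
    # DataFrame API): printSchema() confirms the declared column types.
    df.printSchema()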
    # select columns of interest
    poverty_df = df.select('state', 'county', 'PovertyAllAgesPct', 'Deep_Pov_All', 'PovertyAllAgesNum')
    poverty_df.show(10)
    print('Distinct states')
    poverty_df.select('state').distinct().orderBy('state').show()
    print('Find total population and add that column, drop all ages num column')
    # PovertyAllAgesPct is a percentage, so the total is count / (pct / 100)
    poverty_df = poverty_df.withColumn(
        'Population', col('PovertyAllAgesNum') / col('PovertyAllAgesPct') * 100
    ).drop('PovertyAllAgesNum')
    poverty_df.show(5)
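    # Worked example with hypothetical figures: 15,000 people in poverty at a
    # 15.0 percent poverty rate implies 15000 / 15.0 * 100 = 100,000 residents.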
    print('Rename columns')
    poverty_df = (poverty_df
                  .withColumnRenamed('PovertyAllAgesPct', 'PovertyPct')
                  .withColumnRenamed('Deep_Pov_All', 'DeepPovertyPct'))
    poverty_df.show(5)
    # aggregate per state
    print('total population and average poverty percentage')
    state_df = (poverty_df.groupBy('state')
                .agg(
                    sum('Population').alias('population'),
                    avg('PovertyPct').alias('avgPovertyPct'),
                    avg('DeepPovertyPct').alias('avgDeepPovertyPct')
                )
                .sort('state'))
    state_df.show(5)
    # filter using a row value
    # collect() returns the DataFrame contents as a list of Row objects
    print('Find state with maximum poverty')
    # not a very elegant solution; how do we print the other columns?
    max_row = state_df.groupBy().agg(max('avgPovertyPct')).collect()[0]
    max_poverty_val = max_row[0]
    state_df.filter(col('avgPovertyPct') == max_poverty_val).show()
    # easier way: sort descending and take the first row
    # head() does not return a DataFrame but a list of Rows
    rows = state_df.sort(col('avgPovertyPct'), ascending=False).head(1)
    spark.createDataFrame(rows, schema=state_df.schema).show()
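    # A third option (a sketch using only the standard DataFrame API):
    # limit(1) keeps the result a DataFrame, so the round-trip through
    # Python Row objects above is not needed.
    state_df.orderBy(col('avgPovertyPct').desc()).limit(1).show()

    spark.stop()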