-
Notifications
You must be signed in to change notification settings - Fork 119
/
Copy pathspark_hive.py
28 lines (22 loc) · 1.04 KB
/
spark_hive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Spark SQL + Hive integration example: create two Hive tables, load data
# files into them, and display the union of their rows.
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = '/home/awantik/spark-warehouse'

# Build a Hive-enabled session. spark.sql.warehouse.dir must be configured
# before the first SparkSession is created, or the setting is ignored.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH '/home/awantik/packages/spark-2.4.3-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT * FROM src")
df.show()

# FIX: use '\\n' so HiveQL receives the two-character escape sequence \n.
# The original '\n' was translated by Python into a literal newline embedded
# inside the DDL statement before Hive ever saw it.
spark.sql("CREATE TABLE IF NOT EXISTS newsrc (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'")
spark.sql("LOAD DATA LOCAL INPATH '/home/awantik/emp.txt' INTO TABLE newsrc")
df2 = spark.sql("SELECT * FROM newsrc")
df2.show()

# FIX: union() replaces unionAll(), deprecated since Spark 2.0; both append
# rows positionally (by column position, without de-duplication).
df = df2.union(df)
df.show()

# Release the cluster resources held by this application.
spark.stop()