-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcsv_to_delta.py
43 lines (38 loc) · 2.28 KB
/
csv_to_delta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from pyspark.sql.types import *
from pyspark.sql.functions import col, year, month, dayofmonth, current_date, quarter
# data can be downloaded from here:
#
# https://azuresynapsestorage.blob.core.windows.net/sampledata/WideWorldImportersDW/csv/full/fact_sale_1y_full/
#
# It can't be read into Spark directly over HTTPS. Access it via abfss from your own storage account, or from local storage.
# Source location of the CSV files; replace the <<...>> placeholders with
# your own container, storage account and path.
file_location = "abfss://<<your container>>@<<your storage>>.dfs.core.windows.net/<<path to csv files>>"

# Explicit read schema for the fact_sale_1y_full CSV files. Every column is
# declared nullable. The trailing Year / Quarter / Month columns are
# recomputed from InvoiceDateKey once the data has been loaded.
fact_sale_1y_full_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        # Keys and identifiers
        ('SaleKey', LongType()),
        ('CityKey', IntegerType()),
        ('CustomerKey', IntegerType()),
        ('BillToCustomerKey', IntegerType()),
        ('StockItemKey', IntegerType()),
        ('InvoiceDateKey', TimestampType()),
        ('DeliveryDateKey', TimestampType()),
        ('SalespersonKey', IntegerType()),
        ('WWIInvoiceID', IntegerType()),
        # Line item description
        ('Description', StringType()),
        ('Package', StringType()),
        ('Quantity', IntegerType()),
        # Monetary amounts
        ('UnitPrice', DecimalType(18, 2)),
        ('TaxRate', DecimalType(18, 3)),
        ('TotalExcludingTax', DecimalType(29, 2)),
        ('TaxAmount', DecimalType(38, 6)),
        ('Profit', DecimalType(18, 2)),
        ('TotalIncludingTax', DecimalType(38, 6)),
        # Item counts and lineage
        ('TotalDryItems', IntegerType()),
        ('TotalChillerItems', IntegerType()),
        ('LineageKey', IntegerType()),
        # Calendar columns used for partitioning the output
        ('Year', IntegerType()),
        ('Quarter', IntegerType()),
        ('Month', IntegerType()),
    ]
])
# Load the CSV files with the explicit schema; the "header" option is enabled
# so the first line of each file is not read as data.
df = (
    spark.read
    .format("csv")
    .schema(fact_sale_1y_full_schema)
    .option("header", "true")
    .load(file_location)
)

# Derive the calendar columns from the invoice timestamp. These replace
# whatever values the Year / Quarter / Month columns held after the read.
df = (
    df
    .withColumn('Year', year(col("InvoiceDateKey")))
    .withColumn('Quarter', quarter(col("InvoiceDateKey")))
    .withColumn('Month', month(col("InvoiceDateKey")))
)

# Repartition to two partitions, then write the result as a Delta table
# partitioned by Year and Quarter, overwriting any existing data at the
# destination. Replace the <<...>> placeholders with your own target.
(
    df.repartition(2)
    .write
    .mode("overwrite")
    .format("delta")
    .partitionBy("Year", "Quarter")
    .save("abfss://<<your container>>@<<your storage>>.dfs.core.windows.net/<<path to delta table>>")
)