etl_web_to_gcs.py
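
"""Prefect ETL flow: downloads the NYC Motor Vehicle Collisions dataset from
the NYC Open Data portal, cleans it, and uploads it to Google Cloud Storage."""
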
import math
from pathlib import Path

import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket


@task(retries=3)
def extract(dataset_url: str) -> pd.DataFrame:
    """Reads the NYC Motor Vehicle Collisions CSV into a pandas DataFrame."""
    # retries=3 lets Prefect re-run the task on transient download failures.
    return pd.read_csv(dataset_url)


@task(log_prints=True)
def transform(data: pd.DataFrame) -> pd.DataFrame:
    """Merges the CRASH DATE and CRASH TIME columns into a new CRASH DATETIME
    column and casts the ZIP CODE column to int."""
    def transform_zipcode(zip_code):
        # ZIP CODE arrives as a mix of strings and floats (NaN when missing);
        # normalize everything to int, using -1 as the missing-value sentinel.
        if isinstance(zip_code, str):
            zip_code = zip_code.strip()
            if zip_code:
                return int(zip_code)
            return -1
        if math.isnan(zip_code):
            return -1
        return int(zip_code)

    data['ZIP CODE'] = data['ZIP CODE'].apply(transform_zipcode)
    data['CRASH DATETIME'] = data['CRASH DATE'] + ' ' + data['CRASH TIME']
    data['CRASH DATETIME'] = pd.to_datetime(data['CRASH DATETIME'], format='%m/%d/%Y %H:%M')
    data.drop(['CRASH DATE', 'CRASH TIME'], axis=1, inplace=True)
    return data


@task()
def write_to_local(data: pd.DataFrame, data_dir: str = 'raw', dataset_filename: str = 'nyc_mvc') -> Path:
    """Writes the DataFrame to a gzip-compressed parquet file and returns its path."""
    Path(data_dir).mkdir(parents=True, exist_ok=True)
    path = Path(f'{data_dir}/{dataset_filename}.parquet')
    data.to_parquet(path, compression='gzip')
    return path


@task()
def write_to_gcs(path: Path) -> None:
    """Uploads the local parquet file to GCS, mirroring the local path."""
    # Assumes a Prefect GcsBucket block named "nyc-mvc-bucket" has been
    # registered beforehand (e.g. via the Prefect UI or a blocks script).
    gcp_cloud_storage_bucket_block = GcsBucket.load("nyc-mvc-bucket")
    gcp_cloud_storage_bucket_block.upload_from_path(from_path=path, to_path=path)


@flow()
def etl_web_to_gcs(dataset_url: str = 'https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv') -> None:
    """Runs the NYC Motor Vehicle Collisions ETL flow: extracts the data from
    the web, transforms it, and loads it into GCS."""
    data = extract(dataset_url)
    data = transform(data)
    path = write_to_local(data)
    write_to_gcs(path)


if __name__ == '__main__':
    etl_web_to_gcs()
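    # Usage (assuming Prefect and prefect-gcp are installed and the GCS
    # bucket block exists): python etl_web_to_gcs.py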