-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathdaily_euro_ohlc_dag.py
69 lines (63 loc) · 2.28 KB
/
daily_euro_ohlc_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
The daily_euro_ohlc_dag DAG updates the currency table in BigQuery every day.
"""
from datetime import datetime
from json import loads
from airflow import DAG
from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
GCSToBigQueryOperator,
)
from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema
from stellar_etl_airflow.build_coingecko_api_to_gcs_task import response_to_gcs
from stellar_etl_airflow.default import alert_after_max_retries, alert_sla_miss
with DAG(
    dag_id="daily_euro_ohlc_dag",
    start_date=datetime(2023, 1, 1, 0, 0),
    # Fixed typo: "Bigquey" -> "BigQuery" (user-visible DAG description).
    description="This DAG updates the currency tables in BigQuery every day",
    schedule_interval="35 0 * * *",  # daily at 00:35
    params={
        "alias": "euro",
    },
    user_defined_filters={"fromjson": lambda s: loads(s)},
    catchup=False,  # do not backfill missed runs
    sla_miss_callback=alert_sla_miss,
) as dag:
    # Runtime configuration, read from Airflow Variables at parse time.
    currency_ohlc = Variable.get("currency_ohlc", deserialize_json=True)
    project_name = Variable.get("bq_project")
    dataset_name = Variable.get("bq_dataset")
    bucket_name = Variable.get("currency_bucket")
    columns = currency_ohlc["columns_ohlc_currency"]
    currency = currency_ohlc["currency"]
    today = "{{ ds }}"  # Jinja template; rendered to the logical date at run time
    filename = f"{currency}_{today}.csv"

    # Task 1: fetch OHLC data from the configured endpoint and write it as a
    # CSV blob into GCS.
    upload_to_gcs = PythonOperator(
        task_id=f"upload_{currency}_to_gcs",
        python_callable=response_to_gcs,
        op_kwargs={
            "bucket_name": bucket_name,
            "endpoint": currency_ohlc["endpoint"],
            "destination_blob_name": filename,
            "columns": columns,
        },
        # Alert on failure, consistent with the downstream load task
        # (previously missing here, so upload failures never alerted).
        on_failure_callback=alert_after_max_retries,
    )

    # Task 2: load the CSV from GCS into the BigQuery currency table,
    # appending to the existing rows.
    gcs_to_bq = GCSToBigQueryOperator(
        task_id=f"send_{currency}_to_bq",
        bucket=bucket_name,
        schema_fields=read_local_schema(currency),
        autodetect=False,  # rely on the explicit local schema, not inference
        source_format="CSV",
        # source_objects is documented as a list of GCS object names.
        source_objects=[filename],
        destination_project_dataset_table="{}.{}.{}".format(
            project_name, dataset_name, currency_ohlc["table_name"]
        ),
        write_disposition="WRITE_APPEND",
        create_disposition="CREATE_IF_NEEDED",
        max_bad_records=10,  # tolerate up to 10 malformed CSV rows per load
        on_failure_callback=alert_after_max_retries,
    )
    # NOTE: the redundant `dag=dag` kwargs were dropped — tasks created inside
    # the `with DAG(...) as dag` context are attached to it automatically.

    upload_to_gcs >> gcs_to_bq