Commit 478dc2c

Optimized DAGs parse time (#258)
* replace `Variable.get()` when field is templatable
* streamline imports
* remove `Variable.get()` dependency from non-templatable fields
* only set `config_file` when in the default namespace
1 parent 7e3f883 commit 478dc2c

25 files changed: +239 -233 lines
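
Why this cuts parse time: a module-level Variable.get() call queries the Airflow metadata database every time the scheduler parses the DAG file, while a Jinja reference such as "{{ var.value.bq_project }}" or "{{ var.json.output_file_names.accounts }}" stays a plain string at parse time and is only resolved when a task renders its templated fields at run time. Below is a minimal sketch of the pattern, assuming Airflow 2.x and an existing Variable named bq_project; the DAG id and BashOperator are illustrative, not part of this commit.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "variable_template_sketch",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    # Parse-time lookup (what the commit removes): hits the metadata DB on every parse.
    # project = Variable.get("bq_project")

    # Deferred lookup (what the commit adds): kept as a string at parse time and rendered
    # only when the task executes, because bash_command is a templated field.
    project = "{{ var.value.bq_project }}"

    BashOperator(
        task_id="print_project",
        bash_command=f"echo {project}",
    )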

dags/asset_#_pipeline_dag.py

+2 -2
@@ -1,5 +1,5 @@
 import datetime
-import json
+from json import loads
 
 from airflow import DAG
 from airflow.models.variable import Variable
@@ -16,7 +16,7 @@
     description="This DAG runs dbt to calculate asset # based on stablecoin and XLM trades",
     schedule_interval="0 2 * * *",  # daily at 2am
     params={},
-    user_defined_filters={"fromjson": lambda s: json.loads(s)},
+    user_defined_filters={"fromjson": lambda s: loads(s)},
     user_defined_macros={
         "subtract_data_interval": macros.subtract_data_interval,
         "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,

dags/audit_log_dag.py

+4 -4
@@ -1,8 +1,8 @@
 """
 This DAG runs an audit log SQL to update the audit log dashboard.
 """
-import datetime
-import json
+from datetime import datetime
+from json import loads
 
 from airflow import DAG
 from airflow.models import Variable
@@ -16,12 +16,12 @@
     "audit_log_dag",
     default_args=get_default_dag_args(),
     description="This DAG runs periodically to update the audit log dashboard.",
-    start_date=datetime.datetime(2023, 1, 1, 0, 0),
+    start_date=datetime(2023, 1, 1, 0, 0),
     schedule_interval="10 9 * * *",
     params={
         "alias": "audit-log",
     },
-    user_defined_filters={"fromjson": lambda s: json.loads(s)},
+    user_defined_filters={"fromjson": lambda s: loads(s)},
     catchup=False,
 )

dags/bucket_list_dag.py

+34 -26
@@ -4,9 +4,9 @@
 to stop exporting. This end ledger is determined by when the Airflow DAG is run. This DAG should be triggered manually
 when initializing the tables in order to catch up to the current state in the network, but should not be scheduled to run constantly.
 """
-import ast
-import datetime
-import json
+from ast import literal_eval
+from datetime import datetime
+from json import loads
 
 from airflow import DAG
 from airflow.models import Variable
@@ -23,29 +23,31 @@
 dag = DAG(
     "bucket_list_export",
     default_args=get_default_dag_args(),
-    start_date=datetime.datetime(2021, 10, 15),
-    end_date=datetime.datetime(2021, 10, 15),
+    start_date=datetime(2021, 10, 15),
+    end_date=datetime(2021, 10, 15),
     description="This DAG loads a point forward view of state tables. Caution: Does not capture historical changes!",
     schedule_interval="@daily",
     params={
         "alias": "bucket",
     },
-    user_defined_filters={"fromjson": lambda s: json.loads(s)},
+    user_defined_filters={
+        "fromjson": lambda s: loads(s),
+        "literal_eval": lambda e: literal_eval(e),
+    },
     user_defined_macros={
         "subtract_data_interval": macros.subtract_data_interval,
         "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
     },
 )
 
-file_names = Variable.get("output_file_names", deserialize_json=True)
 table_names = Variable.get("table_ids", deserialize_json=True)
-internal_project = Variable.get("bq_project")
-internal_dataset = Variable.get("bq_dataset")
-public_project = Variable.get("public_project")
-public_dataset = Variable.get("public_dataset")
-use_testnet = ast.literal_eval(Variable.get("use_testnet"))
-use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))
-
+internal_project = "{{ var.value.bq_project }}"
+internal_dataset = "{{ var.value.bq_dataset }}"
+public_project = "{{ var.value.public_project }}"
+public_dataset = "{{ var.value.public_dataset }}"
+public_dataset_new = "{{ var.value.public_dataset_new }}"
+use_testnet = "{{ var.value.use_testnet | literal_eval }}"
+use_futurenet = "{{ var.value.use_futurenet | literal_eval }}"
 """
 The time task reads in the execution time of the current run, as well as the next
 execution time. It converts these two times into ledger ranges.
@@ -60,7 +62,7 @@
     dag,
     "bucket",
    "export_accounts",
-    file_names["accounts"],
+    "{{ var.json.output_file_names.accounts }}",
     use_testnet=use_testnet,
     use_futurenet=use_futurenet,
     use_gcs=True,
@@ -69,7 +71,7 @@
     dag,
     "bucket",
     "export_claimable_balances",
-    file_names["claimable_balances"],
+    "{{ var.json.output_file_names.claimable_balances }}",
     use_testnet=use_testnet,
     use_futurenet=use_futurenet,
     use_gcs=True,
@@ -78,7 +80,7 @@
     dag,
     "bucket",
     "export_offers",
-    file_names["offers"],
+    "{{ var.json.output_file_names.offers }}",
     use_testnet=use_testnet,
     use_futurenet=use_futurenet,
     use_gcs=True,
@@ -87,7 +89,7 @@
     dag,
     "bucket",
     "export_pools",
-    file_names["liquidity_pools"],
+    "{{ var.json.output_file_names.liquidity_pools }}",
     use_testnet=use_testnet,
     use_futurenet=use_futurenet,
     use_gcs=True,
@@ -96,7 +98,7 @@
     dag,
     "bucket",
     "export_signers",
-    file_names["signers"],
+    "{{ var.json.output_file_names.signers }}",
     use_testnet=use_testnet,
     use_futurenet=use_futurenet,
     use_gcs=True,
@@ -105,7 +107,7 @@
     dag,
     "bucket",
     "export_trustlines",
-    file_names["trustlines"],
+    "{{ var.json.output_file_names.trustlines }}",
     use_testnet=use_testnet,
     use_futurenet=use_futurenet,
     use_gcs=True,
@@ -131,37 +133,37 @@
     dag, internal_project, internal_dataset, table_names["accounts"]
 )
 delete_acc_pub_task = build_delete_data_task(
-    dag, public_project, public_dataset, table_names["accounts"]
+    dag, public_project, public_dataset, table_names["accounts"], "pub"
 )
 delete_bal_task = build_delete_data_task(
     dag, internal_project, internal_dataset, table_names["claimable_balances"]
 )
 delete_bal_pub_task = build_delete_data_task(
-    dag, public_project, public_dataset, table_names["claimable_balances"]
+    dag, public_project, public_dataset, table_names["claimable_balances"], "pub"
 )
 delete_off_task = build_delete_data_task(
     dag, internal_project, internal_dataset, table_names["offers"]
 )
 delete_off_pub_task = build_delete_data_task(
-    dag, public_project, public_dataset, table_names["offers"]
+    dag, public_project, public_dataset, table_names["offers"], "pub"
 )
 delete_pool_task = build_delete_data_task(
     dag, internal_project, internal_dataset, table_names["liquidity_pools"]
 )
 delete_pool_pub_task = build_delete_data_task(
-    dag, public_project, public_dataset, table_names["liquidity_pools"]
+    dag, public_project, public_dataset, table_names["liquidity_pools"], "pub"
 )
 delete_sign_task = build_delete_data_task(
     dag, internal_project, internal_dataset, table_names["signers"]
 )
 delete_sign_pub_task = build_delete_data_task(
-    dag, public_project, public_dataset, table_names["signers"]
+    dag, public_project, public_dataset, table_names["signers"], "pub"
 )
 delete_trust_task = build_delete_data_task(
     dag, internal_project, internal_dataset, table_names["trustlines"]
 )
 delete_trust_pub_task = build_delete_data_task(
-    dag, public_project, public_dataset, table_names["trustlines"]
+    dag, public_project, public_dataset, table_names["trustlines"], "pub"
 )
 
 """
@@ -244,6 +246,7 @@
     "",
     partition=True,
     cluster=True,
+    dataset_type="pub",
 )
 send_bal_to_pub_task = build_gcs_to_bq_task(
     dag,
@@ -254,6 +257,7 @@
     "",
     partition=True,
     cluster=True,
+    dataset_type="pub",
 )
 send_off_to_pub_task = build_gcs_to_bq_task(
     dag,
@@ -264,6 +268,7 @@
     "",
     partition=True,
     cluster=True,
+    dataset_type="pub",
 )
 send_pool_to_pub_task = build_gcs_to_bq_task(
     dag,
@@ -274,6 +279,7 @@
     "",
     partition=True,
     cluster=True,
+    dataset_type="pub",
 )
 send_sign_to_pub_task = build_gcs_to_bq_task(
     dag,
@@ -284,6 +290,7 @@
     "",
     partition=True,
     cluster=True,
+    dataset_type="pub",
 )
 send_trust_to_pub_task = build_gcs_to_bq_task(
     dag,
@@ -294,6 +301,7 @@
     "",
     partition=True,
     cluster=True,
+    dataset_type="pub",
 )
 
 (
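
The literal_eval entry added to user_defined_filters above exposes ast.literal_eval inside this DAG's Jinja templates, so string Variables such as use_testnet are converted when a templated field is rendered rather than via Variable.get() at parse time. A small standalone sketch of how such a filter behaves, using plain Jinja rather than Airflow and assuming the Variable's stored value is the string "True":

from ast import literal_eval

from jinja2 import Environment

# Register the same filter name the DAG registers via user_defined_filters.
env = Environment()
env.filters["literal_eval"] = literal_eval

# "True" stands in for the string Airflow would supply for var.value.use_testnet.
template = env.from_string("{{ use_testnet | literal_eval }}")
print(template.render(use_testnet="True"))  # the filter yields the bool True; Jinja prints it as "True"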

dags/daily_euro_ohlc_dag.py

+4 -4
@@ -2,8 +2,8 @@
 The daily_euro_ohlc_dag DAG updates the currency table in Bigquey every day.
 """
 
-import datetime
-import json
+from datetime import datetime
+from json import loads
 
 from airflow import DAG
 from airflow.decorators import dag
@@ -18,13 +18,13 @@
 
 with DAG(
     dag_id="daily_euro_ohlc_dag",
-    start_date=datetime.datetime(2023, 1, 1, 0, 0),
+    start_date=datetime(2023, 1, 1, 0, 0),
     description="This DAG updates the currency tables in Bigquey every day",
     schedule_interval="35 0 * * *",
     params={
         "alias": "euro",
     },
-    user_defined_filters={"fromjson": lambda s: json.loads(s)},
+    user_defined_filters={"fromjson": lambda s: loads(s)},
     catchup=False,
 ) as dag:
     currency_ohlc = Variable.get("currency_ohlc", deserialize_json=True)

dags/dataset_reset_dag.py

+6 -6
@@ -1,9 +1,9 @@
 """
 When the Test net server is reset, the dataset reset DAG deletes all the datasets in the test Hubble.
 """
-import ast
-import datetime
-import json
+from ast import literal_eval
+from datetime import datetime
+from json import loads
 
 from airflow import DAG
 from airflow.models import Variable
@@ -20,13 +20,13 @@
     "testnet_data_reset",
     default_args=get_default_dag_args(),
     description="This DAG runs after the Testnet data reset that occurs periodically.",
-    start_date=datetime.datetime(2023, 1, 1, 0, 0),
+    start_date=datetime(2023, 1, 1, 0, 0),
     schedule_interval="10 9 * * *",
-    is_paused_upon_creation=ast.literal_eval(Variable.get("use_testnet")),
+    is_paused_upon_creation=literal_eval(Variable.get("use_testnet")),
     params={
         "alias": "testnet-reset",
     },
-    user_defined_filters={"fromjson": lambda s: json.loads(s)},
+    user_defined_filters={"fromjson": lambda s: loads(s)},
 )
 
 internal_project = "test-hubble-319619"

dags/enriched_tables_dag.py

+2 -3
@@ -1,7 +1,6 @@
-import datetime
+from datetime import datetime
 
 from airflow import DAG
-from airflow.models.variable import Variable
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_dbt_task import build_dbt_task
 from stellar_etl_airflow.default import get_default_dag_args, init_sentry
@@ -11,7 +10,7 @@
 dag = DAG(
     "enriched_tables",
     default_args=get_default_dag_args(),
-    start_date=datetime.datetime(2023, 4, 12, 0, 0),
+    start_date=datetime(2023, 4, 12, 0, 0),
     description="This DAG runs dbt to create the tables for the models in marts/enriched/.",
     schedule_interval="*/30 * * * *",  # Runs every 30 mins
     params={},
