-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_k8s_executor.py
77 lines (63 loc) · 1.75 KB
/
example_k8s_executor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from time import sleep
import os
# Keyword defaults handed to the DAG below through ``default_args``.
args = {
    'owner': 'airflow',
    # Start date is computed relative to "now": two days back.
    'start_date': airflow.utils.dates.days_ago(2),
}

# The DAG object every task in this file attaches to.
# ``schedule_interval=None`` means no schedule interval is configured.
dag = DAG(
    dag_id='example_kubernetes_executor',
    default_args=args,
    schedule_interval=None,
)
# Kubernetes-style pod anti-affinity description: one required scheduling
# term, using the node hostname as topology key and selecting on the label
# ``app In [airflow]``.
# NOTE(review): nothing in the visible part of this file references this
# name -- presumably meant for a task's executor_config; confirm or remove.
affinity = {
    'podAntiAffinity': {
        'requiredDuringSchedulingIgnoredDuringExecution': [
            {
                'topologyKey': 'kubernetes.io/hostname',
                'labelSelector': {
                    'matchExpressions': [
                        {
                            'key': 'app',
                            'operator': 'In',
                            'values': ['airflow'],
                        },
                    ],
                },
            },
        ],
    },
}
# A single toleration entry: key ``dedicated`` Equal ``airflow``.
# NOTE(review): not referenced by anything in the visible part of this
# file -- presumably meant for a task's executor_config; confirm or remove.
tolerations = [
    {
        'key': 'dedicated',
        'operator': 'Equal',
        'value': 'airflow',
    },
]
def print_stuff():
    """Print a marker line, then block for 120 seconds."""
    pause_seconds = 120
    print("stuffFFFF!")
    sleep(pause_seconds)
def print_stuff1():
    """Print a marker line, then block for 240 seconds."""
    pause_seconds = 240
    print("stuff1!")
    sleep(pause_seconds)
def print_stuff2():
    """Print a marker line, then block for 480 seconds."""
    pause_seconds = 480
    print("stuff2!")
    sleep(pause_seconds)
# No executor_config is passed here, so this task carries no
# KubernetesExecutor-specific settings.
start_task = PythonOperator(
    task_id="start_task",
    python_callable=print_stuff,
    dag=dag,
)

# Also created without an executor_config.
one_task = PythonOperator(
    task_id="one_task",
    python_callable=print_stuff1,
    dag=dag,
)

# Also created without an executor_config.
two_task = PythonOperator(
    task_id="two_task",
    python_callable=print_stuff2,
    dag=dag,
)

# Add arbitrary labels to worker pods
four_task = PythonOperator(
    task_id="four_task",
    python_callable=print_stuff2,  # same callable as two_task
    dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "labels": {"foo": "bar"},
        },
    },
)

# Fan out: one_task, two_task and four_task are each set directly
# downstream of start_task.
start_task.set_downstream(
    [one_task, two_task, four_task]
)