6.2_callbacks_hw.py
"""
### Uso de los argumentos "execution_timeout", "on_success_callback"
### y "on_failure_callback".
El primero de ellos nos va a permitir esperar una determinada
cantidad de tiempo para que una tarea finalice, una vez que haya
pasado el tiempo determinado, la tarea se definirá como fallida.
El segundo y el tercero son argumentos que nos van a permitir
generar un callback a una función definida previamente. Esto quiere
decir que podemos hacer un callback si la tarea es fallida o tuvo
éxito.
Este DAG fue una tarea.
"""
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.models.baseoperator import cross_downstream
from airflow.exceptions import AirflowTaskTimeout


def failure_callback(context):
    # Airflow passes the task context; 'exception' holds the error that failed the task.
    if isinstance(context['exception'], AirflowTaskTimeout):
        print('The task timed out')
    else:
        print('Other Error')
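
# The module docstring also mentions "on_success_callback", which this homework DAG
# never wires up. A minimal sketch of a success handler (illustrative only, not part
# of the original file) could look like:
#
# def success_callback(context):
#     print(f"Task {context['task_instance'].task_id} succeeded")
#
# and would be attached to a task with on_success_callback=success_callback.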


with DAG(
    '6.2_callbacks_hw',
    start_date=datetime(2023, 2, 21),
    default_args={"owner": "Andrade"},
    schedule='@daily',
    catchup=False,
    tags=['Curso 4', 'The Operators Guide']
) as dag:
    dag.doc_md = __doc__
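
    # Both extract tasks below sleep for exactly as long as their execution_timeout,
    # so they are likely to be killed with an AirflowTaskTimeout, which in turn
    # invokes failure_callback via on_failure_callback.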
    extract_a = BashOperator(
        owner='Tinmar',
        task_id='extract_a',
        bash_command="echo 'extract_a' && sleep 10",
        execution_timeout=timedelta(seconds=10),
        on_failure_callback=failure_callback
    )
    extract_b = BashOperator(
        owner='Armando',
        task_id='extract_b',
        bash_command="echo 'extract_b' && sleep 5",
        execution_timeout=timedelta(seconds=5),
        on_failure_callback=failure_callback
    )
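
    # The 'sequential_pool' used by the process tasks is not created by this file; it
    # must already exist (Admin -> Pools in the UI, or `airflow pools set`), presumably
    # with a single slot so that the three process tasks run one at a time.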
    process_a = BashOperator(
        owner='Armando',
        task_id='process_a',
        bash_command="echo 'process_a' && sleep 5",
        pool='sequential_pool'
    )
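
    # Each clean_* task uses trigger_rule='one_failed', so it only runs when its
    # upstream process task fails; if that task succeeds, the clean task is skipped.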
    clean_a = BashOperator(
        task_id='clean_a',
        bash_command="echo 'clean_a' && sleep 5",
        trigger_rule='one_failed'
    )
    process_b = BashOperator(
        owner='Armando',
        task_id='process_b',
        bash_command="echo 'process_b' && sleep 5",
        pool='sequential_pool'
    )
    clean_b = BashOperator(
        task_id='clean_b',
        bash_command="echo 'clean_b' && sleep 5",
        trigger_rule='one_failed'
    )
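
    # process_c ends with 'exit 1' on purpose: it always fails, which lets clean_c
    # (trigger_rule='one_failed') actually run.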
    process_c = BashOperator(
        owner='Armando',
        task_id='process_c',
        bash_command="echo 'process_c' && exit 1",
        pool='sequential_pool'
    )
    clean_c = BashOperator(
        task_id='clean_c',
        bash_command="echo 'clean_c' && sleep 5",
        trigger_rule='one_failed'
    )
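
    # store retries up to 3 times, waiting 10 seconds before the first retry and, with
    # retry_exponential_backoff=True, roughly doubling the wait on each attempt. Its
    # trigger_rule='all_done' makes it run once every upstream task has finished,
    # regardless of whether those tasks succeeded, failed or were skipped.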
    store = BashOperator(
        task_id='store',
        bash_command="sleep 10 && exit 0",
        retries=3,
        retry_delay=timedelta(seconds=10),
        retry_exponential_backoff=True,
        trigger_rule='all_done'
    )
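
    # Wiring: cross_downstream connects every extract task to every process task,
    # each process task feeds its own clean task, and all three clean tasks converge
    # on store.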
    # [extract_a, extract_b] >> [process_a, process_b, process_c] >> task_b  # This dependency arrangement does not work
    cross_downstream([extract_a, extract_b], [process_a, process_b, process_c])
    process_a >> clean_a
    process_b >> clean_b
    process_c >> clean_c
    [clean_c, clean_a, clean_b] >> store
    # process_a >> clean_a
    # process_b >> clean_b
    # process_c >> clean_c
    # [process_a, process_b, process_c] >> store