Skip to content

Commit 99e713e

Browse files
authored
[BACKPORT] Prevent using trigger_rule=TriggerRule.ALWAYS in a task-generated mapping within bare tasks (#44751) (#44769)
1 parent b5f033a commit 99e713e

File tree

6 files changed

+75
-7
lines changed

6 files changed

+75
-7
lines changed

airflow/decorators/base.py

+21
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,12 @@ def _validate_arg_names(self, func: ValidationSource, kwargs: dict[str, Any]):
403403
super()._validate_arg_names(func, kwargs)
404404

405405
def expand(self, **map_kwargs: OperatorExpandArgument) -> XComArg:
406+
if self.kwargs.get("trigger_rule") == TriggerRule.ALWAYS and any(
407+
[isinstance(expanded, XComArg) for expanded in map_kwargs.values()]
408+
):
409+
raise ValueError(
410+
"Task-generated mapping within a task using 'expand' is not allowed with trigger rule 'always'."
411+
)
406412
if not map_kwargs:
407413
raise TypeError("no arguments to expand against")
408414
self._validate_arg_names("expand", map_kwargs)
@@ -416,6 +422,21 @@ def expand(self, **map_kwargs: OperatorExpandArgument) -> XComArg:
416422
return self._expand(DictOfListsExpandInput(map_kwargs), strict=False)
417423

418424
def expand_kwargs(self, kwargs: OperatorExpandKwargsArgument, *, strict: bool = True) -> XComArg:
425+
if (
426+
self.kwargs.get("trigger_rule") == TriggerRule.ALWAYS
427+
and not isinstance(kwargs, XComArg)
428+
and any(
429+
[
430+
isinstance(v, XComArg)
431+
for kwarg in kwargs
432+
if not isinstance(kwarg, XComArg)
433+
for v in kwarg.values()
434+
]
435+
)
436+
):
437+
raise ValueError(
438+
"Task-generated mapping within a task using 'expand_kwargs' is not allowed with trigger rule 'always'."
439+
)
419440
if isinstance(kwargs, Sequence):
420441
for item in kwargs:
421442
if not isinstance(item, (XComArg, Mapping)):

airflow/utils/task_group.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,9 @@ def __iter__(self):
610610

611611
for child in self.children.values():
612612
if isinstance(child, AbstractOperator) and child.trigger_rule == TriggerRule.ALWAYS:
613-
raise ValueError("Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'")
613+
raise ValueError(
614+
"Task-generated mapping within a mapped task group is not allowed with trigger rule 'always'"
615+
)
614616
yield from self._iter_child(child)
615617

616618
def iter_mapped_dependencies(self) -> Iterator[Operator]:

docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst

+6-4
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,6 @@ The grid view also provides visibility into your mapped tasks in the details pan
8484

8585
Although we show a "reduce" task here (``sum_it``) you don't have to have one, the mapped tasks will still be executed even if they have no downstream tasks.
8686

87-
.. warning:: ``TriggerRule.ALWAYS`` cannot be utilized in expanded tasks
88-
89-
Assigning ``trigger_rule=TriggerRule.ALWAYS`` in expanded tasks is forbidden, as expanded parameters will be undefined with the task's immediate execution.
90-
This is enforced at the time of the DAG parsing, and will raise an error if you try to use it.
9187

9288
Task-generated Mapping
9389
----------------------
@@ -113,6 +109,12 @@ The above examples we've shown could all be achieved with a ``for`` loop in the
113109
114110
The ``make_list`` task runs as a normal task and must return a list or dict (see `What data types can be expanded?`_), and then the ``consumer`` task will be called four times, once with each value in the return of ``make_list``.
115111

112+
.. warning:: Task-generated mapping cannot be utilized with ``TriggerRule.ALWAYS``
113+
114+
Assigning ``trigger_rule=TriggerRule.ALWAYS`` in task-generated mapping is not allowed, as expanded parameters are undefined with the task's immediate execution.
115+
This is enforced at the time of the DAG parsing, for both tasks and mapped tasks groups, and will raise an error if you try to use it.
116+
In the recent example, setting ``trigger_rule=TriggerRule.ALWAYS`` in the ``consumer`` task will raise an error since ``make_list`` is a task-generated mapping.
117+
116118
Repeated mapping
117119
----------------
118120

newsfragments/44751.bugfix.rst

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
``TriggerRule.ALWAYS`` cannot be utilized within a task-generated mapping, either in bare tasks (fixed in this PR) or mapped task groups (fixed in PR #44368). The issue with doing so, is that the task is immediately executed without waiting for the upstreams's mapping results, which certainly leads to failure of the task. This fix avoids it by raising an exception when it is detected during DAG parsing.

tests/decorators/test_mapped.py

+41
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20+
import pytest
21+
22+
from airflow.decorators import task
2023
from airflow.models.dag import DAG
2124
from airflow.utils.task_group import TaskGroup
2225
from tests.models import DEFAULT_DATE
@@ -36,3 +39,41 @@ def f(z):
3639

3740
dag.get_task("t1") == x1.operator
3841
dag.get_task("g.t2") == x2.operator
42+
43+
44+
@pytest.mark.db_test
45+
def test_fail_task_generated_mapping_with_trigger_rule_always__exapnd(dag_maker, session):
46+
with DAG(dag_id="d", schedule=None, start_date=DEFAULT_DATE):
47+
48+
@task
49+
def get_input():
50+
return ["world", "moon"]
51+
52+
@task(trigger_rule="always")
53+
def hello(input):
54+
print(f"Hello, {input}")
55+
56+
with pytest.raises(
57+
ValueError,
58+
match="Task-generated mapping within a task using 'expand' is not allowed with trigger rule 'always'",
59+
):
60+
hello.expand(input=get_input())
61+
62+
63+
@pytest.mark.db_test
64+
def test_fail_task_generated_mapping_with_trigger_rule_always__exapnd_kwargs(dag_maker, session):
65+
with DAG(dag_id="d", schedule=None, start_date=DEFAULT_DATE):
66+
67+
@task
68+
def get_input():
69+
return ["world", "moon"]
70+
71+
@task(trigger_rule="always")
72+
def hello(input, input2):
73+
print(f"Hello, {input}, {input2}")
74+
75+
with pytest.raises(
76+
ValueError,
77+
match="Task-generated mapping within a task using 'expand_kwargs' is not allowed with trigger rule 'always'",
78+
):
79+
hello.expand_kwargs([{"input": get_input(), "input2": get_input()}])

tests/decorators/test_task_group.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def tg():
135135

136136

137137
@pytest.mark.db_test
138-
def test_expand_fail_trigger_rule_always(dag_maker, session):
138+
def test_fail_task_generated_mapping_with_trigger_rule_always(dag_maker, session):
139139
@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
140140
def pipeline():
141141
@task
@@ -151,7 +151,8 @@ def tg(param):
151151
t1(param)
152152

153153
with pytest.raises(
154-
ValueError, match="Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'"
154+
ValueError,
155+
match="Task-generated mapping within a mapped task group is not allowed with trigger rule 'always'",
155156
):
156157
tg.expand(param=get_param())
157158

0 commit comments

Comments
 (0)