airflow: Dynamic tasks marked as `upstream_failed` when none of their upstream tasks are `failed` or `upstream_failed`
Apache Airflow version
2.4.2
What happened
A mapped task is getting marked as `upstream_failed` when none of its upstream tasks are `failed` or `upstream_failed`.
In the graph view above, if `first_task` finishes before `second_task`, `first_task` immediately tries to expand `middle_task`. Note that this is an important step to reproduce: the order in which the tasks finish matters. Also note that the Airflow configuration variable `schedule_after_task_execution` must be `True` (the default) for this to occur.
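For reference, the option lives in the `[scheduler]` section of `airflow.cfg`; the equivalent environment variable is shown as a comment:

[scheduler]
# Default value; enables the "mini scheduler" run after each task finishes.
schedule_after_task_execution = True
# Environment-variable form: AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=True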
The expansion occurs when the task supervisor performs the “mini scheduler” step, in this line in dagrun.py, which then marks `middle_task` as `upstream_failed` in this line in mappedoperator.py:
# If the map length cannot be calculated (due to unavailable
# upstream sources), fail the unmapped task.
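For context, the failing branch is roughly equivalent to the sketch below (paraphrased from `MappedOperator.expand_mapped_task` in Airflow 2.4.x; the helper names are illustrative stand-ins, not the exact internal API):

class NotFullyPopulated(Exception):
    # Illustrative stand-in: raised when an upstream XCom needed to
    # compute the map length has not been pushed yet.
    pass

def expand_mapped_task(unmapped_ti, get_total_map_length):
    try:
        total_length = get_total_map_length()
    except NotFullyPopulated:
        # If the map length cannot be calculated (due to unavailable
        # upstream sources), fail the unmapped task. The mini scheduler
        # reaches this branch while second_task is still running, so
        # middle_task becomes upstream_failed even though no upstream
        # task has actually failed.
        unmapped_ti.state = "upstream_failed"
        return []
    # ...otherwise, create one task instance per mapped value...

The problem is that “not yet available” (upstream still running) and “unavailable for good” (upstream failed) take the same branch.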
I believe this was introduced by the PR “Fail task if mapping upstream fails”.
What you think should happen instead
The dynamic tasks should execute successfully. The mapped task should not be expanded at this point, because its upstream task hasn't completed when the expansion is attempted; if the upstream task completed earlier, the expansion would succeed.
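To illustrate the expected behavior (purely hypothetical, not the actual fix), the mini scheduler could defer expansion while any mapping upstream is still active instead of failing the task:

# Hypothetical guard, for illustration only (not actual Airflow code).
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}

def should_attempt_expansion(mapping_upstream_tis):
    # Expand only once every upstream feeding the map is in a terminal
    # state; otherwise leave the mapped task alone and let a later
    # scheduling pass retry the expansion.
    return all(ti.state in FINISHED_STATES for ti in mapping_upstream_tis)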
How to reproduce
Execute this DAG, making sure the Airflow configuration option `schedule_after_task_execution` is set to its default value `True`.
from datetime import datetime, timedelta
import time

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


class PrintIdOperator(PythonOperator):
    """PythonOperator with a mappable `id` argument."""

    def __init__(self, id, **kwargs) -> None:
        super().__init__(**kwargs)
        self.op_kwargs["id"] = id


DAG_ID = "test_upstream_failed_on_mapped_operator_expansion"

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=1),
    "retries": 0,
}


def nop(id):
    print(f"{id=}")


def get_ids(delay: int = 0):
    print(f"Delaying {delay} seconds...")
    time.sleep(delay)
    print("Done!")
    return [0, 1, 2]


with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    start_date=datetime(2022, 8, 3),
    catchup=False,
    schedule=None,
    max_active_runs=1,
) as dag:
    # The 10-second delay makes second_task finish *after* first_task,
    # which is the ordering needed to reproduce the bug.
    second_task = PythonOperator(
        task_id="second_task",
        python_callable=get_ids,
        op_kwargs={"delay": 10},
    )

    first_task = PythonOperator(
        task_id="first_task",
        python_callable=get_ids,
    )

    # middle_task maps over second_task's XCom, so it cannot be expanded
    # until second_task has finished and pushed its return value.
    middle_task = PrintIdOperator.partial(
        task_id="middle_task",
        python_callable=nop,
    ).expand(id=XComArg(second_task))

    last_task = PythonOperator(
        task_id="last_task",
        python_callable=nop,
        op_kwargs={"id": 1},
    )

    [first_task, middle_task] >> last_task
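A single run is enough to reproduce; for example, from the CLI:

airflow dags unpause test_upstream_failed_on_mapped_operator_expansion
airflow dags trigger test_upstream_failed_on_mapped_operator_expansion

Because first_task returns immediately while second_task sleeps for 10 seconds, the mini scheduler run at the end of first_task hits the premature expansion, and middle_task flips to upstream_failed.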
Operating System
Debian Buster
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
No response
Anything else
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- State: closed
- Created 2 years ago
- Reactions: 1
- Comments: 19 (14 by maintainers)
Reproduced with LocalExecutor!
@eladkal - I corrected my test case with `op_kwargs`. Do you have `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION` set to `True` or `False`? The default value `True` causes this error, but setting it to `False` prevents the mini scheduler from forcing an expansion too early, and the DAG succeeds.
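Since the reported deployment is Docker-Compose, the workaround can be applied as an environment variable on the Airflow services. A minimal excerpt, assuming the x-airflow-common block from the official docker-compose file:

environment:
  AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: "False"

Note that this only sidesteps the premature expansion by disabling the mini scheduler entirely; it is a mitigation, not a fix.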