airflow: Dynamic tasks marked as `upstream_failed` when none of their upstream tasks are `failed` or `upstream_failed`

Apache Airflow version

2.4.2

What happened

A mapped task is getting marked as `upstream_failed` when none of its upstream tasks are `failed` or `upstream_failed`.

[Graph view screenshot of the DAG]

In the graph view above, if first_task finishes before second_task, first_task immediately tries to expand middle_task. Note: this is an important step to reproduce; the order in which the tasks finish matters.

Note that the Airflow configuration option `schedule_after_task_execution` must be `True` (the default) for this to occur.
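For reference, this is how the option can be set (the environment-variable form follows Airflow's standard `AIRFLOW__<SECTION>__<KEY>` mapping):

```shell
# In airflow.cfg:
#   [scheduler]
#   schedule_after_task_execution = True
#
# Or equivalently via the environment:
export AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=True
```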

The expansion occurs when the Task supervisor performs the “mini scheduler”, in this line in dagrun.py.

The mini scheduler then marks middle_task as upstream_failed at this line in mappedoperator.py:

                # If the map length cannot be calculated (due to unavailable
                # upstream sources), fail the unmapped task.

I believe this was introduced by the PR Fail task if mapping upstream fails.

What you think should happen instead

The dynamic tasks should execute successfully. The mapped task should not be expanded at that point, because its upstream task has not yet completed when the expansion is attempted. If the upstream task had completed earlier, the expansion would succeed.
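The expected behavior can be sketched as a small decision function. This is purely illustrative pseudologic, not Airflow's actual API: the point is that an upstream task that is merely still running should cause the expansion to be deferred, not treated as a failure.

```python
# Hypothetical sketch of the desired expansion guard; names and states are
# illustrative, not Airflow internals.
FAILED_STATES = {"failed", "upstream_failed"}

def try_expand(upstream_state: str) -> str:
    """Decide what the mini scheduler should do with a mapped task."""
    if upstream_state in FAILED_STATES:
        # A genuine upstream failure: failing the mapped task is correct.
        return "mark_upstream_failed"
    if upstream_state != "success":
        # Upstream hasn't finished, so the map length is simply unknown yet.
        # The bug: this case is currently conflated with a failure.
        return "defer"
    return "expand"  # map length is available, safe to expand

print(try_expand("running"))  # → defer
print(try_expand("failed"))   # → mark_upstream_failed
print(try_expand("success"))  # → expand
```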

How to reproduce

Execute this DAG, making sure the Airflow configuration option `schedule_after_task_execution` is set to its default value of True.

from datetime import datetime, timedelta
import time

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


class PrintIdOperator(PythonOperator):
    """PythonOperator subclass that accepts `id` as a mappable argument."""

    def __init__(self, id, **kwargs) -> None:
        super().__init__(**kwargs)
        self.op_kwargs["id"] = id


DAG_ID = "test_upstream_failed_on_mapped_operator_expansion"


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=1),
    "retries": 0,
}


def nop(id):
    print(f"{id=}")


def get_ids(delay: int = 0):
    print(f"Delaying {delay} seconds...")
    time.sleep(delay)
    print("Done!")
    return [0, 1, 2]


with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    start_date=datetime(2022, 8, 3),
    catchup=False,
    schedule=None,
    max_active_runs=1,
) as dag:

    second_task = PythonOperator(
        task_id="second_task",
        python_callable=get_ids,
        op_kwargs={"delay": 10}
    )

    first_task = PythonOperator(
        task_id="first_task",
        python_callable=get_ids,
    )

    middle_task = PrintIdOperator.partial(
        task_id="middle_task",
        python_callable=nop,
    ).expand(id=XComArg(second_task))

    last_task = PythonOperator(
        task_id="last_task",
        python_callable=nop,
        op_kwargs={"id": 1},
    )

    [first_task, middle_task] >> last_task

Operating System

debian buster

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 1
  • Comments: 19 (14 by maintainers)

Most upvoted comments


@eladkal - I corrected my test case with op_kwargs. Do you have AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION set to True or False? The default value True causes this error, but setting it to False prevents the mini scheduler from forcing an expansion too early, and the DAG succeeds.

I used the default for AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION, which is True. What database are you using? Did you set any environment variables other than the above? I can't reproduce it running your updated DAG.

However, I see that in my case the second task starts before the first task, and they finish together.

Reproduced with LocalExecutor!
