airflow: DAG Run fails when chaining multiple empty mapped tasks

Apache Airflow version

2.3.3 (latest released)

What happened

On the Kubernetes Executor and Local Executor (others not tested), a significant fraction of the DAG runs of a DAG that has two consecutive mapped tasks, each passed an empty list, are marked as failed even though every task either succeeds or is skipped.


What you think should happen instead

The DAG Run should be marked as success.

How to reproduce

Run the following DAG on Kubernetes Executor or Local Executor.

The real-world version of this DAG has several mapped tasks that all expand over the same list, and that list is frequently empty. I have made a minimal reproducible example.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(dag_id="break_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def say_hi():
        print("Hi")

    # Both mapped tasks expand over an empty list.
    added_values = add_one.expand(x=[])
    added_more_values = add_one.expand(x=[])
    say_hi() >> added_values
    added_values >> added_more_values

Operating System

Debian Bullseye

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==1!4.0.0
apache-airflow-providers-cncf-kubernetes==1!4.1.0
apache-airflow-providers-elasticsearch==1!4.0.0
apache-airflow-providers-ftp==1!3.0.0
apache-airflow-providers-google==1!8.1.0
apache-airflow-providers-http==1!3.0.0
apache-airflow-providers-imap==1!3.0.0
apache-airflow-providers-microsoft-azure==1!4.0.0
apache-airflow-providers-mysql==1!3.0.0
apache-airflow-providers-postgres==1!5.0.0
apache-airflow-providers-redis==1!3.0.0
apache-airflow-providers-slack==1!5.0.0
apache-airflow-providers-sqlite==1!3.0.0
apache-airflow-providers-ssh==1!3.0.0

Deployment

Astronomer

Deployment details

Local Executor was tested on Docker Compose (from astro-cli)

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 6
  • Comments: 22 (19 by maintainers)

Most upvoted comments

For anyone else experiencing this, there is a workaround: put a sleep task between your two sets of mapped tasks.

from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="break_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def say_hi():
        print("Hi")

    # all_done lets this task run even when the upstream mapped
    # instances were skipped or removed.
    @task(trigger_rule="all_done")
    def sleep_task():
        sleep(5)

    added_values = add_one.expand(x=[])
    added_more_values = add_one.expand(x=[])
    say_hi() >> added_values
    added_values >> sleep_task() >> added_more_values

Hmm, I wonder if this has something to do with the fact that a task mapped against an empty list produces a single task instance with state REMOVED.
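
One way to check that hypothesis is to list the task-instance states the scheduler recorded for the example DAG. A minimal sketch, assuming the dag_id "break_mapping" from the reproduction above and using Airflow's create_session helper (the inspection approach is this editor's suggestion, not part of the original discussion):

# Hedged sketch: print task-instance states for the example DAG, to see
# whether the empty-mapped tasks leave a single instance in state REMOVED.
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == "break_mapping")
        .all()
    )
    for ti in tis:
        print(ti.task_id, ti.run_id, ti.map_index, ti.state)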

@frankcash My workaround specifically needs some operator downstream of a mapped task (the one that might get skipped), so in your example: added_values >> added_more_values >> sleep_task
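
Applied to the minimal DAG from the report, that ordering would look like the sketch below (reusing the add_one, say_hi, and sleep_task definitions from the workaround above):

# Sketch: the all_done sleep task goes downstream of the last mapped task,
# so the run only finalizes after the skippable mapped instances settle.
added_values = add_one.expand(x=[])
added_more_values = add_one.expand(x=[])
say_hi() >> added_values
added_values >> added_more_values >> sleep_task()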

I am using PostgreSQL 14. No issues with Airflow 2.3.2, but on any later version the scheduler eventually fails due to primary key constraint violations.

FYI - I ran into the same issue on Airflow 2.3.4. I had to revert to 2.3.2 for a stable experience for now, and I had to delete one row from my task instance table (an UPDATE was trying to change map_index to 0 when a row with map_index 0 apparently already existed, so the primary key constraint was violated and the scheduler crashed).

In my case, though, none of my dynamically mapped tasks have empty lists; they all receive values.
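
For anyone facing the same cleanup, here is a hedged sketch, not the commenter's exact steps: the assumption is that the colliding rows are an unmapped placeholder (map_index -1) alongside an already-expanded instance (map_index 0) for the same (dag_id, task_id, run_id), which is the state in which the scheduler's UPDATE to map_index 0 would violate the task_instance primary key:

# Hedged sketch: find (dag_id, task_id, run_id) groups holding both the
# unmapped placeholder row (map_index -1) and an expanded row (map_index 0).
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    for ti in session.query(TaskInstance).filter(TaskInstance.map_index == -1):
        clash = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == ti.dag_id,
                TaskInstance.task_id == ti.task_id,
                TaskInstance.run_id == ti.run_id,
                TaskInstance.map_index == 0,
            )
            .one_or_none()
        )
        if clash is not None:
            # Print for inspection; deleting the stale row is left as a
            # deliberate manual step.
            print("collision:", ti.dag_id, ti.task_id, ti.run_id)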