airflow: DAG Run fails when chaining multiple empty mapped tasks
Apache Airflow version
2.3.3 (latest released)
What happened
On Kubernetes Executor and Local Executor (others not tested), a significant fraction of the DAG Runs of a DAG that has two consecutive mapped tasks which are being passed an empty list are marked as failed, even though all tasks either succeed or are skipped.
What you think should happen instead
The DAG Run should be marked as success.
How to reproduce
Run the following DAG on Kubernetes Executor or Local Executor.
The real-world version of this DAG has several mapped tasks that all point to the same list, and that list is frequently empty. Below is a minimal reproducible example.
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="break_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def say_hi():
        print("Hi")

    added_values = add_one.expand(x=[])
    added_more_values = add_one.expand(x=[])
    say_hi() >> added_values
    added_values >> added_more_values
```
Operating System
Debian Bullseye
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==1!4.0.0
apache-airflow-providers-cncf-kubernetes==1!4.1.0
apache-airflow-providers-elasticsearch==1!4.0.0
apache-airflow-providers-ftp==1!3.0.0
apache-airflow-providers-google==1!8.1.0
apache-airflow-providers-http==1!3.0.0
apache-airflow-providers-imap==1!3.0.0
apache-airflow-providers-microsoft-azure==1!4.0.0
apache-airflow-providers-mysql==1!3.0.0
apache-airflow-providers-postgres==1!5.0.0
apache-airflow-providers-redis==1!3.0.0
apache-airflow-providers-slack==1!5.0.0
apache-airflow-providers-sqlite==1!3.0.0
apache-airflow-providers-ssh==1!3.0.0
Deployment
Astronomer
Deployment details
Local Executor was tested on Docker Compose (from astro-cli).
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Reactions: 6
- Comments: 22 (19 by maintainers)
For anyone else experiencing this, there is a workaround: put a sleep task between your two sets of mapped tasks.
Hmm, I wonder if this has something to do with the fact that a task mapped against an empty list produces a single task instance with state REMOVED.
@frankcash My workaround specifically needs some operator downstream of a mapped task (that might get skipped), so in your example:

`added_values >> added_more_values >> sleep_task`
I am using PostgreSQL 14. No issues with version 2.3.2, but on later versions the scheduler eventually fails due to primary key constraint violations.
FYI - I ran into the same issue on Airflow 2.3.4. I had to revert to 2.3.2 for a stable experience for now, and I had to delete one row from my task instance table: an UPDATE was apparently trying to change map_index to 0, which already existed, so the primary key constraint was violated and the scheduler crashed.
In my case, none of my dynamic tasks happen to have empty lists though - there are values.