airflow: TaskGroup does not support dynamically generated tasks

Apache Airflow version: 2.0 / master

Environment:

breeze

What happened:

Using this DAG:

from airflow.operators.bash import BashOperator
from airflow.operators.python import task
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

@task
def show():
    print("Cats are awesome!")

with DAG(
    "using_task_group",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
) as dag3:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo start",
    )

    end_task = BashOperator(
        task_id="end_task",
        bash_command="echo end",
    )

    with TaskGroup(group_id="show_tasks") as tg1:
        previous_show = show()
        for _ in range(100):
            next_show = show()
            previous_show >> next_show
            previous_show = next_show

I get:

Broken DAG: [/files/dags/test.py] Traceback (most recent call last):
  File "/opt/airflow/airflow/models/baseoperator.py", line 410, in __init__
    task_group.add(self)
  File "/opt/airflow/airflow/utils/task_group.py", line 140, in add
    raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'show_tasks.show' has already been added to the DAG

If I remove the task group, the tasks are generated as expected.
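One plausible reading of the traceback can be sketched as a simplified pure-Python model. This is not Airflow's code, and `ToyDag`, `unique_id_ignoring_group` and `create_show_tasks` are hypothetical names. It assumes the task decorator makes repeated ids unique by appending a `__N` suffix, but checks only the bare id (`show`) rather than the group-prefixed id the DAG actually stores (`show_tasks.show`). Without a group the check sees the collision; inside the group it never does, so the second call fails the same way.

```python
import re
from typing import List, Optional, Set


class DuplicateTaskIdFound(Exception):
    """Stand-in for airflow.exceptions.DuplicateTaskIdFound (hypothetical model)."""


class ToyDag:
    """Hypothetical stand-in for a DAG: it only tracks registered task ids."""

    def __init__(self) -> None:
        self.task_ids: Set[str] = set()

    def add(self, task_id: str, group_id: Optional[str] = None) -> str:
        # Mimic a task group: the stored id is prefixed with the group id.
        full_id = f"{group_id}.{task_id}" if group_id else task_id
        if full_id in self.task_ids:
            raise DuplicateTaskIdFound(
                f"Task id '{full_id}' has already been added to the DAG"
            )
        self.task_ids.add(full_id)
        return full_id


def unique_id_ignoring_group(task_id: str, dag: ToyDag) -> str:
    """Append __N on collision, but (the assumed bug) check only the bare id."""
    if task_id not in dag.task_ids:
        return task_id
    pattern = re.compile(rf"^{re.escape(task_id)}__(\d+)$")
    suffixes = [int(m.group(1)) for t in dag.task_ids if (m := pattern.match(t))]
    return f"{task_id}__{max(suffixes, default=0) + 1}"


def create_show_tasks(n: int, group_id: Optional[str] = None) -> List[str]:
    """Simulate calling the decorated show() n times, optionally inside a group."""
    dag = ToyDag()
    return [dag.add(unique_id_ignoring_group("show", dag), group_id) for _ in range(n)]
```

In this model, `create_show_tasks(3)` returns `['show', 'show__1', 'show__2']`, while `create_show_tasks(2, group_id="show_tasks")` raises `DuplicateTaskIdFound: Task id 'show_tasks.show' has already been added to the DAG`.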

What you expected to happen:

I expect to be able to generate tasks dynamically using TaskGroup and task decorators.
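Assuming the decorator de-duplicates repeated ids by appending a `__N` suffix, the expected behaviour might look like the sketch below. The collision check runs against the group-prefixed id, so the chained `show()` calls inside `show_tasks` all get distinct ids. `unique_task_id` and `build_chain` are hypothetical helpers for illustration, not Airflow APIs.

```python
import re
from typing import Iterable, List, Optional


def unique_task_id(task_id: str, taken: Iterable[str], group_id: Optional[str] = None) -> str:
    """Return a free, fully qualified task id, appending __N on collision.

    Collisions are checked against the group-prefixed id, because that is
    the id that actually gets registered (hypothetical model, not Airflow).
    """
    existing = set(taken)
    full_id = f"{group_id}.{task_id}" if group_id else task_id
    if full_id not in existing:
        return full_id
    pattern = re.compile(rf"^{re.escape(full_id)}__(\d+)$")
    suffixes = [int(m.group(1)) for t in existing if (m := pattern.match(t))]
    return f"{full_id}__{max(suffixes, default=0) + 1}"


def build_chain(n: int, group_id: Optional[str] = None) -> List[str]:
    """Simulate n calls to the decorated show(), as in the loop in the DAG."""
    ids: List[str] = []
    for _ in range(n):
        ids.append(unique_task_id("show", ids, group_id))
    return ids
```

Here `build_chain(3, "show_tasks")` yields `show_tasks.show`, `show_tasks.show__1` and `show_tasks.show__2`, and the 101 tasks created by the DAG's loop would all be unique.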

How to reproduce it:

Use the DAG from above.

Anything else we need to know:

N/A

About this issue

  • State: closed
  • Created 4 years ago

Most upvoted comments

@ssiddiqui-apixio I’m having the same problem on Airflow 2.0.2. Any chance of the issue being reopened for a new solution?

I’m still having the problem on Airflow 2.1.2

Broken DAG: [/usr/local/airflow/dags/dags/auto/bronze/xywyxw_full_bronze_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 524, in __init__
    task_group.add(self)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/task_group.py", line 158, in add
    raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'xywyxw_wwww_full_bronze-20220217-104624' has already been added to the DAG

Can you please reopen this issue to track a new solution to the problem?

Looking at @turbaszek's code, that should address this issue.

Not sure how complex it is and whether we can manage it for beta3/4/5, @yuqian90 @casassg? But I've marked it for beta4 for now.