airflow: TaskGroup does not support dynamically generated tasks
Apache Airflow version: 2.0 / master
Environment:
breeze
What happened:
Using this DAG:
from airflow.operators.bash import BashOperator
from airflow.operators.python import task
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
@task
def show():
    print("Cats are awesome!")
with DAG(
    "using_task_group",
    default_args={'owner': 'airflow'},
    start_date=days_ago(2),
    schedule_interval=None,
) as dag3:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo start",
    )
    end_task = BashOperator(
        task_id="end_task",
        bash_command="echo end",
    )
    # Chain 101 calls of the same @task function inside one TaskGroup
    with TaskGroup(group_id="show_tasks") as tg1:
        previous_show = show()
        for _ in range(100):
            next_show = show()
            previous_show >> next_show
            previous_show = next_show
I get:
Broken DAG: [/files/dags/test.py] Traceback (most recent call last):
  File "/opt/airflow/airflow/models/baseoperator.py", line 410, in __init__
    task_group.add(self)
  File "/opt/airflow/airflow/utils/task_group.py", line 140, in add
    raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'show_tasks.show' has already been added to the DAG
If I remove the task group, the tasks are generated as expected.
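The traceback shows the registered id carries the group prefix (`show_tasks.show`), and the eventual fix is titled "Check for TaskGroup in _PythonDecoratedOperator". Together these suggest that the `@task` deduplication checked the bare id `show` against the DAG, never found it, and so reused `show` on every call. The following is a simplified, hypothetical model of that interaction in plain Python (no Airflow imports); `ToyDag`, `naive_unique_id`, `call_task_in_group` and the local `DuplicateTaskIdFound` class are illustrative stand-ins, not Airflow's actual code.

```python
import re


class DuplicateTaskIdFound(Exception):
    """Local stand-in for airflow.exceptions.DuplicateTaskIdFound (hypothetical)."""


class ToyDag:
    """Hypothetical registry that rejects duplicate task ids, like a DAG does."""

    def __init__(self):
        self.task_ids = []

    def add(self, task_id):
        if task_id in self.task_ids:
            raise DuplicateTaskIdFound(
                f"Task id '{task_id}' has already been added to the DAG"
            )
        self.task_ids.append(task_id)


def naive_unique_id(task_id, dag):
    """Suffix-based dedup that only looks at the bare id and ignores any group prefix."""
    if task_id not in dag.task_ids:
        return task_id
    core = re.split(r"__\d+$", task_id)[0]
    taken = sum(1 for t in dag.task_ids if re.fullmatch(rf"{re.escape(core)}__\d+", t))
    return f"{core}__{taken + 1}"


def call_task_in_group(dag, group_id, task_id):
    """Mimic one @task call inside a TaskGroup: dedup first, then prefix and register."""
    full_id = f"{group_id}.{naive_unique_id(task_id, dag)}"
    dag.add(full_id)
    return full_id


# Without a group, the bare-id check sees earlier ids and adds suffixes.
plain = ToyDag()
for _ in range(3):
    plain.add(naive_unique_id("show", plain))
print(plain.task_ids)  # ['show', 'show__1', 'show__2']

# Inside a group, 'show' is never found (only 'show_tasks.show' is), so no suffix is added.
grouped = ToyDag()
call_task_in_group(grouped, "show_tasks", "show")
try:
    call_task_in_group(grouped, "show_tasks", "show")
except DuplicateTaskIdFound as err:
    print(err)  # Task id 'show_tasks.show' has already been added to the DAG
```

The model reproduces both observations from the report: suffixes work without a group, and the second call inside a group collides on `show_tasks.show`.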
What you expected to happen:
I expect to be able to generate tasks dynamically using TaskGroup and task decorators.
How to reproduce it:
Use the DAG from above.
Anything else we need to know:
N/A
About this issue
- State: closed
- Created 4 years ago
- Reactions: 3
- Comments: 15 (10 by maintainers)
Commits related to this issue
- Check for TaskGroup in _PythonDecoratedOperator Crucial feature of functions decorated by @task is to be able to invoke them multiple times in single DAG. To do this we are generating custom task_id ... — committed to PolideaInternal/airflow by turbaszek 4 years ago
- Check for TaskGroup in _PythonDecoratedOperator (#12312) Crucial feature of functions decorated by @task is to be able to invoke them multiple times in single DAG. To do this we are generating cust... — committed to apache/airflow by turbaszek 4 years ago
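The commit message explains that repeated calls of an `@task` function get generated task ids, and the title says the fix checks for the enclosing TaskGroup. A minimal sketch of that idea, assuming the collision check and the suffix search should both use the group-prefixed id; `unique_task_id` and its parameters are hypothetical, and this is not the code merged in #12312.

```python
import re
from typing import Iterable, Optional


def unique_task_id(task_id: str, existing_ids: Iterable[str], group_id: Optional[str] = None) -> str:
    """Return a bare task_id that will not collide once the group prefix is applied.

    Hypothetical sketch: both the collision check and the suffix search
    compare against ids that include the '<group_id>.' prefix.
    """
    existing = set(existing_ids)
    prefix = f"{group_id}." if group_id else ""
    if prefix + task_id not in existing:
        return task_id
    # Strip any existing '__N' suffix, then find the highest N already used in this group.
    core = re.split(r"__\d+$", task_id)[0]
    pattern = re.compile(rf"{re.escape(prefix + core)}__(\d+)")
    suffixes = []
    for existing_id in existing:
        match = pattern.fullmatch(existing_id)
        if match:
            suffixes.append(int(match.group(1)))
    return f"{core}__{max(suffixes, default=0) + 1}"


registered = []
for _ in range(3):
    bare = unique_task_id("show", registered, group_id="show_tasks")
    registered.append(f"show_tasks.{bare}")
print(registered)  # ['show_tasks.show', 'show_tasks.show__1', 'show_tasks.show__2']
```

Because the prefix is part of the lookup, ids in one group do not affect suffixes in another, and the ungrouped case behaves as before.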
@ssiddiqui-apixio I’m having the same problem on Airflow 2.0.2. Is there any chance of the issue being reopened for a new solution?
I’m still having the problem on Airflow 2.1.2.
Broken DAG: [/usr/local/airflow/dags/dags/auto/bronze/xywyxw_full_bronze_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 524, in __init__
    task_group.add(self)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/task_group.py", line 158, in add
    raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'xywyxw_wwww_full_bronze-20220217-104624' has already been added to the DAG
Can you please reopen this issue to track a new solution to the problem?
Looking at @turbaszek's code, that should address this issue.
Not sure how complex it is and whether we can manage it for beta3/4/5, @yuqian90 @casassg? But I marked it for beta4 now.