airflow: Backfill crashes with "KeyError: TaskInstanceKey" when task has retries

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version): No Kubernetes

Environment: Docker python environment (3.8)

  • OS (e.g. from /etc/os-release): Ubuntu 20.04.1 LTS
  • Kernel (e.g. uname -a): Linux b494b1048cf4 5.4.39-linuxkit #1 SMP Fri May 8 23:03:06 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

What happened: The backfill command crashes with this stack trace:

Traceback (most recent call last):
  File "/opt/conda/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py", line 103, in dag_backfill
    dag.run(
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dag.py", line 1701, in run
    job.run()
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 799, in _execute
    self._execute_for_run_dates(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 722, in _execute_for_run_dates
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
    self._update_counters(ti_status=ti_status)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 211, in _update_counters
    ti_status.running.pop(key)
KeyError: TaskInstanceKey(dag_id='dag_id', task_id='task_name', execution_date=datetime.datetime(2020, 12, 15, 0, 0, tzinfo=Timezone('UTC')), try_number=2)

From the webserver, it looks like the task actually finished successfully after the second try (the first try failed with a network error). Just before the crash I also see this warning: WARNING - TaskInstanceKey(dag_id='dag_id', task_id='task_name', execution_date=datetime.datetime(2020, 12, 15, 0, 0, tzinfo=Timezone('UTC')), try_number=2) state success not in running=dict_values([<TaskInstance: dag_id.task_name 2020-12-15 00:00:00+00:00 [queued]>])
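The warning suggests a try_number mismatch: the backfill job appears to be tracking the running task instance under one try_number while the finished task reports a key with the incremented try_number. A minimal, framework-independent sketch of that mismatch (using a hypothetical namedtuple stand-in for Airflow's TaskInstanceKey, which hashes by field values):

```python
from collections import namedtuple
from datetime import datetime

# Simplified stand-in for airflow.models.taskinstance.TaskInstanceKey;
# like the real class, equal field values mean equal dict keys.
TaskInstanceKey = namedtuple(
    "TaskInstanceKey", ["dag_id", "task_id", "execution_date", "try_number"]
)

execution_date = datetime(2020, 12, 15)

# The backfill job tracks the running task instance keyed on try_number=1 ...
running = {
    TaskInstanceKey("dag_id", "task_name", execution_date, 1): "<TaskInstance [queued]>"
}

# ... but after a retry the task reports its key with try_number=2, so a
# plain pop() raises exactly the KeyError shown in the traceback above.
retried_key = TaskInstanceKey("dag_id", "task_name", execution_date, 2)
try:
    running.pop(retried_key)
except KeyError as exc:
    print("KeyError:", exc.args[0])
```

This is an illustration of the suspected mechanism, not the actual backfill_job.py internals.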

This happens whenever a task has to retry: the subsequent tasks are not run, and the backfill command has to be re-run to continue.

What you expected to happen: The backfill command should continue to the next task.

How to reproduce it: Not sure exactly, but: create a DAG with a future start date and a task that fails on the first try but succeeds on the second, keep the DAG turned off, and run a backfill for a single past date. Command that was used: airflow dags backfill dag_id -s 2020-12-15 -e 2020-12-15

Anything else we need to know:


from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner':            'owner',
    'depends_on_past':  False,
    'email':            ['email@address.com'],
    'email_on_failure': False,
    'email_on_retry':   False,
    'retries':          3,
    'retry_delay':      timedelta(minutes=5),
    'concurrency':      4,
}

dag = DAG(
    dag_id='dag_id',
    default_args=default_args,
    description='Some Description',
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(weeks=1),
    catchup=True,
    template_searchpath=templates_searchpath,  # defined elsewhere in the DAG file
)

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Reactions: 2
  • Comments: 19 (10 by maintainers)


Most upvoted comments

@potiuk Sorry, accidentally hit enter before entering all the info (with no way to delete?). In any case, added the details

This also happens in Airflow 2.1.0; there is a bug in the backfill logic. I have fixed it in my environment and will submit a PR for it later when idle…
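The commenter's actual patch is not shown in this thread. A minimal defensive sketch of the kind of change that would avoid the crash (an assumption, not the upstream fix; TaskInstanceKey is again a hypothetical namedtuple stand-in) is to pop with a default instead of letting a try_number mismatch raise:

```python
import logging
from collections import namedtuple
from datetime import datetime

log = logging.getLogger(__name__)

TaskInstanceKey = namedtuple(
    "TaskInstanceKey", ["dag_id", "task_id", "execution_date", "try_number"]
)

def pop_running(running, key):
    # dict.pop with a default turns the hard KeyError from the traceback
    # into a warning, letting the backfill loop continue past the retry.
    ti = running.pop(key, None)
    if ti is None:
        log.warning("%s not found in running set; skipping", key)
    return ti

running = {TaskInstanceKey("dag_id", "task_name", datetime(2020, 12, 15), 1): "ti"}
mismatched = TaskInstanceKey("dag_id", "task_name", datetime(2020, 12, 15), 2)
assert pop_running(running, mismatched) is None  # no KeyError raised
```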