airflow: Scheduler "deadlocks" itself when max_active_runs_per_dag is reached by up_for_retry tasks

Apache Airflow version: 2.0.1

What happened:

Let’s say we have max_active_runs_per_dag = 2 in the config. Now we manually trigger, for example, 10 DAG runs for some specific DAG. The DAG contains tasks that should be retried on failure after some interval.

The issue is that when at least 2 DAG runs contain tasks that have failed, moved to the up_for_retry state, and are waiting to be rescheduled, the scheduler does not reschedule them at all. In stdout it keeps saying DAG <dag_name> already has 2 active DAG runs, not queuing any tasks for run <execution_date>. Even DAG runs in other DAGs stop running.

Executor: CeleryExecutor

What you expected to happen:

I expected that up_for_retry tasks would be rescheduled once their retry interval had elapsed.

How to reproduce it:

Just follow the description above. Set max_active_runs_per_dag = 2, create a DAG with a PythonOperator whose callable fails, set retry_delay to something like 1 minute, manually trigger 2 DAG runs, and verify that the task is not rescheduled after the delay. A minimal sketch of such a DAG is shown below.
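
For reference, this is roughly what such a DAG looks like; the dag_id, retry count, and callable are illustrative, not taken from the report:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def always_fail():
        # Fail every time so the task keeps cycling through up_for_retry
        raise RuntimeError("simulated failure")


    with DAG(
        dag_id="retry_deadlock_repro",   # illustrative name
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,          # runs are triggered manually
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="always_fails",
            python_callable=always_fail,
            retries=5,
            retry_delay=timedelta(minutes=1),
        )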

Most upvoted comments

I think the issue I’m experiencing is related to this.

Apache Airflow version: 2.0.1
Executor: LocalExecutor

What happened: I have max_active set to 4, and when running a backfill for this DAG, if 4 sensor tasks end up in up_for_reschedule at the same time, the backfill exits, telling me that all the tasks downstream of these sensors are deadlocked.

I have made a PR related to this issue, see https://github.com/apache/airflow/pull/17945

What happens is that the method DagRun.next_dagruns_to_examine gets the earliest dagruns without considering which DAG each dagrun belongs to. For example, take one DAG with execution_date 2020-01-01 and catchup=True, max_active_runs=1, schedule_interval='@daily', and another DAG with execution_date 2021-01-01, also with catchup=True and schedule_interval='@daily' (see the sketch below). When you unpause the two DAGs (the one with max_active_runs first), the dagruns are created, but only one dagrun becomes active because of how DagRun.next_dagruns_to_examine works. I’m hopeful my PR resolves this issue, but I’m worried about performance. Please take a look: https://github.com/apache/airflow/pull/17945 @uranusjr @kaxil @ash
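
For concreteness, a sketch of the two DAGs described above, interpreting the dates as start_date values; the dag_ids and the no-op task are illustrative:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    # Unpaused first: catchup creates a long backlog of runs, but only one
    # of them may be active at a time because of max_active_runs=1.
    with DAG(
        dag_id="older_dag_limited",        # illustrative name
        start_date=datetime(2020, 1, 1),
        schedule_interval="@daily",
        catchup=True,
        max_active_runs=1,
    ) as older_dag:
        DummyOperator(task_id="noop")

    # Unpaused second: its runs can end up starved, because the scheduler
    # keeps examining the earliest dagruns, which all belong to the DAG above.
    with DAG(
        dag_id="newer_dag",                # illustrative name
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        catchup=True,
    ) as newer_dag:
        DummyOperator(task_id="noop")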

Here is what I believe is causing the bug: https://github.com/apache/airflow/blob/88199eefccb4c805f8d6527bab5bf600b397c35e/airflow/jobs/scheduler_job.py#L1765

        if dag.max_active_runs:
            if (
                len(currently_active_runs) >= dag.max_active_runs
                and dag_run.execution_date not in currently_active_runs
            ):
                self.log.info(
                    "DAG %s already has %d active runs, not queuing any tasks for run %s",
                    dag.dag_id,
                    len(currently_active_runs),
                    dag_run.execution_date,
                )
                return 0

Notice that the dagrun state is not updated before return 0. This means that on the next scheduler iteration the same dagrun will be picked up for processing again. If enough dagruns satisfy this condition, the scheduler never gets around to processing other dagruns and effectively deadlocks.

The code which makes the scheduler go forward is found in dag_run.update_state: https://github.com/apache/airflow/blob/88199eefccb4c805f8d6527bab5bf600b397c35e/airflow/models/dagrun.py#L384

    @provide_session
    def update_state(
        self, session: Session = None, execute_callbacks: bool = True
    ) -> Tuple[List[TI], Optional[callback_requests.DagCallbackRequest]]:
        
        #..........
        start_dttm = timezone.utcnow()
        self.last_scheduling_decision = start_dttm
        # ........
        session.merge(self)

        return schedulable_tis, callback

Updating last_scheduling_decision is what makes the scheduler choose a different set of dagruns on the next iteration. To fix the bug, this update needs to happen before the return 0 line in the first code snippet, roughly as in the sketch below.
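
For illustration only (this is not the actual patch from the linked PR), the idea would look something like this applied to the first snippet:

        if dag.max_active_runs:
            if (
                len(currently_active_runs) >= dag.max_active_runs
                and dag_run.execution_date not in currently_active_runs
            ):
                self.log.info(
                    "DAG %s already has %d active runs, not queuing any tasks for run %s",
                    dag.dag_id,
                    len(currently_active_runs),
                    dag_run.execution_date,
                )
                # Record a scheduling decision so that DagRun.next_dagruns_to_examine
                # moves on to other dagruns in the next loop instead of
                # re-selecting this one forever.
                dag_run.last_scheduling_decision = timezone.utcnow()
                session.merge(dag_run)
                return 0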

In the current main branch the scheduling logic is different, so the bug might have disappeared there; it exists in all versions up to 2.1.2.

The symptom I’m seeing is that whenever any single DAG hits the max_active_runs_per_dag limit, the scheduler log fills up with lines like the one below, and at that point the scheduler basically stops scheduling any tasks for any DAGs. The fact that one DAG hitting this limit halts all other DAGs is pretty bad. Our workaround for the moment is to set max_active_runs_per_dag to a number large enough that it is unlikely to ever be hit.

DAG ... already has .. active runs, not queuing any tasks
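
That workaround amounts to raising this setting, which lives under the [core] section of airflow.cfg (or the corresponding AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG environment variable); the value below is just an example, not a recommendation:

    [core]
    # Default is 16; set it high enough that no DAG realistically reaches it.
    max_active_runs_per_dag = 1000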

+1 for us on this issue as well, I think? Very strangely, we see the most recent run for a DAG stuck in the ‘running’ state even though the only task in the DAG is a clear success:

(screenshot: DAG run shown as running while its single task is marked success)

This is a catchup=False DAG where the only task runs in a pool, and for two hours there is nothing in the scheduler log for this DAG (it is supposed to run every 5 minutes) explaining why it can’t be scheduled. No “max active runs reached”, no “no slots available in pool”, nothing. It’s as if the scheduler forgot this DAG existed until we rebooted it.

Edit: this has happened again; here are the relevant log lines (cherry-picked via grep):

[2021-04-16 17:44:01,418] {{base_executor.py:82}} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'mls_ivbor_smart_incremental_v1', 'streaming-importer', '2021-04-16T17:30:00+00:00', '--local', '--pool', 'ivbor', '--subdir', '/efs/airflow/dags/mls_incrementals_i.py']
[2021-04-16 17:44:04,638] {{scheduler_job.py:1206}} INFO - Executor reports execution of mls_ivbor_smart_incremental_v1.streaming-importer execution_date=2021-04-16 17:30:00+00:00 exited with status queued for try_number 1
[2021-04-16 17:44:04,661] {{scheduler_job.py:1226}} INFO - Setting external_id for <TaskInstance: mls_ivbor_smart_incremental_v1.streaming-importer 2021-04-16 17:30:00+00:00 [queued]> to 2c9ee22a-ad2b-4255-846a-85896fa517ed
[2021-04-16 17:44:43,326] {{scheduler_job.py:1206}} INFO - Executor reports execution of mls_ivbor_smart_incremental_v1.streaming-importer execution_date=2021-04-16 17:30:00+00:00 exited with status success for try_number 1
[2021-04-16 18:00:18,075] {{dagrun.py:445}} INFO - Marking run <DagRun mls_ivbor_smart_incremental_v1 @ 2021-04-16 17:30:00+00:00: scheduled__2021-04-16T17:30:00+00:00, externally triggered: False> successful

So the task succeeded at 17:44, but the dagrun wasn’t set to success until 16 minutes later?

I am hoping this is solved by https://github.com/apache/airflow/pull/17945, which will be released in 2.1.4 (should be out tomorrow). We can reopen if this is still an issue.

After a few hours of spelunking through the codebase, we came across max_dagruns_per_loop_to_schedule in scheduler_job.py. We have several thousand DAGs, each with 1-3 tasks (probably 80% have just one task), and raising it to 200 (10x the default of 20) was a massive improvement for us.
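
For reference, that option lives under the [scheduler] section of airflow.cfg (or the corresponding AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE environment variable); the value of 200 is this commenter’s setting, not a general recommendation:

    [scheduler]
    # How many dagruns the scheduler examines per loop; default is 20.
    max_dagruns_per_loop_to_schedule = 200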

@zachliu Currently the fix is to clear the up_for_retry tasks or run them manually from the UI when this occurs. And yes, restarting the scheduler doesn’t help.

Of course, I could spend some time building some logic that constantly queries the Airflow API for the number of active DAG runs and triggers new ones only when there are fewer than 16 of them (roughly the kind of thing sketched below). But that would amount to building my own scheduler on top of the Airflow scheduler, just to work around this bug 😃
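
A rough sketch of that kind of external throttling against the Airflow 2.x stable REST API, assuming the basic-auth API backend is enabled; the URL, credentials, dag_id, and conf payload are placeholders:

    import requests

    BASE_URL = "http://localhost:8080/api/v1"  # assumed webserver URL
    AUTH = ("admin", "admin")                  # assumed basic-auth credentials
    DAG_ID = "my_dag"                          # illustrative dag_id
    LIMIT = 16                                 # default max_active_runs_per_dag


    def count_running_runs() -> int:
        # List one page of dag runs and count those still in the "running" state.
        # A real implementation would page through all results.
        resp = requests.get(
            f"{BASE_URL}/dags/{DAG_ID}/dagRuns",
            params={"limit": 100},
            auth=AUTH,
        )
        resp.raise_for_status()
        return sum(1 for run in resp.json()["dag_runs"] if run["state"] == "running")


    def trigger_run(conf: dict) -> None:
        # POST /dags/{dag_id}/dagRuns triggers a new dag run with the given conf.
        resp = requests.post(
            f"{BASE_URL}/dags/{DAG_ID}/dagRuns",
            json={"conf": conf},
            auth=AUTH,
        )
        resp.raise_for_status()


    # Only trigger another run while we are below the active-run limit.
    if count_running_runs() < LIMIT:
        trigger_run({"param": "value"})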

I think the scheduler should handle any number of DAG runs properly, even if there are more of them than max_active_runs. And as I said above, this worked properly in earlier versions.

@zachliu this is an important thing for me, because in our production setup we sometimes use Airflow to run lots of “jobs” (DAG runs) with different configs. For example, we sometimes need to run 7000 DAG runs with different parameters in the conf, so these DAG runs are created by code that calls the Airflow HTTP API to trigger them. The tasks inside these DAG runs call other external APIs that are unreliable, so the ability to retry tasks after an interval is a great feature for us.

In 1.10.12 there was no such issue: the scheduler picks the 16 runs with the most recent execution dates, runs them, and when they finish it picks the next ones, and so on until there are no more DAG runs. If something fails, it simply retries, say, 10 minutes later and eventually finishes.

Airflow 2.0 brings a great speed-up in this process thanks to the new mini schedulers inside the workers, which is awesome and made it possible to speed things up considerably, which is why we upgraded. But this issue breaks everything, and I have to check on Airflow from time to time and perform manual steps to get out of the deadlock when it occurs 😦