airflow: Scheduler "deadlocks" itself when max_active_runs_per_dag is reached by up_for_retry tasks
Apache Airflow version: 2.0.1
What happened:
Let’s say we have max_active_runs_per_dag = 2 in the config. Now we manually trigger, for example, 10 DAG runs for some specific DAG. The DAG contains tasks that should be retried on failure after some interval.
The issue: once at least 2 DAG runs contain tasks that have failed, moved to the up_for_retry state, and are waiting to be rescheduled, the scheduler stops rescheduling them entirely. In stdout it keeps saying DAG <dag_name> already has 2 active DAG runs, not queuing any tasks for run <execution_date>. Even DAG runs in other DAGs stop running.
Executor: CeleryExecutor
What you expected to happen:
I expected up_for_retry tasks to be rescheduled once their retry interval had elapsed.
How to reproduce it:
Just follow the description above: set max_active_runs_per_dag = 2, create a DAG with a PythonOperator whose callable fails, set retry_delay to something like 1 minute, manually trigger 2 DAG runs, and verify that the tasks are never rescheduled after the retry delay.
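For illustration, a minimal DAG along those lines might look like the sketch below (the DAG id, callable, and retry settings are placeholders, not taken from the original report):

```python
# Hypothetical reproduction DAG -- names and values are illustrative only.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def always_fail():
    # Simulates an unreliable call so the task lands in up_for_retry.
    raise RuntimeError("simulated failure")


with DAG(
    dag_id="retry_deadlock_repro",       # placeholder name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,              # triggered manually
    catchup=False,
) as dag:
    PythonOperator(
        task_id="failing_task",
        python_callable=always_fail,
        retries=5,
        retry_delay=timedelta(minutes=1),
    )
```

Triggering this DAG manually more times than max_active_runs_per_dag allows (for example with repeated airflow dags trigger retry_deadlock_repro calls) should show whether the up_for_retry tasks ever come back.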
About this issue
- State: closed
- Created 3 years ago
- Reactions: 9
- Comments: 27 (16 by maintainers)
Commits related to this issue
- Fix issue #14205 — committed to santiment/airflow by tzanko-matev 3 years ago
I think the issue I’m experiencing is related to this.
Apache Airflow version: 2.0.1 Executor: LocalExecutor
What happened: I have max_active set to 4, and when running a backfill for this DAG, if 4 sensor tasks get set to up_for_reschedule at the same time, the backfill exits telling me that all the tasks downstream of these sensors are deadlocked.

I have made a PR related to this issue, see https://github.com/apache/airflow/pull/17945
What happens is that the method DagRun.next_dagruns_to_examine gets the earliest dagruns without considering which dag each dagrun belongs to. For example: take a dag with execution_date 2020,1,1 and catchup=True, max_active_runs=1, schedule_interval='@daily', and another dag with execution_date 2021,1,1 and also catchup=True, schedule_interval='@daily'. When you unpause the two dags (the one with max_active_runs first), the dagruns are created but only one dagrun ever becomes active, because of how DagRun.next_dagruns_to_examine works. I'm hopeful my PR will resolve this issue, but I'm worried about performance. Please take a look: https://github.com/apache/airflow/pull/17945 @uranusjr @kaxil @ash

Here is what I believe is causing the bug: https://github.com/apache/airflow/blob/88199eefccb4c805f8d6527bab5bf600b397c35e/airflow/jobs/scheduler_job.py#L1765
Notice that the dagrun state is not updated before return 0. This means that on the next iteration of the scheduler the same dagrun will be tried for processing again. If there are enough dagruns satisfying this condition, the scheduler will never get around to the other dagruns and will deadlock.

The code which makes the scheduler move forward is in dag_run.update_state: https://github.com/apache/airflow/blob/88199eefccb4c805f8d6527bab5bf600b397c35e/airflow/models/dagrun.py#L384

Updating last_scheduling_decision is what makes the scheduler choose a different set of dagruns on the next iteration. To fix the bug, this update needs to happen before the return 0 line in the first code snippet.

In the current main branch the scheduling logic is different, so the bug might have disappeared. The bug exists in all versions up to 2.1.2.
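To make that mechanism concrete, here is a small standalone simulation of the behaviour described above (this is not Airflow source; FakeDagRun, next_dagruns_to_examine and run_scheduler are invented names). It models a scheduler that always examines the dagruns with the oldest last_scheduling_decision, and shows how skipping the timestamp update on the early return starves every other dagrun:

```python
# Standalone simulation of the starvation described above -- not Airflow code.
from datetime import datetime, timedelta


class FakeDagRun:
    def __init__(self, dag_id, at_max_active_runs):
        self.dag_id = dag_id
        self.at_max_active_runs = at_max_active_runs
        self.last_scheduling_decision = datetime(2021, 1, 1)
        self.scheduled = False


def next_dagruns_to_examine(runs, limit=2):
    # Mimics picking the runs with the oldest last_scheduling_decision first.
    return sorted(runs, key=lambda r: r.last_scheduling_decision)[:limit]


def run_scheduler(runs, update_decision_on_early_return, iterations=5):
    now = datetime(2021, 1, 2)
    for _ in range(iterations):
        for run in next_dagruns_to_examine(runs):
            if run.at_max_active_runs:
                if update_decision_on_early_return:
                    # The proposed fix: record that we looked at this run,
                    # so it is not re-selected ahead of everything else.
                    run.last_scheduling_decision = now
                continue  # corresponds to the early "return 0"
            run.scheduled = True
            run.last_scheduling_decision = now
        now += timedelta(minutes=1)
    return sorted(r.dag_id for r in runs if r.scheduled)


def fresh_runs():
    return [FakeDagRun("blocked_a", True),
            FakeDagRun("blocked_b", True),
            FakeDagRun("healthy_c", False)]


print(run_scheduler(fresh_runs(), update_decision_on_early_return=False))  # [] -- starved
print(run_scheduler(fresh_runs(), update_decision_on_early_return=True))   # ['healthy_c']
```

With the timestamp update skipped, the two blocked runs are re-selected on every loop and healthy_c is never scheduled; with the update applied, the blocked runs fall to the back of the queue and healthy_c runs on the next iteration.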
The symptom I’m seeing is that whenever any single DAG hits the max_active_runs_per_dag limit, the scheduler log fills up with lines like this. At that point, the scheduler basically stops scheduling any task for any DAG. The fact that one DAG hitting this limit halts all other DAGs is pretty bad. Our workaround at the moment is to make max_active_runs_per_dag a big number that is unlikely to be hit.

+1 for us on this issue as well, I think? Very strangely, we see the most recent run for a DAG set to 'running', even though the only task in the DAG is clearly a success:
This is a catchup=False DAG, where the only task runs in a pool, and there is nothing in the scheduler log for this DAG for two hours (the DAG is supposed to run every 5 minutes) about why it can't be scheduled. No "max active runs reached", no "no slots available in pool", nothing. It's like the scheduler forgot this DAG existed until we rebooted it.
Edit: this has happened again; here are the relevant log lines (cherry-picked via grep):
So the task succeeded at 17:44, but the dagrun wasn’t set to success until 16 minutes later?
I am hoping this is solved by https://github.com/apache/airflow/pull/17945, which will be in 2.1.4 (due to be released tomorrow). We can reopen if this is still an issue.
After a few hours spelunking through the codebase, we came across max_dagruns_per_loop_to_schedule in scheduler_job.py. We have several thousand DAGs, all with 1-3 tasks (80% are probably single-task), and setting it to 200 (10x the default value of 20) was a massive improvement for us.
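For reference, that option lives in the [scheduler] section of airflow.cfg; the value below is simply the 10x setting mentioned above, not a general recommendation:

```ini
# airflow.cfg -- example only; tune to your own DAG count and scheduler capacity
[scheduler]
# How many DagRuns the scheduler examines (and locks) per scheduling loop.
# The default is 20; the deployment above raised it to 200.
max_dagruns_per_loop_to_schedule = 200
```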
@zachliu Currently the fix is to clear up_for_retry tasks or run them manually from the UI when this occurs. Restarting the scheduler doesn't help, yes.

Of course, I could spend some time building some kind of logic that constantly queries the Airflow API for the number of active DAG runs and triggers new ones only when there are fewer than 16 of them. But that would look like I'm building my own scheduler on top of the Airflow scheduler, just to get around this bug 😃
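The kind of external throttling logic described above might look roughly like this (a sketch only, assuming Airflow's stable REST API with basic auth enabled; the host, credentials, DAG id, 16-run threshold, and payload fields are all placeholders and may need adjusting for a given deployment):

```python
# Hypothetical external throttler -- not a recommended replacement for the scheduler.
# Assumes the stable REST API (/api/v1) and basic auth are enabled; pagination of
# the dag-run listing is ignored for brevity.
import time
import uuid

import requests

BASE_URL = "http://localhost:8080/api/v1"   # placeholder
AUTH = ("admin", "admin")                    # placeholder credentials
DAG_ID = "retry_deadlock_repro"              # placeholder DAG id
MAX_ACTIVE = 16                              # mirrors max_active_runs_per_dag


def count_active_runs():
    # List recent runs and count the ones still running (client-side filtering).
    resp = requests.get(
        f"{BASE_URL}/dags/{DAG_ID}/dagRuns",
        params={"limit": 100},
        auth=AUTH,
    )
    resp.raise_for_status()
    return sum(1 for run in resp.json()["dag_runs"] if run["state"] == "running")


def trigger_run(conf):
    resp = requests.post(
        f"{BASE_URL}/dags/{DAG_ID}/dagRuns",
        json={"dag_run_id": f"manual__{uuid.uuid4()}", "conf": conf},
        auth=AUTH,
    )
    resp.raise_for_status()


pending_confs = [{"job_number": i} for i in range(7000)]   # illustrative workload
while pending_confs:
    while pending_confs and count_active_runs() < MAX_ACTIVE:
        trigger_run(pending_confs.pop(0))
    time.sleep(30)
```

As the commenter notes, this amounts to a second scheduler bolted on top of Airflow, which is exactly what a proper fix should make unnecessary.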
I think the scheduler should handle any number of DAG runs properly, even if there are more of them than max_active_runs. And as I said above, it worked properly in earlier versions.

@zachliu this is important for me because in our production setup we sometimes use Airflow to run lots of "jobs" (DAG runs) with different configs. For example, we sometimes need to run 7000 DAG runs with different parameters in their conf, so these DAG runs are created by code that calls the Airflow HTTP API to trigger them. The tasks inside those DAG runs call other external APIs that are unreliable, so the ability to retry tasks after an interval is a great feature for us.
In 1.10.12 there was no issue: the scheduler picks 16 runs with the most recent execution dates, runs them, picks the next ones when they finish, and so on until there are no more DAG runs. If something fails, it simply retries, say, 10 minutes later and eventually finishes.
Airflow 2.0 greatly speeds this process up thanks to the new micro-schedulers inside the workers, which is awesome and is why we upgraded. But this issue breaks everything, and I have to check on Airflow from time to time and perform manual steps to get out of the deadlock when it occurs 😦