airflow: Tasks stuck in queued state
Apache Airflow version
2.2.3 (latest released)
What happened
Tasks are stuck in the queued state and will not be scheduled for execution. In the logs, these tasks have a message of `could not queue task <task details>`, as they are currently in the `queued` or `running` sets in the executor.
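A simplified sketch of that check (illustrative only, not the actual Airflow source; `executor`, `ti_key`, and `command` stand in for the real arguments):

```python
# Simplified sketch of the executor-side check described above: a task key
# that is already present in the executor's queued or running sets is
# rejected with the "could not queue task" message instead of being queued.
def queue_command(executor, ti_key, command):
    if ti_key not in executor.queued_tasks and ti_key not in executor.running:
        executor.queued_tasks[ti_key] = command
    else:
        executor.log.error("could not queue task %s", ti_key)
```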
What you expected to happen
Tasks run 😃
How to reproduce
We have a dockerized Airflow setup using Celery with a RabbitMQ broker and Postgres as the result db. When rolling out DAG updates, we redeploy most of the components (the workers, scheduler, webserver, and RabbitMQ). We can have a few thousand DagRuns at a given time. This error seems to happen when a deployment coincides with a load spike.
Looking at the code, this is what I believe is happening:
Starting from the initial debug message of `could not queue task`, I found tasks are marked as running (but in the UI they still appear as queued): https://github.com/apache/airflow/blob/main/airflow/executors/base_executor.py#L85
Tracking through our logs, I see these tasks are recovered by the adoption code, and the state there is STARTED (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L540).
Following the state update code, I see this does not cause any state updates to occur in Airflow (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L465). Thus, if a task is marked as STARTED in the result db but queued in the Airflow task state, it will never be transitioned out of queued by the scheduler. However, you can get these tasks to finally run by clicking the `run` button.
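To make that last step concrete, here is a simplified sketch of the state handling (illustrative, not the actual celery_executor source; the method names are approximations):

```python
# Simplified sketch of the state handling described above: only terminal
# Celery states change the Airflow-side state, so a task adopted while
# STARTED never leaves "queued" in Airflow on its own.
from celery import states as celery_states


def update_task_state(executor, ti_key, celery_state):
    if celery_state == celery_states.SUCCESS:
        executor.success(ti_key)
    elif celery_state in (celery_states.FAILURE, celery_states.REVOKED):
        executor.fail(ti_key)
    else:
        # STARTED (and other non-terminal states) fall through with no
        # update, which is exactly why the adopted task stays queued.
        pass
```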
Operating System
Ubuntu 20.04.3 LTS
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else
I believe this could be addressed by asking Celery whether a task is actually running in `try_adopt_task_instances`. There are obvious problems like timeouts, but `celery inspect active|reserved` can return JSON output of running and reserved tasks to verify that a STARTED task is actually running.
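A rough sketch of such a check using Celery's control/inspect API (the helper name and timeout are illustrative):

```python
# Rough sketch of the proposed check: ask the Celery workers which tasks they
# consider active or reserved, so a task reported as STARTED by the result
# backend can be verified before it is adopted as running.
from celery import Celery


def celery_known_task_ids(celery_app: Celery, timeout: float = 5.0) -> set:
    """Return the ids of tasks currently active or reserved on any worker."""
    inspect = celery_app.control.inspect(timeout=timeout)
    known = set()
    for reply in (inspect.active() or {}, inspect.reserved() or {}):
        for worker_tasks in reply.values():
            known.update(task["id"] for task in worker_tasks)
    return known
```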
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Reactions: 21
- Comments: 50 (26 by maintainers)
Commits related to this issue
- port scheduler related fix from https://github.com/apache/airflow/issues/21225 — committed to gurmeetsaran/airflow by gurmeetsaran 7 months ago
This is not ideal, but for those who want to kick all their queued tasks to running, here’s a snippet I’ve been using:
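A minimal sketch of that kind of workaround (not necessarily the exact snippet; it assumes direct ORM access to the metadata database, and the one-hour cutoff and function name are arbitrary):

```python
# Minimal sketch of a "kick stuck tasks" workaround: reset task instances
# that have sat in "queued" for too long back to "scheduled" so the
# scheduler picks them up and queues them again.
from datetime import timedelta

from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State


@provide_session
def requeue_stuck_tasks(max_age=timedelta(hours=1), session=None):
    cutoff = timezone.utcnow() - max_age
    stuck = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.QUEUED)
        .filter(TaskInstance.queued_dttm < cutoff)
        .all()
    )
    for ti in stuck:
        ti.state = State.SCHEDULED
    session.commit()
    return len(stuck)
```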
Hi guys. I have the same issue. I’m using the KubernetesExecutor (on Azure), Airflow version 2.4.2 (Helm chart 1.7.0), Kubernetes 1.24.6.
Does anyone know a workaround for the KubernetesExecutor?
This issue is causing a lot of trouble.
It’s been released BTW 😃
I had a similar problem. It was happening because, with the CeleryKubernetesExecutor, the KubernetesExecutor was picking up CeleryExecutor tasks.
Celery changes the task state to queued and the KubernetesExecutor changes it back to scheduled, over and over again (depending on how fast the task gets to the running state).
I fixed it by adding an additional filter on the task queue (which defaults to ‘kubernetes’) for the KubernetesExecutor in a couple of places; the one below is probably causing most of the problems: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L455
It takes all of the tasks in the queued state, but it should take only those that are meant to run on Kubernetes, i.e. those whose queue equals the value configured here: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/config_templates/default_airflow.cfg#L743 (see the sketch below).
That is why you see this log entry: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/base_executor.py#L85, because Celery gets the same task that is already inside queued_tasks.
If this is the case, you will see the following messages in the logs as well: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L494
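Roughly, the extra filter amounts to restricting that query to the Kubernetes queue, something like this (a sketch of the idea, not the exact patch; the function name is illustrative):

```python
# Sketch of the additional queue filter described above: only consider queued
# task instances whose queue matches the KubernetesExecutor's configured
# queue, instead of every queued task in the database.
from airflow.models import TaskInstance
from airflow.utils.state import State


def queued_tis_for_kubernetes(session, kubernetes_queue="kubernetes"):
    return (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.QUEUED)
        .filter(TaskInstance.queue == kubernetes_queue)  # the added filter
        .all()
    )
```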
Did you figure it out, bro? I am stuck with this!
In my case, I use the KubernetesExecutor with dynamically generated DAGs. I found each task was getting stuck on the DAG import step, so I followed the example in the official documentation (https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html#optimizing-dag-parsing-delays-during-execution) to skip scanning the other DAGs, and that finally solved the problem.
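For reference, the pattern from that documentation page looks roughly like this (assuming Airflow 2.4+; the `things` list and generated DAG ids are hypothetical):

```python
# Sketch of the "skip other DAGs during task execution" pattern from the
# linked docs: when a task runs, only the DAG it belongs to is generated,
# which avoids paying the full dynamic-generation cost on every task start.
from datetime import datetime

from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context

current_dag_id = get_parsing_context().dag_id

things = ["a", "b", "c"]  # hypothetical inputs driving DAG generation

for thing in things:
    dag_id = f"generated_dag_{thing}"
    if current_dag_id is not None and current_dag_id != dag_id:
        continue  # skip generating DAGs other than the one being executed

    globals()[dag_id] = DAG(
        dag_id=dag_id,
        schedule=None,
        start_date=datetime(2023, 1, 1),
    )
```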