airflow: Tasks stuck in queued state

Apache Airflow version

2.2.3 (latest released)

What happened

Tasks are stuck in the queued state and are never scheduled for execution. In the logs, these tasks show the message "could not queue task <task details>" because they are already present in the executor's queued or running sets.

What you expected to happen

Tasks run 😃

How to reproduce

We have a Dockerized Airflow setup using Celery with a RabbitMQ broker and Postgres as the result backend. When rolling out DAG updates, we redeploy most of the components (the workers, scheduler, webserver, and RabbitMQ). We can have a few thousand DAG runs at a given time. The error seems to occur when a deployment coincides with a load spike.

Looking at the code, this is what I believe is happening:

Starting from the initial debug message of "could not queue task", I found that these tasks are marked as running in the executor (though in the UI they still appear as queued): https://github.com/apache/airflow/blob/main/airflow/executors/base_executor.py#L85

Tracking through our logs, I see these tasks are picked up by the adoption code, where their state in the result backend is STARTED (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L540).

Following the state update code, I see that a STARTED result does not cause any state update in Airflow (https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py#L465). Thus, if a task is marked as STARTED in the result backend but queued in the Airflow task state, it will never be transitioned out of queued by the scheduler. However, you can get these tasks to finally run by clicking the Run button.
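
For illustration, here is a minimal sketch (not Airflow code) of the combination described above; the broker/backend URLs and the Celery task id are placeholders you would substitute with your own:

from celery import Celery
from celery.result import AsyncResult

# Placeholder broker/backend URLs and Celery task id -- substitute your own.
app = Celery(broker="amqp://rabbit:5672//", backend="db+postgresql://user:pass@postgres/airflow")
celery_task_id = "00000000-0000-0000-0000-000000000000"

# Ask the Celery result backend what it thinks about the adopted task.
result = AsyncResult(celery_task_id, app=app)
print(result.state)
# For the stuck tasks this prints "STARTED": the adoption code marks them as
# running in the executor, while the scheduler-side state update treats STARTED
# as "nothing to do", so the Airflow TaskInstance stays queued forever.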

Operating System

Ubuntu 20.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

I believe this could be addressed by asking Celery, in try_adopt_task_instances, whether a task is actually running. There are obvious problems, such as timeouts, but celery inspect active|reserved can return a JSON listing of the running and reserved tasks, which could be used to verify that a STARTED task is actually running.
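
A rough sketch of that check using Celery's inspection API; the broker/backend URLs and the five-second timeout are placeholder assumptions, and the broadcast timeout is exactly the weak point mentioned above:

from celery import Celery

# Placeholder broker/backend URLs -- substitute your own.
app = Celery(broker="amqp://rabbit:5672//", backend="db+postgresql://user:pass@postgres/airflow")

# Broadcast to the workers and collect their currently active and reserved tasks.
inspector = app.control.inspect(timeout=5.0)
active = inspector.active() or {}      # {worker name: [task dicts]}
reserved = inspector.reserved() or {}

really_running = {
    task["id"]
    for tasks in list(active.values()) + list(reserved.values())
    for task in tasks
}

# A task whose backend state is STARTED but whose id no worker reports was most
# likely lost during the redeploy and should be re-queued instead of being
# adopted as running.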

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 21
  • Comments: 50 (26 by maintainers)

Most upvoted comments

This is not ideal, but for those who want to kick all their queued tasks to running, here's a snippet I've been using:

from airflow import models, settings
from airflow.executors.executor_loader import ExecutorLoader

# Run this where the metadata DB, the DAG files, and the executor configuration
# are all reachable (e.g. a Python shell in the scheduler container).
session = settings.Session()
tis = session.query(models.TaskInstance).filter(models.TaskInstance.state == 'queued')
dagbag = models.DagBag()
for ti in tis:
    # Re-attach the parsed task object to the stale TaskInstance row.
    dag = dagbag.get_dag(ti.dag_id)
    task = dag.get_task(ti.task_id)
    ti.refresh_from_task(task)
    # Hand the task instance straight to a fresh default executor and force a
    # heartbeat so it is actually sent out for execution.
    executor = ExecutorLoader.get_default_executor()
    executor.job_id = "manual"
    executor.start()
    executor.queue_task_instance(ti, ignore_all_deps=False, ignore_task_deps=False, ignore_ti_state=False)
    executor.heartbeat()

Hi guys, I have the same issue. I'm using KubernetesExecutor (on Azure), Airflow 2.4.2 (Helm chart 1.7.0), Kubernetes 1.24.6.

Does anyone know a workaround for the KubernetesExecutor?

This issue is causing a lot of trouble.

It’s been released BTW 😃

I had a similar problem: it was happening because, with the CeleryKubernetesExecutor, the KubernetesExecutor was picking up CeleryExecutor tasks.

Celery keeps setting the task state to queued and the KubernetesExecutor keeps setting it back to scheduled, over and over again (depending on how quickly the task reaches the running state).

I fixed it by adding an additional filter on the task queue (which defaults to 'kubernetes') for the KubernetesExecutor in a couple of places; the one below is probably causing most of the problems: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L455

It takes all of the tasks in the queued state, but it should only take those that are meant to run on Kubernetes, i.e. those whose queue equals the configured Kubernetes queue: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/config_templates/default_airflow.cfg#L743
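
A rough sketch of that extra filter (a paraphrase of the idea, not the actual patch; it assumes the linked option is [celery_kubernetes_executor] kubernetes_queue, which defaults to kubernetes):

from airflow import settings
from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import State

# The queue KubernetesExecutor tasks are routed to; defaults to "kubernetes".
kubernetes_queue = conf.get("celery_kubernetes_executor", "kubernetes_queue")

session = settings.Session()
# Only consider queued tasks that belong to the Kubernetes queue, instead of
# every queued TaskInstance in the database (which is what lets the two
# executors fight over the same task).
queued_tis = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.state == State.QUEUED,
        TaskInstance.queue == kubernetes_queue,  # the added filter
    )
    .all()
)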

That is why you see this log entry: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/base_executor.py#L85

Celery then gets the same task, which is already inside queued_tasks.

If this is your case, you will also see the following messages in the logs: https://github.com/apache/airflow/blob/5a6a2d604979cb70c5c9d3797738f0876dd38c3b/airflow/executors/kubernetes_executor.py#L494

Hi guys, I have the same issue. I'm using KubernetesExecutor (on Azure), Airflow 2.4.2 (Helm chart 1.7.0), Kubernetes 1.24.6.

Does anyone know a workaround for the KubernetesExecutor?

This issue is causing a lot of trouble.

Did you figure it out, bro? I am stuck with this!

In my case (KubernetesExecutor with dynamically generated DAGs), each task was getting stuck on the DAG import. I followed the example in the official documentation (https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html#optimizing-dag-parsing-delays-during-execution) to skip parsing the other DAGs during task execution, and that finally solved the problem.
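
For reference, a minimal sketch of that documented pattern (it needs a recent Airflow with get_parsing_context, roughly 2.5+; the config names below are made up):

from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context

# When a task is being executed, the parsing context carries the dag_id being
# run; during normal scheduler parsing it is None, so every DAG is generated.
current_dag_id = get_parsing_context().dag_id

for config_name in ("config_a", "config_b", "config_c"):  # hypothetical configs
    dag_id = f"dynamic_{config_name}"
    if current_dag_id is not None and current_dag_id != dag_id:
        continue  # skip generating DAGs other than the one being executed
    with DAG(dag_id=dag_id, schedule=None) as dag:
        ...  # this config's tasks would be defined here
    globals()[dag_id] = dag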