airflow: Task stuck in "scheduled" or "queued" state, pool has all slots queued, nothing is executing

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version):

Client Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.3", GitCommit:"1e11e4a2108024935ecfcb2912226cedeafd99df", GitTreeState:"clean", BuildDate:"2020-10-14T12:50:19Z", GoVersion:"go1.15.2", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"17+", GitVersion:"v1.17.14-gke.1600", GitCommit:"7c407f5cc8632f9af5a2657f220963aa7f1c46e7", GitTreeState:"clean", BuildDate:"2020-12-07T09:22:27Z", GoVersion:"go1.13.15b4", Compiler:"gc", Platform:"linux/amd64"}

Environment:

  • Cloud provider or hardware configuration: GKE
  • OS (e.g. from /etc/os-release):
  • Kernel (e.g. uname -a):
  • Install tools:
  • Others:
    • Airflow metadata database is hooked up to a PostgreSQL instance

What happened:

  • Airflow 2.0.0 running on the KubernetesExecutor has many tasks stuck in “scheduled” or “queued” state which never get resolved.
  • The setup has a default_pool of 16 slots.
  • Currently no slots are used (see Screenshot), but all slots are queued.
  • No work is executed any more. The Executor or Scheduler is stuck.
  • There are many, many tasks stuck in “scheduled” state
    • Tasks in “scheduled” state say ('Not scheduling since there are %s open slots in pool %s and require %s pool slots', 0, 'default_pool', 1). That is simply not true, because nothing is running on the cluster, yet there are always 16 tasks stuck in “queued”.
  • There are many tasks stuck in “queued” state
    • Tasks in “queued” state say Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run. That is also not true. Nothing is running on the cluster and Airflow is likely just lying to itself. It seems the KubernetesExecutor and the scheduler easily go out of sync.

What you expected to happen:

  • Airflow should resolve scheduled or queued tasks by itself once the pool has available slots
  • Airflow should use all available slots in the pool
  • It should be possible to clear a couple hundred tasks and expect the system to stay consistent

How to reproduce it:

  • Vanilla Airflow 2.0.0 with KubernetesExecutor on Python 3.7.9

  • requirements.txt

    pyodbc==4.0.30
    pycryptodomex==3.9.9
    apache-airflow-providers-google==1.0.0
    apache-airflow-providers-odbc==1.0.0
    apache-airflow-providers-postgres==1.0.0
    apache-airflow-providers-cncf-kubernetes==1.0.0
    apache-airflow-providers-sftp==1.0.0
    apache-airflow-providers-ssh==1.0.0
    
  • The only reliable way to trigger that weird bug is to clear the task state of many tasks at once. (> 300 tasks)

Anything else we need to know:

Don’t know; as always, I am happy to help debug this problem. The scheduler/executor seems to go out of sync with the state of the world and never gets back in sync.

We actually planned to upscale our Airflow installation with many more simultaneous tasks. With these severe yet basic scheduling/queuing problems we cannot move forward at all.

Another strange, likely unrelated observation: the scheduler always uses 100% of the CPU, burning it. Even with no scheduled or queued tasks, it is always very busy.

Workaround:

The only workaround for this problem I could find so far is to manually go in, find all tasks in “queued” state and clear them all at once. Without that, the whole cluster/Airflow just stays stuck like it is.

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 4
  • Comments: 145 (67 by maintainers)

Most upvoted comments

This is happening with the Celery executor as well. I’m using Airflow 2.0.0 with the Celery executor and MySQL and facing a similar issue. Sorry for the basic question, but I’m unable to figure out the manual way to find all tasks in “queued” state and clear them. Can somebody help here?
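For anyone else looking for that “manual way”: a minimal sketch, assuming direct access to the metadata database and ideally a stopped scheduler while it runs (the function name and the blanket reset of every queued task instance are my own choices, not an official tool):

from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def reset_queued_task_instances(session=None):
    # find every task instance currently stuck in QUEUED
    tis = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
    print(f"Resetting {len(tis)} queued task instances")
    for ti in tis:
        ti.state = State.NONE  # roughly equivalent to clearing it, so it can be scheduled again
        session.merge(ti)  # provide_session commits on exit


if __name__ == "__main__":
    reset_queued_task_instances()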

We were running into a similar issue as we have 100+ dags and around a 1000 tasks.

I figured out there is a bug in the celery_executor which I still want to fix myself and contribute.

Summary of that problem: At the start of the scheduler, the celery_executor class instance of the scheduler picks up everything from ‘dead’ schedulers (your previous run). That is (if you run one scheduler) every TaskInstance in the Running, Queued or Scheduled state. Then once it has verified that a task is not running (which takes 10 minutes), it clears most of the references but forgets a crucial one, making it such that the scheduler can NEVER start this task anymore. You can still start it via the webserver because that has its own celery_executor class instance.

What we noticed:

  • Many tasks were very slow to be scheduled even though the workers were almost fully idle.
  • The TaskInstances were stuck on Queued or Scheduled.
  • Restarting the scheduler didn’t work.
  • Once restarted (with debug logging enabled) you’d get a logging line indicating you have negative open slots: [2021-06-14 14:07:31,932] {base_executor.py:152} DEBUG - -62 open slots

What you can do to verify whether you have the same issue:

  • Stop the scheduler
  • Clear all TaskInstances that are Queued or Scheduled
  • Start the scheduler

Our fix:

  • Increase the airflow.cfg parallelism -> from 32 to 1000 (see the sketch after this list). This setting is what can easily deadlock your scheduler after a restart, because the scheduler uses it to decide whether it can launch any new task. If you had 50 tasks waiting in Scheduled, it could deadlock your entire scheduler.
  • Increase the default pool size (for a speedup) -> from 128 to 1000
  • For any task that the scheduler can’t run anymore. Do the procedure mentioned above or kick-start it yourself by clicking the task instance followed by “Ignore all deps”, “Ignore Task states”, “Ignore Task Deps” and finally “Run”.
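A sketch of what the first two items can look like in practice (the values are the ones mentioned above; the env-var and CLI forms are just one way to apply them, assuming a standard Airflow 2 install):

# overrides [core] parallelism in airflow.cfg (the default is 32)
export AIRFLOW__CORE__PARALLELISM=1000
# resize the default pool from the CLI (equivalent to editing it under Admin -> Pools in the UI)
airflow pools set default_pool 1000 "default pool"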

Hope this helps anyone and saves you a couple days of debugging 😃

Back on this. I am currently observing the behaviour again.

I can confirm:

  • The solution description of @renanleme does not apply to my case. Definitely not.
  • Restarting the scheduler is a workaround to the problem

The issue persists with 2.0.1; please update the tag accordingly.

The issue is definitely “critical” as it halts THE ENTIRE airflow operation…!

I had the same problem today and I think I found the problem.

I’m testing with: Apache Airflow version 2.0.1, Celery executor, running locally.

I was testing one dag and after changing a few parameters in one of the tasks in the dag file and cleaning the tasks, the task got stuck on scheduled state. The problem: The changes that I made broke the task, something like this:

airflow-worker_1     | airflow.exceptions.AirflowException: Invalid arguments were passed to GoogleCloudStorageToBigQueryOperator (task_id: load_dag). Invalid arguments were:
airflow-worker_1     | **kwargs: {'gcp_conn_id': 'bigquery_default'}

So, the worker was refusing to execute because I was passing an invalid argument to the task. The problem is that the worker doesn’t notify the scheduler/web (or update the task status to running) that the file is wrong (no alert of a broken dag was being shown on the Airflow home page).

After updating the task parameter and cleaning the task, it ran successfully.

Ps.: Probably is not the same problem that the OP is having but it’s related to task stuck on scheduled

@ephraimbuddy I found the root cause for my problem, and a way to reproduce it. Keep in mind my stack (Airflow + KubernetesExecutor), as this issue has been watered down by many different stacks and situations, ending with the same symptoms.

Steps to reproduce:

  • Create a DAG and schedule some work for it.
  • While work is scheduled, remove the DAG.
  • Restart the scheduler.
  • Now the DAG does no longer exist, but it still exists in the database. And its scheduled tasks also still exist.
  • The scheduler dutifully schedules work for the non-existent DAG (<- this is a problem)
  • The KubernetesExecutor spawns a new worker pod
  • The worker pod is awfully surprised that there is no DAG for the work it was tasked with
  • The worker pod commits suicide without telling anybody (<- this is a problem)
  • The scheduler faithfully keeps the task in “queued” state, although the worker is no more

Solution:

  • The scheduler should not schedule work for tasks that are no longer in the DagBag
  • The worker must fail properly (with its task ending in a “failed” state) when it cannot find the DAG + task it was tasked with (a rough external version of this is sketched below)
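A rough sketch of that second point done externally (an assumption, not how Airflow itself implements it): fail queued task instances whose DAG can no longer be found in the DagBag, so they stop occupying pool slots.

from airflow.models import DagBag, TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def fail_queued_tasks_without_a_dag(session=None):
    dagbag = DagBag()  # parses the configured DAGs folder
    queued = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
    for ti in queued:
        if ti.dag_id not in dagbag.dags:
            print(f"DAG {ti.dag_id} is gone, failing {ti.task_id} ({ti.execution_date})")
            ti.state = State.FAILED  # frees the pool slot instead of blocking it forever
            session.merge(ti)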

I will unsubscribe from this issue.

I have not encountered this issue again (Airflow 2.1.2). But the following circumstances made this issue pop up again:

  • An Airflow DAG/task is being scheduled (not queued yet)
  • The Airflow DAG code is being updated, but it contains an error, so that the scheduler cannot load the code and the task that starts up exits immediately
    • Now a rare condition takes place: A task is scheduled, but not yet executed.
    • The same task will boot a container
    • The same task will exit immediately, because the container loads the faulty code and crashes without bringing up the task at all.
    • No failure is recorded on the task.
  • Then the scheduler thinks the task is queued, but the task crashed immediately (using KubernetesExecutor)
    • This leads to queued slots filling up over time.
    • Once all queued slots of a pool (or the default pool) are filled with queued (but never executed, immediately crashing) tasks, the scheduler and the whole system gets stuck.

How do I prevent this issue?

I simply make sure the DAG code is 100% clean and loads both in the scheduler and the tasks that start up (using KubernetesExecutor).

How do I recover from this issue?

First, I fix the issue that prevents the DAG code from loading. Then I restart the scheduler.
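For completeness, a sketch of that “restart the scheduler” step on Kubernetes, assuming the scheduler runs as a Deployment named airflow-scheduler in an airflow namespace (both names are assumptions, adjust to your chart):

# restart the scheduler pod(s) so it rebuilds its in-memory state from the database
kubectl rollout restart deployment/airflow-scheduler -n airflow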

@kaxil we’re still experiencing this issue in version 2.1.1 even after tuning parallelism and pool size to 1024

kill -USR2 <pid of scheduler> – how you get the pid depends upon how and where you are running it 😃

Likely exec into the container, run ps auxww and find the oldest scheduler process (you’ll see some sub processes, possibly named helpfully).
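A rough sketch of those steps for a Kubernetes deployment (the pod name is an assumption; on Airflow 2.x the scheduler should react to USR2 by writing a debug dump of its state to its logs):

# exec into the scheduler pod/container (names are assumptions, adjust to your deployment)
kubectl exec -it airflow-scheduler-0 -- /bin/bash
# find the oldest "airflow scheduler" process; that parent PID is the one to signal
ps auxww | grep "airflow scheduler"
# send USR2 to it; check the scheduler logs afterwards for the dump
kill -USR2 <pid>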

Thanks a lot @ashb. This command allowed our team to investigate the issue better and finally helped me figure the problem out. Expect a PR from me this week 😃

Airflow 2.0.2+e494306fb01f3a026e7e2832ca94902e96b526fa (MWAA on AWS)

This happens to us a LOT: a DAG will be running, task instances will be marked as “queued”, but nothing gets moved to “running”.

When this happened today (the first time today), I was able to track down the following error in the scheduler logs:

[screenshot: scheduler exception in the scheduler logs, 2022-01-12]

At some point after the scheduler had that exception, I tried to clear the state of the queued task instances to get them to run. That resulted in the following logs:

[screenshot: scheduler logs after clearing the queued task instances, 2022-01-12]

This corresponds to this section of code:

[screenshot: the corresponding section of executor code, 2022-01-12]

My conclusion is that when the scheduler experienced that error, it entered a pathological state: it was running but had bad state in memory. Specifically, the queued task instances were in the queued_tasks or running in-memory cache, and thus any attempts to re-queue those tasks would fail as long as that scheduler process was running because the tasks would appear to already have been queued and/or running.

Both caches use the TaskInstanceKey, which is made up of dag_id (which we can’t change), task_id (which we can’t change), execution_date (nope, can’t change), and try_number (🎉 we can change this!!).

So to work around this, I created a utility DAG that will find all task instances in a “queued” or “None” state and increment the try_number field.

The DAG runs as a single PythonOperator:

import os
from datetime import datetime, timedelta
from pprint import pprint

from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.state import State
from dateutil.parser import parse
from sqlalchemy.sql.expression import or_

DAG_NAME = os.path.splitext(os.path.basename(__file__))[0]
DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "execution_timeout": timedelta(minutes=10),
    "retries": 0,
}


@provide_session
def unstick_dag_callable(dag_run, session, **kwargs):
    dag_id = dag_run.conf.get("dag_id")
    if not dag_id:
        raise AssertionError("dag_id was not provided")
    execution_date = dag_run.conf.get("execution_date")
    if not execution_date:
        raise AssertionError("execution_date was not provided")
    execution_date = parse(execution_date)

    filter = [
        or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
        TaskInstance.dag_id == dag_id,
        TaskInstance.execution_date == execution_date,
    ]
    print(
        (
            f"DAG id: {dag_id}, Execution Date: {execution_date}, State: "
            f"""{dag_run.conf.get("state", f"{State.QUEUED} or {State.NONE}")}, """
            f"Filter: {[str(f) for f in filter]}"
        )
    )

    tis = session.query(TaskInstance).filter(*filter).all()
    dr = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
        .first()
    )
    dagrun = (
        dict(
            id=dr.id,
            dag_id=dr.dag_id,
            execution_date=dr.execution_date,
            start_date=dr.start_date,
            end_date=dr.end_date,
            _state=dr._state,
            run_id=dr.run_id,
            creating_job_id=dr.creating_job_id,
            external_trigger=dr.external_trigger,
            run_type=dr.run_type,
            conf=dr.conf,
            last_scheduling_decision=dr.last_scheduling_decision,
            dag_hash=dr.dag_hash,
        )
        if dr
        else {}
    )

    print(f"Updating {len(tis)} task instances")
    print("Here are the task instances we're going to update")
    # Print no more than 100 tis so we don't lock up the session too long
    for ti in tis[:100]:
        pprint(
            dict(
                task_id=ti.task_id,
                job_id=ti.job_id,
                key=ti.key,
                dag_id=ti.dag_id,
                execution_date=ti.execution_date,
                state=ti.state,
                dag_run={**dagrun},
            )
        )
    if len(tis) > 100:
        print("Output truncated after 100 task instances")

    for ti in tis:
        ti.try_number = ti.next_try_number
        ti.state = State.NONE
        session.merge(ti)

    if dag_run.conf.get("activate_dag_runs", True):
        dr.state = State.RUNNING
        dr.start_date = timezone.utcnow()

    print("Done")


with DAG(
    DAG_NAME,
    description="Utility DAG to fix TaskInstances stuck in queued or None state",
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
    start_date=datetime(year=2021, month=8, day=1),
    max_active_runs=1,
    catchup=False,
    default_view="graph",
    is_paused_upon_creation=False,
) as dag:
    PythonOperator(task_id="unstick_dag", python_callable=unstick_dag_callable)

To use the DAG, trigger a DAG run with a dag_id and execution_date like:

{
    "dag_id": "my_stuck_dag",
    "execution_date": "2022-01-01T00:00:00Z"
}
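If you prefer the CLI to the UI trigger form, the same run config can be passed with airflow dags trigger; a sketch, assuming the file above is saved as unstick_dag.py (the DAG id is derived from the filename):

airflow dags trigger unstick_dag \
    --conf '{"dag_id": "my_stuck_dag", "execution_date": "2022-01-01T00:00:00Z"}'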

Moments after I shipped this DAG, another DAG got stuck, and I had a chance to see if this utility DAG worked – it did! 😅


Couple of thoughts:

  • I don’t think my error is exactly the same as OP, as some very key conditions are not applicable to my case, but this issue appears to have many different and probably not at all related bugs that kind of manifest as “stuck DAGs” and this issue has pretty good Google juice – I just hope my explanation and/or work-around help someone else.
  • The MWAA product from AWS is using an older version of Airflow, so the combination of factors that leads to this pathological state may no longer be possible in the current version of Airflow.
  • MWAA uses the CeleryExecutor, which I suspect is where the pathological state is coming from, not BaseExecutor directly.
  • All that being said, I’m surprised to see this critical state being kept in memory (queued_tasks and running), but I don’t have a complete mental model of how the executor and the scheduler are distinct or not. My understanding is that this is scheduler code, but with the scheduler being high-availability (we’re running 3 schedulers), in-memory state seems like something we should be using very judiciously and be flushing and rehydrating from the database regularly.

Thanks @easontm , It’s a bug. I have a PR trying to address that see https://github.com/apache/airflow/pull/17945

@ephraimbuddy no, I have to revisit this issue again.

I can now confirm that tasks are stuck in “queued” even with the aforementioned issue solved. I no longer have any missing DAGs, but the issue persists.

I now have a complete Airflow installation with 100+ DAGs that gets stuck every day or so. This is a critical issue. Please adjust your priority accordingly. Many other issues pop up with the same symptoms!

@kaxil I have checked, min_file_process_interval is set to 30, however the problem is still there for me.

@SalmonTimo I have pretty high CPU utilisation (60%), although the scheduler settings are default. But why? Does this matter?

––

Same issue, new day: I have Airflow running, the scheduler running, and the whole cluster has 103 scheduled tasks and 3 queued tasks, but nothing is running at all. I highly doubt that min_file_process_interval is the root of the problem. I suggest somebody mark this issue with a higher priority; I do not think that “regularly restarting the scheduler” is a reasonable solution.

What we need here is some factual inspection of the Python process. I am no Python expert, however I am proficient and know my way around other VMs (Erlang, Ruby).

Following that stack trace idea, I just learned that Python unfortunately cannot dump a process (https://stackoverflow.com/a/141826/128351), otherwise I would have provided you with such a process dump of my running “scheduler”.

I am very happy to provide you with some facts about my stalled scheduler, if you tell me how you would debug such an issue.

What I currently have:

  • CPU utilisation of the scheduler is still pretty high (around 60%).
  • AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL is set to 30
  • AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL is set to 10
  • Log output of scheduler:
[2021-04-06 05:19:56,201] {scheduler_job.py:1063} INFO - Setting the following tasks to queued state:

[2021-04-06 05:19:57,865] {scheduler_job.py:941} INFO - 15 tasks up for execution:

# ... snip ...

[2021-04-06 05:19:57,876] {scheduler_job.py:975} INFO - Figuring out tasks to run in Pool(name=mssql_dwh) with 0 open slots and 15 task instances ready to be queued
[2021-04-06 05:19:57,882] {scheduler_job.py:985} INFO - Not scheduling since there are 0 open slots in pool mssql_dwh

What I find striking is the message INFO - Not scheduling since there are 0 open slots in pool mssql_dwh. That is a pool configured for a maximum of 3 slots, yet not a single task is running. I fear the bug is that the “scheduler” might be losing track of running tasks on Kubernetes. Bluntly, I guess there is a bug somewhere in these components:

  • Scheduler
  • KubernetesExecutor
  • Pools
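One way to inspect what is actually occupying that pool is to query the metadata database directly, since open slots are (as far as I understand) counted from queued plus running task instances; a small diagnostic sketch (the function and pool name are just examples):

from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def show_pool_usage(pool_name="mssql_dwh", session=None):
    # list the task instances that count against the pool's slots (queued + running)
    tis = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.pool == pool_name,
            TaskInstance.state.in_([State.QUEUED, State.RUNNING]),
        )
        .all()
    )
    for ti in tis:
        print(ti.dag_id, ti.task_id, ti.execution_date, ti.state)


if __name__ == "__main__":
    show_pool_usage()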

Hello, I found the same issue when I used version 2.2.4 (latest). Do we maybe have some workaround for this?

@haninp - this might be (and likely is - because MWAA which plays a role here has no 2.2.4 support yet) completely different issue. It’s not helpful to say “I also have similar problem” without specifying details, logs .

As a “workaround” (or diagnosis) I suggest you to follow this FAQ here: https://airflow.apache.org/docs/apache-airflow/stable/faq.html?highlight=faq#why-is-task-not-getting-scheduled and double check if your problem is not one of those with the configuration that is explained there.

If you find you still have a problem, then I invite you to describe it in detail in a separate issue (if this is something that is easily reproducible) or GitHub Discussion (if you have a problem but are unsure how to reproduce it). Providing as many details as possible, such as your deployment details, logs, circumstances etc., is crucial to be able to help you. Just stating "I also have this problem" helps no-one (including yourself, because you might think you delegated the problem and it will be solved, but in fact this might be a completely different problem).

I am facing the same issue in airflow-2.2.2 with kubernetes Executor.

Thanks @ephraimbuddy! That wasn’t it but I did find something that may be issue-worthy. I added some custom logging to scheduler_job._start_queued_dagruns() and noticed that the contents of dag_runs were the same 20 DAGruns in every loop (the default count from the config max_dagruns_per_loop_to_schedule) – the single DAG associated with these DAGruns is not the one pictured. The DAGruns in question show up first when ordering by the following code from dagrun.next_dagruns_to_examine():

.order_by(
    nulls_first(cls.last_scheduling_decision, session=session),
    cls.execution_date,
)

This DAG is set to max_active_runs=1 so all 20 examined queued DAGruns do not change state (because there is another running already). The problem arises because the _start_queued_dagruns() function (AFAIK) doesn’t update last_scheduling_decision, so every time the query is run to get next DAGruns, the same ones appear (and continue to not be scheduled if the currently active DAGrun for that DAG takes a long time – and it continues so long as there are more than 20 DAGruns queued).

I think the last_scheduling_decision column needs to be updated somewhere here:

if dag.max_active_runs and active_runs >= dag.max_active_runs:
    self.log.debug(
        "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
        dag.dag_id,
        active_runs,
        dag_run.execution_date,
    )
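A minimal sketch of the kind of change being suggested here (an assumption about shape and placement, not an actual patch): mark a scheduling decision when a run is skipped for max_active_runs, so next_dagruns_to_examine() stops returning the same 20 runs.

if dag.max_active_runs and active_runs >= dag.max_active_runs:
    self.log.debug(
        "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
        dag.dag_id,
        active_runs,
        dag_run.execution_date,
    )
    # hypothetical fix sketch: push this run behind others in the last_scheduling_decision
    # ordering (assumes airflow.utils.timezone is available in scheduler_job.py)
    dag_run.last_scheduling_decision = timezone.utcnow()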

I was able to get around this issue currently by simply increasing the number of DAGruns handled per loop (and telling my users not to queue so many), but perhaps it should still be addressed.

I’m on Airflow 2.1.0 deployed to ECS Fargate, seeing similar issues: there are 128 slots, a small number used, but queued tasks that aren’t ever run. Perhaps relatedly, I also see a task in a none state which I’m not able to track down from the UI. Cluster resources seem fine, using less than 50% CPU and memory.

I also saw an error from Sentry around this time:

could not queue task ["amplitude.events","run-amplitude-events-crawler","2021-05-27T04:15:00.000000Z",1]
[two screenshots of the Sentry error, 2021-05-28]

We are facing the same issue, with the kubernetes executor. K8s: 1.19.3. We use the helmchart from https://airflow-helm.github.io/charts (8.0.6), which refers to Airflow Version 2.0.1.

Running on 2.0.2 now, will keep you posted.

@lukas-at-harren – Can you check the Airflow Webserver -> Admin -> Pools and then, in the row with your pool (mssql_dwh), check the Used slots? Click on the number in Used slots; it should take you to the TaskInstance page that shows the currently “running” task instances in that Pool.

It is possible that they are not actually running but somehow got in that state in DB. If you see 3 entries over here, please mark those tasks as success or failed, that should clear your pool.

[screenshots of the Pools and Task Instances views]

@mattvonrocketstein I can confirm the issue is still present with autoscaling disabled.

In our case, we dynamically create dags, so the MWAA team’s first suggestion was to reduce the load on the scheduler by increasing the refresh dags interval. It seems to help. We see fewer errors in the logs, and tasks getting stuck less often, but it didn’t resolve the issue.

Now we are waiting for the second round of suggestions.

Hi All! With release 2.2.5 scheduling issues have gone away for me. I guess one of the following resolved issues made it.

  1. Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
  2. Fix race condition between triggerer and scheduler (#21316)

I am still using mostly SubDags instead of TaskGroups, since the latter makes the tree view incomprehensible. If you have a similar setup, then give 2.2.5 release a try!

Thanks for pointing me in the right direction, @potiuk. We’re planning to continue with our investigation when some resources free up to continue the migration.

@val2k I may know why increasing the try_number didn’t work. The DAG file that @danmactough posted is missing session.commit() after session.merge(). After the addition, the DAG file worked as intended for me.

@jpkoponen At least in v2.0.2 (which is the only v2.x version available on AWS MWAA), there’s no reason to call session.commit() when using the @provide_session decorator. It creates the session, and calls session.commit() for you.

Okay, I didn’t know that. In that case, I’m confused why the try_number increased with commit() but didn’t without it. 🤷

@cdibble what’s your setting for AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE? Since your task instances get to queued state, it’s not a scheduler issue but your executor. Since you’re on Kubernetes, I suggest you increase the above configuration if you haven’t done so.
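For reference, a sketch of raising that setting via the environment (the value 8 is arbitrary; the default is 1):

# KubernetesExecutor: how many worker pods the executor may create per scheduler loop
export AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE=8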

Hello @danmactough,

Thanks for your work ! Could you provide the whole code for your utility DAG please ?

But if I understand correctly, when you see that a task within a dag run is blocked, you have to manually trigger your utility DAG, passing its dag id / execution date?

Sure @val2k. I updated my previous comment to include the full DAG as well as a description of how to provide the DAG run config.

I ran into the same issue, the scheduler’s log:

[2021-12-01 11:45:11,850] {scheduler_job.py:1206} INFO - Executor reports execution of jira_pull_5_min.jira_pull execution_date=2021-12-01 03:40:00+00:00 exited with status success for try_number 1
[2021-12-01 11:46:26,870] {scheduler_job.py:941} INFO - 1 tasks up for execution:
        <TaskInstance: data_etl_daily_jobs.dwd.dwd_ti_lgc_project 2021-11-29 21:06:00+00:00 [scheduled]>
[2021-12-01 11:46:26,871] {scheduler_job.py:975} INFO - Figuring out tasks to run in Pool(name=data_etl_daily_jobs_pool) with 10 open slots and 1 task instances ready to be queued
[2021-12-01 11:46:26,871] {scheduler_job.py:1002} INFO - DAG data_etl_daily_jobs has 0/16 running and queued tasks
[2021-12-01 11:46:26,871] {scheduler_job.py:1063} INFO - Setting the following tasks to queued state:
        <TaskInstance: data_etl_daily_jobs.dwd.dwd_ti_lgc_project 2021-11-29 21:06:00+00:00 [scheduled]>
[2021-12-01 11:46:26,873] {scheduler_job.py:1105} INFO - Sending TaskInstanceKey(dag_id='data_etl_daily_jobs', task_id='dwd.dwd_ti_lgc_project', execution_date=datetime.datetime(2021, 11, 29, 21, 6, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 2 and queue default
[2021-12-01 11:46:26,873] {base_executor.py:85} ERROR - could not queue task TaskInstanceKey(dag_id='data_etl_daily_jobs', task_id='dwd.dwd_ti_lgc_project', execution_date=datetime.datetime(2021, 11, 29, 21, 6, tzinfo=Timezone('UTC')), try_number=1)

Stuck task: dwd.dwd_ti_lgc_project. Restarting the scheduler, webserver and executor does not do anything.

After clearing the state of the task, the executor does not receive the task.

Version: v2.0.1, Git Version: .release:2.0.1+beb8af5ac6c438c29e2c186145115fb1334a3735

I got the same issue here. In my case, it happened after I resized the Airflow worker pod resources (smaller than before). I’m not sure, but it might be related to your worker pod resource size, I guess…

Couple questions to help you further:

  • What version are you using?
  • What state are your tasks in?
  • What is your setup? Celery Redis etc?
  • Is this only with TaskInstances that were already present before you scaled down or also with new TaskInstances?

@hafid-d / @trucnguyenlam - have you tried separating DAGs into separate pools? The more pools that are available, the fewer piled-up ‘lanes’ you’ll have. Just beware that in doing this, you are also somewhat breaking the parallelism - but it does organize your DAGs much better over time!

@Vbubblery A fix was included in 2.1.1, but it is not yet clear if that fixes all the cases of this behaviour or not.

@Jorricks Amazing! Shout (here, with a direct ping, or on Airflow slack) if I can help at all.

I have resolved this issue for my environment! I’m not sure if this is the same “bug” as others, or a different issue with similar symptoms. But here we go


In my Airflow Docker image, the entrypoint is just a bootstrap script that accepts webserver or scheduler as arguments, and executes the appropriate command.

# Installing python libs, jars, etc
...
ENTRYPOINT ["/bootstrap.sh"]

bootstrap.sh:

if [ "$1" = "webserver" ]
then
	exec airflow webserver
fi

if [ "$1" = "scheduler" ]
then
	exec airflow scheduler
fi

Prior to #12766, the KubernetesExecutor fed the airflow tasks run command (or airflow run in older versions) into the command section of the pod YAML.

"containers": [
      {
        "args": [],
        "command": [
          "airflow",
          "run",
          "my_dag",
          "my_task",
          "2021-06-03T03:40:00+00:00",
          "--local",
          "--pool",
          "default_pool",
          "-sd",
          "/usr/local/airflow/dags/my_dag.py"
        ],

This works fine for my setup – the command just overrides my Docker’s ENTRYPOINT, the pod executes its given command and terminates on completion. However, this change moved the airflow tasks run issuance to the args section of the YAML.

'containers': [{'args': ['airflow',
                                   'tasks',
                                   'run',
                                   'my_dag',
                                   'my_task',
                                   '2021-06-02T00:00:00+00:00',
                                   '--local',
                                   '--pool',
                                   'default_pool',
                                   '--subdir',
                                   '/usr/local/airflow/dags/my_dag.py'],
                          'command': None,

These new args do not match either webserver or scheduler in bootstrap.sh, therefore the script ends cleanly and so does the pod. Here is my solution, added to the bottom of bootstrap.sh:

if [ "$1" = "airflow" ] && [ "$2" = "tasks" ] && [ "$3" = "run" ]
then
	exec "$@"
fi

Rather than just allow the pod to execute whatever it’s given in args (aka just running exec "$@" without a check), I decided to at least make sure the pod is being fed an airflow tasks run command.

Any solution or workaround to fix this? This makes the scheduler very unreliable and the tasks are stuck in the queued state.

Any update on this ? Facing the same issue.

Environment:

  • Airflow 2.1.0
  • Docker : apache/airflow:2.1.0-python3.8
  • Executor : Celery

I am facing the same issue again. This happens randomly, and even cleaning up the task instances and/or restarting the container does not fix the issue.

airflow-scheduler | [2021-05-24 06:46:52,008] {scheduler_job.py:1105} INFO - Sending TaskInstanceKey(dag_id='airflow_health_checkup', task_id='send_heartbeat', execution_date=datetime.datetime(2021, 5, 24, 4, 21, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 100 and queue airflow_maintenance
airflow-scheduler | [2021-05-24 06:46:52,008] {base_executor.py:85} ERROR - could not queue task TaskInstanceKey(dag_id='airflow_health_checkup', task_id='send_heartbeat', execution_date=datetime.datetime(2021, 5, 24, 4, 21, tzinfo=Timezone('UTC')), try_number=1)
airflow-scheduler | [2021-05-24 06:46:52,008] {scheduler_job.py:1105} INFO - Sending TaskInstanceKey(dag_id='airflow_health_checkup', task_id='send_heartbeat', execution_date=datetime.datetime(2021, 5, 24, 4, 24, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 100 and queue airflow_maintenance
airflow-scheduler | [2021-05-24 06:46:52,009] {base_executor.py:85} ERROR - could not queue task TaskInstanceKey(dag_id='airflow_health_checkup', task_id='send_heartbeat', execution_date=datetime.datetime(2021, 5, 24, 4, 24, tzinfo=Timezone('UTC')), try_number=1)

The only solution that works is manually deleting the dag… which isn’t a feasible option. This should be a very high priority as it breaks the scheduling.

in my case (Airflow 2.0.2), tasks seem to get stuck in queued state if the tasks happen to run during the deployment of Airflow (we are using ecs deploy to deploy) 🤔

Like many others here, we’re also experiencing 0 slots and tasks scheduled but never run, on Airflow 1.10.14 on Kubernetes. Restarting the scheduler pod did trigger the run for those stuck tasks.

We’ve updated to Airflow 2.0.2 and, after one DAG run, the problem did not resurface. None of the 2,000 or so tasks that are executed by the DAG ended up in a zombie state. We’re going to keep watch over this for the next few days, but the initial results look promising.

@ashb The setup works like this (I am @overbryd, just a different account.)

  • There are dynamic DAGs that are part of the repository, which gets pulled in regularly by a side-car container on the scheduler.
  • The worker has an init container that pulls the latest state of the repository.
  • Secondly I have DAGs that are built (very quickly) based on database query results.

This scenario opens up the following edge cases/problems:

  • A DAG can go missing after it has been queued, when its entry is removed from the database; rare, but it happens.
  • A DAG can go missing after it has been queued, when a git push that changes the DAGs happens before the worker has started.

I can observe the same problem with version 2.0.2:

  • Tasks fail, because a DAG/task has gone missing (we are using dynamically created DAGs, and they can go missing)
  • The scheduler keeps those queued
  • The pool gradually fills up with these queued tasks
  • The whole operation stops, because of this behaviour

My current remedy:

  • Manually remove those queued tasks

My desired solution:

When a DAG/task goes missing while it is queued, it should end up in a failed state.

@ephraimbuddy ok. I have now set AIRFLOW__KUBERNETES__DELETE_WORKER_PODS to False. Let me see if it triggers again.

@ephraimbuddy no I do not have that issue.

When I try to observe it closely, it always goes like this:

  • There are 3 tasks in the mssql_dwh pool. All of them have the state queued. Nothing is running. Nothing is started. The scheduler does not start anything new, because the pool has 0 available slots.
  • Then I clear those 3 tasks.
  • The scheduler immediately picks some tasks and puts them into queued state. Meanwhile, Kubernetes starts the pods.
  • If I am lucky, some of the tasks get executed properly, and the scheduler continues what it is supposed to do.
  • But before long, it starts to accumulate “dead” tasks in queued state. Those are NOT running in Kubernetes.
  • I checked the scheduler for error logs, and I can see some log lines like these
ERROR - Executor reports task instance <TaskInstance: <redacted> 2021-04-10 00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?

@kaxil So I think there must be some kind of race condition between the scheduler and the Kubernetes pod startup. Some tasks finish really quickly (and successfully), and the scheduler KEEPS them in queued state.

I’m facing the same issue as OP, and unfortunately what @renanleme said does not apply to my situation. I read a lot about the AIRFLOW__SCHEDULER__RUN_DURATION that @jonathonbattista mentioned as the solution, but first of all, this config does not appear to exist in Airflow 2.x, and second, as the Scheduler documentation says:

scheduler is designed to run as a persistent service in an Airflow production environment.

I wouldn’t mind restarting the scheduler, but the reason for the hanging queued tasks is not clear to me. In my environment, it appears to be very random.