airflow: DAG getting stuck in "running" state indefinitely

Apache Airflow version: 2.0.2

Kubernetes version (if you are using kubernetes) (use kubectl version):

  • Cloud provider or hardware configuration:
  • OS: Ubuntu 18.04.3
  • Install tools: celery = 4.4.7, redis = 3.5.3

What happened: When I trigger my DAG manually, some of the tasks get stuck in the “queued” state in the logs.

[2021-05-21 16:55:57,808: WARNING/ForkPoolWorker-9] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,080: WARNING/ForkPoolWorker-17] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,203: WARNING/ForkPoolWorker-13] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,221: WARNING/ForkPoolWorker-5] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,247: WARNING/ForkPoolWorker-4] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,296: WARNING/ForkPoolWorker-10] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,362: WARNING/ForkPoolWorker-1] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,367: WARNING/ForkPoolWorker-8] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,433: WARNING/ForkPoolWorker-3] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,445: WARNING/ForkPoolWorker-11] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,458: WARNING/ForkPoolWorker-6] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,459: WARNING/ForkPoolWorker-2] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******
[2021-05-21 16:55:58,510: WARNING/ForkPoolWorker-12] Running <TaskInstance: ******* 2021-05-21T08:54:59.100511+00:00 [queued]> on host *******

Even when I mark them as “failed” and rerun them, they still get stuck. When I check the Airflow UI, the DAG is in the “running” state.

And when I check the subdags, some of them are in the “running” state (but nothing is happening) and others are in the “scheduled” state.

I made sure to set all the other running tasks to “failed” before running this DAG.
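For reference, the same reset can also be done from the CLI by clearing the stuck task instances so the scheduler re-queues them (a rough sketch; my_dag and the task-id regex are placeholders for the masked ids above):

airflow tasks clear -t '<stuck_task_id_regex>' my_dag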

What you expected to happen: I expect all my tasks to run and my DAG to be marked as “success”, or “failed” if there is an issue.

How to reproduce it: It occurs when I run the following command: airflow celery worker. It doesn’t occur every time; sometimes the DAGs do not hang and everything works well. I restarted the Airflow webserver, worker, and scheduler a few times, but it didn’t change anything.
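For completeness, the three components are started with the standard commands, each in its own process (a sketch of my setup, with configuration and environment omitted):

airflow webserver
airflow scheduler
airflow celery worker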

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 20 (6 by maintainers)

Most upvoted comments

+1 for this issue, I am following how this unfolds

The same issue with SubDagOperator in 2.1.2

Just tried it on both 2.1.1 and 1.10.15, same behaviour. I even tried the example found here: https://airflow.apache.org/docs/apache-airflow/1.10.15/_modules/airflow/example_dags/example_subdag_operator.html

from airflow.example_dags.subdags.subdag import subdag
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'example_subdag_operator'

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval="@once",
    tags=['example']
)

start = DummyOperator(
    task_id='start',
    dag=dag,
)

section_1 = SubDagOperator(
    task_id='section-1',
    subdag=subdag(DAG_NAME, 'section-1', args),
    dag=dag,
)

some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)

section_2 = SubDagOperator(
    task_id='section-2',
    subdag=subdag(DAG_NAME, 'section-2', args),
    dag=dag,
)

end = DummyOperator(
    task_id='end',
    dag=dag,
)

start >> section_1 >> some_other_task >> section_2 >> end
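
For context, the subdag factory imported at the top (airflow.example_dags.subdags.subdag.subdag) just builds a small DAG of DummyOperators. Roughly (a sketch, not the verbatim upstream code):

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator


def subdag(parent_dag_name, child_dag_name, args):
    # The child DAG's dag_id must be '<parent>.<child>' so that the
    # SubDagOperator in the parent DAG can pick it up.
    dag_subdag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        schedule_interval='@daily',
    )

    # A handful of no-op tasks inside the subdag.
    for i in range(5):
        DummyOperator(
            task_id=f'{child_dag_name}-task-{i + 1}',
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag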

The log is the following:

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Starting flask
 * Serving Flask app "airflow.utils.serve_logs" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
[2021-07-13 08:57:29,682] {_internal.py:113} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
[2021-07-13 08:57:29,690] {scheduler_job.py:1270} INFO - Starting the scheduler
[2021-07-13 08:57:29,691] {scheduler_job.py:1275} INFO - Processing each file at most -1 times
[2021-07-13 08:57:29,696] {dag_processing.py:254} INFO - Launched DagFileProcessorManager with pid: 129
[2021-07-13 08:57:29,697] {scheduler_job.py:1839} INFO - Resetting orphaned tasks for active dag runs
[2021-07-13 08:57:29,701] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2021-07-13 08:57:29,711] {dag_processing.py:532} WARNING - Because we cannot use more than 1 thread (parsing_processes = 2 ) when using sqlite. So we set parallelism to 1.
[2021-07-13 08:57:37,907] {scheduler_job.py:964} INFO - 1 tasks up for execution:
        <TaskInstance: example_subdag_operator.section-1 2021-07-11 00:00:00+00:00 [scheduled]>
[2021-07-13 08:57:37,909] {scheduler_job.py:998} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2021-07-13 08:57:37,909] {scheduler_job.py:1025} INFO - DAG example_subdag_operator has 0/16 running and queued tasks
[2021-07-13 08:57:37,909] {scheduler_job.py:1086} INFO - Setting the following tasks to queued state:
        <TaskInstance: example_subdag_operator.section-1 2021-07-11 00:00:00+00:00 [scheduled]>
[2021-07-13 08:57:37,911] {scheduler_job.py:1128} INFO - Sending TaskInstanceKey(dag_id='example_subdag_operator', task_id='section-1', execution_date=datetime.datetime(2021, 7, 11, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 4 and queue default
[2021-07-13 08:57:37,911] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_subdag_operator', 'section-1', '2021-07-11T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/monitor.py']
[2021-07-13 08:57:37,922] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'example_subdag_operator', 'section-1', '2021-07-11T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/monitor.py']
[2021-07-13 08:57:39,332] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags/monitor.py
Running <TaskInstance: example_subdag_operator.section-1 2021-07-11T00:00:00+00:00 [queued]> on host 969b77ead72f

@hafid-d Can you get the scheduler logs?
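
(In case it helps: the per-file DAG-processing logs normally end up under $AIRFLOW_HOME/logs/scheduler/<date>/, and the scheduler's own output can be captured by running it in the foreground and redirecting it, e.g. airflow scheduler > scheduler.log 2>&1.)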