airflow: Confusing log for long running tasks: "dependency 'Task Instance Not Running' FAILED: Task is in the running state"

Apache Airflow version: 1.10.* / 2.0.* / 2.1.*

Kubernetes version (if you are using kubernetes) (use kubectl version): Any

Environment:

  • Cloud provider or hardware configuration: Any
  • OS (e.g. from /etc/os-release): Any
  • Kernel (e.g. uname -a): Any
  • Install tools: Any
  • Others: N/A

What happened:

These lines in the TaskInstance log are very misleading. They seem to appear for tasks that take longer than one hour. When users are waiting for a task to finish and see this in the log, they often get confused and think something is wrong with their task or with Airflow. In fact, the lines are harmless: they are simply saying “the TaskInstance is already running, so it cannot be run again”.

{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

What you expected to happen:

The confusion is unnecessary. These lines should either be silenced in the log or replaced with something clearer.

How to reproduce it:

Any task that takes more than an hour to run produces these lines in its log.
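
A minimal reproducing DAG might look like the sketch below (Airflow 2 import paths, CeleryExecutor assumed; the DAG id, task id and sleep length are illustrative):

import time

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="long_sleep_repro",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
):
    PythonOperator(
        task_id="sleep_past_an_hour",
        # Sleep for two hours; while this is still running, the task log shows
        # the "Dependencies not met ... Task is in the running state" lines.
        python_callable=lambda: time.sleep(2 * 60 * 60),
    )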

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 24
  • Comments: 46 (18 by maintainers)

Most upvoted comments

Hello, we are on Version: 2.2.5+composer and we have the same problem, but in our case the task is actually stopped and then retried by our retry policy. So the message is not harmless for us, as it ends the job:

[2022-11-08, 05:30:41 UTC] {taskinstance.py:1034} INFO - Dependencies not met for <TaskInstance: pipeline_analytics_revenu_billing.analytics_revenu_insert scheduled__2022-11-07T03:10:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2022-11-08, 05:30:41 UTC] {taskinstance.py:1034} INFO - Dependencies not met for <TaskInstance: pipeline_analytics_revenu_billing.analytics_revenu_insert scheduled__2022-11-07T03:10:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-11-08, 05:30:41 UTC] {local_task_job.py:99} INFO - Task is not able to be run

@ashb @potiuk and others watching: if you experience this issue, it’s just that your task is taking more than 6 hours. In most cases your task will continue running, but you can’t see its logs until the task finishes or fails.

As a temporary fix, you can increase the time it takes for this to happen by increasing the value of AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT to something like 86400 (24 hours).
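
For example, the equivalent airflow.cfg entry (assuming a standard CeleryExecutor deployment; the environment variable above takes precedence over it) would be:

[celery_broker_transport_options]
visibility_timeout = 86400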


I believe I have uncovered the cause of this issue and would appreciate feedback.

The question is, why are we losing the logs after 6 hours?
I believe this is because of a slightly complex combination of celery configs:

The visibility_timeout is the number of seconds celery will wait for an acknowledgment before it returns a task to the queue (for it to be picked up by another worker). But because task_acks_late is True, airflow’s celery workers will not acknowledge tasks until AFTER the task has finished. Hence, if a task takes more than visibility_timeout seconds to finish, celery will tell another worker to start this task.

When this happens, the new instance of this task will realize the task is still running on the old worker (it’s not failed, and is even heart-beating the TaskInstance), and will correctly wait (because a task can not be “running” in two places). But now the airflow UI only shows the logs for this new “phantom task”, which will always be:

{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
{taskinstance.py:874} INFO - Dependencies not met for <TaskInstance: ... [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

Effectively, this issue is the result of celery taking matters into its own hands (outside of the control of the airflow scheduler), and telling a new worker to start the task which is still running on the old worker.
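
To make the combination concrete, here is a minimal standalone Celery sketch (not Airflow’s actual code; the broker URL and numbers are illustrative) of the two settings involved:

import time

from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")
app.conf.update(
    # Airflow's celery workers acknowledge the message only AFTER the task
    # function returns (late ack).
    task_acks_late=True,
    # With a Redis or SQS broker, an unacknowledged message becomes visible
    # again after this many seconds and is redelivered to another worker.
    broker_transport_options={"visibility_timeout": 21600},
)

@app.task
def long_task():
    # Runs longer than the visibility timeout, so a second worker receives
    # the same message while this one is still running.
    time.sleep(7 * 60 * 60)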


Setting our task_acks_late to False will likely fix this issue. However, I am not sure what the other implications would be, as it has been set to True for most of Airflow’s existence; it was set to True in this commit from 2015.

Celery’s purpose for task_acks_late is to prevent “zombie” tasks that take too long. But Airflow already has its own “heartbeat” checks for zombie tasks, so there is probably no need to have celery re-issue tasks.

Setting task_acks_late to False, and visibility_timeout to a much lower number (like 300 sec), would also have the added benefit of detecting celery transport failures.
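
For anyone who wants to experiment before a fix lands, one possible route (assuming the standard [celery] celery_config_options override mechanism) is a custom Celery config module; everything below except DEFAULT_CELERY_CONFIG is illustrative:

# my_celery_config.py (hypothetical module, referenced from
# [celery] celery_config_options in airflow.cfg)
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    # Acknowledge the message as soon as a worker picks it up, so the broker
    # never redelivers it to a second worker while the first is still running.
    "task_acks_late": False,
}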


Finally, I want to highlight that Airflow’s docs are confusing about what the default visibility_timeout is, because this piece of code will set it to 21600 when AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT is None, which is different from the celery default of 3600.

After some more investigation, it’s very likely we see this log appearing an hour after a long running task started because of the default visibility_timeout setting in Celery. This code in default_celery.py sets visibility_timeout to 21600 only if the broker_url starts with redis or sqs. In our case we are using redis sentinels, so it’s still redis even though the URL starts with sentinel. Therefore the visibility_timeout is left at 3600, which is the default according to the celery documentation. The weird thing is that after I manually changed visibility_timeout to a very large integer in airflow.cfg, the same log still showed up exactly an hour after a task started. So it seems changing visibility_timeout does not make any difference in this case. Not sure if anyone has experienced the same.

@david30907d maybe try changing visibility_timeout to a large number in your setup and see if it still happens after an hour. If it stops for you, then visibility_timeout is probably the cause. There may be something wrong in our own setup that causes the visibility_timeout change not to take effect.

# Excerpt from airflow/config_templates/default_celery.py
import logging

from airflow.configuration import conf


def _broker_supports_visibility_timeout(url):
    return url.startswith("redis://") or url.startswith("sqs://")


log = logging.getLogger(__name__)

broker_url = conf.get('celery', 'BROKER_URL')

broker_transport_options = conf.getsection('celery_broker_transport_options') or {}
if 'visibility_timeout' not in broker_transport_options:
    if _broker_supports_visibility_timeout(broker_url):
        broker_transport_options['visibility_timeout'] = 21600
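
For illustration, the prefix check above does not match a Redis Sentinel broker URL, which is why the 21600-second default is never applied in that setup:

_broker_supports_visibility_timeout("redis://redis:6379/0")         # True  -> Airflow sets 21600
_broker_supports_visibility_timeout("sentinel://sentinel:26379/0")  # False -> Celery's own 3600 default for redis applies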

I bumped into this issue and I misunderstood until now 😅 thanks for pointing out it’s okay to wait

@potiuk @malthe here is the PR to change task_acks_late to False:

Let’s discuss the implications and do some testing in that thread.

In our context (GCP Airflow 2.2.5, https://github.com/apache/airflow/issues/16163#issuecomment-1306729126) the problem was correlated with Kubernetes pods being evicted due to lack of resources. After scaling up our Kubernetes cluster and updating to Airflow 2.4.3 (plus google-providers 8.8.0, due to https://stackoverflow.com/questions/74730471/gcstobigqueryoperator-not-working-in-composer-2-1-0-airflow-2-3-4/75319966#75319966), we have not seen this problem in the month since upgrading. As we no longer have pod evictions, I cannot conclude whether the issue was fixed in Airflow or the workaround was sufficient (preventing Airflow workers from being killed).

Please also let us know if you get any response or solution from AWS, as we have also opened a ticket with Google Cloud.

In the case where the visibility timeout is reached, it’s confusing that there is no clear log line stating that the task has been killed for taking too long to complete.

(If that’s indeed what is happening.)

@potiuk is it the case that the Celery task is killed, or is it simply no longer streaming logs into Airflow at that point?

According to the Celery docs:

Late ack means the task messages will be acknowledged after the task has been executed, not right before (the default behavior).

It seems that it’s a way to ensure at-least-once processing rather than at-most-once. I would say that since Airflow does retrying of its own accord, we want the latter, which is why it probably should remain disabled.

Care to submit a PR, @thesuperzapper? That would be a small thing to give back for the years of business you have built on top of Airflow.

I can give you the full logs right now. My task is a PythonOperator that inserts huge CSV files into Postgres; concretely, I pass a method argument to pandas’ to_sql in order to apply an upsert operation.

My task is always externally failed after 6 hours 😕
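
A minimal sketch of the kind of to_sql upsert task described above, assuming pandas + SQLAlchemy against Postgres; the table name, key column and connection string are hypothetical:

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert


def upsert(table, conn, keys, data_iter):
    # Called by to_sql for each batch; emits INSERT ... ON CONFLICT DO UPDATE.
    stmt = insert(table.table).values([dict(zip(keys, row)) for row in data_iter])
    stmt = stmt.on_conflict_do_update(
        index_elements=["id"],  # hypothetical primary key
        set_={c.key: c for c in stmt.excluded if c.key != "id"},
    )
    conn.execute(stmt)


def load_csv(path):
    engine = create_engine("postgresql+psycopg2://user:pass@host/db")  # placeholder DSN
    for chunk in pd.read_csv(path, chunksize=50_000):
        chunk.to_sql("revenue", engine, if_exists="append", index=False, method=upsert)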

It seems likely that the logs are lost for really long running celery tasks (perhaps due to the visibility_timeout, I am not sure), so people only see the Task is not able to be run error, but something else (likely their code) is deadlocking; they just can’t see it.

Has anyone tried setting execution_timeout on those tasks? I assume that airflow will eventually mark the task as failed.

Yes I have, and that’s not the solution; the task keeps being “externally set to failed”.

@yuqian90 would it be possible for you to update the issue description to report updated information about the bug (e.g. versions affected, episodes of task killings, etc…) so it can be triaged accordingly?

I am also experiencing this in airflow 2.4.3 with a number of DAGs.

The Task is not able to be run error message is interesting. People seem to think it is displayed when AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT is shorter than the DAG takes to run, but that is not my reading of the docs for that config; it’s more of a celery timeout for the worker to acknowledge a task once it has been assigned.

It seems likely that the logs are lost for really long running celery tasks (perhaps due to the visibility_timeout, I am not sure), so people only see the Task is not able to be run error, but something else (likely their code) is deadlocking; they just can’t see it.

Has anyone tried setting execution_timeout on those tasks? I assume that airflow will eventually mark the task as failed.
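
For anyone who wants to try it, a sketch of setting execution_timeout (DAG and task names are hypothetical; as noted earlier in the thread, at least one user reported it did not help in this failure mode):

from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="long_job",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
):
    PythonOperator(
        task_id="heavy_insert",
        python_callable=lambda: None,  # placeholder for the real work
        # Ask Airflow itself to fail the task before the ~6 hour Celery
        # visibility timeout is reached.
        execution_timeout=timedelta(hours=5),
    )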

Following the Airflow manual, setting AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT to a high value should fix the problem. I haven’t tried the parameter myself. Do we have any more reports about moving this timeout to a higher value? It seems that this parameter is not working. We also do not know whether this behaviour is due to Airflow or to the Celery bugs posted. This is definitely causing production problems for some Airflow users. I suppose the issue title is misleading, since it’s not just a “confusing log” but a behaviour that must definitely be investigated and possibly fixed.

In order to help: I noticed similar behaviour in 2.5.0 (with celery, redis, postgres, default configs). I also found related issues on Celery (https://github.com/celery/celery/issues/5935, https://github.com/celery/celery/issues/6229).

In my case I have a long running task (e.g. 1 day) that logs reschedule attempts. This seems to have no practical effect on the task or the DAG: the task still runs and logs correctly until the end, and no other runs are created.

@malthe That’s not quite what is happening. My understanding is this:

  • The task message on celery is picked up and the TI starts running
  • After 1 hour (the celery visibility timeout) celery makes the message visible again
  • A second worker picks up the celery message
  • When the second one runs, it gets this message.

So the first attempt is still running, and this message is the second concurrent worker (but for the same try_number) saying “I can’t run this task, it’s already running.”