airflow: Copy of [AIRFLOW-5071] JIRA: Thousands of Executor reports task instance X finished (success) although the task says its queued. Was the task killed externally?
Apache Airflow version: 1.10.9
Kubernetes version (if you are using kubernetes) (use kubectl version): Server: v1.10.13, Client: v1.17.0
Environment:
- Cloud provider or hardware configuration: AWS
- OS (e.g. from /etc/os-release): Debian GNU/Linux 9 (stretch)
- Kernel (e.g. uname -a): Linux airflow-web-54fc4fb694-ftkp5 4.19.123-coreos #1 SMP Fri May 22 19:21:11 -00 2020 x86_64 GNU/Linux
- Others: Redis, CeleryExecutor
What happened:
In line with the guidelines laid out in AIRFLOW-7120, I’m copying over a JIRA for a bug that has a significant negative impact on our pipeline SLAs. The original ticket is AIRFLOW-5071, which has a lot of details from various users who use ExternalTaskSensors in reschedule mode and see their tasks going through the following unexpected state transitions:
running -> up_for_reschedule -> scheduled -> queued -> up_for_retry
In our case, this issue seems to affect approximately 2,000 tasks per day.
What you expected to happen:
I would expect that tasks would go through the following state transitions instead: running -> up_for_reschedule -> scheduled -> queued -> running
How to reproduce it:
Unfortunately, I don’t have a configuration available at the moment that could be used to easily reproduce the issue. However, based on the thread in AIRFLOW-5071, the problem seems to arise in deployments that use a large number of sensors in reschedule mode.
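For reference, a minimal sketch of the kind of sensor involved: an ExternalTaskSensor in reschedule mode (the DAG and task ids, and the 1.10-style import path, are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG("child_dag", start_date=datetime(2020, 1, 1), schedule_interval="@daily") as dag:
    wait_for_parent = ExternalTaskSensor(
        task_id="parent_dag_completed",
        external_dag_id="parent_dag",
        external_task_id="final_task",   # illustrative upstream task id
        mode="reschedule",               # free the worker slot between pokes
        poke_interval=60,
    )
```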
About this issue
- Original URL
- State: closed
- Created 4 years ago
- Reactions: 25
- Comments: 60 (25 by maintainers)
Links to this issue
- Airflow 2.2.4 manually triggered DAG stuck in 'queued' status - Stack Overflow
- Cloud Composer release notes | Google Cloud
- Known issues | Cloud Composer | Google Cloud
We have just introduced ExternalTaskSensor into our pipeline and faced the same issue. When initially tested on our dev instance (~200 DAGs) it worked fine; after running it on our prod environment (~400 DAGs) it was always failing after a reschedule.
After digging into the code, it looks like this is simply a race condition in the scheduler.
We have a child_dag.parent_dag_completed task that waits for a business process to complete calculations in parent_dag. Task execution logs:
Scheduler logs:
From the scheduler log it’s visible that the event from the executor is processed after the task has already been queued for the second time.
The logic related to those logs is here:
This is the place where the task state is changed:
Unfortunately, I think that moving _process_executor_events before _process_and_execute_tasks would not solve the issue, as an event might arrive from the executor while _process_and_execute_tasks is executing. Increasing poke_interval reduces the chance of this race condition happening when the scheduler is under heavy load.
I’m not too familiar with the Airflow code base, but it seems that the root cause is the way reschedule works and the fact that try_number does not change. Because of that, the scheduler thinks that an event for a past execution belongs to the ongoing one.
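One way to picture this: in reschedule mode the same try_number is reused for every poke, so an executor event from the first poke is indistinguishable from one for the re-queued poke. The key shape below is a simplification, not Airflow’s exact internals:

```python
# Illustrative only: the identity the scheduler sees for an executor event.
# Because a reschedule does not bump try_number, the SUCCESS event emitted
# for the first poke matches the second, still-QUEUED poke.
first_poke = ("child_dag", "parent_dag_completed", "2020-06-01T00:00:00", 1)   # finished OK
second_poke = ("child_dag", "parent_dag_completed", "2020-06-01T00:00:00", 1)  # re-queued

assert first_poke == second_poke  # the stale event gets applied to the new attempt
```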
Just summarising what others have reported worked for them:
After struggling, we found a method to 100% reproduce this issue!!!
tl;dr
https://github.com/apache/airflow/blob/9ac742885ffb83c15f7e3dc910b0cf9df073407a/airflow/models/taskinstance.py#L1253
Add a raise there to simulate a DB error, which will likely happen when the DB is under great pressure. Then you will get the “Was the task killed externally?” issue every time.
Conditions:
It’s because the worker uses a local task job, which spawns a child process to execute the job. The parent process sets the task from Queued to Running state. However, when the preparation work in the parent process fails, it leads to this error directly. The related code is here: https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/local_task_job.py#L89
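A rough, runnable sketch of the sequence described above, using stand-in functions rather than Airflow’s real LocalTaskJob code; the simulated database failure plays the role of the suggested raise:

```python
class SimulatedDbError(Exception):
    pass

def set_task_state(state, fail=False):
    # Stands in for the DB update that moves the task instance to RUNNING.
    if fail:
        raise SimulatedDbError("database under heavy pressure")
    print(f"task instance state -> {state}")

def run_raw_task_in_child():
    # Stands in for the child process that actually executes the task.
    print("child process ran the task and reported success")

def local_task_job(fail_db=False):
    # Parent process: flip QUEUED -> RUNNING, then hand off to the child.
    set_task_state("running", fail=fail_db)
    run_raw_task_in_child()

try:
    local_task_job(fail_db=True)
except SimulatedDbError:
    # The task instance never reached RUNNING, yet the executor still reports
    # a finished task, so the scheduler later sees a "success" event for a
    # task that is still QUEUED: "Was the task killed externally?"
    print("parent prep failed; task left in QUEUED")
```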
Hi @turbaszek, any findings on this? We have a CeleryExecutor + Redis setup with three workers (apache-airflow 1.10.12). The airflow-scheduler log has a lot of lines like this. I remember this was already a problem when we were using older versions such as 1.10.10. It’s just that we never paid much attention to it.
Same as others in this thread, we have a lot of sensors in “reschedule” mode with poke_interval set to 60s. These are the ones that most often hit this error. So far our workaround has been to add retries=3 to these sensors. That way, when this error happens, the task retries and we don’t get any spam. This is definitely not a great long-term solution though. Such sensors go into the up_for_retry state when this happens.
I also tried to tweak these parameters. They don’t seem to matter much as far as this error is concerned.
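A hedged sketch of that workaround: a reschedule-mode sensor given a few retries so a spurious “killed externally” failure is retried instead of failing the run (sensor type, ids and timings are illustrative):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG("retry_workaround", start_date=datetime(2020, 1, 1),
         schedule_interval="@daily") as dag:
    ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_dag",
        external_task_id="final_task",
        mode="reschedule",
        poke_interval=60,
        retries=3,                          # retry the spurious failure
        retry_delay=timedelta(minutes=5),
    )
```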
The way to reproduce this issue seems to be to create a DAG with a bunch of parallel reschedule sensors, and make the DAG slow to import. For example, if we add a time.sleep(30) at the end of the DAG file to simulate the experience of slow-to-import DAGs, this error happens a lot for such sensors. You may also need to tweak dagbag_import_timeout and dag_file_processor_timeout if adding the sleep causes DAGs to fail to import altogether. When the scheduler starts to process this DAG, we then start to see the above error happening to these sensors, and they go into up_for_retry.
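A hedged sketch of this reproduction: many parallel reschedule sensors in one file plus a module-level sleep to make the DAG slow to import (sensor choice, counts and the 1.10-style import path are assumptions):

```python
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta_sensor import TimeDeltaSensor

with DAG("slow_to_import_sensors", start_date=datetime(2020, 1, 1),
         schedule_interval="@daily") as dag:
    for i in range(20):
        TimeDeltaSensor(
            task_id=f"wait_{i}",
            delta=timedelta(minutes=5),
            mode="reschedule",
            poke_interval=60,
        )

time.sleep(30)  # simulate a DAG file that is slow to import
```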
We reviewed the code and found that in local_task_job.py, the parent process has a heartbeat_callback that checks the state and the child process return code of the task_instance. However, these lines may hide a bug: the raw task command writing back the task instance’s state (like success) doesn’t mean the child process has finished (returned). So, in this heartbeat callback, there may be a race condition where the task state has been written back while the child process has not yet returned. In this scenario, the local task will kill the child process by mistake. And then the scheduler will notice this and report “task instance X finished (success) although the task says its queued. Was the task killed externally?”
This is a simple schematic diagram:
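Roughly, the race being described can be sketched like this (stand-in logic and names, not the actual LocalTaskJob.heartbeat_callback):

```python
def heartbeat_check(state_in_db, child_return_code):
    # The child wrote SUCCESS to the DB but has not exited yet, so its
    # return code is still None. Treating that combination as "state changed
    # externally" and terminating the child is the mistaken kill described above.
    if state_in_db != "running" and child_return_code is None:
        return "terminate child process"
    return "keep running"

# The raw task command has already written back "success", but the child
# process itself has not returned:
print(heartbeat_check("success", None))  # -> "terminate child process"
```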
Same here with Airflow 2.0.1
@turbaszek Let me make a PR later~ We are doing pressure tests these days and this problem has appeared often.
@turbaszek I am currently testing Airflow v2.0.0b3 against the same DAGs we currently run in production on 1.10.12, and I can confirm that this is still an issue.
Combined with #12552 it makes the problem even worse too.
We also got the same error message. In our case, it turned out that we were using the same name for different DAGs. Changing the different DAGs from as dag to something like as dags1 and as dags2 solved the issue for us.
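A minimal sketch of that fix, assuming two DAG definitions that previously both used as dag (dag ids, operators and the 1.10-style import path are made up):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG("pipeline_a", start_date=datetime(2020, 1, 1),
         schedule_interval="@daily") as dag1:      # was: as dag
    DummyOperator(task_id="start_a")

with DAG("pipeline_b", start_date=datetime(2020, 1, 1),
         schedule_interval="@daily") as dag2:      # was: as dag
    DummyOperator(task_id="start_b")
```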
@yuqian90 Those parameters won’t help you much. I was struggling to somehow work around this issue and I believe I’ve found the right solution now. In my case the biggest hint while debugging was not the scheduler/worker logs but the Celery Flower web UI. We have a setup of 3 Celery workers, 4 CPUs each. It often happened that Celery was running 8 or more Python reschedule sensors on one worker but 0 on the others, and that was exactly when sensors started to fail. There are two Celery settings responsible for this behavior: worker_concurrency, with a default value of “16”, and worker_autoscale, with a default value of “16,12” (it basically means that the minimum number of Celery processes on the worker is 12 and it can be scaled up to 16). With those left at default values, Celery was configured to load up to 16 tasks (mainly reschedule sensors) onto one node. After setting worker_concurrency to match the CPU count and worker_autoscale to “4,2”, the problem is literally gone. Maybe that might be another clue for @turbaszek.
I’ve been trying hard to set up a local docker-compose file with scheduler, webserver, Flower, Postgres and RabbitMQ as a Celery backend, but I was not able to replicate the issue either. I tried to start a worker container with limited CPU to imitate this situation, but I failed. There are in fact tasks killed and shown as failed in Celery Flower, but not with the “killed externally” reason.
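A hedged airflow.cfg sketch of the two Celery settings described above, matching the 4-CPU worker example (values are illustrative; tune them to your hardware):

```ini
[celery]
# Match worker_concurrency to the worker's CPU count (4 in the setup above).
worker_concurrency = 4
# Autoscale range "max,min": the default 16,12 keeps at least 12 processes
# and scales up to 16; "4,2" caps the worker at 4 processes.
worker_autoscale = 4,2
```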
We also run into this fairly often, despite not using any sensors. We only seemed to start getting this error once we moved our Airflow database to the cloud (AWS RDS); our Airflow webserver & scheduler run on desktop workstations on-premises. As others have suggested in this thread, this is a very annoying problem that requires manual intervention.
@ghostbody any progress on determining if that’s the correct root cause?
I found this in the code of airflow/jobs/scheduler_job.py: https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L535
The scheduler checks the state of the task instance. When a task instance is rescheduled (e.g. an external sensor), its state transitions through up_for_reschedule -> scheduled -> queued -> running. If its state is queued and it has not moved to the running state, the scheduler will raise an error. So I think the code needs to be changed:
Here is my PR: https://github.com/apache/airflow/pull/19123
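For context, a simplified sketch of the check being described (illustrative names and states only, not the actual scheduler code nor the change proposed in the PR):

```python
import logging

log = logging.getLogger(__name__)

def looks_killed_externally(event_state, event_try_number, ti_state, ti_try_number):
    # The executor reported a terminal state for a try that the database
    # still shows as QUEUED: the condition behind the error in this issue.
    return (
        event_state in ("success", "failed")
        and ti_state == "queued"
        and event_try_number == ti_try_number
    )

# A SUCCESS event from the sensor's first poke arriving after the scheduler
# has already re-queued the same try_number triggers the message:
if looks_killed_externally("success", 1, "queued", 1):
    log.error("Executor reports task instance finished (success) although "
              "the task says its queued. Was the task killed externally?")
```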
Hello. I am using airflow 2.0 and just ran into this error.
How can I fix it??
Quite agree. There were multiple people reporting problems in huge Airflow installations where EFS was used. I can also recommend (as usual) switching to Git Sync. I wrote an article about it: https://medium.com/apache-airflow/shared-volumes-in-airflow-the-good-the-bad-and-the-ugly-22e9f681afca - especially when you are already using Git to store your DAGs, using a shared volume is completely unnecessary and using Git Sync directly is a far better solution.
airflow: 2.2.2 with mysql8, HA scheduler, celery executor (redis backend)
From the logs, it shows that the task instances that reported this error, killed externally (status: success), were rescheduled! From MySQL we can see that all the failed tasks have no external_executor_id!
We use 5000 DAGs, each with 50 dummy tasks, and found that if the following two conditions are met, the probability of triggering this problem increases greatly:
- adopt_or_reset_orphaned_tasks judges that the SchedulerJob failed and tries to adopt the orphaned task instances: https://github.com/apache/airflow/blob/9ac742885ffb83c15f7e3dc910b0cf9df073407a/airflow/executors/celery_executor.py#L442
We did these tests:
- SchedulerJob._process_executor_events: do not set external_executor_id on those queued task instances -> killed externally (status: success) normally less than 10
- adopt_or_reset_orphaned_tasks: do not adopt orphaned task instances
I read the notes below, but still don’t understand this problem:
The problem for us was that we had one DAG that reached 32 parallel runnable tasks (32 leaf tasks), which was the value of the parallelism parameter. After this, the scheduler was not able to run (or queue) any task. Increasing this parameter solved the problem for us.
Hey guys, currently we are on the Airflow 1.14 version. We were getting a similar issue with our tasks staying in the up_for_retry state for hours. I went through this thread and the comments/inputs from various users on tweaking the poke_interval values; our original poke_interval was set to 60 and changing the value to ~93 seconds resolved the issue with tasks getting into the up_for_retry state. This worked like a charm, but I wanted to get more details on the race condition the scheduler is getting into when the poke_interval values are <= 60. Appreciate your help.
Trying Airflow 2.0.1. No tasks could be executed 😦
@turbaszek I just tried it again and I couldn’t replicate this error again on 2.0.
The cause is clear, as @rafalkozik mentioned: the scheduler schedules the task for the second time (puts it in the queue) and only then starts processing the executor events from the task’s first try. It occurs when the scheduling loop time > the sensor task’s reschedule interval. Either reducing the scheduler loop time (DAG processing time, etc.) or increasing the sensor task’s reschedule interval will work.
The bug could also be fixed if the rescheduled task instance used a different try number, but this would create a lot of log files.
Hi @turbaszek, in my case I have dagbag_import_timeout = 100 and dag_file_processor_timeout = 300. Most of the time DAG import takes about 10s; DAG file processing can take 60s, which is why it’s set to a large number.
After digging further, I think the slowness that causes the error in our case is in this function: SchedulerJob._process_dags(). If this function takes around 60s, those reschedule sensors will hit the “ERROR - Executor reports task instance ... killed externally?” error. My previous comment about adding the time.sleep(30) is just one way to replicate this issue. Anything that causes _process_dags() to slow down should be able to replicate this error.
As reported in the original issue and comments, this behavior should be possible to reproduce with fast sensors in reschedule mode. That’s why I was trying to use many DAGs like this:
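A minimal sketch of the kind of DAG meant here, assuming a quick sensor in reschedule mode with a short poke interval (1.10-style PythonSensor import path; the callable and numbers are made up):

```python
import random
from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor

with DAG("fast_reschedule_sensor", start_date=datetime(2020, 1, 1),
         schedule_interval="@hourly") as dag:
    PythonSensor(
        task_id="quick_flaky_check",
        python_callable=lambda: random.random() > 0.5,  # succeeds after a few pokes
        mode="reschedule",
        poke_interval=10,
    )
```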
And I was also playing with Airflow config settings as described in the comments. Although I saw failing tasks, there was no issue like this one or… eventually the log was missing?
I did some tests with the external task sensor but also got no results.