airflow: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs
Apache Airflow version: 2.0.0
Environment: Docker Stack Celery Executor w/ Redis 3 Workers, Scheduler + Webserver Cloudwatch remote config turned on
What happened: Following execution of a DAG when using cloudwatch integration, the state of the Task Instance is being externally set, causing SIGTERM/SIGKILL signals to be sent. This causes error logs in Workers, which is a nuisance for alert monitoring
*** Reading remote log from Cloudwatch log_group: dev1-airflow-task log_stream: xxxx/task/2021-01-21T17_59_19.643994+00_00/1.log.
Dependencies all met for <TaskInstance: xxxx.task 2021-01-21T17:59:19.643994+00:00 [queued]>
Dependencies all met for <TaskInstance: xxxx.task 2021-01-21T17:59:19.643994+00:00 [queued]>
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
Executing <Task(TaskVerificationOperator): task > on 2021-01-21T17:59:19.643994+00:00
Started process 654 to run task
Running <TaskInstance: xxxx.task 2021-01-21T17:59:19.643994+00:00 [running]> on host 88f99fbc97a8
Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=xxxxxxxx
AIRFLOW_CTX_DAG_OWNER=xxxxxxxx
AIRFLOW_CTX_DAG_ID=xxxxxxxx
AIRFLOW_CTX_TASK_ID=xxxxxx
AIRFLOW_CTX_EXECUTION_DATE=2021-01-21T17:59:19.643994+00:00
AIRFLOW_CTX_DAG_RUN_ID=85
Set new audit correlation_id xxxxxxxxxx-xxxxxx-xxxxxxxxx
Using connection to: id: xxxxx. Host: xxxxxxx, Port: 5432, Schema: xxxxxx, Login: xxxxxx, Password: XXXXXXXX, extra: None
Marking task as SUCCESS. dag_id=xxxxxx, task_id=xxxxxx, execution_date=20210121T175919, start_date=20210121T175936, end_date=20210121T175938
1 downstream tasks scheduled from follow-on schedule check
However following the completion of the DAG, the following is appended to the logs:
State of this instance has been externally set to success. Terminating instance.
Sending Signals.SIGTERM to GPID 654
process psutil.Process(pid=654, name='xxxxx', status='sleeping', started='17:59:36') did not respond to SIGTERM. Trying SIGKILL
Process psutil.Process(pid=654, name='xxxxx', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='17:59:36') (654) terminated with exit code Negsignal.SIGKILL
Task exited with return code Negsignal.SIGKILL
This is a problem, because it causes the following to appear in Worker logs:
[2021-01-21 15:00:01,102: WARNING/ForkPoolWorker-8] Running <TaskInstance: xxxx.task 2021-01-21T14:00:00+00:00 [queued]> on host ip-172-31-3-210.ec2.internal
...
[2021-01-21 15:00:06,599: ERROR/ForkPoolWorker-8] Failed to execute task Task received SIGTERM signal.
What you expected to happen: No errors to appear in Worker logs, if this SIGTERM/SIGKILL is intended
How to reproduce it: Use Airflow w/ Celery Executor and Cloudwatch Remote Logging
Anything else we need to know: Occurs every time, every task in DAG
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 32 (23 by maintainers)
It also seems that this issues sets all the JobRuns to have
failed
state although the TaskInstance is correctly marked assuccess
I did some dive deep on this issue and root caused it.
TL;DR It is related to
watchtower
and has been fixed in version 2.0.0 and later versions. Airflow 2.2.4 now uses that version (see #19907). So, if you are using it, you should be good. If not, then you need to force installwatchtower
version 2.0.0 or later. Notice, however, thatwatchtower
made some changes to theCloudWatchLogHandler
__init__
method so you need to update the relevant code.The issue is related to a combination of three factors: forking + threading + logging. This combination can lead to a deadlock when logs are being flushed after a task finishes execution. This means that the StandardTaskRunner will be stuck at this line. Now, since the task has actually finished (thus its state is success), but the process didn’t yet exit (and will never since it is in a deadlock state), the
heartbeat_callback
will end up thinking that the task state was set externally and issue the following warning and then SIGTERM the task:Notice that this could cause further issues:
ack_late
feature (see here), a SIGTERM signal will result in the task message not to be deleted from the SQS queue, and thus after its timeout, it will go back to the queue and will be picked again by another worker.References