airflow: Cloudwatch Integration: SIGTERM/SIGKILL Sent Following DAG Completion, Causing Errors in Worker Logs

Apache Airflow version: 2.0.0

Environment: Docker Stack Celery Executor w/ Redis 3 Workers, Scheduler + Webserver Cloudwatch remote config turned on

What happened: Following execution of a DAG when using cloudwatch integration, the state of the Task Instance is being externally set, causing SIGTERM/SIGKILL signals to be sent. This causes error logs in Workers, which is a nuisance for alert monitoring

*** Reading remote log from Cloudwatch log_group: dev1-airflow-task log_stream: xxxx/task/2021-01-21T17_59_19.643994+00_00/1.log.
Dependencies all met for <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [queued]>
Dependencies all met for <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [queued]>
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
Executing <Task(TaskVerificationOperator): task > on 2021-01-21T17:59:19.643994+00:00
Started process 654 to run task
Running <TaskInstance: xxxx.task  2021-01-21T17:59:19.643994+00:00 [running]> on host 88f99fbc97a8
Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=xxxxxxxx
AIRFLOW_CTX_DAG_OWNER=xxxxxxxx
AIRFLOW_CTX_DAG_ID=xxxxxxxx
AIRFLOW_CTX_TASK_ID=xxxxxx
AIRFLOW_CTX_EXECUTION_DATE=2021-01-21T17:59:19.643994+00:00
AIRFLOW_CTX_DAG_RUN_ID=85
Set new audit correlation_id xxxxxxxxxx-xxxxxx-xxxxxxxxx
Using connection to: id: xxxxx. Host: xxxxxxx, Port: 5432, Schema: xxxxxx, Login: xxxxxx, Password: XXXXXXXX, extra: None
Marking task as SUCCESS. dag_id=xxxxxx, task_id=xxxxxx, execution_date=20210121T175919, start_date=20210121T175936, end_date=20210121T175938
1 downstream tasks scheduled from follow-on schedule check

However following the completion of the DAG, the following is appended to the logs:

State of this instance has been externally set to success. Terminating instance.
Sending Signals.SIGTERM to GPID 654
process psutil.Process(pid=654, name='xxxxx', status='sleeping', started='17:59:36') did not respond to SIGTERM. Trying SIGKILL
Process psutil.Process(pid=654, name='xxxxx', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='17:59:36') (654) terminated with exit code Negsignal.SIGKILL
Task exited with return code Negsignal.SIGKILL

This is a problem, because it causes the following to appear in Worker logs:

[2021-01-21 15:00:01,102: WARNING/ForkPoolWorker-8] Running <TaskInstance: xxxx.task 2021-01-21T14:00:00+00:00 [queued]> on host ip-172-31-3-210.ec2.internal
...
[2021-01-21 15:00:06,599: ERROR/ForkPoolWorker-8] Failed to execute task Task received SIGTERM signal.

What you expected to happen: No errors to appear in Worker logs, if this SIGTERM/SIGKILL is intended

How to reproduce it: Use Airflow w/ Celery Executor and Cloudwatch Remote Logging

Anything else we need to know: Occurs every time, every task in DAG

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 32 (23 by maintainers)

Most upvoted comments

It also seems that this issues sets all the JobRuns to have failed state although the TaskInstance is correctly marked as success

I did some dive deep on this issue and root caused it.

TL;DR It is related to watchtower and has been fixed in version 2.0.0 and later versions. Airflow 2.2.4 now uses that version (see #19907). So, if you are using it, you should be good. If not, then you need to force install watchtower version 2.0.0 or later. Notice, however, that watchtower made some changes to the CloudWatchLogHandler __init__ method so you need to update the relevant code.


The issue is related to a combination of three factors: forking + threading + logging. This combination can lead to a deadlock when logs are being flushed after a task finishes execution. This means that the StandardTaskRunner will be stuck at this line. Now, since the task has actually finished (thus its state is success), but the process didn’t yet exit (and will never since it is in a deadlock state), the heartbeat_callback will end up thinking that the task state was set externally and issue the following warning and then SIGTERM the task:

State of this instance has been externally set to success. Terminating instance.

Notice that this could cause further issues:

  • Could not read remote logs from log_group: This error happens when we don’t have the necessary log data in CloudWatch. This can easily happen with a task that writes logs at the end which gets interrupted by the SIGTERM and thus no log is published.
  • Celery command failed on host: Obviously, when a SIGTERM is sent, the process will exit with a non-zero code, and Airflow ends up generating this error here.
  • Tasks executing multiple times: In case a Celery Executor + SQS is used (as in Amazon MWAA for example), and since Airflow uses Celery’s ack_late feature (see here), a SIGTERM signal will result in the task message not to be deleted from the SQS queue, and thus after its timeout, it will go back to the queue and will be picked again by another worker.

References