airflow: on_failure_callback does not seem to fire on pod deletion/eviction
Apache Airflow version: 2.0.1
Kubernetes version (if you are using kubernetes) (use kubectl version): 1.16.x
Environment: KubernetesExecutor with single scheduler pod
What happened: On all previous versions we used (from 1.10.x to 2.0.0), evicting or deleting a running task pod triggered the on_failure_callback from BaseOperator. We use this functionality quite a lot to detect eviction, carry over work, and automatically clear the task.
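For context, a rough sketch of that recovery pattern (this is not our actual operator code; the helper name and the use of clear_task_instances are assumptions for illustration):

```python
# Hedged sketch of an on_failure_callback that re-queues the task after an
# external kill (eviction / kubectl delete) by clearing its task instance.
from airflow.models.taskinstance import clear_task_instances
from airflow.utils.session import create_session


def carry_over_and_clear(context):
    ti = context["task_instance"]
    # persist any carry-over state here, e.g. to XCom or an external store
    with create_session() as session:
        # clearing the TI lets the scheduler schedule a fresh try
        clear_task_instances([ti], session, dag=context["dag"])
```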
We’ve recently updated our dev environment to 2.0.1, and it seems that on_failure_callback is now only fired when the pod completes naturally, i.e. not when it is evicted or deleted with kubectl.
Everything looks the same at the task log level when the pod is removed with kubectl delete pod...:
Received SIGTERM. Terminating subprocesses
Sending Signals.SIGTERM to GPID 16
Received SIGTERM. Terminating subprocesses.
Task received SIGTERM signal
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/li_operator.py", line 357, in execute
self.operator_task_code(context)
File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/yarn_jar_operator.py", line 62, in operator_task_code
ssh_connection=_ssh_con
File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/li_mapreduce_cluster_operator.py", line 469, in watch_application
existing_apps=_associated_applications.keys()
File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/li_mapreduce_cluster_operator.py", line 376, in get_associated_application_info
logger=self.log
File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/yarn_api/yarn_api_ssh_client.py", line 26, in send_request
_response = requests.get(request)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 76, in get
return request('get', url, params=params, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
return session.request(method=method, url=url, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/local/lib/python3.7/http/client.py", line 1277, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/local/lib/python3.7/http/client.py", line 1323, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.7/http/client.py", line 1272, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.7/http/client.py", line 1032, in _send_output
self.send(msg)
File "/usr/local/lib/python3.7/http/client.py", line 972, in send
self.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
conn = self._new_conn()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
(self._dns_host, self.port), self.timeout, **extra_kw
File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
sock.connect(sa)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1241, in signal_handler
raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
Marking task as FAILED. dag_id=mock_dag_limr, task_id=SetupMockScaldingDWHJob, execution_date=20190910T000000, start_date=20210224T162811, end_date=20210224T163044
Process psutil.Process(pid=16, status='terminated', exitcode=1, started='16:28:10') (16) terminated with exit code 1
But on_failure_callback is not triggered. For simplicity, let’s assume the callback does this:
import json
import os
from pathlib import Path
from urllib import request

from airflow.models import Variable


def act_on_failure(context):
    send_slack_message(
        message=f"{context['task_instance_key_str']} fired failure callback",
        channel=get_stored_variable('slack_log_channel')
    )


def get_stored_variable(variable_name, deserialize=False):
    try:
        return Variable.get(variable_name, deserialize_json=deserialize)
    except KeyError:
        # when running under pytest, fall back to a local vars.json
        if os.getenv('PYTEST_CURRENT_TEST'):
            _root_dir = str(Path(__file__).parent)
            _vars_path = os.path.join(_root_dir, "vars.json")
            _vars_json = json.loads(open(_vars_path, 'r').read())
            if deserialize:
                return _vars_json.get(variable_name, {})
            else:
                return _vars_json.get(variable_name, "")
        else:
            raise


def send_slack_message(message, channel):
    _web_hook_url = get_stored_variable('slack_web_hook')
    post = {
        "text": message,
        "channel": channel
    }
    try:
        json_data = json.dumps(post)
        req = request.Request(
            _web_hook_url,
            data=json_data.encode('ascii'),
            headers={'Content-Type': 'application/json'}
        )
        request.urlopen(req)
    except request.HTTPError as em:
        print('Failed to send slack message to the hook {hook}: {msg}, request: {req}'.format(
            hook=_web_hook_url,
            msg=str(em),
            req=str(post)
        ))
Scheduler logs related to this event:
[2021-02-24 16:33:04,968] {kubernetes_executor.py:147} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 had an event of type MODIFIED
[2021-02-24 16:33:04,968] {kubernetes_executor.py:202} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 Pending
[2021-02-24 16:33:04,979] {kubernetes_executor.py:147} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 had an event of type DELETED
[2021-02-24 16:33:04,979] {kubernetes_executor.py:197} INFO - Event: Failed to start pod mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3, will reschedule
[2021-02-24 16:33:05,406] {kubernetes_executor.py:354} INFO - Attempting to finish pod; pod_id: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3; state: up_for_reschedule; annotations: {'dag_id': 'mock_dag_limr', 'task_id': 'SetupMockSparkDwhJob', 'execution_date': '2019-09-10T00:00:00+00:00', 'try_number': '9'}
[2021-02-24 16:33:05,419] {kubernetes_executor.py:528} INFO - Changing state of (TaskInstanceKey(dag_id='mock_dag_limr', task_id='SetupMockSparkDwhJob', execution_date=datetime.datetime(2019, 9, 10, 0, 0, tzinfo=tzlocal()), try_number=9), 'up_for_reschedule', 'mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3', 'airflow', '173647183') to up_for_reschedule
[2021-02-24 16:33:05,422] {scheduler_job.py:1206} INFO - Executor reports execution of mock_dag_limr.SetupMockSparkDwhJob execution_date=2019-09-10 00:00:00+00:00 exited with status up_for_reschedule for try_number 9
However, the task stays in the failed state (not what the scheduler reports).
When the pod completes on its own (fails or exits with 0), callbacks are triggered correctly.
What you expected to happen: on_failure_callback is called regardless of how the pod exits, including SIGTERM-based interruptions such as pod eviction and pod deletion:
https://github.com/apache/airflow/blob/2.0.1/airflow/models/taskinstance.py#L1149
But then it is unclear why the finally clause here does not run:
https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1422
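For reference, a minimal, self-contained demonstration (plain Python, not Airflow source) of the mechanism in question: a SIGTERM handler that raises turns the signal into an ordinary exception, so the surrounding except/finally blocks around the task run are still expected to execute.

```python
import os
import signal


class TaskKilled(Exception):
    """Stand-in for AirflowException raised by the real signal handler."""


def handler(signum, frame):
    raise TaskKilled("Task received SIGTERM signal")


signal.signal(signal.SIGTERM, handler)

try:
    os.kill(os.getpid(), signal.SIGTERM)  # simulate `kubectl delete pod`
except TaskKilled as exc:
    print(f"failure path runs: {exc}")    # analogous to handle_failure()
finally:
    print("finally clause runs too")
```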
How to reproduce it:
With Airflow 2.0.1 running the KubernetesExecutor, execute kubectl delete ... on any running task pod. The task's operator should define an on_failure_callback. To check whether it is called, have the callback send data to an external logging system.
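A minimal reproduction sketch (the DAG id, schedule, and sleep command are assumptions, not taken from our environment):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator


def log_failure(context):
    # replace with a call to an external logging/alerting system
    print(f"{context['task_instance_key_str']} fired failure callback")


with DAG(
    dag_id="repro_failure_callback",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    BashOperator(
        task_id="sleep_forever",
        bash_command="sleep 3600",  # keep the pod running so it can be deleted mid-task
        on_failure_callback=log_failure,
    )
```

Trigger the DAG, then delete the worker pod with kubectl while sleep_forever is running.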
Anything else we need to know: The problem is persistent and only exists in version 2.0.1.
About this issue
- State: closed
- Created 3 years ago
- Comments: 18 (17 by maintainers)
Commits related to this issue
- Execute ``on_failure_callback`` when SIGTERM is received (#15172) Currently, on_failure_callback is only called when a task finishes executing not while executing. When a pod is deleted, a SIGTERM i... — committed to apache/airflow by ephraimbuddy 3 years ago
Thanks @houqp for the insights! Really appreciate it. Now I’m working on the SIGKILL case and will keep you posted.
Interesting, I was expecting the second SIGTERM to have resulted in the task subprocess setting its own state through handle_failure, because self.on_kill calls self.task_runner.terminate(), which is supposed to wait for the subprocess to exit:
https://github.com/apache/airflow/blob/e7c642ba2a79ea13d6ef84b78242f6c313cd3457/airflow/task/task_runner/standard_task_runner.py#L108-L117
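As a rough illustration of that expectation, a generic terminate-and-wait helper might look like the sketch below (this is not the StandardTaskRunner code; the function name and timeout are assumptions):

```python
import os
import signal

import psutil


def terminate_and_wait(pid: int, timeout: float = 60.0) -> None:
    """Send SIGTERM to the child's process group, then block until it exits,
    giving the subprocess a chance to record its own final task state."""
    try:
        os.killpg(os.getpgid(pid), signal.SIGTERM)
    except ProcessLookupError:
        return  # process already gone
    try:
        psutil.Process(pid).wait(timeout=timeout)
    except psutil.NoSuchProcess:
        return  # exited before we could attach
    except psutil.TimeoutExpired:
        pass  # caller may escalate to SIGKILL after the grace period
```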
I think we should do this as an extra safeguard because, in rare cases, the task subprocess could crash at any time after it receives SIGTERM and before it updates its own task state. However, I think the state update logic should be guarded by an extra condition: this is to handle the case where the task could have exited successfully right after the pod is killed but before the local task job executes handle_task_exit.
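A hedged sketch of that extra condition (names assumed, not the actual patch):

```python
from airflow.utils.state import State


def fail_unless_already_finished(task_instance, error="Task received SIGTERM signal"):
    # Only treat the SIGTERM as a failure if the subprocess has not already
    # recorded a terminal state (e.g. SUCCESS just before the pod was killed).
    if task_instance.state == State.RUNNING:
        task_instance.handle_failure(error)
```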
@kaxil, yes. We both tried it and it didn’t work because the task was still in the RUNNING state. I have tried marking the task as failed when AirflowException is raised and also tried a bunch of other things. I’m still on it, but I think handling this the way we handle email_on_failure should fix it, which is what my PR did.
I’m yet to test sending SIGKILL, but I will test it soon on my PR, and I will also try your suggestion @kaxil.
I think this happened in 2.0.1 mainly because of the following trace.
(1) When you delete the pod, the KubernetesExecutor executes the following:
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/executors/kubernetes_executor.py#L195-L200
i.e. it tries to reschedule your pod, as is evident from the logs in the issue description too.
(2) This then executes the following and puts the TaskInstance key on the result_queue:
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/executors/kubernetes_executor.py#L350-L359
(3) The TI is then marked with state RESCHEDULE (at least according to executor events) in:
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/executors/kubernetes_executor.py#L522-L542
(4) Now, all of the above three events happen from the KubernetesExecutor's point of view. At the same time, when the pod was killed (sent SIGTERM), the task pod receives the SIGTERM and executes the following call, since we override the SIGTERM handler and raise AirflowException (which matches your logs and stacktrace):
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/models/taskinstance.py#L1238-L1241
This AirflowException is handled here:
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/models/taskinstance.py#L1142-L1150
which then calls handle_failure. Inside handle_failure, the following runs:
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/models/taskinstance.py#L1484-L1490
which does not run a failure callback. This bug might have been introduced in https://github.com/apache/airflow/commit/efe163a1fddfd66fa402231906e96733efddf8af, where we moved running callbacks into LocalTaskJob:
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/jobs/local_task_job.py#L123-L126
https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/jobs/local_task_job.py#L144-L153
I don’t see LocalTaskJob exit logs in your trace, so I am not sure why that happened.
Secondly, @jedcunningham recently changed (3), where we mark the task as RESCHEDULED, to FAILED in https://github.com/apache/airflow/pull/14810 – which means at least the logging around that will be taken care of; we still need to investigate why LocalTaskJob was not executed.