airflow: on_failure_callback does not seem to fire on pod deletion/eviction

Apache Airflow version: 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.16.x

Environment: KubernetesExecutor with single scheduler pod

What happened: On all previous versions we used (from 1.10.x to 2.0.0), evicting or deleting a running task pod triggered the on_failure_callback from BaseOperator. We use this functionality quite a lot to detect eviction and provide work carry-over and automatic task clear.

We recently updated our dev environment to 2.0.1, and it seems that on_failure_callback now fires only when the pod completes on its own, i.e. not when it is evicted or deleted with kubectl.

At the task-log level everything looks the same as before when the pod is removed with kubectl delete pod...:

Received SIGTERM. Terminating subprocesses
Sending Signals.SIGTERM to GPID 16
Received SIGTERM. Terminating subprocesses.
Task received SIGTERM signal
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/li_operator.py", line 357, in execute
    self.operator_task_code(context)
  File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/yarn_jar_operator.py", line 62, in operator_task_code
    ssh_connection=_ssh_con
  File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/li_mapreduce_cluster_operator.py", line 469, in watch_application
    existing_apps=_associated_applications.keys()
  File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/li_mapreduce_cluster_operator.py", line 376, in get_associated_application_info
    logger=self.log
  File "/usr/local/lib/python3.7/site-packages/li_airflow_common/custom_operators/mapreduce/yarn_api/yarn_api_ssh_client.py", line 26, in send_request
    _response = requests.get(request)
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 76, in get
    return request('get', url, params=params, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1277, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1323, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1272, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1032, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 972, in send
    self.connect()
  File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1241, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
Marking task as FAILED. dag_id=mock_dag_limr, task_id=SetupMockScaldingDWHJob, execution_date=20190910T000000, start_date=20210224T162811, end_date=20210224T163044
Process psutil.Process(pid=16, status='terminated', exitcode=1, started='16:28:10') (16) terminated with exit code 1

But on_failure_callback is not triggered. For simplicity, let’s assume the callback does this:

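import json
import os
from pathlib import Path
from urllib import request

from airflow.models import Variable


def act_on_failure(context):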
    send_slack_message(
        message=f"{context['task_instance_key_str']} fired failure callback",
        channel=get_stored_variable('slack_log_channel')
    )

def get_stored_variable(variable_name, deserialize=False):
    try:
        return Variable.get(variable_name, deserialize_json=deserialize)
    except KeyError:
        if os.getenv('PYTEST_CURRENT_TEST'):
            _root_dir = str(Path(__file__).parent)
            _vars_path = os.path.join(_root_dir, "vars.json")
            # Use a context manager so the file handle is always closed
            with open(_vars_path, 'r') as _vars_file:
                _vars_json = json.load(_vars_file)
            if deserialize:
                return _vars_json.get(variable_name, {})
            else:
                return _vars_json.get(variable_name, "")
        else:
            raise

def send_slack_message(message, channel):
    _web_hook_url = get_stored_variable('slack_web_hook')
    post = {
        "text": message,
        "channel": channel
    }

    try:
        json_data = json.dumps(post)
        req = request.Request(
            _web_hook_url,
            data=json_data.encode('ascii'),
            headers={'Content-Type': 'application/json'}
        )

        request.urlopen(req)
    except request.HTTPError as em:
        print('Failed to send slack message to the hook {hook}: {msg}, request: {req}'.format(
            hook=_web_hook_url,
            msg=str(em),
            req=str(post)
        ))

Scheduler logs related to this event:

[2021-02-24 16:33:04,968] {kubernetes_executor.py:147} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 had an event of type MODIFIED
[2021-02-24 16:33:04,968] {kubernetes_executor.py:202} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 Pending
[2021-02-24 16:33:04,979] {kubernetes_executor.py:147} INFO - Event: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3 had an event of type DELETED
[2021-02-24 16:33:04,979] {kubernetes_executor.py:197} INFO - Event: Failed to start pod mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3, will reschedule
[2021-02-24 16:33:05,406] {kubernetes_executor.py:354} INFO - Attempting to finish pod; pod_id: mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3; state: up_for_reschedule; annotations: {'dag_id': 'mock_dag_limr', 'task_id': 'SetupMockSparkDwhJob', 'execution_date': '2019-09-10T00:00:00+00:00', 'try_number': '9'}
[2021-02-24 16:33:05,419] {kubernetes_executor.py:528} INFO - Changing state of (TaskInstanceKey(dag_id='mock_dag_limr', task_id='SetupMockSparkDwhJob', execution_date=datetime.datetime(2019, 9, 10, 0, 0, tzinfo=tzlocal()), try_number=9), 'up_for_reschedule', 'mockdaglimrsetupmocksparkdwhjob.791032759a764d8bae66fc7bd7ab2db3', 'airflow', '173647183') to up_for_reschedule
[2021-02-24 16:33:05,422] {scheduler_job.py:1206} INFO - Executor reports execution of mock_dag_limr.SetupMockSparkDwhJob execution_date=2019-09-10 00:00:00+00:00 exited with status up_for_reschedule for try_number 9

However, the task stays in the failed state, which is not what the scheduler reports (up_for_reschedule).

When the pod completes on its own (whether the task fails or exits with 0), callbacks are triggered correctly.

What you expected to happen: on_failure_callback is called regardless of how the pod exits, including SIGTERM-based interruptions such as pod eviction and pod deletion.

We are not really sure what went wrong. We believe this code is executed, since we get the full stack trace:

https://github.com/apache/airflow/blob/2.0.1/airflow/models/taskinstance.py#L1149

But then it is unclear why the finally clause here does not run: https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1422

How to reproduce it:

With Airflow 2.0.1 running KubernetesExecutor, execute kubectl delete ... on any running task pod. The task's operator should define on_failure_callback. To check whether the callback is called, have it send data to any external logging system.

Anything else we need to know: The problem is persistent and only occurs in version 2.0.1.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 18 (17 by maintainers)

Most upvoted comments

Thanks @houqp for the insights! Really appreciate it. I'm now working on the SIGKILL case and will keep you posted.

Interesting, I was expecting that the second SIGTERM would have caused the task subprocess to set its own state through handle_failure, because self.on_kill calls self.task_runner.terminate(), which is supposed to wait for the subprocess to exit:

https://github.com/apache/airflow/blob/e7c642ba2a79ea13d6ef84b78242f6c313cd3457/airflow/task/task_runner/standard_task_runner.py#L108-L117

We should probably add self.task_instance.state=State.FAILED in handle_task_exit if exit_code != 1. WDYT @houqp @ephraimbuddy?

I think we should do this as an extra safeguard because, in rare cases, the task subprocess could crash at any time after it receives SIGTERM and before it updates its own task state. However, I think the state update should be guarded by an extra condition:

if self.task_instance.state not in State.finished():
    self.task_instance.state = State.FAILED

This handles the case where the task exits successfully right after the pod starts getting killed but before the local task job executes handle_task_exit.
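The guard discussed above can be sketched without Airflow installed. This is a minimal illustration, not the actual LocalTaskJob code: FakeTaskInstance, the string state names, and the contents of FINISHED_STATES are stand-ins chosen for the example, and treating any nonzero return code as a failure is an assumption, since the exact exit-code condition is still under discussion in this thread.

```python
from dataclasses import dataclass

# Stand-in for Airflow's State.finished(); the exact membership is an assumption.
FINISHED_STATES = frozenset({"success", "failed", "skipped"})


@dataclass
class FakeTaskInstance:
    """Hypothetical minimal task instance that only carries a state."""
    state: str


def handle_task_exit(task_instance, return_code):
    """Mark the task failed after an abnormal exit, unless it already finished.

    Assumption: any nonzero return code counts as an abnormal exit.
    """
    if return_code != 0 and task_instance.state not in FINISHED_STATES:
        task_instance.state = "failed"
    return task_instance.state


# The subprocess died while the task was still marked as running.
still_running = FakeTaskInstance(state="running")
handle_task_exit(still_running, return_code=-15)

# The task had already succeeded just before the pod was killed.
already_done = FakeTaskInstance(state="success")
handle_task_exit(already_done, return_code=-15)
```

With the guard in place, the first task instance ends up failed while the second one keeps its success state, which is the race the extra condition is meant to cover.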

@kaxil, yes. We both tried it and it didn't work because the task was still in the RUNNING state. I have tried marking the task as failed when AirflowException is raised and also tried a bunch of other things. I'm still on it, but I think handling this the way we handle email_on_failure should fix it, which is what my PR did.

I have yet to test sending SIGKILL, but I will test it soon on my PR, and I will also try your suggestion, @kaxil.

I think this happened in 2.0.1 mainly because of the following sequence of events:

(1) When you delete the pod, the KubernetesExecutor executes the following:

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/executors/kubernetes_executor.py#L195-L200

i.e. it tries to reschedule your pod, as is evident from the logs in the issue description.

(2) It then executes the following and puts the TaskInstance key on result_queue:

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/executors/kubernetes_executor.py#L350-L359

(3) The TI is then marked with state up_for_reschedule (at least according to the executor events) in:

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/executors/kubernetes_executor.py#L522-L542

(4) All three of the events above happened from the KubernetesExecutor's point of view.

At the same time, when the pod was killed (sent SIGTERM), the task process in the pod received the SIGTERM and ran the following handler, since we override the SIGTERM handler to raise an AirflowException (which matches your logs and stack trace):

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/models/taskinstance.py#L1238-L1241
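The mechanism described above, a SIGTERM handler that converts the signal into an ordinary Python exception, can be reproduced with the standard library alone. The sketch below is not Airflow code: TaskTerminated stands in for AirflowException, run_task and long_running_work are hypothetical names, and the process sends SIGTERM to itself to simulate the pod being deleted (POSIX only).

```python
import os
import signal
import time


class TaskTerminated(Exception):
    """Stand-in for AirflowException in this sketch."""


def signal_handler(signum, frame):
    # Turn the signal into an exception raised inside the running task code.
    raise TaskTerminated("Task received SIGTERM signal")


def run_task(execute, on_failure):
    """Run execute(); if it is interrupted, mark it failed and call on_failure."""
    previous = signal.signal(signal.SIGTERM, signal_handler)
    state = "running"
    try:
        execute()
        state = "success"
    except TaskTerminated as exc:
        state = "failed"
        on_failure({"exception": exc})
    finally:
        # Always restore whatever handler was installed before.
        signal.signal(signal.SIGTERM, previous)
    return state


def long_running_work():
    # Simulate the pod deletion by sending SIGTERM to this very process.
    os.kill(os.getpid(), signal.SIGTERM)
    time.sleep(5)  # never completes: the handler raises first


events = []
final_state = run_task(long_running_work, events.append)
```

In plain Python the except and finally blocks both run when the handler raises, so if a callback tied to this path does not fire, the cause is more likely in who is responsible for invoking it than in the signal handling itself.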

This AirflowException is handled here:

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/models/taskinstance.py#L1142-L1150

which then calls handle_failure:

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/models/taskinstance.py#L1484-L1490

which does not run the failure callback. This bug might have been introduced in https://github.com/apache/airflow/commit/efe163a1fddfd66fa402231906e96733efddf8af, where we moved running the callbacks into LocalTaskJob:

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/jobs/local_task_job.py#L123-L126

https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/jobs/local_task_job.py#L144-L153
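To illustrate why moving callbacks into a supervising job matters here, the following is a rough standard-library sketch of that pattern, not the LocalTaskJob implementation. The supervise function and both callbacks are hypothetical names: a parent process waits for the child to exit and only then decides which callback to run.

```python
import subprocess
import sys


def supervise(child_argv, on_success, on_failure):
    """Run a child process and invoke a callback based on its exit code."""
    completed = subprocess.run(child_argv)
    # Callbacks run in the supervisor, after the child has exited.
    if completed.returncode == 0:
        on_success(completed.returncode)
    else:
        on_failure(completed.returncode)
    return completed.returncode


calls = []
failing_child = [sys.executable, "-c", "import sys; sys.exit(1)"]
passing_child = [sys.executable, "-c", "pass"]

supervise(failing_child, lambda rc: calls.append(("success", rc)),
          lambda rc: calls.append(("failure", rc)))
supervise(passing_child, lambda rc: calls.append(("success", rc)),
          lambda rc: calls.append(("failure", rc)))
```

In this design, if the supervisor itself is terminated before it gets past the wait, neither callback runs, even though the child may have logged its own failure.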

I don't see the LocalTaskJob exit logs in your trace, so I am not sure why that happened.

Secondly, @jedcunningham recently changed (3) so that the task is marked FAILED instead of up for reschedule in https://github.com/apache/airflow/pull/14810, which means at least the logging around that will be taken care of. We still need to investigate why the LocalTaskJob code was not executed.