airflow: Tasks intermittently get terminated with SIGTERM on K8s executor

Apache Airflow version: 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.18.10

Environment:

  • Cloud provider or hardware configuration: Azure Kubernetes Service
  • OS (e.g. from /etc/os-release): Debian GNU/Linux 10
  • Kernel (e.g. uname -a): Linux airflow-k8s-scheduler-blabla-xvv5f 5.4.0-1039-azure #41~18.04.1-Ubuntu SMP Mon Jan 18 14:00:01 UTC 2021 x86_64 GNU/Linux
  • Install tools:
  • Others:

What happened: Tasks keep failing with SIGTERM.

A task fails due to SIGTERM before it can finish:

    [2021-03-08 20:39:14,503] {taskinstance.py:570} DEBUG - Refreshing TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [running]> from DB
    [2021-03-08 20:39:14,878] {taskinstance.py:605} DEBUG - Refreshed TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [running]>
    [2021-03-08 20:39:14,880] {base_job.py:219} DEBUG - [heartbeat]
    [2021-03-08 20:39:21,241] {taskinstance.py:570} DEBUG - Refreshing TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [running]> from DB
    [2021-03-08 20:39:21,670] {taskinstance.py:605} DEBUG - Refreshed TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [running]>
    [2021-03-08 20:39:21,672] {base_job.py:219} DEBUG - [heartbeat]
    [2021-03-08 20:39:27,736] {taskinstance.py:570} DEBUG - Refreshing TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [running]> from DB
    [2021-03-08 20:39:28,360] {taskinstance.py:605} DEBUG - Refreshed TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [None]>
    [2021-03-08 20:39:28,362] {local_task_job.py:188} WARNING - State of this instance has been externally set to None. Terminating instance.
    [2021-03-08 20:39:28,363] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 21
    [2021-03-08 20:39:50,119] {taskinstance.py:1239} ERROR - Received SIGTERM. Terminating subprocesses.
    [2021-03-08 20:39:50,119] {taskinstance.py:570} DEBUG - Refreshing TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [running]> from DB
    [2021-03-08 20:39:51,328] {taskinstance.py:605} DEBUG - Refreshed TaskInstance <TaskInstance: xxxxx_xxxxxx_xxx 2021-03-08T19:20:00+00:00 [queued]>
    [2021-03-08 20:39:51,329] {taskinstance.py:1455} ERROR - Task received SIGTERM signal
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
        result = task_copy.execute(context=context)
      File "/opt/airflow/mnt/dags/xyz/operators/xyz_data_sync_operator.py", line 73, in execute
        XyzUpsertClass(self.dest_conn_id, self.table, self.schema, src_res, drop_missing_columns=self.drop_missing_columns).execute()
      File "/opt/airflow/mnt/dags/xyz/operators/xyz_data_sync_operator.py", line 271, in execute
        cursor.execute(f"UPDATE {self.schema}.{self.table} SET {up_template} WHERE {pk_template}", row)
      File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1241, in signal_handler
        raise AirflowException("Task received SIGTERM signal")
    airflow.exceptions.AirflowException: Task received SIGTERM signal
    [2021-03-08 20:39:51,409] {taskinstance.py:1862} DEBUG - Task Duration set to 1055.564798
    [2021-03-08 20:39:51,410] {taskinstance.py:1503} INFO - Marking task as FAILED. dag_id=xyz_sync task_id=xxxxx, execution_date=20210308T192000, start_date=20210308T202215, end_date=20210308T203951
This also happens with a successful task:

    [2021-03-08 21:46:01,466] {__init__.py:124} DEBUG - Preparing lineage inlets and outlets
    [2021-03-08 21:46:01,466] {__init__.py:168} DEBUG - inlets: [], outlets: []
    [2021-03-08 21:46:01,467] {logging_mixin.py:104} INFO - {'conf': <airflow.configuration.AirflowConfigParser object at 0x7fc0f7f2a910>, 'dag': <DAG: canary_dag>, 'dag_run': <DagRun canary_dag @ 2021-03-08 20:10:00+00:00: scheduled__2021-03-08T20:10:00+00:00, externally triggered: False>, 'ds_nodash': '20210308', 'execution_date': DateTime(2021, 3, 8, 20, 10, 0, tzinfo=Timezone('+00:00')), 'inlets': [], 'macros': <module 'airflow.macros' from '/home/airflow/.local/lib/python3.7/site-packages/airflow/macros/__init__.py'>, 'next_ds': '2021-03-08', 'next_ds_nodash': '20210308', 'next_execution_date': DateTime(2021, 3, 8, 21, 10, 0, tzinfo=Timezone('UTC')), 'outlets': [], 'params': {}, 'prev_ds': '2021-03-08', 'prev_ds_nodash': '20210308', 'prev_execution_date': DateTime(2021, 3, 8, 19, 10, 0, tzinfo=Timezone('UTC')), 'prev_execution_date_success': <Proxy at 0x7fc0e3d0d780 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fc0e3d2ad40>>, 'prev_start_date_success': <Proxy at 0x7fc0e3d0d7d0 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7fc0e3ff8830>>, 'run_id': 'scheduled__2021-03-08T20:10:00+00:00', 'task': <Task(PythonOperator): print_the_context>, 'task_instance': <TaskInstance: canary_dag.print_the_context 2021-03-08T20:10:00+00:00 [running]>, 'task_instance_key_str': 'canary_dag__print_the_context__20210308', 'test_mode': False, 'ti': <TaskInstance: canary_dag.print_the_context 2021-03-08T20:10:00+00:00 [running]>, 'tomorrow_ds': '2021-03-09', 'tomorrow_ds_nodash': '20210309', 'ts': '2021-03-08T20:10:00+00:00', 'ts_nodash': '20210308T201000', 'ts_nodash_with_tz': '20210308T201000+0000', 'var': {'json': None, 'value': None}, 'yesterday_ds': '2021-03-07', 'yesterday_ds_nodash': '20210307', 'templates_dict': None}
    [2021-03-08 21:46:01,467] {logging_mixin.py:104} INFO - 2021-03-08
    [2021-03-08 21:46:01,467] {python.py:118} INFO - Done. Returned value was: Whatever you return gets printed in the logs
    [2021-03-08 21:46:21,690] {__init__.py:88} DEBUG - Lineage called with inlets: [], outlets: []
    [2021-03-08 21:46:21,691] {taskinstance.py:570} DEBUG - Refreshing TaskInstance <TaskInstance: canary_dag.print_the_context 2021-03-08T20:10:00+00:00 [running]> from DB
    [2021-03-08 21:46:32,042] {taskinstance.py:605} DEBUG - Refreshed TaskInstance <TaskInstance: canary_dag.print_the_context 2021-03-08T20:10:00+00:00 [running]>
    [2021-03-08 21:46:32,051] {taskinstance.py:1166} INFO - Marking task as SUCCESS. dag_id=canary_dag, task_id=print_the_context, execution_date=20210308T201000, start_date=20210308T214431, end_date=20210308T214632
    [2021-03-08 21:46:32,051] {taskinstance.py:1862} DEBUG - Task Duration set to 120.361368
    [2021-03-08 21:46:38,577] {taskinstance.py:570} DEBUG - Refreshing TaskInstance <TaskInstance: canary_dag.print_the_context 2021-03-08T20:10:00+00:00 [running]> from DB
    [2021-03-08 21:46:51,150] {taskinstance.py:605} DEBUG - Refreshed TaskInstance <TaskInstance: canary_dag.print_the_context 2021-03-08T20:10:00+00:00 [success]>
    [2021-03-08 21:46:51,152] {local_task_job.py:188} WARNING - State of this instance has been externally set to success. Terminating instance.
    [2021-03-08 21:46:51,153] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 22
    [2021-03-08 21:46:59,668] {taskinstance.py:1239} ERROR - Received SIGTERM. Terminating subprocesses.
    [2021-03-08 21:46:59,669] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
    [2021-03-08 21:46:59,757] {process_utils.py:66} INFO - Process psutil.Process(pid=22, status='terminated', exitcode=1, started='21:44:41') (22) terminated with exit code 1
    [2021-03-08 21:46:59,758] {base_job.py:219} DEBUG - [heartbeat]

What you expected to happen: Tasks are marked with their correct state instead of receiving SIGTERM.

How to reproduce it: It mostly occurs when a large number of DAGs (100+) are running on AKS.

Anything else we need to know: Long-running DAGs, or DAGs with more than 10-12 (potentially long-running) tasks, seem to have a higher probability of hitting this.

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 16
  • Comments: 24 (8 by maintainers)

Most upvoted comments

tl;dr: set schedule_after_task_execution to false by either updating your airflow.cfg (recommended) or setting AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION to False.
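
For reference, applying this looks like the following; the section and key are the ones quoted in the code below, and the environment variable follows Airflow's AIRFLOW__{SECTION}__{KEY} naming convention:

    # in airflow.cfg
    [scheduler]
    schedule_after_task_execution = False

    # or as an environment variable on the scheduler/worker containers
    AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False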

We had the same issue, and with the help of Sentry to look through the whole stack trace, I found out why.

The buggy code block is taskinstance.py#L1182-L1201:

        # Recording SUCCESS
        self.end_date = timezone.utcnow()
        self.log.info(
            'Marking task as SUCCESS. '
            'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
            self.dag_id,
            self.task_id,
            self._date_or_empty('execution_date'),
            self._date_or_empty('start_date'),
            self._date_or_empty('end_date'),
        )
        self.set_duration()
        if not test_mode:
            session.add(Log(self.state, self))
            session.merge(self)

        # The SUCCESS state becomes visible to other processes here...
        session.commit()

        # ...but the task process keeps running this potentially expensive call.
        if not test_mode:
            self._run_mini_scheduler_on_child_tasks(session)

Looking at the code: after marking the task SUCCESS and committing, if it is not test mode, it calls the potentially expensive function _run_mini_scheduler_on_child_tasks. Meanwhile, local_task_job.py#L179-L199 will detect the task's SUCCESS very soon, and since the task instance is no longer in the running state, it will terminate the process, which might still be executing _run_mini_scheduler_on_child_tasks:

        if ti.state == State.RUNNING:
            ...
        elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
            # The committed SUCCESS is already visible here while the task
            # process is still alive (running the mini scheduler), so it is killed.
            self.log.warning(
                "State of this instance has been externally set to %s. " "Terminating instance.", ti.state
            )
            self.task_runner.terminate()

This is proven by the log @saurabhladhe shared (the line numbers diverge a bit because the log was produced by Airflow 2.0.1):

[2021-03-08 21:46:32,051] {taskinstance.py:1166} INFO - Marking task as SUCCESS. dag_id=canary_dag, task_id=print_the_context, execution_date=20210308T201000, start_date=20210308T214431, end_date=20210308T214632

...

[2021-03-08 21:46:51,152] {local_task_job.py:188} WARNING - State of this instance has been externally set to success. Terminating instance.

[2021-03-08 21:46:51,153] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 22

So the mitigation is to make _run_mini_scheduler_on_child_tasks cheap. It is an optimization controlled by schedule_after_task_execution and can be disabled:

    def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):

The proper fix would start by changing

        if not test_mode:
            session.add(Log(self.state, self))
            session.merge(self)

        session.commit()

        if not test_mode:
            self._run_mini_scheduler_on_child_tasks(session)

to

        if not test_mode:
            session.add(Log(self.state, self))
            session.merge(self)
            self._run_mini_scheduler_on_child_tasks(session)

        session.commit()

However, _run_mini_scheduler_on_child_tasks might get an OperationalError and roll back the session completely (taskinstance.py#L1248), and we would then falsely mark the task as failed even though only the optimization failed. So I'd leave it to someone who knows the code better to fix it properly. Personally, I'd suggest removing this optimization completely.
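
One possible direction, sketched below under the assumption that SQLAlchemy's SAVEPOINT support (session.begin_nested()) can be used here, would be to isolate the mini scheduler so its rollback cannot undo the recorded SUCCESS; this is only an illustration of the idea, not the fix that was eventually merged:

    from sqlalchemy.exc import OperationalError

    if not test_mode:
        session.add(Log(self.state, self))
        session.merge(self)
        try:
            # begin_nested() opens a SAVEPOINT: if the mini scheduler hits an
            # OperationalError, only the nested work is rolled back, while the
            # SUCCESS record added above survives the outer commit.
            with session.begin_nested():
                self._run_mini_scheduler_on_child_tasks(session)
        except OperationalError:
            self.log.exception("Mini scheduler failed; task state is unaffected")

    session.commit()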

@lidalei : awesome, thanks for confirming, do let me know if it does show up again, we will probably try this setting on our next deploy as well

I can confirm it solved our issues.

I am getting this issue in Airflow 2.1.3 as well. All retries are getting killed. Error: airflow.exceptions.AirflowException: Task received SIGTERM signal

I believe this is fixed with #16289.

Hello,

I am facing the same issue:

  • Airflow 2.1.3 (also tested with 2.1.2)
  • backend: PostgreSQL
  • executor: LocalExecutor

I have set killed_task_cleanup_time to 100000 and schedule_after_task_execution to False. I have also installed Airflow as a non-root user and set the default run_as_user to airflow.

My tasks are constantly getting killed in backfill mode with:

[2021-09-07 10:21:20,185] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 73011

Honestly, I am a bit discouraged at this point. Could you help me, please? Thanks.