airflow: Long running tasks being killed with CeleryKubernetesExecutor
Discussed in https://github.com/apache/airflow/discussions/24462
UPDATED with logs from @karakanb after 2.3.2 migration
<div type='discussions-op-text'>Originally posted by karakanb May 26, 2022
Apache Airflow version
2.3.2 (oiriginaly 2.2.5)
What happened
Hi there, this one is a bit of a weird one to reproduce, but I’ll try my best to giive as much information as possible.
General Information
First of all, here’s some list information:
- Airflow version: v2.2.5
- Deployed on k8s with the user-community helm chart:
- 2 scheduler pods
- 5 worker pods
- 1 flower pod
- 2 web pods
- Using managed Redis from DigitalOcean
- Executor: CeleryKubernetesExecutor
- Deployed on DigitalOcean Managed Kubernetes
- Uses DigitalOcean Managed Postgres
- I am using the official Airflow Docker images
- There are no spikes in the DB metrics, Kubernetes cluster, or anything else that I could find.
These are my relevant env variables:
AIRFLOW__CELERY__WORKER_AUTOSCALE=8,4
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT=64800
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags/repo/
AIRFLOW__CORE__EXECUTOR=CeleryKubernetesExecutor
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=15
AIRFLOW__CORE__PARALLELISM=30
AIRFLOW__CORE__SECURE_MODE=True
AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH=repo
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS=True
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM=airflow-v2-logs
AIRFLOW__KUBERNETES__NAMESPACE=airflow
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE=/opt/airflow/pod_templates/pod_template.yaml
AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE=20
AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs
AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=/opt/airflow/logs/scheduler
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=120
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=False
AIRFLOW__WEBSERVER__RBAC=True
AIRFLOW__WEBSERVER__WORKER_CLASS=gevent
AIRFLOW_HOME=/opt/airflow
AIRFLOW_INSTALLATION_METHOD=apache-airflow
AIRFLOW_PIP_VERSION=21.3.1
AIRFLOW_USER_HOME_DIR=/home/airflow
AIRFLOW_VERSION=2.2.5
The issue that I will be describing here started happening a week ago after I have moved from KubernetesExecutor to CeleryKubernetesExecutor, so it must have something to do with it.
Problem Statement
I have some DAGs that have some long-running tasks: be it sensors that take hours to complete, or large SQL queries that take a very long time. Given that the sensors are waiting hours in many cases, we use reschedule
for the sensors; however, the long running SQL queries cannot be executed that way unfortunately, therefore the tasks stay open.
Here’s a sample log to show how the logs look when a query is executed successfully:
[2022-05-26, 05:25:41 ] {cursor.py:705} INFO - query: [SELECT * FROM users WHERE...]
[2022-05-26, 05:57:22 ] {cursor.py:729} INFO - query execution done
Here’s a sample log for a task that started at 2022-05-26, 05:25:37
, that actually demonstrates the problem where the task runs for a longer time:
[2022-05-26, 05:57:22 ] {cursor.py:705} INFO - query: [----- CREATE OR REPLACE TABLE table1 AS WITH users AS ( ...]
[2022-05-26, 06:59:41 ] {taskinstance.py:1033} INFO - Dependencies not met for <TaskInstance: mycompany.task1 scheduled__2022-05-25T01:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2022-05-26, 06:59:41 ] {taskinstance.py:1033} INFO - Dependencies not met for <TaskInstance: mycompany.task1 scheduled__2022-05-25T01:00:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-05-26, 06:59:41 ] {local_task_job.py:99} INFO - Task is not able to be run
Apparently, when the task runs for a longer time, it is being killed. It is not just happening with a single instance time, but with many others, therefore it is not an operator-specific issue. There are no timeouts, and no additional configuration defined on the individual tasks.
Some additional interesting observations:
- For all those tasks that are killed, I am seeing the same log:
Task is not able to be run
- For these tasks, the retry counts are going above the
retries
being set for the DAG as well.- The DAG has 3 retries configured, and there’ll be usually 4 instances running.
- This smells like a race condition somewhere, but not sure.
Unfortunately, I don’t have the scheduler logs, but I am on the lookout for them.
As I have mentioned, this has only started happening after I switched to CeleryKubernetesExecutor
. I’d love to investigate this further, and it is causing a lot of pain now so I might need to get back to Kubernetes Executor, but I really don’t want to given that KubernetesExecutor is much slower than CeleryKubernetesExecutor
due to git clone
happening on every task.
Let me know if I can provide additional information, I am trying to find more patterns and details around this so that we can fix this issue, so any leads around what should be looked at is much appreciated.
More info from the discussion:
@pingzh I don’t have the zombies_killed metric in my /metrics endpoint, not sure.
@MattiaGallegati thanks a lot for the information. I haven’t observed the issue for the past 3 days after the upgrade, I’ll keep observing and report here.
I am seeing the issue much rarer than before, but it still happens after the upgrade. Here’s one that has failed:
*** Reading local file: /opt/airflow/logs/dag_id=company/run_id=scheduled__2022-06-13T01:00:00+00:00/task_id=my_task_id/attempt=1.log
[2022-06-14, 04:10:00 ] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [queued]>
[2022-06-14, 04:10:00 ] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [queued]>
[2022-06-14, 04:10:00 ] {taskinstance.py:1356} INFO -
--------------------------------------------------------------------------------
[2022-06-14, 04:10:00 ] {taskinstance.py:1357} INFO - Starting attempt 1 of 4
[2022-06-14, 04:10:00 ] {taskinstance.py:1358} INFO -
--------------------------------------------------------------------------------
[2022-06-14, 04:10:00 ] {taskinstance.py:1377} INFO - Executing <Task(SnowflakeOperator): my_task_id> on 2022-06-13 01:00:00+00:00
[2022-06-14, 04:10:00 ] {standard_task_runner.py:52} INFO - Started process 982 to run task
[2022-06-14, 04:10:00 ] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'company', 'my_task_id', 'scheduled__2022-06-13T01:00:00+00:00', '--job-id', '182516', '--raw', '--subdir', 'DAGS_FOLDER/dag_v3.py', '--cfg-path', '/tmp/tmpckf3rysy', '--error-file', '/tmp/tmpzqc4fc0m']
[2022-06-14, 04:10:00 ] {standard_task_runner.py:80} INFO - Job 182516: Subtask my_task_id
[2022-06-14, 04:10:00 ] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:525: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-06-14, 04:10:01 ] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:525: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-06-14, 04:10:01 ] {task_command.py:370} INFO - Running <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [running]> on host airflow-v2-worker-5.airflow-v2-worker.airflow.svc.cluster.local
[2022-06-14, 04:10:01 ] {taskinstance.py:1569} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=company
AIRFLOW_CTX_TASK_ID=my_task_id
AIRFLOW_CTX_EXECUTION_DATE=2022-06-13T01:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-06-13T01:00:00+00:00
[2022-06-14, 04:10:01 ] {snowflake.py:118} INFO - Executing: <some sql statement here>
[2022-06-14, 04:10:01 ] {base.py:68} INFO - Using connection ID 'my-connection-id' for task execution.
[2022-06-14, 04:10:01 ] {connection.py:257} INFO - Snowflake Connector for Python Version: 2.7.8, Python Version: 3.8.13, Platform: Linux-5.10.0-0.bpo.9-amd64-x86_64-with-glibc2.2.5
[2022-06-14, 04:10:01 ] {connection.py:876} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-06-14, 04:10:01 ] {connection.py:894} INFO - Setting use_openssl_only mode to False
[2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:02 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:02 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:02 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:02 ] {snowflake.py:334} INFO - Statement execution info - {'status': 'Statement executed successfully.'}
[2022-06-14, 04:10:02 ] {snowflake.py:338} INFO - Rows affected: 1
[2022-06-14, 04:10:02 ] {snowflake.py:339} INFO - Snowflake query id: <some-uuid-here>
[2022-06-14, 04:10:02 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:02 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:03 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:03 ] {snowflake.py:334} INFO - Statement execution info - {'status': 'some_table already exists, statement succeeded.'}
[2022-06-14, 04:10:03 ] {snowflake.py:338} INFO - Rows affected: 1
[2022-06-14, 04:10:03 ] {snowflake.py:339} INFO - Snowflake query id: <some-uuid-here>
[2022-06-14, 04:10:03 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:03 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:10:08 ] {cursor.py:734} INFO - query execution done
[2022-06-14, 04:10:08 ] {snowflake.py:334} INFO - Statement execution info - {'number of rows deleted': 562}
[2022-06-14, 04:10:08 ] {snowflake.py:338} INFO - Rows affected: 562
[2022-06-14, 04:10:08 ] {snowflake.py:339} INFO - Snowflake query id: <some-uuid-here>
[2022-06-14, 04:10:08 ] {snowflake.py:324} INFO - Running statement: <some sql statement here>
[2022-06-14, 04:10:08 ] {cursor.py:710} INFO - query: [<some sql statement here>]
[2022-06-14, 04:16:29 ] {taskinstance.py:1149} INFO - Dependencies not met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-06-14, 04:16:29 ] {taskinstance.py:1149} INFO - Dependencies not met for <TaskInstance: company.my_task_id scheduled__2022-06-13T01:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2022-06-14, 04:16:29 ] {local_task_job.py:101} INFO - Task is not able to be **run**
What you think should happen instead
The tasks should keep running until they are finished.
How to reproduce
I really don’t know, sorry. I have tried my best to explain the situation above.
Operating System
Debian GNU/Linux 10 (buster)
Versions of Apache Airflow Providers
(Updated from the original post)
apache-airflow==2.3.2
apache-airflow-providers-amazon==3.4.0
apache-airflow-providers-celery==2.1.4
apache-airflow-providers-cncf-kubernetes==4.0.2
apache-airflow-providers-docker==2.7.0
apache-airflow-providers-elasticsearch==3.0.3
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-google==7.0.0
apache-airflow-providers-grpc==2.0.4
apache-airflow-providers-hashicorp==2.2.0
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-microsoft-azure==3.9.0
apache-airflow-providers-microsoft-mssql==2.0.1
apache-airflow-providers-mysql==2.2.3
apache-airflow-providers-odbc==2.0.4
apache-airflow-providers-postgres==4.1.0
apache-airflow-providers-redis==2.0.4
apache-airflow-providers-sendgrid==2.0.4
apache-airflow-providers-sftp==2.6.0
apache-airflow-providers-slack==4.2.3
apache-airflow-providers-snowflake==2.7.0
apache-airflow-providers-sqlite==2.1.3
apache-airflow-providers-ssh==2.3.0
google-cloud-orchestration-airflow==1.3.1
Deployment
Other 3rd-party Helm chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 15 (12 by maintainers)
Seen this issue with CeleryKubernetesExecutor when restarting the Celery worker pod. After restart, some long running task instances can be set as
up_for_retry
and rerun successfully while other task instances failed and its dagrun failed too. Due to this i cannot restart worker pods without draining the worker pods first.This should be reproduceable and i’m investigating on it.
cc: @karakanb