airflow: Workers silently crash after memory build up

Apache Airflow version: 2.0.2

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.18.15

Environment:

  • Cloud provider or hardware configuration: AWS, ec2 servers deployed by kops
  • OS (e.g. from /etc/os-release): Ubuntu 20.04
  • Kernel (e.g. uname -a): Linux 5.4.0-1024-aws # 24-Ubuntu
  • Install tools: Dockerfile
  • Others: Custom Dockerfile (not the official Airflow image from Docker Hub), Celery workers

What happened:

Memory usage builds up on our Celery worker pods until they silently crash. Resource usage flatlines and the worker stops producing logs. The process is still running, and Celery (verified via ping and Flower) thinks the workers are up. Airflow finishes no tasks; the schedulers run fine and still log appropriately, but the workers do nothing. They do not accept new tasks, and in-flight jobs hang. They do not log an error message, and the pod is not restarted because the process hasn’t actually exited. Our workers do not all crash at the same time: it happens over a couple of hours even if they were all restarted together, so it seems to be related to how many jobs a worker has run, how many logs it has written, or some other non-time-based event.

I believe this is related to the logs generated by the workers: Airflow appears to be reading the existing log files into memory. Memory usage drops massively when the log files are deleted and then starts to build up again.

There doesn’t appear to be a definite upper limit of memory that the pod hits when it crashes, but it’s around the 8 or 10 GB mark (there is 14 GB available to the pods, but they don’t hit that).

A worker pod with a larger log size on disk uses more memory than one with a smaller log size on disk.

What you expected to happen:

If the worker has crashed or ceased functioning, it should either log an appropriate message (if the process is still up) or crash cleanly so that it can be restarted. Existing log files should not contribute to the memory usage of the Airflow process either. Celery should also be able to detect that the worker is no longer functional.

How to reproduce it:

Run an Airflow cluster with 40+ DAGs and several hundred tasks in total, in an environment that has observable metrics; we use k8s with Prometheus. We have 5 worker pods. Monitor the memory usage of the worker containers/pods over time, as well as the size of the Airflow task logs. Both should trend steadily upward.
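As a minimal sketch of how the on-disk log size could be sampled alongside the pod memory metrics, the helper below walks a log directory and sums the file sizes. The function name `dir_size_bytes` is hypothetical and is not part of Airflow; the directory to pass in would be whatever `base_log_folder` is set to (`/usr/local/airflow/logs` in the config further down).

```python
import os


def dir_size_bytes(root):
    """Return the total size in bytes of the files under root.

    Symlinks are not followed; a symlink contributes only its own size.
    A missing root directory yields 0 because os.walk ignores the error.
    """
    total = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                total += os.lstat(path).st_size
            except OSError:
                # The file vanished between listing and stat; skip it.
                continue
    return total
```

Calling `dir_size_bytes("/usr/local/airflow/logs")` at a fixed interval inside each worker pod and plotting the result next to the container memory graph would show whether the two curves move together.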

Anything else we need to know:

This problem occurs constantly, after a clean deployment and in multiple environments. The official Airflow Docker image contains a log cleaner, so it’s possible this has been avoided there, but in general the 15-day default would be far too long: our workers crash after 2 to 3 days. Resorting to an aggressive log-cleaning script has mitigated the problem for us, but without proper error logs or a reason for the crash it’s hard to be sure that we are safe.
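For illustration only, an age-based cleanup of this kind could look like the sketch below. It is not the script we actually run and not the cleaner shipped with the official image; the function name `delete_old_logs` and its parameters are hypothetical. It assumes the logs are already copied to remote storage, so deleting the local files loses nothing.

```python
import os
import time


def delete_old_logs(root, max_age_seconds, now=None):
    """Delete files under root last modified more than max_age_seconds ago.

    Directories left empty are removed as well, except root itself.
    Returns the list of deleted file paths.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_seconds
    removed = []
    # Walk bottom-up so child directories are handled before their parents.
    for dirpath, _dirnames, filenames in os.walk(root, topdown=False):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                if os.lstat(path).st_mtime < cutoff:
                    os.remove(path)
                    removed.append(path)
            except OSError:
                # The file vanished or is not removable; leave it alone.
                continue
        if dirpath != root:
            try:
                os.rmdir(dirpath)  # succeeds only if the directory is empty
            except OSError:
                pass
    return removed
```

Running something like `delete_old_logs("/usr/local/airflow/logs", 24 * 3600)` from a cron job or sidecar would keep roughly one day of local logs; the right retention window depends on how quickly the logs grow.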

This is our airflow.cfg logging config; we aren’t doing anything radical, just storing logs in a bucket.

[logging]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.

# remote_logging = $ENABLE_REMOTE_LOGGING
# remote_log_conn_id = s3conn
# remote_base_log_folder = $LOGS_S3_BUCKET
# encrypt_s3_logs = False

remote_logging = True
remote_log_conn_id = s3conn
remote_base_log_folder = $AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER
encrypt_s3_logs = False

# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Logging level
logging_level = INFO

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
logging_config_class =

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs

# Name of handler to read task instance logs.
# Default to use file task handler.
# task_log_reader = file.task
task_log_reader = task

Here is a memory usage graph of a crashed worker pod; the flat line is the period when it was in a crashed state, after which it was restarted. There is also a big cliff on the right of the graph at about 09:00 on June 29th, where I manually cleaned the log files from the disk.

Crashed airflow worker

The last few log lines before it crashed:

Jun 25, 2021 @ 04:28:01.831 | [2021-06-25 03:28:01,830: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[5f802ffb-d5af-40ae-9e99-5e0501bf7d1c]

Jun 25, 2021 @ 04:27:36.769 | [2021-06-25 03:27:36,769: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[737d4310-c6ae-450f-889a-ffee53e94d33]

Jun 25, 2021 @ 04:27:25.565 | [2021-06-25 03:27:25,564: WARNING/ForkPoolWorker-13] Running <TaskInstance: a_task_name 2021-06-25T02:18:00+00:00 [queued]> on host airflow-worker-3.airflow-worker.airflow.svc.cluster.local

Jun 25, 2021 @ 04:27:25.403 | [2021-06-25 03:27:25,402: INFO/ForkPoolWorker-13] Filling up the DagBag from /usr/local/airflow/dags/a_dag.py

Jun 25, 2021 @ 04:27:25.337 | [2021-06-25 03:27:25,337: INFO/ForkPoolWorker-13] Executing command in Celery: ['airflow', 'tasks', 'run', 'task_name_redacted', 'task, '2021-06-25T02:18:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/a_dag.py']

Jun 25, 2021 @ 04:27:25.327 | [2021-06-25 03:27:25,326: INFO/ForkPoolWorker-13] Task airflow.executors.celery_executor.execute_command[4d9ee684-4ae3-41d2-8a00-e8071179a1b1] succeeded in 5.212706514168531s: None

Jun 25, 2021 @ 04:27:24.980 | [2021-06-25 03:27:24,979: INFO/ForkPoolWorker-13] role_arn is None

Jun 25, 2021 @ 04:27:24.968 | [2021-06-25 03:27:24,968: INFO/ForkPoolWorker-13] No credentials retrieved from Connection

Jun 25, 2021 @ 04:27:24.968 | [2021-06-25 03:27:24,968: INFO/ForkPoolWorker-13] Creating session with aws_access_key_id=None region_name=None

Jun 25, 2021 @ 04:27:24.954 | [2021-06-25 03:27:24,953: INFO/ForkPoolWorker-13] Airflow Connection: aws_conn_id=s3conn

Jun 25, 2021 @ 04:27:20.610 | [2021-06-25 03:27:20,610: WARNING/ForkPoolWorker-13] Running <TaskInstance: task_name_redacted 2021-06-25T03:10:00+00:00 [queued]> on host airflow-worker-3.airflow-worker.airflow.svc.cluster.local

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 21 (9 by maintainers)

Most upvoted comments

@ephraimbuddy I am trying it this week

If you want us to try 2.1.2 as well we can definitely do that.