airflow: AirflowException: Celery command failed on host: [...]

Apache Airflow version: 2.1.0

Kubernetes version (if you are using kubernetes) (use kubectl version): N/A (ECS)

Environment: Production

  • Cloud provider or hardware configuration: AWS ECS Fargate
  • OS (e.g. from /etc/os-release): Ubuntu
  • Kernel (e.g. uname -a):
  • Install tools: poetry
  • Others:

What happened:

Since upgrading to Airflow 2.x we’ve seen Celery-related failures, with exceptions like the following being the most prominent: AirflowException: Celery command failed on host: ip-10-0-10-110.us-west-2.compute.internal.

What you expected to happen:

No Celery command failures during normal operation.

How to reproduce it:

Deploy an Airflow cluster with a Celery + Redis executor to an ECS Fargate cluster.
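For context, here is a minimal sketch of the executor settings assumed for such a deployment (the broker and result-backend URLs below are placeholders, not our actual endpoints):

# Sketch of the Celery + Redis executor settings assumed for this setup;
# URLs and hostnames are placeholders, not the actual deployment values.
import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "CeleryExecutor"
os.environ["AIRFLOW__CELERY__BROKER_URL"] = "redis://redis:6379/0"
os.environ["AIRFLOW__CELERY__RESULT_BACKEND"] = "db+postgresql://airflow:airflow@postgres/airflow"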

Anything else we need to know:

How often does this problem occur? Once? Every time, etc.?

~30 times per hour.

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 5
  • Comments: 16 (9 by maintainers)

Most upvoted comments

Update: My particular issue is fixed in https://github.com/apache/airflow/pull/16860

How can I fix this? I’m using Airflow 2.1.2 on CentOS7 in a federated topology and all jobs are failing.

The root cause is that the task args include a wrong --subdir value, passing the Scheduler’s ${DAGS_HOME} instead of the Worker’s ${DAGS_HOME}. The Scheduler and Worker are on different hosts/domains, so ${DAGS_HOME} cannot be identical between them. In dagbag.py, using settings.DAGS_FOLDER would fix the problem, I think:

airflow/models/dagbag.py:122 dag_folder = dag_folder or settings.DAGS_FOLDER
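As an illustration only (this is a sketch, not the actual dagbag.py patch, and resolve_dag_file is a made-up helper name), the worker-side fallback could look roughly like this:

# Sketch only: fall back to the worker-local settings.DAGS_FOLDER when the
# scheduler-supplied --subdir path does not exist on this host.
import os
from typing import Optional

from airflow import settings


def resolve_dag_file(subdir_from_scheduler: Optional[str]) -> str:
    if subdir_from_scheduler and os.path.exists(subdir_from_scheduler):
        return subdir_from_scheduler
    # The Scheduler and Worker have different home directories, so an absolute
    # scheduler path is meaningless here; use this worker's DAGS_FOLDER instead.
    return settings.DAGS_FOLDER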

Error

Aug 05 18:49:16 myfqdn airflow[4905]: [2021-08-05 18:49:16,782: INFO/ForkPoolWorker-8] Executing command in Celery: ['airflow', 'tasks', 'run', 'touch_file_mytest', 'runme', '2021-08-05T18:49:15.368167+00:00', '--local', '--pool', 'default_pool', '--subdir', '/home/_airflowservice@mydomain/dags/mytest/example.py']
Aug 05 18:49:16 myfqdn airflow[4905]: [2021-08-05 18:49:16,898: DEBUG/ForkPoolWorker-8] Calling callbacks: [<function default_action_log at 0x7f7cabef4c80>]
Aug 05 18:49:17 myfqdn airflow[4905]: [2021-08-05 18:49:17,143: DEBUG/ForkPoolWorker-8] Setting up DB connection pool (PID 5128)
Aug 05 18:49:17 myfqdn airflow[4905]: [2021-08-05 18:49:17,146: DEBUG/ForkPoolWorker-8] settings.prepare_engine_args(): Using NullPool
Aug 05 18:49:17 myfqdn airflow[4905]: [2021-08-05 18:49:17,148: INFO/ForkPoolWorker-8] Filling up the DagBag from /home/_airflowservice@mydomain/dags/mytest/example.py
Aug 05 18:49:17 myfqdn airflow[4905]: [2021-08-05 18:49:17,148: DEBUG/ForkPoolWorker-8] Calling callbacks: []
Aug 05 18:49:17 myfqdn airflow[4905]: [2021-08-05 18:49:17,148: ERROR/ForkPoolWorker-8] Failed to execute task dag_id could not be found: touch_file_mytest. Either the dag did not exist or it failed to parse..

CAUSE: Wrong --subdir value

$ airflow tasks test touch_file_mytest runme 2021-08-06T09:03:16.244034+00:00 --subdir /home/_airflowservice@schedulerdomain/dags/mytest/example.py

[2021-08-06 09:16:45,824] {dagbag.py:501} INFO - Filling up the DagBag from /home/_airflowservice@schedulerdomain/dags/mytest/example.py
Traceback (most recent call last):
  File "/home/_airflowservice@localdomain/.local/bin/airflow", line 8, in <module>  sys.exit(main())
  File "/home/_airflowservice@localdomain/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main args.func(args)
  File "/home/_airflowservice@localdomain/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command  return func(*args, **kwargs)
  File "/home/_airflowservice@localdomain/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper  return f(*args, **kwargs)
  File "/home/_airflowservice@localdomain/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 380, in task_test
    dag = dag or get_dag(args.subdir, args.dag_id)
  File "/home/_airflowservice@localdomain/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 193, in get_dag
    'parse.'.format(dag_id) airflow.exceptions.AirflowException: dag_id could not be found: touch_file_mytest. Either the dag did not exist or it failed to parse.

FIX: Drop the incorrect --subdir arg

$ airflow tasks test touch_file_mytest runme 2021-08-06T09:03:16.244034+00:00

[2021-08-06 09:15:34,234] {dagbag.py:501} INFO - Filling up the DagBag from /home/_airflowservice@localdomain/dags
[2021-08-06 09:15:34,434] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: touch_file_mytest.runme 2021-08-06T09:03:16.244034+00:00 [failed]>
[2021-08-06 09:15:34,531] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: touch_file_mytest.runme 2021-08-06T09:03:16.244034+00:00 [failed]>
[2021-08-06 09:15:34,531] {taskinstance.py:1087} INFO ---------------------------------------------------------------------------------
[2021-08-06 09:15:34,531] {taskinstance.py:1088} INFO - Starting attempt 2 of 1
[2021-08-06 09:15:34,532] {taskinstance.py:1089} INFO ---------------------------------------------------------------------------------
[2021-08-06 09:15:34,532] {taskinstance.py:1107} INFO - Executing <Task(BashOperator): runme> on 2021-08-06T09:03:16.244034+00:00
[2021-08-06 09:15:35,461] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=touch_file_mytest
AIRFLOW_CTX_TASK_ID=runme
AIRFLOW_CTX_EXECUTION_DATE=2021-08-06T09:03:16.244034+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-06T09:03:16.244034+00:00
[2021-08-06 09:15:35,462] {subprocess.py:52} INFO - Tmp dir root location:
 /tmp
[2021-08-06 09:15:35,463] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'touch /tmp/airflow_mytest']
[2021-08-06 09:15:35,470] {subprocess.py:74} INFO - Output:
[2021-08-06 09:15:35,473] {subprocess.py:82} INFO - Command exited with return code 0
[2021-08-06 09:15:35,781] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=touch_file_mytest, task_id=runme, execution_date=20210806T090316, start_date=, end_date=20210806T091535

We have the same (or similar) setup, running Airflow on AWS ECS Fargate containers, and I had a very similar, ambiguous issue: the webserver container just exited/crashed without providing any useful info in the logs. Specifically, it started happening after upgrading from Airflow v1.13 (where this setup worked fine) to Airflow v2.1, and the logs seemed to suggest that it was somehow related to the webserver’s gunicorn worker processes.

I discovered that the problem was directly tied to the resources allocated to the container(s), along with the Airflow configuration values for these worker processes. In our case, the webserver was on a relatively small container and ECS task (1 GB memory, 0.25 vCPU), because we try to run things pretty lean (you know, $$). It has since been upped to 0.5 vCPU, but the same problem persisted until I found this formula from someone else on the internet: the number of desired workers should correlate with the CPU cores of the instance/container, i.e. 2 * num_cpu_cores + 1 (Airflow’s default is 4).

Since this Fargate task/container isn’t even allocated a single full CPU, I set AIRFLOW__WEBSERVER__WORKERS=2, and now the webserver container is stable! While this may not be the exact same problem, it might be worth increasing the resources allocated to your container, just to see if it’s related, or checking whether similar Airflow configuration settings might be the culprit.
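For reference, a rough sketch of applying that rule of thumb on a small Fargate task (this is just an assumption about how you might derive the value; nothing in Airflow ships this formula):

# Sketch only: apply the "2 * num_cpu_cores + 1" rule of thumb and cap it for
# a lean Fargate task; the cap of 2 matches the AIRFLOW__WEBSERVER__WORKERS=2
# value that stabilised our webserver.
import os

cores = os.cpu_count() or 1      # a 0.25-0.5 vCPU task still reports at least 1 core
suggested = 2 * cores + 1        # rule of thumb; Airflow's default is 4
workers = min(suggested, 2)      # keep it small on under-provisioned containers
print(f"AIRFLOW__WEBSERVER__WORKERS={workers}")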

I’m dealing with this too (but mine is 100% of the time on one system only).

After researching, it looks like this is an old issue that keeps popping up after being “fixed”. These resources haven’t fixed my problem, but they contain some ideas that might be relevant to your setup:

I suspect you are also seeing the Airflow JIRA issue “If a task crashes, hostname not committed to db so logs not in the UI”, which is a symptom of the “Celery command failed on host” error? I am.

Do you have one worker failing consistently while other workers succeed, or are your failures on a worker that also succeeds sometimes?