airflow: `on_failure_callback` on DAG level is not executed
Apache Airflow version: 2.0.1 and 2.1.1
Kubernetes version (if you are using kubernetes) (use kubectl version): 1.20.4
Environment:
- Cloud provider or hardware configuration: AWS EKS
- OS (e.g. from /etc/os-release): Debian GNU/Linux
- Kernel (e.g. uname -a): Linux 5.4.117-58.216.amzn2.x86_64
What happened:
The Airflow DAG failed and on_failure_callback was not triggered.
Logs were also not shown which may be related to issue https://github.com/apache/airflow/issues/13692.
In the worker pod logs I get the following error messages:
Failed to execute task local variable 'return_code' referenced before assignment.
Failed to execute task [Errno 2] No such file or directory: '/tmp/tmp7l296jgg'.
Task airflow.executors.celery_executor.execute_command[24d3f5c5-bf58-4aad-bf2a-c10b2781a2b2] raised unexpected:
AirflowException('Celery command failed on host:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
    _execute_in_fork(command_to_exec)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: hotdoc-airflow-worker-0.hotdoc-airflow-worker.hotdoc-airflow-staging.svc.cluster.local
What you expected to happen:
I expected the callback function to be called and executed.
It sounds like the null hostname issue contributed to this, but I am not familiar enough with Airflow internals to say for sure. I had a dig through the source code, and it looks like some queries are made to list tasks and other metadata.
https://github.com/apache/airflow/blob/b0f7f91fe29d1314b71c76de0f11d2dbe81c5c4a/airflow/models/dag.py#L822
How to reproduce it:
Create a dag with a function that fails and an error callback function
import sys
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


def on_failure(ctx):
    print('hello world')
    print(ctx)


def always_fails():
    sys.exit(1)


dag = DAG(
    dag_id='always_fails',
    description='dag that always fails',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 7, 12),
    on_failure_callback=on_failure,
)

task = PythonOperator(task_id='test-error-notifier', python_callable=always_fails, dag=dag)
Run the dag and check if the on_failure_callback is called.
About this issue
- State: closed
- Created 3 years ago
- Reactions: 2
- Comments: 18 (9 by maintainers)
UPD:
I have been testing the interaction and found that if we pass the callable to the on_failure_callback DAG parameter directly, it doesn't work. But if we configure on_failure_callback through the default_args dictionary, the callback gets called.
So this too works fine; however, the logs end up in:
$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log
as stated here: https://github.com/apache/airflow/blob/938510f2ca8dfbf95b332971f4b087653d444d0f/airflow/models/dag.py#L829-L830
I tested with your dag. Let me know what you think.
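On the point about DAG-level callback output going to the scheduler log rather than the task log: a callback that writes through a named logger may be easier to spot there. This is only a sketch. The logger name and the `describe_failure` helper are made up for illustration, and the `dag_run` and `reason` context keys are assumed from the linked `dag.py` code, so they are read defensively.

```python
import logging

# Hypothetical logger name, chosen only so the lines are easy to grep for.
log = logging.getLogger("dag_failure_callback")


def describe_failure(context):
    """Build a one-line summary from the callback context.

    Assumes the context may carry `dag_run` and `reason`; falls back to
    "unknown" when a key or attribute is missing.
    """
    dag_run = context.get("dag_run")
    dag_id = getattr(dag_run, "dag_id", "unknown")
    run_id = getattr(dag_run, "run_id", "unknown")
    reason = context.get("reason", "unknown")
    return f"DAG {dag_id} run {run_id} failed: {reason}"


def on_failure(context):
    # Log at ERROR level so the line stands out in the scheduler log file.
    log.error(describe_failure(context))
```

`on_failure` can then be passed as `on_failure_callback` in place of the `print`-based callback from the report.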
The on_failure_callback is a task argument and not a DAG-level argument. The correct dag is: