airflow: `on_failure_callback` on DAG level is not executed

Apache Airflow version: 2.0.1 and 2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.20.4

Environment:

  • Cloud provider or hardware configuration: AWS EKS
  • OS (e.g. from /etc/os-release): Debian GNU/Linux
  • Kernel (e.g. uname -a): Linux 5.4.117-58.216.amzn2.x86_64

What happened:

The Airflow DAG failed and the `on_failure_callback` was not triggered. Logs were also not shown, which may be related to https://github.com/apache/airflow/issues/13692.

In the worker pod logs I get the following error messages:

Failed to execute task local variable 'return_code' referenced before assignment.
Failed to execute task [Errno 2] No such file or directory: '/tmp/tmp7l296jgg'.
Task airflow.executors.celery_executor.execute_command[24d3f5c5-bf58-4aad-bf2a-c10b2781a2b2] raised unexpected: AirflowException('Celery command failed on host:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
    _execute_in_fork(command_to_exec)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: hotdoc-airflow-worker-0.hotdoc-airflow-worker.hotdoc-airflow-staging.svc.cluster.local

What you expected to happen:

I expected the callback function to be called and executed. It sounds like the null-hostname issue contributed to this, but I am not familiar enough with Airflow internals to say for sure. I had a dig through the source code and it looks like some queries are made to list tasks and other metadata: https://github.com/apache/airflow/blob/b0f7f91fe29d1314b71c76de0f11d2dbe81c5c4a/airflow/models/dag.py#L822

How to reproduce it:

Create a DAG with a task that always fails and an on_failure_callback function:

import sys
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def on_failure(ctx):
    # Failure callback: expected to be invoked with the failure context.
    print('hello world')
    print(ctx)


def always_fails():
    # Task that always fails, so the failure callback should fire.
    sys.exit(1)


dag = DAG(
    dag_id='always_fails',
    description='dag that always fails',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 7, 12),
    on_failure_callback=on_failure,  # callback passed at the DAG level
)

task = PythonOperator(task_id='test-error-notifier', python_callable=always_fails, dag=dag)

Run the DAG and check whether the on_failure_callback is called.

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 2
  • Comments: 18 (9 by maintainers)

Most upvoted comments

UPD:

I have been testing this interaction and found that if we pass the callable to the DAG's on_failure_callback parameter directly, it does not work. But if we configure on_failure_callback through the default_args dictionary, the callback does get called (see the sketch below).
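
For reference, here is a minimal sketch of the default_args variant described above. It mirrors the reproduction DAG (the dag_id suffix is made up for illustration); passing on_failure_callback through default_args hands it to each task as a task-level argument:

import sys
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def on_failure(ctx):
    print('hello world')
    print(ctx)


def always_fails():
    sys.exit(1)


dag = DAG(
    dag_id='always_fails_default_args',  # illustrative dag_id, not from the original report
    description='dag that always fails',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 7, 12),
    # on_failure_callback set via default_args is applied to every task in the DAG
    default_args={'on_failure_callback': on_failure},
)

task = PythonOperator(task_id='test-error-notifier', python_callable=always_fails, dag=dag)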


So this too works fine; however, the logs end up in $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log, as stated here: https://github.com/apache/airflow/blob/938510f2ca8dfbf95b332971f4b087653d444d0f/airflow/models/dag.py#L829-L830

I tested with your DAG. Let me know what you think.

The on_failure_callback here is a task argument, not a DAG-level argument. The correct DAG is:

import sys
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def on_failure(ctx):
    # Failure callback: receives the task instance context.
    print('hello world')
    print(ctx)


def always_fails():
    sys.exit(1)


dag = DAG(
    dag_id='always_fails',
    description='dag that always fails',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 7, 12),
)

task = PythonOperator(
    task_id='test-error-notifier',
    python_callable=always_fails,
    on_failure_callback=on_failure,  # callback passed at the task level
    dag=dag,
)