Airflow: execution_timeout not enforced, task hangs
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
Version: 2.5.1
Run env: MWAA on AWS
Summary: Roughly once every 500-1000 runs, the task hangs indefinitely until it is killed manually, which prevents any other task from being scheduled for this DAG. In other words, its execution_timeout is not enforced.
In my experience, it only happens in tasks that consume from Kafka using the confluent_kafka library. The execution_timeout is enforced in other tasks.
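For context on what "enforced" means here: as far as I understand it, on POSIX hosts Airflow enforces execution_timeout inside the task process with a SIGALRM-based timeout context manager, which raises AirflowTaskTimeout in the main thread. The sketch below is a simplified, hypothetical re-implementation of that idea, not Airflow's actual code; TaskTimeout, SigalrmTimeout and run_task are illustrative names. It shows the alarm interrupting a blocking sleep. The exception can only be raised once control is back in the Python interpreter, so a call that blocks inside native code without checking for pending signals would delay it (whether that applies to confluent_kafka here is unconfirmed).

```python
import signal
import time


class TaskTimeout(Exception):
    """Hypothetical stand-in for AirflowTaskTimeout (an Exception subclass in 2.5.1)."""


class SigalrmTimeout:
    """Simplified SIGALRM-based timeout. POSIX only, must be used in the main thread."""

    def __init__(self, seconds: float) -> None:
        self.seconds = seconds

    def _handle(self, signum, frame):
        # Runs in the main thread the next time the interpreter checks for signals.
        raise TaskTimeout(f"Timeout after {self.seconds}s")

    def __enter__(self):
        self._old_handler = signal.signal(signal.SIGALRM, self._handle)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)  # one-shot alarm
        return self

    def __exit__(self, exc_type, exc, tb):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel a pending alarm first
        signal.signal(signal.SIGALRM, self._old_handler)
        return False  # never suppress exceptions


def run_task(seconds_of_work: float, limit: float) -> str:
    """Run a fake task body (a sleep) under the timeout and report the outcome."""
    try:
        with SigalrmTimeout(limit):
            time.sleep(seconds_of_work)  # stands in for the task body
        return "finished"
    except TaskTimeout:
        return "timed out"


print(run_task(0.01, 1.0))  # finished
print(run_task(5.0, 0.2))   # timed out
```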
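DAG definition code:
from datetime import timedelta

import pendulum
from airflow.decorators import dag

# SERVICE_NAME, on_failure_callback and ingest_from_kafka_and_save() (presumably
# a @task-decorated function) are defined elsewhere in the DAG file, not shown here.

# Dag Info
default_args = {
    "retries": 3,
    "on_failure_callback": on_failure_callback,
    "sla": timedelta(hours=2),
    "execution_timeout": timedelta(hours=4),
}


@dag(
    SERVICE_NAME,
    default_args=default_args,
    schedule_interval="*/5 * * * *",
    start_date=pendulum.datetime(2023, 7, 3, 9, tz="UTC"),
    catchup=True,
    tags=["critical", "dumper", "kafka"],
    max_active_runs=1,
)
def process_records():
    ingest_from_kafka_and_save()


process_records()  # instantiate the DAG so the scheduler registers it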
The ingest_from_kafka_and_save() function contains code that consumes from Kafka and passes a callback function to the consumer (which I suspect may have something to do with the problem, since the callback is invoked asynchronously).
It is hard to reproduce because it is intermittent and happens only occasionally. The Audit Log does not show anything unusual; the task simply seems to hang indefinitely. Otherwise the consumption code works fine: it has been running for months in this DAG and in other DAGs that use it, although those DAGs show the same behaviour.
What you think should happen instead
The execution_timeout should be enforced: the task should be killed so that a new one can be scheduled.
How to reproduce
It is hard to reproduce, since it happens very infrequently.
- Create a DAG with the definition from the “What happened” section
- Add a function that performs a basic consumption from a Kafka topic, consuming until the end of the topic partitions (or up to a maximum number of messages)
- Leave it running and wait for the problem to happen
Operating System
MWAA on AWS
Versions of Apache Airflow Providers
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
apache-airflow-providers-amazon
apache-airflow-providers-snowflake==4.0.2
apache-airflow-providers-mysql==4.0.0
apache-airflow-providers-slack
confluent-kafka==2.1.0
Deployment
Amazon (AWS) MWAA
Deployment details
Medium-sized cluster running version 2.5.1; the latest update was applied 2 weeks ago.
Anything else
It is unclear what triggers the error, but whatever it is, the task should be killed to enforce the execution_timeout.
It seems like an internal thread-management issue.
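One way to get "kill it whatever the error" behaviour that does not depend on the task code cooperating with an in-process signal handler is a watchdog in a separate process. The sketch below is a generic workaround pattern, not something Airflow or MWAA provides; run_with_hard_timeout, hanging_consumer and quick_task are hypothetical names, and it assumes a POSIX host where the fork start method is available.

```python
import multiprocessing as mp
import time


def hanging_consumer() -> None:
    """Hypothetical task body that never returns (simulates the hang)."""
    while True:
        time.sleep(1)


def quick_task() -> None:
    """Hypothetical task body that finishes normally."""
    return None


def run_with_hard_timeout(target, seconds: float) -> str:
    """Run target in a child process and kill it if it exceeds the deadline.

    The parent only waits and then signals the child, so the deadline holds
    even if the child is stuck in native code or swallows exceptions.
    """
    ctx = mp.get_context("fork")  # POSIX only; the child does not re-import __main__
    proc = ctx.Process(target=target, daemon=True)
    proc.start()
    proc.join(seconds)
    if proc.is_alive():
        proc.terminate()  # SIGTERM first
        proc.join(5)
        if proc.is_alive():
            proc.kill()  # SIGKILL as a last resort
            proc.join()
        return "killed"
    return f"exited with code {proc.exitcode}"


print(run_with_hard_timeout(hanging_consumer, 0.5))  # killed
print(run_with_hard_timeout(quick_task, 5))          # exited with code 0
```

In a real DAG the target would be the Kafka consumption function, and any result other than a clean exit would need to be turned into a task failure (for example by raising an exception in the calling task).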
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- State: open
- Created 8 months ago
- Comments: 18 (14 by maintainers)
Commits related to this issue
- Change AirflowTaskTimeout to inherit BaseException Code that normally catches Exception should not implicitly ignore interrupts from AirflowTaskTimout. Fixes #35644 Partially addresses #35474 — committed to hterik/airflow by hterik 8 months ago
- Change AirflowTaskTimeout to inherit BaseException Code that normally catches Exception should not implicitly ignore interrupts from AirflowTaskTimout. Fixes #35644 #35474 — committed to hterik/airflow by hterik 8 months ago
- Change AirflowTaskTimeout to inherit BaseException (#35653) Code that normally catches Exception should not implicitly ignore interrupts from AirflowTaskTimout. Fixes #35644 #35474 — committed to apache/airflow by hterik 4 months ago
- Change AirflowTaskTimeout to inherit BaseException (#35653) Code that normally catches Exception should not implicitly ignore interrupts from AirflowTaskTimout. Fixes #35644 #35474 (cherry picked f... — committed to apache/airflow by hterik 4 months ago
- Change AirflowTaskTimeout to inherit BaseException (#35653) Code that normally catches Exception should not implicitly ignore interrupts from AirflowTaskTimout. Fixes #35644 #35474 — committed to abhishekbhakat/my_airflow by hterik 4 months ago
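The commits listed above change AirflowTaskTimeout to inherit from BaseException instead of Exception. One plausible way the hang arises, consistent with those commit messages: the timeout is raised, but a broad except Exception in consumer or retry code catches it and the loop carries on. The sketch below reproduces that with stand-in classes; TimeoutAsException, TimeoutAsBaseException and consume_with_broad_retry are hypothetical names, there are no Airflow or Kafka imports, and time.sleep stands in for a blocking poll.

```python
import signal
import time


class TimeoutAsException(Exception):
    """Mimics AirflowTaskTimeout before the change (subclass of Exception)."""


class TimeoutAsBaseException(BaseException):
    """Mimics AirflowTaskTimeout after the change (subclass of BaseException)."""


def consume_with_broad_retry(exc_type: type, polls: int = 3) -> int:
    """Hypothetical consumer loop that logs and retries on any Exception.

    A one-shot SIGALRM raises exc_type during the first poll. Returns how
    many errors the loop swallowed; exceptions it does not catch propagate.
    """

    def handler(signum, frame):
        raise exc_type("execution_timeout reached")

    swallowed = 0
    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, 0.1)  # fires during the first sleep
    try:
        for _ in range(polls):
            try:
                time.sleep(0.3)  # stands in for a blocking consumer poll
            except Exception:
                swallowed += 1  # broad handler: the timeout is silently retried
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, old_handler)
    return swallowed


print(consume_with_broad_retry(TimeoutAsException))  # 1: timeout swallowed, loop kept going
try:
    consume_with_broad_retry(TimeoutAsBaseException)
except TimeoutAsBaseException:
    print("timeout propagated")
```

On versions without that change, a workaround in task code is presumably to re-raise AirflowTaskTimeout explicitly in an except clause placed before any broad except Exception handler.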
It seems we understand the nature of the problem and have a plan for resolving it, so let me pick up this issue.