airflow: execution_timeout not enforced, task hangs indefinitely

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Version: 2.5.1. Runtime environment: MWAA on AWS.

Summary: Approximately once every ~500-1000 runs, the task hangs indefinitely until it is manually killed, preventing any other task from being scheduled for this DAG, and its execution_timeout is not enforced. In my experience it only happens on tasks that consume from Kafka using the confluent_kafka library; the execution_timeout is enforced in other tasks.

Dag definition code:

# Imports added for completeness; SERVICE_NAME, on_failure_callback and
# ingest_from_kafka_and_save are defined elsewhere in the project.
from datetime import timedelta

import pendulum
from airflow.decorators import dag

# Dag Info
default_args = {
    "retries": 3,
    "on_failure_callback": on_failure_callback,
    "sla": timedelta(hours=2),
    "execution_timeout": timedelta(hours=4),
}


@dag(SERVICE_NAME,
     default_args=default_args,
     schedule_interval="*/5 * * * *",
     start_date=pendulum.datetime(2023, 7, 3, 9, tz="UTC"),
     catchup=True,
     tags=['critical', 'dumper', 'kafka'],
     max_active_runs=1)
def process_records():
    ingest_from_kafka_and_save()


process_records()

The ingest_from_kafka_and_save() function contains code that consumes from Kafka, providing a callback function to the consumption (which I suspect may have something to do with the problem, since it runs asynchronously).
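
For illustration only (this is not the actual code from the report), a minimal sketch of what a callback-based consumption of this shape might look like; the broker address, topic, consumer group and the on_message parameter are assumptions:

# Hypothetical sketch of the consumption pattern described above, not the real code.
from confluent_kafka import Consumer, KafkaError


def ingest_from_kafka_and_save(on_message=None, max_messages=100_000):
    consumer = Consumer({
        "bootstrap.servers": "broker:9092",       # placeholder broker
        "group.id": "dumper",                     # placeholder consumer group
        "auto.offset.reset": "earliest",
        "enable.partition.eof": True,             # report end-of-partition events
    })
    consumer.subscribe(["records-topic"])         # placeholder topic
    consumed = 0
    try:
        while consumed < max_messages:
            msg = consumer.poll(timeout=1.0)      # blocks inside librdkafka for up to 1s
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    break                         # reached the end of a partition
                raise RuntimeError(msg.error())
            if on_message is not None:
                on_message(msg)                   # per-message callback supplied by the caller
            consumed += 1
        if consumed:
            consumer.commit()                     # commit offsets once the batch is done
    finally:
        consumer.close()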

It’s hard to reproduce since it is intermittent and only happens every once in a while. The Audit Log does not show anything special - the task just seems to hang indefinitely. The consumption code itself works fine otherwise and has been running for months in this DAG and in other DAGs that use it - but those DAGs also show the same behaviour.

What you think should happen instead

The execution_timeout should be enforced and the task should be killed so a new one could be placed.

How to reproduce

It is hard to reproduce, since it happens very infrequently.

  • Create a DAG with the definition in the “What happened” section
  • Add a function with a basic Kafka consumption that consumes from a Kafka topic until the end of the topic partitions (or a maximum number of messages); a minimal sketch is shown after this list
  • Leave it running and wait for the problem to happen
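
For concreteness, a sketch of how such a reproduction DAG could be wired up, reusing the hypothetical ingest_from_kafka_and_save sketch from the “What happened” section; the DAG id, schedule and message limit are placeholders:

# Hypothetical reproduction wiring; reuses the ingest_from_kafka_and_save sketch above.
from datetime import timedelta

import pendulum
from airflow.decorators import dag, task


@dag("kafka_hang_repro",
     schedule_interval="*/5 * * * *",
     start_date=pendulum.datetime(2023, 7, 3, tz="UTC"),
     catchup=False,
     max_active_runs=1,
     default_args={"execution_timeout": timedelta(hours=4)})
def kafka_hang_repro():

    @task
    def consume():
        # Consume until the end of the topic partitions or a max number of messages.
        ingest_from_kafka_and_save(max_messages=100_000)

    consume()


kafka_hang_repro()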

Operating System

MWAA on AWS

Versions of Apache Airflow Providers

--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
apache-airflow-providers-amazon
apache-airflow-providers-snowflake==4.0.2
apache-airflow-providers-mysql==4.0.0
apache-airflow-providers-slack
confluent-kafka==2.1.0

Deployment

Amazon (AWS) MWAA

Deployment details

Medium-sized cluster on version 2.5.1, latest update applied 2 weeks ago.

Anything else

It is unclear what triggers the error - but whatever the error is, the task should be killed so that the execution_timeout is enforced. It seems like an internal thread management issue.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

About this issue

  • Original URL
  • State: open
  • Created 8 months ago
  • Comments: 18 (14 by maintainers)

Commits related to this issue

Most upvoted comments

It happened a few times in my life that a process did not die after SIGKILL, and that was at times when the whole OS/installation got heavily broken.


Seems we know about the nature of the problem and have a plan for how to resolve it, so let me pick up this issue then.