airflow: Running tasks marked as skipped on DagRun timeout
Apache Airflow version
2.5.2
What happened
Users are experiencing the following:
- A DAG begins to run
- Task(s) go into running state, as expected
- The DagRun times out, marking any currently running task as SKIPPED
- Because the tasks are not marked as failed, the on_failure_callback never gets invoked
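For context, the callback in question is attached at the task level roughly like this (an illustrative sketch; the callback name and body are not from the affected DAG):

import time

from airflow.decorators import task

# Illustrative alerting callback; real alerting (Slack, PagerDuty, ...) would go here.
def notify_on_failure(context):
    ti = context["task_instance"]
    print(f"Task {ti.task_id} failed in run {context['run_id']}")

@task(on_failure_callback=notify_on_failure)
def long_running_task():
    time.sleep(600)  # long enough to still be running when the DagRun times out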
Here are some example logs:
[2023-03-22, 16:30:02 PDT] {local_task_job.py:266} WARNING - DagRun timed out after 4:00:02.394287.
[2023-03-22, 16:30:07 PDT] {local_task_job.py:266} WARNING - DagRun timed out after 4:00:07.447373.
[2023-03-22, 16:30:07 PDT] {local_task_job.py:272} WARNING - State of this instance has been externally set to skipped. Terminating instance.
[2023-03-22, 16:30:07 PDT] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 8515. PIDs of all processes in the group: [8515]
What you think should happen instead
Once a DagRun times out, tasks that are currently RUNNING should be marked as FAILED, and downstream tasks should be marked as UPSTREAM_FAILED.
How to reproduce
The following DAG will reproduce this intermittently:
import logging
import time

from airflow.decorators import dag, task
from airflow.utils.dates import datetime, timedelta


@task
def task_1():
    import random

    pulses = random.randint(5, 10)
    for i in range(pulses):
        logging.info(f"pulsing: pulse...{i}")
        time.sleep(4)


@task
def task_2():
    import random

    pulses = random.randint(10, 20)
    for i in range(pulses):
        logging.info(f"pulsing: pulse...{i}")
        time.sleep(5)


@task
def downstream_finished_task():
    logging.info("task finished")
    time.sleep(20)


# dagrun_timeout (30s) is shorter than the expected runtime of task_1/task_2,
# so the DagRun times out while the tasks are still running.
@dag(
    dag_id="dagrun_interval_test",
    schedule_interval="*/5 * * * *",
    start_date=datetime(2023, 3, 23),
    dagrun_timeout=timedelta(seconds=30),
    catchup=False,
)
def my_dag():
    return [task_1(), task_2()] >> downstream_finished_task()


dag = my_dag()
- Running tasks marked skipped
- Downstream tasks left with no status
See screenshot
Operating System
MacOS
Versions of Apache Airflow Providers
N/A
Deployment
Astronomer
Deployment details
Airflow Version 2.5.2
Anything else
This happens every time a DagRun times out while tasks are still running.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created a year ago
- Comments: 22 (15 by maintainers)
Can this be re-opened? We also encountered this, and were very surprised that the on_failure_callback was not fired, because it only runs on task failure, but the task that was running when the timeout was hit was skipped, not failed.

First, that behavior seems wrong: if a task is taking too long and hits the dagrun_timeout, I would expect that task (as well as the DAG) to fail.

Second, @hussein-awala wrote,
But what is the “dag failure callback”? I don’t see a callback like that in these docs: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html
(Do you mean the sla_miss_callback? i.e., set the DAG's SLA to the same as the DAG's dagrun_timeout?) A DAG-level failure callback would be very nice to have.
Thank you.
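If it helps, the DAG object itself accepts an on_failure_callback argument that is invoked when the DagRun fails, which, as far as I can tell, includes a dagrun_timeout; a minimal sketch (the callback body is illustrative):

from airflow.decorators import dag, task
from airflow.utils.dates import datetime, timedelta

# Illustrative DAG-level callback; the context dict is similar to the task-level one.
def dag_failed_alert(context):
    dag_run = context.get("dag_run")
    print(f"DAG run {dag_run.run_id if dag_run else '?'} failed")

@dag(
    schedule_interval=None,
    start_date=datetime(2023, 3, 23),
    dagrun_timeout=timedelta(seconds=30),
    on_failure_callback=dag_failed_alert,
    catchup=False,
)
def timeout_alert_dag():
    @task
    def slow_task():
        import time
        time.sleep(120)  # longer than dagrun_timeout

    slow_task()

dag = timeout_alert_dag()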
I agree with @pankajkoti. I think we should re-open this issue.
I agree with @wolfier. If a task was running, I feel it could proceed to Failed/Shutdown instead of Skipped. Wouldn't Skipped mean that it was never attempted or never went into the Running state at all? Looking at our definitions for the states: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances
Failed/Shutdown sounds more reasonable than the Skipped state.

I believe the question is what it means when a DagRun times out.
If a DagRun timeout means "I need everything to stop, including the task instances," then forcing task termination is appropriate. I don't agree with setting the ending state to skipped if a task was in the running state, since the task was in the middle of execution.
Looking at @RNHTTR's PR, I see the logic is to mark all unfinished tasks as skipped.
Instead, I think it should be more refined.
The scheduled and queued state should be set to skipped IF that was their first attempt (checking try number). Though this may not work for sensors that are rescheduled and are in the middle of being scheduled / queued.
The rest of the states should be set to failed because they imply the task instance was attempted. Tasks that are attempted should be failed.
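Roughly, that refinement might look something like the following (a hypothetical sketch, not the code in the PR; the state names come from airflow.utils.state):

from airflow.utils.state import TaskInstanceState

# Hypothetical mapping for unfinished task instances when a DagRun times out.
def state_on_dagrun_timeout(ti):
    first_attempt = ti.try_number <= 1
    if ti.state in (TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED) and first_attempt:
        # never actually started, so skipping seems reasonable
        return TaskInstanceState.SKIPPED
    # the task was attempted (running, up_for_retry, etc.), so treat it as failed
    return TaskInstanceState.FAILED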
It is worth noting that the PR was written and released for an Airflow version (2.0.0) where active dagruns were determined by task instance states instead of the dagrun state, as pointed out by issues/13407. In Airflow 2.6.x, active dagruns are determined by the state of the dagrun and not the task instance states. This means it does not matter which state the running task ends up in: skipped, failed, or even running.
Referring back to the question I posed earlier, depending on what it means when the dagrun times out, the state of a running task should reflect that definition.
Should we scope this issue to adding on_skipped_callback? @erdos2n is that a suitable solution for your use case?

Since the task didn't fail, I don't see the need to run the failure callback for every stopped task; the dag failure callback is enough to handle this case, where we can check if the run failed due to a timeout and select the skipped tasks in the metadata database to do what we need to do. WDYT?
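As a sketch of that suggestion, a DAG-level failure callback could look for the skipped task instances in the run (assuming DagRun.get_task_instances; the alerting logic is illustrative):

from airflow.utils.state import TaskInstanceState

# Illustrative DAG-level on_failure_callback body.
def dag_failure_callback(context):
    dag_run = context["dag_run"]
    skipped = dag_run.get_task_instances(state=TaskInstanceState.SKIPPED)
    for ti in skipped:
        # alert on tasks that were cut short by the timeout
        print(f"{ti.task_id} was skipped when run {dag_run.run_id} timed out")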
@eladkal Maybe they should not be set to fail, but they should also not be set to skipped. The task was not skipped, technically speaking.
An issue that has come up is that a user wants an alert for a specific task failure, so they don't want to set the on_failure_callback at the dag level. That specific task gets marked skipped on a dagrun_timeout, and the on_failure_callback isn't triggered.

I believe it's worth discussing either marking these tasks that are stopped mid-run as FAILED or introducing a new state for the task instance.
I'm curious if SHUTDOWN makes more sense in this instance. It seems to fit what is occurring better than skipped: https://github.com/apache/airflow/blob/main/airflow/utils/state.py#L42
Thoughts?
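For reference, the available task instance states can be listed directly (on 2.5.x, SHUTDOWN should still be among them):

from airflow.utils.state import TaskInstanceState

# Print every task instance state defined in this Airflow version.
for state in TaskInstanceState:
    print(state.name, state.value)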