airflow: CronTriggerTimetable lost one task occasionally

Discussed in https://github.com/apache/airflow/discussions/27398

<div type='discussions-op-text'>

Originally posted by AndrewTsao October 25, 2022

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

My Airflow version is 2.4.0. I use CronTriggerTimetable in my DAG, and I found that it occasionally loses one task run. I inspected the scheduler log:

The DAG should have run at 2022-10-24T23:10:00.000+0800, but it did not.

[2022-10-24T23:08:59.661+0800] {processor.py:768} INFO - DAG(s) dict_keys(['AFlow.ajob']) retrieved from /home/bob/airflow/dags/dag.py
[2022-10-24T23:08:59.685+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:08:59.684+0800] {dag.py:2573} INFO - Sync 1 DAGs
[2022-10-24T23:08:59.710+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:08:59.710+0800] {dag.py:3324} INFO - Setting next_dagrun for AFlow.ajob to 2022-10-24T15:10:00+00:00, run_after=2022-10-24T15:10:00+00:00
[2022-10-24T23:08:59.734+0800] {processor.py:178} INFO - Processing /home/bob/airflow/dags/dag.py took 0.351 seconds
[2022-10-24T23:09:29.855+0800] {processor.py:156} INFO - Started process (PID=16329) to work on /home/bob/airflow/dags/dag.py
[2022-10-24T23:09:29.856+0800] {processor.py:758} INFO - Processing file /home/bob/airflow/dags/dag.py for tasks to queue
[2022-10-24T23:09:29.857+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:09:29.857+0800] {dagbag.py:525} INFO - Filling up the DagBag from /home/bob/airflow/dags/dag.py
[2022-10-24T23:09:29.946+0800] {processor.py:768} INFO - DAG(s) dict_keys(['AFlow.ajob']) retrieved from /home/bob/airflow/dags/dag.py
[2022-10-24T23:09:29.969+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:09:29.968+0800] {dag.py:2573} INFO - Sync 1 DAGs
[2022-10-24T23:09:29.994+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:09:29.994+0800] {dag.py:3324} INFO - Setting next_dagrun for AFlow.ajob to 2022-10-24T15:10:00+00:00, run_after=2022-10-24T15:10:00+00:00
[2022-10-24T23:09:30.178+0800] {processor.py:178} INFO - Processing /home/bob/airflow/dags/dag.py took 0.327 seconds
[2022-10-24T23:10:00.368+0800] {processor.py:156} INFO - Started process (PID=16562) to work on /home/bob/airflow/dags/dag.py
[2022-10-24T23:10:00.369+0800] {processor.py:758} INFO - Processing file /home/bob/airflow/dags/dag.py for tasks to queue
[2022-10-24T23:10:00.370+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:10:00.370+0800] {dagbag.py:525} INFO - Filling up the DagBag from /home/bob/airflow/dags/dag.py
[2022-10-24T23:10:00.628+0800] {processor.py:768} INFO - DAG(s) dict_keys(['AFlow.ajob']) retrieved from /home/bob/airflow/dags/dag.py
[2022-10-24T23:10:00.648+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:10:00.647+0800] {dag.py:2573} INFO - Sync 1 DAGs
[2022-10-24T23:10:00.670+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:10:00.670+0800] {dag.py:3324} INFO - Setting next_dagrun for AFlow.ajob to 2022-10-25T15:10:00+00:00, run_after=2022-10-25T15:10:00+00:00
[2022-10-24T23:10:00.693+0800] {processor.py:178} INFO - Processing /home/bob/airflow/dags/dag.py took 0.331 seconds

What you think should happen instead

I think the schedule information is lost when the DAG file is reloaded and the DAG is rescheduled.

How to reproduce

  1. In airflow.cfg, set dag_dir_list_interval = 10.
  2. Add a DAG as follows:
from airflow import DAG
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.decorators import task
import pendulum as pl

with DAG(
    dag_id='test-cron-2',
    # schedule_interval='10 23 * * 2-3',
    timetable=CronTriggerTimetable('* * * * *', timezone=pl.tz.get_local_timezone()),  # every minute
    start_date=pl.today().add(days=-21),
    tags=['example'],
) as dag1:
    @task
    def test():
        print("run...")

    test()


In the scheduler log, we can see that at that point the scheduler was reloading the DAG file.


Operating System

Centos 7.9

Versions of Apache Airflow Providers

airflow-code-editor                      7.0.0
apache-airflow                           2.4.0
apache-airflow-providers-celery          3.0.0
apache-airflow-providers-common-sql      1.1.0
apache-airflow-providers-ftp             3.1.0
apache-airflow-providers-http            4.0.0
apache-airflow-providers-imap            3.0.0
apache-airflow-providers-microsoft-psrp  2.0.0
apache-airflow-providers-microsoft-winrm 3.0.0
apache-airflow-providers-mysql           3.2.0
apache-airflow-providers-redis           3.0.0
apache-airflow-providers-samba           4.0.0
apache-airflow-providers-sftp            4.0.0
apache-airflow-providers-sqlite          3.2.0
apache-airflow-providers-ssh             3.1.0

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

</div>

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 40 (23 by maintainers)

Most upvoted comments

For what it’s worth, prior to 2.4 I wrote my own timetable that acts almost exactly like CronTriggerTimetable, and I experienced a similar issue, so I’ll mention my findings here.

In my case the problem was that there was a race condition between the DagProcessor and the Scheduler when calling Timetable.next_dagrun_info. Depending on when each thread evaluated the timetable, the DagProcessor could end up “bumping” the DAG’s next_dagrun_create_after timestamp in the DB before the Scheduler had a chance to schedule the current interval. Then when the Scheduler would do its query to determine which DagRuns to schedule, it wouldn’t find the run that got “skipped”. In my case, I believe the bug was a call to DateTime.utcnow() that I had to align to the previous interval boundary.

Also worth noting that my bug only happened with catchup=False, but that could have just been a quirk with my implementation.
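For illustration, here is a minimal sketch of the race described above, using a toy fixed-step 15-minute cron with plain datetimes (none of these helper names are Airflow’s real API). The DagProcessor wakes a moment after a boundary, derives the next boundary from a raw utcnow(), and bumps the next run past a boundary the Scheduler never got to create; flooring utcnow() to the previous boundary keeps that run schedulable.

```python
from datetime import datetime, timedelta, timezone

STEP = timedelta(minutes=15)  # stand-in for a "*/15 * * * *" cron
EPOCH = datetime(2022, 10, 24, tzinfo=timezone.utc)

def next_boundary(ts: datetime) -> datetime:
    """Smallest cron boundary strictly after ts."""
    return EPOCH + ((ts - EPOCH) // STEP + 1) * STEP

def prev_boundary(ts: datetime) -> datetime:
    """Largest cron boundary at or before ts (the alignment fix the
    comment above describes)."""
    return EPOCH + ((ts - EPOCH) // STEP) * STEP

# The DagProcessor wakes 300 ms after the 15:00 boundary:
now = datetime(2022, 10, 24, 15, 0, 0, 300000, tzinfo=timezone.utc)

# Derived from raw utcnow(), the "next" run jumps to 15:15 and the
# still-unscheduled 15:00 run is skipped:
assert next_boundary(now) == datetime(2022, 10, 24, 15, 15, tzinfo=timezone.utc)

# Aligning utcnow() down to the previous boundary keeps 15:00 visible:
assert prev_boundary(now) == datetime(2022, 10, 24, 15, 0, tzinfo=timezone.utc)
```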

We have observed the same issue with a UTC timezone, so I don’t think it is the issue, e.g.

timetable=CronTriggerTimetable("*/15 * * * *", timezone="UTC")

Are there any tips on how to deal with this reliably, or should we just expect CronTriggerTimetable to silently fail every now and then? In that case, maybe it would be good to update the docs here to warn about the inconsistent behaviour?

I spent some time taking a closer look at the implementation. The problem with

I think the reason is because CronDataIntervalTimetable runs a last DAG run if it missed even when catchup=False

is that CronDataIntervalTimetable actually does not do that! The reason it seems to be more resilient is that catchup in that timetable relies on the data interval start, not the trigger time. So say

  • You have a cron
  • Last run on 3am
  • Current time 4:05

For CronDataIntervalTimetable, since the last run covered 2–3am, the next run should cover 3–4am, which can be achieved without catchup (the further next covers 4–5am and is not due yet). Everything is fine. But for CronTriggerTimetable, the last run covered 3am, and the 4am run is skipped since that time is already in the past.

I think a reasonable fix would be to change the catchup=False logic to cover one schedule before the current time instead, so in the above scenario, the timetable would make the next run cover 4am, and only skip the 4am run if the current time is past 5am.

I can prepare a PR if that sounds reasonable (or anyone can, it’s just one line in airflow.timetables and fixing the corresponding test case).
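The proposed rule can be sketched with a toy hour-granularity model (hours as plain numbers, nothing here is Airflow’s actual API): the next run is allowed to sit one schedule before “now”, and a boundary is only dropped once the following boundary has passed.

```python
import math

def next_run_current(now_h: float, last_run_h: int) -> int:
    """Simplified current behaviour: the next hourly trigger strictly
    after now, so a boundary that just passed is silently dropped."""
    return math.floor(now_h) + 1

def next_run_proposed(now_h: float, last_run_h: int) -> int:
    """Proposed behaviour: allow one schedule before now, but never
    earlier than the run after the last completed one."""
    return max(last_run_h + 1, math.floor(now_h))

# Last run at 3am, current time 4:05 (written as 4.083):
assert next_run_current(4.083, 3) == 5   # the 4am run is silently skipped
assert next_run_proposed(4.083, 3) == 4  # the 4am run still happens

# Only once the clock passes 5am is the 4am run abandoned:
assert next_run_proposed(5.1, 3) == 5
```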

I’m closing this as it appears to have been fixed in 2.7.1 by #33404 but there was a typo in the description that allowed this Issue to stay open.

Hey @BohdanSemonov , if the fix works, could you please submit a PR? I think this issue is quite critical, and I also observe such behavior from CronTriggerTimetable

The bug occurs because seconds and microseconds are not reset here: start_time_candidates = [self._align_to_next(DateTime.utcnow())]. The condition if self._get_prev(next_time) != current: in the method _align_to_next then chooses next_time instead of current when the current DateTime matches the next_dagrun before the DagProcessor has processed the DAG. As a result, the date switches over before the DAG is launched.

The bug fix is only one line:

start_time_candidates = [self._align_to_next(DateTime.utcnow().replace(second=0, microsecond=0))]

I’ve already tested the fix; it works properly.

This is the logic to calculate the next run for catchup=False, for reference:

start_time_candidates = [self._align_to_next(DateTime.utcnow())]
if last_automated_data_interval is not None:
    start_time_candidates.append(self._get_next(last_automated_data_interval.end))
if restriction.earliest is not None:
    start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)

The two if blocks should be fine since they match the logic for catchup=True. So I wonder if the problem is from the max call—maybe under some edge cases that max would wrongly select the next(utcnow) result (first line) instead of the actual schedule it should use—when the scheduler took too much time to schedule the previous run, perhaps?
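Plugging the timestamps from the log in the original report into that logic supports this suspicion. The values below are hand-computed for the reporter’s daily cron (23:10 local, i.e. 15:10 UTC), not produced by Airflow itself: the processor ran at 15:10:00.368 UTC, so _align_to_next(utcnow) jumps past the boundary to the next day, and max() picks it over the never-created 15:10 run, matching the “Setting next_dagrun ... 2022-10-25” log line.

```python
from datetime import datetime, timezone

utc = timezone.utc

# _align_to_next(utcnow) evaluated at 15:10:00.368 UTC: already past the
# 15:10 boundary, so it lands on the next day's trigger:
candidate_from_utcnow = datetime(2022, 10, 25, 15, 10, tzinfo=utc)

# _get_next(last_automated_data_interval.end): the boundary that was due
# but never scheduled:
candidate_from_schedule = datetime(2022, 10, 24, 15, 10, tzinfo=utc)

next_start_time = max(candidate_from_utcnow, candidate_from_schedule)
assert next_start_time == candidate_from_utcnow  # the 2022-10-24 run is dropped
```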

Also worth noting that CronDataIntervalTimetable (the old one) uses slightly different logic to account for the current time, and that timetable seems to be bug-free.

cc @uranusjr, WDYT? This looks plausible I guess ^^

No. Not until it gets investigated and fixed.

Also observed in #28293

One commonality is that all the timezones involved are non-UTC. Not sure if that’s indicative of something, but I think it’s worth noting.