airflow: CronTriggerTimetable occasionally loses a scheduled run
Discussed in https://github.com/apache/airflow/discussions/27398
Originally posted by AndrewTsao, October 25, 2022
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
My Airflow version is 2.4.0. I use CronTriggerTimetable in my DAG, and I find that it occasionally loses a run. I inspected the scheduler log:
The DAG should have run at 2022-10-24T23:10:00.000+0800, but it did not.
[2022-10-24T23:08:59.661+0800] {processor.py:768} INFO - DAG(s) dict_keys(['AFlow.ajob']) retrieved from /home/bob/airflow/dags/dag.py
[2022-10-24T23:08:59.685+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:08:59.684+0800] {dag.py:2573} INFO - Sync 1 DAGs
[2022-10-24T23:08:59.710+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:08:59.710+0800] {dag.py:3324} INFO - Setting next_dagrun for AFlow.ajob to 2022-10-24T15:10:00+00:00, run_after=2022-10-24T15:10:00+00:00
[2022-10-24T23:08:59.734+0800] {processor.py:178} INFO - Processing /home/bob/airflow/dags/dag.py took 0.351 seconds
[2022-10-24T23:09:29.855+0800] {processor.py:156} INFO - Started process (PID=16329) to work on /home/bob/airflow/dags/dag.py
[2022-10-24T23:09:29.856+0800] {processor.py:758} INFO - Processing file /home/bob/airflow/dags/dag.py for tasks to queue
[2022-10-24T23:09:29.857+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:09:29.857+0800] {dagbag.py:525} INFO - Filling up the DagBag from /home/bob/airflow/dags/dag.py
[2022-10-24T23:09:29.946+0800] {processor.py:768} INFO - DAG(s) dict_keys(['AFlow.ajob']) retrieved from /home/bob/airflow/dags/dag.py
[2022-10-24T23:09:29.969+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:09:29.968+0800] {dag.py:2573} INFO - Sync 1 DAGs
[2022-10-24T23:09:29.994+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:09:29.994+0800] {dag.py:3324} INFO - Setting next_dagrun for AFlow.ajob to 2022-10-24T15:10:00+00:00, run_after=2022-10-24T15:10:00+00:00
[2022-10-24T23:09:30.178+0800] {processor.py:178} INFO - Processing /home/bob/airflow/dags/dag.py took 0.327 seconds
[2022-10-24T23:10:00.368+0800] {processor.py:156} INFO - Started process (PID=16562) to work on /home/bob/airflow/dags/dag.py
[2022-10-24T23:10:00.369+0800] {processor.py:758} INFO - Processing file /home/bob/airflow/dags/dag.py for tasks to queue
[2022-10-24T23:10:00.370+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:10:00.370+0800] {dagbag.py:525} INFO - Filling up the DagBag from /home/bob/airflow/dags/dag.py
[2022-10-24T23:10:00.628+0800] {processor.py:768} INFO - DAG(s) dict_keys(['AFlow.ajob']) retrieved from /home/bob/airflow/dags/dag.py
[2022-10-24T23:10:00.648+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:10:00.647+0800] {dag.py:2573} INFO - Sync 1 DAGs
[2022-10-24T23:10:00.670+0800] {logging_mixin.py:117} INFO - [2022-10-24T23:10:00.670+0800] {dag.py:3324} INFO - Setting next_dagrun for AFlow.ajob to 2022-10-25T15:10:00+00:00, run_after=2022-10-25T15:10:00+00:00
[2022-10-24T23:10:00.693+0800] {processor.py:178} INFO - Processing /home/bob/airflow/dags/dag.py took 0.331 seconds
What you think should happen instead
I think the schedule information is lost when the DAG file is reloaded and the DAG is rescheduled.
How to reproduce
- Change airflow.cfg: dag_dir_list_interval = 10
- Add a DAG as follows:
from airflow import DAG
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.decorators import task
import pendulum as pl

with DAG(
    dag_id='test-cron-2',
    # schedule_interval='10 23 * * 2-3',
    timetable=CronTriggerTimetable('* * * * *', timezone=pl.tz.get_local_timezone()),  # every minute
    start_date=pl.today().add(days=-21),
    tags=['example'],
) as dag1:

    @task
    def test():
        print("run...")

    test()
In the scheduler log we can see that, at that point, the scheduler is reloading the DAG file.
Operating System
Centos 7.9
Versions of Apache Airflow Providers
airflow-code-editor 7.0.0
apache-airflow 2.4.0
apache-airflow-providers-celery 3.0.0
apache-airflow-providers-common-sql 1.1.0
apache-airflow-providers-ftp 3.1.0
apache-airflow-providers-http 4.0.0
apache-airflow-providers-imap 3.0.0
apache-airflow-providers-microsoft-psrp 2.0.0
apache-airflow-providers-microsoft-winrm 3.0.0
apache-airflow-providers-mysql 3.2.0
apache-airflow-providers-redis 3.0.0
apache-airflow-providers-samba 4.0.0
apache-airflow-providers-sftp 4.0.0
apache-airflow-providers-sqlite 3.2.0
apache-airflow-providers-ssh 3.1.0
Deployment
Virtualenv installation
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 40 (23 by maintainers)
For what it's worth, prior to 2.4 I wrote my own timetable that acts almost exactly like CronTriggerTimetable, and I experienced a similar issue, so I'll mention my findings here. In my case the problem was a race condition between the DagProcessor and the Scheduler when calling Timetable.next_dagrun_info. Depending on when each thread evaluated the timetable, the DagProcessor could end up "bumping" the DAG's next_dagrun_create_after timestamp in the DB before the Scheduler had a chance to schedule the current interval. Then, when the Scheduler ran its query to determine which DagRuns to schedule, it wouldn't find the run that got "skipped". In my case, I believe the bug was a call to DateTime.utcnow() that I had to align to the previous interval boundary. Also worth noting that my bug only happened with catchup=False, but that could have just been a quirk with my implementation.
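
For illustration, "aligning to the previous interval boundary" amounts to something like the sketch below (a rough illustration with croniter, not the actual timetable code; the cron expression mirrors the 23:10 +08:00 schedule from the report):

from datetime import datetime, timezone
from croniter import croniter

CRON = "10 15 * * *"  # 23:10 at +08:00, expressed in UTC as in the log above

def align_to_previous_boundary(now: datetime) -> datetime:
    # Snap *now* back to the most recent cron boundary so that two processes
    # evaluating the timetable a few seconds apart (DagProcessor vs. Scheduler)
    # still agree on which schedule point is "current".
    return croniter(CRON, now).get_prev(datetime)

# Both calls land on 2022-10-24 15:10:00+00:00 despite the sub-minute jitter:
print(align_to_previous_boundary(datetime(2022, 10, 24, 15, 10, 0, 368000, tzinfo=timezone.utc)))
print(align_to_previous_boundary(datetime(2022, 10, 24, 15, 10, 29, 855000, tzinfo=timezone.utc)))
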
We have observed the same issue with a UTC timezone, so I don't think the timezone is the issue.
Are there any tips on how to deal with this reliably, or should we just expect CronTriggerTimetable to silently fail every now and then? In that case, maybe it would be good to update the docs here to warn about the inconsistent behaviour?

I spent some time to take a closer look at the implementation. The problem with that suggestion is that CronDataIntervalTimetable actually does not do that! The reason it seems to be more resilient is that catchup in that timetable relies on the data interval start, not the trigger time. So say the schedule is hourly, the last run covered 2–3am, and the current time is a bit past 4am. For CronDataIntervalTimetable, since the last run covered 2–3am, the next run should cover 3–4am, which can be achieved without catchup (the further next covers 4–5am and is not due yet). Everything is fine. But for CronTriggerTimetable, the last run covered 3am, and the 4am run would be skipped since that's already in the past.

I think a reasonable logic would be to change the catchup=False logic to cover one schedule before the current time instead, so in the above scenario the timetable would make the next run cover 4am, and only skip the 4am run if the current time is past 5am.
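
To make the scenario concrete, here is a quick sketch with croniter (hourly schedule, last run triggered at 03:00, current time 04:20; purely illustrative, not the timetable's actual code):

from datetime import datetime, timezone
from croniter import croniter

CRON = "0 * * * *"  # hourly
now = datetime(2022, 10, 25, 4, 20, tzinfo=timezone.utc)
last_fire = datetime(2022, 10, 25, 3, 0, tzinfo=timezone.utc)

next_after_last = croniter(CRON, last_fire).get_next(datetime)  # 04:00
aligned_next = croniter(CRON, now).get_next(datetime)           # 05:00
aligned_prev = croniter(CRON, now).get_prev(datetime)           # 04:00

# Aligning "now" to the *next* boundary (current catchup=False behaviour):
print(max(aligned_next, next_after_last))  # 2022-10-25 05:00:00+00:00 -> the 04:00 run is skipped
# Aligning "now" to one schedule *before* (proposed behaviour):
print(max(aligned_prev, next_after_last))  # 2022-10-25 04:00:00+00:00 -> the 04:00 run is still created
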
I can prepare a PR if that sounds reasonable (or anyone can, it's just one line in airflow.timetables and fixing the corresponding test case).

I'm closing this as it appears to have been fixed in 2.7.1 by #33404, but there was a typo in the description that allowed this issue to stay open.
Hey @BohdanSemonov, if the fix works, could you please submit a PR? I think this issue is quite critical, and I also observe such behavior from CronTriggerTimetable.
The bug occurs because seconds and microseconds are not reset here:

start_time_candidates = [self._align_to_next(DateTime.utcnow())]

and the condition if self._get_prev(next_time) != current: in the method _align_to_next chooses next_time instead of current when the current DateTime matches the next_dagrun before the DagProcessor has processed the DAG. As a result, the date switches over earlier than the DAG is launched.
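
The helper in question, as described above, looks roughly like the following (a paraphrased sketch, not an exact quote of the Airflow source):

def _align_to_next(self, current: DateTime) -> DateTime:
    # Return *current* if it already falls exactly on a cron boundary,
    # otherwise the next boundary after it. With non-zero seconds or
    # microseconds, *current* can never equal a boundary, so this jumps
    # one full schedule further than intended.
    next_time = self._get_next(current)
    if self._get_prev(next_time) != current:
        return next_time
    return current
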
The bug fix is only one line:

start_time_candidates = [self._align_to_next(DateTime.utcnow().replace(second=0, microsecond=0))]

I've already tested the fix; it works properly.

This is the logic to calculate the next run for catchup=False, for reference:
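
(Paraphrasing the 2.4-era branch from memory; treat this as a sketch rather than an exact quote, and check the source for your version.)

# catchup=False branch of CronTriggerTimetable.next_dagrun_info, paraphrased:
start_time_candidates = [self._align_to_next(DateTime.utcnow())]
if last_automated_data_interval is not None:
    start_time_candidates.append(self._get_next(last_automated_data_interval.end))
if restriction.earliest is not None:
    start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)
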
The two if blocks should be fine since they match the logic for catchup=True. So I wonder if the problem is from the max call: maybe under some edge cases that max would wrongly select the next(utcnow) result (the first line) instead of the actual schedule it should use, perhaps when the scheduler took too much time to schedule the previous run?

Also worth noting that CronDataIntervalTimetable (the old one) uses slightly different logic to account for the current time, and that timetable seems to be bug-free.

cc: @uranusjr? WDYT, this looks plausible I guess ^^
No. Not until it gets investigated and fixed.
Also observed in #28293
One commonality is that all the timezones are non-UTC. Not sure if that's indicative of something, but I think it's worth noting.