airflow: Multiple schedulers don't synchronize task instance state when using the Triggerer
Apache Airflow version
2.2.1
What happened
I find that some Deferrable Operators are marked failed unexpectedly. When I clear these tasks, some of them succeed, but some fail as before. Even when I clear the same task, sometimes it is marked success and sometimes failed. I find this in the scheduler log:
```
[2021-12-14 10:22:20,208] {taskinstance.py:1706} ERROR - Executor reports task instance <TaskInstance: x [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
[2021-12-14 10:22:20,215] {taskinstance.py:1281} INFO - Marking task as FAILED. dag_id=x, task_id=x, execution_date=20211212T170000, start_date=20211214T102218, end_date=20211214T102220
```
What you expected to happen
When I clear a Deferrable Operator task, the final state should always be the same, instead of sometimes being marked success and sometimes failed.
How to reproduce
Create a Deferrable Operator whose trigger's run function yields immediately, like this:

```python
import asyncio
from datetime import datetime

from airflow.triggers.base import BaseTrigger, TriggerEvent


class ExampleTrigger(BaseTrigger):
    async def run(self):
        # This condition is always true, so the loop is entered immediately.
        while datetime.now() > datetime.strptime("1900-01-01", "%Y-%m-%d"):
            await asyncio.sleep(1)
            yield TriggerEvent('success')
```
In my case, I just query a task's state from another Airflow database, and the execution_date (old-version Airflow) is the previous day, so the trigger yields immediately.
This task will be marked failed by the scheduler randomly.
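The immediate-yield behavior of such a trigger can be checked with plain asyncio, outside of Airflow. This is a sketch: `TriggerEvent` and `ExampleTrigger` below are minimal stand-ins for the Airflow classes, and the sleep is shortened to keep the demo fast.

```python
import asyncio
from datetime import datetime


class TriggerEvent:
    """Minimal stand-in for airflow.triggers.base.TriggerEvent."""
    def __init__(self, payload):
        self.payload = payload


class ExampleTrigger:
    """Stand-in; the real class inherits BaseTrigger."""
    async def run(self):
        # datetime.now() is always after 1900-01-01, so an event is
        # emitted on the very first loop iteration.
        while datetime.now() > datetime.strptime("1900-01-01", "%Y-%m-%d"):
            await asyncio.sleep(0)  # sleep(1) in the report; 0 keeps the demo fast
            yield TriggerEvent("success")


async def first_event():
    # Consume the async generator until the first event fires.
    async for event in ExampleTrigger().run():
        return event.payload


print(asyncio.run(first_event()))  # prints "success" with no real waiting
```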
Operating System
Ubuntu 16.04
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
- One Triggerer process.
- Two Schedulers.
- One WebServer.
Anything else
I checked the code of:
- scheduler
- trigger
- taskinstance
- timestamp of log
and figured out what happens in this situation:
Assume TA is a deferrable task.
- Scheduler loop 1 changes TA's state: SCHEDULED -> QUEUED -> RUNNING -> DEFERRED, but this scheduler loop has not finished yet.
- Meanwhile, the Triggerer changes TA's state: DEFERRED -> SCHEDULED.
- Scheduler loop 2 starts, finds TA in SCHEDULED, and changes its state: SCHEDULED -> QUEUED.
- Scheduler loop 1 then runs the _process_executor_events function to check the worker process's return status, fetches the WRONG state QUEUED from the database, falls into this code path, and marks the task as failed.
- Scheduler loop 2 enqueues the airflow task run command, and the worker fetches an already-failed task to execute.
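The interleaving above can be replayed deterministically with a plain dict standing in for the task_instance row. This is only an illustration of the reported sequence; the state names match Airflow's, everything else is hypothetical.

```python
# Replay the reported interleaving step by step on a fake task_instance row.
ti = {"state": "scheduled"}
events = []


def set_state(actor, new_state):
    events.append((actor, ti["state"], new_state))
    ti["state"] = new_state


# Scheduler loop 1 runs the task all the way to deferral...
for s in ("queued", "running", "deferred"):
    set_state("scheduler-1", s)

# ...but before loop 1 processes executor events, the trigger fires
# immediately and the triggerer reschedules the task:
set_state("triggerer", "scheduled")

# Scheduler loop 2 sees a SCHEDULED task and queues it again:
set_state("scheduler-2", "queued")

# Scheduler loop 1 now handles the executor's "success" event, re-reads the
# row, and sees QUEUED instead of the state it last wrote -- the check in
# _process_executor_events treats this as "killed externally" and fails it:
if ti["state"] == "queued":
    set_state("scheduler-1", "failed")

print(ti["state"])  # prints "failed"
```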
In my opinion, there are two causes of this problem:
- The query in the _executable_task_instances_to_queued function cannot guarantee that a task is handled by only one scheduler loop.
- The Triggerer process can modify the task's state to SCHEDULED asynchronously.
How to solve this problem
- Add a new column to the task_instance table named is_still_processing, defaulting to False.
- When querying SCHEDULED tasks, add a filter like TI.is_still_processing == False.
- Before enqueueing a task instance to the executor queue, set is_still_processing to True in the database.
- At the end of the _executable_task_instances_to_queued loop, set the task's is_still_processing back to False in the database.
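The claim-and-release flow proposed above can be sketched with a single atomic UPDATE, using sqlite3 as a stand-in for the metadata database. The column name is the one proposed in this issue; everything else (table shape, task id) is illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_instance (
        task_id TEXT PRIMARY KEY,
        state TEXT,
        is_still_processing INTEGER DEFAULT 0
    )
""")
conn.execute("INSERT INTO task_instance VALUES ('TA', 'scheduled', 0)")


def try_claim(conn, task_id):
    # Atomically flip the flag; rowcount tells us whether THIS scheduler
    # loop won the claim. A loop that loses the race simply skips the task.
    cur = conn.execute(
        "UPDATE task_instance SET is_still_processing = 1 "
        "WHERE task_id = ? AND is_still_processing = 0",
        (task_id,),
    )
    return cur.rowcount == 1


print(try_claim(conn, "TA"))  # first scheduler loop claims the task: True
print(try_claim(conn, "TA"))  # a concurrent loop is filtered out: False

# At the end of the loop, release the claim so later runs can pick it up:
conn.execute(
    "UPDATE task_instance SET is_still_processing = 0 WHERE task_id = 'TA'"
)
print(try_claim(conn, "TA"))  # claimable again: True
```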
If I can guarantee that a Deferrable Operator task is handled by only one scheduler loop, and that this loop finishes cleanly, the problem is solved. I think only Deferrable Operator tasks that can yield immediately have this problem.
Alternatively, just change the task instance state to SUCCESS instead of SCHEDULED, but then the deferred task's next_method never gets a chance to run.
Any suggestions? I need to solve this problem ASAP.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 23 (13 by maintainers)
@jskalasariya If you also use deferrable tasks, you can fix this temporarily via https://github.com/apache/airflow/issues/20308#issuecomment-1005435631. You can try changing that code and run some tests to check whether the problem disappears. At the moment, my production environment:
- airflow version:
- airflow components:
- operating system:
- machine resource:
- database:
works as expected. By the way, if you want to use the deferrable operator's defer_timeout parameter, you may hit a deadlock: models/trigger.py (the submit_* and clean_unused functions) holds one row lock, while jobs/scheduler_job.py (the check_trigger_timeouts function) holds another row lock. I fixed this deadlock by adding with_row_locks and order by, and by splitting the query and the update into two steps:
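The order-by part of that fix can be illustrated with a stdlib-only sketch: when every transaction acquires its row locks in one consistent (sorted) order, the circular wait behind the deadlock cannot form. This is a hypothetical simulation with threading.Lock objects standing in for database row locks, not the Airflow code.

```python
import threading

# Two "row locks", keyed by trigger id, standing in for database row locks.
row_locks = {1: threading.Lock(), 2: threading.Lock()}
results = []


def transaction(name, rows):
    # Acquiring locks in sorted order (the effect of adding ORDER BY to the
    # locking query) means two transactions can never each hold a lock the
    # other one is waiting for.
    for row in sorted(rows):
        row_locks[row].acquire()
    try:
        results.append(name)
    finally:
        for row in sorted(rows, reverse=True):
            row_locks[row].release()


# Without ordering, one side locking [2, 1] while the other locks [1, 2]
# is exactly the pattern that deadlocks. With sorted acquisition, both
# threads run to completion:
t1 = threading.Thread(target=transaction, args=("clean_unused", [2, 1]))
t2 = threading.Thread(target=transaction, args=("check_trigger_timeouts", [1, 2]))
t1.start(); t2.start()
t1.join(timeout=5); t2.join(timeout=5)
print(sorted(results))  # both transactions finished; no deadlock
```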