airflow: Multiple schedulers don't synchronize task instance state when using the Triggerer

Apache Airflow version

2.2.1

What happened

I found that some deferrable operator tasks are marked failed unexpectedly. When I clear these tasks, some of them succeed, but some of them fail as before. Even when I clear the same task repeatedly, it is sometimes marked success and sometimes marked failed. I found this in the scheduler log:

[2021-12-14 10:22:20,208] {taskinstance.py:1706} ERROR - Executor reports task instance <TaskInstance: x [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
[2021-12-14 10:22:20,215] {taskinstance.py:1281} INFO - Marking task as FAILED. dag_id=x, task_id=x, execution_date=20211212T170000, start_date=20211214T102218, end_date=20211214T102220

What you expected to happen

When I clear a deferrable operator task, its final state should be deterministic, instead of sometimes being marked success and sometimes being marked failed.

How to reproduce

Create a deferrable operator whose trigger's run method yields a TriggerEvent immediately, like this:

import asyncio
from datetime import datetime
from airflow.triggers.base import BaseTrigger, TriggerEvent

class ExampleTrigger(BaseTrigger):
    async def run(self):
        # The comparison is always False, so the trigger yields immediately.
        while datetime.now() < datetime.strptime("1900-01-01", "%Y-%m-%d"):
            await asyncio.sleep(1)
        yield TriggerEvent('success')

In my real case, the trigger just queries a task's state from another Airflow database (an older Airflow version), and the execution_date is the previous day, so the trigger yields immediately.

This task is randomly marked failed by the scheduler.

Operating System

Ubuntu 16.04

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

  1. One Triggerer process.
  2. Two Schedulers.
  3. One WebServer.

Anything else

I checked the code of:

  1. scheduler
  2. trigger
  3. taskinstance
  4. the timestamps of the logs

and figured out what happens in this situation:

Assume TA is a deferrable task.

  1. Scheduler loop 1 changes TA's state: SCHEDULED -> QUEUED -> RUNNING -> DEFERRED, but this scheduler loop has not finished yet.
  2. Meanwhile, the triggerer fires and changes TA's state: DEFERRED -> SCHEDULED.
  3. Scheduler loop 2 starts, finds TA in the SCHEDULED state, and changes it: SCHEDULED -> QUEUED.
  4. Scheduler loop 1 then runs the _process_executor_events function to check the worker's returned status, fetches the WRONG state QUEUED from the database, runs into the check that logs the error shown above, and marks the task as failed (see the sketch after this list).
  5. Scheduler loop 2 enqueues the airflow tasks run command, so the worker ends up fetching a task that has already been marked failed.
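
For reference, step 4 corresponds roughly to the following check. This is a condensed paraphrase of what SchedulerJob._process_executor_events does in 2.2.x, not the verbatim source; in the real code the failure is routed through a callback request, but the net effect is the same.

from airflow.utils.state import State

def process_executor_event(ti, executor_state, info, session):
    # Condensed paraphrase (not verbatim) of the 2.2.x check that produced the
    # error in my scheduler log: the executor says the task finished, but the
    # database row says QUEUED, because scheduler loop 2 re-queued the task
    # after the triggerer set it back to SCHEDULED.
    if executor_state in (State.SUCCESS, State.FAILED) and ti.state == State.QUEUED:
        msg = (
            f"Executor reports task instance {ti} finished ({executor_state}) "
            f"although the task says its {ti.state}. (Info: {info}) "
            "Was the task killed externally?"
        )
        # In the real scheduler this goes through a callback request; the net
        # effect is the same: the task instance is marked FAILED.
        ti.handle_failure(error=msg, session=session)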

In my opinion, there are two causes of this problem:

  1. The query in the _executable_task_instances_to_queued function cannot guarantee that a task is handled by only one scheduler loop at a time (see the sketch after this list).
  2. The triggerer process can set the task's state back to SCHEDULED asynchronously.
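
To make cause 1 concrete, the SCHEDULED-task query behaves roughly like the sketch below. This is a simplified illustration, not the actual 2.2.1 query, which also filters on pools and concurrency limits and takes SKIP LOCKED row locks; the point is that those locks only last for the query's transaction, so they cannot stop a second scheduler loop from picking the task up after the triggerer has flipped it back to SCHEDULED.

from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import State

def scheduled_task_instances(session, max_tis):
    # Simplified illustration of the query in _executable_task_instances_to_queued.
    # By the time scheduler loop 2 runs this, the task deferred by loop 1 has been
    # flipped back to SCHEDULED by the triggerer, so it looks like any other
    # schedulable task instance and gets queued a second time while loop 1 is
    # still running.
    return (
        session.query(TI)
        .filter(TI.state == State.SCHEDULED)
        .limit(max_tis)
        .all()
    )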

How to solve this problem

  1. Add a new column to the task_instance table named is_still_processing, defaulting to False.
  2. When querying SCHEDULED tasks, add a filter like TI.is_still_processing == False (see the sketch after this list).
  3. Before enqueueing a task instance to the executor queue, set is_still_processing to True in the database.
  4. At the end of the _executable_task_instances_to_queued loop, set the task's is_still_processing back to False in the database.
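
A pseudocode-level sketch of this proposal (the is_still_processing column does not exist in Airflow today and would need a schema migration; the function names here are mine, not Airflow's):

from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import State

# Step 1 (hypothetical schema change): the task_instance table gains a boolean
# column is_still_processing, defaulting to False.

def claim_scheduled_task_instances(session, max_tis):
    # Step 2: only consider SCHEDULED tasks that no scheduler loop still owns.
    tis = (
        session.query(TI)
        .filter(TI.state == State.SCHEDULED, TI.is_still_processing.is_(False))
        .limit(max_tis)
        .all()
    )
    # Step 3: mark them as owned before they are handed to the executor queue.
    for ti in tis:
        ti.is_still_processing = True
    session.flush()
    return tis

def release_task_instances(session, tis):
    # Step 4: when the scheduler loop that claimed the tasks is done with them,
    # release ownership so a later loop may schedule them again.
    for ti in tis:
        ti.is_still_processing = False
    session.flush()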

If I could guarantee that a deferrable operator task is handled by only one scheduler loop and that this loop finishes cleanly, the problem would be solved. I think only deferrable operator tasks whose triggers yield immediately hit this problem.

Alternatively, the triggerer could set the task instance state to SUCCESS instead of SCHEDULED, but then the deferrable task's next_method would never get a chance to run.
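
For context, this is my understanding of why SCHEDULED is used there: when a trigger fires, Trigger.submit_event hands the event back to every task instance deferred on it and sets those task instances to SCHEDULED, so a scheduler queues them again and calls next_method. Roughly paraphrased (not the verbatim 2.2.x source):

from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import State

def handle_trigger_event(session, trigger_id, event):
    # Rough paraphrase of Trigger.submit_event in 2.2.x, for illustration only.
    for ti in session.query(TaskInstance).filter(
        TaskInstance.trigger_id == trigger_id,
        TaskInstance.state == State.DEFERRED,
    ):
        next_kwargs = ti.next_kwargs or {}
        next_kwargs["event"] = event.payload
        ti.next_kwargs = next_kwargs
        ti.trigger_id = None
        # Setting SUCCESS here instead would finish the task without ever
        # running its next_method, which is why that shortcut does not work.
        ti.state = State.SCHEDULED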

Any suggestions? I need to solve this problem ASAP.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 23 (13 by maintainers)

Most upvoted comments

@jskalasariya If you also use deferrable tasks, you can temporarily fix this via https://github.com/apache/airflow/issues/20308#issuecomment-1005435631. You can apply those code changes and run some tests to check whether the problem disappears. At the moment my production environment, with the setup below, works as expected. Airflow version:

2.2.0

airflow components:

  1. four schedulers
  2. three triggerers
  3. two web servers

operating system:

Ubuntu 16.04

machine resources:

64 GB memory, 6 cores (2 sockets x 3 cores), NUMA disabled

database:

Percona Server 8.0.17, running in a k8s cluster as a Deployment

By the way, if you want to use the deferrable operator's defer_timeout parameter, you may run into a deadlock: models/trigger.py (the submit_* and clean_unused functions) holds one row lock and jobs/scheduler_job.py (check_trigger_timeouts) holds another. I fixed this deadlock by adding with_row_locks and an ORDER BY, and by splitting the query and the update into two steps:
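
The pattern I ended up with looks roughly like the sketch below, shown for clean_unused as an example. This is illustrative only, not my exact patch: with_row_locks is Airflow's helper in airflow.utils.sqlalchemy (check your version's exact signature), and the "unused" condition is simplified. Ordering the locked SELECT by id and running the DELETE as a second statement against only the ids that were actually locked is what keeps the two jobs from waiting on each other's row locks.

from airflow.models.taskinstance import TaskInstance
from airflow.models.trigger import Trigger
from airflow.utils.sqlalchemy import with_row_locks
from airflow.utils.state import State

def clean_unused_triggers(session):
    # Step 1: SELECT the candidate trigger ids in a stable order and lock them,
    # skipping rows another process (e.g. check_trigger_timeouts) already holds.
    still_in_use = session.query(TaskInstance.trigger_id).filter(
        TaskInstance.trigger_id.isnot(None),
        TaskInstance.state == State.DEFERRED,
    )
    query = (
        session.query(Trigger.id)
        .filter(Trigger.id.notin_(still_in_use))
        .order_by(Trigger.id)
    )
    ids = [row.id for row in with_row_locks(query, session=session, skip_locked=True)]
    # Step 2: run the DELETE as a separate statement against only those ids.
    if ids:
        session.query(Trigger).filter(Trigger.id.in_(ids)).delete(synchronize_session=False)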