airflow: Scheduler crashes with psycopg2.errors.DeadlockDetected exception

Apache Airflow version

2.2.5 (latest released)

What happened

A customer has a DAG that dynamically generates around 2500 tasks using a task group. While running the DAG, a subset of the tasks (~1000) runs successfully with no issue, the remaining ~1500 tasks get "skipped", and the DAG fails. The same DAG runs successfully on Airflow v2.1.3 with the same Airflow configuration.

While investigating the Airflow processes, we found that both schedulers were restarted with the error below during the DAG execution.

[2022-04-27 20:42:44,347] {scheduler_job.py:742} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1256, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
    cursor.executemany(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 1646244 waits for ShareLock on transaction 3915993452; blocked by process 1640692.
Process 1640692 waits for ShareLock on transaction 3915992745; blocked by process 1646244.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (189873,4) in relation "task_instance"

This issue seems to be related to #19957

What you think should happen instead

This issue was observed while running a huge number of concurrent tasks created dynamically by a DAG. Some of the tasks are getting skipped because the scheduler restarts with a deadlock exception; the scheduler should not crash, and the tasks should not be skipped.

How to reproduce

DAG file:

from propmix_listings_details import BUCKET, ZIPS_FOLDER, CITIES_ZIP_COL_NAME, DETAILS_DEV_LIMIT, DETAILS_RETRY, DETAILS_CONCURRENCY, get_api_token, get_values, process_listing_ids_based_zip
from airflow.utils.task_group import TaskGroup
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

date = '{{ execution_date }}'
email_to = ['example@airflow.com']
# Using a DAG context manager, you don't have to specify the dag property of each task

state = 'Maha'
with DAG('listings_details_generator_{0}'.format(state),
        start_date=datetime(2021, 11, 18),
        schedule_interval=None,
        max_active_runs=1,
        concurrency=DETAILS_CONCURRENCY,
        dagrun_timeout=timedelta(minutes=10),
        catchup=False  # enable if you don't want historical dag runs to run
        ) as dag:
    t0 = DummyOperator(task_id='start')

    with TaskGroup(group_id='group_1') as tg1:
        token = get_api_token()
        zip_list = get_values(BUCKET, ZIPS_FOLDER+state, CITIES_ZIP_COL_NAME)
        for zip in zip_list[0:DETAILS_DEV_LIMIT]:
            details_operator = PythonOperator(
                task_id='details_{0}_{1}'.format(state, zip),  # task id is generated dynamically
                pool='pm_details_pool',
                python_callable=process_listing_ids_based_zip,
                task_concurrency=40,
                retries=3,
                retry_delay=timedelta(seconds=10),
                op_kwargs={'zip': zip, 'date': date, 'token':token, 'state':state}
            )
            
    t0 >> tg1

Operating System

Kubernetes cluster running on GCP, Linux (amd64)

Versions of Apache Airflow Providers

pip freeze | grep apache-airflow-providers

apache-airflow-providers-amazon==1!3.2.0
apache-airflow-providers-cncf-kubernetes==1!3.0.0
apache-airflow-providers-elasticsearch==1!2.2.0
apache-airflow-providers-ftp==1!2.1.2
apache-airflow-providers-google==1!6.7.0
apache-airflow-providers-http==1!2.1.2
apache-airflow-providers-imap==1!2.2.3
apache-airflow-providers-microsoft-azure==1!3.7.2
apache-airflow-providers-mysql==1!2.2.3
apache-airflow-providers-postgres==1!4.1.0
apache-airflow-providers-redis==1!2.0.4
apache-airflow-providers-slack==1!4.2.3
apache-airflow-providers-snowflake==2.6.0
apache-airflow-providers-sqlite==1!2.1.3
apache-airflow-providers-ssh==1!2.4.3

Deployment

Astronomer

Deployment details

  • Airflow v2.2.5-2
  • Scheduler count: 2
  • Scheduler resources: 20AU (2 CPU and 7.5 GB)
  • Executor used: Celery
  • Worker count: 2
  • Worker resources: 24AU (2.4 CPU and 9 GB)
  • Termination grace period: 2 mins

Anything else

This issue happens on every DAG run. Some of the tasks get skipped, some succeed, and the scheduler fails with the deadlock exception.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 1
  • Comments: 51 (43 by maintainers)

Most upvoted comments

Thanks for the REALLY detailed investigation @dstaple.

I finally had some time to take a look at this and I think your assessment was very correct.

However the solution you proposed is not good, because I think we DO want to run "SELECT FOR UPDATE" on the DagRun table. The whole scheduling is based on the fact that the DagRun row gets locked and no changes happen to the DagRun or any TaskInstances of that DagRun while the scheduler processes those task instances. And since local_task_run potentially changes the state of the task instance it runs (that's why it locks it for update), if the whole DagRun is currently "being processed" by any of the schedulers, we should hold off running the task until the scheduler finishes processing this particular DagRun and releases the lock.

And in this case "local_task_run" actually locks the DagRun table too (though why it does so is the one thing I do not understand completely - see below). So it does what it should, but with one very little caveat - it locks the TaskInstance and DagRun in REVERSE ORDER compared to what the scheduler does. This non-consistent order is actually the root cause of ALL the deadlocks here (at least in Postgres; MySQL has its own fair share of other kinds of deadlocks). A deadlock appears when two threads want two (or more) resources and acquire locks on them in reverse order. This is really the only reason for this kind of deadlock, and your investigation showed very nicely what's going on.
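
As a toy illustration of that reverse ordering (plain Python locks standing in for the Postgres row locks; this is not Airflow code, just the shape of the problem):

import threading

dag_run_lock = threading.Lock()        # stands in for the row lock on dag_run
task_instance_lock = threading.Lock()  # stands in for the row lock on task_instance

def scheduler_path():
    # Scheduler order: DagRun first, then TaskInstance.
    with dag_run_lock:
        with task_instance_lock:
            pass  # schedule the task instances of this DagRun

def local_task_run_path():
    # Reverse order: TaskInstance first, then DagRun.
    with task_instance_lock:
        with dag_run_lock:
            pass  # refresh / update the state of the task instance

# If the two paths run concurrently and each acquires its first lock before the
# other releases, neither can make progress - which is exactly the situation
# Postgres detects and reports as "deadlock detected".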

The solution to that is simple - since we are going to get the DagRun lock in a moment anyway in "refresh_from_db", we should simply take the lock on the DagRun table FIRST. This should fix the problem, as we will then grab locks in the same sequence in the scheduler and in task_run -> first DagRun, then TaskInstance. This is what my proposed #25266 does.
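
A minimal sketch of that ordering in SQLAlchemy terms (DagRun and TaskInstance are the real Airflow models, but the helper below is hypothetical and only shows the intended lock order - it is not the actual #25266 change):

from airflow.models import DagRun, TaskInstance

def lock_in_scheduler_order(session, ti):
    # 1. Lock the parent DagRun row first, matching the order the scheduler uses.
    session.query(DagRun).filter(
        DagRun.dag_id == ti.dag_id,
        DagRun.run_id == ti.run_id,
    ).with_for_update().one()

    # 2. Only then lock the TaskInstance row itself.
    return (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.task_id == ti.task_id,
            TaskInstance.run_id == ti.run_id,
        )
        .with_for_update()
        .one()
    )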

The only thing I do not know is WHY the TaskInstance.refresh_from_db actually does the JOIN query:

SELECT ... FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id ... FOR UPDATE

The original query in the code looks like this:

        qry = session.query(TaskInstance).filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.task_id == self.task_id,
            TaskInstance.run_id == self.run_id,
            TaskInstance.map_index == self.map_index,
        )

        if lock_for_update:
            for attempt in run_with_db_retries(logger=self.log):
                with attempt:
                    ti: Optional[TaskInstance] = qry.with_for_update().first()

And there is no obvious reason why the last line joins the dag_run table?

I hope someone else in this thread might shed some light on it. I have a suspicion that SQLAlchemy will add the join in case there is a ForeignKey with ONCASCADE on the dag_id (which we have) - but I could not find any reference or documentation that would point to such behaviour.

@RNHTTR - since you mentioned you can reproduce the issue - maybe you could apply my fix and see if it solves the problem (there is a bit of a leap of faith with this change).

@ldacey @whitleykeith @argibbs @eitanme -> I spoke with some enlightened people 😃 (yeah talking about you @ashb and @bbovenzi ) -> and after the talk I have a hypothesis that this is the new Grid view doing auto-refresh for a long-running DAG.

There was a fix by @ashb https://github.com/apache/airflow/pull/24284 that is going to be released in 2.3.3, which significantly decreases the number of queries generated by the Grid view refresh. It's a huge improvement and might impact both the load on the DB and possibly the memory usage of the webserver - especially if there are almost continuously running DAGs and a number of people leave the browser open with "auto-refresh" on in the Grid View.

Is there a way some of you could test the hypothesis and see if there might be a correlation (it requires a bit of coordination on what your users do)?

(BTW. If that’s it then Ash’s fix is coming in 2.3.3).

It seems that after setting use_row_level_locking to False, the problem has disappeared on my side (with 3 schedulers). Maybe the doc should be updated because:

I would advise against doing this while running multiple schedulers – if you do then it is entirely possible that Airflow will not correctly respect configured concurrency limits for DAGs/Tasks/Pools. Edit: oh, or it will crash
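
For reference, the option being discussed is the scheduler's row-level-locking switch; a minimal airflow.cfg snippet to disable it (only something to consider with a single scheduler, per the warning above) would be:

[scheduler]
# Disables the SELECT ... FOR UPDATE row locking the scheduler uses for coordination.
# Per the warning above, with more than one scheduler this can lead to concurrency
# limits for DAGs/Tasks/Pools not being respected, or to crashes.
use_row_level_locking = False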

I’m on it!

@dstaple - it would be great if you could check. I think we can merge it regardless (it's super easy to revert) - so there is no problem with checking it later. I was also not sure if the DELETE issue is the same. It could be (and I have the scenario in my head):

DELETE DagRun with CASCADE on TI - first takes a lock on the DagRun and only THEN on the TaskInstance - very similar to what the scheduler does.

And in this case the fix above should also help so @RNHTTR I’d appreciate checking it 😃

The behavior was not changed after the above issue was filed, but the following warning was added to the SQLAlchemy documentation:

Ahhhhh. That would indeed explain it. I think then that my solution is actually the right approach 😃
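
For context, a minimal self-contained SQLAlchemy sketch (toy models, not the Airflow ones) of the behaviour being referenced: a relationship configured with joined eager loading (lazy="joined") makes a plain .with_for_update() query on the child table also JOIN - and therefore lock - the parent table:

from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, relationship

Base = declarative_base()

class ToyDagRun(Base):                  # stand-in for dag_run
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)

class ToyTaskInstance(Base):            # stand-in for task_instance
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"))
    # Joined eager loading: every query for ToyTaskInstance also JOINs dag_run.
    dag_run = relationship(ToyDagRun, lazy="joined", innerjoin=True)

query = Session().query(ToyTaskInstance).filter(ToyTaskInstance.id == 1).with_for_update()
print(query)
# ... FROM task_instance JOIN dag_run ON dag_run.id = task_instance.dag_run_id ... FOR UPDATE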

Very useful, thanks. I will take a look at it shortly.

I am also encountering this issue. I collected some details about both queries involved in the deadlock, hopefully this is helpful.

Deployment details:

  • Airflow 2.2.5
  • KubernetesExecutor
  • A single Airflow scheduler is running.
  • Row level locking is enabled.
  • Scheduler parsing_processes = 5
  • Scheduler resources: 8 cores, 5 GB RAM
  • Database resources: 12 cores, 8 GB RAM (Postgres 11.3)
  • The problem only appears at scale (50-150 DAGs, several of which have hundreds of tasks).
  • The problem is not easily reproducible but is happening daily.

In the deadlocks there is an UPDATE statement deadlocking with a SELECT … FOR UPDATE.

Based on stack traces visible in the scheduler logs, the UPDATE originates from the main scheduler loop here: https://github.com/apache/airflow/blob/2.2.5/airflow/models/dagrun.py#L901-L910

Based on the database logs, the SELECT statement has the form:

SELECT task_instance.try_number AS task_instance_try_number, ...
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
        WHERE task_instance.dag_id = 'my_dag_id' AND task_instance.task_id = 'my_task_id' AND task_instance.run_id = 'sanitized_run_id_1'
         LIMIT 1 FOR UPDATE

Searching the Airflow source code, the query that looks most similar to the SELECT from the database error is in TaskInstance.refresh_from_db(): https://github.com/apache/airflow/blob/2.2.5/airflow/models/taskinstance.py#L714-L736

Example scheduler logs showing the origins of the UPDATE statement:

[2022-07-06 18:54:29,456] {{scheduler_job.py:753}} INFO - Exited execute loop
Traceback (most recent call last):
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (48513,18) in relation "task_instance"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 246, in run
    self._execute()
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 726, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 807, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 890, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1147, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/dagrun.py", line 909, in schedule_tis
    .update({TI.state: State.SCHEDULED}, synchronize_session=False)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (48513,18) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'sanitized_dag_id_1', 'run_id_1': 'sanitized_run_id_1', 'task_id_1': 'sanitized_task_id_1'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

Example Postgres logs showing a complete SELECT … FOR UPDATE statement:

2022-07-06 18:54:25.816 UTC [100639] ERROR:  deadlock detected
2022-07-06 18:54:25.816 UTC [100639] DETAIL:  Process 100639 waits for ShareLock on transaction 527390039; blocked by process 99711.
    Process 99711 waits for ShareLock on transaction 527390130; blocked by process 100639.
    Process 100639: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_insta
    Process 99711: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.run_id = 'sanitized_run_id_2' AND task_instance.task_id IN ('sanitized_task_id_2', 'sanitized_task_id_3')
2022-07-06 18:54:25.816 UTC [100639] HINT:  See server log for query details.
2022-07-06 18:54:25.816 UTC [100639] CONTEXT:  while locking tuple (725,169) in relation "dag_run"
2022-07-06 18:54:25.816 UTC [100639] STATEMENT:  SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
    FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
    WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.task_id = 'sanitized_task_id_3' AND task_instance.run_id = 'sanitized_run_id_2'
     LIMIT 1 FOR UPDATE

Unfortunately we are not able to reproduce this on a test instance, so I have not been able to try newer Airflow versions, but based on the discussion in this thread it sounds like the issue is present through at least 2.3.2.

Are these logs from a deployment where you have set use_row_level_locking to False?

Indeed, it is! My bad then for having set this param, thinking that Postgres would allow it. Thanks for the help, and sorry if I wasted your time 🙏