airflow: Tasks fail and do not log due to backend DB (dead?)lock

Apache Airflow version: 2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.18

Environment:

  • Cloud provider or hardware configuration: AWS hosting a Kube cluster
  • OS: Ubuntu 19.10
  • Kernel: 4.14.225-169.362.amzn2.x86_64
  • Others: MySQL 8.0.23 on RDS

What happened:

In an unpredictable fashion, some tasks are unable to start. They do not retry, and they do not write to the shared log directory; however, if I run kubectl logs <worker pod> while the pod sits in the Error state afterward, I can see the following (full traceback, including the SQLAlchemy frames):

[2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from /usr/local/airflow/dags/foo/bar.py
Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]> on host <WORKER POD>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 96, in _execute
    pool=self.pool,
  File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1023, in check_and_change_state_before_execution
    self.refresh_from_db(session=session, lock_for_update=True)
  File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 623, in refresh_from_db
    ti = qry.with_for_update().first()
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3429, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
    return list(res)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s
 LIMIT %s FOR UPDATE]
[parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30), 1)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

Afterward, the task is marked as Failed. The issue is transient, and affected tasks can be rerun manually.

What you expected to happen:

If the lock cannot be obtained, the task should exit more gracefully and be rescheduled or retried, rather than being marked Failed outright.
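
For illustration, something along these lines is what I'd expect around the locked read (a minimal sketch; the helper and its parameters are mine, not Airflow's API):

import logging
import time

from sqlalchemy.exc import OperationalError

MYSQL_LOCK_WAIT_TIMEOUT = 1205  # the MySQL error code from the traceback above

def locked_read_with_retry(query, attempts=3, backoff_seconds=2.0):
    # Retry a SELECT ... FOR UPDATE that may hit a lock wait timeout,
    # instead of letting error 1205 bubble up and hard-fail the task.
    for attempt in range(1, attempts + 1):
        try:
            return query.with_for_update().first()
        except OperationalError as err:
            code = err.orig.args[0] if err.orig and err.orig.args else None
            if code != MYSQL_LOCK_WAIT_TIMEOUT or attempt == attempts:
                raise  # not a lock timeout, or out of retries: let the caller reschedule
            logging.warning("Lock wait timeout, attempt %d/%d; retrying", attempt, attempts)
            time.sleep(backoff_seconds * attempt)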

How to reproduce it:

You can trigger the non-graceful failure on any task by manually locking its task_instance row and then trying to run the task; a scripted version of these steps follows the numbered list below.

  1. Connect to the MySQL instance backing Airflow
  2. SET autocommit = OFF;
  3. START TRANSACTION;
  4. Lock the row
SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = 'foobar'
	AND task_instance.task_id = 'my_task'
	AND task_instance.execution_date = '2021-07-12 00:00:00.000000'
 LIMIT 1 FOR UPDATE;
  5. Try to run the task via the UI.
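
If it helps, here is the same repro as a script using MySQLdb, the DB-API driver shown in the traceback (a sketch; the host, credentials, and identifiers are placeholders):

import time

import MySQLdb  # mysqlclient, the same driver the workers use

# Placeholder connection settings -- point these at the metadata DB
conn = MySQLdb.connect(host="airflow-db.example.com", user="airflow",
                       passwd="...", db="airflow")
cur = conn.cursor()
cur.execute("SET autocommit = OFF")
cur.execute("START TRANSACTION")
# Step 4: take the row lock the worker will need
cur.execute(
    "SELECT * FROM task_instance "
    "WHERE dag_id = %s AND task_id = %s AND execution_date = %s "
    "LIMIT 1 FOR UPDATE",
    ("foobar", "my_task", "2021-07-12 00:00:00.000000"),
)
# Hold the lock longer than innodb_lock_wait_timeout (50 s by default),
# run the task from the UI in the meantime, and watch it fail with 1205.
time.sleep(120)
conn.rollback()
conn.close()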

Anything else we need to know:

Ideally this lock contention would never occur and the task would execute normally; however, the failures are seemingly random and I cannot replicate them. (Strictly speaking, error 1205 is a lock wait timeout rather than a detected deadlock, which would be error 1213, so some other transaction is simply holding the row lock for too long.) I hypothesized that the scheduler was somehow spinning up two worker pods for the same task at the same time, but if that were the case I would expect kubectl get pods to show two dead workers in the Error state. The timeout occurs on fewer than 1% of tasks, but when it does occur it consistently fails the task without a retry.
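
In case it recurs, one way to catch the blocker in the act is to query the InnoDB transaction views while a task is stuck waiting (a sketch against MySQL 8's system views; connection details are placeholders):

import MySQLdb

conn = MySQLdb.connect(host="airflow-db.example.com", user="airflow",
                       passwd="...", db="airflow")
cur = conn.cursor()
# Transactions currently waiting on a row lock, and the statement they run
cur.execute(
    "SELECT trx_id, trx_state, trx_started, trx_query "
    "FROM information_schema.innodb_trx WHERE trx_state = 'LOCK WAIT'"
)
for row in cur.fetchall():
    print("waiting:", row)
# Which transaction is blocking which (on MySQL 8 this view lives in
# performance_schema, replacing information_schema.innodb_lock_waits)
cur.execute(
    "SELECT requesting_engine_transaction_id, blocking_engine_transaction_id "
    "FROM performance_schema.data_lock_waits"
)
for row in cur.fetchall():
    print("blocked by:", row)
conn.close()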

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 24 (16 by maintainers)

Most upvoted comments

Question to @tulanowski: would it be possible for you to patch your Airflow instance with the fix (it's really simple) and see if it helps? It just passed all the tests in the latest fixup, so it should be safe to copy.

Yes, tomorrow I’ll try to patch our staging instance and see if it helps. I’ll let you know the result day after.

Also, just to confirm: do you happen to have some tasks that throw AirflowRescheduleException occasionally?

Yes, actually quite often: we have a big number of sensors using "reschedule" mode.
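
For context, the sensors in question look roughly like this (a minimal, hypothetical example, not our actual code):

from datetime import datetime

from airflow import DAG
from airflow.sensors.python import PythonSensor

def check_data_ready():
    # Hypothetical predicate polled by the sensor
    return False

with DAG("sensor_example", start_date=datetime(2021, 7, 1),
         schedule_interval="@hourly", catchup=False) as dag:
    # In "reschedule" mode the sensor frees its worker slot between pokes
    # by raising AirflowRescheduleException internally, so every poke cycle
    # performs extra task_instance and task_reschedule writes against the
    # metadata DB.
    wait = PythonSensor(
        task_id="wait_for_upstream",
        python_callable=check_data_ready,
        mode="reschedule",
        poke_interval=300,   # seconds between pokes
        timeout=7200,        # give up after two hours
    )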

@patrickbrady-xaxis… It's an interesting one, and possibly worth adding back. Maybe you can create an issue for it and describe it, as this seems like a valid case.

Just for future reference: when you move to 2.2 you will need to change it to "run_id" instead of "execution_date".
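
For example, the manual-lock query in the repro script above would change roughly like this on 2.2 (a sketch reusing the cursor from that script; the run_id value shown is hypothetical):

cur.execute(
    "SELECT * FROM task_instance "
    "WHERE dag_id = %s AND task_id = %s AND run_id = %s "
    "LIMIT 1 FOR UPDATE",
    ("foobar", "my_task", "scheduled__2021-07-12T00:00:00+00:00"),
)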

Fortunately I haven’t seen this behavior in some time. If it returns I’ll gather what info I can.

@SamWheating @SIvaCoHan @easontm ^^

Pretty please 😃

@SamWheating just checked my config file history – we were actually running just one.