airflow: Tasks fail and do not log due to backend DB (dead?)lock
Apache Airflow version: 2.1.1
Kubernetes version (if you are using kubernetes) (use kubectl version
): 1.18
Environment:
- Cloud provider or hardware configuration: AWS hosting a Kube cluster
- OS: Ubuntu 19.10
- Kernel: 4.14.225-169.362.amzn2.x86_64
- Install tools:
- Others: MySQL 8.0.23 on RDS
What happened:
In an unpredictable fashion, some tasks are unable to start. They do not retry and they do not write to the shared log directory, but if I run kubectl logs <worker pod>
while it sits in Error state afterward, I can see:
[2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from /usr/local/airflow/dags/foo/bar.py
Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]> on host <WORKER POD>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 91, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 238, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", line 237, in run
self._execute()
File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 96, in _execute
pool=self.pool,
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1023, in check_and_change_state_before_execution
self.refresh_from_db(session=session, lock_for_update=True)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 623, in refresh_from_db
ti = qry.with_for_update().first()
<SQLALCHEMY TRACE OMITTED FOR BREVITY>
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s
LIMIT %s FOR UPDATE]
[parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30), 1)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Full length log
[2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from /usr/local/airflow/dags/foo/bar.py
Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]> on host <WORKER POD>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 91, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 238, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", line 237, in run
self._execute()
File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 96, in _execute
pool=self.pool,
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1023, in check_and_change_state_before_execution
self.refresh_from_db(session=session, lock_for_update=True)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 623, in refresh_from_db
ti = qry.with_for_update().first()
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3429, in first
ret = list(self[0:1])
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
return list(res)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
return self._execute_and_instances(context)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s
LIMIT %s FOR UPDATE]
[parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30), 1)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Afterward, the task is marked as Failed. The issue is transient, and tasks can be manually rerun to try again.
What you expected to happen:
If a lock cannot be obtained, it should exit more gracefully and reschedule.
How to reproduce it:
You can trigger the non-graceful task failure by manually locking the row and then trying to run the task – it should work on any task.
- Connect to the MySQL instance backing Airflow
SET autocommit = OFF;
START TRANSACTION;
- Lock the row
SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = 'foobar'
AND task_instance.task_id = 'my_task'
AND task_instance.execution_date = '2021-07-12 00:00:00.000000'
LIMIT 1 FOR UPDATE;
- Try to run the task via the UI.
Anything else we need to know:
Ideally deadlock doesn’t ever occur and the task executes normally, however the deadlocks are seemingly random and I cannot replicate them. I hypothesized that somehow the scheduler was spinning up two worker pods at the same time, but if that were the case I would see two dead workers in Error
state by performing kubectl get pods
. Deadlock itself seems to occur on <1% of tasks, but it seems that deadlock itself consistently fails the task without retry.
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Reactions: 1
- Comments: 24 (16 by maintainers)
Commits related to this issue
- Avoid deadlock when rescheduling task The scheduler job performs scheduling after locking the "scheduled" DagRun row for writing. This should prevent from modifying DagRun and related task instances ... — committed to potiuk/airflow by potiuk 2 years ago
- Avoid deadlock when rescheduling task (#21362) The scheduler job performs scheduling after locking the "scheduled" DagRun row for writing. This should prevent from modifying DagRun and related task... — committed to apache/airflow by potiuk 2 years ago
- Avoid deadlock when rescheduling task (#21362) The scheduler job performs scheduling after locking the "scheduled" DagRun row for writing. This should prevent from modifying DagRun and related task i... — committed to apache/airflow by potiuk 2 years ago
- Avoid deadlock when rescheduling task (#21362) The scheduler job performs scheduling after locking the "scheduled" DagRun row for writing. This should prevent from modifying DagRun and related task i... — committed to apache/airflow by potiuk 2 years ago
- Avoid deadlock when rescheduling task (#21362) The scheduler job performs scheduling after locking the "scheduled" DagRun row for writing. This should prevent from modifying DagRun and related task i... — committed to apache/airflow by potiuk 2 years ago
Yes, tomorrow I’ll try to patch our staging instance and see if it helps. I’ll let you know the result day after.
Yes, actually quite often - we have big number of sensors using “reschedule” mode.
@patrickbrady-xaxis… It’s an interesting one and possibly worth considering to add back. Maybe you can create an issue for that and describe it as this seems like a valid case.
Just for the future reference - when you move to 2.2 you will need to chang it to “run_id” instead of “execution_date”.
Fortunately I haven’t seen this behavior in some time. If it returns I’ll gather what info I can.
@SamWheating @SIvaCoHan @easontm ^^
Pretty please 😃
@SamWheating just checked my config file history – we were actually running just one.