airflow: Variable.get inside of a custom Timetable breaks the Scheduler
Apache Airflow version
2.3.4
What happened
If you try to use `Variable.get` from inside a custom Timetable, the Scheduler breaks with errors like:
scheduler | [2022-09-20 10:19:36,104] {variable.py:269} ERROR - Unable to retrieve variable from secrets backend (MetastoreBackend). Checking subsequent secrets backend.
scheduler | Traceback (most recent call last):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 265, in get_variable_from_secrets
scheduler | var_val = secrets_backend.get_variable(key=key)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
scheduler | return func(*args, session=session, **kwargs)
scheduler | File "/opt/conda/envs/production/lib/python3.9/contextlib.py", line 126, in __exit__
scheduler | next(self.gen)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 33, in create_session
scheduler | session.commit()
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1435, in commit
scheduler | self._transaction.commit(_to_root=self.future)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 829, in commit
scheduler | self._prepare_impl()
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
scheduler | self.session.dispatch.before_commit(self.session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
scheduler | fn(*args, **kw)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/sqlalchemy.py", line 341, in _validate_commit
scheduler | raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
scheduler | RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
scheduler | [2022-09-20 10:19:36,105] {plugins_manager.py:264} ERROR - Failed to import plugin /home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py
scheduler | Traceback (most recent call last):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/plugins_manager.py", line 256, in load_plugins_from_plugin_directory
scheduler | loader.exec_module(mod)
scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 9, in <module>
scheduler | class CustomTimetable(CronDataIntervalTimetable):
scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 10, in CustomTimetable
scheduler | def __init__(self, *args, something=Variable.get('something'), **kwargs):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 138, in get
scheduler | raise KeyError(f'Variable {key} does not exist')
scheduler | KeyError: 'Variable something does not exist'
scheduler | [2022-09-20 10:19:36,179] {scheduler_job.py:769} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
scheduler | Traceback (most recent call last):
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
scheduler | self._run_scheduler_loop()
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 840, in _run_scheduler_loop
scheduler | num_queued_tis = self._do_scheduling(session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 914, in _do_scheduling
scheduler | self._start_queued_dagruns(session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1086, in _start_queued_dagruns
scheduler | dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 179, in get_dag
scheduler | self._add_dag_from_db(dag_id=dag_id, session=session)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 254, in _add_dag_from_db
scheduler | dag = row.dag
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/serialized_dag.py", line 209, in dag
scheduler | dag = SerializedDAG.from_dict(self.data) # type: Any
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1099, in from_dict
scheduler | return cls.deserialize_dag(serialized_obj['dag'])
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1021, in deserialize_dag
scheduler | v = _decode_timetable(v)
scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 189, in _decode_timetable
scheduler | raise _TimetableNotRegistered(importable_string)
scheduler | airflow.serialization.serialized_objects._TimetableNotRegistered: Timetable class 'custom_timetable.CustomTimetable' is not registered
Note that in this case, the Variable in question does exist, and the KeyError is a red herring.

If you add a `default_var`, things seem to work, though I wouldn't trust it, since there is clearly some context where it fails to load the Variable and always falls back to the default. Additionally, this still raises the UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS! error, which I assume is a bad thing.
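Part of why this bites at import time is a plain Python rule, independent of Airflow: default argument values are evaluated once, when the `def` statement executes (i.e. when the scheduler imports the plugin module), not on each call. A minimal, Airflow-free demonstration:

```python
calls = []

def lookup():
    # Stand-in for a side-effectful call such as Variable.get.
    calls.append(1)
    return 'value'

def f(x=lookup()):  # default evaluated here, at definition/import time
    return x

# lookup() has already run once, before f was ever called:
assert calls == [1]

f()
f()
# ...and the default is never re-evaluated on later calls:
assert calls == [1]
```

This is why, in the repro below, `something=Variable.get('something')` runs as soon as the scheduler imports the plugin, not when the timetable is constructed with explicit arguments.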
What you think should happen instead
I’m not sure whether or not this should be allowed. In my case, I was able to work around the error by making all Timetable initializer arguments required (no default values) and pulling the `Variable.get` call out into a wrapper function.
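That workaround can be sketched as follows. This is a runnable, Airflow-free illustration of the pattern only: `_VARIABLE_STORE`, `get_variable`, and `make_custom_timetable` are hypothetical stand-ins (real code would call `airflow.models.variable.Variable.get` and subclass `CronDataIntervalTimetable`):

```python
# Stand-in for the Variable store, so this sketch runs without Airflow.
_VARIABLE_STORE = {'something': 'foo'}

def get_variable(key):
    # Stand-in for Variable.get(key).
    return _VARIABLE_STORE[key]

class CustomTimetable:
    # Broken pattern (what the bug report hit): a default value like
    #   def __init__(self, *, something=get_variable('something')): ...
    # runs the lookup once, at plugin *import* time, inside the scheduler.
    #
    # Workaround: make the argument required, so importing the plugin
    # module never touches the variable store.
    def __init__(self, *, something, cron):
        self._something = something
        self._cron = cron

def make_custom_timetable(cron):
    # Wrapper called only from the DAG file; the lookup happens during
    # DAG parsing, not during plugin import in the scheduler.
    return CustomTimetable(something=get_variable('something'), cron=cron)

timetable = make_custom_timetable('0 0 * * *')
```

Because the argument has no default, importing the plugin module never triggers a Variable lookup; the lookup only happens when the wrapper is called from the DAG file.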
How to reproduce
custom_timetable.py:

```python
#!/usr/bin/env python3
from __future__ import annotations

from airflow.models.variable import Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something=Variable.get('something'), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)


class CustomTimetablePlugin(AirflowPlugin):
    name = 'custom_timetable_plugin'
    timetables = [CustomTimetable]
```
test_custom_timetable.py:

```python
#!/usr/bin/env python3
import datetime

import pendulum

from airflow.decorators import dag, task
from custom_timetable import CustomTimetable


@dag(
    start_date=datetime.datetime(2022, 9, 19),
    timetable=CustomTimetable(cron='0 0 * * *', timezone=pendulum.UTC),
)
def test_custom_timetable():
    @task
    def a_task():
        print('hello')

    a_task()


dag = test_custom_timetable()

if __name__ == '__main__':
    dag.cli()
```
```bash
airflow variables set something foo
airflow dags trigger test_custom_timetable
```
Operating System
CentOS Stream 8
Versions of Apache Airflow Providers
None
Deployment
Other
Deployment details
I was able to reproduce this with:
- Standalone mode, SQLite DB, SequentialExecutor
- Self-hosted deployment, Postgres DB, CeleryExecutor
Anything else
Related: #21895
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 16 (16 by maintainers)
Sorry, yes, my diff along with passing the session to `Variable.get`. Let me think how to get that …
Oh, it’s quite easy. To get access to the current session, use `airflow.settings.Session()`: that returns the thread-local active session, and that, coupled with the previous fix, means it won’t try to commit.

I tend to agree with @uranusjr, with one twist: we should make sure this kind of error is handled better. The error message should explicitly say that you must not perform any database operation inside a custom Timetable. Adding documentation is not enough; people won’t read the documentation, and we can expect more issues like this popping up. I think it should be possible to catch this error and turn it into a more explanatory message.
Same issue/discussion in https://github.com/apache/airflow/discussions/26533
This happens because we load timetable plugins in the scheduler process (decoding the timetable during deserialization). So with `Variable.get`, we are using a new session different from the scheduler session. When that new session commits, it throws an error because of the different sessions. It doesn’t seem like a simple fix, but I’m taking a look.