airflow: Variable.get inside of a custom Timetable breaks the Scheduler

Apache Airflow version

2.3.4

What happened

If you try to use Variable.get from inside of a custom Timetable, the Scheduler will break with errors like:

 scheduler | [2022-09-20 10:19:36,104] {variable.py:269} ERROR - Unable to retrieve variable from secrets backend (MetastoreBackend). Checking subsequent secrets backend.
 scheduler | Traceback (most recent call last):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 265, in get_variable_from_secrets
 scheduler | var_val = secrets_backend.get_variable(key=key)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
 scheduler | return func(*args, session=session, **kwargs)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/contextlib.py", line 126, in __exit__
 scheduler | next(self.gen)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 33, in create_session
 scheduler | session.commit()
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1435, in commit
 scheduler | self._transaction.commit(_to_root=self.future)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 829, in commit
 scheduler | self._prepare_impl()
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
 scheduler | self.session.dispatch.before_commit(self.session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
 scheduler | fn(*args, **kw)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/sqlalchemy.py", line 341, in _validate_commit
 scheduler | raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
 scheduler | RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
 scheduler | [2022-09-20 10:19:36,105] {plugins_manager.py:264} ERROR - Failed to import plugin /home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py
 scheduler | Traceback (most recent call last):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/plugins_manager.py", line 256, in load_plugins_from_plugin_directory
 scheduler | loader.exec_module(mod)
 scheduler | File "<frozen importlib._bootstrap_external>", line 850, in exec_module
 scheduler | File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
 scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 9, in <module>
 scheduler | class CustomTimetable(CronDataIntervalTimetable):
 scheduler | File "/home/tsanders/airflow_standalone_sqlite/plugins/custom_timetable.py", line 10, in CustomTimetable
 scheduler | def __init__(self, *args, something=Variable.get('something'), **kwargs):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/variable.py", line 138, in get
 scheduler | raise KeyError(f'Variable {key} does not exist')
 scheduler | KeyError: 'Variable something does not exist'
 scheduler | [2022-09-20 10:19:36,179] {scheduler_job.py:769} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
 scheduler | Traceback (most recent call last):
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
 scheduler | self._run_scheduler_loop()
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 840, in _run_scheduler_loop
 scheduler | num_queued_tis = self._do_scheduling(session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 914, in _do_scheduling
 scheduler | self._start_queued_dagruns(session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1086, in _start_queued_dagruns
 scheduler | dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
 scheduler | return func(*args, **kwargs)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 179, in get_dag
 scheduler | self._add_dag_from_db(dag_id=dag_id, session=session)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagbag.py", line 254, in _add_dag_from_db
 scheduler | dag = row.dag
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/serialized_dag.py", line 209, in dag
 scheduler | dag = SerializedDAG.from_dict(self.data)  # type: Any
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1099, in from_dict
 scheduler | return cls.deserialize_dag(serialized_obj['dag'])
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1021, in deserialize_dag
 scheduler | v = _decode_timetable(v)
 scheduler | File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 189, in _decode_timetable
 scheduler | raise _TimetableNotRegistered(importable_string)
 scheduler | airflow.serialization.serialized_objects._TimetableNotRegistered: Timetable class 'custom_timetable.CustomTimetable' is not registered

Note that in this case, the Variable in question does exist, and the KeyError is a red herring.

If you add a default_var, things seem to work, though I wouldn’t trust it since there is clearly some context where it will fail to load the Variable and will always fall back to the default. Additionally, this still raises the UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS! error, which I assume is a bad thing.
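For reference, the default_var form is just the following (the fallback value 'foo' here is an illustrative placeholder):

something = Variable.get('something', default_var='foo')  # falls back to 'foo' whenever the lookup fails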

What you think should happen instead

I’m not sure whether or not this should be allowed. In my case, I was able to work around the error by making all Timetable initializer args required (no default values) and pulling the Variable.get out into a wrapper function.
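A rough sketch of the shape of that workaround (names are illustrative, and a real timetable would also need matching serialize/deserialize handling for the extra argument):

from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    # 'something' is now a required keyword argument, so nothing is evaluated
    # at import time when the scheduler loads the plugin.
    def __init__(self, *args, something, **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)


def custom_timetable_from_variable(*args, **kwargs):
    # The Variable lookup only runs when a DAG file calls this wrapper,
    # not when the plugin module is imported.
    return CustomTimetable(*args, something=Variable.get('something'), **kwargs)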

How to reproduce

custom_timetable.py

#!/usr/bin/env python3
from __future__ import annotations

from airflow.models.variable import Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
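    # NOTE: this default is evaluated when the class body executes, i.e. when the
    # scheduler imports the plugin, which is what triggers the errors shown above.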
    def __init__(self, *args, something=Variable.get('something'), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)


class CustomTimetablePlugin(AirflowPlugin):
    name = 'custom_timetable_plugin'
    timetables = [CustomTimetable]

test_custom_timetable.py

#!/usr/bin/env python3
import datetime

import pendulum
from airflow.decorators import dag, task
from custom_timetable import CustomTimetable


@dag(
    start_date=datetime.datetime(2022, 9, 19),
    timetable=CustomTimetable(cron='0 0 * * *', timezone=pendulum.UTC),
)
def test_custom_timetable():
    @task
    def a_task():
        print('hello')

    a_task()


dag = test_custom_timetable()


if __name__ == '__main__':
    dag.cli()

airflow variables set something foo
airflow dags trigger test_custom_timetable

Operating System

CentOS Stream 8

Versions of Apache Airflow Providers

None

Deployment

Other

Deployment details

I was able to reproduce this with:

  • Standalone mode, SQLite DB, SequentialExecutor
  • Self-hosted deployment, Postgres DB, CeleryExecutor

Anything else

Related: #21895

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 16 (16 by maintainers)

Most upvoted comments

Sorry, yes, my diff along with passing the session to Variable.get. Let me think how to get that …

Oh it’s quite easy. To get access to the current session, use airflow.settings.Session() – that’ll return the thread-local active session, and that, coupled with the previous fix, means it won’t try to commit.
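A minimal sketch of what that would look like, assuming the diff mentioned above also adds a session parameter to Variable.get (the released 2.3.4 API does not accept one):

from airflow import settings
from airflow.models.variable import Variable

# settings.Session() returns the thread-local session the scheduler is already
# using, so no new session is opened and no extra commit gets issued.
session = settings.Session()
something = Variable.get('something', session=session)  # 'session' kwarg is hypothetical, per the proposed diff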

I’d say this should be considered a user error. We can add a note in the documentation explaining that value retrievals (not just Variable but all DB and config accesses in general) should be done lazily. This is just how Python works.

I tend to agree with @uranusjr, with one twist. We should make it so that this kind of error is handled better - the error message should explicitly say that you should not perform any database operation inside a custom Timetable. Adding documentation is not enough: people won’t read the documentation, and we can expect more issues like this popping up. I think it should be possible to catch this error and turn it into a more explanatory message.
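A hypothetical sketch of that idea (the wrapper below is not existing Airflow code; loader and mod are the objects seen in the plugins_manager traceback above):

from types import ModuleType

from airflow.exceptions import AirflowPluginException


def exec_plugin_module(loader, mod: ModuleType) -> None:
    # Hypothetical wrapper around the loader.exec_module(mod) call from the
    # traceback: turn the HA-lock RuntimeError into an explicit hint.
    try:
        loader.exec_module(mod)
    except RuntimeError as err:
        if "UNEXPECTED COMMIT" not in str(err):
            raise
        raise AirflowPluginException(
            f"Plugin {mod.__name__!r} accessed the metadata database at import time "
            "(e.g. Variable.get in a custom Timetable); move such lookups out of "
            "module-level code."
        ) from err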

This happens because we load timetable plugins in the scheduler process (decoding the timetable during deserialization). So with Variable.get, we end up using a new session different from the scheduler session. When that new session commits, it throws an error because of the session mismatch. It doesn’t seem like a simple fix, but I’m taking a look.