airflow: Airflow schedules tasks in wrong timezone with an MSSQL Metadata DB on a non-UTC server

Apache Airflow version

2.2.2

What happened

Airflow schedules a task an hour earlier than expected, when using an MSSQL metadata database where the DB server is set to the CET timezone. The screenshot below shows the DAG starting an hour before the end of the data interval.

image

What you expected to happen

Airflow schedules the task at the correct time in UTC.

How to reproduce

It’s hard to describe a complete reproducible method since it relies on having an MSSQL Server with particular settings.

A relevant DAG would be as simple as:

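from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="example_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 9 * * 1-5",  # 09:00 UTC, Monday to Friday
) as dag:
    task = DummyOperator(task_id="dummy")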

And Airflow config of:

default_timezone = utc

This DAG would then be scheduled an hour earlier than expected.

Operating System

Redhat UBI 8

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

Airflow scheduler and webserver each running in a docker container based on Redhat UBI 8. Metadata DB is MSSQL Server running on a Windows Server where the server timezone is CET.

Anything else

In our installation, the problem happens for every DAG with a UTC-based schedule.

I believe the root cause is this line of code: https://github.com/apache/airflow/blob/6405d8f804e7cbd1748aa7eed65f2bbf0fcf022e/airflow/models/dag.py#L2872

On MSSQL, func.now() appears to correspond to GETDATE(), which returns the current time in the timezone of the DB server. But next_dagrun_create_after is stored in the database as UTC (in a datetime2 column, which doesn’t include timezone information). So this line of code is equivalent to “Is the current time in CET before the next creation time in UTC?”, meaning that a DAG that should start at 09:00 UTC starts at 09:00 CET instead, one hour early.
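The arithmetic can be checked with a small standalone sketch. It does not use Airflow or a database: the helper names (db_now_naive, is_due) are made up for illustration, CET is modelled as a fixed UTC+1 offset (ignoring daylight saving), and the comparison is written as "creation time <= now", which is an assumption about the shape of the filter on the linked line.

```python
from datetime import datetime, timedelta, timezone

# Assumption: CET as a fixed UTC+1 offset; daylight saving is ignored here.
CET = timezone(timedelta(hours=1), "CET")

# The value Airflow stores: 09:00 UTC, written to datetime2 without an offset.
next_dagrun_create_after = datetime(2021, 11, 22, 9, 0)


def db_now_naive(instant_utc, server_tz):
    """Hypothetical helper: the server's local "now" with the offset dropped."""
    return instant_utc.astimezone(server_tz).replace(tzinfo=None)


def is_due(now_naive, create_after):
    """Hypothetical stand-in for the filter: is the creation time already reached?"""
    return create_after <= now_naive


# The real instant is 08:00 UTC, one hour before the run should be created.
instant = datetime(2021, 11, 22, 8, 0, tzinfo=timezone.utc)

local_now = db_now_naive(instant, CET)         # 09:00, looks like "now" in CET
utc_now = db_now_naive(instant, timezone.utc)  # 08:00, the value that should be compared

print(is_due(local_now, next_dagrun_create_after))  # True: created one hour early
print(is_due(utc_now, next_dagrun_create_after))    # False: correctly not yet due
```

With a server 7 hours behind UTC the same comparison would be wrong in the other direction, producing a delay instead of an early start.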

I can verify that func.now() returns CET with the SQLAlchemy code engine.execute(sa.select([sa.func.now()])).fetchall().

I think the correct way to get the current time in UTC on MSSQL is GETUTCDATE().
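One way to see what each expression turns into, without connecting to a server, is to compile it against SQLAlchemy's MSSQL dialect. This is only a sketch and assumes SQLAlchemy is installed; no database driver is needed for compilation. The variable names are illustrative.

```python
from sqlalchemy import func
from sqlalchemy.dialects import mssql

dialect = mssql.dialect()

# func.now() is a generic function that SQLAlchemy knows about; for MSSQL it
# is rendered as CURRENT_TIMESTAMP, which SQL Server evaluates in the
# server's local time, like GETDATE().
local_sql = str(func.now().compile(dialect=dialect))

# func.GETUTCDATE() is not known to SQLAlchemy, so the name is passed through
# to the database unchanged.
utc_sql = str(func.GETUTCDATE().compile(dialect=dialect))

print(local_sql)
print(utc_sql)
```

Note that func.GETUTCDATE() only makes sense on SQL Server; other backends would reject the function name.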

We ran Airflow 1.10 previously without seeing this problem. From what I can tell, in that version the date comparison is done on the application side rather than in the DB.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 3
  • Comments: 26 (23 by maintainers)

Most upvoted comments

Hi, we are also encountering the same issue with Airflow. We are using a SQL Server backend that is not running in the UTC timezone. Since the DB query that creates the DAG runs to be scheduled uses CURRENT_TIMESTAMP, we are seeing a scheduling lag of 7 hours, because the database runs in a timezone that is 7 hours behind UTC. Is there an ETA for when the fix will be available?

@potiuk As an initial fix for our internal system, I replaced func.now() with func.GETDATE() in an internal fork which is enough to fix it for SQL Server. This is not enough for cross-database compatibility of course. I have considered creating something similar to the functionality of sqlalchemy-utc to airflow.utils.sqlalchemy, but I had second thoughts whether this additional complexity is necessary. Do we need to rely on the DB’s version of “now” in this case? Can we instead use timezone.utcnow(), i.e. let the application server decide what “now” is and pass it as a literal to the DB?

Here are two places where func.now() is used in a filter, exposing this issue: https://github.com/apache/airflow/blob/main/airflow/models/dagrun.py#L294 https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2872

I can see in other places that timezone.utcnow() is used: https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L293 https://github.com/apache/airflow/blob/main/airflow/models/trigger.py#L179

I’m not sure if there is a particular reason why func.now() is needed in the first two instances?

@mattinbits thanks for the internal fix. Just to clarify, do you mean replacing func.now() with func.GETUTCDATE() instead of func.GETDATE()? When I made the changes you suggested using func.GETUTCDATE(), it worked.

@uranusjr is it the case then that you prefer a solution that continues to rely on the database to get the current timestamp, rather than adopting timezone.utcnow() and calculating the current timestamp within the application? Calculating it on the application side would be the simpler fix, and would be consistent with the other places in the codebase that already work this way.
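As a rough illustration of the application-side approach, the sketch below computes "now" in Python and sends it to the database as a bound parameter, so no SQL time function is evaluated on the server. It uses the standard library's sqlite3 as a stand-in for the metadata DB; the table layout and the helper names (utcnow_naive, dags_needing_runs) are assumptions for the example, not Airflow's actual schema or API.

```python
import sqlite3
from datetime import datetime, timezone


def utcnow_naive():
    """Application-side "now" as naive UTC, matching how the column is stored."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def dags_needing_runs(conn, now):
    """Return dag_ids whose next creation time has been reached at `now`."""
    # `now` travels as a bound parameter; the server's clock and timezone
    # play no part in the comparison.
    rows = conn.execute(
        "SELECT dag_id FROM dag WHERE next_dagrun_create_after <= ? ORDER BY dag_id",
        (now.isoformat(sep=" "),),
    )
    return [row[0] for row in rows]


# Hypothetical, simplified stand-in for the metadata table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT, next_dagrun_create_after TEXT)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [
        ("due_dag", "2021-11-22 09:00:00"),
        ("future_dag", "2021-11-23 09:00:00"),
    ],
)

# A fixed instant keeps the example deterministic; a scheduler would pass utcnow_naive().
print(dags_needing_runs(conn, datetime(2021, 11, 22, 9, 30)))
```

The trade-off is that the scheduler host's clock becomes the source of truth instead of the database's clock, which matters if several schedulers with drifting clocks share one database.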

I think a DB-specific case will be better (if simple). We already have ~500 deps in Airflow in total (including transitive ones), and while adding one more seems like no biggie, adding a ‘util’ in Airflow seems to be more “straightforward”.
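A DB-specific util along these lines could be built on SQLAlchemy's compiler extension, following the pattern SQLAlchemy documents for custom constructs. The sketch below is not the code that ended up in Airflow: the class name utcnow and the fallback rule are assumptions for illustration, and only the MSSQL branch reflects what was reported to work in this thread.

```python
from sqlalchemy import DateTime
from sqlalchemy.dialects import mssql, sqlite
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import FunctionElement


class utcnow(FunctionElement):
    """Hypothetical construct meaning "the current time in UTC"."""

    type = DateTime()
    inherit_cache = True


@compiles(utcnow)
def _utcnow_default(element, compiler, **kw):
    # Assumption: a placeholder fallback. Whether CURRENT_TIMESTAMP is UTC
    # depends on the backend and its settings, so a real util would need a
    # rule per supported database.
    return "CURRENT_TIMESTAMP"


@compiles(utcnow, "mssql")
def _utcnow_mssql(element, compiler, **kw):
    # SQL Server: GETUTCDATE() returns UTC regardless of the server timezone.
    return "GETUTCDATE()"


mssql_sql = str(utcnow().compile(dialect=mssql.dialect()))
fallback_sql = str(utcnow().compile(dialect=sqlite.dialect()))

print(mssql_sql)
print(fallback_sql)
```

A filter would then use utcnow() where func.now() is used today, leaving the choice of SQL to the dialect in use.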

@mattinbits thanks for the internal fix. Just to clarify, do you mean replacing func.now() with func.GETUTCDATE() instead of func.GETDATE()? When I made the changes you suggested using func.GETUTCDATE(), it worked.

You’re right, it was a mistake in my initial comment.

Thanks for this, was racking my brain trying to figure out why my DAGs were not running at the right time. I was about to accept that I had gone insane. Changing func.now() to func.GETUTCDATE() in dag.py and dagrun.py worked for me to get my schedules to run at the expected time.

Still no solution? I have the same problem, which causes my DAGs not to be executed.