airflow: `SQLColumnCheckOperator` failures after upgrading to `common-sql==1.3.0`

Apache Airflow Provider(s)

common-sql

Versions of Apache Airflow Providers

apache-airflow-providers-google==8.2.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-salesforce==5.0.0
apache-airflow-providers-slack==5.1.0
apache-airflow-providers-snowflake==3.2.0

Issue: apache-airflow-providers-common-sql==1.3.0

Apache Airflow version

2.4.3

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Astronomer

Deployment details

No response

What happened

The problem occurred when upgrading from common-sql==1.2.0 to common-sql==1.3.0.

Getting a KeyError when running a unique_check and null_check on a column.

1.3.0 log: (screenshot of the task log showing the KeyError)

1.2.0 log: (screenshot of the same task log passing)

What you think should happen instead

Potential causes:

  • it seems to index by the test-query column COL_NAME instead of the table column STRIPE_ID
  • the records returned from the test changed type: from a tuple to a list of dictionaries (illustrated in the sketch below)
  • no tolerance is specified for these tests, so the tolerance lookup looks like it will cause an error unless a default is specified, e.g. .get('tolerance', None)
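
For illustration, a minimal sketch (with hypothetical record shapes and column aliases) of why the type change breaks the lookup:

# Hypothetical record shapes, for illustration only - the real check-query aliases may differ.

# common-sql 1.2.0: each record was a plain tuple, consumed positionally.
record_old = ("unique_check", 0)
print(record_old[1])  # works: 0

# common-sql 1.3.0 + SnowflakeHook's DictCursor: a list of dicts keyed by the
# *query* column aliases, so a lookup by the table column name fails.
records_new = [{"COL_NAME": "STRIPE_ID", "CHECK_TYPE": "unique_check", "RESULT": 0}]
print(records_new[0]["STRIPE_ID"])  # KeyError: 'STRIPE_ID'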

Expected behavior:

  • these tests continue to pass with the upgrade
  • tolerance is not a required key.

How to reproduce

from datetime import datetime
from airflow import DAG

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

my_conn_id = "snowflake_default"

default_args={"conn_id": my_conn_id}

with DAG(
    dag_id="airflow_providers_example",
    schedule=None,
    start_date=datetime(2022, 11, 27),
    default_args=default_args,
) as dag:

    create_table = SnowflakeOperator(
        task_id="create_table",
        sql=""" CREATE OR REPLACE TABLE testing AS (
                        SELECT
                            1 AS row_num,
                            'not null' AS field

                        UNION ALL

                        SELECT
                            2 AS row_num,
                            'test' AS field

                        UNION ALL

                        SELECT
                            3 AS row_num,
                            'test 2' AS field
                    )""",
    )

    column_checks = SQLColumnCheckOperator(
        task_id="column_checks",
        table="testing",
        column_mapping={
            "field": {"unique_check": {"equal_to": 0}, "null_check": {"equal_to": 0}}
        },
    )

    create_table >> column_checks

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 1
  • Comments: 16 (15 by maintainers)

Most upvoted comments

My vote is towards standardizing Snowflake’s hook with the other DBApi Hooks, and having 4.0.2 be “the” breaking change for deep usages.

OK. @denimalpaca @kazanzhy.

I know why it is happening - it is a Snowflake-only thing. And I am not 100% sure yet how to fix it in a good way.

The root cause of the problem is this line in Snowflake Hook’s run() method:

with closing(conn.cursor(DictCursor)) as cur:

The result is that after the #26944 change, get_records() and get_first() changed their result type to return dictionaries and not sequences (previously each of those methods had its own implementation and did not use the run() method, so they used a “standard” cursor).

Simply speaking: SnowflakeHook’s run() method is not standard - instead of a sequence of sequences, it returns a sequence of dicts. This is absolutely not standard behaviour for DB API - it is simply a creative approach of the Snowflake Python connector to return dicts: https://docs.snowflake.com/en/user-guide/python-connector-api.html#cursor
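
For illustration, a minimal sketch of the difference between the two cursor classes (connection parameters are placeholders):

from contextlib import closing

import snowflake.connector
from snowflake.connector import DictCursor

conn = snowflake.connector.connect(account="...", user="...", password="...")  # placeholder credentials

# Standard DB API cursor: rows come back as tuples.
with closing(conn.cursor()) as cur:
    cur.execute("SELECT 1 AS col_name")
    print(cur.fetchall())  # [(1,)]

# Snowflake's DictCursor: the same rows come back as dicts keyed by column name.
with closing(conn.cursor(DictCursor)) as cur:
    cur.execute("SELECT 1 AS col_name")
    print(cur.fetchall())  # [{'COL_NAME': 1}]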

I think the best fix is:

  • as of common-sql 1.3.1 we do not need to use DictCursor any more. We keep the “.description” field in the hook and we can use it to convert the return value in the operator from sequences to dicts (see the sketch below)
  • we standardise SnowflakeHook’s run() method to return sequences by default (a breaking change - but only for “deep” usages where the hook is used by custom operators or taskflow cases)
  • we keep the SnowflakeOperator behaviour of returning dicts (when do_xcom_push=True) to avoid breaking changes at the “task dependency” level (we can easily convert the output by combining the description and results from the hook into the return value of SnowflakeOperator)
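
A minimal sketch of that conversion (hypothetical helper, assuming a standard DB API 2.0 cursor description):

# Hypothetical helper: rebuild dict-shaped rows from a DB API description plus tuple rows.
def rows_to_dicts(description, rows):
    # DB API 2.0: description is a sequence of 7-item tuples; item 0 is the column name.
    column_names = [col[0] for col in description]
    return [dict(zip(column_names, row)) for row in rows]

rows_to_dicts([("COL_NAME", None, None, None, None, None, None)], [("STRIPE_ID",)])
# -> [{'COL_NAME': 'STRIPE_ID'}]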

This would be an almost identical change to what I did for Databricks last week (only the Databricks internals were different - no DictCursor, and it returned tuples rather than dicts):

https://airflow.apache.org/docs/apache-airflow-providers-databricks/stable/index.html#breaking-changes

The DatabricksSqlHook now conforms to the same semantics as all the other DBApiHook implementations and returns the same kind of response in its run method. Previously (pre-4.* versions of the provider) the Hook returned a tuple of (“cursor description”, “results”), which was not compatible with the other DBApiHooks that return just “results”. After this change (and the dependency on common.sql >= 1.3.1), the DatabricksSqlHook returns “results” only. The description can be retrieved via the last_description field of the hook after the run method completes.
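
For reference, a sketch of what the post-change semantics look like in use (connection id and query are placeholders):

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

hook = DatabricksSqlHook(databricks_conn_id="databricks_default")  # placeholder connection id
results = hook.run("SELECT 1 AS one", handler=lambda cur: cur.fetchall())  # just "results"
column_names = [col[0] for col in hook.last_description]  # description is kept on the hook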

That would require a Snowflake 5.0.0 breaking-change release, basically making the whole 4.0.* line yanked. Or probably better - we could release 4.0.2 adding this breaking change and yank 4.0.0 and 4.0.1 as “wrong”, making 4.0.2 the “real breaking” change we actually wanted (or started to want) to make in 4.0.*. The 4.0.* line is very fresh and has not been released yet in constraints.

The end result will be that we finally (I hope) standardise the semantics of DBApiHook.run(). So far, each of the hooks could change it (and some did, as we see) - this change seems to finally get all the run() methods to behave in the same way.

WDYT Everyone?

We had the same issue on the Astro SDK side, which we had to fix by https://github.com/astronomer/astro-sdk/commit/0c64f486be2dd7c0a176ef3b43fed7544844b87d

I do not know yet 😃. I am travelling most of today and I had no time to look closely as I was releasing 1.3.1 + other providers… I will look later today, but maybe you can see in the meantime what the possible reason could be.