airflow-provider-great-expectations: data_asset_name is not recognized in airflow-provider-great-expectations==0.2.0

Hi team, we are working on integrating Great Expectations with a Snowflake datasource into our data validation system via the GreatExpectationsOperator.

We are planning to run some expectation validations against a Snowflake table named test_sf_table. However, we are getting KeyError: 'data_asset_name test_sf_table is not recognized.' when running our DAG. We have tried both upper and lower case, with and without the schema, e.g. data_asset_name: <schema_name>.<table_name>.

Does anyone know what the issue could be? Or is there a configuration problem in my data_context_config or checkpoint_config? Any help would be greatly appreciated!

Detailed info: we are using airflow-provider-great-expectations==0.2.0.

datasource_config:

sf_url = f'snowflake://{username}:{password}@{account}.{region}/{database}/{schema}?warehouse={warehouse}&role={role}&application=great_expectations_oss'

sf_datasource_config = {
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": sf_url,
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["default_identifier_name"],
        },
        "default_inferred_data_connector_name": {
            "class_name": "InferredAssetSqlDataConnector",
            "include_schema_name": True,
            "included_tables": f"{schema}.test_sf_table".lower(),
        },
    },
}

data_context_config:

import os
from pathlib import Path

from great_expectations.data_context.types.base import DataContextConfig

base_path = Path(__file__).parents[3]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")
snowflake_data_context_config = DataContextConfig(
    **{
        "config_version": 3.0,
        "datasources": {
            "my_snowflake_datasource": sf_datasource_config
        },
        "stores": {
            "expectations_store": {
                "class_name": "ExpectationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(ge_root_dir, "expectations"),
                },
            },
            "validations_store": {
                "class_name": "ValidationsStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "validations"
                    ),
                },
            },
            "evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
            "checkpoint_store": {
                "class_name": "CheckpointStore",
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "suppress_store_backend_id": True,
                    "base_directory": os.path.join(ge_root_dir, "checkpoints"),
                },
            },
        },
        "expectations_store_name": "expectations_store",
        "validations_store_name": "validations_store",
        "evaluation_parameter_store_name": "evaluation_parameter_store",
        "checkpoint_store_name": "checkpoint_store",
        "data_docs_sites": {
            "local_site": {
                "class_name": "SiteBuilder",
                "show_how_to_buttons": True,
                "store_backend": {
                    "class_name": "TupleFilesystemStoreBackend",
                    "base_directory": os.path.join(
                        ge_root_dir, "uncommitted", "data_docs", "local_site"
                    ),
                },
                "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
            }
        },
        "anonymous_usage_statistics": {
            "data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
            "enabled": True,
        },
        "notebooks": None,
        "concurrency": {"enabled": False},
    }
)

checkpoint_config:

from great_expectations.data_context.types.base import CheckpointConfig

snowflake_checkpoint_config = CheckpointConfig(
    **{
        "name": "test_sf_checkpoint",
        "config_version": 1.0,
        "template_name": None,
        "module_name": "great_expectations.checkpoint",
        "class_name": "Checkpoint",
        "run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
        "expectation_suite_name": "sf_test.demo",
        "action_list": [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction", "site_names": []},
            },
        ],
        "evaluation_parameters": {},
        "runtime_configuration": {},
        "validations": [
            {
                "batch_request": {
                    "datasource_name": "my_snowflake_datasource",
                    "data_connector_name": "default_inferred_data_connector_name",
                    "data_asset_name": "test_sf_table".lower(),
                    "data_connector_query": {"index": -1},
                },
            }
        ],
        "profilers": [],
        "ge_cloud_id": None,
        "expectation_suite_ge_cloud_id": None,
    }
)

operator:

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

ge_snowflake_validation = GreatExpectationsOperator(
    task_id="test_snowflake_validation",
    data_context_config=snowflake_data_context_config,
    checkpoint_config=snowflake_checkpoint_config
)

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 18 (18 by maintainers)

Most upvoted comments

@zhangchi1 something I’m seeing in this latest set of configs: first, in the datasource config you have "table_name": '"MY_TABLE"', with literal double quotes around the table name (and I don’t think you need the table_name param at all); then in the checkpoint config you have "data_asset_name": 'MY_TABLE', with no double quotes. I think this mismatch may be why the table referenced by data_asset_name isn’t being looked up correctly.
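To illustrate the kind of mismatch being described (hypothetical names; the configs in question aren’t shown in this thread), an asset registered under a quoted identifier won’t be found by a bare-name lookup:

# Hypothetical illustration: the asset name is stored with embedded double quotes,
# so looking it up by the bare name misses, producing a "not recognized" KeyError.
registered_assets = {'"MY_TABLE"': "..."}   # name registered as '"MY_TABLE"'
print("MY_TABLE" in registered_assets)      # False -> lookup fails
print('"MY_TABLE"' in registered_assets)    # True  -> names must match exactly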

Thanks @denimalpaca, I’m trying it right now.

Also in your CheckpointConfig, in the batch_request, I’d try removing the .lower() in the data_asset_name and removing the "data_connector_query"; neither seems to be needed. (Just trying to spot any potential issues here, I know I’ve had problems with the config files before.) A trimmed version is sketched below.
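As a minimal sketch (same names as your config), the trimmed batch request would be:

# Trimmed batch request: no .lower() call and no "data_connector_query" key.
trimmed_batch_request = {
    "datasource_name": "my_snowflake_datasource",
    "data_connector_name": "default_inferred_data_connector_name",
    "data_asset_name": "test_sf_table",
}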

In the meantime, as I mentioned in the previous thread, I realized that it was able to connect to my Snowflake account and performed a lot of table scans (including tables outside the database and schema specified in the sf_url). Do you know why this is happening, or if there is a way to only scan my specified database and schema? I feel like this could also be part of the issue, since there are tons of tables in our Snowflake account and the scan could exceed some limit.

Let me quote part of my previous reply:

I noticed in the doc you linked it says “SimpleSqlalchemyDatasource supports a number of configuration options to assist you with the introspection of your SQL database:” and it does not look like you’re using a SimpleSqlalchemyDatasource. Not sure if that’s the issue, but it may be part of it, and may be why it’s doing the full scan still.

I have not used the SimpleSqlalchemyDatasource before, but you may need to use it instead of Datasource in the line you currently have in your datasource config: "class_name": "Datasource". A rough sketch of what that might look like is below.
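Untested sketch, loosely following the GX docs for SimpleSqlalchemyDatasource; the tables section is what should limit introspection to the tables you list rather than scanning everything:

# Untested sketch based on the GX docs -- adjust to your GX version.
sf_datasource_config = {
    "class_name": "SimpleSqlalchemyDatasource",
    "connection_string": sf_url,  # same Snowflake URL as above
    # Listing tables explicitly should limit introspection to just these assets.
    "tables": {
        "test_sf_table": {
            "partitioners": {
                "whole_table": {},
            },
        },
    },
}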

Hi @zhangchi1 - my mistake. It sounds like the issue is most likely not related to the lowercase table name.

I’m not sure what you mean by this:

I tried the above configs, but still got the KeyError. @talagluck I’m wondering if the issue could be https://github.com/great-expectations/great_expectations/issues/6260? Since the great_expectations Airflow operator hasn’t been upgraded to the latest great_expectations version?

What version of great_expectations are you running? More importantly, do you get the same error when running this outside of Airflow with the same configs?
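For reference, a sketch of one way to run the same configs outside of Airflow (using the legacy BaseDataContext API; adjust to your GX version):

from great_expectations.data_context import BaseDataContext

# Build an in-memory context from the same DataContextConfig used in the DAG,
# then register and run the same checkpoint without the Airflow operator.
context = BaseDataContext(project_config=snowflake_data_context_config)
context.add_checkpoint(**snowflake_checkpoint_config.to_json_dict())
result = context.run_checkpoint(checkpoint_name="test_sf_checkpoint")
print(result.success)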

Yeah, that documentation is fine for the operator, too. I just saw that the other doc you linked specifically showcased SimpleSqlalchemyDatasource, which is why I’m also asking which version of Great Expectations proper you’re running. You may have to upgrade it to use the other datasource. I’d also ask that specific question about table scans on the GX Slack; you’ll likely get a more detailed answer there.

@talagluck might be able to help here as well.

The data_asset_name should be just the table name in this case. Which version of Great Expectations are you using? I see you’re using a newer version of the provider. You can also try using the default checkpoint the provider builds for you and see if that works. One issue may be that you have a different connection in your Airflow Connections than in your GE datasource; this will be resolved if you let the operator build the connection for you, as sketched below.
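A sketch of that approach, assuming provider 0.2.x supports building the datasource and default checkpoint from an Airflow connection (the conn_id here is a placeholder):

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# Sketch only: "snowflake_default" is a placeholder Airflow connection ID.
ge_snowflake_validation = GreatExpectationsOperator(
    task_id="test_snowflake_validation",
    conn_id="snowflake_default",        # operator builds the datasource from this connection
    data_context_root_dir=ge_root_dir,  # on-disk GX project containing the suite
    data_asset_name="test_sf_table",    # just the table name
    expectation_suite_name="sf_test.demo",
)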
