airflow-provider-great-expectations: data_asset_name is not recognized in airflow-provider-great-expectations==0.2.0
Hi team, we are integrating GX with a Snowflake datasource into our data validation system via the GreatExpectationsOperator.
We are planning to run some expectation validations against a Snowflake table named test_sf_table. However, when running our DAG we get KeyError: 'data_asset_name test_sf_table is not recognized.'
We have tried both upper and lower case, with and without the schema (e.g. data_asset_name: <schema_name>.<table_name>).
Does anyone know what the issue could be? Or is there a configuration issue in my data_context_config or checkpoint_config? Any help would be greatly appreciated!
Detailed Info:
We are using
airflow-provider-great-expectations==0.2.0
datasource_config:
sf_url = f'snowflake://{username}:{password}@{account}.{region}/{database}/{schema}?warehouse={warehouse}&role={role}&application=great_expectations_oss'
sf_datasource_config = {
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"execution_engine": {
"class_name": "SqlAlchemyExecutionEngine",
"connection_string": sf_url,
},
"data_connectors": {
"default_runtime_data_connector_name": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": ["default_identifier_name"],
},
"default_inferred_data_connector_name": {
"class_name": "InferredAssetSqlDataConnector",
"include_schema_name": True,
"included_tables": f"{schema}.test_sf_table".lower()
},
},
}
data_context_config:
base_path = Path(__file__).parents[3]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")
snowflake_data_context_config = DataContextConfig(
**{
"config_version": 3.0,
"datasources": {
"my_snowflake_datasource": sf_datasource_config
},
"stores": {
"expectations_store": {
"class_name": "ExpectationsStore",
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"base_directory": os.path.join(ge_root_dir, "expectations"),
},
},
"validations_store": {
"class_name": "ValidationsStore",
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"base_directory": os.path.join(
ge_root_dir, "uncommitted", "validations"
),
},
},
"evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
"checkpoint_store": {
"class_name": "CheckpointStore",
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"suppress_store_backend_id": True,
"base_directory": os.path.join(ge_root_dir, "checkpoints"),
},
},
},
"expectations_store_name": "expectations_store",
"validations_store_name": "validations_store",
"evaluation_parameter_store_name": "evaluation_parameter_store",
"checkpoint_store_name": "checkpoint_store",
"data_docs_sites": {
"local_site": {
"class_name": "SiteBuilder",
"show_how_to_buttons": True,
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"base_directory": os.path.join(
ge_root_dir, "uncommitted", "data_docs", "local_site"
),
},
"site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
}
},
"anonymous_usage_statistics": {
"data_context_id": "abcdabcd-1111-2222-3333-abcdabcdabcd",
"enabled": True,
},
"notebooks": None,
"concurrency": {"enabled": False},
}
)
checkpoint_config:
snowflake_checkpoint_config = CheckpointConfig(
**{
"name": "test_sf_checkpoint",
"config_version": 1.0,
"template_name": None,
"module_name": "great_expectations.checkpoint",
"class_name": "Checkpoint",
"run_name_template": "%Y%m%d-%H%M%S-test-sf-checkpoint",
"expectation_suite_name": "sf_test.demo",
"action_list": [
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "store_evaluation_params",
"action": {"class_name": "StoreEvaluationParametersAction"},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction", "site_names": []},
},
],
"evaluation_parameters": {},
"runtime_configuration": {},
"validations": [
{
"batch_request": {
"datasource_name": "my_snowflake_datasource",
"data_connector_name": "default_inferred_data_connector_name",
"data_asset_name": "test_sf_table".lower(),
"data_connector_query": {"index": -1},
},
}
],
"profilers": [],
"ge_cloud_id": None,
"expectation_suite_ge_cloud_id": None,
}
)
operator:
ge_snowflake_validation = GreatExpectationsOperator(
task_id="test_snowflake_validation",
data_context_config=snowflake_data_context_config,
checkpoint_config=snowflake_checkpoint_config
)
About this issue
- State: closed
- Created 2 years ago
- Comments: 18 (18 by maintainers)
@zhangchi1 something I’m seeing in this latest set of configs is that, first, in the datasource config you have:
"table_name": '"MY_TABLE"',
with double quotes around the table name (and I don’t think you need the table_name param at all), and then in the checkpoint config you have:
"data_asset_name": 'MY_TABLE',
with no double quotes. I think this may be causing the table from the data_asset_name to not be looked up correctly.
Let me quote part of my previous reply:
I have not used the SimpleSqlalchemyDatasource before, but you may need to use that instead of Datasource in the line you have now in your datasource config:
"class_name": "Datasource",
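In other words, a sketch of the consistent form being suggested (MY_TABLE is the hypothetical table from that earlier set of configs, not the test_sf_table configs above):
# datasource config: unquoted table name
"table_name": "MY_TABLE",
# checkpoint config: the same unquoted name
"data_asset_name": "MY_TABLE",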
Hi @zhangchi1 - my mistake. It sounds like this is most likely not a lowercase table name issue.
I’m not sure what you mean by this:
What version of great_expectations are you running? More importantly, do you get the same error when running this outside of Airflow with the same configs?
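For context, a rough sketch of running the same checkpoint outside of Airflow with these configs (assumes the GX V3 BaseDataContext API and the snowflake_data_context_config / snowflake_checkpoint_config objects defined above; adjust to the installed GX version):
from great_expectations.data_context import BaseDataContext

# Build an in-memory context from the same DataContextConfig
context = BaseDataContext(project_config=snowflake_data_context_config)

# Register the checkpoint defined above and run it directly
context.add_checkpoint(**snowflake_checkpoint_config.to_json_dict())
result = context.run_checkpoint(checkpoint_name="test_sf_checkpoint")
print(result.success)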
Yeah, that documentation is fine for the operator, too. I just saw that the other doc you linked specifically showcases SimpleSqlAlchemyDatasource, which is why I’m also asking which version of Great Expectations proper you’re running. You may have to upgrade it to use that datasource. I’d also ask that specific question about tables on the GX Slack; you’ll likely get a more detailed answer there. @talagluck might be able to help here as well.
The data_asset_name should be just the table name in this case. Which version of Great Expectations are you using? I see you’re using a newer version of the provider. You can also try using the default checkpoint the provider builds for you and see if that works. One issue may be that the connection in your Airflow Connections differs from the one in your GE datasource; this will be resolved if you let the operator build the connection for you.
I noticed the doc you linked says “SimpleSqlalchemyDatasource supports a number of configuration options to assist you with the introspection of your SQL database:” and it does not look like you’re using a SimpleSqlalchemyDatasource. Not sure if that’s the issue, but it may be part of it, and may be why it’s still doing the full scan.
Also, in your CheckpointConfig, in the batch_request, I’d try removing the .lower() on the data_asset_name and removing the "data_connector_query"; neither seems to be needed. (Just trying to spot any potential issues here, I know I’ve had problems with the config files before.)
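Following that suggestion, a sketch of the simplified validations entry (same names as the configs above, with .lower() and data_connector_query dropped):
"validations": [
    {
        "batch_request": {
            "datasource_name": "my_snowflake_datasource",
            "data_connector_name": "default_inferred_data_connector_name",
            "data_asset_name": "test_sf_table",
        },
    }
],
And, if letting the operator build the datasource and checkpoint itself, something along these lines (parameter names such as conn_id, data_asset_name and expectation_suite_name are taken from the provider README for 0.2.x and should be verified; "snowflake_default" is a hypothetical Airflow connection id):
ge_snowflake_validation = GreatExpectationsOperator(
    task_id="test_snowflake_validation",
    conn_id="snowflake_default",
    data_context_root_dir=ge_root_dir,
    data_asset_name="test_sf_table",
    expectation_suite_name="sf_test.demo",
)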