dbt-spark: [CT-202] Workaround for some limitations due to `list_relations_without_caching` method

Describe the feature

I am currently facing an issue using DBT with Spark in an AWS Glue/EMR environment, as already discussed in https://github.com/dbt-labs/dbt-spark/issues/215 (and previously raised in https://github.com/dbt-labs/dbt-spark/issues/93).

The current issue is about the adapter’s method list_relations_without_caching:

https://github.com/dbt-labs/dbt/blob/HEAD/core/dbt/include/global_project/macros/adapters/common.sql#L240

which in the Spark Adapter implementation is:

https://github.com/dbt-labs/dbt-spark/blob/a8a85c54d10920af1c5efcbb4d2a51eb7cfcad11/dbt/include/spark/macros/adapters.sql#L133-L139
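
For reference, that macro boils down to roughly the following (paraphrased, so treat it as an approximation of the linked code rather than an exact copy):

{% macro spark__list_relations_without_caching(relation) %}
  {% call statement('list_relations_without_caching', fetch_result=True) -%}
    show table extended in {{ relation }} like '*'
  {% endcall %}
  {% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}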

Here you can see that the command show table extended in {{ relation }} like '*' is executed. It forces Spark to go through the metadata of every table in the schema (as Spark has no Information Schema layer) in a sort of “discovery mode”, and this approach produces two main issues:

  1. Bad performance: some environments can have hundreds or even thousands of tables in the same schema, generated not only by DBT but also by other processes. In that case this operation can be very costly, especially when different DBT processes run updates on just a few tables at different times.

  2. Instability, as I verified in an AWS Glue/EMR environment: you can have views without an “S3 Location” defined, such as an Athena/Presto view, which will crash a DBT process running Spark SQL on EMR with errors like:

show table extended in my_schema like '*'
  ' with 62bb6394-b13a-4b79-92dd-2e0918831cf3
21/09/18 13:00:22 INFO SparkExecuteStatementOperation: Running query with 62bb6394-b13a-4b79-92dd-2e0918831cf3
21/09/18 13:01:03 INFO DAGScheduler: Asked to cancel job group 62bb6394-b13a-4b79-92dd-2e0918831cf3
21/09/18 13:01:03 ERROR SparkExecuteStatementOperation: Error executing query with 62bb6394-b13a-4b79-92dd-2e0918831cf3, currentState RUNNING,
org.apache.spark.sql.AnalysisException: java.lang.IllegalArgumentException: Can not create a Path from an empty string

Describe alternatives you’ve considered

I do not see why a DBT process should care about the “rest of the world”, such as the Athena views mentioned above or tables created by other processes in the same schema.

So, ideally, I would like to replace the command:

show table extended in <schema> like '*'

with something like:

show table extended in <schema> like ('<table1>|<table2>|…')

where my <table1>, <table2>, etc. are determined automatically when I run a command like

dbt run --models my_folder

where my_folder contains the files table1.sql, table2.sql, etc.

but with the current method interface, only the schema parameter can be passed.
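
A purely illustrative sketch of what that could look like, assuming the macro could take an optional relation_names argument (that argument does not exist today; its name and the '|' joining are my assumptions, not an actual dbt-spark API):

{% macro spark__list_relations_without_caching(relation, relation_names=none) %}
  {# hypothetical: relation_names would be the models selected for this invocation #}
  {% set pattern = relation_names | join('|') if relation_names else '*' %}
  {% call statement('list_relations_without_caching', fetch_result=True) -%}
    show table extended in {{ relation }} like '{{ pattern }}'
  {% endcall %}
  {% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}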

Two questions here: how can I automatically infer the names of the tables involved when a command like dbt run --models my_folder runs, and how can I then pass them to list_relations_without_caching?

Additional context

I found this relevant for Spark in an AWS environment, but it can potentially be a similar issue for other implementations.

Who will this benefit?

On DBT’s Slack channel I talked to another user “affected” by a similar issue, but probably anyone using Spark in a distributed environment (AWS or not) can be affected by this.

Are you interested in contributing this feature?

Sure, both coding and testing.

About this issue

  • State: open
  • Created 3 years ago
  • Comments: 41 (13 by maintainers)

Most upvoted comments

@jtcohen6 Thank you for the quick adjustments and 2 branch options!
Sorry for the delay, testing took longer than I thought.

I was able to complete testing; below are my findings and some related context. Additional Context & Final Thoughts follow the testing/findings.

TL;DR

  • I’m definitely leaning towards Option2 as well
    • For everyone not interested in the entire thing haha
      • I know, holy wall of text, batman

Testing/Findings

  • None of the testing/findings include cluster startup time.
    • For every test of DBT Compile & DBT Run we manually started the cluster
      • once it was started & available, testing commenced
        • This ensured no cluster startup time is included in the metadata call timing tests
  • All testing was done on an i3.2xlarge cluster size (61GB of memory, 4 CPUs)
    • 1 Worker
    • 1 Driver

dbt-spark v1.0.0 Code

show table extended in <schema> like '*'

  • In schemas that contain a few objects, performance concerns don’t really exist; overhead is negligible
    • As objects in the schema grow, even if all are related to a DBT Project, this command starts to take longer
    • Spark Cluster size makes no difference here for execution time of show table extended in <schema> like '*'
  • A single command is executed at the beginning of DBT Run to populate the cache/catalog
  • I have attached a Screenshot showing the execution timings of show table extended in <schema> like '*' from the Spark cluster as well as the DBT command line for both DBT Compile & DBT Run
    • click to enlarge for better quality and readability (screenshot: current production dbt-spark metadata gather)

dbt-spark wip-faster-caching-option1

show table extended in <schema> like <relation object|relation object|relation object|...>, or as I will refer to it, show table extended in <schema> like <list>
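
For example, with three made-up model names in a made-up schema, the generated statement would look like:

show table extended in my_schema like 'stg_orders|stg_customers|fct_daily_sales'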

  • For any project in any schema this is a great step & more targeted metadata query
    • Only models related to the DBT Project are added to the like <list> for enumeration
  • Executing a subset of models or even a single model in the project
    • all models in the project are still added to the like <list> enumeration
      • Meaning, if we executed just our staging folder or a single model.sql
        • all 1,578 models are added to the like <list>
  • like <list> “limit” testing - Tested with 1,578 Models
    • No rejection from the Spark cluster that the like <list> was too large
      • This is our current project size, it doesn’t mean that 1,579 will error, but it could
  • Execution timing on this command was less than 3 minutes for the 1,578 models
    • For Smaller Projects inside schemas that have objects not related to the DBT Project, this would be a significantly faster metadata query
    • The schema this was tested in has 2,048 Tables and ~1,578 are related to the DBT Project
  • Definitely faster and more targeted than Current v1.0 Code Base
    • Again, executing a subsection of the Project, or a single model still incurs the lookup of all objects
  • I have attached a Screenshot showing the execution timings of show table extended in <schema> like <list> from the spark cluster as well as DBT command line for both DBT Compile and DBT Run
    • click to enlarge for better quality and readability (screenshot: option1 dbt-spark metadata gather)

dbt-spark wip-faster-caching-option2

show tables in <schema> + show views in <schema>

followed by describe extended <objects> for any “refs” inside the model.sql
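
As a rough illustration of the pattern (schema and table names are made up, and the exact result columns can differ slightly between Spark versions):

-- cheap enumeration of relation names, no per-object storage metadata
show tables in my_schema;    -- database/namespace, tableName, isTemporary
show views in my_schema;     -- namespace, viewName, isTemporary

-- detailed metadata fetched per object, only when a model actually references it
describe extended my_schema.stg_orders;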

  • Show Tables & Show Views in <schema> are lightning fast
    • Both commands on the schema in question execute in less than 10 seconds combined
    • describe extended <object> is quick as well
      • milliseconds execution for any single object
  • DBT Run, though, suffers in terms of overall execution compared to Option1 due to the describe call occurring for every model
  • DBT Compile is the command that suffers the most with this approach
    • DBT Docs would as well, due to needing to run describe extended one object at a time compared to like <list>|'*'
    • describe extended <object> doesn’t provide whether the column is nullable or not
      • data availability inconsistency between show table extended in <schema> like <list>|'*' and describe extended <object>
  • In regards to some principal concerns here: https://github.com/dbt-labs/dbt-spark/issues/296#issuecomment-1072211686
    • Tested/Verified External Object creation to make sure it still created Delta objects “as <path>”
    • Tested/Verified that “is_incremental()” models still executed properly
    • Tested/Verified that a merge statement was used
      • Our dbt_project.yml absolutely has +file_format: delta set though, maybe that’s entirely why this passed my testing
  • I have attached a Screenshot showing the execution timings of show tables in <schema> & show views in <schema> from the Spark cluster as well as the DBT command line for both DBT Compile and DBT Run (screenshot: option2 dbt-spark metadata gather)

Additional Context

How we are executing DBT Projects/Models at Slickdeals

I understand this might not be the recommended or normal approach. Below is an example screenshot of our implementation (screenshot: DBT Airflow DAG execution details), with notes following.

  • We have farmed out the execution of every single model to its own Airflow task, where every task calls DBT Run --select model.sql
    • model.sql is every individual model in the graph
    • Allows us to let Airflow handle execution state tracking
    • Allows us to parallelize execution of the entire graph at a very low cost/overhead via MWAA
      • other than the core calls that are executed for every “DBT Run” call
        • This is absolutely what is crushing our execution times & environment
        • we end up having to call show table extended in <schema> like '*' for every model (nearly 900+ times currently)
  • We have a limited DAG capacity though using MWAA
    • which is why this project isn’t entirely broken down into many projects
    • plus, all models in this project apply replicated data transactions from on premise to our Datalake via the same code template used to generate all the models

I totally understand as well that our implementation might not be normal or recommended and is part, if not all, of the reason we are experiencing problems. Of course, none of these are your problems, but I greatly appreciate the work being put into making caching faster overall.

Reasons why we went the above Implementation route

  • Execution State Tracking to restart failed models
    • There may be changes upstream or unexpected data that flows through the system or schema changes that we are not aware of, and we wanted the ability to restart the DAG where it failed while letting other tasks continue to execute and eventually complete their execution path.
  • Parallelization of task execution
    • As we are migrating towards cloud infrastructure and new tools, and building an entirely new platform, we needed to be able to execute 100’s of steps in parallel, as a lot of our DAGs have only limited dependencies and take the view of eventual consistency
      • I love FK’s and ensuring referential consistency but as items get more and more distributed that becomes a “secondary” task - say DBT Test
      • Current on premise “DAGs” run 100’s of steps in parallel and we needed like for like

Final Thoughts

show table extended in <schema> like '*'

  • Because our implementation calls DBT Run every time, we incur this cost every time
  • Run by itself, this call can take 5-8+ minutes, which isn’t terrible really
    • our implementation makes it terrible sadly
  • Regardless of implementation though, this could be a more targeted cache query, especially for others’ use cases, as well as schemas that contain objects unrelated to DBT DAG processing

show table extended in <schema> like <list>

  • I like it overall for DBT Compile & DBT Docs
  • Potential serious drawback: the <list> could get too long to be executed and error out
  • For DBT Run it’s still faster than like '*'
  • I think this fits the methodology that is recommended for executing DBT DAGs
    • probably how it is normally executed by most using DBT
      • Could be wrong though
  • Eventually the list will get so long that performance could “suffer” again?
    • Not sold on this, but maybe

show tables in <schema> + show views in <schema>, followed by describe extended <objects> for any “refs” inside the model.sql

  • I love this method overall, especially for our implementation!
    • In fact I’m trying to get it into a python wheel and uploaded to pypi so we can install it to fix some of our current production issues
    • I understand that this is purely a work in progress and there might be other drawbacks we’re missing or introducing by using this methodology, especially by adopting it asap.
      • I understand all code is “as is” etc
  • This is of course the most targeted metadata query gathering and ideal for how we have implemented DBT
  • As you’ve outlined here https://github.com/dbt-labs/dbt-spark/issues/296#issuecomment-1072211686 , this method is faster in larger models and slower in small models
  • In that regard, you also mention in https://github.com/dbt-labs/dbt-spark/issues/296#issuecomment-1072211686 that the principal risk of approach 2 is that you’d be missing is_delta
    • describe extended does have a Provider attribute available, maybe that can be used to reduce this risk? (see the sketched output after this list)
    • Not saying that you don’t have to run an additional query describe extended <object> to get these results, but could those then be cached?
      • Thereby allowing you to still do cache lookup checks?
        • Just thinking out loud here, maybe you still cannot and must run the describe extended <object> again

I don’t want you to have to finagle support for both Option1 and Option2

  • Overall I’m leaning towards Option2 heavily.
    • Slightly biased by the significant benefit it would have for our implementation of course
    • I still feel that option 2 overall is more targeted
  • Our implementation would still be faster with Option1
    • Option2 is exactly what we’d want and would like to go with, from our perspective
      • Open Source being what it is and all
  • I feel like Option 2 is the most performant
  • I’m definitely leaning towards Option2 as well

Again, thank you very much for the two options, and to everyone if you got this far kudos!

We have the same issue, but our problem is not focused on performance but rather on the loss of information. We are using Iceberg’s upsert functionality and, when this error occurs, dbt assumes that the models do not exist and, instead of doing an upsert, executes a drop and create, resulting in the loss of the previous information. Has anyone experienced the same problem as us? @jtcohen6 do you have any thoughts on this?

Is there any progress on a fix? The solution jtcohen6 suggested really benefits our use case… Are there any estimations?

Very interested in this fix – @jtcohen6 @TalkWIthKeyboard do you think there’s anything I could do to help with #433, or do you think it’s best to let y’all handle it?

We are also facing the same issue with iceberg. Ideally it should explicitly fail if the show tables command fails instead of retrying.

The failure is an intermittent one and gets resolved on rerun. However, there is an explicit loss of data because of the create or replace command.

We are facing this issue in dbt-spark 1.6.0

I’m working with a dbt Cloud customer who have an existing, long-lived Spark cluster and are keen to try dbt Core/Cloud with it. They have 10,000+ objects in Spark and are experiencing this issue. They are following along to see its resolution!

Yes, I think that’s right. Let’s aim to include this change in a v1.1 beta, and ask folks to test it out in their own projects to ensure it yields the desired speedups.

cc @VersusFacit: This is another cache-related performance question, similar to https://github.com/dbt-labs/dbt-snowflake/issues/83, where benchmarking (and anticipating edge cases) will be the name of the game.

The change proposed in this issue will significantly help in cases where dbt manages some objects, in large databases that also contain thousands of non-dbt objects. If we want to speed up cases where dbt manages thousands of objects, and a given invocation is only selecting a handful to run, we’ll need to tackle the larger question of selection-based caching: https://github.com/dbt-labs/dbt-core/issues/4688

Then the path forward seems to be the implementation of your original idea: to generate the like 'table_one|table_two|...' from DBT’s project information, is that right?