dbt-spark: [CT-431] Support for Pyspark driver
Describe the feature
Add a fourth connection option to the dbt-spark adapter. This fourth connection method would create a PySpark context and use the spark.sql(<sql>) function to execute SQL statements.
Describe alternatives you’ve considered
The alternative is to run your own Thrift server, which is difficult to set up.
Who will this benefit?
A PySpark context gives you control over the Spark application configuration and the amount of resources used in the Spark cluster. It also enables the user to register custom PySpark UDFs and to create custom views based on PySpark DataFrames.
- control Spark job creation
- register custom PySpark UDFs
- register custom views based on PySpark DataFrames
Are you interested in contributing this feature?
Yes, for sure. I have an implementation of this feature that I would like to contribute to this project.
Here’s how it works from the user’s point of view.
Specify a new connection method, pyspark:
profiles.yml
project1:
  target: dev
  outputs:
    dev:
      type: spark
      method: pyspark
      python_module: spark.spark_context
The python_module points to a Python file found on the PYTHONPATH, in this case spark/spark_context.py. The pyspark adapter calls the create_spark_context() function found in this file, so the user can create their own Spark context (either a local instance or one that leverages a cluster). This hook also lets the user register custom PySpark UDFs.
spark/spark_context.py
from pyspark.sql import SparkSession

def create_spark_context():
    return SparkSession.builder.getOrCreate()
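For illustration, a fuller version of this hook might tune the application configuration and register a UDF; the app name, memory setting, and normalize_id UDF below are made up for the example and are not part of the proposed patch:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

def create_spark_context():
    # Illustrative settings; tune the master URL and resources for your cluster.
    spark = (
        SparkSession.builder
        .appName("dbt-pyspark")
        .config("spark.executor.memory", "4g")
        .getOrCreate()
    )
    # Register a custom UDF so dbt SQL models can call it, e.g. normalize_id(user_id).
    spark.udf.register("normalize_id", lambda s: s.strip().lower() if s else None, StringType())
    return spark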
A second hook is available in the sources. This hook lets the user register a custom PySpark DataFrame as a SQL view via df.createOrReplaceTempView(<view name>). The user is free to implement whatever DataFrame logic they need.
sources.yml
sources:
  - name: custom
    tables:
      - name: raw_users
        meta:
          python_module: models.staging.raw_users
The python_module is loaded the same way, except that the hook function has a different signature:
def create_dataframe(spark, start_time, end_time)
Using PySpark you can work around limitations of Spark SQL when reading data files. For example, you can load the schema of the JSON you are going to read, thus avoiding Spark's schema inference and making sure the data is read with the schema you want.
models/staging/raw_users.py
import json

from pyspark.sql.types import StructType

def create_dataframe(spark, start_time, end_time):
    # WAREHOUSE, YEAR, MONTH and DAY are placeholders for the data lake path.
    data = <load schema from a file>
    schema = StructType.fromJson(json.loads(data))
    df = (
        spark
        .read
        .schema(schema)
        .format("json")
        .load(f"{WAREHOUSE}/{YEAR}/{MONTH}/{DAY}/users/*.logs")
    )
    return df
The implementation of this feature consists of a new PysparkConnectionWrapper, which executes SQL statements via the Spark context:
def execute(self, sql, bindings=None):
    self.result = self.spark.sql(sql)
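For context, here is a minimal sketch of how such a wrapper could round out the cursor-like interface, assuming the result DataFrame is simply collected for results; the method bodies below are illustrative, not the actual patch:

class PysparkConnectionWrapper:
    """Cursor-like wrapper that runs SQL through an in-process SparkSession."""

    def __init__(self, spark):
        self.spark = spark
        self.result = None

    def execute(self, sql, bindings=None):
        # Binding handling is omitted in this sketch; dbt renders the SQL beforehand.
        self.result = self.spark.sql(sql)

    def fetchall(self):
        # Materialize the result DataFrame as a list of Row objects.
        return self.result.collect() if self.result is not None else []

    @property
    def description(self):
        # (column name, type) pairs derived from the result schema.
        if self.result is None:
            return []
        return [(field.name, field.dataType.simpleString()) for field in self.result.schema.fields]

    def close(self):
        # Drop the reference so the DataFrame is not touched after the cursor is closed.
        self.result = None

Keeping the DataFrame entirely inside the wrapper means nothing touches it after close(), which matches dbt's usual cursor lifecycle.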
And a new method on the SparkRelation which registers PySpark DataFrames as SQL views:
def load_python_module(self, start_time, end_time):
    # Import the configured module and let it build the DataFrame.
    path = self.meta.get('python_module')
    module = importlib.import_module(path)
    create_dataframe = getattr(module, "create_dataframe")
    # 'spark' is the SparkSession created by the pyspark connection hook.
    df = create_dataframe(spark, start_time, end_time)
    # Expose the DataFrame to SQL under the source's identifier.
    df.createOrReplaceTempView(self.identifier)
Registration of PySpark views is initiated by a modified source macro:
{% macro source(source_name, identifier, start_dt = None, end_dt = None) %}
  {%- set relation = builtins.source(source_name, identifier) -%}
  {%- if execute and (relation.source_meta.python_module or relation.meta.python_module) -%}
    {%- do relation.load_python_module(start_dt, end_dt) -%}
    {%- do return(relation.identifier) -%}
  {% else -%}
    {%- do return(relation) -%}
  {% endif -%}
{% endmacro %}
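From a model's point of view nothing changes: referencing {{ source('custom', 'raw_users', start_dt=..., end_dt=...) }} (arguments shown as placeholders) first runs load_python_module to register the temporary view and then resolves to the view's name, so the generated SQL selects from the freshly registered view; sources without a python_module fall through to the normal relation.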
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Reactions: 3
- Comments: 22 (4 by maintainers)
Commits related to this issue
- support for pyspark connection method https://github.com/dbt-labs/dbt-spark/issues/305 — committed to cccs-jc/dbt-spark by deleted user 2 years ago
Sounds like a plan!
And the DataFrame is not used outside the class. I think it fits dbt's standard workflow: after a cursor is closed, no methods are called on it anymore.