great_expectations: Memory leak when using GX in Azure Databricks
Describe the bug We are using GX to validate a large number of dataframes in parallel on Azure Databricks. Every time a new table is validated, driver memory usage increases and is never released. Eventually, when the driver's memory fills up, the process fails, even though the workers still have free memory. At the moment, the only temporary workaround we have found is to restart the cluster from time to time.
To Reproduce Steps to reproduce the behavior:
- Execute multiple validations simultaneously (e.g. 1k dataframes in batches of 30, with 2 validations per dataframe). The code below is a parametrized notebook executed N times (once per dataframe), in our particular case launched from Azure Data Factory; a hypothetical sketch of such a parallel launcher is included after these steps. The notebook validates whether the new data is reasonably similar to the previous day's data.
dbutils.widgets.text("table_name",'None')
dbutils.widgets.text('execution_date', 'None')
table_name = dbutils.widgets.get("table_name")
execution_date = dbutils.widgets.get('execution_date')
import datetime as dt
import json
import datetime
import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint
context = gx.get_context()
dataframe_datasource = context.sources.add_or_update_spark(
    name="Spark_in_memory_datasource",
)
date = dt.datetime.strptime(execution_date, '%Y-%m-%d').date()
yesterday_date = date - dt.timedelta(days=1)
yesterday_date_str = yesterday_date.strftime('%Y-%m-%d')
year, month, day = execution_date.split('-')
df = spark.read.parquet(f"/mnt/data/landing/{table_name}/{year}/{month}/{day}/*") #read parquet
yesterdayYear, yesterdayMonth, yesterdayDay = yesterday_date_str.split('-')
previousDf = spark.read.parquet(f"/mnt/data/landing/{table_name}/{yesterdayYear}/{yesterdayMonth}/{yesterdayDay}/*") #read parquet
dataframe_asset = dataframe_datasource.add_dataframe_asset(
    name=getRunId(),  # getRunId() is a helper defined outside this snippet (presumably returning a unique run id)
    dataframe=df,
)
batch_request = dataframe_asset.build_batch_request()
expectation_suite_name = f"{datetime.datetime.utcnow().strftime('%d%m%Y%H%M%S')}_valTest_test"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
df_ge = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
YesterdayRows = previousDf.count()
df_ge.expect_table_row_count_to_be_between(min_value = YesterdayRows * 0.9, max_value = YesterdayRows * 1.1)
yesterdayColumns = previousDf.columns
df_ge.expect_table_columns_to_match_set(column_set = yesterdayColumns, exact_match = True)
df_ge.save_expectation_suite(discard_failed_expectations=False)
my_checkpoint_name = expectation_suite_name + "_checkpoint"
checkpoint = SimpleCheckpoint(
    name=my_checkpoint_name,
    config_version=1.0,
    class_name="SimpleCheckpoint",
    run_name_template="%Y%m%d-%H%M%S-" + my_checkpoint_name,
    data_context=context,
)
context.add_or_update_checkpoint(checkpoint=checkpoint)
checkpoint_result = context.run_checkpoint(
    checkpoint_name=my_checkpoint_name,
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
)
- Check the Ganglia UI and see how memory is never released: https://learn.microsoft.com/es-es/azure/databricks/clusters/clusters-manage#ganglia-metrics
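For reference, a hypothetical sketch of the parallel launcher mentioned in step 1. In our setup the fan-out is done by Azure Data Factory, so this is only an equivalent in-notebook illustration; the notebook path, table names and execution date below are placeholders.
# Hypothetical illustration only: launch the validation notebook for many tables in
# parallel, mirroring the Azure Data Factory fan-out described above. Notebook path,
# table names and execution_date are placeholders.
from concurrent.futures import ThreadPoolExecutor

tables = [f"table_{i}" for i in range(1000)]   # placeholder table names
execution_date = "2023-05-01"                  # placeholder date

def run_validation(table_name):
    # dbutils.notebook.run blocks until the child notebook completes
    return dbutils.notebook.run(
        "/Repos/validation/validate_table",    # placeholder notebook path
        3600,                                  # timeout in seconds
        {"table_name": table_name, "execution_date": execution_date},
    )

# Run the notebook in batches of 30 concurrent executions
with ThreadPoolExecutor(max_workers=30) as pool:
    results = list(pool.map(run_validation, tables))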
Expected behavior Memory should not keep increasing until all validations have completed (which is what appears to be happening when running expectations). Memory should be released as each validation completes, so that overall memory usage stays constant or decreases.
Environment (please complete the following information):
- Operating System: Databricks Runtime 12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12).
- Great Expectations Version: 0.16.12 (same issue with previous versions)
Additional context
Current behavior:
As we can see in the image shown below, memory is not released when validations are completed (when CPU usage is lower).
If the memory allocated to the driver is not enough, the process fails (swap). Memory is not released even when the cluster is idle.
Expected behavior:
As we can see in the image shown below (a sample from a standard data ingestion process), memory is released or stays constant. If extra CPU/memory is needed due to extra workload, data and computation are distributed across the workers.
Solution Attempts We have enabled the garbage collector and run it manually, deleted variables such as context and dataframe_datasource as well as the loaded dataframes, cleared the Spark cache, tried a sequential execution, and simplified the notebook (the code provided is already the simplified version).
Nothing worked; only restarting the cluster releases the memory. A sketch of these clean-up attempts is shown below.
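The following is only an illustrative sketch of those clean-up attempts; the variable names refer to the notebook code earlier in this issue, and none of these steps released the driver memory.
# Illustrative sketch of the clean-up attempts described above; variable names refer
# to the notebook code earlier in this issue. None of this released driver memory.
import gc

# Drop references to the GX objects and the loaded dataframes
del df_ge, checkpoint, batch_request, dataframe_asset
del dataframe_datasource, context
del df, previousDf

# Clear any cached Spark dataframes/tables
spark.catalog.clearCache()

# Force Python garbage collection on the driver
gc.collect()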
About this issue
- Original URL
- State: closed
- Created a year ago
- Comments: 23 (10 by maintainers)
@gerardperezbismart Thank you very much for the code examples – it looks like the memory problem happens even when the dataframe is not passed. That tells me that we should be able to reproduce locally. If we cannot, and/or cannot reproduce in Databricks on AWS, then it is possible that the problem rests with the Azure backing of Databricks – we do not know at this point, but we will start diagnosing by experimenting/troubleshooting locally. We will keep you posted. Thanks again.
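For anyone wanting to experiment, a minimal local reproduction attempt might look roughly like the sketch below: it simply re-runs the notebook's fluent-API calls in a loop against a local Spark session while printing the driver's resident memory. The dataframe size, iteration count and expectation are arbitrary placeholders, and this is an assumption-laden starting point rather than a confirmed reproduction.
# Hypothetical local reproduction sketch: repeat the notebook's GX calls in a loop
# against a local Spark session and watch the driver's memory grow. Dataframe size,
# iteration count and expectation are arbitrary placeholders.
import resource
import great_expectations as gx
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
context = gx.get_context()
datasource = context.sources.add_or_update_spark(name="Spark_in_memory_datasource")

df = spark.range(1_000_000).toDF("id")  # small stand-in dataframe

for i in range(100):
    suite_name = f"leak_test_{i}"
    context.add_or_update_expectation_suite(expectation_suite_name=suite_name)
    asset = datasource.add_dataframe_asset(name=f"asset_{i}", dataframe=df)
    validator = context.get_validator(
        batch_request=asset.build_batch_request(),
        expectation_suite_name=suite_name,
    )
    validator.expect_table_row_count_to_be_between(min_value=1, max_value=10_000_000)
    validator.validate()
    # ru_maxrss is reported in KB on Linux
    rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
    print(f"iteration {i}: driver peak RSS ~ {rss_mb:.0f} MB")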