delta-rs: Broken filter for newly created delta table

Environment

Delta-rs version: 0.15.2

Binding: python

Environment:

  • OS: Mac

Bug

When creating a new delta table from a pandas dataframe, the filter predicate appears to be broken for some expressions.

What happened: .to_pandas() and .to_pyarrow_dataset() return 0 rows

What you expected to happen: The above functions should return the rows matching the filter predicate

How to reproduce it: This is a large dataset that I cannot share, but please point me in any direction for how to debug this.

This is how I achieved my current results

from pyarrow import dataset as ds
from pyarrow.parquet import ParquetFile, ParquetDataset
import pyarrow.compute as pc
from deltalake import write_deltalake, convert_to_deltalake, DeltaTable
import pandas as pd

df = "some_internal_data"

write_deltalake("broken_nlrot.delta", df)

dt_broken = DeltaTable("broken_nlrot.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("terminal") == "NLROTTM")).shape
# returns (0, 19)

# Trying to read directly with pyarrow
ds.dataset("broken_nlrot.delta").to_table(filter=(pc.field("terminal") == "NLROTTM")).shape
# returns (773536, 19) which is the expected result

# similarly, to_pandas also does not seem to work
dt_broken.to_pandas(filters=[("terminal", "==", "NLROTTM")]).shape
# returns (0, 19)

# again with pyarrow we are fine
ParquetDataset("broken_nlrot.delta", filters=[("terminal", "==", "NLROTTM")]).read().shape
# returns (773536, 19) which is the expected result

# oddly enough, when I choose a different filter predicate I don't get zero rows, but the result is still wrong
dt_broken.to_pandas(filters=[("terminal", "==", "USSAVGC")]).shape
# (89406, 19) - Wrong

ds.dataset("broken_nlrot.delta").to_table(filter=(pc.field("terminal") == "USSAVGC")).shape
# (420029, 19) - Correct

# pulling the full data in pandas also seems to work
full = dt_broken.to_pandas()
full[full["terminal"] == "NLROTTM"].shape
# (773536, 19)

full[full["terminal"] == "USSAVGC"].shape
# (420029, 19)


# calling optimize seems to fix the problem, but this should not be needed for filters to work
dt_broken.optimize.z_order(["terminal"])
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("terminal") == "NLROTTM")).shape
# returns (773536, 19)


Since this returns partially correct results, I suspect some row-group statistics are wrong, but then I would expect the pyarrow calls to also return incorrect results

About this issue

  • State: closed
  • Created 5 months ago
  • Comments: 15

Most upvoted comments

@Hanspagh not sure, I think they originated from some defaults Databricks uses with spark-delta.

Fix is incoming btw

@Hanspagh found the culprit, there seems to be an empty row group in the parquet. Our function get_file_stats_from_metadata checks whether the stats are set for each row group, but in this case the last row group is empty and has no stats set, so it skips setting stats
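The guard being described can be sketched in plain Python over mocked row-group metadata (the real get_file_stats_from_metadata lives in delta-rs's Rust code; the dict shapes and the file_stats helper below are illustrative, not the actual implementation):

```python
# Mock of the relevant parquet footer fields: each row group carries
# its row count and, optionally, min/max statistics for one column.
# An empty trailing row group typically has no statistics at all.
row_groups = [
    {"num_rows": 4, "stats": {"min": "NLROTTM", "max": "NLROTTM"}},
    {"num_rows": 4, "stats": {"min": "USSAVGC", "max": "USSAVGC"}},
    {"num_rows": 0, "stats": None},  # empty row group, no stats recorded
]

def file_stats(row_groups):
    """Aggregate min/max across row groups, skipping empty ones
    instead of treating the whole file as 'stats unavailable'."""
    mins, maxs = [], []
    for rg in row_groups:
        if rg["num_rows"] == 0:
            continue  # nothing to learn from an empty row group
        if rg["stats"] is None:
            return None  # a non-empty group genuinely missing stats
        mins.append(rg["stats"]["min"])
        maxs.append(rg["stats"]["max"])
    return {"min": min(mins), "max": max(maxs)} if mins else None

print(file_stats(row_groups))  # {'min': 'NLROTTM', 'max': 'USSAVGC'}
```

Without the `num_rows == 0` check, the empty trailing group would make the whole file look like it has no usable stats, which is exactly the failure mode described above.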

@Hanspagh I see the issue, the stats are empty on the add action for one of the files, will have to check why they are empty now and not before : )

Edit: Actually they were missing before as well, it's that now this results in a part expression which thinks there are null values 😕

So I managed to reproduce this. It only happens with a large dataset; 10_485_761 seems to be the magic number. I tried with pyarrow 15, 13, 12, 10, and 9. With pyarrow 8 the process seems to hang when I try to save a frame this big.

It looks as if the filter overflows and only returns the rows beyond 10_485_760, since we get 1 row for 10_485_761 and 2 for 10_485_762.

10_485_760 is also exactly 10 * 1024**2 (1024**2 = 1_048_576)
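A quick sanity check on that arithmetic (the assumed row-group size of 1024**2 rows is a guess inferred from the numbers, not confirmed from the writer's defaults):

```python
ROW_GROUP_SIZE = 1024 ** 2  # 1_048_576, the assumed rows per row group

# The magic number is exactly ten full row groups.
assert 10_485_760 == 10 * ROW_GROUP_SIZE

# One row past that boundary would spill into an eleventh row group
# holding a single row -- lining up with the (1, 1) result below.
full_groups, spill = divmod(10_485_761, ROW_GROUP_SIZE)
print(full_groups, spill)  # 10 1
```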

I hope this helps to figure out what is going on here. Let me know if you want me to provide more details

df = pd.DataFrame({"data": ["B"] * 10_485_760 })
write_deltalake("sample.delta", df, mode="overwrite")
dt_broken = DeltaTable("sample.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("data") == "B")).shape
# (10485760, 1)

df = pd.DataFrame({"data": ["B"] * 10_485_761 })
write_deltalake("sample.delta", df, mode="overwrite")
dt_broken = DeltaTable("sample.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("data") == "B")).shape
# (1, 1)

df = pd.DataFrame({"data": ["B"] * 10_485_762 })
write_deltalake("sample.delta", df, mode="overwrite")
dt_broken = DeltaTable("sample.delta")
dt_broken.to_pyarrow_dataset().to_table(filter=(pc.field("data") == "B")).shape
# (2, 1)