datafusion: Incorrect results in datafusion

Describe the bug I came upon a bug while querying my custom Parquet dataset, which causes DataFusion to produce incoherent and incorrect results.

I tested my dataset in various ways, all of which produced the desired results:

  • reading parquet files using python pandas, then merging and filtering the data there
  • encoding into CSV, and reading the data with DataFusion
  • creating an SQLite database using the provided CSV files, and using the same queries there

To Reproduce Steps to reproduce the behavior:

  1. Download all the code and data I used for testing:

issue_data.zip

Inside there are the Parquet files and CSVs with exactly the same data (also, there’s an sqlite database created from the provided CSV files).

  1. Use the instructions included in README.md to reproduce the issue:

The query, that fails when querying Parquet files with datafusion-cli:

-- 1. Distinct stop names
SELECT DISTINCT stop_name FROM stop INNER JOIN trip ON tid = trip_tid WHERE line = '176' ORDER BY stop_name NULLS LAST;

Change only in where from line to trip_line produces the desired results.

Expected behavior Should produce these 27 rows:

Bartnicza
Bazyliańska
Bolesławicka
Brzezińska
Budowlana
Choszczówka
Chłodnia
Daniszewska
Fabryka Pomp
Insurekcji
Marcelin
Marywilska-Las
Ołówkowa
PKP Płudy
PKP Żerań
Parowozowa
Pelcowizna
Polnych Kwiatów
Raciborska
Rembielińska
Sadkowska
Smugowa
Starego Dębu
Zyndrama z Maszkowic
os.Marywilska
Śpiewaków
None

Query 1 from README.md (mentioned above) produces this incorrect set of 33 rows:

+----------------------+
| stop_name            |
+----------------------+
| Bartnicza            |
| Bazyliańska          |
| Bolesławicka         |
| Brzezińska           |
| Budowlana            |
| Choszczówka          |
| Chłodnia             |
| Cygańska             |
| Czołgistów           |
| Daniszewska          |
| Fabryka Pomp         |
| Insurekcji           |
| Majerankowa          |
| Marcelin             |
| Marywilska-Las       |
| Ołówkowa             |
| PKP Falenica         |
| PKP Płudy            |
| PKP Żerań            |
| Parowozowa           |
| Pelcowizna           |
| Polnych Kwiatów      |
| Raciborska           |
| Rembielińska         |
| Rokosowska           |
| Sadkowska            |
| Smugowa              |
| Starego Dębu         |
| Zbójna Góra          |
| Zyndrama z Maszkowic |
| os.Marywilska        |
| Śpiewaków            |
|                      |
+----------------------+

Additional context Datafusion version:

$ datafusion-cli --version
DataFusion 5.1.0

My guess Since the Parquet files have encoded NULLs, and reading the CSV files with datafusion-cli gets rid of those, my best bet is on the usage of NULLs and some weir behavior when joining.

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 5
  • Comments: 27 (24 by maintainers)

Most upvoted comments

@alamb this reads like a detective 🕵️‍♂️

The plot thickens! 🕵️

Regarding parquet predicate pruning, amusingly in this case, I think row group pruning actually helps avoid the problem. As you may recall, when a filter is applied like this

select * from stops_parquet where trip_tid=54788307;

The answer is correct (stop_name is "RONDO ZESŁAŃCÓW SYBERYJSKICH"):

+---------------------+----------+-----------+------------------------------+
| time                | trip_tid | trip_line | stop_name                    |
+---------------------+----------+-----------+------------------------------+
| 2021-11-15 00:00:00 | 54788307 | 186       | RONDO ZESŁAŃCÓW SYBERYJSKICH |
+---------------------+----------+-----------+------------------------------+

However, when I disable pruning then the wrong answer comes out!

+---------------------+----------+-----------+-----------+
| time                | trip_tid | trip_line | stop_name |
+---------------------+----------+-----------+-----------+
| 2021-11-15 00:00:00 | 54788307 | 186       | Armatnia  |
+---------------------+----------+-----------+-----------+

I added some debugging, and verified that the query does in fact skip several row groups:

Row Group[0], col  ReportStopRecord: pruned = true
Row Group[1], col  ReportStopRecord: pruned = false
Row Group[2], col  ReportStopRecord: pruned = false
Row Group[3], col  ReportStopRecord: pruned = false
Row Group[4], col  ReportStopRecord: pruned = false
Row Group[5], col  ReportStopRecord: pruned = false
Row Group[6], col  ReportStopRecord: pruned = false
Row Group[7], col  ReportStopRecord: pruned = false
Row Group[8], col  ReportStopRecord: pruned = false
Row Group[9], col  ReportStopRecord: pruned = true
Row Group[10], col  ReportStopRecord: pruned = false
Row Group[11], col  ReportStopRecord: pruned = false
Row Group[12], col  ReportStopRecord: pruned = false
Row Group[13], col  ReportStopRecord: pruned = false
Row Group[14], col  ReportStopRecord: pruned = false
Row Group[15], col  ReportStopRecord: pruned = false
Row Group[16], col  ReportStopRecord: pruned = false
Row Group[17], col  ReportStopRecord: pruned = false
Row Group[18], col  ReportStopRecord: pruned = false
Row Group[19], col  ReportStopRecord: pruned = false
Row Group[20], col  ReportStopRecord: pruned = false
Row Group[21], col  ReportStopRecord: pruned = false
Row Group[22], col  ReportStopRecord: pruned = false
Row Group[23], col  ReportStopRecord: pruned = false
Row Group[24], col  ReportStopRecord: pruned = false
Row Group[25], col  ReportStopRecord: pruned = false
Row Group[26], col  ReportStopRecord: pruned = false
Row Group[27], col  ReportStopRecord: pruned = false
Row Group[28], col  ReportStopRecord: pruned = false
Row Group[29], col  ReportStopRecord: pruned = false
Row Group[30], col  ReportStopRecord: pruned = false
Row Group[31], col  ReportStopRecord: pruned = false
Row Group[32], col  ReportStopRecord: pruned = true
Row Group[33], col  ReportStopRecord: pruned = false
Row Group[34], col  ReportStopRecord: pruned = true

I reran the queries from @franeklubi at 2e918184c502a40a041fb0163702cb6ab8de0af9:

Setup

CREATE EXTERNAL TABLE stop (time TEXT, trip_tid TEXT, trip_line TEXT, stop_name TEXT) STORED AS CSV WITH HEADER ROW LOCATION '/Users/alamb/Downloads/issue_data/csvs/stop.csv';
CREATE EXTERNAL TABLE trip (tid TEXT, line TEXT, base_day TEXT) STORED AS CSV WITH HEADER ROW LOCATION '/Users/alamb/Downloads/issue_data/csvs/trip.csv';

CREATE EXTERNAL TABLE stop_parquet STORED AS PARQUET LOCATION '/Users/alamb/Downloads/issue_data/parquets/stops/';
CREATE EXTERNAL TABLE trip_parquet STORED AS PARQUET LOCATION '/Users/alamb/Downloads/issue_data/parquets/trips/';

Now the results are consistent when using csv and parquet:

Parquet

❯ SELECT DISTINCT stop_name FROM stop_parquet INNER JOIN trip_parquet ON tid = trip_tid WHERE line = '176' ORDER BY stop_name NULLS LAST;
+----------------------+
| stop_name            |
+----------------------+
| Bartnicza            |
| Bazyliańska          |
| Bolesławicka         |
| Brzezińska           |
| Budowlana            |
| Choszczówka          |
| Chłodnia             |
| Daniszewska          |
| Fabryka Pomp         |
| Insurekcji           |
| Marcelin             |
| Marywilska-Las       |
| Ołówkowa             |
| PKP Płudy            |
| PKP Żerań            |
| Parowozowa           |
| Pelcowizna           |
| Polnych Kwiatów      |
| Raciborska           |
| Rembielińska         |
| Sadkowska            |
| Smugowa              |
| Starego Dębu         |
| Zyndrama z Maszkowic |
| os.Marywilska        |
| Śpiewaków            |
|                      |
+----------------------+
27 rows in set. Query took 0.042 seconds.

Csv

❯ SELECT DISTINCT stop_name FROM stop INNER JOIN trip ON tid = trip_tid WHERE line = '176' ORDER BY stop_name NULLS LAST;
+----------------------+
| stop_name            |
+----------------------+
|                      |
| Bartnicza            |
| Bazyliańska          |
| Bolesławicka         |
| Brzezińska           |
| Budowlana            |
| Choszczówka          |
| Chłodnia             |
| Daniszewska          |
| Fabryka Pomp         |
| Insurekcji           |
| Marcelin             |
| Marywilska-Las       |
| Ołówkowa             |
| PKP Płudy            |
| PKP Żerań            |
| Parowozowa           |
| Pelcowizna           |
| Polnych Kwiatów      |
| Raciborska           |
| Rembielińska         |
| Sadkowska            |
| Smugowa              |
| Starego Dębu         |
| Zyndrama z Maszkowic |
| os.Marywilska        |
| Śpiewaków            |
+----------------------+
27 rows in set. Query took 0.116 seconds.

Interestingly, the CSV results don’t seem to have the NULLS LAST 🤔

I tested the fix from @yordan-pavlov in https://github.com/apache/arrow-rs/pull/1130 against my reproducer and with #1130 it now gets the correct answer ❤️ so that seems like progress

FYI https://github.com/apache/arrow-rs/pull/1110 runs into a similar issue, that appears to be fixed by switching to ComplexObjectArrayReader instead of ArrowArrayReader. I will have a poke around tomorrow if I have time, and see if I can spot what is going wrong.

Oh weird, but then a direct query

select * from stops_parquet where trip_tid=54788307;
+---------------------+----------+-----------+------------------------------+
| time                | trip_tid | trip_line | stop_name                    |
+---------------------+----------+-----------+------------------------------+
| 2021-11-15 00:00:00 | 54788307 | 186       | RONDO ZESŁAŃCÓW SYBERYJSKICH |
+---------------------+----------+-----------+------------------------------+

Seems to get the correct answer 🤔

hi @franeklubi thanks for the detailed sharing.

to simplify bug reproduction, can you help me understand the difference between parquet and csv data? specifically:

❯ CREATE EXTERNAL TABLE stop STORED AS PARQUET LOCATION './parquets/stops';
0 rows in set. Query took 0.001 seconds.
❯ select count(*) from stop;
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 33254           |
+-----------------+
1 row in set. Query took 0.007 seconds.
❯ CREATE EXTERNAL TABLE stop (time TEXT, trip_tid TEXT, trip_line TEXT, stop_name TEXT) STORED AS CSV LOCATION './csvs/stop.csv';
0 rows in set. Query took 0.000 seconds.
❯ select count(*) from stop;
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 33255           |
+-----------------+
1 row in set. Query took 0.014 seconds.

they seem to have different number of rows.

Same thing applies to trips data.

You could try using ComplexObjectArrayReader to decode the column instead of ArrowArrayReader. This might help narrow down if the bug lies in the RLE decoding or something higher up in ArrowArrayReader. Alternatively you could see if the issue occurs with https://github.com/apache/arrow-rs/pull/1082 which replaces ArrowArrayReader with an alternative that shares more with the PrimitiveArrayReader/ComplexObjectArrayReader implementations, again this might help narrow down the origin.

Not at a computer at the moment, otherwise would try myself

I will probably file a ticket in arrow-rs shortly with the slimmed down reproducer

This discrepancy looks like it comes out of VariableLenDictionaryDecoder which seems to have been introduced by @yordan-pavlov in https://github.com/apache/arrow-rs/pull/384

I have not studied the code enough yet to fully understand what it is doing, and I need to attend to some other items now. If anyone has ideas (cc @tustvold ) on where to look next I would appreciate it

I have a smaller reproducer and am trying to narrow down where the problem is but probably won’t be able to work on this until next week sometime at the earliest

In case anyone else is interested, here is the repo: repro.zip

cargo run --bin datafusion-cli -- -f repro.sql | grep 54788307

It should print out

| 2021-11-15 00:00:00 | 54788307 | 186       | RONDO ZESŁAŃCÓW SYBERYJSKICH |

But actually prints out

| 2021-11-15 00:00:00 | 54788307 | 186       | Armatnia

I spent some time looking at the parquet data and the csv data, and it looks to me like there may be something wrong with the parquet reader.

Specifically, I just ran a query that did a select *

dump_parquet.sql:

CREATE EXTERNAL TABLE stops_parquet
STORED AS PARQUET
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/parquets/stops';

show columns from stops_parquet;

CREATE EXTERNAL TABLE trips_parquet
STORED AS PARQUET
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/parquets/trips';

show columns from trips_parquet;

select * from stops_parquet order by time, trip_tid, trip_line, stop_name;
select * from trips_parquet order by tid, line, base_day;

and dump_csv.sql:

CREATE EXTERNAL TABLE stops_csv (time timestamp, trip_tid bigint, trip_line TEXT, stop_name TEXT)
STORED AS CSV WITH HEADER ROW
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/csvs/stop.csv';

show columns from stops_csv;

CREATE EXTERNAL TABLE trips_csv (tid bigint, line TEXT, base_day date)
STORED AS CSV WITH HEADER ROW
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/csvs/trip.csv';

show columns from trips_csv;

select * from stops_csv order by time, trip_tid, trip_line, stop_name;
select * from trips_csv order by tid, line, base_day;

Like this:

~/Software/arrow-datafusion/target/debug/datafusion-cli -f dump_csv.sql  > dump_csv.txt
~/Software/arrow-datafusion/target/debug/datafusion-cli -f dump_parquet.sql  > dump_parquet.txt

The results are here: dump_csv.txt dump_parquet.txt

And a quick visual diff shows they aren’t the same

The first few lines of dump_csv.txt look like

+---------------------+----------+-----------+------------------------------+
| time                | trip_tid | trip_line | stop_name                    |
+---------------------+----------+-----------+------------------------------+
| 2021-11-15 05:00:00 | 54761677 | N64       |                              |
| 2021-11-15 05:00:00 | 54778942 | 204       |                              |
| 2021-11-15 05:00:00 | 54788307 | 186       | RONDO ZESŁAŃCÓW SYBERYJSKICH |
| 2021-11-15 05:00:00 | 54788967 | N41       |                              |
| 2021-11-15 05:00:00 | 54788988 | N41       |                              |
| 2021-11-15 05:00:00 | 54802937 | 104       |                              |

While the first few lines of dump_parquet.txt l look like:

+---------------------+----------+-----------+------------------------------+
| time                | trip_tid | trip_line | stop_name                    |
+---------------------+----------+-----------+------------------------------+
| 2021-11-15 00:00:00 | 54761677 | N64       |                              |
| 2021-11-15 00:00:00 | 54778942 | 204       |                              |
| 2021-11-15 00:00:00 | 54788307 | 186       | Armatnia                     |
| 2021-11-15 00:00:00 | 54788967 | N41       |                              |
| 2021-11-15 00:00:00 | 54788988 | N41       |                              |
| 2021-11-15 00:00:00 | 54802937 | 104       |                              |

(note that the stop name is different)

However, when I look for that mismatched line trip_tid=54788307 in the data using pandas it does match with the csv:

Here is the raw data in csv:

$ grep 54788307 issue_data/csvs/stop.csv
2021-11-15 00:00:00,54788307,186,RONDO ZESŁAŃCÓW SYBERYJSKICH

Here is what comes out when using pandas:

Python 3.8.12 (default, Oct 13 2021, 06:42:42) 
[Clang 13.0.0 (clang-1300.0.29.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pandas as pd
import pandas as pd
>>> df = pd.read_parquet('issue_data/parquets/stops/2021-11.parquet')
df = pd.read_parquet('issue_data/parquets/stops/2021-11.parquet')
>>> df.to_csv('/tmp/2021-11.csv')
df.to_csv('/tmp/2021-11.csv')
>>> 
$ grep 54788307 /tmp/2021-11.csv 
16591,2021-11-15 00:00:00,54788307.0,186,RONDO ZESŁAŃCÓW SYBERYJSKICH

I’ll try and check this out in more detail tomorrow. Thanks for the investigation @Jimexist

seems like these values are absent in parquet readings:

❯ select stop_name from stop_csv except select stop_name from stop_parquet order by stop_name;
+------------------------+
| stop_name              |
+------------------------+
|                        |
| CH Promenada           |
| Centrum                |
| Ciołkosza              |
| Kanał Gocławski        |
| Marszałkowska          |
| Marysin                |
| Metro Politechnika     |
| Metro Świętokrzyska    |
| Odkryta                |
| Okularowa              |
| Poligonowa             |
| Przyczółek Grochowski  |
| Rezedowa               |
| Rozbrat                |
| Saska                  |
| Zajezdnia Ostrobramska |
| Zamieniecka            |
| pl.Bankowy             |
| pl.Konstytucji         |
| pl.Na Rozdrożu         |
| stop_name              |
| Łysakowska             |
+------------------------+
23 rows in set. Query took 0.019 seconds.

and the scanning process was invalid:

❯ select distinct stop_name from stop_parquet where stop_name = 'Odkryta';
+-----------+
| stop_name |
+-----------+
| Odkryta   |
+-----------+
1 row in set. Query took 0.007 seconds.

notice the Odkryta row would be missing in the statement below.

❯ select distinct stop_name from stop_parquet;
+------------------------------+
| stop_name                    |
+------------------------------+
|                              |
| Bystra                       |
| Dobosza                      |
| Urbanistów                   |
| Pelcowizna                   |
| PUSTELNIK                    |
| Strzeleckiego                |
| Bełdan                       |
| Kijowska                     |
| Świątynia Opatrzności Bożej  |
| Białołęka-Ratusz             |
| Armatnia                     |
| Muranowska                   |
| Rokosowska                   |
| Choszczówka                  |
| os.Marywilska                |
| Metro Stokłosy               |
| Leśnej Polanki               |
| Ćwiklińskiej                 |
| Nowodwory                    |
| Stawki                       |
| PKP Falenica                 |
| Bazyliańska                  |
| Bartnicza                    |
| ARMATNIA                     |
| Metro Ratusz-Arsenał         |
| Ćmielowska                   |
| Myśliborska                  |
| Oś Królewska                 |
| RONDO ZESŁAŃCÓW SYBERYJSKICH |
| pl.Inwalidów                 |
| Opaczewska                   |
| Rudzka                       |
| Nowolipie                    |
| Polnych Kwiatów              |
| PKP Żerań                    |
| Sarmacka                     |
| rondo Zesłańców Syberyjskich |
| Leszno                       |
| Gorzykowska                  |
| Miła                         |
| pl.Narutowicza               |
| pl.Zawiszy                   |
| Majerankowa                  |
| Daniszewska                  |
| PKP Olszynka Grochowska      |
| Stare Miasto                 |
| Cm.Wolski                    |
| Metro Stadion Narodowy       |
| Małych Dębów                 |
| Marywilska-Las               |
| Powsinek                     |
| Branickiego                  |
| Vogla                        |
| Olesin                       |
| Ceramiczna                   |
| CH Marki                     |
| Dw.Wileński                  |
| Hala Kopińska                |
| Inflancka                    |
| Klaudyny                     |
| Chłodna                      |
| Zbójna Góra                  |
| Czołgistów                   |
| Sadkowska                    |
| Insurekcji                   |
| Brzezińska                   |
| Ołówkowa                     |
| Smugowa                      |
| Dąbrówka Wiślana             |
| Żerań FSO                    |
| Kino Femina                  |
| gen.Zajączka                 |
| Wola-Ratusz                  |
| Bohomolca                    |
| Chłodnia                     |
| Starego Dębu                 |
| Raciborska                   |
| Rembielińska                 |
| Wałbrzyska-Cmentarz          |
| EC Żerań                     |
| os.Potok                     |
| Metro Księcia Janusza        |
| Dw.Gdański                   |
| Mennica                      |
| Norblin                      |
| Sienna                       |
| Gwiaździsta                  |
| Sobocka                      |
| Marymont-Potok               |
| Śpiewaków                    |
| Fabryka Pomp                 |
| METRO RATUSZ-ARSENAŁ         |
| Osiedle                      |
| Szczęśliwice                 |
| Szwedzka                     |
| pl.Starynkiewicza            |
| Wawelska                     |
| Dzika                        |
| Budowlana                    |
| PKP Płudy                    |
| Bolesławicka                 |
| Marcelin                     |
| Milenijna                    |
| Park Praski                  |
| Wolności                     |
| Smocza                       |
| pl.Wilsona                   |
| Cygańska                     |
| Parowozowa                   |
| Zyndrama z Maszkowic         |
| Parafialna                   |
+------------------------------+
112 rows in set. Query took 0.008 seconds.

Given the investigation above I believe this might be related to the single distinct to group by optimization or the hash aggregation steps.

cc @Dandandan @houqp