dask: dataframe merge returns incorrect result when data is from multi-partition parquet
Similar to issue #2237, I am seeing that a dask dataframe merge can return incorrect results when the data comes from multi-partition parquet. The equivalent pandas merge always produces the correct result.
I have put my test data in https://github.com/ningsean/dask_merge_issue. I am running a local cluster at localhost:9898.
My setup is Python 3.6.1, with the following module versions:

```python
dask.__version__
# '0.15.2+25.ga1a75a7'
numpy.__version__
# '1.13.1'
partd.__version__
# '0.3.8'
pandas.__version__
# '0.20.1'
```
Here is the code that shows the problem.
```python
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

client = Client("localhost:9898")

ddf1 = dd.read_parquet("keys_133m.parq")
ddf1.columns
# Index(['KEY'], dtype='object')
ddf1.dtypes
# KEY    int64

ddf2 = dd.read_parquet("key_ids_133m.parq")
ddf2.columns
# Index(['KEY', 'ID'], dtype='object')
ddf2.dtypes
# KEY    int64
# ID     int64
```
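The bug only manifests when the inputs span multiple partitions, so it is worth confirming that up front. This check is not part of the original report (the actual partition counts were not recorded):

```python
# Confirm the inputs really are multi-partition; both counts must be
# greater than 1 for the problem to appear.
print(ddf1.npartitions, ddf2.npartitions)
```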
The pandas inner merge result has about 500k rows:
```python
df = ddf1.compute().merge(ddf2.compute(), how='inner', on='KEY')
df.shape
# (505847, 2)
```
The dask inner merge has only about 272k rows:
```python
ddf = ddf1.merge(ddf2, how='inner', on='KEY')
len(ddf)
# 272051
```
One example of a row missing from `ddf` is `KEY == 135337982`:
```python
# The dask merge result does not contain the row.
ddf.loc[ddf.KEY == 135337982].compute()
# Empty DataFrame
# Columns: [KEY, ID]
# Index: []

# The pandas merge result does contain the row.
df.loc[df.KEY == 135337982]
#               KEY         ID
# 122842  135337982  149107799
```
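To see how widespread the loss is, the key sets of the two results can be diffed directly. This snippet is not from the original report and assumes both results fit comfortably in memory (they do here, at roughly 500k rows):

```python
# Keys present in the pandas result but absent from the dask result.
missing = set(df.KEY) - set(ddf.KEY.compute())
len(missing)          # roughly accounts for the ~234k-row gap
135337982 in missing  # True, matching the example above
```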
Single-partition parquet works:
```python
# Use nlargest to write each dataset to a single-partition parquet file.
ddf1.nlargest(n=505850, columns='KEY').to_parquet('keys_one_partition.parq')
ddf2.nlargest(n=1960255, columns='KEY').to_parquet('key_ids_one_partition.parq')

ddf1_one = dd.read_parquet('keys_one_partition.parq')
ddf2_one = dd.read_parquet('key_ids_one_partition.parq')

ddf_one = ddf1_one.merge(ddf2_one, how='inner', on='KEY')
len(ddf_one)
# 505847

df_one = ddf1_one.compute().merge(ddf2_one.compute(), how='inner', on='KEY')
df_one.shape
# (505847, 2)
```
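An alternative to the `nlargest` trick, sketched here rather than taken from the original report, is to collapse each input with `repartition`; this only remains viable while the data fits in a single partition:

```python
# Workaround sketch: force both inputs into one partition so the merge
# needs no multi-partition shuffle.
ddf_single = ddf1.repartition(npartitions=1).merge(
    ddf2.repartition(npartitions=1), how='inner', on='KEY')
len(ddf_single)  # expected to match the pandas result
```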
About this issue
- Original URL: https://github.com/dask/dask/issues/2730
- State: closed
- Created 7 years ago
- Comments: 21 (14 by maintainers)
Commits related to this issue
- Update README.md Link to issue https://github.com/dask/dask/issues/2730 — committed to ningsean/dask_merge_issue by ningsean 7 years ago
- Mod before truncate in shufle_group Fixes #2730 This caused some rows to be missed during task-based shuffles — committed to mrocklin/dask by mrocklin 6 years ago
- Mod before truncate in shufle_group (#3201) * Mod before truncate in shufle_group Fixes #2730 This caused some rows to be missed during task-based shuffles — committed to dask/dask by mrocklin 6 years ago
See https://github.com/dask/dask/pull/3201 for a fix
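The commit title refers to the order of a modulus and a narrowing cast inside `shuffle_group`. As a toy illustration of why that order matters (this is not dask's actual code), taking the modulus after truncating a 64-bit hash to a narrower integer can assign a key to a different bucket than taking it at full width:

```python
import numpy as np

h = np.uint64(135337982)   # stand-in for a 64-bit row hash
n = 1000                   # stand-in for a partition count

print(int(h) % n)                    # 982: mod taken at full width
print(int(h.astype(np.uint16)) % n)  # 142: truncate first, then mod
```

When two code paths disagree like this about which bucket a key belongs to, matching rows from the two sides end up in different partitions and silently drop out of an inner merge.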
On Fri, Feb 23, 2018 at 9:47 AM, Hussain Sultan wrote:

> yea i will play around some more to reproduce this with synthetic data
Unfortunately we won’t be able to include these files in tests. We really do need something that can be generated automatically.
@mrocklin I am working on creating a reproducible example. I haven't been able to reproduce this issue outside of the production environment, though. Will update this thread.
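For reference, a synthetic reproducer along the lines being requested might look like the sketch below. It is hypothetical: the file names and sizes are made up, and the thread notes the issue had not yet been reproduced outside of production data:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

# Hypothetical synthetic stand-ins for the two production datasets.
rng = np.random.RandomState(42)
keys = pd.DataFrame({'KEY': rng.randint(0, 200000000, size=1000000)})
key_ids = pd.DataFrame({'KEY': rng.randint(0, 200000000, size=2000000),
                        'ID': rng.randint(0, 200000000, size=2000000)})

# Write multi-partition parquet, then merge via dask and via pandas.
dd.from_pandas(keys, npartitions=8).to_parquet('keys_synth.parq')
dd.from_pandas(key_ids, npartitions=8).to_parquet('key_ids_synth.parq')
ddf_a = dd.read_parquet('keys_synth.parq')
ddf_b = dd.read_parquet('key_ids_synth.parq')

dask_rows = len(ddf_a.merge(ddf_b, how='inner', on='KEY'))
pandas_rows = len(keys.merge(key_ids, how='inner', on='KEY'))
assert dask_rows == pandas_rows, (dask_rows, pandas_rows)
```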
@mrocklin @jcrist Is anybody actively working on this one? I just ran into this too.