dask: to_parquet failing on pyarrow master when different partitions result in different parquet schema
In pyarrow master, we made a change to `append_row_groups` to require equal schemas when appending (https://issues.apache.org/jira/browse/ARROW-8062, https://github.com/apache/arrow/pull/7180). But that apparently causes test failures in the dask test suite:
=================================== FAILURES ===================================
_ test_to_parquet_pyarrow_w_inconsistent_schema_by_partition_fails_by_default __

tmpdir = local('/tmp/pytest-of-root/pytest-0/test_to_parquet_pyarrow_w_inco0')

    def test_to_parquet_pyarrow_w_inconsistent_schema_by_partition_fails_by_default(tmpdir):
        check_pyarrow()

        df = pd.DataFrame(
            {"partition_column": [0, 0, 1, 1], "strings": ["a", "b", None, None]}
        )

        ddf = dd.from_pandas(df, npartitions=2)
>       ddf.to_parquet(str(tmpdir), engine="pyarrow", partition_on=["partition_column"])

opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/io/tests/test_parquet.py:965:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/core.py:3811: in to_parquet
    return to_parquet(self, path, *args, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py:461: in to_parquet
    out = out.compute(**compute_kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/base.py:166: in compute
    (result,) = compute(self, traverse=False, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/base.py:444: in compute
    results = schedule(dsk, keys, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/threaded.py:84: in get
    **kwargs
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/local.py:486: in get_async
    raise_exception(exc, tb)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/local.py:316: in reraise
    raise exc
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/local.py:222: in execute_task
    result = _execute_task(task, data)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/utils.py:30: in apply
    return func(*args, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py:763: in write_metadata
    _meta.append_row_groups(parts[i][0]["meta"])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   RuntimeError: AppendRowGroups requires equal schemas.

pyarrow/_parquet.pyx:687: RuntimeError
___________________ test_timeseries_nulls_in_schema[pyarrow] ___________________

tmpdir = local('/tmp/pytest-of-root/pytest-0/test_timeseries_nulls_in_schem0')
engine = 'pyarrow'

    def test_timeseries_nulls_in_schema(tmpdir, engine):
        # GH#5608: relative path failing _metadata/_common_metadata detection.
        tmp_path = str(tmpdir.mkdir("files"))
        tmp_path = os.path.join(tmp_path, "../", "files")

        ddf2 = (
            dask.datasets.timeseries(start="2000-01-01", end="2000-01-03", freq="1h")
            .reset_index()
            .map_partitions(lambda x: x.loc[:5])
        )
        ddf2 = ddf2.set_index("x").reset_index().persist()
        ddf2.name = ddf2.name.where(ddf2.timestamp == "2000-01-01", None)

>       ddf2.to_parquet(tmp_path, engine=engine)

opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/io/tests/test_parquet.py:2011:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/core.py:3811: in to_parquet
    return to_parquet(self, path, *args, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py:461: in to_parquet
    out = out.compute(**compute_kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/base.py:166: in compute
    (result,) = compute(self, traverse=False, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/base.py:444: in compute
    results = schedule(dsk, keys, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/threaded.py:84: in get
    **kwargs
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/local.py:486: in get_async
    raise_exception(exc, tb)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/local.py:316: in reraise
    raise exc
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/local.py:222: in execute_task
    result = _execute_task(task, data)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/utils.py:30: in apply
    return func(*args, **kwargs)
opt/conda/envs/arrow/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py:763: in write_metadata
    _meta.append_row_groups(parts[i][0]["meta"])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   RuntimeError: AppendRowGroups requires equal schemas.

pyarrow/_parquet.pyx:687: RuntimeError
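For reference, the failure reduces to a standalone pyarrow snippet (a minimal sketch of what dask's `write_metadata` effectively does; the file names are illustrative):

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Write two files whose schemas differ only in the "strings" column:
# pyarrow infers string for the first partition, null for the all-null one.
pq.write_table(pa.Table.from_pandas(pd.DataFrame({"strings": ["a", "b"]})), "part.0.parquet")
pq.write_table(pa.Table.from_pandas(pd.DataFrame({"strings": [None, None]})), "part.1.parquet")

# Combining the per-file metadata, as done when writing a _metadata file,
# now raises: RuntimeError: AppendRowGroups requires equal schemas.
meta = pq.read_metadata("part.0.parquet")
meta.append_row_groups(pq.read_metadata("part.1.parquet"))
```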
When doing `FileMetaData.append_row_groups`, it seems to make sense that we require the schemas to be the same, because afterwards there is only a single schema.
Looking at the failures in dask, these are cases where different partitions of the dask dataframe result in different pyarrow/parquet schemas (specifically, a partition with all nulls in an object dtype (string) column).
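The inference difference can be seen directly (a sketch):

```python
import pandas as pd
import pyarrow as pa

# Object-dtype columns are typed per partition during conversion:
pa.Table.from_pandas(pd.DataFrame({"strings": ["a", "b"]})).schema.field("strings").type
# DataType(string)
pa.Table.from_pandas(pd.DataFrame({"strings": [None, None]})).schema.field("strings").type
# DataType(null)
```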
Do you see a solution for this on the dask side? (In principle there are the `_meta` dtypes, but here of course it's the typical problem that "object" dtype is not specific enough ...; see the sketch below.)
Or should we revert to being less strict on Arrow's side?
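Regarding the dask-side option: one conceivable direction is to derive an explicit pyarrow schema once and apply it to every partition, so that all-null object columns are not inferred independently (a hypothetical sketch of the pyarrow mechanism, not an existing dask option):

```python
import pandas as pd
import pyarrow as pa

# With a shared explicit schema, the all-null partition is cast to string
# instead of being inferred as null type.
schema = pa.schema([("strings", pa.string())])
table = pa.Table.from_pandas(
    pd.DataFrame({"strings": [None, None]}), schema=schema, preserve_index=False
)
table.schema.field("strings").type  # DataType(string)
```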
About this issue
- State: closed
- Created 4 years ago
- Comments: 40 (38 by maintainers)
So the first test is specifying `dataset={"validate_schema": True}` in `read_parquet`, and the second test is not. That clarifies the difference.

Thus, forget my "insight" from above. Datasets written with inconsistent schemas in the different parquet files currently read fine by default. This was actually fixed by @rjzamora in https://github.com/dask/dask/pull/5307, by ensuring to pass `validate_schema=False` when not specified by the user (with similar comments about the problem of object dtype partitions with all nulls ... https://github.com/dask/dask/pull/5307#issuecomment-525787069).
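For reference, that default corresponds to reading like this (a sketch; `path` is a placeholder for the dataset directory):

```python
import dask.dataframe as dd

path = "output_dataset/"  # placeholder

# The default since dask#5307; passing {"validate_schema": True} instead
# restores pyarrow's check that all files share a single schema.
ddf = dd.read_parquet(path, engine="pyarrow", dataset={"validate_schema": False})
```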