cudf: [BUG] dask-cudf, `map_partitions()`, `ValueError: The columns in the computed data do not match the columns in the provided metadata`

Describe the bug

cuGraph has a case where creating multiple graph objects, one after the other, results in an error when the input data is small enough to produce empty partitions. The error seems to occur when a map_partitions() call is used to copy the dataframe in each partition, but it is still unknown whether that call is the root cause. The error is shown in the attached mre_output.txt file below. In the MRE, after the first “graph” object is created, a subsequent call to create another fails with:

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['src', 'dst', 'value']
  Missing: []

The code, which is based on actual cugraph code to create a graph object, creates a copy of the input dask DataFrame, modifies it (which includes copying and dropping columns), then saves it for use later. It appears that the second call is somehow referencing DataFrames modified by the first call, resulting in the error.
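To make the error message easier to read, here is a minimal sketch of how the "Extra" and "Missing" lists relate the columns of a computed partition to the columns in the expected metadata. It is plain Python for illustration only, not dask's actual check; the helper name compare_columns and the empty metadata column list are assumptions chosen to reproduce the lists shown in the error.

```python
def compare_columns(computed_columns, meta_columns):
    """Return (extra, missing) in the sense used by the error message.

    extra:   columns present in the computed data but not in the metadata
    missing: columns present in the metadata but not in the computed data
    """
    extra = [c for c in computed_columns if c not in meta_columns]
    missing = [c for c in meta_columns if c not in computed_columns]
    return extra, missing


# Hypothetical inputs: the computed partition still has the original columns,
# while the metadata (an assumption here) lists none of them.
computed = ["src", "dst", "value"]
meta = []

extra, missing = compare_columns(computed, meta)
print("Extra:  ", extra)
print("Missing:", missing)
```

Read this way, the error says the computed partitions contain 'src', 'dst', and 'value' while the metadata does not expect them, and nothing the metadata expects is absent from the computed data.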

Steps/Code to reproduce bug

An attempt at a minimal reproducer is attached to this issue. It is based on code used in cuGraph, but I suspect it is not truly minimal. We will continue to work on minimizing it, but we are providing it now so the investigation can get started.

To run the minimal reproducer, simply run CUDA_VISIBLE_DEVICES=0 python dask_cudf_mre.py. We’ve been running it restricted to a single GPU to simulate the CI environment where the failure is occurring.

mre_output.txt

Expected behavior

The reproducer script should run to completion without an exception being raised. This can be seen when any of the following are changed in the script:

  • Remove this call: ddf = ddf.map_partitions(lambda df: df.copy())
  • Replace the call to map_partitions() above with ddf = ddf.copy(). NOTE: for some reason, this does not help when done in cugraph.
  • Add a call to explicitly delete the G1 object (del G1) before creating G2
  • Create the input dataframe from a cudf DataFrame instead of reading a file from disk.

The expected output is also attached. It was generated by adding del G1 as described above. mre_expected_output.txt

Environment overview

conda environment with dask 2023.6.0 or later

Environment details

(rapids) root@bfd6e155776f:/Projects/exp# conda list dask
# packages in environment at /opt/conda/envs/rapids:
#
# Name                    Version                   Build  Channel
dask                      2023.6.0                 pypi_0    pypi
dask-core                 2023.3.2           pyhd8ed1ab_0    conda-forge
dask-cudf                 23.8.0                   pypi_0    pypi
dask-glm                  0.2.0                      py_1    conda-forge
dask-labextension         6.1.0              pyhd8ed1ab_0    conda-forge
dask-ml                   2023.3.24          pyhd8ed1ab_1    conda-forge
raft-dask                 23.8.0                   pypi_0    pypi
(rapids) root@bfd6e155776f:/Projects/exp# conda list distributed
# packages in environment at /opt/conda/envs/rapids:
#
# Name                    Version                   Build  Channel
distributed               2023.6.0                 pypi_0    pypi

dask_cudf_mre.py

cc @VibhuJawa @jnke2016

About this issue

  • State: closed
  • Created 10 months ago
  • Comments: 15 (13 by maintainers)

Most upvoted comments

In a call, @rjzamora suggested passing a token argument to the map_partitions() call to help with cleanup issues. We think this will fix the issue:

ddf_copy = ddf_copy.map_partitions(lambda df: df.copy(), token='custom-'+str(random.random()))
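The suggestion above can be sketched as a small helper. This is only an illustration under stated assumptions: the helper names make_unique_token and copy_partitions are hypothetical, and uuid is swapped in for random.random() simply to make the token unique per call. The ddf argument is assumed to be a dask or dask_cudf DataFrame whose map_partitions() accepts a token keyword, as in the line above.

```python
import uuid


def make_unique_token(prefix="custom"):
    """Build a token string that differs on every call (hypothetical helper)."""
    return f"{prefix}-{uuid.uuid4().hex}"


def copy_partitions(ddf, prefix="custom"):
    """Copy each partition of ddf, passing a unique token to map_partitions().

    ddf is assumed to be a dask (or dask_cudf) DataFrame; nothing here is
    specific to cuGraph.
    """
    return ddf.map_partitions(lambda df: df.copy(), token=make_unique_token(prefix))
```

With this, each graph-creation call would use ddf_copy = copy_partitions(ddf_copy), so two successive calls never share the same token. Whether that fully resolves the failure was still an open question in the thread.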