cudf: [BUG] Intermittent failures in `groupby` cumulative scans when keys contain nulls.

Describe the bug

Running

from itertools import count

import cudf
import dask_cudf
import numpy as np
import rmm
from cudf.testing._utils import assert_eq

if __name__ == "__main__":
    state = np.random.get_state()
    rmm.reinitialize(pool_allocator=False)
    for i in count():
        oldstate = np.random.get_state()
        size = 10_000
        gdf_original = cudf.DataFrame(
            {
                "xx": np.random.randint(0, 5, size=size),
                "x": np.random.normal(size=size),
                "y": np.random.normal(size=size),
            },
        )

        # insert nulls into the key column at random.
        gdf_original["xx"] = gdf_original.xx.mask(
            np.random.choice([True, False], size=gdf_original.xx.shape)
        )
        pdf = gdf_original.to_pandas(nullable=False)
        gdf = gdf_original
        gdf_grouped = gdf.groupby("xx")
        cudf_result = gdf_grouped.cumsum()

        # Although we don't look at the data, this seems pretty
        # crucial to provoking the issue.
        # Notice this never touches the gdf_original data! And the
        # computation is _done_ by the time we're here, so one
        # suspicion is that there is garbage left lying around for the
        # next iteration.
        ddf = dask_cudf.from_cudf(cudf.from_pandas(pdf), npartitions=5).persist()
        ddf_grouped = ddf.groupby("xx")
        dask_cudf_result = ddf_grouped.cumsum().compute(scheduler="sync")

        pandas_result = pdf.groupby("xx").cumsum()
        print(i)
        # This occasionally fails
        assert_eq(cudf_result, pandas_result, check_dtype=False)

        # This was for checking when we removed the reindex call in groupby._mimic_pandas_order

        # mask = ~pdf.xx.isna()
        # pandas_result_no_nulls = pandas_result.loc[mask]
        # assert_eq(cudf_result.sort_index(), pandas_result_no_nulls, check_dtype=False)

with a recent enough cudf nightly sometimes produces assertion errors when checking correctness. The failure shows up as a few entries in the grouped dataframe columns being marked as NULL when they should not be. In post-mortem debugging, re-executing the offending code tends to produce the correct result. One normally has to run the script a few times, interrupting it between attempts, to see the failure.
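When the assertion fires, it can help to list exactly which rows are null in the cudf result but valid in the pandas result. A minimal sketch, assuming each result column has already been copied to a host float array with NaN standing in for null (`spurious_nulls` is a hypothetical helper, not part of cudf):

```python
import numpy as np


def spurious_nulls(got, expected):
    """Return positions that are null (NaN) in `got` but valid in `expected`."""
    got = np.asarray(got, dtype="float64")
    expected = np.asarray(expected, dtype="float64")
    # A row is suspicious only if the reference result has a real value there.
    return np.flatnonzero(np.isnan(got) & ~np.isnan(expected))


# Toy data: row 1 is legitimately null in both, row 2 is null only in `got`.
expected = np.array([1.0, np.nan, 3.0, 4.0])
got = np.array([1.0, np.nan, np.nan, 4.0])
print(spurious_nulls(got, expected))
```

Rows whose key is null are expected to be null in both results, so they are deliberately excluded from the report.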

This has intermittently been causing the nightly actions to fail. The first failure is https://github.com/rapidsai/cudf/actions/runs/5065561229/jobs/9094279670, which is the first nightly that contained #13372.

Some existing investigation with @shwina provides the following information:

  1. #13389 fixed this code so that it would run at all, and introduced a dataframe.reindex call inside _mimic_pandas_order. If we remove the reindex call by applying this patch:

    diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
    index b7faed1dfc..e1a84897f4 100644
    --- a/python/cudf/cudf/core/groupby/groupby.py
    +++ b/python/cudf/cudf/core/groupby/groupby.py
    @@ -13,6 +13,7 @@ import numpy as np
     import pandas as pd
    
     import cudf
    +import cudf._lib as libcudf
     from cudf._lib import groupby as libgroupby
     from cudf._lib.null_mask import bitmask_or
     from cudf._lib.reshape import interleave_columns
    @@ -2290,12 +2291,12 @@ class GroupBy(Serializable, Reducible, Scannable):
                 ri = cudf.RangeIndex(0, len(self.obj))
                 result.index = cudf.Index(ordering)
                 # This reorders and expands
    -            result = result.reindex(ri)
    +            # result = result.reindex(ri)
             else:
                 # Just reorder according to the groupings
                 result = result.take(ordering.argsort())
    -        # Now produce the actual index we first thought of
    -        result.index = self.obj.index
    +            # Now produce the actual index we first thought of
    +            result.index = self.obj.index
             return result
    

    and uncomment the alternate error-checking code in the bug script, we do not see failures.

  2. reindex goes through join and hence gather. However, we also tried reimplementing the reordering using scatter, like so:

    diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
    index b7faed1dfc..64d9ef2a8f 100644
    --- a/python/cudf/cudf/core/groupby/groupby.py
    +++ b/python/cudf/cudf/core/groupby/groupby.py
    @@ -13,6 +13,7 @@ import numpy as np
     import pandas as pd
    
     import cudf
    +import cudf._lib as libcudf
     from cudf._lib import groupby as libgroupby
     from cudf._lib.null_mask import bitmask_or
     from cudf._lib.reshape import interleave_columns
    @@ -2287,10 +2288,18 @@ class GroupBy(Serializable, Reducible, Scannable):
                 # Scan aggregations with null/nan keys put nulls in the
                 # corresponding output rows in pandas, to do that here
                 # expand the result by reindexing.
    -            ri = cudf.RangeIndex(0, len(self.obj))
    -            result.index = cudf.Index(ordering)
    -            # This reorders and expands
    -            result = result.reindex(ri)
    +            null_result_columns = [
    +                cudf.core.column.column_empty_like(
    +                    c, masked=True, newsize=len(self.obj)
    +                )
    +                for c in result._data.columns
    +            ]
    +            new_result_columns = libcudf.copying.scatter(
    +                [*result._data.columns], ordering, null_result_columns
    +            )
    +            result = result._from_columns_like_self(
    +                new_result_columns, column_names=result._column_names
    +            )
             else:
                 # Just reorder according to the groupings
                 result = result.take(ordering.argsort())
    

    and still observe the bug.
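For readers unfamiliar with the expand step, here is a small host-side sketch of what the scatter variant is meant to compute. It uses NumPy rather than libcudf, with NaN standing in for null, and it assumes `ordering[i]` is the original row position of row `i` of the scan result (which is how the patch uses it as a scatter map). The name `expand_by_scatter` is hypothetical.

```python
import numpy as np


def expand_by_scatter(values, ordering, n_rows):
    """Scatter `values` into an all-null (NaN) column of length `n_rows`.

    ordering[i] is the original row position of values[i]; rows that never
    appear in `ordering` (the null-key rows) stay NaN.
    """
    out = np.full(n_rows, np.nan)
    out[np.asarray(ordering)] = values
    return out


# Keys [1, null, 0, 1, null, 0]: the grouped scan only yields rows for the
# four non-null keys, here listed with group 0 first and then group 1.
scan_values = np.array([0.5, 2.5, 1.0, 4.0])
ordering = np.array([2, 5, 0, 3])
print(expand_by_scatter(scan_values, ordering, n_rows=6))
```

Rows 1 and 4 (the null keys) remain NaN, matching the pandas convention that the patch comments describe. Any additional null in the real output would therefore have to come from somewhere other than this reordering logic.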

About this issue

  • State: closed
  • Created a year ago
  • Comments: 16 (16 by maintainers)

Most upvoted comments

That seems to do the trick for me, @davidwendt

I can confirm this appears to fix the bug for me, too. Let’s open a PR. @davidwendt Would you do the honors?

@shwina I wasn’t able to reproduce that RuntimeError with a single null, but @wence-'s reproducer that alternates null/valid does fail consistently after a small number of iterations (usually 2 or 4) for me.

Let’s consider the calling context for single_lane_block_sum_reduce (in valid_if_n_kernel):

__global__ void valid_if_n_kernel(InputIterator1 begin1,
                                  InputIterator2 begin2,
                                  BinaryPredicate p,
                                  bitmask_type* masks[],
                                  size_type mask_count,
                                  size_type mask_num_bits,
                                  size_type* valid_counts)
{
  for (size_type mask_idx = 0; mask_idx < mask_count; mask_idx++) {
    auto const mask = masks[mask_idx];
    if (mask == nullptr) { continue; }

    auto block_offset     = blockIdx.x * blockDim.x;
    auto warp_valid_count = static_cast<size_type>(0);

    while (block_offset < mask_num_bits) {
      auto const thread_idx    = block_offset + threadIdx.x;
      auto const thread_active = thread_idx < mask_num_bits;
      auto const arg_1         = *(begin1 + mask_idx);
      auto const arg_2         = *(begin2 + thread_idx);
      auto const bit_is_valid  = thread_active && p(arg_1, arg_2);
      auto const warp_validity = __ballot_sync(0xffff'ffffu, bit_is_valid);
      auto const mask_idx      = word_index(thread_idx);

      if (thread_active && threadIdx.x % warp_size == 0) { mask[mask_idx] = warp_validity; }

      warp_valid_count += __popc(warp_validity);
      block_offset += blockDim.x * gridDim.x;
    }

    auto block_valid_count = single_lane_block_sum_reduce<block_size, 0>(warp_valid_count);

    if (threadIdx.x == 0) { atomicAdd(valid_counts + mask_idx, block_valid_count); }
  }
}

The __syncthreads call in single_lane_block_sum_reduce ensures that, within a single iteration of the loop in valid_if_n_kernel, there is no data race between the write on line 98 and the read on line 106 (line numbers refer to cpp/include/cudf/detail/utilities/cuda.cuh). I don’t think that is sufficient to rule out data races between iterations, though: without further (outside) synchronisation, the previous iteration’s read on line 106 can race with the current iteration’s write on line 98.
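The cross-iteration hazard can be illustrated with a toy host-side model. This is a Python sketch, not CUDA: each generator stands for one warp, each `yield` stands for a `__syncthreads()`, and the scheduler is deliberately adversarial (it always lets the highest-numbered warp run first). All names here (`warp`, `run_block`, `leading_barrier`) are hypothetical and chosen only for illustration.

```python
def warp(warp_id, values, shared, results, leading_barrier):
    """One warp repeatedly calling a block-wide sum reduction."""
    for value in values:
        if leading_barrier:
            yield  # models an extra __syncthreads() before the write
        shared[warp_id] = value  # leader lane publishes its warp's count
        yield  # models the existing __syncthreads() between write and read
        if warp_id == 0:
            results.append(sum(shared))  # warp 0 reads every slot


def run_block(per_warp_values, leading_barrier):
    """Run all warps barrier to barrier, highest warp id first."""
    shared = [0] * len(per_warp_values)
    results = []
    live = [
        warp(i, vals, shared, results, leading_barrier)
        for i, vals in enumerate(per_warp_values)
    ]
    live.reverse()  # adversarial order: warp 0 always runs last
    while live:
        still_running = []
        for w in live:
            try:
                next(w)  # advance this warp to its next barrier
                still_running.append(w)
            except StopIteration:
                pass
        live = still_running
    return results


values = [[1, 10], [2, 20]]  # values[w][k]: warp w's count in call k
print(run_block(values, leading_barrier=False))
print(run_block(values, leading_barrier=True))
```

In this model, without the leading barrier warp 1 overwrites its slot for the second call before warp 0 has read the first call's values, so the first sum comes out as 21 instead of 3. With the leading barrier, warp 1 has to wait and both sums are correct. Real hardware scheduling is not this deterministic, which fits the intermittent nature of the race.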

Applying this patch:

diff --git a/cpp/include/cudf/detail/utilities/cuda.cuh b/cpp/include/cudf/detail/utilities/cuda.cuh
index cdbc26701d..fe5ac6d42f 100644
--- a/cpp/include/cudf/detail/utilities/cuda.cuh
+++ b/cpp/include/cudf/detail/utilities/cuda.cuh
@@ -93,7 +93,7 @@ __device__ T single_lane_block_sum_reduce(T lane_value)
   auto const lane_id{threadIdx.x % warp_size};
   auto const warp_id{threadIdx.x / warp_size};
   __shared__ T lane_values[warp_size];
-
+  __syncthreads();
   // Load each lane's value into a shared memory array
   if (lane_id == leader_lane) { lane_values[warp_id] = lane_value; }
   __syncthreads();

Makes the reproducer racecheck-clean, good! But doesn’t fix the original issue, bad!