modin: BUG: KeyError when UDF in groupby.apply accesses data from another column partition

System information

  • **OS Platform and Distribution**: Ubuntu 20.04.2 LTS
  • **Modin version**: 0.10.2
  • **Python version**: 3.7.11

import modin.pandas as pd
import numpy as np

def printcol(dataset, **kwargs):
    col = kwargs["column"]
    print(dataset[col])
    return dataset

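df = pd.DataFrame(np.random.randn(11, 2), columns=["A", "B"])
# Label each row "K1" or "K2" according to the sign of column A
df['KEY'] = df['A'].apply(lambda x: 'K1' if x < 0 else 'K2')

# Raises KeyError: 'A' under Modin 0.10.2; the same call succeeds with plain pandas
df = df.groupby(['KEY']).apply(printcol,column="A")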
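For comparison, the behaviour plain pandas gives this call can be sketched without any dependencies. This is a toy model of split-apply-combine, not pandas' implementation: the frame is a hypothetical dict of column lists, `group_apply` is a made-up helper, column `A` reuses the values the Ray worker printed in the log below, and `B` is filled with placeholder zeros. The point it illustrates is that every group handed to the UDF still contains all columns, so `dataset["A"]` always resolves.

```python
def group_apply(frame, by, func, **kwargs):
    """Toy split-apply-combine over a dict of equal-length column lists."""
    # Split: collect the row positions belonging to each key.
    groups = {}
    for i, key in enumerate(frame[by]):
        groups.setdefault(key, []).append(i)
    results = {}
    # Apply: pandas sorts group keys by default, so visit them in sorted order.
    for key in sorted(groups):
        rows = groups[key]
        # Every group keeps *all* columns, including the grouping column.
        group = {col: [values[i] for i in rows] for col, values in frame.items()}
        results[key] = func(group, **kwargs)
    # Combine: here simply a dict keyed by group label (pandas would concatenate).
    return results


def printcol(dataset, **kwargs):
    col = kwargs["column"]
    print(dataset[col])
    return dataset


# Column A values as printed by the Ray worker in the log; B is a placeholder.
a = [2.028227, -1.342643, 0.469303, -0.855525, 0.867193, -0.656729,
     -0.795196, 1.239291, -1.158136, -1.422635, -1.237912]
frame = {
    "A": a,
    "B": [0.0] * len(a),
    "KEY": ["K1" if x < 0 else "K2" for x in a],
}

out = group_apply(frame, "KEY", printcol, column="A")
```

Running it prints the `A` values of group `K1` (seven negative values) and then `K2` (four positive values), mirroring the two blocks printed by the worker in the log.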

[snip]
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

Describe the problem

A `KeyError` is raised when the function passed to `groupby(...).apply` looks up one of the group's columns by name (here `dataset["A"]`). The same code works as expected with a regular `pandas.DataFrame`.
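The frame names in the trace (`broadcast_apply_full_axis`, `deploy_func_between_two_axis_partitions`, `groupby_agg_builder`) suggest that Modin 0.10.2 runs the groupby UDF once per column partition, broadcasting the `by` column to each partition, rather than once on the whole frame. If that reading is right, a partition that does not hold `A` hands the UDF a group without `A`, hence `KeyError: 'A'`, while the partition that does hold `A` prints it for both groups (as the worker output in the log shows). The sketch below is a hypothetical toy model of that behaviour, not Modin's code; `partitioned_group_apply`, `n_col_parts` and `select_col` are made-up names.

```python
def partitioned_group_apply(frame, by, func, n_col_parts, **kwargs):
    """Hypothetical toy model: apply `func` per group within each column partition."""
    data_cols = [c for c in frame if c != by]
    # Split the non-key columns into at most n_col_parts contiguous chunks.
    size = -(-len(data_cols) // n_col_parts)  # ceiling division
    parts = [data_cols[i:i + size] for i in range(0, len(data_cols), size)]
    results = []
    for cols in parts:
        # The key column is broadcast to every partition; other columns are not.
        sub = {c: frame[c] for c in cols + [by]}
        groups = {}
        for i, key in enumerate(sub[by]):
            groups.setdefault(key, []).append(i)
        for key in sorted(groups):
            group = {c: [sub[c][i] for i in groups[key]] for c in sub}
            # Looking up a column held by another partition fails here.
            results.append(func(group, **kwargs))
    return results


def select_col(dataset, column):
    return dataset[column]


frame = {"A": [-1.0, 2.0, -0.5], "B": [0.0, 0.0, 0.0], "KEY": ["K1", "K2", "K1"]}

# One partition holding every column behaves like pandas.
print(partitioned_group_apply(frame, "KEY", select_col, n_col_parts=1, column="A"))

# Two partitions: ["A"] + KEY works, but ["B"] + KEY has no "A".
try:
    partitioned_group_apply(frame, "KEY", select_col, n_col_parts=2, column="A")
except KeyError as err:
    print("KeyError:", err)
```

With a single partition the call returns `[[-1.0, -0.5], [2.0]]`; with two partitions it ends in `KeyError: 'A'`, the same error as in the report.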

Source code / logs

Full stack trace
2021-09-16 10:05:17,089	ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::apply_func() (pid=19062, ip=172.31.12.170)
  File "python/ray/_raylet.pyx", line 460, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 351, in ray._raylet.raise_if_dependency_failed
ray.exceptions.RayTaskError: ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
    return compute_groupby(df, drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

During handling of the above exception, another exception occurred:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
    result = func(*args)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
    result = func(lt_frame, rt_frame, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
    df, by, drop, partition_idx
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
    return compute_groupby(df.copy(), drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'
[snip: the same traceback is logged twice more by Ray, at 2021-09-16 10:05:17,092 and 10:05:17,095]
(pid=19064) 1    -1.342643
(pid=19064) 3    -0.855525
(pid=19064) 5    -0.656729
(pid=19064) 6    -0.795196
(pid=19064) 8    -1.158136
(pid=19064) 9    -1.422635
(pid=19064) 10   -1.237912
(pid=19064) Name: A, dtype: float64
(pid=19064) 0    2.028227
(pid=19064) 2    0.469303
(pid=19064) 4    0.867193
(pid=19064) 7    1.239291
(pid=19064) Name: A, dtype: float64
---------------------------------------------------------------------------
RayTaskError(KeyError)                    Traceback (most recent call last)
<ipython-input-4-97c0bcb4466e> in <module>
----> 1 df = df.groupby(['KEY']).apply(printcol,column="A")

~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py in apply(self, func, *args, **kwargs)
    278         if not isinstance(func, BuiltinFunctionType):
    279             func = wrap_udf_function(func)
--> 280         return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
    281 
    282     @property

~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py in _apply_agg_function(self, f, *args, **kwargs)
    921             agg_kwargs=kwargs,
    922             groupby_kwargs=self._kwargs,
--> 923             drop=self._drop,
    924         )
    925         if self._idx_name is not None and self._as_index:

~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py in groupby_agg(self, by, is_multi_by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop)
   2847             other=broadcastable_by,
   2848             apply_indices=apply_indices,
-> 2849             enumerate_partitions=True,
   2850         )
   2851         result = self.__constructor__(new_modin_frame)

~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py in broadcast_apply_full_axis(self, axis, func, other, new_index, new_columns, apply_indices, enumerate_partitions, dtypes)
   1816             if new_axis is None
   1817             else new_axis
-> 1818             for i, new_axis in enumerate([new_index, new_columns])
   1819         ]
   1820         if dtypes == "copy":

~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py in <listcomp>(.0)
   1816             if new_axis is None
   1817             else new_axis
-> 1818             for i, new_axis in enumerate([new_index, new_columns])
   1819         ]
   1820         if dtypes == "copy":

~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py in _compute_axis_labels(self, axis, partitions)
    305             partitions = self._partitions
    306         return self._partition_mgr_cls.get_indices(
--> 307             axis, partitions, lambda df: df.axes[axis]
    308         )
    309 

~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition_manager.py in get_indices(cls, axis, partitions, index_func)
    135                 else []
    136             )
--> 137         new_idx = ray.get(new_idx)
    138         return new_idx[0].append(new_idx[1:]) if len(new_idx) else new_idx
    139 

~/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
     60         if client_mode_should_convert():
     61             return getattr(ray, func.__name__)(*args, **kwargs)
---> 62         return func(*args, **kwargs)
     63 
     64     return wrapper

~/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/worker.py in get(object_refs, timeout)
   1493                     worker.core_worker.dump_object_store_memory_usage()
   1494                 if isinstance(value, RayTaskError):
-> 1495                     raise value.as_instanceof_cause()
   1496                 else:
   1497                     raise value

RayTaskError(KeyError): ray::apply_func() (pid=19062, ip=172.31.12.170)
  File "python/ray/_raylet.pyx", line 460, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 351, in ray._raylet.raise_if_dependency_failed
ray.exceptions.RayTaskError: ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
    return compute_groupby(df, drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

During handling of the above exception, another exception occurred:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
    result = func(*args)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
    result = func(lt_frame, rt_frame, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
    df, by, drop, partition_idx
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
    return compute_groupby(df.copy(), drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

2021-09-16 10:05:22,867	ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
    return compute_groupby(df, drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

During handling of the above exception, another exception occurred:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
    result = func(*args)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
    result = func(lt_frame, rt_frame, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
    df, by, drop, partition_idx
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
    return compute_groupby(df.copy(), drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'
2021-09-16 10:05:22,870	ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
    return compute_groupby(df, drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

During handling of the above exception, another exception occurred:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
    result = func(*args)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
    result = func(lt_frame, rt_frame, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
    df, by, drop, partition_idx
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
    return compute_groupby(df.copy(), drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'
2021-09-16 10:05:22,874	ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
    return compute_groupby(df, drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

During handling of the above exception, another exception occurred:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
  File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
    result = func(*args)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
    result = func(lt_frame, rt_frame, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
    df, by, drop, partition_idx
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
    return compute_groupby(df.copy(), drop, partition_idx)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
    result = agg_func(grouped_df, **agg_kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
    return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
    result = self._python_apply_general(f, self._selected_obj)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
    keys, values, mutated = self.grouper.apply(f, data, self.axis)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
    result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
    return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
  File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
    return func(g, *args, **kwargs)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
    result = func(*args, **kwargs)
  File "<ipython-input-2-5658305649cc>", line 3, in printcol
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'A'

About this issue

It appears that the root cause of this issue is similar to the problem reported in #2511. When applying a function in groupby, we expect the applied function to receive the whole group frame:

   A  B  KEY |
0  .  .  .   | first group
1  .  .  .   |
-------------|
2  .  .  .   |
3  .  .  .   | second group
4  .  .  .   |

In pandas, the sequence of calls to the applied function would be:

fn(first_group)
fn(second_group)
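A minimal pandas-only sketch of those semantics follows. It assumes that iterating over `DataFrame.groupby` is a fair stand-in for `groupby.apply` (pandas' `apply` adds result-combining logic on top), and `fn` is a hypothetical UDF: each call receives one whole group, with every column present.

```python
import pandas

# Toy frame shaped like the diagram above: group K1 (rows 0-1) and group K2 (rows 2-4).
df = pandas.DataFrame(
    {
        "A": [-1.0, -0.5, 0.3, 1.2, 2.0],
        "B": [0.1, 0.2, 0.3, 0.4, 0.5],
        "KEY": ["K1", "K1", "K2", "K2", "K2"],
    }
)

calls = []  # records which columns each call of fn could see


def fn(group):
    # Hypothetical UDF: remember the columns it received, then use column "A".
    calls.append(list(group.columns))
    return group["A"].sum()


# Conceptually: fn(first_group), fn(second_group), each with A, B and KEY available.
results = {key: fn(group) for key, group in df.groupby("KEY")}
print(calls)
print(results)
```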

When a frame is converted to Modin, it is split into partitions. In the reproducer from this issue, right before the failing groupby.apply, the frame is split like this:

  part1 | part2|
   A  B | KEY  |
0  .  . | .    | first group
1  .  . | .    |
--------|------|
2  .  . | .    |
3  .  . | .    | second group
4  .  . | .    |

As you can see, we now have two parts, and only one of them contains the “A” column. When doing groupby.apply, Modin does not concatenate the partitions along the rows (part1 and part2 are never joined into full rows), so the applied fn is called 4 times here:

fn(part1_first_group)
fn(part1_second_group)
fn(part2_first_group)
fn(part2_second_group)

Since only “part1” contains the “A” column, we get a KeyError for every call on “part2”.
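The failure mode can be reproduced with plain pandas by splitting the frame into the same two column partitions by hand. This is a simplified model, not Modin's actual code path: the helper `apply_per_column_partition` is hypothetical, and it assumes (as the `by` argument in the traceback suggests) that the grouping column is broadcast to every partition.

```python
import pandas


def printcol(dataset, column):
    # The UDF from the report, minus the print: it needs `column` in every group.
    return dataset[column]


def apply_per_column_partition(df, by, col_partitions, func, **kwargs):
    """Hypothetical model of the buggy behaviour: call `func` on every group of
    every column partition separately instead of on whole groups."""
    outcomes = []
    keys = df[by]  # grouping labels, broadcast to each partition
    for cols in col_partitions:
        part = df[cols]
        for key, group in part.groupby(keys):
            try:
                func(group, **kwargs)
                outcomes.append((tuple(cols), key, "ok"))
            except KeyError as err:
                outcomes.append((tuple(cols), key, f"KeyError: {err}"))
    return outcomes


df = pandas.DataFrame({"A": [-1.0, -0.5, 0.3, 1.2, 2.0], "B": [0.1, 0.2, 0.3, 0.4, 0.5]})
df["KEY"] = df["A"].apply(lambda x: "K1" if x < 0 else "K2")

# part1 holds A and B, part2 holds only KEY, as in the diagram above.
outcomes = apply_per_column_partition(df, "KEY", [["A", "B"], ["KEY"]], printcol, column="A")
for outcome in outcomes:
    print(outcome)
```

Four calls are made; the two on the KEY-only partition fail with `KeyError: 'A'`, matching the traceback.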

This is certainly a bug: Modin’s users shouldn’t have to care about partitioning, but unfortunately in this case there’s currently no way to avoid being exposed to it.

Unfortunately, I can’t suggest a workaround except defaulting to pandas for this particular operation (@devin-petersohn, maybe you can?):

df = df._default_to_pandas(lambda df: df.groupby("KEY").apply(printcol, column="A"))

@modin-project/modin-core BTW, maybe we should replace our current groupby.apply implementation with default-to-pandas, since it just doesn’t work as intended?

😄 oh, well…

But generally, both of our answers seem to be correct.

Answering the question “does it matter for the partitioning that the ‘KEY’ column was inserted after the frame’s construction?”: the answer is YES (see my comment).

Answering the question “does it matter for the ‘apply’ that the ‘KEY’ column was inserted afterwards?”: the answer is NO, because the error happens due to a bug (see Devin’s comment). (Correct me if I’m wrong.)

Would it have mattered if KEY was part of the dataframe at construction time?

Yes. The initial minimal chunk size for partitioning is 32. That means that if the frame contains <=32 columns, all of them will be located in a single column-partition. But if the number of columns is 33, the Modin frame will have 2 partitions: the first containing 32 columns and the second containing the remaining column.
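A rough sketch of that rule, assuming the column chunk size is the larger of an even split across `num_splits` partitions and the minimum block size of 32. The function name `column_partition_sizes` and the `num_splits=4` default are illustrative assumptions, not Modin's API (in practice the split count depends on the configured number of partitions).

```python
import math


def column_partition_sizes(n_cols, num_splits=4, min_block_size=32):
    """Hypothetical helper: widths of the column partitions for `n_cols` columns."""
    # A chunk is never narrower than `min_block_size` (the last one may be smaller).
    chunk = max(math.ceil(n_cols / num_splits), min_block_size)
    sizes = []
    remaining = n_cols
    while remaining > 0:
        sizes.append(min(chunk, remaining))
        remaining -= sizes[-1]
    return sizes


print(column_partition_sizes(3))   # [3]: a single column partition
print(column_partition_sizes(32))  # [32]
print(column_partition_sizes(33))  # [32, 1]: the 33rd column gets its own partition
```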

The reason you have 2 column-partitions in a frame of 3 columns is this new column insertion:

df['KEY'] = df['A'].apply(lambda x: 'K1' if x < 0 else 'K2')

Inserting a new column is equivalent to a concat. When concatenating frames, Modin does not repartition anything; it literally concatenates the partitions:

df1:
  part(0,0) |part(0, 1)|
   A  B  C  | D        |  
0  .  .  .  | .        |
1  .  .  .  | .        |
------------|----------|
  part(1,0) |part(1,1) |
2  .  .  .  | .        |
3  .  .  .  | .        |

df2:
part(0,0)|part(0,1)|
   E     | F  G    |
0  .     | .  .    |
1  .     | .  .    |
---------|---------|
part(1,0)|part(1,1)|
2  .     | .  .    |
3  .     | .  .    |

pd.concat([df1, df2], axis=1):
  part(0,0) |part(0, 1)|part(0,2)|part(0,3)|
   A  B  C  | D        | E       | F  G    |
0  .  .  .  | .        | .       | .  .    |
1  .  .  .  | .        | .       | .  .    |
------------|----------|---------|---------|
  part(1,0) |part(1,1) |part(1,2)|part(1,3)|
2  .  .  .  | .        | .       | .  .    |
3  .  .  .  | .        | .       | .  .    |
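To make the concat behaviour concrete, here is a toy model in which a frame's column partitioning is just a list of column-name lists. This representation and the helper `concat_columns` are hypothetical and serve only as an illustration: concatenating along `axis=1` appends the partition lists as-is, and inserting `KEY` into the reporter's frame behaves the same way.

```python
def concat_columns(*frames):
    """Hypothetical model: column-wise concat keeps every input partition as-is."""
    result = []
    for col_partitions in frames:
        result.extend(list(cols) for cols in col_partitions)
    return result


df1 = [["A", "B", "C"], ["D"]]
df2 = [["E"], ["F", "G"]]
print(concat_columns(df1, df2))  # [['A', 'B', 'C'], ['D'], ['E'], ['F', 'G']]

# The reporter's frame: A and B share one partition; inserting KEY appends a new one.
report = concat_columns([["A", "B"]], [["KEY"]])
print(report)  # [['A', 'B'], ['KEY']]
```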

Partitioning stays unnormalized along the row axis until some row-wise function is called. Such a function first concatenates all of the row partitions (to be able to provide the whole row to the function) and then repartitions the result using the initial partitioning rule:

df1:
  part(0,0) |part(0, 1)|part(0,2)|part(0,3)|
   A  B  C  | D        | E       | F  G    |
0  .  .  .  | .        | .       | .  .    |
1  .  .  .  | .        | .       | .  .    |
------------|----------|---------|---------|
  part(1,0) |part(1,1) |part(1,2)|part(1,3)|
2  .  .  .  | .        | .       | .  .    |
3  .  .  .  | .        | .       | .  .    |

df1.apply(lambda df: df, axis=1)

Step 1: union the row partitions so that the whole row can be provided to the applied function:
  part(0,0)            |
   A  B  C  D  E  F  G |
0  .  .  .  .  .  .  . |  fn(part(0,0)[0])
1  .  .  .  .  .  .  . |  fn(part(0,0)[1])
-----------------------|
  part(1,0)            |
2  .  .  .  .  .  .  . |  fn(part(1,0)[2])
3  .  .  .  .  .  .  . |  fn(part(1,0)[3])

Step 2: repartition the resulting frame:
  part(0,0)   |part(0,1)|
   A  B  C  D | E  F  G |
0  .  .  .  . | .  .  . |
1  .  .  .  . | .  .  . |
--------------|---------|
  part(1,0)   |part(1,1)|
2  .  .  .  . | .  .  . |
3  .  .  .  . | .  .  . |
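The two steps above can be sketched with the same kind of toy representation (a list of column-name lists). `repartition_columns` is hypothetical, and a fixed chunk size of 4 is assumed only so that the result matches the diagram; with the real minimum chunk size of 32, all seven columns would land in a single partition.

```python
def repartition_columns(col_partitions, chunk_size):
    """Hypothetical model of a full-axis (row-wise) function's bookkeeping."""
    # Step 1: union the partitions of a row so the function sees whole rows.
    all_columns = [col for cols in col_partitions for col in cols]
    # Step 2: split the result again using a fixed chunk size.
    return [all_columns[i : i + chunk_size] for i in range(0, len(all_columns), chunk_size)]


before = [["A", "B", "C"], ["D"], ["E"], ["F", "G"]]
after = repartition_columns(before, chunk_size=4)
print(after)  # [['A', 'B', 'C', 'D'], ['E', 'F', 'G']]
```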

I’m describing the execution flow of full-axis functions in detail because groupby.apply is also one of them:

df1:
  part(0,0) |
   A  B  C  |
0  .  .  .  |
1  .  .  .  |
------------|
  part(1,0) |
2  .  .  .  |
3  .  .  .  |

df1.groupby("A").apply(fn)

Step 1: union the column partitions so that the whole column can be provided to the applied function:
  part(0,0) |
   A  B  C  |
0  .  .  .  | part(0,0).groupby("A").apply(fn)
1  .  .  .  | (no parallelism at all: groupby.apply is applied to a single partition)
2  .  .  .  |
3  .  .  .  |

Step 2: repartition the resulting frame...
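Uniting the row partitions first matters because a UDF generally needs the whole group. A pandas-only sketch, using a hypothetical `group_size` UDF and splitting the rows into two chunks by hand, shows that applying it per row partition yields partial answers:

```python
import pandas

df = pandas.DataFrame(
    {"A": [1, 2, 3, 4, 5, 6], "KEY": ["K1", "K2", "K1", "K2", "K1", "K2"]}
)


def group_size(group):
    # Hypothetical UDF whose result depends on seeing every row of the group.
    return len(group)


# Applying per row partition: each group is split across both partitions.
row_partitions = [df.iloc[:3], df.iloc[3:]]
per_partition = [
    (key, group_size(group))
    for part in row_partitions
    for key, group in part.groupby("KEY")
]
print(per_partition)  # [('K1', 2), ('K2', 1), ('K1', 1), ('K2', 2)]

# Uniting the row partitions first gives each call the whole group.
whole = [(key, group_size(group)) for key, group in df.groupby("KEY")]
print(whole)  # [('K1', 3), ('K2', 3)]
```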

To achieve parallelism in GroupBy, some of the groupby aggregation functions are implemented in two phases using a MapReduce approach. groupby.apply is not one of them, since we don’t know much about the passed UDF.

So, if you are concerned about performance once all of the columns are placed into a single column-partition, you can try to split your aggregation into two phases (map and reduce) and then use Modin’s GroupbyReduceFunction operator to make it work with Modin. Refer to the corresponding section of Modin’s documentation to learn how to do that.
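As an illustration of what a two-phase split can look like, here is a pandas-only sketch of a groupby mean computed MapReduce-style. The map phase runs independently on each row partition and produces partial sums and counts; the reduce phase combines them. The helpers `map_phase` and `reduce_phase` are hypothetical, and wiring them into Modin through GroupbyReduceFunction is not shown, since its exact signature should be taken from Modin's documentation.

```python
import pandas


def map_phase(part, by, column):
    """Runs on one row partition: partial sum and count per group."""
    grouped = part.groupby(by)[column]
    return pandas.DataFrame({"sum": grouped.sum(), "count": grouped.count()})


def reduce_phase(partials):
    """Combines the partial results of all row partitions into the final mean."""
    combined = pandas.concat(partials).groupby(level=0).sum()
    return combined["sum"] / combined["count"]


df = pandas.DataFrame(
    {"A": [1, 2, 3, 4, 5, 6], "KEY": ["K1", "K2", "K1", "K2", "K1", "K2"]}
)
row_partitions = [df.iloc[:3], df.iloc[3:]]

result = reduce_phase([map_phase(part, "KEY", "A") for part in row_partitions])
print(result)  # K1 -> 3.0, K2 -> 4.0, the same as df.groupby("KEY")["A"].mean()
```

The design point is that only the small per-group partials travel between phases, so the map phase can run on every row partition in parallel, which is exactly what an opaque UDF in groupby.apply does not allow.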