modin: BUG: KeyError when UDF in groupby.apply accesses data from another column partition
System information
- **OS Platform and Distribution**: Ubuntu 20.04.2 LTS
- **Modin version**: 0.10.2
- **Python version**: 3.7.11
import modin.pandas as pd
import numpy as np
def printcol(dataset, **kwargs):
    col = kwargs["column"]
    print(dataset[col])
    return dataset
df = pd.DataFrame(np.random.randn(11, 2), columns=["A", "B"])
df['KEY'] = df['A'].apply(lambda x: 'K1' if x < 0 else 'K2')
df = df.groupby(['KEY']).apply(printcol,column="A")
[snip]
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
Describe the problem
A `KeyError` is raised when the function passed to `groupby(...).apply` references a column of the group it receives (here, column `A`). The same code works as expected when a standard `pandas.DataFrame` is used instead of a Modin one.
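For comparison, here is a minimal sketch of the same reproduction run against plain pandas, which is the baseline behaviour described above. The seeded generator and the variable name `result` are additions for illustration and are not part of the original report; the pandas version in use may also emit a deprecation warning about grouping columns, which does not affect the outcome.

```python
import numpy as np
import pandas  # plain pandas instead of modin.pandas


def printcol(dataset, **kwargs):
    # Same UDF as in the report: look up a column by name inside each group.
    col = kwargs["column"]
    print(dataset[col])
    return dataset


# Seeded generator (an assumption, for repeatability); the report used np.random.randn.
rng = np.random.default_rng(0)
df = pandas.DataFrame(rng.standard_normal((11, 2)), columns=["A", "B"])
df["KEY"] = df["A"].apply(lambda x: "K1" if x < 0 else "K2")

# With pandas, every group passed to the UDF still contains column "A".
result = df.groupby(["KEY"]).apply(printcol, column="A")
```

With pandas, each group is a full frame, so the lookup of `A` succeeds for every call. That is the behaviour the Modin call is expected to match.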
Source code / logs
Full stack trace
2021-09-16 10:05:17,089 ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::apply_func() (pid=19062, ip=172.31.12.170)
File "python/ray/_raylet.pyx", line 460, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 351, in ray._raylet.raise_if_dependency_failed
ray.exceptions.RayTaskError: ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
return compute_groupby(df, drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
During handling of the above exception, another exception occurred:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
return self._engine.get_loc(casted_key)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
return function(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
result = func(*args)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
result = func(lt_frame, rt_frame, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
series_result = func(df, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
df, by, drop, partition_idx
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
return compute_groupby(df.copy(), drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
(pid=19064) 1 -1.342643
(pid=19064) 3 -0.855525
(pid=19064) 5 -0.656729
(pid=19064) 6 -0.795196
(pid=19064) 8 -1.158136
(pid=19064) 9 -1.422635
(pid=19064) 10 -1.237912
(pid=19064) Name: A, dtype: float64
(pid=19064) 0 2.028227
(pid=19064) 2 0.469303
(pid=19064) 4 0.867193
(pid=19064) 7 1.239291
(pid=19064) Name: A, dtype: float64
---------------------------------------------------------------------------
RayTaskError(KeyError) Traceback (most recent call last)
<ipython-input-4-97c0bcb4466e> in <module>
----> 1 df = df.groupby(['KEY']).apply(printcol,column="A")
~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py in apply(self, func, *args, **kwargs)
278 if not isinstance(func, BuiltinFunctionType):
279 func = wrap_udf_function(func)
--> 280 return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
281
282 @property
~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py in _apply_agg_function(self, f, *args, **kwargs)
921 agg_kwargs=kwargs,
922 groupby_kwargs=self._kwargs,
--> 923 drop=self._drop,
924 )
925 if self._idx_name is not None and self._as_index:
~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py in groupby_agg(self, by, is_multi_by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop)
2847 other=broadcastable_by,
2848 apply_indices=apply_indices,
-> 2849 enumerate_partitions=True,
2850 )
2851 result = self.__constructor__(new_modin_frame)
~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py in broadcast_apply_full_axis(self, axis, func, other, new_index, new_columns, apply_indices, enumerate_partitions, dtypes)
1816 if new_axis is None
1817 else new_axis
-> 1818 for i, new_axis in enumerate([new_index, new_columns])
1819 ]
1820 if dtypes == "copy":
~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py in <listcomp>(.0)
1816 if new_axis is None
1817 else new_axis
-> 1818 for i, new_axis in enumerate([new_index, new_columns])
1819 ]
1820 if dtypes == "copy":
~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py in _compute_axis_labels(self, axis, partitions)
305 partitions = self._partitions
306 return self._partition_mgr_cls.get_indices(
--> 307 axis, partitions, lambda df: df.axes[axis]
308 )
309
~/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition_manager.py in get_indices(cls, axis, partitions, index_func)
135 else []
136 )
--> 137 new_idx = ray.get(new_idx)
138 return new_idx[0].append(new_idx[1:]) if len(new_idx) else new_idx
139
~/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
60 if client_mode_should_convert():
61 return getattr(ray, func.__name__)(*args, **kwargs)
---> 62 return func(*args, **kwargs)
63
64 return wrapper
~/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/worker.py in get(object_refs, timeout)
1493 worker.core_worker.dump_object_store_memory_usage()
1494 if isinstance(value, RayTaskError):
-> 1495 raise value.as_instanceof_cause()
1496 else:
1497 raise value
RayTaskError(KeyError): ray::apply_func() (pid=19062, ip=172.31.12.170)
File "python/ray/_raylet.pyx", line 460, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 351, in ray._raylet.raise_if_dependency_failed
ray.exceptions.RayTaskError: ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
return compute_groupby(df, drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
During handling of the above exception, another exception occurred:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
return self._engine.get_loc(casted_key)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
return function(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
result = func(*args)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
result = func(lt_frame, rt_frame, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
series_result = func(df, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
df, by, drop, partition_idx
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
return compute_groupby(df.copy(), drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
2021-09-16 10:05:22,867 ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
return compute_groupby(df, drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
During handling of the above exception, another exception occurred:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
return self._engine.get_loc(casted_key)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
return function(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
result = func(*args)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
result = func(lt_frame, rt_frame, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
series_result = func(df, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
df, by, drop, partition_idx
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
return compute_groupby(df.copy(), drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
2021-09-16 10:05:22,870 ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
return compute_groupby(df, drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
During handling of the above exception, another exception occurred:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
return self._engine.get_loc(casted_key)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
return function(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
result = func(*args)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
result = func(lt_frame, rt_frame, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
series_result = func(df, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
df, by, drop, partition_idx
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
return compute_groupby(df.copy(), drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
2021-09-16 10:05:22,874 ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2834, in groupby_agg_builder
return compute_groupby(df, drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
During handling of the above exception, another exception occurred:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
return self._engine.get_loc(casted_key)
File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'A'
The above exception was the direct cause of the following exception:
ray::deploy_ray_func() (pid=19062, ip=172.31.12.170)
File "python/ray/_raylet.pyx", line 490, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 497, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
return function(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/axis_partition.py", line 207, in deploy_ray_func
result = func(*args)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/axis_partition.py", line 369, in deploy_func_between_two_axis_partitions
result = func(lt_frame, rt_frame, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 1153, in _map_reduce_func
series_result = func(df, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2845, in <lambda>
df, by, drop, partition_idx
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2838, in groupby_agg_builder
return compute_groupby(df.copy(), drop, partition_idx)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 2752, in compute_groupby
result = agg_func(grouped_df, **agg_kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/pandas/groupby.py", line 280, in <lambda>
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1253, in apply
result = self._python_apply_general(f, self._selected_obj)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1287, in _python_apply_general
keys, values, mutated = self.grouper.apply(f, data, self.axis)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 783, in apply
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/ops.py", line 1328, in fast_apply
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
File "pandas/_libs/reduction.pyx", line 381, in pandas._libs.reduction.apply_frame_axis0
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/groupby/groupby.py", line 1237, in f
return func(g, *args, **kwargs)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/modin/utils.py", line 500, in wrapper
result = func(*args, **kwargs)
File "<ipython-input-2-5658305649cc>", line 3, in printcol
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "/home/stus/miniconda3/envs/fctk/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'A'
About this issue
- State: open
- Created 3 years ago
- Comments: 16
It appears that the root cause of this issue is similar to the problem reported in #2511. When we’re applying a function in groupby, we expect that the applied function obtains the whole group frame:
In pandas, the history of calls of the applied function would be:
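As a minimal sketch of the pandas behavior referred to here (plain pandas only; the fixed data and the helper name `record_columns` are illustrative assumptions, not part of the original reproducer), the applied function can record which columns it sees on each call:

```python
import pandas as pd

# Fixed values instead of random ones, so both groups are guaranteed to exist.
df = pd.DataFrame({"A": [-1.0, 2.0, -3.0, 4.0], "B": [0.1, 0.2, 0.3, 0.4]})
df["KEY"] = df["A"].apply(lambda x: "K1" if x < 0 else "K2")

seen = []  # one entry per call of the applied function


def record_columns(group):
    # Each call receives a pandas frame holding the rows of one group.
    seen.append({"has_A": "A" in group.columns, "has_B": "B" in group.columns})
    return group


# Selecting the value columns explicitly sidesteps differences between pandas
# versions in whether the grouping column itself is passed to the function.
df.groupby("KEY")[["A", "B"]].apply(record_columns)

print(seen)
```

Every recorded call sees both "A" and "B", which is what a UDF written against pandas implicitly relies on.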
When a frame is converted to Modin, it is split into partitions. In the reproducer, just before the failing `groupby.apply`, the frame is split into two parts, and only one of them contains the “A” column.
When doing `groupby.apply`, Modin does not concatenate the partitions along rows, so we get 4 calls of the applied function here. Since only “part1” contains the “A” column, we get a KeyError for every “part2”.
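The four calls can be imitated in plain pandas. This is only a sketch of the effect, not Modin's actual code path: the dictionary `parts` and the fixed data are assumptions standing in for the two column partitions named "part1" and "part2" in the explanation.

```python
import pandas as pd

df = pd.DataFrame({"A": [-1.0, 2.0, -3.0, 4.0], "B": [0.1, 0.2, 0.3, 0.4]})
df["KEY"] = df["A"].apply(lambda x: "K1" if x < 0 else "K2")

# Hypothetical stand-ins for the two column partitions.
parts = {"part1": df[["A", "B"]], "part2": df[["KEY"]]}
keys = df["KEY"].to_numpy()  # group labels shared by both parts

calls = []
for part_name, part in parts.items():
    # Group each part on its own, without joining the parts back together.
    for key, group in part.groupby(keys):
        try:
            group["A"]  # what the UDF in the reproducer does
            outcome = "ok"
        except KeyError:
            outcome = "KeyError"
        calls.append((part_name, str(key), outcome))

for call in calls:
    print(call)
```

Both groups of "part1" succeed and both groups of "part2" fail, matching the pattern of repeated `KeyError: 'A'` entries in the log.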
This is certainly a bug: Modin users should not have to care about partitioning, but unfortunately, for now there is no way to avoid being aware of it in this case.
Unfortunately, I can’t suggest a workaround other than defaulting to pandas for this particular operation (@devin-petersohn, maybe you can?):
@modin-project/modin-core BTW, maybe we should replace our current `groupby.apply` implementation with default-to-pandas, since it just doesn’t work as intended?
😄 oh, well… but generally, both our answers seem to be correct.
Answering the question: “does it matter for the partitioning that the ‘KEY’ column was inserted after frame’s construction”, the answer is YES (see my comment).
Answering the question: “does it matter for the ‘apply’ that the ‘KEY’ column was inserted after”, the answer is NO, because the error happens due to a bug (see Devin’s comment). (Correct me if I’m wrong.)
Yes. The initial minimal chunk size for partitioning is 32. That means that if the frame contains <=32 columns, all of them will be located in a single column partition. But if the number of columns is 33, then the Modin frame will have 2 column partitions, the first containing 32 columns and the second containing the one remaining column.
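The arithmetic can be sketched as follows. This is a simplified model built only from the description in this comment (fixed chunks of at most 32 columns); the function name is hypothetical, and the rule Modin actually applies may also depend on other settings.

```python
def column_partition_widths(n_columns, chunk_size=32):
    """Split n_columns into consecutive chunks of at most chunk_size columns."""
    widths = []
    remaining = n_columns
    while remaining > 0:
        width = min(chunk_size, remaining)
        widths.append(width)
        remaining -= width
    return widths


print(column_partition_widths(3))   # [3]
print(column_partition_widths(32))  # [32]
print(column_partition_widths(33))  # [32, 1]
```

Under this model a 3-column frame built in one go would sit in a single column partition, so the split seen in the reproducer has to come from somewhere else.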
The reason you have 2 column partitions in a frame of 3 columns is the insertion of the new ‘KEY’ column.
Inserting a new column is equivalent to `concat`. When concatenating frames, Modin does not repartition anything; it literally concatenates the partitions.
Partitioning stays unnormalized along the row axis until some row-wise function is called. Such a function first triggers concatenation of all the row partitions (so that the whole row can be passed to the function) and then repartitions the result using the initial partitioning rule.
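A toy model of this flow, using a plain list of pandas frames as the "partitions". The helpers `insert_column` and `apply_full_axis` are hypothetical names invented for this sketch, not Modin APIs, and the chunk size of 32 is taken from the discussion.

```python
import pandas as pd

CHUNK = 32  # chunk size mentioned in the discussion


def insert_column(partitions, name, values):
    # Model of "insert is equivalent to concat": the new column simply
    # becomes its own partition, and nothing is repartitioned.
    new_part = pd.DataFrame({name: values}, index=partitions[0].index)
    return partitions + [new_part]


def apply_full_axis(partitions, func):
    # Join all column partitions so that func sees whole rows,
    # then split the result again into chunks of at most CHUNK columns.
    whole = pd.concat(partitions, axis=1)
    result = func(whole)
    return [result.iloc[:, i:i + CHUNK] for i in range(0, result.shape[1], CHUNK)]


partitions = [pd.DataFrame({"A": [-1.0, 2.0], "B": [0.5, 0.7]})]

after_insert = insert_column(partitions, "KEY", ["K1", "K2"])
print([list(p.columns) for p in after_insert])     # [['A', 'B'], ['KEY']]

after_full_axis = apply_full_axis(after_insert, lambda frame: frame)
print([list(p.columns) for p in after_full_axis])  # [['A', 'B', 'KEY']]
```

After the insertion the three columns live in two partitions; only the full-axis step brings them back into one.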
The reason I’m describing the execution flow of full-axis functions in detail is that `groupby.apply` is also one of them.
To achieve parallelism in GroupBy, some of the groupby aggregation functions are implemented in two phases via the MapReduce approach. `groupby.apply` is not one of them, since we don’t know much about the passed UDF.
So, if you are concerned about performance after placing all of the columns into a single column partition, you can try to split your aggregation into two phases (map and reduce) and then use Modin’s function operator `GroupbyReduceFunction` to make it work with Modin. You can refer to the corresponding section of Modin’s documentation to learn how to do that.
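To illustrate what splitting an aggregation into map and reduce phases means, here is a sketch in plain pandas for a per-group mean. It deliberately does not use `GroupbyReduceFunction`; the row partitions, the data, and the function names `map_phase` and `reduce_phase` are assumptions made for the example.

```python
import pandas as pd

df = pd.DataFrame({
    "KEY": ["K1", "K2", "K1", "K2", "K1", "K2"],
    "A": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
})

# Hypothetical row partitions: two blocks of three rows each.
row_parts = [df.iloc[:3], df.iloc[3:]]


def map_phase(part):
    # Partial result per partition: sum and count of "A" for every key.
    return part.groupby("KEY")["A"].agg(["sum", "count"])


def reduce_phase(partials):
    # Add up the partial sums and counts per key, then finish the mean.
    combined = pd.concat(partials).groupby(level=0).sum()
    return combined["sum"] / combined["count"]


two_phase_mean = reduce_phase([map_phase(part) for part in row_parts])
direct_mean = df.groupby("KEY")["A"].mean()

print(two_phase_mean.to_dict())  # {'K1': 3.0, 'K2': 4.0}
```

The map phase can run on each partition independently, which is what makes this form parallelizable; an arbitrary UDF passed to `groupby.apply` gives no such decomposition for free.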