modin: Can't pickle function objects ray error
System information
- OS: Ubuntu 18
- Modin version: 0.11.2
- Python version: 3.7.1
- Code we can use to reproduce:
Python 3.7.1 (default, Oct 26 2021, 13:26:12)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import modin.pandas as pd
>>> import os
>>> os.environ['MODIN_ENGINE'] = 'ray'
>>> pd.read_csv('tests/googleplaystore.csv')
UserWarning: Ray execution environment not yet initialized. Initializing...
To remove this warning, run the following python code before doing dataframe operations:
import ray
ray.init()
UserWarning: The size of /dev/shm is too small (8117444608 bytes). The required size at least half of RAM (8352208896 bytes). Please, delete files in /dev/shm or increase size of /dev/shm with --shm-size in Docker. Also, you can set the required memory size for each Ray worker in bytes to MODIN_MEMORY environment variable.
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/boris/.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/io.py", line 135, in read_csv
return _read(**kwargs)
...
File "/home/boris/.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py", line 576, in save
rv = reduce(self.proto)
TypeError: can't pickle function objects
>>>
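The warning in the transcript suggests initializing Ray yourself before doing any dataframe operation. Below is a minimal sketch of that suggestion using the same file path as above; note that the traceback in the next section shows the failure happening inside ray.init() itself, so this may hit the same error, it only spells out what the warning recommends.

import os
os.environ['MODIN_ENGINE'] = 'ray'  # select the Ray engine before importing modin.pandas
# os.environ['MODIN_MEMORY'] = '8589934592'  # optional, per the /dev/shm warning: memory per Ray worker in bytes

import ray
ray.init()  # explicit initialization, as the UserWarning recommends

import modin.pandas as pd
df = pd.read_csv('tests/googleplaystore.csv')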
Describe the problem
When using the Ray engine, an attempt to read a file from disk fails with the error above. The same read succeeds with the Dask engine (a comparison sketch follows below).
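For comparison, here is a minimal sketch of the Dask path that the description says works, assuming Modin's Dask dependencies are installed (e.g. pip install "modin[dask]"):

import os
os.environ['MODIN_ENGINE'] = 'dask'  # select the Dask engine before importing modin.pandas

import modin.pandas as pd
df = pd.read_csv('tests/googleplaystore.csv')  # the same call that fails under Ray
print(df.head())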
Source code / logs
dfsql/table.py:57: in fetch_dataframe
return pd.read_csv(self.fpath)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/io.py:135: in read_csv
return _read(**kwargs)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/io.py:58: in _read
Engine.subscribe(_update_engine)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/config/pubsub.py:213: in subscribe
callback(cls)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/__init__.py:113: in _update_engine
initialize_ray()
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/engines/ray/utils.py:174: in initialize_ray
ray.init(**ray_init_kwargs)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/_private/client_mode_hook.py:89: in wrapper
return func(*args, **kwargs)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/worker.py:950: in init
job_config=job_config)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/worker.py:1444: in connect
lambda worker_info: sys.path.insert(1, script_directory))
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/worker.py:386: in run_function_on_all_workers
pickled_function = pickle.dumps(function)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py:73: in dumps
cp.dump(obj)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py:580: in dump
return Pickler.dump(self, obj)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:485: in dump
self.save(obj)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:601: in save
self.save_reduce(obj=obj, *rv)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:689: in save_reduce
save(func)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:601: in save
self.save_reduce(obj=obj, *rv)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:689: in save_reduce
save(func)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <ray.cloudpickle.cloudpickle_fast.CloudPickler object at 0x7fb6f20a70b8>, obj = <function _builtin_type at 0x7fb6f37921e0>, save_persistent_id = True
def save(self, obj, save_persistent_id=True):
self.framer.commit_frame()
# Check for persistent id (defined by a subclass)
pid = self.persistent_id(obj)
if pid is not None and save_persistent_id:
self.save_pers(pid)
return
# Check the memo
x = self.memo.get(id(obj))
if x is not None:
self.write(self.get(x[0]))
return
rv = NotImplemented
reduce = getattr(self, "reducer_override", None)
if reduce is not None:
rv = reduce(obj)
if rv is NotImplemented:
# Check the type dispatch table
t = type(obj)
f = self.dispatch.get(t)
if f is not None:
f(self, obj) # Call unbound method with explicit self
return
# Check private dispatch table if any, or else
# copyreg.dispatch_table
reduce = getattr(self, 'dispatch_table', dispatch_table).get(t)
if reduce is not None:
rv = reduce(obj)
else:
# Check for a class with a custom metaclass; treat as regular
# class
if issubclass(t, type):
self.save_global(obj)
return
# Check for a __reduce_ex__ method, fall back to __reduce__
reduce = getattr(obj, "__reduce_ex__", None)
if reduce is not None:
> rv = reduce(self.proto)
E TypeError: can't pickle function objects
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:576: TypeError
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 16 (4 by maintainers)
Hi @btseytlin, thanks for posting! This seems to be a Ray-related error caused by a race condition between the import thread and the worker thread (more in https://github.com/ray-project/ray/issues/7879). We have a hacky solution to bypass the race condition: running the pandas import on all Ray workers. However, your example failed exactly during that run. We would like to get rid of the redundant pandas imports on Ray workers in #3600. Would you be able to install Modin from that branch and try it out to see if it helps?
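For reference, one generic way to try a development branch is installing Modin straight from git with pip; the branch name below is only a placeholder for whichever branch backs #3600:

pip install "git+https://github.com/modin-project/modin.git@BRANCH_NAME"  # BRANCH_NAME is a placeholder, not the actual branch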