modin: "can't pickle function objects" error with Ray engine

System information

  • OS Platform and Distribution: Ubuntu 18
  • Modin version: 0.11.2
  • Python version: 3.7.1
  • Code we can use to reproduce:
Python 3.7.1 (default, Oct 26 2021, 13:26:12) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import modin.pandas  as pd
>>> import os
>>> os.environ['MODIN_ENGINE'] = 'ray'
>>> pd.read_csv('tests/googleplaystore.csv')
UserWarning: Ray execution environment not yet initialized. Initializing...
To remove this warning, run the following python code before doing dataframe operations:

    import ray
    ray.init()

UserWarning: The size of /dev/shm is too small (8117444608 bytes). The required size at least half of RAM (8352208896 bytes). Please, delete files in /dev/shm or increase size of /dev/shm with --shm-size in Docker. Also, you can set the required memory size for each Ray worker in bytes to MODIN_MEMORY environment variable.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/boris/.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/io.py", line 135, in read_csv
    return _read(**kwargs)
 ...
  File "/home/boris/.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py", line 576, in save
    rv = reduce(self.proto)
TypeError: can't pickle function objects
>>> 

Describe the problem

With the Ray engine, an attempt to read a file from disk fails with "TypeError: can't pickle function objects" while Ray is being initialized. The same code works with the Dask engine.
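For comparison, here is a minimal sketch of the Dask configuration that does not hit the error. It assumes Modin and Dask are installed. The helper name read_csv_with_dask is hypothetical, and setting MODIN_ENGINE before the first Modin import is a conservative ordering choice, not something the report requires.

```python
import os

# Assumption: select the engine before Modin is first imported.
# The report sets MODIN_ENGINE after the import; this ordering is just safer.
os.environ["MODIN_ENGINE"] = "dask"


def read_csv_with_dask(path):
    """Hypothetical helper: read a CSV through Modin with the Dask engine."""
    # Imported lazily so the environment variable above is already in place.
    import modin.pandas as pd

    return pd.read_csv(path)
```

Calling read_csv_with_dask('tests/googleplaystore.csv') would then read the same file as in the reproduction above.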

Source code / logs


dfsql/table.py:57: in fetch_dataframe
    return pd.read_csv(self.fpath)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/io.py:135: in read_csv
    return _read(**kwargs)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/io.py:58: in _read
    Engine.subscribe(_update_engine)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/config/pubsub.py:213: in subscribe
    callback(cls)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/pandas/__init__.py:113: in _update_engine
    initialize_ray()
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/modin/engines/ray/utils.py:174: in initialize_ray
    ray.init(**ray_init_kwargs)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/_private/client_mode_hook.py:89: in wrapper
    return func(*args, **kwargs)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/worker.py:950: in init
    job_config=job_config)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/worker.py:1444: in connect
    lambda worker_info: sys.path.insert(1, script_directory))
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/worker.py:386: in run_function_on_all_workers
    pickled_function = pickle.dumps(function)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py:73: in dumps
    cp.dump(obj)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py:580: in dump
    return Pickler.dump(self, obj)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:485: in dump
    self.save(obj)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:601: in save
    self.save_reduce(obj=obj, *rv)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:689: in save_reduce
    save(func)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:601: in save
    self.save_reduce(obj=obj, *rv)
../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:689: in save_reduce
    save(func)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ray.cloudpickle.cloudpickle_fast.CloudPickler object at 0x7fb6f20a70b8>, obj = <function _builtin_type at 0x7fb6f37921e0>, save_persistent_id = True

    def save(self, obj, save_persistent_id=True):
        self.framer.commit_frame()
    
        # Check for persistent id (defined by a subclass)
        pid = self.persistent_id(obj)
        if pid is not None and save_persistent_id:
            self.save_pers(pid)
            return
    
        # Check the memo
        x = self.memo.get(id(obj))
        if x is not None:
            self.write(self.get(x[0]))
            return
    
        rv = NotImplemented
        reduce = getattr(self, "reducer_override", None)
        if reduce is not None:
            rv = reduce(obj)
    
        if rv is NotImplemented:
            # Check the type dispatch table
            t = type(obj)
            f = self.dispatch.get(t)
            if f is not None:
                f(self, obj)  # Call unbound method with explicit self
                return
    
            # Check private dispatch table if any, or else
            # copyreg.dispatch_table
            reduce = getattr(self, 'dispatch_table', dispatch_table).get(t)
            if reduce is not None:
                rv = reduce(obj)
            else:
                # Check for a class with a custom metaclass; treat as regular
                # class
                if issubclass(t, type):
                    self.save_global(obj)
                    return
    
                # Check for a __reduce_ex__ method, fall back to __reduce__
                reduce = getattr(obj, "__reduce_ex__", None)
                if reduce is not None:
>                   rv = reduce(self.proto)
E                   TypeError: can't pickle function objects

../../../.pyenv/versions/3.7.1/lib/python3.7/site-packages/ray/pickle5_files/pickle5/pickle.py:576: TypeError
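
The marked line is the pickler's last fallback, __reduce_ex__, being called on a plain function object. A minimal sketch of that step in isolation, using only the standard library: the names plain_function and reduce_fallback_error are hypothetical, and the exact wording of the message differs between Python versions.

```python
def plain_function():
    """Stand-in for the function object shown in the traceback."""
    return None


def reduce_fallback_error(obj, protocol=4):
    """Return the TypeError raised by obj.__reduce_ex__(protocol), or None."""
    try:
        # Same call as `rv = reduce(self.proto)` in the pickler's save().
        obj.__reduce_ex__(protocol)
    except TypeError as exc:
        return exc
    return None


error = reduce_fallback_error(plain_function)
print(type(error).__name__, error)
```

A pickler normally handles functions before it reaches this fallback, so getting this far suggests that the function-specific handling was not in effect when the call happened.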

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 16 (4 by maintainers)

Most upvoted comments

Hi @btseytlin, thanks for posting! This seems to be a Ray-related error caused by a race condition between the import thread and the worker thread (more in https://github.com/ray-project/ray/issues/7879). We have a hacky workaround that bypasses the race condition: running the pandas import on all Ray workers. However, your example failed during exactly that run. We would like to get rid of the redundant pandas imports on Ray workers in #3600. Would you be able to install Modin from that branch and check whether it helps?

pip uninstall modin # this removes your current Modin version
pip install git+https://github.com/YarShev/modin.git@dev/yigoshev-issue3599 # this installs Modin version from that branch