distributed: "distributed.protocol.core - CRITICAL - Failed to Serialize" with PyArrow 0.13

Reading from Parquet is failing with PyArrow 0.13. Downgrading to PyArrow 0.12.1 seems to fix the problem. I’ve only encountered this when using the distributed client. Using a Dask dataframe by itself does not appear to be affected.
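
For comparison, here is a minimal sketch of that unaffected case (assuming the same 'DATA/parquet/' dataset): computing with the default local scheduler, i.e. without creating a distributed Client, completes fine.

import dask.dataframe as dd

# No Client here: the default threaded scheduler runs tasks in-process,
# so the task graph never has to be pickled and sent over the network.
ddf = dd.read_parquet('DATA/parquet/', engine='pyarrow')
ddf.compute(scheduler='threads')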

With the distributed client, for example,

from distributed import LocalCluster, Client
import dask.dataframe as dd

client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers=4))
ddf = dd.read_parquet('DATA/parquet/', engine='pyarrow')
ddf.set_index('index')  # computing the new divisions submits the graph to the scheduler

Gives

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
    raise TypeError(msg, str(x)[:10000])
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/batched.py", line 94, in _background_send
    on_error='raise')
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 224, in write
    'recipient': self._peer_addr})
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 50, in to_frames
    res = yield offload(_to_frames)
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 43, in _to_frames
    context=context))
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
    raise TypeError(msg, str(x)[:10000])

Similarly,

from distributed import LocalCluster, Client
import dask.dataframe as dd

client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers=4))
ddf = dd.read_parquet('DATA/parquet/', engine='pyarrow')
ddf.compute()  # fails while pickling the task graph to send to the scheduler

causes this error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     37     try:
---> 38         result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     39         if len(result) < 1000:

AttributeError: Can't pickle local object 'ParquetDataset._get_open_file_func.<locals>.open_file'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-3-a90291ebde1e> in <module>
      1 ft_inp_ddf = dd.read_parquet('DATA/ft_14_15_inputs_parquet/', engine = 'pyarrow')
----> 2 ft_inp_ddf.compute()

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

/opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2318             retries=retries,
   2319             user_priority=priority,
-> 2320             actors=actors,
   2321         )
   2322         packed = pack_data(keys, futures)

/opt/conda/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2259 
   2260             self._send_to_scheduler({'op': 'update-graph',
-> 2261                                      'tasks': valmap(dumps_task, dsk3),
   2262                                      'dependencies': dependencies,
   2263                                      'keys': list(flatkeys),

/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/opt/conda/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
   2769         elif not any(map(_maybe_complex, task[1:])):
   2770             return {'function': dumps_function(task[0]),
-> 2771                     'args': warn_dumps(task[1:])}
   2772     return to_serialize(task)
   2773 

/opt/conda/lib/python3.6/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
   2778 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
   2779     """ Dump an object to bytes, warn if those bytes are large """
-> 2780     b = dumps(obj)
   2781     if not _warn_dumps_warned[0] and len(b) > limit:
   2782         _warn_dumps_warned[0] = True

/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
     49     except Exception:
     50         try:
---> 51             return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     52         except Exception as e:
     53             logger.info("Failed to serialize %s. Exception: %s", x, e)

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    959     try:
    960         cp = CloudPickler(file, protocol=protocol)
--> 961         cp.dump(obj)
    962         return file.getvalue()
    963     finally:

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    265         self.inject_addons()
    266         try:
--> 267             return Pickler.dump(self, obj)
    268         except RuntimeError as e:
    269             if 'recursion' in e.args[0]:

/opt/conda/lib/python3.6/pickle.py in dump(self, obj)
    407         if self.proto >= 4:
    408             self.framer.start_framing()
--> 409         self.save(obj)
    410         self.write(STOP)
    411         self.framer.end_framing()

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_tuple(self, obj)
    749         write(MARK)
    750         for element in obj:
--> 751             save(element)
    752 
    753         if id(obj) in memo:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    398             # func is nested
    399             if lookedup_by_name is None or lookedup_by_name is not obj:
--> 400                 self.save_function_tuple(obj)
    401                 return
    402 

/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_list(self, obj)
    779 
    780         self.memoize(obj)
--> 781         self._batch_appends(obj)
    782 
    783     dispatch[list] = save_list

/opt/conda/lib/python3.6/pickle.py in _batch_appends(self, items)
    806                 write(APPENDS)
    807             elif n:
--> 808                 save(tmp[0])
    809                 write(APPEND)
    810             # else tmp is empty, and we're done

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    519 
    520         # Save the reduce() output and finally memoize the object
--> 521         self.save_reduce(obj=obj, *rv)
    522 
    523     def persistent_id(self, obj):

/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    632 
    633         if state is not None:
--> 634             save(state)
    635             write(BUILD)
    636 

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    474         f = self.dispatch.get(t)
    475         if f is not None:
--> 476             f(self, obj) # Call unbound method with explicit self
    477             return
    478 

/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
    819 
    820         self.memoize(obj)
--> 821         self._batch_setitems(obj.items())
    822 
    823     dispatch[dict] = save_dict

/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
    845                 for k, v in tmp:
    846                     save(k)
--> 847                     save(v)
    848                 write(SETITEMS)
    849             elif n:

/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
    494             reduce = getattr(obj, "__reduce_ex__", None)
    495             if reduce is not None:
--> 496                 rv = reduce(self.proto)
    497             else:
    498                 reduce = getattr(obj, "__reduce__", None)

/opt/conda/lib/python3.6/site-packages/pyarrow/_parquet.cpython-36m-x86_64-linux-gnu.so in pyarrow._parquet.ParquetSchema.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__
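
The first exception above points at the root of the problem: in PyArrow 0.13, ParquetDataset._get_open_file_func returns a function defined inside another function, and the standard pickle module cannot serialize such local functions at all. A minimal, self-contained sketch of that limitation (make_opener below is a hypothetical stand-in, not the pyarrow code):

import pickle

def make_opener():
    # Nested (local) function, analogous to the open_file closure that
    # ParquetDataset._get_open_file_func creates in pyarrow 0.13.
    def open_file(path):
        return open(path, 'rb')
    return open_file

try:
    pickle.dumps(make_opener(), protocol=pickle.HIGHEST_PROTOCOL)
except (AttributeError, pickle.PicklingError) as e:
    print(e)  # e.g. Can't pickle local object 'make_opener.<locals>.open_file'

cloudpickle can usually handle plain nested functions, but as the second half of the traceback shows, it then fails on a ParquetSchema object reachable from that function's state ('no default __reduce__ due to non-trivial __cinit__').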

About this issue

  • State: closed
  • Created 5 years ago
  • Reactions: 4
  • Comments: 56 (36 by maintainers)

Most upvoted comments

I have a similar issue. Downgrading pandas from 0.25 to 0.24.2 worked for me.

cloudpickle 1.2.2 was released.

I had to upgrade PyArrow from 0.12.1 to 0.14 because dask asked me to, but it worked! Thanks a lot @jrbourbeau!
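
For anyone landing here, a quick sketch for checking which of the versions discussed in this thread are installed (the reports above point at PyArrow 0.13 as the problematic release, with PyArrow 0.12.1, PyArrow >= 0.14, pandas 0.24.2 and cloudpickle 1.2.2 reported as working):

import cloudpickle
import dask
import distributed
import pandas
import pyarrow

# Print the installed version of each package involved in the reports above.
for mod in (pyarrow, pandas, cloudpickle, dask, distributed):
    print(mod.__name__, mod.__version__)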