distributed: Cannot persist dask.dataframes
What happened:
DataFrame collections such as dask dataframes or dask-cudf dataframes can no longer be persisted as of release 2021.2.0. @wphicks traced the regression to the merge of this PR: https://github.com/dask/distributed/pull/4406
What you expected to happen:
Persist to work (see reproducer)
Minimal Complete Verifiable Example:
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import Client
from dask.distributed import LocalCluster


def persist_across_workers(client, objects, workers=None):
    if workers is None:
        # Default to all workers
        workers = client.has_what().keys()
    return client.persist(objects, workers={o: workers for o in objects})


if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    X = np.ones((10000, 20))
    X_df = pd.DataFrame(X)
    X_dist = dd.from_pandas(X_df, npartitions=2)
    X_f = persist_across_workers(client, X_dist)
Output:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
small_header, small_payload = dumps_msgpack(msg, **compress_opts)
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'dict_keys' object
distributed.comm.utils - ERROR - can not serialize 'dict_keys' object
Traceback (most recent call last):
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
protocol.dumps(
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
small_header, small_payload = dumps_msgpack(msg, **compress_opts)
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'dict_keys' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/tcp.py", line 230, in write
frames = await to_frames(
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/utils.py", line 52, in to_frames
return _to_frames()
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
protocol.dumps(
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
small_header, small_payload = dumps_msgpack(msg, **compress_opts)
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'dict_keys' object
Environment:
- Dask version: 2021.2.0
- Distributed version: 2021.2.0 from conda and built from master after https://github.com/dask/distributed/pull/4406
- Python version: 3.7 and 3.8
- Operating System: Linux / AMD64
- Install method (conda, pip, source): conda and from source
@trivialfis you might be interested in this. I think xgboost maybe does similar things?
Yes, that’s right @jrbourbeau .
@jakirkham I agree that an improved error message would be helpful. At the very least, we could do a better job ensuring that the shape of the priority/workers/etc makes sense (i.e., iterable for workers, number for priority, error if dict-of-collections)
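A rough sketch of what such up-front checks could look like, in plain Python. The helper names `normalize_workers` and `validate_priority` are hypothetical and are not part of distributed. Rejecting every dict is also an assumption of this sketch; it is stricter than the "error if dict-of-collections" idea above.

```python
from numbers import Number


def normalize_workers(workers):
    """Return `workers` as a plain list of address strings, or None.

    Hypothetical helper, not part of distributed.
    """
    if workers is None:
        return None
    if isinstance(workers, str):
        # A single address; wrap it so it is not split into characters.
        return [workers]
    if isinstance(workers, dict):
        # Assumption: reject all dicts, including dicts keyed by collections.
        raise TypeError(
            "workers= must be an iterable of worker addresses, not a dict"
        )
    # Materialize views such as dict_keys, sets, tuples and generators.
    workers = list(workers)
    bad = [w for w in workers if not isinstance(w, str)]
    if bad:
        raise TypeError(f"worker addresses must be strings, got {bad!r}")
    return workers


def validate_priority(priority):
    """Return `priority` unchanged if it is a number, else raise TypeError."""
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(priority, bool) or not isinstance(priority, Number):
        raise TypeError(
            f"priority= must be a number, got {type(priority).__name__}"
        )
    return priority
```

With checks like these, the `dict_keys` view returned by `client.has_what().keys()` would be turned into a list before it reaches the serializer, and a dict passed as `workers=` would fail early with a readable message instead of deep inside msgpack.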
Thanks for the nice example @dantegd. In https://github.com/dask/distributed/pull/4406 we removed the ability to pass Dask collections to the priority= and workers= keywords, as they were broken in many cases. Instead, you can now use Dask's new dask.annotate machinery in these cases, which should hopefully be more robust. For the above example, this means replacing the workers= argument to client.persist with a dask.annotate(workers=...) block around the call.
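For readers following along, here is a minimal sketch of how the reproducer's helper might look with `dask.annotate`. It is a reconstruction under assumptions, not the exact snippet from this comment: converting the worker addresses to a plain list and the handling of a single string address are choices made for this sketch.

```python
import dask


def persist_across_workers(client, objects, workers=None):
    """Persist `objects`, restricting their tasks to `workers`."""
    if workers is None:
        # Default to all workers. list() turns the dict_keys view into a
        # plain list (an assumption of this sketch).
        workers = list(client.has_what().keys())
    elif isinstance(workers, str):
        # A single address; wrap it so it is not split into characters.
        workers = [workers]
    else:
        workers = list(workers)
    # The worker restriction is expressed as an annotation that is active
    # while persist is called, instead of a workers= keyword on persist.
    with dask.annotate(workers=workers):
        return client.persist(objects)
```

The call site in the reproducer stays the same: `X_f = persist_across_workers(client, X_dist)`.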
@jakirkham @jrbourbeau Thanks for the advice!
If you don't need the workers to be specified, I would not specify them.