distributed: dd.read_parquet + compute raises `_pickle.UnpicklingError: pickle data was truncated` for multiple parquet files

What happened:

When computing multiple parquet files into a single pandas DataFrame with a distributed client on a single VM, I get the following traceback:

Traceback

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x, buffers=buffers)
_pickle.UnpicklingError: pickle data was truncated
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 2666, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1981, in gather
    asynchronous=asynchronous,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 844, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/home/cedric/venv/lib/python3.7/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1869, in _gather
    response = await future
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/client.py", line 1920, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/core.py", line 645, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/tcp.py", line 222, in read
    allow_offload=self.allow_offload,
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 77, in from_frames
    res = await offload(_from_frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/usr/local/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/utils.py", line 1440, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 105, in loads
    frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/core.py", line 97, in _decode_default
    sub_header, sub_frames, deserializers=deserializers
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 455, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 389, in deserialize
    return loads(header, frames)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 85, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/home/cedric/venv/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x, buffers=buffers)
_pickle.UnpicklingError: pickle data was truncated
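
For readers unfamiliar with the message itself: `pickle data was truncated` is what the standard library raises when the byte stream handed to `pickle.loads` ends before the object is complete. The sketch below only illustrates that error with the standard library. It does not reproduce the distributed bug, and the `payload` contents are an arbitrary example rather than anything taken from this report.

```python
import pickle

# Arbitrary example payload: a bytes object large enough that cutting
# a few bytes off the end removes part of the serialized data.
payload = b"x" * 1000
data = pickle.dumps(payload, protocol=4)

# Simulate a receiver that only got part of the serialized stream.
truncated = data[:-10]

message = None
try:
    pickle.loads(truncated)
except pickle.UnpicklingError as exc:
    # Same exception type as at the bottom of the traceback above.
    message = str(exc)

print(message)
```

In the traceback above the same exception surfaces inside `distributed/protocol/pickle.py`, which suggests that the bytes reaching `pickle.loads` on the client were shorter than what was serialized on the other side.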

What you expected to happen:

I would expect no issue since the parquet files have the same schema (they don’t have any parquet metadata though). The same process worked fine on distributed 2021.2.0 and 2021.3.0.

Minimal Complete Verifiable Example:

Install a new virtualenv with Python 3.7.9, install the following libraries with pip:

requirements.txt :

dask==2021.3.0
pickle5==0.0.11
numpy==1.20.1
fastparquet==0.5.0
pandas==1.2.3
git+https://github.com/dask/distributed.git@aac50f63fdacfb43be64279ab540f68cabe7351b
zict==2.0.0
tblib==1.7.0
sortedcontainers==2.3.0
cloudpickle==1.6.0
click==7.1.2
dask-glm==0.2.0
dask-ml==1.8.0
tornado==6.1
toolz==0.11.1
psutil==5.8.0
msgpack==1.0.2
partd==1.1.0
fsspec==0.8.5

Then run the following Python script:

from dask.distributed import Client
import dask.dataframe as dd

client = Client()
df = dd.read_parquet("s3://sra-pub-sars-cov2-metadata-us-east-1/v2/tax_analysis/", storage_options={"anon": True})
df.compute()

Anything else we need to know?:

Here is the result from the following command:

client.get_versions(check=True)
Result

{
    "scheduler": {
        "host": {
            "python": "3.7.9.final.0",
            "python-bits": 64,
            "OS": "Linux",
            "OS-release": "3.10.0-1062.9.1.el7.x86_64",
            "machine": "x86_64",
            "processor": "x86_64",
            "byteorder": "little",
            "LC_ALL": "null",
            "LANG": "fr_FR.UTF-8"
        },
        "packages": {
            "python": "3.7.9.final.0",
            "dask": "2021.03.0",
            "distributed": "2021.03.1+12.gaac50f6",
            "msgpack": "1.0.2",
            "cloudpickle": "1.6.0",
            "tornado": "6.1",
            "toolz": "0.11.1",
            "numpy": "1.20.1",
            "lz4": null,
            "blosc": null
        }
    },
    "workers": {
        "tcp://127.0.0.1:34472": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:34639": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:35957": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:35992": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:39976": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:40883": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:43619": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:43934": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:44065": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:45409": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:45573": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        },
        "tcp://127.0.0.1:46478": {
            "host": {
                "python": "3.7.9.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "3.10.0-1062.9.1.el7.x86_64",
                "machine": "x86_64",
                "processor": "x86_64",
                "byteorder": "little",
                "LC_ALL": "null",
                "LANG": "fr_FR.UTF-8"
            },
            "packages": {
                "python": "3.7.9.final.0",
                "dask": "2021.03.0",
                "distributed": "2021.03.1+12.gaac50f6",
                "msgpack": "1.0.2",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.20.1",
                "lz4": null,
                "blosc": null
            }
        }
    },
    "client": {
        "host": {
            "python": "3.7.9.final.0",
            "python-bits": 64,
            "OS": "Linux",
            "OS-release": "3.10.0-1062.9.1.el7.x86_64",
            "machine": "x86_64",
            "processor": "x86_64",
            "byteorder": "little",
            "LC_ALL": "null",
            "LANG": "fr_FR.UTF-8"
        },
        "packages": {
            "python": "3.7.9.final.0",
            "dask": "2021.03.0",
            "distributed": "2021.03.1+12.gaac50f6",
            "msgpack": "1.0.2",
            "cloudpickle": "1.6.0",
            "tornado": "6.1",
            "toolz": "0.11.1",
            "numpy": "1.20.1",
            "lz4": null,
            "blosc": null
        }
    }
}

This issue was discovered trying to debug #4645.

@jakirkham raised an interesting point: on Python 3.7, distributed needs pickle5 on all the workers, the scheduler and the client(s). Here, pickle5 is installed at the time the LocalCluster is created, but I am not sure it is available on all workers, the scheduler and the client(s), since pickle5 is neither in the install requirements for Python 3.7 nor in the packages reported for the workers by client.get_versions (or at least, it does not appear in the list of available packages).
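
One way to check this directly, rather than relying on the `client.get_versions` output, would be to ask each process whether it can import pickle5. This is a minimal sketch: the helper names `has_pickle5` and `pickle5_report` are made up for illustration, and it assumes a connected `Client` such as the one created in the script above. It uses `Client.run` and `Client.run_on_scheduler` to execute the check on the workers and the scheduler.

```python
def has_pickle5():
    """Return True if the pickle5 backport is importable in this process."""
    # Imported here so the function is self-contained when it is shipped
    # to the scheduler and the workers.
    import importlib.util

    return importlib.util.find_spec("pickle5") is not None


def pickle5_report(client):
    """Collect pickle5 availability for the client, scheduler and workers.

    `client` is expected to be a dask.distributed.Client (hypothetical helper,
    not part of distributed).
    """
    return {
        "client": has_pickle5(),
        "scheduler": client.run_on_scheduler(has_pickle5),
        # Client.run returns a dict keyed by worker address.
        "workers": client.run(has_pickle5),
    }
```

With the `client` from the script above, `pickle5_report(client)` returns one boolean for the client, one for the scheduler, and one per worker address, so a process without pickle5 would show up as `False`.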

Environment:

  • Dask version: 2021.3.1 and 2021.3.0
  • Distributed version: main branch (commit: aac50f63fdacfb43be64279ab540f68cabe7351b)
  • Python version: 3.7.9
  • Operating System: RHEL 7.9
  • Install method (conda, pip, source): pip

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 16 (14 by maintainers)

Most upvoted comments

I’ve tested the code I had provided for reading parquet files, using distributed from the current main branch, and it seems to work as well! Congrats and thanks for the effort! (It also works without pickle5.)

OK, this should now be fixed. Please retest and let us know how it goes. It should also be possible to test without pickle5. If you are still seeing issues with main, please let us know as soon as you can (we are planning to release tomorrow) 🙂

cc @alejandrofiel @Cedric-Magnan @gabicca @williamBlazing

I’ve just run a few tests that were failing before, and they now pass with the latest code on main, on Python 3.7!

Thanks, everyone, for all the effort you put into fixing this!! 😃

I think this is resolved in https://github.com/dask/distributed/pull/4666

Tests are running now.

OK, I have found a simpler reproducer:

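import numpy as np
from dask.distributed import Client, wait


class Data:
    """Plain Python object wrapping a large uint8 buffer."""

    def __init__(self, n):
        self.data = np.empty(n, dtype="u1")


# Local cluster with a single worker.
c = Client(n_workers=1)

# Build a 200,000,000-byte object on the worker and wait for the task to finish.
f = c.submit(Data, 200_000_000)
wait(f)

# Fetch the result back to the client (the gather step seen in the traceback above).
d = f.result()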