dask: UnpicklingError: invalid load key - with Large Datasets
Hello All,
I hope you are having a nice day.
I am consistently hitting the following issue. Note that the load key changes, but it is always some obscure character.
-bash-4.1$ python2.7 cat_predict2.py
Traceback (most recent call last):
File "cat_predict2.py", line 83, in <module>
df.to_csv(output_file, index = True, sep = '\t')
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/dataframe/core.py", line 1105, in to_csv
return to_csv(self, filename, **kwargs)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 636, in to_csv
delayed(values).compute(get=get, scheduler=scheduler)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/base.py", line 156, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/base.py", line 395, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/threaded.py", line 75, in get
pack_exception=pack_exception, **kwargs)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/local.py", line 501, in get_async
raise_exception(exc, tb)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/local.py", line 272, in execute_task
result = _execute_task(task, data)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/local.py", line 253, in _execute_task
return func(*args2)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/optimization.py", line 942, in __call__
dict(zip(self.inkeys, args)))
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 132, in _get_recursive
res = cache[x] = _get_recursive(dsk, dsk[x], cache)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 136, in _get_recursive
args2 = [_get_recursive(dsk, k, cache) for k in args]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 126, in _get_recursive
return [_get_recursive(dsk, k, cache) for k in x]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 132, in _get_recursive
res = cache[x] = _get_recursive(dsk, dsk[x], cache)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 136, in _get_recursive
args2 = [_get_recursive(dsk, k, cache) for k in args]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 126, in _get_recursive
return [_get_recursive(dsk, k, cache) for k in x]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 136, in _get_recursive
args2 = [_get_recursive(dsk, k, cache) for k in args]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 126, in _get_recursive
return [_get_recursive(dsk, k, cache) for k in x]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/core.py", line 137, in _get_recursive
return func(*args2)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/dask/dataframe/shuffle.py", line 419, in collect
res = p.get(part)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/partd/core.py", line 73, in get
return self.get([keys], **kwargs)[0]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/partd/core.py", line 79, in get
return self._get(keys, **kwargs)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/partd/encode.py", line 30, in _get
for chunk in raw]
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/partd/pandas.py", line 170, in deserialize
headers = pickle.loads(frames[0])
cPickle.UnpicklingError: invalid load key, 'Ë'.
My script is very simple and works just fine with smaller data sets. Currently I am trying to merge 9 files, each between 100 million and 500 million rows with 2-3 columns.
Note that this is on a Linux server with 90 GB of RAM and 2.3 TB of storage.
The code I am running is simply:
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
import os
import dask
from timeit import default_timer as timer
path = r'/location'
files = os.listdir(path)
files = list(filter(None, [path + '/' + x if x.startswith('val') else None for x in files]))
# single-column seed frame that every input file is outer-merged onto
df = dd.from_pandas(pd.DataFrame(columns=['delete']), npartitions=1)
for file in files:
    new = dd.read_csv(file)
    new['iid'] = new['id'].astype(int)
    new = new.set_index('id')  # set_index triggers a shuffle (partd shows up in the traceback above)
    df = df.merge(new, left_index=True, right_index=True, how='outer')
start = timer()
output_file = 'master_*.csv'
df.to_csv(output_file, index=True, sep='\t')
end = timer()
print("Time Taken to write: ", end - start)
Thanks so much for your help!
@mrocklin the attached two scripts should reproduce the issue: Archive.zip
Just run:
You may need to run the random data generator a couple of times before you generate a file that produces the error. I was able to reproduce the same error (_pickle.UnpicklingError: invalid load key, '\xdd'.) multiple times in a row across different runs of ./dask_tbl_gen.py -n 2000000, but sometimes the random data will not generate the error. My conda env info is listed above.
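If it helps to automate the retries, a small driver along these lines can re-run the generator until the failure shows up (merge_script.py is only a placeholder name for the second script in Archive.zip, not its actual name):
import subprocess

# Re-generate random data and re-run the merge until the UnpicklingError
# appears. merge_script.py is a placeholder; substitute the real script name.
for attempt in range(10):
    subprocess.check_call(['./dask_tbl_gen.py', '-n', '2000000'])
    if subprocess.call(['python', 'merge_script.py']) != 0:
        print('reproduced the failure on attempt', attempt + 1)
        break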
I’m running into the same error:
…and after some troubleshooting, I can see why it is so hard to create a reproducible example. I tested this by trimming the input csv files down with head. In this case, I just joined 2 tables with dask. The relevant section of my python script:
The error actually occurred while loading the first csv file, so the 2nd didn’t matter. I could get rid of the error by trimming the file to the first 90000 lines with head, but the error occurred if I trimmed to the first 1000000 lines. I further narrowed it down. Lines 907850 --> 907900 look like the following:
I don’t see anything wrong. I tried to just use this section of the file as input, but there was no error. I tried using this section plus 10000 or 100000 lines prior, but no error. I tried just removing the header of the csv (head -n 907900 FILE.csv | tail -n +2 > OUTPUT.csv), and then there was an error. I tried removing the first 50 lines, and then no error. So, it appears that the error results from having a certain number of lines in the csv (i.e., > ~907850).
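If it is useful to script the trimming, here is a rough Python equivalent of the head/tail pipeline above (FILE.csv and OUTPUT.csv are the same placeholders; 907900 is the cut-off from my narrowing-down):
# Keep lines 2..907900 of FILE.csv (drop the header), mirroring
# head -n 907900 FILE.csv | tail -n +2 > OUTPUT.csv
with open('FILE.csv') as src, open('OUTPUT.csv', 'w') as dst:
    for i, line in enumerate(src):
        if i >= 907900:   # stop after the first 907900 lines
            break
        if i == 0:        # skip the header line
            continue
        dst.write(line)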
I also tried searching for the load key (‘\xdd’) but could not find it in my csv file.
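For anyone wanting to repeat that search, a byte-level scan along these lines is one way to do it (sketch only, reusing the FILE.csv placeholder from above):
# Count raw occurrences of the 0xdd byte reported in the UnpicklingError;
# the comment above notes the byte was not found, so this should print 0.
with open('FILE.csv', 'rb') as f:
    print(f.read().count(b'\xdd'))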
Conda env:
conda info: