dask: "IndexError: tuple index out of range"
Hello, this is a very weird problem since it doesn't happen every time, but I will try to give you a reproducible example.
import numpy as np
import pandas as pd
import dask.dataframe as dd

# Build a small dask dataframe and add the index back as a column.
df = pd.DataFrame({'a': np.random.rand(1000),
                   'b': np.random.rand(1000),
                   'c': np.random.randint(0, 1000, 1000)})
ddf = dd.from_pandas(df, npartitions=10)
ddf['idx_index'] = ddf.index

# Run the same computation 100 times and count how often it raises.
n_err = 0
for i in range(100):
    try:
        counts = ddf.count()
        nunique = {col: ddf[col].nunique() for col in ddf.columns}
        dd.compute(counts, nunique)
    except IndexError as ex:
        n_err += 1
print(n_err)
I’m running the exact same operation 100 times and, as you can see if you run it, it fails sometimes.
Two things I noticed:
- If I remove the ddf['idx_index'] = ddf.index assignment, I never get the error.
- If I compute only the counts or only the nunique values, rather than both in the same dd.compute call, the error doesn't appear (see the sketch below).
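A minimal sketch of that second workaround, reusing the ddf built in the reproducer above and simply triggering each aggregation in its own compute call:

# Compute each aggregation separately instead of one shared dd.compute;
# per the observation above, the error only shows up when both are
# computed together.
counts = ddf.count().compute()
nunique = {col: ddf[col].nunique().compute() for col in ddf.columns}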
Below is the full traceback:
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
<ipython-input-11-769d14e49a64> in <module>
4 counts = ddf.count()
5 nunique = {col: ddf[col].nunique() for col in ddf.columns}
----> 6 dd.compute(counts, nunique)
7 #except IndexError as ex:
8 # n_err += 1
~/.venvs/tarsier/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
444 keys = [x.__dask_keys__() for x in collections]
445 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446 results = schedule(dsk, keys, **kwargs)
447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
448
~/.venvs/tarsier/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
80 get_id=_thread_get_id,
81 pack_exception=pack_exception,
---> 82 **kwargs
83 )
84
~/.venvs/tarsier/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
489 _execute_task(task, data) # Re-execute locally
490 else:
--> 491 raise_exception(exc, tb)
492 res, worker_id = loads(res_info)
493 state["cache"][key] = res
~/.venvs/tarsier/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
128 if exc.__traceback__ is not tb:
129 raise exc.with_traceback(tb)
--> 130 raise exc
131
132 import pickle as cPickle
~/.venvs/tarsier/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
231 try:
232 task, data = loads(task_info)
--> 233 result = _execute_task(task, data)
234 id = get_id()
235 result = dumps((result, id))
~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
~/.venvs/tarsier/lib/python3.6/site-packages/dask/optimization.py in __call__(self, *args)
1057 if not len(args) == len(self.inkeys):
1058 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
-> 1059 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
1060
1061 def __reduce__(self):
~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/frame.py in __getitem__(self, key)
2899 if self.columns.nlevels > 1:
2900 return self._getitem_multilevel(key)
-> 2901 return self._get_item_cache(key)
2902 except (TypeError, ValueError):
2903 # The TypeError correctly catches non hashable "key" (e.g. list)
~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/generic.py in _get_item_cache(self, item)
3059 res = cache.get(item)
3060 if res is None:
-> 3061 values = self._data.get(item)
3062 res = self._box_item_values(item, values)
3063 cache[item] = res
~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/internals/managers.py in get(self, item, fastpath)
950 raise ValueError("cannot label index with a null key")
951
--> 952 return self.iget(loc, fastpath=fastpath)
953 else:
954
~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/internals/managers.py in iget(self, i, fastpath)
967 Otherwise return as a ndarray
968 """
--> 969 block = self.blocks[self._blknos[i]]
970 values = block.iget(self._blklocs[i])
971 if not fastpath or not block._box_to_block_values or values.ndim != 1:
IndexError: tuple index out of range
It looks like a concurrency issue. Pandas isn't thread-safe, right? How do you work around the lack of thread safety in pandas? Does Dask have internal locks for dealing with DataFrames?
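A quick way to test that hypothesis (just a debugging sketch, not an answer from the maintainers) is to rerun the computation on Dask's single-threaded scheduler and see whether the error ever appears:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'a': np.random.rand(1000),
                   'b': np.random.rand(1000),
                   'c': np.random.randint(0, 1000, 1000)})
ddf = dd.from_pandas(df, npartitions=10)
ddf['idx_index'] = ddf.index

# scheduler='synchronous' runs every task in the calling thread; if the
# IndexError never shows up here, that points at a race between worker
# threads sharing pandas objects rather than at the data itself.
counts = ddf.count()
nunique = {col: ddf[col].nunique() for col in ddf.columns}
dd.compute(counts, nunique, scheduler='synchronous')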