dask: "IndexError: tuple index out of range"

Hello, this is a very weird problem because it doesn’t happen every time, but I will try to give you a reproducible example.

import numpy as np
import pandas as pd
import dask.dataframe as dd

# Build a 1000-row pandas frame: two random float columns and one random
# integer column with values in [0, 1000).
df = pd.DataFrame({'a':np.random.rand(1000), 'b':np.random.rand(1000), 'c':np.random.randint(0,1000, 1000)})
# Wrap it in a dask dataframe split into 10 partitions.
ddf = dd.from_pandas(df, npartitions=10)

# Copy the index into a regular column. Per the report, the error never
# occurs when this assignment is removed.
ddf['idx_index'] = ddf.index

# Repeat the identical computation 100 times and count how many attempts
# raise IndexError (the failure is intermittent).
n_err = 0
for i in range(100):
    try:
        # Build the frame-wide count and the per-column nunique, then evaluate
        # both in a single dd.compute call. Per the report, computing only one
        # of the two does not trigger the error.
        counts = ddf.count()
        nunique = {col: ddf[col].nunique() for col in ddf.columns}
        dd.compute(counts, nunique)
    except IndexError as ex:
        n_err += 1
print(n_err)

I’m running the exact same operation 100 times and, as you can see if you run it, it fails sometimes.

Two things I noticed:

  • If I remove the ddf['idx_index'] = ddf.index operation, I never get the error.
  • If, instead of computing both the counts and the nunique values, I compute only one of them, the error doesn’t appear.

Below is the full traceback

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-11-769d14e49a64> in <module>
      4     counts = ddf.count()
      5     nunique = {col: ddf[col].nunique() for col in ddf.columns}
----> 6     dd.compute(counts, nunique)
      7     #except IndexError as ex:
      8     #    n_err += 1

~/.venvs/tarsier/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

~/.venvs/tarsier/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     80         get_id=_thread_get_id,
     81         pack_exception=pack_exception,
---> 82         **kwargs
     83     )
     84 

~/.venvs/tarsier/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    489                         _execute_task(task, data)  # Re-execute locally
    490                     else:
--> 491                         raise_exception(exc, tb)
    492                 res, worker_id = loads(res_info)
    493                 state["cache"][key] = res

~/.venvs/tarsier/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
    128         if exc.__traceback__ is not tb:
    129             raise exc.with_traceback(tb)
--> 130         raise exc
    131 
    132     import pickle as cPickle

~/.venvs/tarsier/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    231     try:
    232         task, data = loads(task_info)
--> 233         result = _execute_task(task, data)
    234         id = get_id()
    235         result = dumps((result, id))

~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/.venvs/tarsier/lib/python3.6/site-packages/dask/optimization.py in __call__(self, *args)
   1057         if not len(args) == len(self.inkeys):
   1058             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
-> 1059         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
   1060 
   1061     def __reduce__(self):

~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

~/.venvs/tarsier/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/frame.py in __getitem__(self, key)
   2899                 if self.columns.nlevels > 1:
   2900                     return self._getitem_multilevel(key)
-> 2901                 return self._get_item_cache(key)
   2902         except (TypeError, ValueError):
   2903             # The TypeError correctly catches non hashable "key" (e.g. list)

~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/generic.py in _get_item_cache(self, item)
   3059         res = cache.get(item)
   3060         if res is None:
-> 3061             values = self._data.get(item)
   3062             res = self._box_item_values(item, values)
   3063             cache[item] = res

~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/internals/managers.py in get(self, item, fastpath)
    950                         raise ValueError("cannot label index with a null key")
    951 
--> 952             return self.iget(loc, fastpath=fastpath)
    953         else:
    954 

~/.venvs/tarsier/lib/python3.6/site-packages/pandas/core/internals/managers.py in iget(self, i, fastpath)
    967         Otherwise return as a ndarray
    968         """
--> 969         block = self.blocks[self._blknos[i]]
    970         values = block.iget(self._blklocs[i])
    971         if not fastpath or not block._box_to_block_values or values.ndim != 1:

IndexError: tuple index out of range

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 15 (11 by maintainers)

Most upvoted comments

It looks like a concurrency issue. Pandas isn’t thread-safe, right? How do you get around thread-safety in Pandas? Does Dask have internal locks for dealing with DataFrames?