distributed: distributed.worker - ERROR - list index out of range

# environment
dask.__version__ : '1.2.2'
distributed.__version__:  '1.28.0'

We encountered the following error from distributed.worker:

# error message
distributed.worker - ERROR - list index out of range
Traceback (most recent call last):
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 2336, in execute
    self.transition(key, "memory", value=value)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 1443, in transition
    state = func(key, **kwargs)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 1562, in transition_executing_done
    self.send_task_state_to_scheduler(key)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 1727, in send_task_state_to_scheduler
    typ_serialized = dumps_function(typ)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 3040, in dumps_function
    result = cache[func]
  File "/usr/local/anaconda3/lib/python3.6/site-packages/zict/lru.py", line 50, in __getitem__
    self.heap[key] = self.i
  File "/usr/local/anaconda3/lib/python3.6/site-packages/heapdict.py", line 39, in __setitem__
    self.pop(key)
  File "/usr/local/anaconda3/lib/python3.6/_collections_abc.py", line 801, in pop
    del self[key]
  File "/usr/local/anaconda3/lib/python3.6/site-packages/heapdict.py", line 78, in __delitem__
    self._swap(wrapper[2], parent[2])
  File "/usr/local/anaconda3/lib/python3.6/site-packages/heapdict.py", line 68, in _swap
    self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
IndexError: list index out of range
/usr/local/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 48 leaked semaphores to clean up at shutdown
  len(cache))

We traced the issue, and we believe it is caused by a recent change in distributed.worker:

# copy from distributed/worker.py
try:
    # a 10 MB cache of deserialized functions and their bytes
    from zict import LRU
    cache = LRU(10000000, dict(), weight=lambda k, v: len(v))
except ImportError:
    cache = dict()

def dumps_function(func):
    """ Dump a function to bytes, cache functions """
    try:
        result = cache[func]
    except KeyError:
        result = pickle.dumps(func)
        if len(result) < 100000:
            cache[func] = result
    except TypeError:
        result = pickle.dumps(func)
    return result

In a recent change, distributed.worker started using zict's LRU as the cache, but LRU's `__getitem__` is not thread-safe: as the traceback shows, even a read reorders the internal heap. The following is a minimal example that reproduces the "list index out of range" error using LRU alone.

from zict import LRU
from functools import partial
import concurrent.futures

# create an LRU cache with capacity 2
cache = LRU(2, dict())
cache[1] = 1
cache[2] = 2

# read a key from the cache many times
def get_key(key, reps):
    for _ in range(reps):
        cache[key]

get_key_m = partial(get_key, reps=1000000)

# read keys from multiple threads
def test_get_key():
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        [i for i in executor.map(get_key_m, [1, 2])]

# this call raises "IndexError: list index out of range"
test_get_key()
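For comparison, guarding every LRU access with a single lock makes the same stress test pass. This is only a workaround sketch of ours (the lock and the `get_key_locked` name are our own additions, not part of zict or distributed); the iteration count is reduced to keep the run short.

```python
import threading
import concurrent.futures
from functools import partial
from zict import LRU

cache = LRU(2, dict())
cache[1] = 1
cache[2] = 2
lock = threading.Lock()  # our own addition; zict's LRU has no internal lock

def get_key_locked(key, reps):
    # LRU.__getitem__ mutates the internal heap, so even reads must be guarded
    value = None
    for _ in range(reps):
        with lock:
            value = cache[key]
    return value

def test_get_key_locked():
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(partial(get_key_locked, reps=100000), [1, 2]))

# completes without IndexError
print(test_get_key_locked())  # → [1, 2]
```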

We were not able to construct a minimal example that reproduces the error using Dask directly; hopefully the LRU-only example above provides some insight. Please let us know what you think. Thanks!

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Reactions: 1
  • Comments: 16 (8 by maintainers)

Most upvoted comments

@TomAugspurger @mrocklin, I just tried the lock modification on the distributed source code and ran it with our application, and the "list index out of range" error was resolved! Thanks for the input; hopefully this fix can make it into an upcoming release.
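For reference, the lock modification mentioned above amounts to serializing access to the shared cache in `dumps_function`. Below is a minimal sketch of that pattern, using a plain dict as a stand-in for the zict LRU; the `cache_lock` name and `dumps_function_locked` are our own illustration, not the actual distributed patch.

```python
import pickle
import threading

cache = {}  # stand-in for the zict LRU used by distributed.worker
cache_lock = threading.Lock()  # hypothetical lock, not in the released code

def dumps_function_locked(func):
    """Dump a function to bytes, caching results behind a lock."""
    try:
        with cache_lock:
            return cache[func]
    except KeyError:
        pass
    except TypeError:
        # unhashable func cannot be used as a cache key
        return pickle.dumps(func)
    result = pickle.dumps(func)
    if len(result) < 100000:
        with cache_lock:
            cache[func] = result
    return result
```

Pickling happens outside the lock so that only the cheap dict operations are serialized, mirroring the structure of the original `dumps_function`.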