distributed: distributed.worker - ERROR - list index out of range
# environment
dask.__version__ : '1.2.2'
distributed.__version__: '1.28.0'
We got the following error from distributed.worker:
# error message
```
distributed.worker - ERROR - list index out of range
Traceback (most recent call last):
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 2336, in execute
    self.transition(key, "memory", value=value)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 1443, in transition
    state = func(key, **kwargs)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 1562, in transition_executing_done
    self.send_task_state_to_scheduler(key)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 1727, in send_task_state_to_scheduler
    typ_serialized = dumps_function(typ)
  File "/usr/local/anaconda3/lib/python3.6/site-packages/distributed/worker.py", line 3040, in dumps_function
    result = cache[func]
  File "/usr/local/anaconda3/lib/python3.6/site-packages/zict/lru.py", line 50, in __getitem__
    self.heap[key] = self.i
  File "/usr/local/anaconda3/lib/python3.6/site-packages/heapdict.py", line 39, in __setitem__
    self.pop(key)
  File "/usr/local/anaconda3/lib/python3.6/_collections_abc.py", line 801, in pop
    del self[key]
  File "/usr/local/anaconda3/lib/python3.6/site-packages/heapdict.py", line 78, in __delitem__
    self._swap(wrapper[2], parent[2])
  File "/usr/local/anaconda3/lib/python3.6/site-packages/heapdict.py", line 68, in _swap
    self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
IndexError: list index out of range
/usr/local/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 48 leaked semaphores to clean up at shutdown
  len(cache))
```
We traced the issue and believe it is caused by a recent change in distributed.worker.
# copy from distributed/worker.py
```python
try:
    # a 10 MB cache of deserialized functions and their bytes
    from zict import LRU
    cache = LRU(10000000, dict(), weight=lambda k, v: len(v))
except ImportError:
    cache = dict()


def dumps_function(func):
    """ Dump a function to bytes, cache functions """
    try:
        result = cache[func]
    except KeyError:
        result = pickle.dumps(func)
        if len(result) < 100000:
            cache[func] = result
    except TypeError:
        result = pickle.dumps(func)
    return result
```
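Note that `cache[func]` above is not a pure read: as the traceback shows (`zict/lru.py`'s `__getitem__` assigns `self.heap[key] = self.i`), every lookup reorders the LRU's internal HeapDict so it can track recency. Here is a small sketch of that observable side effect, written by us against zict's public behaviour (the eviction outcome is the point, not the exact internals):

```python
from zict import LRU

cache = LRU(2, dict())   # holds two items; the least recently used one is evicted
cache["a"] = 1
cache["b"] = 2

cache["a"]               # a plain read, but it marks "a" as most recently used
cache["c"] = 3           # going over capacity evicts the least recently used key

assert "a" in cache      # "a" survived because the read refreshed it
assert "b" not in cache  # "b" was evicted instead
```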
In this recent change, distributed.worker started using zict's LRU as the cache in `dumps_function`, but LRU's `__getitem__` is not thread-safe. The following is a minimal example that reproduces the "list index out of range" error using LRU alone.
```python
from zict import LRU
from functools import partial
import concurrent.futures

# create LRU cache
cache = LRU(2, dict())
cache[1] = 1
cache[2] = 2

# function to get a key from the cache multiple times
def get_key(key, reps):
    for _ in range(reps):
        cache[key]

get_key_m = partial(get_key, reps=1000000)

# test getting keys from multiple threads
def test_get_key():
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        [i for i in executor.map(get_key_m, [1, 2])]

# this call raises "IndexError: list index out of range"
test_get_key()
```
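For comparison, here is a sketch of the same loop with the reads serialized by a `threading.Lock`; it reuses `cache` and `partial` from the example above, and the lock and the `get_key_locked` name are our additions, not part of zict. Substituting it for `get_key` in `test_get_key` should avoid the error, since the LRU's heap is then never reordered by two threads at once:

```python
import threading

lock = threading.Lock()  # our addition; zict's LRU does no locking of its own

def get_key_locked(key, reps):
    # same loop as get_key, but each lookup holds the lock while it
    # touches (and therefore reorders) the shared LRU
    for _ in range(reps):
        with lock:
            cache[key]

get_key_locked_m = partial(get_key_locked, reps=1000000)
```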
We were not able to construct a minimal example that reproduces the "list index out of range" error using Dask directly, so hopefully the minimal LRU example above provides some insight. Please let us know your opinion. Thanks!
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Reactions: 1
- Comments: 16 (8 by maintainers)
Commits related to this issue
- Add lock around dumps_function cache Fixes #2727 — committed to mrocklin/distributed by mrocklin 5 years ago
- Add lock around dumps_function cache (#3337) Fixes #2727 — committed to dask/distributed by mrocklin 5 years ago
- Add lock around dumps_function cache (#3337) Fixes #2727 — committed to replicahq/distributed by mrocklin 5 years ago
@TomAugspurger @mrocklin, I just tried the lock modification on the distributed source code and ran it with our application, and the "list index out of range" error is resolved! Thanks for the input, and hopefully this fix can make it into an upcoming release.
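For anyone hitting this before the fix is released, here is a rough sketch of the lock-around-the-cache idea from the commits above, applied to the `dumps_function` excerpt. The names (`_cache_lock`, `dumps_function_locked`) are ours, the stdlib `pickle` stands in for whatever serializer worker.py actually uses, and the committed patch may differ in its details:

```python
import pickle
import threading

from zict import LRU

cache = LRU(10000000, dict(), weight=lambda k, v: len(v))
_cache_lock = threading.Lock()  # guards every access to the shared LRU

def dumps_function_locked(func):
    """Dump a function to bytes, caching the result behind a lock."""
    try:
        with _cache_lock:
            # the lookup reorders the LRU heap, so it must be serialized too
            return cache[func]
    except KeyError:
        result = pickle.dumps(func)
        if len(result) < 100000:
            with _cache_lock:
                cache[func] = result
        return result
    except TypeError:
        # an unhashable callable cannot be used as a cache key
        return pickle.dumps(func)
```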