distributed: KeyError in distributed joblib
I haven’t been able to reproduce it locally yet.
This is on distributed, dask, & dask-ml master, and the scikit-learn / joblib we used at the sprint.
I’m doing
%%time
with joblib.parallel_backend("dask"):
gs.fit(X, y, classes=[0, 1])
with X and y being dask arrays (so can’t pre-scatter).
The behavior I observe is
- Make the call, tasks show up in the dashboard
- A short time later, the tasks go black / gray, indicating they failed
- The notebook is hanging
- ctrl-C to interrupt (the keyboard interrupt you see in the exception below)
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fc37ad6fbf8>, <Future finished exception=AssertionError("yield from wasn't used with future",)>)
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 204, in maybe_to_futures
f = call_data_futures[arg]
File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 67, in __getitem__
ref, val = self._data[id(obj)]
KeyError: 140477560921584
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 759, in _run_callback
ret = callback()
File "/opt/conda/lib/python3.6/site-packages/tornado/stack_context.py", line 276, in null_wrapper
return fn(*args, **kwargs)
File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 780, in _discard_future_result
future.result()
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1113, in run
yielded = self.gen.send(value)
File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 244, in callback_wrapper
callback(result) # gets called in separate thread
File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 326, in __call__
self.parallel.dispatch_next()
File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 746, in dispatch_next
if not self.dispatch_one_batch(self._original_iterator):
File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 774, in dispatch_one_batch
self._dispatch(tasks)
File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 731, in _dispatch
job = self._backend.apply_async(batch, callback=cb)
File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 234, in apply_async
func, args = self._to_func_args(func)
File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 224, in _to_func_args
args = list(maybe_to_futures(args))
File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 212, in maybe_to_futures
[f] = self.client.scatter([arg], broadcast=3)
AssertionError: yield from wasn't used with future
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in maybe_to_futures(args)
203 try:
--> 204 f = call_data_futures[arg]
205 except KeyError:
/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in __getitem__(self, obj)
66 def __getitem__(self, obj):
---> 67 ref, val = self._data[id(obj)]
68 if ref() is not obj:
KeyError: 140477560921344
During handling of the above exception, another exception occurred:
KeyboardInterrupt Traceback (most recent call last)
<timed exec> in <module>()
~/src/scikit-learn/sklearn/model_selection/_search.py in fit(self, X, y, groups, **fit_params)
658 error_score=self.error_score)
659 for parameters, (train, test) in product(candidate_params,
--> 660 cv.split(X, y, groups)))
661
662 # if one choose to see train score, "out" will contain train score info
~/src/scikit-learn/sklearn/externals/joblib/parallel.py in __call__(self, iterable)
943 self._iterating = self._original_iterator is not None
944
--> 945 while self.dispatch_one_batch(iterator):
946 pass
947
~/src/scikit-learn/sklearn/externals/joblib/parallel.py in dispatch_one_batch(self, iterator)
772 return False
773 else:
--> 774 self._dispatch(tasks)
775 return True
776
~/src/scikit-learn/sklearn/externals/joblib/parallel.py in _dispatch(self, batch)
729 with self._lock:
730 job_idx = len(self._jobs)
--> 731 job = self._backend.apply_async(batch, callback=cb)
732 # A job can complete so quickly than its callback is
733 # called before we get here, causing self._jobs to
/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in apply_async(self, func, callback)
232 def apply_async(self, func, callback=None):
233 key = '%s-batch-%s' % (joblib_funcname(func), uuid4().hex)
--> 234 func, args = self._to_func_args(func)
235
236 future = self.client.submit(func, *args, key=key, **self.submit_kwargs)
/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in _to_func_args(self, func)
222 tasks = []
223 for f, args, kwargs in func.items:
--> 224 args = list(maybe_to_futures(args))
225 kwargs = dict(zip(kwargs.keys(), maybe_to_futures(kwargs.values())))
226 tasks.append((f, args, kwargs))
/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in maybe_to_futures(args)
210 # more workers need to reuse this data concurrently
211 # beyond the initial broadcast arity.
--> 212 [f] = self.client.scatter([arg], broadcast=3)
213 call_data_futures[arg] = f
214
/opt/conda/lib/python3.6/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, maxsize, timeout, asynchronous)
1771 broadcast=broadcast, direct=direct,
1772 local_worker=local_worker, timeout=timeout,
-> 1773 asynchronous=asynchronous, hash=hash)
1774
1775 @gen.coroutine
/opt/conda/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
650 return future
651 else:
--> 652 return sync(self.loop, func, *args, **kwargs)
653
654 def __repr__(self):
/opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
271 else:
272 while not e.is_set():
--> 273 e.wait(10)
274 if error[0]:
275 six.reraise(*error[0])
/opt/conda/lib/python3.6/threading.py in wait(self, timeout)
549 signaled = self._flag
550 if not signaled:
--> 551 signaled = self._cond.wait(timeout)
552 return signaled
553
/opt/conda/lib/python3.6/threading.py in wait(self, timeout)
297 else:
298 if timeout > 0:
--> 299 gotit = waiter.acquire(True, timeout)
300 else:
301 gotit = waiter.acquire(False)
KeyboardInterrupt:
About this issue
- Original URL
- State: closed
- Created 6 years ago
- Comments: 26 (22 by maintainers)
Commits related to this issue
- BUG: Normalize address before comparison Closes https://github.com/dask/distributed/issues/2058 — committed to TomAugspurger/distributed by TomAugspurger 6 years ago
- BUG: Normalize address before comparison (#2066) Fixes https://github.com/dask/distributed/issues/2058 — committed to dask/distributed by TomAugspurger 6 years ago
@asifali22 what versions of distributed and joblib?
No, those two are aliases for the same backend.