distributed: KeyError in distributed joblib

I haven’t been able to reproduce this locally yet.

This is on distributed, dask, and dask-ml master, plus the scikit-learn / joblib versions we used at the sprint.

I’m doing

%%time
with joblib.parallel_backend("dask"):
    gs.fit(X, y, classes=[0, 1])

with X and y being dask arrays (so I can’t pre-scatter them).
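
For reference, the surrounding setup is roughly along these lines. The estimator, parameter grid, and array sizes below are placeholders rather than my exact code; the classes= fit parameter comes from wrapping a partial_fit-style classifier (here dask-ml's Incremental) for out-of-core training:

import joblib
import dask.array as da
from dask.distributed import Client
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from dask_ml.wrappers import Incremental

client = Client()  # connect to the distributed scheduler

# X and y are dask arrays, so they can't be handed to Client.scatter
# up front the way concrete numpy arrays can.
X = da.random.random((100_000, 20), chunks=(10_000, 20))
y = (da.random.random(100_000, chunks=(10_000,)) > 0.5).astype(int)

est = Incremental(SGDClassifier(max_iter=1000, tol=1e-3))
gs = GridSearchCV(est, {"estimator__alpha": [1e-4, 1e-3, 1e-2]}, cv=3)

with joblib.parallel_backend("dask"):
    gs.fit(X, y, classes=[0, 1])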

The behavior I observe is:

  1. Make the call; tasks show up in the dashboard
  2. A short time later, the tasks go black / gray, indicating they failed
  3. The notebook hangs
  4. Ctrl-C to interrupt (the KeyboardInterrupt you see in the exception below)
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fc37ad6fbf8>, <Future finished exception=AssertionError("yield from wasn't used with future",)>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 204, in maybe_to_futures
    f = call_data_futures[arg]
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 67, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 140477560921584

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 759, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.6/site-packages/tornado/stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 780, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 244, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 326, in __call__
    self.parallel.dispatch_next()
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 746, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 774, in dispatch_one_batch
    self._dispatch(tasks)
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 731, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 234, in apply_async
    func, args = self._to_func_args(func)
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 224, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 212, in maybe_to_futures
    [f] = self.client.scatter([arg], broadcast=3)
AssertionError: yield from wasn't used with future

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in maybe_to_futures(args)
    203                     try:
--> 204                         f = call_data_futures[arg]
    205                     except KeyError:

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in __getitem__(self, obj)
     66     def __getitem__(self, obj):
---> 67         ref, val = self._data[id(obj)]
     68         if ref() is not obj:

KeyError: 140477560921344

During handling of the above exception, another exception occurred:

KeyboardInterrupt                         Traceback (most recent call last)
<timed exec> in <module>()

~/src/scikit-learn/sklearn/model_selection/_search.py in fit(self, X, y, groups, **fit_params)
    658                                   error_score=self.error_score)
    659           for parameters, (train, test) in product(candidate_params,
--> 660                                                    cv.split(X, y, groups)))
    661 
    662         # if one choose to see train score, "out" will contain train score info

~/src/scikit-learn/sklearn/externals/joblib/parallel.py in __call__(self, iterable)
    943                 self._iterating = self._original_iterator is not None
    944 
--> 945             while self.dispatch_one_batch(iterator):
    946                 pass
    947 

~/src/scikit-learn/sklearn/externals/joblib/parallel.py in dispatch_one_batch(self, iterator)
    772                 return False
    773             else:
--> 774                 self._dispatch(tasks)
    775                 return True
    776 

~/src/scikit-learn/sklearn/externals/joblib/parallel.py in _dispatch(self, batch)
    729         with self._lock:
    730             job_idx = len(self._jobs)
--> 731             job = self._backend.apply_async(batch, callback=cb)
    732             # A job can complete so quickly than its callback is
    733             # called before we get here, causing self._jobs to

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in apply_async(self, func, callback)
    232     def apply_async(self, func, callback=None):
    233         key = '%s-batch-%s' % (joblib_funcname(func), uuid4().hex)
--> 234         func, args = self._to_func_args(func)
    235 
    236         future = self.client.submit(func, *args, key=key, **self.submit_kwargs)

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in _to_func_args(self, func)
    222         tasks = []
    223         for f, args, kwargs in func.items:
--> 224             args = list(maybe_to_futures(args))
    225             kwargs = dict(zip(kwargs.keys(), maybe_to_futures(kwargs.values())))
    226             tasks.append((f, args, kwargs))

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in maybe_to_futures(args)
    210                             # more workers need to reuse this data concurrently
    211                             # beyond the initial broadcast arity.
--> 212                             [f] = self.client.scatter([arg], broadcast=3)
    213                             call_data_futures[arg] = f
    214 

/opt/conda/lib/python3.6/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, maxsize, timeout, asynchronous)
   1771                              broadcast=broadcast, direct=direct,
   1772                              local_worker=local_worker, timeout=timeout,
-> 1773                              asynchronous=asynchronous, hash=hash)
   1774 
   1775     @gen.coroutine

/opt/conda/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    650             return future
    651         else:
--> 652             return sync(self.loop, func, *args, **kwargs)
    653 
    654     def __repr__(self):

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    271     else:
    272         while not e.is_set():
--> 273             e.wait(10)
    274     if error[0]:
    275         six.reraise(*error[0])

/opt/conda/lib/python3.6/threading.py in wait(self, timeout)
    549             signaled = self._flag
    550             if not signaled:
--> 551                 signaled = self._cond.wait(timeout)
    552             return signaled
    553 

/opt/conda/lib/python3.6/threading.py in wait(self, timeout)
    297             else:
    298                 if timeout > 0:
--> 299                     gotit = waiter.acquire(True, timeout)
    300                 else:
    301                     gotit = waiter.acquire(False)

KeyboardInterrupt: 
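My read of the traceback (a guess, not confirmed): the KeyError itself is the normal cache-miss path. The dask joblib backend keeps an id()-keyed, weakref-guarded cache of arguments it has already scattered, and a miss is supposed to fall through to client.scatter. What actually kills the dispatch is the AssertionError: the joblib callback that dispatches the next batch seems to fire on the event-loop thread, where the blocking client.scatter call can't be synced, so dispatching stops and the notebook hangs. A minimal sketch of that cache pattern, with illustrative names rather than the actual distributed implementation:

import weakref

class WeakKeyedCache:
    # Maps id(obj) -> (weak reference to obj, cached value), mirroring
    # the __getitem__ shown in the traceback above.
    def __init__(self):
        self._data = {}

    def __getitem__(self, obj):
        ref, val = self._data[id(obj)]  # KeyError: obj was never stored
        if ref() is not obj:
            # id() was recycled after the original object was collected.
            raise KeyError(id(obj))
        return val

    def __setitem__(self, obj, value):
        self._data[id(obj)] = (weakref.ref(obj), value)

On a miss the backend scatters the argument and caches the resulting future, so the KeyError alone would be harmless; it only becomes fatal here because the fallback scatter runs on the wrong thread.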

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 26 (22 by maintainers)

Most upvoted comments

@asifali22 What versions of distributed and joblib are you using?
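
A quick way to check, for the record:

import distributed
import joblib
print("distributed:", distributed.__version__)
print("joblib:", joblib.__version__)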

Does it make any difference to performance or any other metrics?

No, those two are aliases for the same backend.