distributed: distributed.joblib: AssertionError: yield from wasn't used with future

Hi all, first of all, thank you for the excellent work on such a great library. I am using distributed as a backend for joblib, but when delayed functions are fed large data, it seems distributed.joblib tries to scatter the data first, and an AssertionError is raised as below:

tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x000001CB0DF54AE8>, <Future finished exception=AssertionError("yield from wasn't used with future",)>)
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 204, in maybe_to_futures
    f = call_data_futures[arg]
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 67, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 1971626397592

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 759, in _run_callback
    ret = callback()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 780, in _discard_future_result
    future.result()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 244, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 343, in __call__
    self.parallel.dispatch_next()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 763, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 791, in dispatch_one_batch
    self._dispatch(tasks)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 748, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 234, in apply_async
    func, args = self._to_func_args(func)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 224, in _to_func_args
    args = list(maybe_to_futures(args))
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 212, in maybe_to_futures
    [f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future

By commenting out this scatter call, my code ran successfully, so I think this may be a bug.
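For context, the "yield from wasn't used with future" assertion in the traceback comes from Tornado when a coroutine's future is consumed synchronously. The outer frames show client.scatter() being invoked from callback_wrapper, i.e. from code already running on the IOLoop. A stdlib asyncio analogy (the names sync_scatter, produce, and dispatch_callback are illustrative, not distributed's real API) sketches why a blocking call cannot be made from inside the event loop it blocks on:

```python
import asyncio

async def produce():
    # stand-in for a coroutine a synchronous API needs to wait on
    return 42

def sync_scatter(coro):
    # hypothetical sync wrapper: block the calling thread on the event
    # loop until the coroutine finishes (roughly the shape of a
    # synchronous client method layered over an async core)
    return asyncio.get_event_loop().run_until_complete(coro)

async def dispatch_callback():
    # code already running on the event loop (like the callback in the
    # traceback above) invokes the sync wrapper; blocking on a loop
    # from inside that same loop is rejected
    coro = produce()
    try:
        return sync_scatter(coro)
    except RuntimeError:
        coro.close()  # suppress the "never awaited" warning
        return "cannot block on a running loop"

print(asyncio.run(dispatch_callback()))  # -> cannot block on a running loop
```

asyncio raises RuntimeError here where Tornado raises the AssertionError seen above, but the underlying situation is analogous.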

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 18 (11 by maintainers)

Commits related to this issue

Most upvoted comments

@jjerphan confirmed, thanks. Will take a closer look later today.

I believe this is related to joblib/joblib#852

My intuition is that some part of joblib is not playing well with the distributed engine.

If the input arrays given to delayed() are smaller, everything works fine. I believe the problem is related to the max_nbytes argument of joblib.Parallel: if the array sizes exceed this threshold, a client.scatter() is triggered on dask's side, which ultimately causes the trouble.

However, I am not 100% sure.
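To make the hypothesis above concrete: max_nbytes is a real joblib.Parallel parameter (a size threshold accepting values like "1M"), and the idea is that arguments above the threshold get special handling. The sketch below is purely illustrative; parse_nbytes and should_scatter are hypothetical names, not joblib's or distributed's actual code:

```python
import sys

def parse_nbytes(value):
    # hypothetical parser for joblib-style size strings such as "1M"
    # (assumption: mimics the idea, not joblib's real implementation)
    units = {"K": 2**10, "M": 2**20, "G": 2**30}
    if isinstance(value, int):
        return value
    if value[-1] in units:
        return int(value[:-1]) * units[value[-1]]
    return int(value)

def should_scatter(arg, max_nbytes="1M"):
    # decide whether an argument is "large" and should be scattered to
    # workers once, instead of being serialized into every task
    size = getattr(arg, "nbytes", None) or sys.getsizeof(arg)
    return size > parse_nbytes(max_nbytes)

print(should_scatter(b"x" * 10))           # small payload  -> False
print(should_scatter(b"x" * (2 * 2**20)))  # 2 MiB payload  -> True
```

Under this reading, only calls whose arguments cross the threshold hit the scatter path, which would explain why smaller inputs never trigger the error.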

This problem also happens on CentOS 7 in both environments.


Python version: 3.6.8
Machine:

$ uname -a
Linux 6ea377206ec7 4.9.125-linuxkit #1 SMP Fri Sep 7 08:20:28 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux