distributed.joblib: AssertionError: yield from wasn't used with future
Hi all, first of all thank you for the excellent work on such a great library. I use distributed as a backend for joblib, but when delayed functions are fed large inputs, distributed.joblib appears to scatter the data first, and an AssertionError is raised as below:
```
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x000001CB0DF54AE8>, <Future finished exception=AssertionError("yield from wasn't used with future",)>)
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 204, in maybe_to_futures
    f = call_data_futures[arg]
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 67, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 1971626397592

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 759, in _run_callback
    ret = callback()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 780, in _discard_future_result
    future.result()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 244, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 343, in __call__
    self.parallel.dispatch_next()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 763, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 791, in dispatch_one_batch
    self._dispatch(tasks)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 748, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 234, in apply_async
    func, args = self._to_func_args(func)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 224, in _to_func_args
    args = list(maybe_to_futures(args))
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 212, in maybe_to_futures
    [f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future
```
By commenting out this scatter call, my code ran successfully, so I think this may be a bug.
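For reference, a minimal sketch of the triggering pattern. Everything here is illustrative (the local cluster setup, the `dask.distributed` backend name from that era of distributed, and the array size are all assumptions); the reproducer committed by @jjerphan under "Commits related to this issue" below is the authoritative example:

```python
# Hypothetical reproduction sketch; names and sizes are assumptions,
# not taken from the original report.
import numpy as np
from distributed import Client
import distributed.joblib  # noqa: F401 -- registers the dask backends with joblib
from joblib import Parallel, delayed, parallel_backend

client = Client()  # local cluster for illustration

# Assumed to be large enough that distributed.joblib decides to scatter it.
big = np.random.rand(5_000_000)

with parallel_backend('dask.distributed',
                      scheduler_host=client.scheduler.address):
    # Dispatching tasks with the large argument reaches client.scatter()
    # inside maybe_to_futures(), which then fails with
    # "AssertionError: yield from wasn't used with future".
    Parallel(n_jobs=2)(delayed(np.sum)(big) for _ in range(4))
```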
About this issue
- Original URL: https://github.com/dask/distributed/issues/2149
- State: closed
- Created 6 years ago
- Comments: 18 (11 by maintainers)
Commits related to this issue
- Minimal Reproducible Bug Example: for `distributed#2149` See the issue: https://github.com/dask/distributed/issues/2149 — committed to jjerphan/joblib_dask_deadlock by jjerphan 5 years ago
@jjerphan confirmed, thanks. Will take a closer look later today.
I believe this is related to joblib/joblib#852
My intuition is that some part of joblib is not playing well with the distributed engine.
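For what it's worth, the assertion text itself comes from CPython's asyncio rather than from distributed or joblib: an `asyncio.Future` whose awaiting generator is resumed while the future is still pending raises exactly this message on Python 3.6/3.7 (Python 3.8+ raises `RuntimeError: await wasn't used with future` instead). A standalone sketch:

```python
import asyncio

loop = asyncio.new_event_loop()
fut = loop.create_future()

it = iter(fut)  # what `yield from fut` (or `await fut`) expands to
next(it)        # first step: the pending future is yielded (the suspension point)
next(it)        # resumed while the future is still pending; on Python 3.6/3.7
                # this trips asyncio's internal check and raises
                # "AssertionError: yield from wasn't used with future"
```

That would be consistent with the traceback above, where the blocking `client.scatter()` is driven from joblib's callback while the event loop is already running a coroutine.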
If the input arrays given to `delayed()` are smaller, everything works fine. I believe the problem is related to the `max_nbytes` argument of `joblib.Parallel`: if the array sizes exceed this threshold, a `client.scatter()` is triggered on dask's side, ultimately creating the trouble. However, I am not 100% sure.
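One way to probe that hypothesis is to rerun with `max_nbytes` raised far above the input size. The setup below is illustrative (whether distributed.joblib actually honours `max_nbytes` for its scatter decision is exactly what is uncertain here):

```python
import numpy as np
from distributed import Client
import distributed.joblib  # noqa: F401
from joblib import Parallel, delayed, parallel_backend

client = Client()
big = np.random.rand(5_000_000)  # illustrative size

with parallel_backend('dask.distributed',
                      scheduler_host=client.scheduler.address):
    # If max_nbytes really is the knob, a threshold far above the input
    # size should avoid the scatter path; if the error persists, the
    # threshold lives elsewhere in distributed.joblib.
    Parallel(n_jobs=2, max_nbytes='512M')(
        delayed(np.sum)(big) for _ in range(4)
    )
```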
This problem also happens on CentOS 7 in both environments.
Python version: 3.6.8
Machine: