joblib: KeyError with joblib and sklearn cross_validate
This was originally reported as https://github.com/dask/distributed/issues/2532
I tried to reproduce it on joblib master with scikit-learn 0.20.2 as follows:
import os
os.environ['SKLEARN_SITE_JOBLIB'] = "1"
from dask.distributed import Client
from sklearn import datasets, linear_model
from sklearn.model_selection import cross_validate
import joblib
client = Client(processes=False)
joblib.parallel_backend('dask')
diabetes = datasets.load_diabetes()
X = diabetes.data[:150]
y = diabetes.target[:150]
model = linear_model.LinearRegression()
cv_results = cross_validate(model, X, y, cv=10, return_train_score=False,
                            verbose=100)
However, this seems to freeze without reporting the original error. When I interrupt it with Ctrl-C, I get:
[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 4 concurrent workers.
[CV] ................................................................
[CV] .................................... , score=0.587, total= 0.0s
[Parallel(n_jobs=-1)]: Done 1 tasks | elapsed: 0.0s
^CTraceback (most recent call last):
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 140545475489024

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ogrisel/tmp/joblib_dask_freeze.py", line 18, in <module>
    verbose=100)
  File "/home/ogrisel/code/scikit-learn/sklearn/model_selection/_validation.py", line 231, in cross_validate
    for train, test in cv.split(X, y, groups))
  File "/home/ogrisel/code/joblib/joblib/parallel.py", line 924, in __call__
    while self.dispatch_one_batch(iterator):
  File "/home/ogrisel/code/joblib/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "/home/ogrisel/code/joblib/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
  File "/home/ogrisel/code/distributed/distributed/client.py", line 1875, in scatter
    asynchronous=asynchronous, hash=hash)
  File "/home/ogrisel/code/distributed/distributed/client.py", line 676, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/ogrisel/code/distributed/distributed/utils.py", line 275, in sync
    e.wait(10)
  File "/opt/python3.7/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/opt/python3.7/lib/python3.7/threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
About this issue
- State: closed
- Created 5 years ago
- Comments: 22 (19 by maintainers)
Commits related to this issue
- TST non regression test for #852 — committed to ogrisel/joblib by ogrisel 5 years ago
- TST non regression test for #852 — committed to pierreglaser/joblib by ogrisel 5 years ago
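The analysis in this thread points at the thread from which apply_async, and therefore client.scatter, is called: either the main thread (via joblib.Parallel.__call__) or the completion callback running in the event-loop thread. One plausible reading is that a blocking call made from the event-loop thread cannot complete, because the loop that would produce its result is the one being blocked. The proposed "solution 2" is to make the functions under apply_async work either synchronously or as awaitable coroutines, in the style of self.client.sync. The following is a minimal sketch of that dual-mode pattern, assuming a hypothetical LoopRunner class and using asyncio as a stand-in for the tornado loop used by distributed; it is not distributed's Client.sync, whose behaviour and asynchronous= keyword differ in detail.

```python
import asyncio
import threading


class LoopRunner:
    """Hypothetical helper owning an asyncio loop that runs in a background thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()  # ident is set once start() returns

    def in_loop_thread(self):
        return threading.get_ident() == self._thread.ident

    def sync(self, coro_func, *args):
        """Run coro_func(*args) on the loop.

        From another thread: block until the result is ready.
        From the loop thread: return the coroutine for the caller to await,
        because blocking here would stall the loop that must run it.
        """
        coro = coro_func(*args)
        if self.in_loop_thread():
            return coro
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()

    def close(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def fake_scatter(items):
    # Hypothetical stand-in for an asynchronous scatter.
    await asyncio.sleep(0)
    return [("future", x) for x in items]


runner = LoopRunner()

# Main thread (like Parallel.__call__ -> apply_async): a blocking call is safe.
result_main = runner.sync(fake_scatter, [1])


# Loop thread (like the completion callback): await instead of blocking.
async def callback():
    res = runner.sync(fake_scatter, [2])
    if asyncio.iscoroutine(res):
        res = await res
    return res


result_cb = asyncio.run_coroutine_threadsafe(callback(), runner.loop).result(timeout=5)
runner.close()
```

The callers on the loop thread have to be coroutines themselves for this to work, which is why the thread concludes that every function called under apply_async would need to be refactored.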
Thanks @pierreglaser; I’ll have a look at it ASAP (cc @samronsin).

Indeed, this is what I thought. I wonder why we don’t get the same issue with the call to self.client.submit. Apparently it always works as a synchronous call, probably because it is expected to always be fast and would therefore not really benefit from an asynchronous=True option.

To implement solution 2, we would need to refactor all the functions called under apply_async to work either as synchronous functions or as awaitable coroutines, using the self.client.sync trick already used in self.client.scatter. I will try to find the time to give it a try this afternoon or tomorrow. If anyone else wants to try, please feel free to do so as well 😃

The scatter call is made by the apply_async method. apply_async can be called either by joblib.Parallel.__call__ in the main thread, or via the callback that is executed asynchronously in the callback_wrapper coroutine, in the thread that runs the tornado event loop.

Basically, apply_async calls are chained in joblib: once a task has completed, it triggers the submission of the next (batch of) tasks via the callback.

I dumped the stacktrace using faulthandler and @ogrisel’s script:

Stacktrace: