joblib: KeyError with joblib and sklearn cross_validate

This was originally reported as https://github.com/dask/distributed/issues/2532

I tried to reproduce it on joblib master with scikit-learn 0.20.2 as follows:

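import os
# With scikit-learn 0.20.x, use the installed joblib instead of the vendored
# sklearn.externals.joblib; this must be set before sklearn is imported.
os.environ['SKLEARN_SITE_JOBLIB'] = "1"
from dask.distributed import Client
from sklearn import datasets, linear_model
from sklearn.model_selection import cross_validate
import joblib


# In-process cluster: scheduler and workers run as threads in this process.
client = Client(processes=False)
# Select the dask backend for the joblib calls made by scikit-learn below.
joblib.parallel_backend('dask')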

diabetes = datasets.load_diabetes()
X = diabetes.data[:150]
y = diabetes.target[:150]
model = linear_model.LinearRegression()

cv_results = cross_validate(model, X, y, cv=10, return_train_score=False,
                            verbose=100)

However, this seems to freeze without reporting the original error. Instead, when I interrupt it with Ctrl-C, I get:

[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 4 concurrent workers.
[CV]  ................................................................
[CV] .................................... , score=0.587, total=   0.0s
[Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    0.0s
^CTraceback (most recent call last):
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 140545475489024

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ogrisel/tmp/joblib_dask_freeze.py", line 18, in <module>
    verbose=100)
  File "/home/ogrisel/code/scikit-learn/sklearn/model_selection/_validation.py", line 231, in cross_validate
    for train, test in cv.split(X, y, groups))
  File "/home/ogrisel/code/joblib/joblib/parallel.py", line 924, in __call__
    while self.dispatch_one_batch(iterator):
  File "/home/ogrisel/code/joblib/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "/home/ogrisel/code/joblib/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/home/ogrisel/code/joblib/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
  File "/home/ogrisel/code/distributed/distributed/client.py", line 1875, in scatter
    asynchronous=asynchronous, hash=hash)
  File "/home/ogrisel/code/distributed/distributed/client.py", line 676, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/ogrisel/code/distributed/distributed/utils.py", line 275, in sync
    e.wait(10)
  File "/opt/python3.7/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/opt/python3.7/lib/python3.7/threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
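In the traceback above, the KeyError comes from the `call_data_futures[arg]` lookup in maybe_to_futures, and the freeze happens while that miss is being handled: `self.client.scatter([arg])` is called and blocks until Ctrl-C. The lookup-then-scatter pattern can be sketched as follows. IdKeyedCache, to_future and fake_scatter are hypothetical names, and the weakref-based eviction is an assumption about how an id-keyed cache can be protected against id reuse. It is not a copy of joblib's _dask.py.

```python
import weakref


class IdKeyedCache:
    """Hypothetical cache keyed by id(obj); an entry is dropped when its key
    object is garbage collected, so a recycled id cannot return stale data."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, obj):
        ref, val = self._data[id(obj)]  # raises KeyError on a miss
        if ref() is not obj:
            raise KeyError(obj)  # same id, different (newer) object
        return val

    def __setitem__(self, obj, value):
        key = id(obj)

        def on_destroy(_ref):
            # Called when obj is garbage collected: forget its entry.
            self._data.pop(key, None)

        self._data[key] = (weakref.ref(obj, on_destroy), value)


def to_future(arg, cache, scatter):
    """Return the cached future for arg, scattering it on a cache miss."""
    try:
        return cache[arg]
    except KeyError:
        # A miss is the normal path for new data: this is where the
        # (possibly blocking) scatter call happens.
        future = scatter(arg)
        cache[arg] = future
        return future


if __name__ == "__main__":
    class Payload:  # weakref-able stand-in for a large argument array
        pass

    n_scatters = [0]

    def fake_scatter(obj):
        # Stand-in for client.scatter: hands out a new token per call.
        n_scatters[0] += 1
        return f"future-{n_scatters[0]}"

    cache = IdKeyedCache()
    data = Payload()
    print(to_future(data, cache, fake_scatter))  # future-1 (miss: scatters)
    print(to_future(data, cache, fake_scatter))  # future-1 (hit: no scatter)
```

A KeyError raised inside such a lookup is therefore expected control flow. The interesting part of the traceback is the call made in the `except` branch.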

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 22 (19 by maintainers)

Most upvoted comments

Thanks @pierreglaser; I’ll have a look at it ASAP (cc @samronsin).

Indeed, this is what I thought. I wonder why we don’t get the same issue with the call to self.client.submit. Apparently it always works as a synchronous call, probably because it is expected to always be fast and therefore would not really benefit from an asynchronous=True option.
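The difference between a call that waits for the event loop and a call that only hands work to it can be shown with plain asyncio. In this hypothetical sketch (not distributed's code), blocking_call mimics a scatter-like call. It schedules a coroutine on a loop running in another thread and waits for the result. enqueue_only mimics a call that just queues a message and returns. When the loop thread is busy, only the blocking call gets stuck. A timeout is used so the demo itself does not hang.

```python
import asyncio
import concurrent.futures
import threading


def start_loop_in_thread():
    """Run a new asyncio event loop forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def fake_scatter(data):
    await asyncio.sleep(0)  # stand-in for I/O that must run on the loop
    return f"future-of-{data}"


def blocking_call(loop, data, timeout):
    """scatter-like: run a coroutine on the loop and wait for its result."""
    fut = asyncio.run_coroutine_threadsafe(fake_scatter(data), loop)
    return fut.result(timeout=timeout)


def enqueue_only(loop, outbox, data):
    """submit-like: queue a message for the loop and return immediately."""
    loop.call_soon_threadsafe(outbox.append, data)
    return f"key-of-{data}"


if __name__ == "__main__":
    loop, thread = start_loop_in_thread()
    release = threading.Event()
    # Keep the loop thread busy, as a callback stuck on something else would.
    loop.call_soon_threadsafe(release.wait)

    outbox = []
    print(enqueue_only(loop, outbox, "x"))  # key-of-x, returned without waiting
    try:
        blocking_call(loop, "x", timeout=0.5)
    except concurrent.futures.TimeoutError:
        print("blocking call timed out: the loop thread never ran it")

    release.set()  # free the loop; the queued work now runs
    print(blocking_call(loop, "y", timeout=5))  # future-of-y
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```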

To implement solution 2, we would need to refactor all the functions called under apply_async so that they work either as synchronous functions or as awaitable coroutines, using the self.client.sync trick already used in self.client.scatter. I will try to find the time to give it a try this afternoon or tomorrow. If anyone else wants to try, please feel free to do so as well though 😃
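Here is a minimal sketch of the "synchronous or awaitable" refactoring, assuming an event loop running in a background thread. sync, scatter and apply_async are hypothetical stand-ins modeled on the names used in this thread. They are not the real distributed or joblib APIs. With asynchronous=True, each function returns a coroutine that code already running on the loop can await. With asynchronous=False, it runs the coroutine on the loop thread and blocks. The guard that refuses a blocking call from the loop's own thread is an extra safety check added for illustration.

```python
import asyncio
import threading


def _on_loop_thread(loop):
    """True if the caller is running inside `loop` (i.e. on its thread)."""
    try:
        return asyncio.get_running_loop() is loop
    except RuntimeError:  # no running loop in this thread
        return False


def sync(loop, coro_fn, *args, asynchronous=False, **kwargs):
    """Hypothetical helper: return an awaitable, or block on the result."""
    if asynchronous:
        # The caller is on the loop and will `await` this coroutine.
        return coro_fn(*args, **kwargs)
    if _on_loop_thread(loop):
        # Blocking here would wait for the very thread we are running on.
        raise RuntimeError("blocking call from the event loop thread would deadlock")
    future = asyncio.run_coroutine_threadsafe(coro_fn(*args, **kwargs), loop)
    return future.result()


async def _scatter(data):
    await asyncio.sleep(0)  # stand-in for sending data to the cluster
    return f"future-of-{data}"


def scatter(loop, data, asynchronous=False):
    return sync(loop, _scatter, data, asynchronous=asynchronous)


async def _apply_async(loop, batch):
    # Each step that needs the loop is awaited, never blocked on.
    f = await scatter(loop, batch, asynchronous=True)
    return ("submitted", f)


def apply_async(loop, batch, asynchronous=False):
    return sync(loop, _apply_async, loop, batch, asynchronous=asynchronous)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    # Main thread: a plain blocking call.
    print(apply_async(loop, "batch-1"))  # ('submitted', 'future-of-batch-1')

    # Loop thread: a completion callback chains the next batch by awaiting.
    async def on_batch_done():
        return await apply_async(loop, "batch-2", asynchronous=True)

    done = asyncio.run_coroutine_threadsafe(on_batch_done(), loop)
    print(done.result())  # ('submitted', 'future-of-batch-2')

    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```

Each step that needs the loop is written once, as a coroutine. Only the outermost caller decides whether to block (from another thread) or to await (on the loop itself).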

scatter is called by the apply_async method. apply_async can be called either by joblib.Parallel.__call__ in the main thread, or by the callback that is executed asynchronously in the callback_wrapper coroutine, in the thread that runs the tornado event loop.

Basically, apply_async calls are chained in joblib: once a task has completed, it triggers the submission of the next (batch of) tasks via the callback.
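The chaining can be illustrated outside joblib and dask with a small, hypothetical ThreadPoolExecutor sketch. ChainedDispatcher, apply_async and dispatch_next are illustrative names, not joblib's implementation. The executor's done-callbacks play the role of callback_wrapper on the event loop thread. The first batch is dispatched from the main thread. Later batches are dispatched from whichever thread runs the completion callback.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ChainedDispatcher:
    """Hypothetical sketch of callback-chained dispatch: when one batch
    completes, its done-callback dispatches the next batch."""

    def __init__(self, executor, batches):
        self._executor = executor
        self._batches = iter(batches)
        self._lock = threading.Lock()  # the iterator is shared between threads
        self.dispatch_threads = []  # thread that called apply_async, per batch
        self.results = []
        self.done = threading.Event()

    def apply_async(self, batch):
        self.dispatch_threads.append(threading.current_thread().name)
        future = self._executor.submit(sum, batch)
        # Runs in the thread that completes the future (a worker), or right
        # here if the future has already finished.
        future.add_done_callback(self._on_complete)

    def dispatch_next(self):
        with self._lock:
            batch = next(self._batches, None)
        if batch is None:
            self.done.set()
        else:
            self.apply_async(batch)

    def _on_complete(self, future):
        self.results.append(future.result())
        self.dispatch_next()  # a finished batch triggers the next submission


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        dispatcher = ChainedDispatcher(executor, [[1, 2], [3, 4], [5, 6]])
        dispatcher.dispatch_next()  # first dispatch comes from the main thread
        dispatcher.done.wait(timeout=5)
    print(dispatcher.results)  # [3, 7, 11]
    # The first entry is 'MainThread'; later entries depend on timing.
    print(dispatcher.dispatch_threads)
```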

I dumped the stack traces of all threads using faulthandler and @ogrisel’s script:

from dask.distributed import Client
from sklearn import datasets, linear_model
from sklearn.model_selection import cross_validate
import faulthandler
import joblib

# Dump the tracebacks of all threads after 10 seconds, then exit.
faulthandler.dump_traceback_later(timeout=10, exit=True)

client = Client(processes=False)

diabetes = datasets.load_diabetes()
X = diabetes.data[:150]
y = diabetes.target[:150]
model = linear_model.LinearRegression()

joblib.parallel_backend("dask")

cv_results = cross_validate(
    model, X, y, cv=20, return_train_score=False, verbose=100
)

Stacktrace:

Thread 0x00007f61fb873700 (most recent call first):
  File "/usr/lib/python3.7/threading.py", line 300 in wait
  File "/usr/lib/python3.7/queue.py", line 179 in get
  File "/home/pierreglaser/repos/distributed/distributed/threadpoolexecutor.py", line 53 in _worker
  File "/usr/lib/python3.7/threading.py", line 865 in run
  File "/usr/lib/python3.7/threading.py", line 917 in _bootstrap_inner
  File "/usr/lib/python3.7/threading.py", line 885 in _bootstrap

Thread 0x00007f61fb072700 (most recent call first):
  File "/home/pierreglaser/repos/distributed/distributed/profile.py", line 272 in _watch
  File "/usr/lib/python3.7/threading.py", line 865 in run
  File "/usr/lib/python3.7/threading.py", line 917 in _bootstrap_inner
  File "/usr/lib/python3.7/threading.py", line 885 in _bootstrap

Thread 0x00007f61fa871700 (most recent call first):
  File "/home/pierreglaser/repos/joblib_master/joblib/parallel.py", line 307 in __call__
  File "/home/pierreglaser/repos/joblib_master/joblib/_dask.py", line 265 in callback_wrapper
  File "/home/pierreglaser/.virtualenvs/joblib_dask_py37/lib/python3.7/site-packages/tornado/gen.py", line 748 in run
  File "/home/pierreglaser/.virtualenvs/joblib_dask_py37/lib/python3.7/site-packages/tornado/gen.py", line 787 in inner
  File "/home/pierreglaser/.virtualenvs/joblib_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 743 in _run_callback
  File "/home/pierreglaser/.virtualenvs/joblib_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 690 in <lambda>
  File "/usr/lib/python3.7/asyncio/events.py", line 88 in _run
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1764 in _run_once
  File "/usr/lib/python3.7/asyncio/base_events.py", line 528 in run_forever
  File "/home/pierreglaser/.virtualenvs/joblib_dask_py37/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 148 in start
  File "/home/pierreglaser/repos/distributed/distributed/utils.py", line 411 in run_loop
  File "/usr/lib/python3.7/threading.py", line 865 in run
  File "/usr/lib/python3.7/threading.py", line 917 in _bootstrap_inner
  File "/usr/lib/python3.7/threading.py", line 885 in _bootstrap

Thread 0x00007f6204a41740 (most recent call first):
  File "/usr/lib/python3.7/threading.py", line 300 in wait
  File "/usr/lib/python3.7/threading.py", line 552 in wait
  File "/home/pierreglaser/repos/distributed/distributed/utils.py", line 330 in sync
  File "/home/pierreglaser/repos/distributed/distributed/client.py", line 763 in sync
  File "/home/pierreglaser/repos/distributed/distributed/client.py", line 2030 in scatter
  File "/home/pierreglaser/repos/joblib_master/joblib/_dask.py", line 232 in maybe_to_futures
  File "/home/pierreglaser/repos/joblib_master/joblib/_dask.py", line 244 in _to_func_args
  File "/home/pierreglaser/repos/joblib_master/joblib/_dask.py", line 255 in apply_async
  File "/home/pierreglaser/repos/joblib_master/joblib/parallel.py", line 716 in _dispatch
  File "/home/pierreglaser/repos/joblib_master/joblib/parallel.py", line 760 in dispatch_one_batch
  File "/home/pierreglaser/repos/joblib_master/joblib/parallel.py", line 926 in __call__
  File "/home/pierreglaser/.virtualenvs/joblib_dask_py37/lib/python3.7/site-packages/sklearn/model_selection/_validation.py", line 231 in cross_validate
  File "test_hang_joblib.py", line 19 in <module>