scikit-learn: Error thrown when calling fit on RFECV of a pipeline with n_jobs=-1 in version 0.20.0

Description

Error thrown when calling fit on RFECV of a pipeline with n_jobs=-1 in version 0.20.0. This wasn’t a problem in version 0.19.2 and earlier. It also works when n_jobs in RFECV is not set or is equal to 1. Why is a pipeline not picklable?

Steps/Code to Reproduce

# Load libraries
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn import linear_model, datasets
from sklearn.model_selection import GridSearchCV
from sklearn import feature_selection
from sklearn import preprocessing
from sklearn.model_selection import StratifiedKFold
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.pipeline import Pipeline

# Load data
iris = sns.load_dataset("iris")
iris.head()

# 1. Instantiate
le = preprocessing.LabelEncoder()

# 2/3. Fit and transform
X = iris.apply(le.fit_transform)
target = X['species']
del X['species']

#Class defining
class PipelineRFE(Pipeline):

    def fit(self, X, y=None, **fit_params):
        super(PipelineRFE, self).fit(X, y, **fit_params)
        self.feature_importances_ = self.steps[-1][-1].feature_importances_
        return self

#pipeline
pipe = PipelineRFE([
    ('std_scaler', preprocessing.StandardScaler()),
    ("ET", ExtraTreesRegressor(random_state=42, n_estimators=250))
])

# Sets RNG seed to reproduce results (random_state only takes effect with shuffle=True)
kf = StratifiedKFold(shuffle=True, random_state=42)

feature_selector_cv = feature_selection.RFECV(pipe, cv=kf, step=1, scoring="neg_mean_squared_error", n_jobs=-1)
feature_selector_cv.fit(X, target)

selected_features = X.columns.values[feature_selector_cv.support_].tolist()
print(selected_features)

Expected Results

No error is thrown. Prints selected_features.

Actual Results

No handlers could be found for logger "concurrent.futures"
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
<ipython-input-11-676fd87a9b51> in <module>()
     10 
     11 feature_selector_cv = feature_selection.RFECV(pipe, cv=10, step=1, scoring="neg_mean_squared_error", n_jobs=-1)
---> 12 feature_selector_cv.fit(X, target)

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/sklearn/feature_selection/rfe.pyc in fit(self, X, y, groups)
    510         scores = parallel(
    511             func(rfe, self.estimator, X, y, train, test, scorer)
--> 512             for train, test in cv.split(X, y, groups))
    513 
    514         scores = np.sum(scores, axis=0)

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/sklearn/externals/joblib/parallel.pyc in __call__(self, iterable)
    994 
    995             with self._backend.retrieval_context():
--> 996                 self.retrieve()
    997             # Make sure that we get a last message telling us we are done
    998             elapsed_time = time.time() - self._start_time

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/sklearn/externals/joblib/parallel.pyc in retrieve(self)
    897             try:
    898                 if getattr(self._backend, 'supports_timeout', False):
--> 899                     self._output.extend(job.get(timeout=self.timeout))
    900                 else:
    901                     self._output.extend(job.get())

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/sklearn/externals/joblib/_parallel_backends.pyc in wrap_future_result(future, timeout)
    515         AsyncResults.get from multiprocessing."""
    516         try:
--> 517             return future.result(timeout=timeout)
    518         except LokyTimeoutError:
    519             raise TimeoutError()

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/sklearn/externals/joblib/externals/loky/_base.pyc in result(self, timeout)
    431                     raise CancelledError()
    432                 elif self._state == FINISHED:
--> 433                     return self.__get_result()
    434                 else:
    435                     raise TimeoutError()

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/sklearn/externals/joblib/externals/loky/_base.pyc in __get_result(self)
    379         def __get_result(self):
    380             if self._exception:
--> 381                 raise self._exception
    382             else:
    383                 return self._result

BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

Versions

System

python: 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016, 12:39:47)  [GCC 4.2.1 (Apple Inc. build 5666) (dot 3)]
machine: Darwin-17.7.0-x86_64-i386-64bit
executable: /Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python

BLAS

macros: NO_ATLAS_INFO=3, HAVE_CBLAS=None
cblas_libs: cblas
lib_dirs:

Python deps

Cython: None
scipy: 1.1.0
setuptools: 39.1.0
pip: 18.0
numpy: 1.14.3
pandas: 0.22.0
sklearn: 0.20.0

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 22 (16 by maintainers)

Most upvoted comments

@ogrisel and team, thank you for your hard work and exceptional effort you put into scikit!

Regarding this issue, I found that patching line 643 of _search.py in scikit-learn’s model_selection submodule to explicitly fall back to backend="multiprocessing" seems to have solved the issue for me when prototyping in an IPython notebook.

       parallel = Parallel(n_jobs=self.n_jobs, backend="multiprocessing",
                            verbose=self.verbose, pre_dispatch=self.pre_dispatch)

I haven’t tested its performance penalties, since it is a temporary hacky fix.

This is slightly non-trivial, but in order to use the context manager for setting parallel backends, I had to import it from scikit-learn’s utils module instead of from joblib. Importing it from joblib does not seem to affect the backend that scikit-learn actually uses.
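Rather than patching _search.py, the backend can be chosen for a whole block of code with the parallel_backend context manager; Parallel calls made inside the block, including the ones scikit-learn makes internally, pick it up. A minimal sketch, assuming a recent scikit-learn that relies on the standalone joblib package. On 0.20, which ships a vendored copy under sklearn.externals.joblib (visible in the traceback above), this comment’s advice to use `from sklearn.utils import parallel_backend` applies instead, which is presumably why the joblib import had no effect. The sketch deliberately swaps in the "threading" backend rather than "multiprocessing": threads share the parent’s memory, so no arguments are pickled, though pure-Python work may not speed up because of the GIL. The dataset and model settings are illustrative only.

```python
from joblib import parallel_backend
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_score

X, y = make_regression(n_samples=200, n_features=5, random_state=42)
model = RandomForestRegressor(n_estimators=20, random_state=123)

# The backend comes from the surrounding context, not from the call site:
# cross_val_score's internal Parallel(n_jobs=2) uses whatever is active here.
# With "threading", folds run in threads of this process, so nothing is pickled.
with parallel_backend("threading", n_jobs=2):
    scores = cross_val_score(model, X, y, cv=3, n_jobs=2)

print(scores.round(3))
```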

The need for explicit backend management arose from errors when pickling the forward and inverse functions of a FunctionTransformer defined in an interactive IPython session. The errors were much like the ones in the original post of this thread.

Below is a slightly contrived example that illustrates the details. The root of the problem here is stable_log: if I use np.log instead, all tests return OK. However, if stable_log is used, only the sklearn-multiprocessing pair returns OK; the others fail with "A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable."
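The "np.log works, stable_log fails" pattern can be reproduced with the standard library alone. pickle stores a plain function as a reference (module name plus function name), not as code, so the process that unpickles it must be able to find that name in that module. A minimal sketch that swaps math.log in for np.log so it needs no third-party packages; the helper loads_in_fresh_interpreter is hypothetical and simply feeds the pickled bytes to a brand-new Python process. That loky’s workers fail for the same reason is an assumption consistent with the error message, not something confirmed in this thread.

```python
import math
import pickle
import subprocess
import sys


def stable_log(x):
    # Scalar stand-in for the numpy version above: clip small values, then take the log.
    return math.log(max(x, 1e-4))


def loads_in_fresh_interpreter(payload):
    """Hypothetical helper: unpickle `payload` in a new Python process.

    Returns (succeeded, last line of the child's stderr).
    """
    child = "import pickle, sys; pickle.loads(sys.stdin.buffer.read())"
    proc = subprocess.run([sys.executable, "-c", child],
                          input=payload, capture_output=True)
    lines = proc.stderr.decode().strip().splitlines()
    return proc.returncode == 0, (lines[-1] if lines else "")


results = {}
for func in (math.log, stable_log):
    # For plain functions pickle stores only a reference (module + name).
    payload = pickle.dumps(func)
    ok, err = loads_in_fresh_interpreter(payload)
    results[func.__name__] = ok
    print(f"{func.__module__}.{func.__name__}: {'OK' if ok else err}")
```

Run as a script, math.log round-trips because any interpreter can import math, while stable_log fails in the child because the child’s own __main__ never defined it.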

import numpy as np
from sklearn.datasets import make_regression
from sklearn.preprocessing import FunctionTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline

# toy dataset
X, y = make_regression(random_state=42)
X = np.exp(X)

# the real input data might have small negative values -- clip them
def stable_log(x):
    return np.log(np.maximum(x, 1e-4))

# set up the estimator with preprocessing
estimator = Pipeline([
    ("txform", FunctionTransformer(stable_log, np.exp,
                                   validate=False, check_inverse=False)),
    ("forest", RandomForestRegressor(random_state=123)),
])

cv_results = GridSearchCV(estimator, [{
    "forest__n_estimators": [10, 100],
    "forest__min_samples_split": [5, 25],
    "forest__max_features": [0.50, 1.00],
}], cv=7, n_jobs=2, return_train_score=False, iid=False)

## >>>> Test starts HERE
# import managers from different libs
from sklearn.utils import parallel_backend as sk_pb
from joblib import parallel_backend as jl_pb

for lib, ctx, back in [("joblib", jl_pb, "multiprocessing"),
                       ("sklearn", sk_pb, "multiprocessing"),
                       ("joblib", jl_pb, "loky"),
                       ("sklearn", sk_pb, "loky")]:
    with ctx(backend=back):
        status = "OK"
        try:
            cv_results.fit(X, y)
        except Exception as e:
            status = f"{e}"
            # raise
    # end with

    print(f"backend {lib}-{back}: {status}")
# end for

@glemaitre’s solution of importing the class works for me. In the meantime, I analysed the runtimes, and running RFECV with n_jobs=-1 is slower than with n_jobs=1. @ogrisel, could this be due to the joblib detection failure you mentioned?

“possibly because it’s defined in the same file”

Moving the class to another file and importing it solves the issue, as you mentioned.
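The fix can be checked the same way: once the class lives in an importable module, its pickled reference points at that module instead of __main__, and a fresh interpreter can resolve it by importing the module. A stdlib-only sketch; the module name pipeline_rfe and the placeholder class body are hypothetical, and the file is written to a temporary directory only to keep the example self-contained. In practice the file would sit next to the notebook or script, and we assume the worker processes can import from there.

```python
import importlib
import os
import pickle
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for a file such as pipeline_rfe.py saved next to the
# notebook; the real file would hold the Pipeline subclass from the report.
MODULE_SOURCE = '''
class PipelineRFE:
    """Placeholder for the real Pipeline subclass."""

    def __init__(self, steps):
        self.steps = steps
'''

with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "pipeline_rfe.py").write_text(MODULE_SOURCE)
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    # Importing (instead of defining in __main__) gives the class an importable home.
    PipelineRFE = importlib.import_module("pipeline_rfe").PipelineRFE
    sys.path.remove(tmp)

    payload = pickle.dumps(PipelineRFE([("ET", None)]))

    # A brand-new interpreter resolves pipeline_rfe.PipelineRFE by importing the
    # module, so it only needs that module on its path (here via PYTHONPATH).
    child = ("import pickle, sys; obj = pickle.loads(sys.stdin.buffer.read()); "
             "print(type(obj).__module__, obj.steps)")
    env = dict(os.environ)
    env["PYTHONPATH"] = os.pathsep.join(p for p in (tmp, env.get("PYTHONPATH")) if p)
    proc = subprocess.run([sys.executable, "-c", child], input=payload,
                          capture_output=True, env=env)

print(proc.returncode, proc.stdout.decode().strip())
```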

Probably @ogrisel @tomMoral will have some insights.

Looks like this has something to do with the use of a custom class (possibly because it’s defined in the same file). In this case it doesn’t look like you need it because the scaling is unnecessary for the tree. But in general this should work (in the sense that we want this to work).

For the record, this works:

feature_selector_cv = feature_selection.RFECV(
    ExtraTreesRegressor(random_state=42, n_estimators=250),
    cv=kf, step=1, scoring="neg_mean_squared_error", n_jobs=-1)
feature_selector_cv.fit(X, target)
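On scikit-learn 0.24 and later (newer than the versions discussed in this thread), RFECV has an importance_getter parameter that accepts an attribute path, so the scaler can stay in a plain Pipeline and the PipelineRFE subclass, the object that had to be pickled from __main__, is no longer needed. A minimal sketch under that assumption; it loads iris with sklearn.datasets.load_iris instead of seaborn plus LabelEncoder, uses fewer trees, and sets n_jobs=2 rather than -1 only to keep it light.

```python
from sklearn.datasets import load_iris
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.feature_selection import RFECV
from sklearn.model_selection import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

iris = load_iris()
X, target = iris.data, iris.target
feature_names = list(iris.feature_names)

# A plain Pipeline: every class involved is importable from sklearn itself.
pipe = Pipeline([
    ("std_scaler", StandardScaler()),
    ("ET", ExtraTreesRegressor(random_state=42, n_estimators=50)),
])

# random_state only has an effect when shuffle=True.
kf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

feature_selector_cv = RFECV(
    pipe,
    cv=kf,
    step=1,
    scoring="neg_mean_squared_error",
    # Point RFECV at the last step's importances instead of subclassing Pipeline.
    importance_getter="named_steps.ET.feature_importances_",
    n_jobs=2,
)
feature_selector_cv.fit(X, target)

selected_features = [name for name, keep
                     in zip(feature_names, feature_selector_cv.support_) if keep]
print(selected_features)
```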