xgboost: xgboost.dask.DaskXGBClassifier not working with >1 dask distributed worker in case of large datasets

Hi XGBoost devs, I am running the this code on an EC2 machine with 32 threads and 128 GB ram. The size of csv being loaded in 800MB.

class ColumnSelector(BaseEstimator, TransformerMixin):
    def __init__(self, columns=[]):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        cols_missing = list(set(self.columns) - set(X.columns))
        print(cols_missing)
        for cols in cols_missing:
            X[cols] = np.nan
        return X[self.columns]


cluster = LocalCluster(n_workers=1)
client = Client(cluster)
client

lp3 = dd.read_csv('./collection_model/collection_train.csv')
lp3

pre_pipe = Pipeline([
('colsel', ColumnSelector(columns=['column1', 'column2', ......]],
  )),
('missna', CustomMissingImputer()),
])
post_pipe= Pipeline([
('pre_pipe', pre_pipe),
('impute', IterativeImputer(n_nearest_features=5, max_iter=100, random_state=0)),
('qt', QuantileTransformer(n_quantiles=10))])

pi= xgboost.dask.DaskXGBClassifier(tree_method='hist')
pi.client= client

param_grid = {
    'learning_rate': [0.1, 0.2],
    'n_estimators': [100],
    'reg_lambda': [0.7],
    }


kfold = 5
skf = model_selection.StratifiedKFold(
    n_splits = kfold, shuffle = True, random_state = 30
)


scoring_1=make_scorer(ks_scorer, greater_is_better = True, needs_proba = True)
scoring={'auc': 'roc_auc', 'ks_scorer': scoring_1}


clf=GridSearchCV(
        estimator = pi,
        param_grid = param_grid,
        verbose = 5,
        cv = skf,
        iid = True,
        return_train_score = True,
        scoring = 'neg_mean_squared_error',
        refit = False
    )


pp = post_pipe.fit_transform(lp3,lp3['target'])        

label = da.from_array(np.array(lp3['target'].astype(int).compute()), chunks=200000)
clf.fit(da.from_array(pp, chunks=200000),label)
clf.cv_results_

It works if the model is trained using a subset of the features with worker=1.

Some cases where it fails :

  1. Same subset of features and with workers > 1, It keeps on running in the notebook with no result. In terminal, WARNING: /home/conda/feedstock_root/build_artifacts/xgboost_1584539733809/work/src/objective/regression_obj.cu:58: Label set is empty.

  2. Using all features with worker=1, it gives memory warnings in the terminal

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 118.65 GB -- Worker memory limit: 126.75 GB```
and after some time error in notebook
```KilledWorker: ("('from-delayed-pandas_read_text-read-block-assign-d8c832b8fa114d4e528a9953dd6402de', 0)", <Worker 'tcp://127.0.0.1:40365', name: 0, memory: 0, processing: 11>)

How can a 800MB csv file consume 118 GB memory ?

Also, there is not ‘predict_proba’ attribute in DaskXgbClassifier, so metrics like roc_auc gives error.

Currently, we are using xgboost with sklearn gridsearch(to distribute the fits). With large datasets, hyper-parameter tuning jobs with 4k-5k fits take days to complete on EC2 and sagemaker.

We are trying dask xgboost to reduce training time.

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 24 (12 by maintainers)

Most upvoted comments

Yes, will come back to this after 1.1 release.