ray: [Client][Dask] Dask Errors on Ray Client

What is the problem?

When running a simple Dask/XGBoost script through Ray Client, computing the length of the Dask DataFrame fails with the following error:

TypeError: 'ray._raylet.ObjectRef' object is not subscriptable

Ray version and other system information (Python version, TensorFlow version, OS):

(ray) ~/workspace/scratch/dask python --version
Python 3.8.10
(ray) ~/workspace/scratch/dask pip list | grep -E 'ray|dask|xgboost'
dask                                   2021.6.2
ray                                    2.0.0.dev0
ray-shuffling-data-loader              0.1.0
xgboost                                1.4.2
xgboost-ray                            0.1.0

Reproduction (REQUIRED)

  1. Run ray start --head on the local machine.
  2. Create the Python script dask-xgboost.py:
import dask
import dask.dataframe as dd
import ray
from ray.util.dask import ray_dask_get
from xgboost_ray import RayDMatrix, RayParams, train

ray.client().connect()

S3_PATH = "s3://ray-ci-higgs/simpleHIGGS.csv"

dask.config.set(scheduler=ray_dask_get)
colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
data = dd.read_csv(S3_PATH, names=colnames)
df_train = data
df_train = df_train.persist()
dtrain = RayDMatrix(df_train, label="label", columns=colnames)

evals_result = {}
config = {"tree_method": "hist", "eval_metric": ["logloss", "error"]}
train(
    params=config,
    dtrain=dtrain,
    evals_result=evals_result,
    ray_params=RayParams(
        max_actor_restarts=1, num_actors=4, cpus_per_actor=2),
    num_boost_round=100)
  3. Execute the script:
(ray) ~/workspace/scratch/dask RAY_ADDRESS="localhost:10001" python dask-xgboost.py
Traceback (most recent call last):
  File "dask-xgboost.py", line 20, in <module>
    train(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1110, in train
    bst, train_evals_result, train_additional_results = ray.get(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 61, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 248, in _get
    raise err
types.RayTaskError(TypeError): ray::_wrapped() (pid=51862, ip=192.168.0.57)
  File "python/ray/_raylet.pyx", line 535, in ray._raylet.execute_task
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1102, in _wrapped
    bst = train(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1196, in train
    dtrain.load_data(ray_params.num_actors)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 741, in load_data
    refs, self.n = self.loader.load_data(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 317, in load_data
    data_source = self.get_data_source()
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 301, in get_data_source
    self._cached_n = data_source.get_n(self.data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/data_sources/dask.py", line 118, in get_n
    return len(data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/dataframe/core.py", line 3880, in __len__
    return len(s)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/dataframe/core.py", line 564, in __len__
    return self.reduction(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
TypeError: 'ray._raylet.ObjectRef' object is not subscriptable
  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 31 (31 by maintainers)

Most upvoted comments

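My guess is that df_train.persist() in the user script is inlining Ray futures into the Dask collection. XGBoost-on-Ray then computes on that collection without the Dask-on-Ray scheduler, because the dask.config.set(scheduler=ray_dask_get) call in the user script isn't propagated to the server; the traceback does go through dask/threaded.py, Dask's default threaded scheduler. A task would then receive a raw ObjectRef instead of a pandas partition and fail as soon as it tries to index it. As I noted above, we can guard against this by modifying the XGBoost-on-Ray data source to eagerly set the scheduler to Dask-on-Ray globally.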

@matthewdeng As a workaround, can you try removing the line that persists the training data from the script?

df_train = df_train.persist()
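A toy model of why this workaround should help (hypothetical names throughout, not Dask or Ray internals): without persist(), the partition entry in the graph is still the task that loads the data, so even an executor with no knowledge of Ray futures can compute the length. The sketch assumes a stand-in load_partition task in place of read_csv.

```python
# Toy model only: hypothetical stand-ins, not Dask or Ray internals.
from typing import Any, Dict, List


def plain_get(graph: Dict[str, Any], key: str) -> Any:
    """Executor with no knowledge of futures: tuples are tasks
    (func, *arg_keys); every other value is passed through unchanged."""
    value = graph[key]
    if isinstance(value, tuple):
        func, *arg_keys = value
        return func(*(plain_get(graph, k) for k in arg_keys))
    return value


def load_partition() -> Dict[str, List[int]]:
    # Stand-in for the read_csv task that produces one partition.
    return {"label": [1, 0, 1]}


# Without persist(), the partition is still a task, not a future.
lazy_graph = {
    "partition-0": (load_partition,),
    "label-0": (lambda part: part["label"], "partition-0"),
    "len-0": (len, "label-0"),
}

print(plain_get(lazy_graph, "len-0"))  # 3
```

The trade-off is that the load task reruns on every compute instead of reusing an in-memory copy, which is what persist() was meant to avoid.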

@clarkzinzow haha whoops, totally forgot that I had originally imported ray_dask_get in the Python script, thanks for catching that. I updated it to use ray_dask_get_sync and am still getting the same original error, which makes me think this may be a different issue from #16406?

(python37) ~/workspace/scratch/dask RAY_ADDRESS="localhost:10001" python dask-xgboost.py  
Traceback (most recent call last):
  File "dask-xgboost.py", line 26, in <module>
    num_boost_round=100)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1160, in train
    **kwargs,
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 81, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/client/worker.py", line 248, in _get
    raise err
types.RayTaskError(TypeError): ray::_wrapped() (pid=66381, ip=192.168.0.57)
  File "python/ray/_raylet.pyx", line 534, in ray._raylet.execute_task
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1146, in _wrapped
    **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/main.py", line 1249, in train
    dtrain.load_data(ray_params.num_actors)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 746, in load_data
    self.num_actors, self.sharding, rank=rank)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 319, in load_data
    data_source = self.get_data_source()
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/matrix.py", line 303, in get_data_source
    self._cached_n = data_source.get_n(self.data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/xgboost_ray/data_sources/dask.py", line 118, in get_n
    return len(data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/dataframe/core.py", line 3880, in __len__
    return len(s)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/dataframe/core.py", line 565, in __len__
    len, np.sum, token="len", meta=int, split_every=False
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/threaded.py", line 87, in get
    **kwargs
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/Users/matt/anaconda3/envs/python37/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
TypeError: 'ray._raylet.ObjectRef' object is not subscriptable

@matthewdeng I’m able to reproduce this!