ray: [Client][Dask] Dask Errors on Ray Client
What is the problem?
When running a simple Dask/XGBoost script using Ray Client, length computation of the Dask Dataframe ultimately errors out with the following:
```
TypeError: 'ray._raylet.ObjectRef' object is not subscriptable
```
Ray version and other system information (Python version, TensorFlow version, OS):
```
(ray) ~/workspace/scratch/dask python --version
Python 3.8.10
(ray) ~/workspace/scratch/dask pip list | grep -E 'ray|dask|xgboost'
dask                       2021.6.2
ray                        2.0.0.dev0
ray-shuffling-data-loader  0.1.0
xgboost                    1.4.2
xgboost-ray                0.1.0
```
Reproduction (REQUIRED)
- Run `ray start --head` on the local machine.
- Create a python script `dask-xgboost.py`:
```python
import dask
import dask.dataframe as dd
import ray
from ray.util.dask import ray_dask_get
from xgboost_ray import RayDMatrix, RayParams, train

ray.client().connect()

S3_PATH = "s3://ray-ci-higgs/simpleHIGGS.csv"

dask.config.set(scheduler=ray_dask_get)

colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
data = dd.read_csv(S3_PATH, names=colnames)

df_train = data
df_train = df_train.persist()

dtrain = RayDMatrix(df_train, label="label", columns=colnames)

evals_result = {}
config = {"tree_method": "hist", "eval_metric": ["logloss", "error"]}

train(
    params=config,
    dtrain=dtrain,
    evals_result=evals_result,
    ray_params=RayParams(
        max_actor_restarts=1, num_actors=4, cpus_per_actor=2),
    num_boost_round=100)
```
- Execute the script:
```
(ray) ~/workspace/scratch/dask RAY_ADDRESS="localhost:10001" python dask-xgboost.py
Traceback (most recent call last):
  File "dask-xgboost.py", line 20, in <module>
    train(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1110, in train
    bst, train_evals_result, train_additional_results = ray.get(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 61, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 225, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 248, in _get
    raise err
types.RayTaskError(TypeError): ray::_wrapped() (pid=51862, ip=192.168.0.57)
  File "python/ray/_raylet.pyx", line 535, in ray._raylet.execute_task
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1102, in _wrapped
    bst = train(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/main.py", line 1196, in train
    dtrain.load_data(ray_params.num_actors)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 741, in load_data
    refs, self.n = self.loader.load_data(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 317, in load_data
    data_source = self.get_data_source()
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/matrix.py", line 301, in get_data_source
    self._cached_n = data_source.get_n(self.data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/xgboost_ray/data_sources/dask.py", line 118, in get_n
    return len(data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/dataframe/core.py", line 3880, in __len__
    return len(s)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/dataframe/core.py", line 564, in __len__
    return self.reduction(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/Users/matt/anaconda3/envs/ray/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
TypeError: 'ray._raylet.ObjectRef' object is not subscriptable
```
- I have verified my script runs in a clean environment and reproduces the issue.
- I have verified the issue also occurs with the latest wheels.
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 31 (31 by maintainers)
My guess is that `df_train.persist()` in the user script is inlining Ray futures into the Dask collection, which XGBoost-on-Ray will then try to use without the Dask-on-Ray scheduler, since the `dask.config.set(scheduler=ray_dask_get)` in the user script isn't propagated to the server. As I noted above, we can guard against this by modifying the XGBoost-on-Ray data source to eagerly set the scheduler to Dask-on-Ray globally.

@matthewdeng As a workaround, can you try removing the persisting of the training data from the script?
@clarkzinzow haha woops, totally forgot that I had originally imported `ray_dask_get` in the python script, thanks for catching that. I updated it to use `ray_dask_get_sync` and am getting the same original error, which makes me think this may be a different issue than #16406?

@matthewdeng I'm able to reproduce this!