ray: [rllib] can't convert CUDA tensor to numpy for multi-agent serving in torch
What is the problem?
- Ray version 0.9.0.dev0 (latest wheel)
- TensorFlow version 2.1.0 (GPU)
- PyTorch version 1.4.0
- Python version 3.6.9
- Linux 5.3.0-28-generic 30~18.04.1-Ubuntu SMP x86_64 x86_64 x86_64 GNU/Linux
I am using a multi-agent setup with PPO and PyTorch. I set up a basic environment and now want to run serving against this environment. This works fine with TensorFlow, but with PyTorch the exception "can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first" is thrown, and it seems to originate from the Torch PPO policy implementation.
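For context, the exception itself is easy to trigger in isolation: numpy cannot convert a tensor that still lives on the GPU, so any np.stack over CUDA tensors fails with exactly this message. A minimal standalone illustration (my own snippet, not part of the reproduction below):

import numpy as np
import torch

if torch.cuda.is_available():
    t = torch.zeros(3, device="cuda")
    try:
        np.stack([t])       # numpy calls Tensor.__array__(), which refuses CUDA tensors
    except TypeError as e:
        print(e)            # "can't convert CUDA tensor to numpy. Use Tensor.cpu() ..."
    np.stack([t.cpu()])     # fine once the tensor has been copied to host memory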
Reproduction (REQUIRED)
Mock environment issue_world.py:
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding, EzPickle
from ray.rllib.env.multi_agent_env import MultiAgentEnv


class World(MultiAgentEnv, EzPickle):
    def __init__(self, env_config):
        EzPickle.__init__(self)
        self.seed()
        self.cfg = env_config
        self.observation_space = spaces.Dict({
            'id': spaces.Box(0, self.cfg['n_agents'], shape=(1,), dtype=np.int),
            'states': spaces.Dict({
                i: spaces.Dict({
                    'map': spaces.Box(0, 2, shape=(42, 42, 2), dtype=np.float32),
                    'pos': spaces.Box(low=np.array([0, 0]), high=np.array([42, 42]), dtype=np.int)
                }) for i in range(self.cfg['n_agents'])
            }),
        })
        self.action_space = spaces.Discrete(5)
        self.reset()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        self.agents = {f'a_{i}': np.array([0, 0]) for i in range(self.cfg['n_agents'])}
        self.dones = {key: False for key in self.agents.keys()}
        return self.step({key: 0 for key in self.agents.keys()})[0]

    def step(self, actions):
        states, rewards = {}, {}
        for key, action in actions.items():
            state = np.zeros((42, 42, 2))
            reward = 0
            if action == 1:
                self.agents[key][0] += 1
                reward = 1
            states[key] = state
            rewards[key] = reward
            if self.agents[key][0] > 10:
                self.dones[key] = True
        self.dones['__all__'] = any(self.dones.values())
        get_key_index = lambda key: int(list(self.agents.keys()).index(key))
        all_states = {get_key_index(key): {'map': np.zeros((42, 42, 2)), 'pos': self.agents[key]}
                      for key in self.agents.keys()}
        final_states = {key: {'id': np.array([get_key_index(key)]), 'states': all_states}
                        for key in actions.keys()}
        return final_states, rewards, self.dones, {}

    def render(self, mode='human'):
        pass
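Not part of the reproduction files, but the mock environment can be sanity-checked standalone roughly like this (using the same env_config as in the scripts below):

if __name__ == "__main__":
    env = World({'world_shape': (42, 42), 'n_agents': 3})
    obs = env.reset()
    done = {'__all__': False}
    while not done['__all__']:
        # action 1 moves every agent forward; after enough steps the done threshold is hit
        obs, rewards, done, info = env.step({key: 1 for key in env.agents})
    print("standalone episode finished")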
Custom Torch model issue_model.py:
from ray.rllib.utils import try_import_torch
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.misc import normc_initializer, valid_padding, SlimConv2d, SlimFC
from ray.rllib.utils.annotations import override
import numpy as np

torch, nn = try_import_torch()


class AdaptedVisionNetwork(TorchModelV2, nn.Module):
    """Generic vision network."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
        nn.Module.__init__(self)
        self.cfg = model_config['custom_options']
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        filters = [[16, [8, 8], 4], [32, [4, 4], 2], [32, [4, 4], 2]]
        layers = []
        (w, h, in_channels) = (42, 42, 2)
        in_size = [w, h]
        for out_channels, kernel, stride in filters[:-1]:
            padding, out_size = valid_padding(in_size, kernel, [stride, stride])
            layers.append(SlimConv2d(in_channels, out_channels, kernel, stride, padding))
            in_channels = out_channels
            in_size = out_size
        out_channels, kernel, stride = filters[-1]
        layers.append(SlimConv2d(in_channels, out_channels, kernel, stride, None))
        layers.append(nn.Flatten(1, -1))
        self._convs = nn.Sequential(*layers).to(self.device)
        self._logits = SlimFC(
            128, num_outputs, initializer=nn.init.xavier_uniform_).to(self.device)
        self._value_branch = SlimFC(
            128, 1, initializer=normc_initializer()).to(self.device)
        self._cur_value = None

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        # just for demo purposes: only agent 0's map is fed through the network
        featureMap = self._hidden_layers(input_dict["obs"]['states'][0]['map'])
        logits = self._logits(featureMap)
        self._cur_value = self._value_branch(featureMap).squeeze(1)
        return logits, state

    @override(TorchModelV2)
    def value_function(self):
        assert self._cur_value is not None, "must call forward() first"
        return self._cur_value

    def _hidden_layers(self, obs):
        res = self._convs(obs.permute(0, 3, 1, 2))  # switch to channel-major
        return res
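A note on the hard-coded 128 above: with a 42x42x2 input, the two padded conv layers give 11x11x16 and 6x6x32 feature maps, the final unpadded conv gives 2x2x32, and 2*2*32 = 128 after flattening, which is what the logits and value heads expect. A hypothetical smoke test (my own addition, not part of the issue) that pushes a dummy batch through the model:

if __name__ == "__main__":
    from gym import spaces

    # Hypothetical smoke test: build the model with dummy spaces and run a
    # zero observation batch through forward() on whatever device it picked.
    model = AdaptedVisionNetwork(
        spaces.Box(0, 2, shape=(42, 42, 2)), spaces.Discrete(5), 5,
        {"custom_options": {}}, "smoke_test")
    batch = {"obs": {"states": {0: {"map": torch.zeros(4, 42, 42, 2, device=model.device)}}}}
    logits, _ = model.forward(batch, [], None)
    print(logits.shape, model.value_function().shape)  # expected: (4, 5) and (4,)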
Model training script train.py (just run for one iteration):
import numpy as np
import gym
from gym import spaces
import matplotlib.pyplot as plt

import ray
from ray import tune
from ray.rllib.utils import try_import_torch
from ray.rllib.models import ModelCatalog

from issue_world import World
from issue_model import AdaptedVisionNetwork
from issue_model_tf import AdaptedVisionNetwork as AdaptedVisionNetworkTF

if __name__ == '__main__':
    ray.init()
    ModelCatalog.register_custom_model("vis_torch", AdaptedVisionNetwork)
    ModelCatalog.register_custom_model("vis_tf", AdaptedVisionNetworkTF)
    tune.run(
        "PPO",
        checkpoint_freq=1,
        config={
            "use_pytorch": True,
            "env": World,
            "num_sgd_iter": 10,
            "num_workers": 1,
            "num_envs_per_worker": 1,
            "num_gpus": 1,
            "model": {"custom_model": "vis_torch"},
            # "model": {"custom_model": "vis_tf"},
            "env_config": {
                'world_shape': (42, 42),
                'n_agents': 3
            }
        }
    )
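tune.run() writes checkpoints under ~/ray_results by default; the serving script below needs the path to one of them. A small hypothetical helper (assuming the default output layout) to print the newest checkpoint file:

import glob
import os

# Hypothetical helper, assuming Tune's default ~/ray_results/PPO/<trial>/checkpoint_<n>/
# layout: print the most recently written checkpoint file for use as CHECKPOINT_FILE.
paths = [p for p in glob.glob(os.path.expanduser("~/ray_results/PPO/*/checkpoint_*/checkpoint-*"))
         if not p.endswith(".tune_metadata")]
print(max(paths, key=os.path.getmtime) if paths else "no checkpoint found yet")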
Serving script serving_server.py, based on this (after the first checkpoint has been created by the training script above, put the correct checkpoint path into CHECKPOINT_FILE):
import os
from gym import spaces
import numpy as np
import ray
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.env.external_multi_agent_env import ExternalMultiAgentEnv
from ray.rllib.utils.policy_server import PolicyServer
from ray.tune.registry import register_env
from ray.rllib.models import ModelCatalog

from issue_model import AdaptedVisionNetwork
from issue_model_tf import AdaptedVisionNetwork as AdaptedVisionNetworkTF

SERVER_ADDRESS = "localhost"
SERVER_PORT = 9900
CHECKPOINT_FILE = "path/to/checkpoint"


class WorldServing(ExternalMultiAgentEnv):
    def __init__(self, config):
        self.cfg = config
        ExternalMultiAgentEnv.__init__(
            self, spaces.Discrete(5),
            spaces.Dict({
                'id': spaces.Box(0, self.cfg['n_agents'], shape=(1,), dtype=np.int),
                'states': spaces.Dict({
                    i: spaces.Dict({
                        'map': spaces.Box(0, 2, shape=(42, 42, 2), dtype=np.float32),
                        'pos': spaces.Box(low=np.array([0, 0]), high=np.array([42, 42]), dtype=np.int)
                    }) for i in range(self.cfg['n_agents'])
                }),
            }))

    def run(self):
        print("Starting policy server at {}:{}".format(SERVER_ADDRESS, SERVER_PORT))
        server = PolicyServer(self, SERVER_ADDRESS, SERVER_PORT)
        server.serve_forever()


if __name__ == "__main__":
    ray.init()
    register_env("srv", lambda config: WorldServing(config))
    ModelCatalog.register_custom_model("vis_torch", AdaptedVisionNetwork)
    ModelCatalog.register_custom_model("vis_tf", AdaptedVisionNetworkTF)
    ppo = PPOTrainer(
        env="srv",
        config={
            "use_pytorch": True,
            "num_workers": 0,
            "timesteps_per_iteration": 200,
            "env_config": {
                'world_shape': (42, 42),
                'n_agents': 3
            },
            "model": {"custom_model": "vis_torch"}
            # "model": {"custom_model": "vis_tf"}
        })
    ppo.restore(CHECKPOINT_FILE)
    print("restored")

    while True:
        ppo.train()
And the client script serving_client.py, also based on this (run it after the serving server has started successfully):
import gym
from issue_world import World
from ray.rllib.utils.policy_client import PolicyClient

if __name__ == "__main__":
    env = World({
        'world_shape': (42, 42),
        'n_agents': 3
    })
    client = PolicyClient("http://localhost:9900")

    eid = client.start_episode(training_enabled=False)
    obs = env.reset()

    while True:
        action = client.get_action(eid, obs)
        print(action)
        obs, reward, done, info = env.step(action)
        client.log_returns(eid, reward, info=info)
        if done:
            client.end_episode(eid, obs)
            obs = env.reset()
            eid = client.start_episode(training_enabled=False)
Interestingly, the client receives exactly one action before the server drops the connection. The output in the server window is:
Traceback (most recent call last):
File "issue_serving_server.py", line 71, in <module>
ppo.train()
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 497, in train
raise e
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 483, in train
result = Trainable.train(self)
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/tune/trainable.py", line 254, in train
result = self._train()
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer_template.py", line 133, in _train
fetches = self.optimizer.step()
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/optimizers/sync_samples_optimizer.py", line 62, in step
samples.append(self.workers.local_worker().sample())
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/rollout_worker.py", line 488, in sample
batches = [self.input_reader.next()]
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 52, in next
batches = [self.get_data()]
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 95, in get_data
item = next(self.rollout_provider)
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 315, in _env_runner
soft_horizon, no_done_at_end)
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 461, in _process_observations
episode.batch_builder.postprocess_batch_so_far(episode)
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sample_batch_builder.py", line 152, in postprocess_batch_so_far
pre_batch, other_batches, episode)
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/policy/torch_policy_template.py", line 109, in postprocess_trajectory
episode)
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/ppo/ppo_tf_policy.py", line 191, in postprocess_ppo_gae
use_gae=policy.config["use_gae"])
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/postprocessing.py", line 45, in compute_advantages
traj[key] = np.stack(rollout[key])
File "<__array_function__ internals>", line 6, in stack
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/shape_base.py", line 420, in stack
arrays = [asanyarray(arr) for arr in arrays]
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/shape_base.py", line 420, in <listcomp>
arrays = [asanyarray(arr) for arr in arrays]
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/_asarray.py", line 138, in asanyarray
return array(a, dtype, copy=False, order=order, subok=True)
File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/torch/tensor.py", line 486, in __array__
return self.numpy()
TypeError: can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.
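The traceback shows that the collected sample batch still contains CUDA tensors when compute_advantages() tries to np.stack them. As a possible stop-gap until this is fixed in RLlib (an untested idea on my side, not the actual fix), the GPU can be hidden from the serving process so that torch.cuda.is_available() returns False and the custom model stays on the CPU:

import os

# Untested stop-gap: set this *before* torch / ray.rllib are imported so that
# torch.cuda.is_available() is False, the custom model's self.device falls back
# to "cpu", and the tensors reaching np.stack() are CPU tensors numpy can handle.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

This disables GPU inference for serving, so it is only a band-aid, not a real fix.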
For reference, this is the TensorFlow model issue_model_tf.py, with which the issue does not occur:
from ray import tune
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.utils import try_import_tf
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel

tf = try_import_tf()


class AdaptedVisionNetwork(TFModelV2):
    """Custom model for policy gradient algorithms."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        super(AdaptedVisionNetwork, self).__init__(obs_space, action_space,
                                                   num_outputs, model_config, name)

        def dense(inp, channels):
            d = tf.keras.layers.Dense(
                channels,
                kernel_initializer=normc_initializer(1.0))(inp)
            r = tf.keras.layers.Dropout(0.2)(d)
            return tf.keras.layers.Activation("relu")(r)

        def conv(inp, size, strides, channels):
            c = tf.keras.layers.Conv2D(
                filters=channels,
                kernel_size=size,
                strides=strides,
                padding="same",
                kernel_initializer=normc_initializer(1.0))(inp)
            bn = tf.keras.layers.BatchNormalization()(c)
            return tf.keras.layers.Activation("relu")(bn)

        self.inputs = tf.keras.layers.Input(shape=(42, 42, 2), name="observations")
        s = conv(self.inputs, 8, 4, 16)
        s = conv(s, 4, 2, 32)
        f = tf.keras.layers.Flatten()(s)
        d = dense(f, 256)
        layer_out = tf.keras.layers.Dense(
            num_outputs,
            name="my_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(d)
        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(d)
        self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out])
        self.register_variables(self.base_model.variables)

    def forward(self, input_dict, state, seq_lens):
        # just for demonstration purposes
        model_out, self._value_out = self.base_model(input_dict["obs"]["states"][0])
        return model_out, state

    def value_function(self):
        return tf.reshape(self._value_out, [-1])
- I have verified my script runs in a clean environment and reproduces the issue.
- I have verified the issue also occurs with the latest wheels.
About this issue
- State: closed
- Created 4 years ago
- Comments: 18 (14 by maintainers)
The above changes were done in PR #7445 and have not been merged into master yet! https://github.com/ray-project/ray/pull/7445
Confirmed on my setup; I built Ray ‘recently’ (March 3, 2020).