ray: [rllib] can't convert CUDA tensor to numpy for multi-agent serving in torch

What is the problem?

  • Ray version 0.9.0.dev0 (latest wheel)
  • TensorFlow version 2.1.0 (GPU)
  • PyTorch version 1.4.0
  • Python version 3.6.9
  • Linux 5.3.0-28-generic 30~18.04.1-Ubuntu SMP x86_64 x86_64 x86_64 GNU/Linux

I am using a multi-agent setup with PPO and PyTorch. I set up a basic environment and now want to run serving against it. This works fine with TensorFlow, but with PyTorch the exception "can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first" is thrown, which seems to originate from the Torch PPO policy implementation.
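
For context, this is the standard PyTorch restriction on GPU tensors rather than anything RLlib-specific; a minimal standalone illustration (assuming a CUDA-capable machine):

import torch

t = torch.zeros(3, device="cuda")
# t.numpy() would raise: TypeError: can't convert CUDA tensor to numpy. Use Tensor.cpu() ...
arr = t.cpu().numpy()  # copying the tensor to host memory first works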

Reproduction (REQUIRED)

Mock environment issue_world.py:

import numpy as np
import gym
from gym import spaces
from gym.utils import seeding, EzPickle

from ray.rllib.env.multi_agent_env import MultiAgentEnv

class World(MultiAgentEnv, EzPickle):
    def __init__(self, env_config):
        EzPickle.__init__(self)
        self.seed()
        self.cfg = env_config
        self.observation_space = spaces.Dict({
            'id': spaces.Box(0, self.cfg['n_agents'], shape=(1,), dtype=np.int),
            'states': spaces.Dict({
                i: spaces.Dict({
                    'map': spaces.Box(0, 2, shape=(42, 42, 2), dtype=np.float32),
                    'pos': spaces.Box(low=np.array([0,0]), high=np.array([42,42]), dtype=np.int)
                }) for i in range(self.cfg['n_agents'])
            }),
        })
        self.action_space = spaces.Discrete(5)
        self.reset()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        self.agents = {f'a_{i}': np.array([0, 0]) for i in range(self.cfg['n_agents'])}
        self.dones = {key: False for key in self.agents.keys()}
        return self.step({key: 0 for key in self.agents.keys()})[0]

    def step(self, actions):
        states, rewards = {}, {}
        for key, action in actions.items():
            state = np.zeros((42, 42, 2))
            reward = 0
            if action == 1:
                self.agents[key][0] += 1
                reward = 1
            states[key] = state
            rewards[key] = reward
            if self.agents[key][0] > 10:
                self.dones[key] = True
        self.dones['__all__'] = any(self.dones.values())

        get_key_index = lambda key: int(list(self.agents.keys()).index(key))
        all_states = {get_key_index(key): {'map': np.zeros((42, 42, 2)), 'pos': self.agents[key]} for key in self.agents.keys()}
        final_states = {key: {'id': np.array([get_key_index(key)]), 'states': all_states} for key in actions.keys()}
        return final_states, rewards, self.dones, {}

    def render(self, mode='human'):
        pass
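
Not part of the original report, but as a quick sanity check the mock environment can be stepped standalone with random actions (a hypothetical smoke test, no RLlib involved):

if __name__ == '__main__':
    env = World({'world_shape': (42, 42), 'n_agents': 3})
    obs = env.reset()
    for _ in range(50):
        # sample a random discrete action for every agent
        actions = {key: env.action_space.sample() for key in env.agents}
        obs, rewards, dones, _ = env.step(actions)
        if dones['__all__']:
            obs = env.reset()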

Custom Torch model issue_model.py:

from ray.rllib.utils import try_import_torch
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.misc import normc_initializer, valid_padding, SlimConv2d, SlimFC
from ray.rllib.utils.annotations import override

import numpy as np

torch, nn = try_import_torch()

class AdaptedVisionNetwork(TorchModelV2, nn.Module):
    """Generic vision network."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
        nn.Module.__init__(self)

        self.cfg = model_config['custom_options']
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        filters = [[16, [8, 8], 4], [32, [4, 4], 2], [32, [4, 4], 2]]
        layers = []
        (w, h, in_channels) = (42, 42, 2)
        in_size = [w, h]
        for out_channels, kernel, stride in filters[:-1]:
            padding, out_size = valid_padding(in_size, kernel, [stride, stride])
            layers.append(SlimConv2d(in_channels, out_channels, kernel, stride, padding))
            in_channels = out_channels
            in_size = out_size

        out_channels, kernel, stride = filters[-1]
        layers.append(SlimConv2d(in_channels, out_channels, kernel, stride, None))
        layers.append(nn.Flatten(1, -1))
        self._convs = nn.Sequential(*layers).to(self.device)

        self._logits = SlimFC(
            128, num_outputs, initializer=nn.init.xavier_uniform_).to(self.device)
        self._value_branch = SlimFC(
            128, 1, initializer=normc_initializer()).to(self.device)
        self._cur_value = None

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        featureMap = self._hidden_layers(input_dict["obs"]['states'][0]['map']) # just for demo purposes
        logits = self._logits(featureMap)
        self._cur_value = self._value_branch(featureMap).squeeze(1)
        return logits, state

    @override(TorchModelV2)
    def value_function(self):
        assert self._cur_value is not None, "must call forward() first"
        return self._cur_value

    def _hidden_layers(self, obs):
        res = self._convs(obs.permute(0, 3, 1, 2))  # switch to channel-major
        return res

Model training script train.py (just run for one iteration):

import numpy as np
import gym
from gym import spaces
from issue_world import World
import matplotlib.pyplot as plt
import ray
from ray import tune
from ray.rllib.utils import try_import_torch
from ray.rllib.models import ModelCatalog

from issue_model import AdaptedVisionNetwork
from issue_model_tf import AdaptedVisionNetwork as AdaptedVisionNetworkTF
        
if __name__ == '__main__':
    ray.init()
    ModelCatalog.register_custom_model("vis_torch", AdaptedVisionNetwork)
    ModelCatalog.register_custom_model("vis_tf", AdaptedVisionNetworkTF)
    
    tune.run(
        "PPO",
        checkpoint_freq=1,
        config={
            "use_pytorch": True,
            "env": World,
            "num_sgd_iter": 10,
            "num_workers": 1,
            "num_envs_per_worker": 1,
            "num_gpus": 1,
            "model": {"custom_model": "vis_torch"},
            #"model": {"custom_model": "vis_tf"},
            "env_config": {
                'world_shape': (42, 42),
                'n_agents': 3
            }
        }
    )
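
For reference, one way to locate the checkpoint written by the training run above; the directory layout here is assumed from Tune's defaults under ~/ray_results and may differ on other setups:

import glob
import os

# Assumed default layout: ~/ray_results/PPO/<trial_name>/checkpoint_<n>/checkpoint-<n>
pattern = os.path.expanduser("~/ray_results/PPO/*/checkpoint_*/checkpoint-*")
checkpoints = [p for p in glob.glob(pattern) if not p.endswith(".tune_metadata")]
print(sorted(checkpoints))  # use one of these as CHECKPOINT_FILE in serving_server.py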

Serving script serving_server.py, based on this (once the first checkpoint has been created by the previous script, put its path into CHECKPOINT_FILE):

import os
from gym import spaces
import numpy as np

import ray
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.env.external_multi_agent_env import ExternalMultiAgentEnv
from ray.rllib.utils.policy_server import PolicyServer
from ray.tune.registry import register_env

from issue_model import AdaptedVisionNetwork
from issue_model_tf import AdaptedVisionNetwork as AdaptedVisionNetworkTF
from ray.rllib.models import ModelCatalog

SERVER_ADDRESS = "localhost"
SERVER_PORT = 9900
CHECKPOINT_FILE = "path/to/checkpoint"

class WorldServing(ExternalMultiAgentEnv):
    def __init__(self, config):
        self.cfg = config
        ExternalMultiAgentEnv.__init__(
            self, spaces.Discrete(5),
            spaces.Dict({
                'id': spaces.Box(0, self.cfg['n_agents'], shape=(1,), dtype=np.int),
                'states': spaces.Dict({
                    i: spaces.Dict({
                        'map': spaces.Box(0, 2, shape=(42, 42, 2), dtype=np.float32),
                        'pos': spaces.Box(low=np.array([0,0]), high=np.array([42,42]), dtype=np.int)
                    }) for i in range(self.cfg['n_agents'])
                }),
            }))

    def run(self):
        print("Starting policy server at {}:{}".format(SERVER_ADDRESS,
                                                       SERVER_PORT))
        server = PolicyServer(self, SERVER_ADDRESS, SERVER_PORT)
        server.serve_forever()

if __name__ == "__main__":
    ray.init()
    register_env("srv", lambda config: WorldServing(config))
    ModelCatalog.register_custom_model("vis_torch", AdaptedVisionNetwork)
    ModelCatalog.register_custom_model("vis_tf", AdaptedVisionNetworkTF)
    
    ppo = PPOTrainer(
        env="srv",
        config={
            "use_pytorch": True,
            "num_workers": 0,
            "timesteps_per_iteration": 200,
            "env_config": {
                'world_shape': (42, 42),
                'n_agents': 3
            },
            "model": {"custom_model": "vis_torch"}
            #"model": {"custom_model": "vis_tf"}
        })

    ppo.restore(CHECKPOINT_FILE)
    print("restored")

    while True:
        ppo.train()

And the client script serving_client.py, based on this (run it after the server script above has started successfully):

import gym
from issue_world import World

from ray.rllib.utils.policy_client import PolicyClient

if __name__ == "__main__":
    env = World({
        'world_shape': (42, 42),
        'n_agents': 3
    })
    
    client = PolicyClient("http://localhost:9900")

    eid = client.start_episode(training_enabled=False)
    obs = env.reset()
    while True:
        action = client.get_action(eid, obs)
        print(action)
        obs, reward, done, info = env.step(action)
        client.log_returns(eid, reward, info=info)
        if done:
            client.end_episode(eid, obs)
            obs = env.reset()
            eid = client.start_episode(training_enabled=False)

Interestingly, the client receives exactly one action and then the server closes the connection. The server window shows the following traceback (a minimal sketch of the failing conversion is given right after it):

Traceback (most recent call last):
  File "issue_serving_server.py", line 71, in <module>
    ppo.train()
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 497, in train
    raise e
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 483, in train
    result = Trainable.train(self)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/tune/trainable.py", line 254, in train
    result = self._train()
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/trainer_template.py", line 133, in _train
    fetches = self.optimizer.step()
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/optimizers/sync_samples_optimizer.py", line 62, in step
    samples.append(self.workers.local_worker().sample())
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/rollout_worker.py", line 488, in sample
    batches = [self.input_reader.next()]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 52, in next
    batches = [self.get_data()]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 95, in get_data
    item = next(self.rollout_provider)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 315, in _env_runner
    soft_horizon, no_done_at_end)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 461, in _process_observations
    episode.batch_builder.postprocess_batch_so_far(episode)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/sample_batch_builder.py", line 152, in postprocess_batch_so_far
    pre_batch, other_batches, episode)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/policy/torch_policy_template.py", line 109, in postprocess_trajectory
    episode)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/agents/ppo/ppo_tf_policy.py", line 191, in postprocess_ppo_gae
    use_gae=policy.config["use_gae"])
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/ray/rllib/evaluation/postprocessing.py", line 45, in compute_advantages
    traj[key] = np.stack(rollout[key])
  File "<__array_function__ internals>", line 6, in stack
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/shape_base.py", line 420, in stack
    arrays = [asanyarray(arr) for arr in arrays]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/shape_base.py", line 420, in <listcomp>
    arrays = [asanyarray(arr) for arr in arrays]
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/numpy/core/_asarray.py", line 138, in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
  File "/auto/homes/jb2270/master-project/venv_ray_master/lib/python3.6/site-packages/torch/tensor.py", line 486, in __array__
    return self.numpy()
TypeError: can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.
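
The traceback ends in compute_advantages, where np.stack is called on each column of the collected rollout; if a column holds CUDA tensors (my reading of the trace is that the value-function predictions do, though I have not verified which key it is), NumPy goes through Tensor.__array__ and hence Tensor.numpy(), which is exactly the failure above. A minimal sketch of that conversion path outside of RLlib:

import numpy as np
import torch

# Hypothetical rollout column holding per-step value predictions on the GPU.
vf_preds = [torch.tensor(0.5, device="cuda") for _ in range(4)]
try:
    np.stack(vf_preds)  # np.stack -> asanyarray -> Tensor.__array__ -> Tensor.numpy()
except TypeError as e:
    print(e)  # can't convert CUDA tensor to numpy. Use Tensor.cpu() ...
print(np.stack([v.cpu() for v in vf_preds]))  # works once copied to host memory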

For reference, this is the TensorFlow model issue_model_tf.py where the issue does not occur:

from ray import tune
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.utils import try_import_tf
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel

tf = try_import_tf()

class AdaptedVisionNetwork(TFModelV2):
    """Custom model for policy gradient algorithms."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        super(AdaptedVisionNetwork, self).__init__(obs_space, action_space,
                                           num_outputs, model_config, name)
        
        def dense(inp, channels):
            d = tf.keras.layers.Dense(
                channels,
                kernel_initializer=normc_initializer(1.0))(inp)
            r = tf.keras.layers.Dropout(0.2)(d)
            return tf.keras.layers.Activation("relu")(r)

        def conv(inp, size, strides, channels):
            c = tf.keras.layers.Conv2D(
                filters=channels,
                kernel_size=size,
                strides=strides,
                padding="same",
                kernel_initializer=normc_initializer(1.0))(inp)
            bn = tf.keras.layers.BatchNormalization()(c)
            return tf.keras.layers.Activation("relu")(bn)
            
        self.inputs = tf.keras.layers.Input(shape=(42, 42, 2), name="observations")
        s = conv(self.inputs, 8, 4, 16)
        s = conv(s, 4, 2, 32)
        f = tf.keras.layers.Flatten()(s)
        d = dense(f, 256)
        
        layer_out = tf.keras.layers.Dense(
            num_outputs,
            name="my_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(d)
        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(d)
        
        self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out])
        self.register_variables(self.base_model.variables)

    def forward(self, input_dict, state, seq_lens):
        model_out, self._value_out = self.base_model(input_dict["obs"]["states"][0]) # just for demonstration purposes
        return model_out, state

    def value_function(self):
        return tf.reshape(self._value_out, [-1])

  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 18 (14 by maintainers)

Most upvoted comments

The above changes were done in PR #7445 and have not been merged into master yet! https://github.com/ray-project/ray/pull/7445
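
The general kind of conversion such a fix needs is to move any torch tensors in the policy's outputs to host memory before they end up in a sample batch. A rough sketch of a hypothetical helper (the name and placement are illustrative, not code from the PR):

import torch

def to_numpy(value):
    # Recursively detach torch tensors (possibly on the GPU) and convert them
    # to numpy arrays; leave everything else untouched.
    if isinstance(value, torch.Tensor):
        return value.detach().cpu().numpy()
    if isinstance(value, dict):
        return {k: to_numpy(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(to_numpy(v) for v in value)
    return value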

Confirmed on my setup; I built Ray ‘recently’ (March 3, 2020) at /ray:

user@hostname$ uname -a
Linux hostname 5.3.0-40-generic #32~18.04.1-Ubuntu SMP Mon Feb 3 14:05:59 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
user@hostname$ python --version
Python 3.7.4

Python package versions:

from importlib import import_module
import yaml
version_lookup = {'torch': '', 'tensorflow': '', 'numpy': '', 'ray': ''}
for module in version_lookup.keys():
    imprt = import_module(module)
    version_lookup[module] = imprt.__version__
print(yaml.dump(version_lookup))
numpy: 1.16.6
ray: 0.9.0.dev0
tensorflow: 1.14.1
torch: 1.4.0

Configuration/setup:

import ray
from ray.rllib.agents.ppo.ddppo import DDPPOTrainer, DEFAULT_CONFIG
import yaml
config = DEFAULT_CONFIG.copy()
with open('/ray/rllib/tuned_examples/atari-ddppo.yaml','r') as f:
    tuned_example = yaml.safe_load(f)
for key, val in tuned_example['atari-ddppo']['config'].items():
    config[key] = val
config['num_workers'] = 1

Ray calls:

ray.init(address="hostIP:6379", redis_password='redis_password', ignore_reinit_error=True)
agent = DDPPOTrainer(config, 'BreakoutNoFrameskip-v4')
agent.train()

Ray errors:

2020-03-05 08:57:26,745	INFO trainer.py:423 -- Tip: set 'eager': true or the --eager flag to enable TensorFlow eager execution
2020-03-05 08:57:26,759	INFO trainer.py:580 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
---------------------------------------------------------------------------
RayTaskError(TypeError)                   Traceback (most recent call last)
<ipython-input-2-b53e91f7fd27> in <module>
      1 ray.init(address="hostIP:6379", redis_password='redis_password', ignore_reinit_error=True)
      2 agent = DDPPOTrainer(config, 'BreakoutNoFrameskip-v4')
----> 3 agent.train()

/ray/python/ray/rllib/agents/trainer.py in train(self)
    495                         "continue training without the failed worker, set "
    496                         "`'ignore_worker_failures': True`.")
--> 497                     raise e
    498             except Exception as e:
    499                 time.sleep(0.5)  # allow logs messages to propagate

/ray/python/ray/rllib/agents/trainer.py in train(self)
    484         for _ in range(1 + MAX_WORKER_FAILURE_RETRIES):
    485             try:
--> 486                 result = Trainable.train(self)
    487             except RayError as e:
    488                 if self.config["ignore_worker_failures"]:

/ray/python/ray/tune/trainable.py in train(self)
    252         """
    253         start = time.time()
--> 254         result = self._train()
    255         assert isinstance(result, dict), "_train() needs to return a dict."
    256 

/ray/python/ray/rllib/agents/trainer_template.py in _train(self)
    137             start = time.time()
    138             while True:
--> 139                 fetches = self.optimizer.step()
    140                 if after_optimizer_step:
    141                     after_optimizer_step(self, fetches)

/ray/python/ray/rllib/optimizers/torch_distributed_data_parallel_optimizer.py in step(self)
     64                     self.expected_batch_size, self.num_sgd_iter,
     65                     self.sgd_minibatch_size, self.standardize_fields)
---> 66                 for w in self.workers.remote_workers()
     67             ])
     68         for info, count in results:

/ray/python/ray/worker.py in get(object_ids, timeout)
   1502                     worker.core_worker.dump_object_store_memory_usage()
   1503                 if isinstance(value, RayTaskError):
-> 1504                     raise value.as_instanceof_cause()
   1505                 else:
   1506                     raise value

RayTaskError(TypeError): ray::RolloutWorker.sample_and_learn() (pid=4004, ip=IP)
  File "python/ray/_raylet.pyx", line 448, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 426, in ray._raylet.execute_task.function_executor
  File "/ray/python/ray/rllib/evaluation/rollout_worker.py", line 652, in sample_and_learn
    batch = self.sample()
  File "/ray/python/ray/rllib/evaluation/rollout_worker.py", line 489, in sample
    batches = [self.input_reader.next()]
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 53, in next
    batches = [self.get_data()]
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 96, in get_data
    item = next(self.rollout_provider)
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 316, in _env_runner
    soft_horizon, no_done_at_end)
  File "/ray/python/ray/rllib/evaluation/sampler.py", line 462, in _process_observations
    episode.batch_builder.postprocess_batch_so_far(episode)
  File "/ray/python/ray/rllib/evaluation/sample_batch_builder.py", line 153, in postprocess_batch_so_far
    pre_batch, other_batches, episode)
  File "/ray/python/ray/rllib/policy/torch_policy_template.py", line 109, in postprocess_trajectory
    episode)
  File "/ray/python/ray/rllib/agents/ppo/ppo_tf_policy.py", line 191, in postprocess_ppo_gae
    use_gae=policy.config["use_gae"])
  File "/ray/python/ray/rllib/evaluation/postprocessing.py", line 45, in compute_advantages
    traj[key] = np.stack(rollout[key])
  File "/usr/local/lib/python3.7/site-packages/numpy/core/shape_base.py", line 410, in stack
    arrays = [asanyarray(arr) for arr in arrays]
  File "/usr/local/lib/python3.7/site-packages/numpy/core/shape_base.py", line 410, in <listcomp>
    arrays = [asanyarray(arr) for arr in arrays]
  File "/usr/local/lib/python3.7/site-packages/numpy/core/numeric.py", line 591, in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
  File "/usr/local/lib/python3.7/site-packages/torch/tensor.py", line 486, in __array__
    return self.numpy()
TypeError: can't convert CUDA tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.