ray: ValueError: Message ray.rpc.DataRequest exceeds maximum protobuf size of 2GB

What is the problem?

Hello. Training with a large dataset (~2 GB) results in the following error. There is no problem with a small amount of training data (<100 MB).

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
2021-09-06 17:15:49,920 [ERROR] Exception serializing message!
Traceback (most recent call last):
  File "/opt/conda/envs/py38/lib/python3.8/site-packages/grpc/_common.py", line 86, in _transform
    return transformer(message)
ValueError: Message ray.rpc.DataRequest exceeds maximum protobuf size of 2GB: 2339503268
2021-09-06 17:15:49,925 [ERROR] Got Error from data channel -- shutting down:
Traceback (most recent call last):
  File "/opt/conda/envs/py38/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 69, in _data_main
    for response in resp_stream:
  File "/opt/conda/envs/py38/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/opt/conda/envs/py38/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "Exception serializing request!"
        debug_error_string = "None"
>

Ray version and other system information (Python version, TensorFlow version, OS):

Reproduction (REQUIRED)

Please provide a short code snippet (less than 50 lines if possible) that can be copy-pasted to reproduce the issue. The snippet should have no external library dependencies (i.e., use fake or mock data / environments):

If the code snippet cannot be run by itself, the issue will be closed with “needs-repro-script”.

  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 42 (23 by maintainers)

Most upvoted comments

@ckw017 @AmeerHajAli can we revisit this issue? The same problem is happening again in https://github.com/ray-project/ray/issues/20869.

It would be great if we could prioritize this and provide a better error message.

Chatted with Ameer. This should be doable. We will start looking at this 1 week from now.

Can we make this change as part of making ray.put() async? How complex is that?

This must be the client issue, where a single data message sent over the channel is limited to 2 GB. We should probably use streaming to get the return value from the Ray cluster. cc @AmeerHajAli @ijrsvt
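
For reference, a minimal sketch that hits the same limit purely through the Ray client. The cluster address is a placeholder, and the exact error text may differ by version:

    import numpy as np
    import ray

    # Placeholder address for the remote cluster's Ray client server.
    ray.client("<head-node>:10001").connect()

    # A single object larger than 2 GB has to cross the client's gRPC data
    # channel in one protobuf message, which is what triggers the error above.
    big_array = np.zeros(2_500_000_000, dtype=np.uint8)  # roughly 2.5 GB
    ref = ray.put(big_array)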

@scv119 Thank you for the reply.

  • Ray version: 1.5.1
  • ray-lightning version: 0.11
  • pytorch-lightning version: 1.4.5
  • pytorch version: 1.7.0

Inside the trainset object, 2 GB of training data is loaded into memory and used. If I reduce the amount of trainset data, training works fine on the Ray cluster. The size of the training set is over 2 GB. Is this the problem? Is it impossible for a Ray cluster to hold over 2 GB of training data in memory and use it? (A possible workaround is sketched after the snippets below.)

  • dataset.py
from torch.utils.data import Dataset
import json


class Trainset(Dataset):
    def __init__(self):
        self.total_data = []
        # The full ~2 GB training set is materialized in memory here.
        with open('trainset.json', 'r') as f:
            for line in f:
                self.total_data.append(self._preprocess_trainset(line))

    def _preprocess_trainset(self, line):
        # Preprocessing details were not shown in the original snippet;
        # a plain JSON parse is used here as a placeholder.
        return json.loads(line)

    def __getitem__(self, idx):
        return self.total_data[idx]

    def __len__(self):
        return len(self.total_data)

  • train.py
    import logging

    import pytorch_lightning as pl
    import ray
    from ray_lightning import RayShardedPlugin
    from torch.utils.data import DataLoader

    from dataset import Trainset

    # `cfg`, `tb_logger`, `model`, and `validset` are defined elsewhere in the project.
    ray_cluster_connection = 'ray host'  # placeholder for the actual cluster address
    ray.client(ray_cluster_connection).namespace('test').connect()
    print(ray.available_resources())

    trainset = Trainset()
    train_loader = DataLoader(trainset,
                              batch_size=cfg.batch_size,
                              num_workers=cfg.num_workers)
    valid_loader = DataLoader(validset,
                              batch_size=cfg.batch_size,
                              num_workers=cfg.num_workers)

    plugin = RayShardedPlugin(num_workers=2, num_cpus_per_worker=1, use_gpu=True)

    logging.info("training start")
    trainer = pl.Trainer(
        logger=[tb_logger],
        gpus=1,
        max_steps=cfg.max_steps,
        val_check_interval=cfg.val_check_interval,
        log_every_n_steps=cfg.log_interval,
        precision=16 if cfg.train.use_fp16 else 32,
        flush_logs_every_n_steps=cfg.flush_logs_every_n_steps,
        progress_bar_refresh_rate=cfg.progress_bar_refresh_rate,
        plugins=[plugin]
    )
    trainer.fit(model, train_loader, valid_loader)
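
One possible workaround for the snippets above (a sketch, not an official fix): keep the heavy data out of the Dataset object that gets pickled and shipped to the cluster. Instead of materializing the full ~2 GB list in __init__, store only per-line byte offsets and read each record on demand in __getitem__. This assumes trainset.json is reachable at the same path on the worker nodes (for example via a shared filesystem).

    import json
    from torch.utils.data import Dataset


    class LazyTrainset(Dataset):
        """Sketch of a lazily loaded variant of the Trainset above."""

        def __init__(self, path='trainset.json'):
            self.path = path
            # Index the start offset of every line; this small list is all
            # that gets serialized when the dataset is sent to workers.
            self.offsets = []
            offset = 0
            with open(path, 'rb') as f:
                for line in f:
                    self.offsets.append(offset)
                    offset += len(line)

        def __getitem__(self, idx):
            # Re-open the file per call so the dataset stays safe to use
            # from multiple DataLoader worker processes.
            with open(self.path, 'rb') as f:
                f.seek(self.offsets[idx])
                line = f.readline().decode('utf-8')
            # json.loads stands in for the original (unshown) preprocessing.
            return json.loads(line)

        def __len__(self):
            return len(self.offsets)

With this, only the small offset index crosses the Ray client data channel; the 2 GB of records are read on the cluster side instead of being serialized in one message.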