pytorch-lightning: DDP (multi-GPU) IterableDataset is not working as expected?

Bug description

Hi,

I am currently testing with IterableDataset and DDP.

Total examples: 10000, batch size: 32, num GPUs: 2.

With an IterableDataset and 2 GPUs, I expect each epoch to run 157 steps (10000 examples / 32 batch size / 2 GPUs). Instead, it runs for 314 steps (10000 / 32).

This happens only with IterableDataset. When I use a normal map-style Dataset from torch, everything works as expected. Is there a reason for this behaviour?

How to reproduce the bug

import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import lightning as L
import torch
import time
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import DataLoader


BATCH_SIZE = 32
NUM_WORKERS = 1

# Load Dataset in Memory

imdb_data = load_dataset("imdb")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def tokenize_text(batch):
    return tokenizer(batch["text"], truncation=True, padding=True)

imdb_dataset = imdb_data
imdb_tokenized = imdb_dataset.map(tokenize_text, batched=True, batch_size=None)
imdb_tokenized.set_format("torch", columns=["input_ids", "attention_mask", "label"])

def custom_iterator():
    counter = 0
    for item in imdb_tokenized['train']:
        
        inputs = {'input_ids': item['input_ids'], 'attention_mask': item['attention_mask']}
        labels = {'labels': item['label']}
        counter += 1
        yield inputs, labels


class MyIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self):
        super().__init__()

    def __iter__(self):
        yield from custom_iterator()

train_dataset = MyIterableDataset()

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    persistent_workers=False
)

# Load Model
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-uncased", num_labels=2)

# Lightning Module
class LightningModel(L.LightningModule):
    def __init__(self, model, learning_rate=5e-5):
        super().__init__()

        self.learning_rate = learning_rate
        self.model = model

    def forward(self, input_ids, attention_mask, labels):
        return self.model(input_ids, attention_mask=attention_mask, labels=labels)
        
    def training_step(self, batch, batch_idx):
        
        inputs, labels = batch
        outputs = self(inputs["input_ids"], attention_mask=inputs["attention_mask"],
                    labels=labels["labels"])        
        self.log("train_loss", outputs["loss"])
        
        # print(" Tensor sum ", torch.sum(inputs['input_ids']))
        # print("-------------------")
        # print(3*"\n")
        
        self.log("tensor_sum", torch.sum(inputs['input_ids']))
        
        return outputs["loss"]  # this is passed to the optimizer for training

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer
    

lightning_model = LightningModel(model)

# Use the same package namespace as `import lightning as L`
from lightning.pytorch.loggers import CSVLogger, WandbLogger

name = "train_ddp_map-iterable"
logger = CSVLogger(save_dir="logs/", name=name)
wandb_logger = WandbLogger(project="DDP_exps", name=name)

def train_model():
    
    max_epochs = 2
    
    if os.path.exists('checkpoints'):
        import shutil
        shutil.rmtree('checkpoints')
        
    trainer = L.Trainer(
        max_epochs=max_epochs,
        callbacks=None,
        accelerator="gpu",
        devices=[0, 1],
        logger=[logger, wandb_logger],
        strategy='ddp',
        enable_progress_bar=True,  # show the progress bar
        log_every_n_steps=1,
    )

    trainer.fit(model=lightning_model,
                train_dataloaders=train_loader)
    
if __name__=='__main__':
    
    start_time = time.time()
    train_model()
    end_time = time.time()
    print()
    print("Time taken to train model is {} seconds".format(end_time-start_time))

Error messages and logs


No response

Environment


#- Lightning Component (e.g. Trainer, LightningModule, LightningApp, LightningWork, LightningFlow):
#- PyTorch Lightning Version (e.g., 1.5.0):
#- Lightning App Version (e.g., 0.5.2):
#- PyTorch Version (e.g., 1.10):
#- Python version (e.g., 3.9):
#- OS (e.g., Linux):
#- CUDA/cuDNN version:
#- GPU models and configuration:
#- How you installed Lightning(`conda`, `pip`, source):
#- Running environment of LightningApp (e.g. local, cloud):

More info

No response

About this issue

  • State: open
  • Created 2 years ago
  • Comments: 39 (10 by maintainers)

Most upvoted comments

@s4sarath let’s stay on topic.

@s4sarath @amorehead Here is a notebook that explains the difference between map-style and iterable datasets with several examples, shows how they behave with DataLoader workers, and also shows how they behave across multiple processes. At the bottom, I also show the example with Lightning. I hope this helps your understanding.

Yes this is expected. Lightning can’t know how to shard the data/iterator you provide. You need to make sure your iterator returns half of the data on GPU 0 and the other half on GPU 1. You can do this for example by changing your for loop to something like this (typos expected):

for item in imdb_tokenized['train'][rank::num_gpus]:
    ...

This shards your data. The rank can be accessed for example through trainer.global_rank. If you do this, you need to make sure the iterator returns the same amount of data on each rank (e.g., drop the remainder)
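For reference, a corrected version of that loop could look like this (a minimal sketch only; sharded_iterator is a made-up helper, and rank / num_gpus would come e.g. from trainer.global_rank and trainer.world_size as described above):

import itertools

def sharded_iterator(rank, num_gpus):
    # Each rank takes every num_gpus-th example, starting at its own offset,
    # so the ranks see disjoint 1/num_gpus slices of the data.
    for item in itertools.islice(imdb_tokenized['train'], rank, None, num_gpus):
        inputs = {'input_ids': item['input_ids'], 'attention_mask': item['attention_mask']}
        labels = {'labels': item['label']}
        yield inputs, labels

If the dataset size is not divisible by num_gpus, you would still need to drop the remainder so every rank yields the same number of examples.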

Another way would be to use the DistributedSampler inside your iterable dataset.
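A sketch of that DistributedSampler variant (the class name is hypothetical; rank and world_size are assumed to be passed in, e.g. from the trainer, and DistributedSampler pads by default so every rank gets the same number of indices):

from torch.utils.data import IterableDataset
from torch.utils.data.distributed import DistributedSampler

class SamplerShardedIterableDataset(IterableDataset):
    def __init__(self, hf_dataset, rank, world_size):
        super().__init__()
        self.hf_dataset = hf_dataset
        # Let DistributedSampler decide which indices belong to this rank.
        self.sampler = DistributedSampler(
            hf_dataset, num_replicas=world_size, rank=rank, shuffle=False
        )

    def __iter__(self):
        for idx in self.sampler:
            item = self.hf_dataset[idx]
            inputs = {'input_ids': item['input_ids'], 'attention_mask': item['attention_mask']}
            yield inputs, {'labels': item['label']}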

Thanks a lot!

@gorold It should work with DeepSpeed, yes, but probably not with the TPU strategy.

I haven’t mentioned it in the notebook, but PyTorch is developing torchdata which will address these issues completely, as it is heavily focusing on performant iterable-style data loading together with DataLoader2. It would eliminate essentially all of the boilerplate code I show in that notebook.

Ok…maybe that’s the missing detail I needed. I’ll work on it some more!

@EvanZ I was also confused about this at first, but then figured it out. The Trainer does not need any information about the data to be instantiated. So I would recommend instantiating the Trainer first, then you can pass the trainer.world_size and trainer.global_rank to your data module without any issues. Hope this helps!
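To make that ordering concrete, a minimal sketch (MyDataModule is a hypothetical data module that forwards rank and world_size to its IterableDataset; the point is only that the Trainer is created before the data):

trainer = L.Trainer(accelerator="gpu", devices=2, strategy="ddp", max_epochs=2)

# The Trainer already knows the distributed topology, so hand it to the data side.
datamodule = MyDataModule(
    rank=trainer.global_rank,
    world_size=trainer.world_size,
    batch_size=BATCH_SIZE,
)

trainer.fit(model=lightning_model, datamodule=datamodule)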

What I’m failing to understand is how in practice to pass the rank and world_size to the dataset when that is being created by my DataModule, before the Trainer is created. It seems that for this to work the Trainer is supposed to pass the rank somehow to the dataset. I can’t figure out from your example notebook how to do this. When I try to access the rank and/or world_size in my Dataset before the Trainer is created, it either freezes during runtime or says I need to use init_process_group. It would be great to see a full example in the Lightning docs how to use multi-GPU with IterableDataset to make it more clear.

Yes, these observations are all expected. This is not special behavior in Lightning, it’s just how the IterableDataset and DataLoader are working in PyTorch. In short: When using an iterable dataset, you need to take care of the sampler inside your dataset yourself, and shard/partition the data yourself across workers and devices.
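As a rough illustration of what sharding across both workers and devices can look like inside __iter__ (a sketch under the assumption that rank and world_size are passed in by the caller, e.g. from the trainer; the class name is made up):

import itertools
from torch.utils.data import IterableDataset, get_worker_info

class FullyShardedIterableDataset(IterableDataset):
    def __init__(self, hf_dataset, rank, world_size):
        super().__init__()
        self.hf_dataset = hf_dataset
        self.rank = rank
        self.world_size = world_size

    def __iter__(self):
        worker_info = get_worker_info()
        num_workers = worker_info.num_workers if worker_info else 1
        worker_id = worker_info.id if worker_info else 0

        # One global shard per (rank, worker) pair: split across DDP ranks
        # first, then across dataloader workers within each rank.
        num_shards = self.world_size * num_workers
        shard_id = self.rank * num_workers + worker_id

        for item in itertools.islice(self.hf_dataset, shard_id, None, num_shards):
            inputs = {'input_ids': item['input_ids'], 'attention_mask': item['attention_mask']}
            yield inputs, {'labels': item['label']}

Without the worker split, each of the num_workers processes iterates the full shard, which is exactly why num_workers=2 doubles the step count mentioned further down.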

Yes, I can put together an example, but it has to wait a few days until the new year.

@awaelchli, I second this issue. I am also having difficulties figuring out the simplest way to enable multi-GPU and multi-dataloader-worker support for IterableDatasets when using PyTorch Lightning. All the examples I have worked through so far do not seem to work when both of the following hold: (1) num_workers > 0 and (2) len(trainer.devices) > 1.

Would it be possible to put together a simple PyTorch Lightning example of how one can structure their IterableDataset and PyTorch Lightning DataModule to support the two use cases above?

If my num_workers=2, it loops 628 times on each GPU. Is this expected?

No. num_workers has nothing to do with the sampling of the data.

Because num_workers=2 is supposed to make the DataLoader pipeline faster, right?

Read more about workers here: https://pytorch.org/docs/stable/data.html#multi-process-data-loading

Is there any concept of steps_per_epoch in Lightning? Say epochs=10 and steps_per_epoch=1000: I want each epoch to run at most 1000 steps.

Trainer(limit_train_batches=1000, max_epochs=10)