pytorch-lightning: DDP with 2 GPUs doesn't give same results as 1 GPU with the same effective batch size

🐛 Bug

Training a network with DDP on 2 GPUs with a batch size of N/2 should give the same result as training on a single GPU with batch size N. I tested this using the CIFAR-10 VAE from lightning-bolts. The configurations I tried are: single GPU with the default batch size 256, Data Parallel (DP) on 2 GPUs (each GPU then gets a batch of 128), and DDP on 2 GPUs (manually setting the batch size to 128). Although all three experiments have the same effective batch size, DDP doesn't show the same performance as single-GPU training and DP, especially with respect to the kl loss. The experiments use the default settings, without fancy stuff like 16-bit precision or sharded training. I used a VAE network to analyze this problem because it is close in spirit to the networks I am using in my current research. @SeanNaren @awaelchli

[image: training curves for the single-GPU, DP, and DDP runs, showing the divergence in the kl loss]

To Reproduce

import os
from argparse import ArgumentParser

import pytorch_lightning as pl
import torch
from torch import nn as nn
from torch.nn import functional as F

from pl_bolts import _HTTPS_AWS_HUB
from pl_bolts.models.autoencoders.components import (
    resnet18_decoder,
    resnet18_encoder,
    resnet50_decoder,
    resnet50_encoder,
)


class VAE(pl.LightningModule):
    """
    Standard VAE with Gaussian Prior and approx posterior.

    Model is available pretrained on different datasets:

    Example::

        # not pretrained
        vae = VAE()

        # pretrained on cifar10
        vae = VAE(input_height=32).from_pretrained('cifar10-resnet18')

        # pretrained on stl10
        vae = VAE(input_height=32).from_pretrained('stl10-resnet18')
    """

    pretrained_urls = {
        'cifar10-resnet18': os.path.join(_HTTPS_AWS_HUB, 'vae/vae-cifar10/checkpoints/epoch%3D89.ckpt'),
        'stl10-resnet18': os.path.join(_HTTPS_AWS_HUB, 'vae/vae-stl10/checkpoints/epoch%3D89.ckpt'),
    }

    def __init__(
        self,
        input_height: int,
        enc_type: str = 'resnet18',
        first_conv: bool = False,
        maxpool1: bool = False,
        enc_out_dim: int = 512,
        kl_coeff: float = 0.1,
        latent_dim: int = 256,
        lr: float = 1e-4,
        **kwargs
    ):
        """
        Args:
            input_height: height of the images
            enc_type: option between resnet18 or resnet50
            first_conv: use standard kernel_size 7, stride 2 at start or
                replace it with kernel_size 3, stride 1 conv
            maxpool1: use standard maxpool to reduce spatial dim of feat by a factor of 2
            enc_out_dim: set according to the out_channel count of
                encoder used (512 for resnet18, 2048 for resnet50)
            kl_coeff: coefficient for kl term of the loss
            latent_dim: dim of latent space
            lr: learning rate for Adam
        """

        super(VAE, self).__init__()

        self.save_hyperparameters()

        self.lr = lr
        self.kl_coeff = kl_coeff
        self.enc_out_dim = enc_out_dim
        self.latent_dim = latent_dim
        self.input_height = input_height

        valid_encoders = {
            'resnet18': {
                'enc': resnet18_encoder,
                'dec': resnet18_decoder,
            },
            'resnet50': {
                'enc': resnet50_encoder,
                'dec': resnet50_decoder,
            },
        }

        if enc_type not in valid_encoders:
            self.encoder = resnet18_encoder(first_conv, maxpool1)
            self.decoder = resnet18_decoder(self.latent_dim, self.input_height, first_conv, maxpool1)
        else:
            self.encoder = valid_encoders[enc_type]['enc'](first_conv, maxpool1)
            self.decoder = valid_encoders[enc_type]['dec'](self.latent_dim, self.input_height, first_conv, maxpool1)

        self.fc_mu = nn.Linear(self.enc_out_dim, self.latent_dim)
        self.fc_var = nn.Linear(self.enc_out_dim, self.latent_dim)

    @staticmethod
    def pretrained_weights_available():
        return list(VAE.pretrained_urls.keys())

    def from_pretrained(self, checkpoint_name):
        if checkpoint_name not in VAE.pretrained_urls:
            raise KeyError(str(checkpoint_name) + ' not present in pretrained weights.')

        return self.load_from_checkpoint(VAE.pretrained_urls[checkpoint_name], strict=False)

    def forward(self, x):
        x = self.encoder(x)
        mu = self.fc_mu(x)
        log_var = self.fc_var(x)
        p, q, z = self.sample(mu, log_var)
        return self.decoder(z)

    def _run_step(self, x):
        x = self.encoder(x)
        mu = self.fc_mu(x)
        log_var = self.fc_var(x)
        p, q, z = self.sample(mu, log_var)
        return z, self.decoder(z), p, q

    def sample(self, mu, log_var):
        std = torch.exp(log_var / 2)
        p = torch.distributions.Normal(torch.zeros_like(mu), torch.ones_like(std))
        q = torch.distributions.Normal(mu, std)
        z = q.rsample()
        return p, q, z

    def step(self, batch, batch_idx):
        x, y = batch
        z, x_hat, p, q = self._run_step(x)

        recon_loss = F.mse_loss(x_hat, x, reduction='mean')

        log_qz = q.log_prob(z)
        log_pz = p.log_prob(z)

        kl = log_qz - log_pz
        kl = kl.mean()
        kl *= self.kl_coeff

        loss = kl + recon_loss

        logs = {
            "recon_loss": recon_loss,
            "kl": kl,
            "loss": loss,
        }
        return loss, logs

    def training_step(self, batch, batch_idx):
        loss, logs = self.step(batch, batch_idx)
        self.log_dict({f"train_{k}": v for k, v in logs.items()}, on_step=True, on_epoch=False)
        return loss

    def validation_step(self, batch, batch_idx):
        loss, logs = self.step(batch, batch_idx)
        self.log_dict({f"val_{k}": v for k, v in logs.items()})
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    @staticmethod
    def add_model_specific_args(parent_parser):
        parser = ArgumentParser(parents=[parent_parser], add_help=False)

        parser.add_argument("--enc_type", type=str, default='resnet18', help="resnet18/resnet50")
        parser.add_argument("--first_conv", action='store_true')
        parser.add_argument("--maxpool1", action='store_true')
        parser.add_argument("--lr", type=float, default=1e-4)

        parser.add_argument(
            "--enc_out_dim",
            type=int,
            default=512,
            help="512 for resnet18, 2048 for bigger resnets, adjust for wider resnets"
        )
        parser.add_argument("--kl_coeff", type=float, default=0.1)
        parser.add_argument("--latent_dim", type=int, default=256)
        parser.add_argument("--batch_size", type=int, default=256)
        parser.add_argument("--num_workers", type=int, default=8)
        parser.add_argument("--data_dir", type=str, default=".")

        return parser


def cli_main(args=None):
    from pl_bolts.datamodules import CIFAR10DataModule, ImagenetDataModule, STL10DataModule

    pl.seed_everything(1234)

    parser = ArgumentParser()
    parser.add_argument("--dataset", default="cifar10", type=str, choices=["cifar10", "stl10", "imagenet"])
    script_args, _ = parser.parse_known_args(args)

    if script_args.dataset == "cifar10":
        dm_cls = CIFAR10DataModule
    elif script_args.dataset == "stl10":
        dm_cls = STL10DataModule
    elif script_args.dataset == "imagenet":
        dm_cls = ImagenetDataModule
    else:
        raise ValueError(f"undefined dataset {script_args.dataset}")

    parser = VAE.add_model_specific_args(parser)
    parser = pl.Trainer.add_argparse_args(parser)
    args = parser.parse_args(args)

    dm = dm_cls.from_argparse_args(args)
    args.input_height = dm.size()[-1]

    if args.max_steps == -1:
        args.max_steps = None

    model = VAE(**vars(args))

    trainer = pl.Trainer.from_argparse_args(args)
    trainer.fit(model, datamodule=dm)
    return dm, model, trainer


if __name__ == "__main__":
    dm, model, trainer = cli_main()

  python vae.py --dataset=cifar10 --batch_size=256                               # single-GPU training
  python vae.py --dataset=cifar10 --batch_size=128 --gpus=2 --accelerator=ddp    # DDP, per-GPU batch 128
  python vae.py --dataset=cifar10 --batch_size=256 --gpus=2 --accelerator=dp     # DP, batch of 256 split across 2 GPUs

Expected behavior

Since the effective batch size and the rest of the hyperparameters are the same, the training curves should be very close.

Environment

  • CUDA:
    - GPU: 4× TITAN X (Pascal)
    - available: True
    - version: 10.2
  • Packages:
    - numpy: 1.20.1
    - pyTorch_debug: False
    - pyTorch_version: 1.8.0
    - pytorch-lightning: 1.2.6
    - tqdm: 4.56.0
  • System:
    - OS: Linux
    - architecture: 64bit, ELF
    - processor: x86_64
    - python: 3.8.7
    - version: #143-Ubuntu SMP Tue Mar 16 01:30:17 UTC 2021

cc @tchaton @rohitgr7 @akihironitta @justusschock @kaushikb11 @awaelchli

Most upvoted comments

active?

Yes, I think your explanation checks out in general when no extra steps are taken to diversify the augmentations. In Lightning, if you use seed_everything(workers=True), we take care of this by seeding every dataloader worker differently across all ranks (and across multiple nodes too), which then leads to diversified augmentations, provided the augmentations are applied in the dataset/worker and num_workers > 0. In other words, the seed in each worker is different but deterministically related to the main seed. The default is seed_everything(workers=False), however, so this needs to be turned on explicitly by the user.
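
For reference, a minimal sketch of what this looks like at the top of a training script like the one in this issue, assuming a Lightning version that already has the workers argument (the 1.2.6 in the environment above may predate it):

import pytorch_lightning as pl

# Sketch: seed everything, including dataloader workers, so each worker gets a
# distinct but deterministic seed across all ranks (requires the `workers` argument).
pl.seed_everything(1234, workers=True)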

I don’t recommend setting a different seed per rank, though, unless the user knows what they are doing and what implications this has (under Lightning and/or PyTorch). It is difficult to reason about and difficult to debug. I strongly recommend letting the distributed sampler fully handle the shuffling, to avoid duplicated or missing samples across the ranks.
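
For completeness, a minimal sketch of what "letting the distributed sampler handle it" means in Trainer terms for the Lightning version used in this issue; replace_sampler_ddp=True is already the default and is spelled out here only for illustration:

import pytorch_lightning as pl

# Sketch: with replace_sampler_ddp=True (the default), Lightning wraps the train
# dataloader in a DistributedSampler under ddp, so each rank sees a disjoint shard
# and no per-rank seed tricks are needed. Run this on a machine with 2 GPUs.
trainer = pl.Trainer(gpus=2, accelerator="ddp", replace_sampler_ddp=True)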

Two other thoughts regarding the discussion on the thread here:

  1. Note that if you go from 1 to 2 GPUs while keeping the effective batch size (and the seed) the same, you can’t expect the results to be exactly the same numerically, because the order in which the samples get batched, and hence used to update the model weights, will be different. When we talk about achieving “same results” we mean stochastically, after convergence. I hope this makes sense; I wasn’t sure whether this had already been mentioned.
  2. Consider turning on sync batch norm, if it applies to your network (see the sketch after this list). If there are other terms in your network or loss function that take batch statistics into account, you have to account for that when switching to ddp.
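
A minimal sketch of point 2, using the Trainer flag available in this Lightning version; whether it actually helps depends on the model, so treat it as illustrative only:

import pytorch_lightning as pl

# Sketch: sync_batchnorm=True makes Lightning convert nn.BatchNorm* layers to
# SyncBatchNorm under ddp, so batch statistics are computed over the full
# effective batch instead of per-GPU shards. Run this on a machine with 2 GPUs.
trainer = pl.Trainer(gpus=2, accelerator="ddp", sync_batchnorm=True)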

I think one issue with DDP giving the same seed to each per-GPU process is that there is lower diversity of randomness compared to DP.

For example, say your effective batch size is 2, you have two GPUs, and you’re rotating images by a random angle. Using DP, PyTorch will rotate each of the two images by a different angle. With DDP, however, each GPU gets one sample, and each image is rotated the same way across GPUs because each process gets the same seed. This doesn’t just apply to rotations; it applies to all operations that have some sort of randomness (like dropout).

The more GPUs you use while keeping the effective batch size fixed, the lower the diversity of randomness. This lower diversity can lead to poorer results, which may explain the difference between DDP and DP here.
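
A toy illustration of this point (plain PyTorch, made-up seed and range, not the bolts code): two "ranks" seeded identically draw identical augmentation parameters, so the per-sample randomness collapses under DDP.

import torch

for rank in range(2):
    torch.manual_seed(1234)                             # every DDP process gets the same seed
    angle = torch.empty(1).uniform_(-30.0, 30.0)        # stand-in for a random rotation angle
    print(f"rank {rank}: angle = {angle.item():.3f}")   # identical on both "ranks"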

The obvious fix seems to be to just set a different seed for each GPU process, and then this is no longer an issue. However, one big problem with that solution is that it makes batch loading inconsistent. If each GPU process has the same seed, then the dataloader in each process generates the same indices, e.g. [5, 9, 4, 7], and PyTorch’s DistributedSampler subselects from that list based on the GPU rank (so GPU 0 gets [5, 4] and GPU 1 gets [9, 7]). If you set a different seed for each GPU process, each GPU generates a different set of indices, e.g. GPU 0 loads [3, 8, 9, 6] while GPU 1 loads [5, 3, 4, 8], so some samples are seen twice per epoch and others not at all.

@awaelchli Does this look right?
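
For concreteness, a small sketch of the index partitioning described above (plain PyTorch, toy dataset and made-up seed): with the same seed and epoch on every rank, DistributedSampler draws one shared permutation and each rank takes a disjoint slice of it.

import torch
from torch.utils.data import DistributedSampler, TensorDataset

dataset = TensorDataset(torch.arange(8))
for rank in range(2):
    # Same seed and epoch on every rank -> same global permutation, sliced by rank,
    # so the union over ranks covers each sample exactly once.
    sampler = DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=True, seed=1234)
    sampler.set_epoch(0)
    print(f"rank {rank}: {list(sampler)}")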

Hey @SeanNaren,

Any progress there?

After speaking to @awaelchli and @tchaton, we should introduce some form of correctness test comparing 2 GPUs vs. 1 GPU for model loss/convergence.

This may also allow us to easily test correctness for our training system, and showcase any changes needed to maintain parity.
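
Independent of any Lightning test harness, a minimal gradient-level parity check (toy linear model, not the bolts VAE) shows that with a mean-reduced loss, averaging the gradients of two half-batches, which is what DDP's all-reduce does, matches the full-batch gradient. Any convergence gap therefore has to come from sampling/augmentation order or batch-statistics layers rather than the gradient math itself.

import torch
from torch import nn
from torch.nn import functional as F

torch.manual_seed(0)
model = nn.Linear(4, 1)
x, y = torch.randn(8, 4), torch.randn(8, 1)

def grads(xb, yb):
    # Gradient of the mean-reduced MSE loss on one (sub-)batch.
    model.zero_grad()
    F.mse_loss(model(xb), yb).backward()
    return [p.grad.clone() for p in model.parameters()]

full = grads(x, y)                                    # single "GPU", batch size 8
halves = [grads(x[:4], y[:4]), grads(x[4:], y[4:])]   # two "ranks", batch size 4 each
averaged = [(a + b) / 2 for a, b in zip(*halves)]     # DDP-style all-reduce (mean)
print(all(torch.allclose(f, g, atol=1e-6) for f, g in zip(full, averaged)))  # True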

Hey, if there is batch norm hidden in there somewhere or any other operation that takes batch statistics into account for backprop, this could explain it.

@tchaton I was using 32-bit precision [Edit: not 16] all the time. I don’t have a clear idea what caused the problem. After fixing the Torchvision version, I don’t see the problem anymore. I’m using Torchvision for performing non-maximum suppression before scoring, so it could be related to that. Sorry that I cannot help more.

ah okay, yes, please let me know and feel free to reopen the issue.

You may get some inspiration from the discussion #3706.