reformer-pytorch: Runtime error when attempting to use distributed data parallel
Thank you for putting in the time to do this. I have a bunch of ideas for it.
I crudely ported your example training script to the pytorch-lightning library, and when I attempted to use distributed data parallel I ran into a crash. The problem may be down in the revtorch library, but I'm handing the script off with this report so you can play with it, take a look, and decide where the issue is.
You can reproduce the crash by supplying the --distributed flag to the script with any number of GPUs.
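For example (assuming the script is saved as example/train_lightning.py, the path shown in the traceback):

python example/train_lightning.py --distributed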
Epoch 1: 0%| | 0/1451 [00:00<?, ?batch/s]Traceback (most recent call last):
  File "example/train_lightning.py", line 166, in <module>
    main()
  File "example/train_lightning.py", line 161, in main
    trainer.fit(model)
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/trainer/trainer.py", line 687, in fit
    mp.spawn(self.ddp_train, nprocs=self.num_gpus, args=(model,))
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 171, in spawn
    while not spawn_context.join():
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 118, in join
    raise Exception(msg)
Exception:

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 19, in _wrap
    fn(i, *args)
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/trainer/distrib_data_parallel.py", line 331, in ddp_train
    self.run_pretrain_routine(model)
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/trainer/trainer.py", line 829, in run_pretrain_routine
    self.train()
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/trainer/training_loop.py", line 332, in train
    self.run_training_epoch()
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/trainer/training_loop.py", line 386, in run_training_epoch
    output = self.run_training_batch(batch, batch_idx)
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/trainer/training_loop.py", line 506, in run_training_batch
    loss = optimizer_closure()
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/trainer/training_loop.py", line 489, in optimizer_closure
    model_ref.backward(self.use_amp, closure_loss, optimizer)
  File "/opt/conda/lib/python3.6/site-packages/pytorch_lightning/core/hooks.py", line 154, in backward
    loss.backward()
  File "/opt/conda/lib/python3.6/site-packages/torch/tensor.py", line 195, in backward
    torch.autograd.backward(self, gradient, retain_graph, create_graph)
  File "/opt/conda/lib/python3.6/site-packages/torch/autograd/__init__.py", line 99, in backward
    allow_unreachable=True) # allow_unreachable flag
  File "/opt/conda/lib/python3.6/site-packages/torch/autograd/function.py", line 77, in apply
    return self._forward_cls.backward(self, *args)
  File "/opt/conda/lib/python3.6/site-packages/revtorch/revtorch.py", line 161, in backward
    y, dy = ctx.reversible_blocks[i].backward_pass(y, dy)
  File "/opt/conda/lib/python3.6/site-packages/revtorch/revtorch.py", line 89, in backward_pass
    gy1.backward(dy2)
  File "/opt/conda/lib/python3.6/site-packages/torch/tensor.py", line 195, in backward
    torch.autograd.backward(self, gradient, retain_graph, create_graph)
  File "/opt/conda/lib/python3.6/site-packages/torch/autograd/__init__.py", line 99, in backward
    allow_unreachable=True) # allow_unreachable flag
RuntimeError: Expected to mark a variable ready only once. This error is caused by use of a module parameter outside the `forward` function. The return value of the `forward` function is inspected by the distributed data parallel wrapper to figure out if any of the module's parameters went unused. If this is the case, it knows they won't receive gradients in a backward pass. If any of those parameters are then used outside `forward`, this error condition is triggered. You can disable unused parameter detection by passing the keyword argument `find_unused_parameters=False` to `torch.nn.parallel.DistributedDataParallel`.
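For reference, the find_unused_parameters flag mentioned at the end of the error belongs to the raw DDP wrapper, which Lightning constructs internally, so there is no direct way to pass it from the script below. With a manual wrap it would look roughly like this sketch (model and rank are placeholders, and the process group is assumed to already be initialized):

from torch.nn.parallel import DistributedDataParallel

# assumes torch.distributed.init_process_group(...) has already been called in this process
ddp_model = DistributedDataParallel(
    model.cuda(rank),               # placeholder: the module being trained
    device_ids=[rank],
    find_unused_parameters=False,   # turn off the unused-parameter detection named in the error
)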
script:
from reformer_pytorch import ReformerLM
import tqdm
import gzip
import numpy as np
import torch.optim as optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
from pytorch_lightning import Trainer
import os
import torch
from torch import nn
from torchvision import transforms
import argparse
import pytorch_lightning as pl
# constants
NUM_BATCHES = int(1e5)
BATCH_SIZE = 4
GRADIENT_ACCUMULATE_EVERY = 4
LEARNING_RATE = 1e-4
VALIDATE_EVERY = 100
SEQ_LEN = 4096
# helpers
def cycle(loader):
    while True:
        for data in loader:
            yield data
with gzip.open('./data/enwik8.gz') as file:
    X = np.fromstring(file.read(int(95e6)), dtype=np.uint8)
    trX, vaX = np.split(X, [int(90e6)])
    data_train, data_val = torch.from_numpy(trX), torch.from_numpy(vaX)
class TextSamplerDataset(Dataset):
    def __init__(self, data, seq_len):
        super().__init__()
        self.data = data
        self.seq_len = seq_len

    def __getitem__(self, index):
        rand_start = torch.randint(0, self.data.size(0) - self.seq_len - 1, (1,))
        full_seq = self.data[rand_start: rand_start + self.seq_len + 1].long()
        return full_seq[0:-1], full_seq[1:]

    def __len__(self):
        return self.data.size(0) // self.seq_len
class ReformerTrainer(pl.LightningModule):
    def __init__(self, batch_size=4, distributed_mode=False):
        super(ReformerTrainer, self).__init__()
        self.batch_size = batch_size
        self.distributed_mode = distributed_mode

        # instantiate model
        self.model = ReformerLM(
            emb = 512,
            depth = 6,
            max_seq_len = SEQ_LEN,
            num_tokens = 256,
            heads = 8,
            bucket_size = 64,
            n_hashes = 4,
            ff_chunks = 10,
            lsh_dropout = 0.1,
            weight_tie = True,
            causal = True,
            use_full_attn = False # set this to true for comparison with full attention
        )

    def forward(self, x):
        pred = self.model(x).transpose(1, 2)
        return pred

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.forward(x)
        loss = F.cross_entropy(y_hat, y, reduction='mean')
        tensorboard_logs = {'train_loss': loss}
        return {'loss': loss, 'log': tensorboard_logs}

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.forward(x)
        return {'val_loss': F.cross_entropy(y_hat, y)}

    def validation_end(self, outputs):
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        tensorboard_logs = {'val_loss': avg_loss}
        return {'avg_val_loss': avg_loss, 'log': tensorboard_logs}

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.forward(x)
        return {'test_loss': F.cross_entropy(y_hat, y)}

    def test_end(self, outputs):
        avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
        tensorboard_logs = {'test_loss': avg_loss}
        return {'avg_test_loss': avg_loss, 'log': tensorboard_logs}

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=LEARNING_RATE)

    @pl.data_loader
    def train_dataloader(self):
        # REQUIRED
        dataset = TextSamplerDataset(data_train, SEQ_LEN)
        if self.distributed_mode:
            dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
            dataloader = DataLoader(dataset, sampler=dist_sampler, batch_size=self.batch_size)
        else:
            dataloader = DataLoader(dataset, batch_size=self.batch_size)
        return dataloader

    @pl.data_loader
    def val_dataloader(self):
        # OPTIONAL
        dataset = TextSamplerDataset(data_val, SEQ_LEN)
        if self.distributed_mode:
            dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
            dataloader = DataLoader(dataset, sampler=dist_sampler, batch_size=self.batch_size)
        else:
            dataloader = DataLoader(dataset, batch_size=self.batch_size)
        return dataloader

    @pl.data_loader
    def test_dataloader(self):
        dataset = TextSamplerDataset(data_val, SEQ_LEN)
        if self.distributed_mode:
            dist_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
            dataloader = DataLoader(dataset, sampler=dist_sampler, batch_size=self.batch_size)
        else:
            dataloader = DataLoader(dataset, batch_size=self.batch_size)
        return dataloader
def main():
    parser = argparse.ArgumentParser("reformer-lightning example")
    parser.add_argument("--gpus", default=1, help="gpus to use")
    parser.add_argument("-d", "--distributed", default=False, action="store_true",
                        help="activates distributed using data distributed parallel")
    parser.add_argument("-b", "--batch_size", type=int, default=4, help="batch_size")
    args = parser.parse_args()

    model = ReformerTrainer(args.batch_size, args.distributed)

    # most basic trainer, uses good defaults
    if args.distributed:
        trainer = Trainer(gpus=args.gpus, distributed_backend='ddp', accumulate_grad_batches=GRADIENT_ACCUMULATE_EVERY)
    else:
        trainer = Trainer(gpus=args.gpus, distributed_backend='dp', accumulate_grad_batches=GRADIENT_ACCUMULATE_EVERY)
    trainer.fit(model)
    trainer.test()


if __name__ == "__main__":
    main()
@Phirefly9 @zbloss @justindujardin @fcampagne Guys! I got DeepSpeed working with Reformer after the latest Reversible Net changes! It’s blazing fast! (using it in place of DataParallel locally)
I’m not sure about distributed, but the parallelism DeepSpeed provides, even on my two GPUs at home, is worlds faster. You can follow the example at https://github.com/lucidrains/reformer-pytorch/tree/master/examples/enwik8_deepspeed
It did not, unfortunately. I’ve opened an issue on pytorch-lightning and hope to hear from them soon.
I’m sure it will work if I add that flag. I’m thinking it’s just a lightning bug: I’ve trained revtorch’s example using lightning and it worked in distributed, and I’ve been looking all over the code and don’t see anything. At this point I think it’s a bug with lightning. If I can’t find the issue after some more searching, I’ll open a ticket with them.
To be honest, custom backprop scares me lol
O2 and O3 both run through on that commit. Nice work!
I would be fine closing this issue on that. I don’t know what the deal with pytorch-lightning is, but I think we are both in agreement that it is probably down in revtorch, so I’ll play with lightning some more and see if I can isolate the issue in revtorch using it.
I’ll try to create a minimal example without lightning, just hitting revtorch, and then open an issue there, assuming that is the problem area.
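Roughly something along these lines, as a sketch; it assumes revtorch’s ReversibleBlock(f_block, g_block) / ReversibleSequence(nn.ModuleList([...])) interface and two local GPUs, and the sizes and names are just placeholders:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import revtorch as rv


def build_model(channels=64, depth=4):
    # a stack of reversible blocks; f and g are arbitrary small sub-networks
    blocks = nn.ModuleList([
        rv.ReversibleBlock(
            nn.Sequential(nn.Linear(channels, channels), nn.ReLU()),
            nn.Sequential(nn.Linear(channels, channels), nn.ReLU()),
        )
        for _ in range(depth)
    ])
    return rv.ReversibleSequence(blocks)


def worker(rank, world_size):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    model = build_model().cuda(rank)
    ddp = nn.parallel.DistributedDataParallel(model, device_ids=[rank])
    opt = torch.optim.Adam(ddp.parameters(), lr=1e-4)

    # the reversible block splits its input in half, so feed 2 * channels features
    x = torch.randn(8, 128).cuda(rank)
    loss = ddp(x).sum()
    loss.backward()   # hopefully reproduces the "mark a variable ready only once" error
    opt.step()
    dist.destroy_process_group()


if __name__ == '__main__':
    mp.spawn(worker, nprocs=2, args=(2,))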