pytorch-forecasting: Error when Using Distributed GPU Processing
When I initialize my TFT trainer to use multiple GPUs:
# Configure network and trainer
pl.seed_everything(407)
trainer = pl.Trainer(
    gpus = [0, 1],
    gradient_clip_val = 0.1  # hyperparameter to prevent gradient divergence for RNNs
)
tft = TemporalFusionTransformer.from_dataset(
    training,
    # not meaningful for finding the learning rate but otherwise very important
    learning_rate = 0.03,
    hidden_size = 16,  # most important hyperparameter apart from learning rate
    # number of attention heads. Set to up to 4 for large datasets
    attention_head_size = 1,
    dropout = 0.1,  # between 0.1 and 0.3 are good values
    hidden_continuous_size = 8,  # set to <= hidden_size
    output_size = 7,  # 7 quantiles by default
    loss = QuantileLoss(),
    # reduce learning rate if no improvement in validation loss after x epochs
    reduce_on_plateau_patience = 4,
)
print(f"Number of parameters in network: {tft.size()/1e3:.1f}k")
The library recognizes that I'm using both GPUs:
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
CUDA_VISIBLE_DEVICES: [0,1]
Number of parameters in network: 23.4k
However, when I try to find the optimal learning rate:
# Find optimal learning rate
res = trainer.lr_find(
    tft,
    train_dataloader = train_dataloader,
    val_dataloaders = val_dataloader,
    max_lr = 10.,
    min_lr = 1e-6,
)
print(f"Suggested learning rate: {res.suggestion()}")
fig = res.plot(show = True, suggest = True)
fig.show()
I get an AttributeError (Can't pickle local object '_apply_to_outputs.<locals>.decorator_fn.<locals>.new_func') with the following trace:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-29-01060df08a43> in <module>
1 # Find optimal learning rate
----> 2 res = trainer.lr_find(
3 tft,
4 train_dataloader = train_dataloader,
5 val_dataloaders = val_dataloader,
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/lr_finder.py in lr_find(self, model, train_dataloader, val_dataloaders, min_lr, max_lr, num_training, mode, early_stop_threshold)
198
199 # Fit, lr & loss logged in callback
--> 200 self.fit(model,
201 train_dataloader=train_dataloader,
202 val_dataloaders=val_dataloaders)
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/states.py in wrapped_fn(self, *args, **kwargs)
46 if entering is not None:
47 self.state = entering
---> 48 result = fn(self, *args, **kwargs)
49
50 # The INTERRUPTED state can be set inside the run function. To indicate that run was interrupted
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/trainer.py in fit(self, model, train_dataloader, val_dataloaders, datamodule)
1050 self.accelerator_backend = DDPSpawnBackend(self)
1051 self.accelerator_backend.setup()
-> 1052 self.accelerator_backend.train(model, nprocs=self.num_processes)
1053 results = self.accelerator_backend.teardown(model)
1054
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/accelerators/ddp_spawn_backend.py in train(self, model, nprocs)
41
42 def train(self, model, nprocs):
---> 43 mp.spawn(self.ddp_train, nprocs=nprocs, args=(self.mp_queue, model,))
44
45 def teardown(self, model):
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/torch/multiprocessing/spawn.py in spawn(fn, args, nprocs, join, daemon)
160 daemon=daemon,
161 )
--> 162 process.start()
163 error_queues.append(error_queue)
164 processes.append(process)
~/anaconda3/envs/forecasting/lib/python3.8/multiprocessing/process.py in start(self)
119 'daemonic processes are not allowed to have children'
120 _cleanup()
--> 121 self._popen = self._Popen(self)
122 self._sentinel = self._popen.sentinel
123 # Avoid a refcycle if the target function holds an indirect
~/anaconda3/envs/forecasting/lib/python3.8/multiprocessing/context.py in _Popen(process_obj)
282 def _Popen(process_obj):
283 from .popen_spawn_posix import Popen
--> 284 return Popen(process_obj)
285
286 class ForkServerProcess(process.BaseProcess):
~/anaconda3/envs/forecasting/lib/python3.8/multiprocessing/popen_spawn_posix.py in __init__(self, process_obj)
30 def __init__(self, process_obj):
31 self._fds = []
---> 32 super().__init__(process_obj)
33
34 def duplicate_for_child(self, fd):
~/anaconda3/envs/forecasting/lib/python3.8/multiprocessing/popen_fork.py in __init__(self, process_obj)
17 self.returncode = None
18 self.finalizer = None
---> 19 self._launch(process_obj)
20
21 def duplicate_for_child(self, fd):
~/anaconda3/envs/forecasting/lib/python3.8/multiprocessing/popen_spawn_posix.py in _launch(self, process_obj)
45 try:
46 reduction.dump(prep_data, fp)
---> 47 reduction.dump(process_obj, fp)
48 finally:
49 set_spawning_popen(None)
~/anaconda3/envs/forecasting/lib/python3.8/multiprocessing/reduction.py in dump(obj, file, protocol)
58 def dump(obj, file, protocol=None):
59 '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60 ForkingPickler(file, protocol).dump(obj)
61
62 #
AttributeError: Can't pickle local object '_apply_to_outputs.<locals>.decorator_fn.<locals>.new_func'
Any idea what may be triggering this? My guess is that because I'm not distributing across multiple machines, the pickling step is failing. If so, that's fine and just means I misunderstood that setting for distributed_backend; but moving on, I hit errors with the other distributed_backend settings as well.
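Coming back to the pickling error for a second: the message pattern itself is just what plain Python pickle raises for any function defined inside another function, and spawn-based multiprocessing has to pickle the whole model, decorators included. A minimal sketch that reproduces only that mechanism, completely outside Lightning (my own illustration, not library code):
import pickle

def decorate(fn):
    # new_func only exists inside decorate, so pickle cannot look it up by name
    def new_func(*args, **kwargs):
        return fn(*args, **kwargs)
    return new_func

wrapped = decorate(len)

# Raises: AttributeError: Can't pickle local object 'decorate.<locals>.new_func'
pickle.dumps(wrapped)
So presumably ddp_spawn is choking on a locally defined wrapper like this somewhere on the model, rather than on anything GPU-related.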
Following https://pytorch-lightning.readthedocs.io/en/latest/multi_gpu.html#distributed-modes, when I hard-code distributed_backend to ddp2, I get this trace:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/accelerators/ddp2_backend.py in _resolve_task_idx(self)
52 try:
---> 53 self.task_idx = int(os.environ['LOCAL_RANK'])
54 except Exception as e:
~/anaconda3/envs/forecasting/lib/python3.8/os.py in __getitem__(self, key)
674 # raise KeyError with the original key value
--> 675 raise KeyError(key) from None
676 return self.decodevalue(value)
KeyError: 'LOCAL_RANK'
During handling of the above exception, another exception occurred:
MisconfigurationException Traceback (most recent call last)
<ipython-input-29-01060df08a43> in <module>
1 # Find optimal learning rate
----> 2 res = trainer.lr_find(
3 tft,
4 train_dataloader = train_dataloader,
5 val_dataloaders = val_dataloader,
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/lr_finder.py in lr_find(self, model, train_dataloader, val_dataloaders, min_lr, max_lr, num_training, mode, early_stop_threshold)
198
199 # Fit, lr & loss logged in callback
--> 200 self.fit(model,
201 train_dataloader=train_dataloader,
202 val_dataloaders=val_dataloaders)
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/states.py in wrapped_fn(self, *args, **kwargs)
46 if entering is not None:
47 self.state = entering
---> 48 result = fn(self, *args, **kwargs)
49
50 # The INTERRUPTED state can be set inside the run function. To indicate that run was interrupted
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/trainer.py in fit(self, model, train_dataloader, val_dataloaders, datamodule)
1033 if self.use_ddp2:
1034 self.accelerator_backend = DDP2Backend(self)
-> 1035 self.accelerator_backend.setup()
1036 self.accelerator_backend.train(model)
1037
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/accelerators/ddp2_backend.py in setup(self)
43
44 def setup(self):
---> 45 self._resolve_task_idx()
46
47 def _resolve_task_idx(self):
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/accelerators/ddp2_backend.py in _resolve_task_idx(self)
54 except Exception as e:
55 m = 'ddp2 only works in SLURM or via torchelastic with the WORLD_SIZE, LOCAL_RANK, GROUP_RANK flags'
---> 56 raise MisconfigurationException(m)
57
58 def train(self, model):
MisconfigurationException: ddp2 only works in SLURM or via torchelastic with the WORLD_SIZE, LOCAL_RANK, GROUP_RANK flags
and when I hard-code distributed_backend to dp (which is what I would expect to work most readily), I get:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-29-01060df08a43> in <module>
1 # Find optimal learning rate
----> 2 res = trainer.lr_find(
3 tft,
4 train_dataloader = train_dataloader,
5 val_dataloaders = val_dataloader,
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/lr_finder.py in lr_find(self, model, train_dataloader, val_dataloaders, min_lr, max_lr, num_training, mode, early_stop_threshold)
198
199 # Fit, lr & loss logged in callback
--> 200 self.fit(model,
201 train_dataloader=train_dataloader,
202 val_dataloaders=val_dataloaders)
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/states.py in wrapped_fn(self, *args, **kwargs)
46 if entering is not None:
47 self.state = entering
---> 48 result = fn(self, *args, **kwargs)
49
50 # The INTERRUPTED state can be set inside the run function. To indicate that run was interrupted
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/trainer.py in fit(self, model, train_dataloader, val_dataloaders, datamodule)
1062 self.accelerator_backend = DataParallelBackend(self)
1063 self.accelerator_backend.setup(model)
-> 1064 results = self.accelerator_backend.train()
1065 self.accelerator_backend.teardown()
1066
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/accelerators/dp_backend.py in train(self)
95 def train(self):
96 model = self.trainer.model
---> 97 results = self.trainer.run_pretrain_routine(model)
98 return results
99
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/trainer.py in run_pretrain_routine(self, model)
1222
1223 # run a few val batches before training starts
-> 1224 self._run_sanity_check(ref_model, model)
1225
1226 # clear cache before training
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/trainer.py in _run_sanity_check(self, ref_model, model)
1255 num_loaders = len(self.val_dataloaders)
1256 max_batches = [self.num_sanity_val_steps] * num_loaders
-> 1257 eval_results = self._evaluate(model, self.val_dataloaders, max_batches, False)
1258
1259 # allow no returns from eval
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/evaluation_loop.py in _evaluate(self, model, dataloaders, max_batches, test_mode)
394 # ---------------------
395 using_eval_result = len(outputs) > 0 and len(outputs[0]) > 0 and isinstance(outputs[0][0], EvalResult)
--> 396 eval_results = self.__run_eval_epoch_end(test_mode, outputs, dataloaders, using_eval_result)
397
398 # log callback metrics
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_lightning/trainer/evaluation_loop.py in __run_eval_epoch_end(self, test_mode, outputs, dataloaders, using_eval_result)
488 eval_results = self.__gather_epoch_end_eval_results(outputs)
489
--> 490 eval_results = model.validation_epoch_end(eval_results)
491 user_reduced = True
492
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_forecasting/models/base_model.py in validation_epoch_end(self, outputs)
142
143 def validation_epoch_end(self, outputs):
--> 144 log, _ = self.epoch_end(outputs, label="val")
145 return log
146
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_forecasting/models/temporal_fusion_transformer/__init__.py in epoch_end(self, outputs, label)
611 run at epoch end for training or validation
612 """
--> 613 log, out = super().epoch_end(outputs, label=label)
614 if self.log_interval(label == "train") > 0:
615 self._log_interpretation(out, label=label)
~/anaconda3/envs/forecasting/lib/python3.8/site-packages/pytorch_forecasting/models/base_model.py in epoch_end(self, outputs, label)
245 outputs = [out["callback_metrics"] for out in outputs]
246 # log average loss and metrics
--> 247 n_samples = sum([x["n_samples"] for x in outputs])
248 avg_loss = torch.stack([x[f"{label}_loss"] * x["n_samples"] / n_samples for x in outputs]).sum()
249 log_keys = outputs[0]["log"].keys()
TypeError: unsupported operand type(s) for +: 'int' and 'list'
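If I'm reading that trace right, the dp path hands epoch_end gathered per-GPU outputs, so "n_samples" shows up as a list of per-device counts instead of a single int, and sum() (which starts from the int 0) can't add them. A tiny sketch of that failure mode, with made-up numbers purely to show the mechanism:
# hypothetical gathered outputs under dp: each "n_samples" is a per-device list, not an int
outputs = [{"n_samples": [64, 64]}, {"n_samples": [64, 64]}]

# mirrors the failing line in pytorch_forecasting's base_model.epoch_end:
# TypeError: unsupported operand type(s) for +: 'int' and 'list'
n_samples = sum(x["n_samples"] for x in outputs)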
When I use ddp (as recommended for PyTorch, given the speedup), the pipeline freezes; running watch nvidia-smi in a terminal shows the GPUs sitting idle, with no memory being allocated for the job.
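As far as I understand, Lightning's ddp backend starts the extra GPU workers by re-launching the command that started training, which a Jupyter kernel can't really provide, so a silent hang from inside the notebook wouldn't be surprising. A rough sketch of the script-based alternative (a hypothetical train.py; the dataset/dataloader/model construction is the same as in the snippet above and is omitted here):
# train.py -- hypothetical standalone script for running ddp outside the notebook
import pytorch_lightning as pl

def main():
    pl.seed_everything(407)
    # build `training`, `train_dataloader`, `val_dataloader` and `tft`
    # exactly as in the notebook snippet above (omitted here)
    trainer = pl.Trainer(gpus=[0, 1], distributed_backend="ddp", gradient_clip_val=0.1)
    trainer.fit(tft, train_dataloader=train_dataloader, val_dataloaders=val_dataloader)

if __name__ == "__main__":  # guard so re-launched worker processes don't re-enter setup code
    main()
Launched from a shell with python train.py rather than from Jupyter.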
All of these errors come from the same setup as in #85, which I got working on a single GPU. Now that I'm fitting multivariate time series across all 50 states, though, I'd really like to use both my GPUs to speed up the runtime.
Thanks!
About this issue
- State: open
- Created 4 years ago
- Comments: 21 (6 by maintainers)
Thanks for all these updates!
With pytorch-forecasting=0.5.2, torch.cuda.device_count() returns 2 (the expected value); however, now pl.Trainer(gpus = [0,1]) and pl.Trainer(gpus = 2) both return an error.
Running pl.Trainer(gpus=2, distributed_backend='ddp') throws the same error. Really weird… I think this is because of the recent upgrade to pytorch-lightning > 1.0, as their docs for Trainer have been updated: https://pytorch-lightning.readthedocs.io/en/latest/trainer.html.
On the other hand, when I open Python, load only pytorch-lightning, and run the code above directly, it works.
When I try to initialize the Trainer object right at the very beginning of the notebook, it loads fine but throws a future warning.
I'll play around with the notebook and try to track down what's going on and will update you.
Totally understand! No rush 😃 Just was curious 😉
Interesting, I have not tried that yet. Let me look into that pickling error. It might be the main issue.