transformers: Save model checkpoint error when multi-gpu training

System Info

  • transformers version: 4.36.0.dev0
  • Platform: Linux-6.2.0-1017-azure-x86_64-with-glibc2.35
  • Python version: 3.10.13
  • Huggingface_hub version: 0.19.4
  • Safetensors version: 0.4.0
  • Accelerate version: 0.24.1
  • Accelerate config: not found
  • PyTorch version (GPU?): 2.0.1+cu118 (True)
  • Tensorflow version (GPU?): not installed (NA)
  • Flax version (CPU?/GPU?/TPU?): not installed (NA)
  • Jax version: not installed
  • JaxLib version: not installed
  • Using GPU in script?: Yes
  • Using distributed or parallel set-up in script?: Yes

Who can help?

@muellerzr and @pacman100 I found when launch the example trainer code with multi-nodes, the code will raise a FileNotFound error when saving the checkpoint, and after debug, I think the reason is in trainer.py L2382:

        if staging_output_dir != output_dir:
            os.rename(staging_output_dir, output_dir)

When one process rename the folder, and other processes will encounter the FileNotFound error. Maybe one can modify the code like this to avoid the error:

        if self.args.should_save and staging_output_dir != output_dir:
            os.rename(staging_output_dir, output_dir)

Information

  • The official example scripts
  • My own modified scripts

Tasks

  • An officially supported task in the examples folder (such as GLUE/SQuAD, …)
  • My own task or dataset (give details below)

Reproduction

Run the MAE training code from the example folder.

Expected behavior

Solve the FileNotFound error.

About this issue

  • Original URL
  • State: closed
  • Created 7 months ago
  • Reactions: 1
  • Comments: 47 (15 by maintainers)

Most upvoted comments

Hi, @snowyday , @tblattner , and @muellerzr . I think main_process_first may be broken.

I run the trainer with 2 nodes X 8 V100 GPUs and deepspeed. When I turned on log_level=debug, I found that only one process entered the waiting mode, while all other processes tried to save the checkpoint.

The log from process that waited:

[DEBUG|training_args.py:2119] 2023-12-27 15:11:30,917 >> 4: waiting for the main process to perform Renaming model checkpoint folder to true location

A similar error has now occurred at L.2561 89c6481

I am experiencing this issue in a distributed training environment that utilizes a shared file system across 16 nodes, with each node equipped with 4 GPUs. I’m deploying the training using DeepSpeed’s OpenMPI launcher.

In this setup, I have observed scenarios where the cleanup command shutil.rmtree(staging_output_dir) at L.2561 in the code fails to execute due to the condition self.is_local_process_zero() not being met on the slave nodes. This is intended to “Clean up the remaining staging checkpoint folders on other nodes,” but it does not always work as expected.

File "XXX/transformers/src/transformers/trainer.py", line 2561, in _save_checkpoint
    shutil.rmtree(staging_output_dir)

File "XXX/lib/python3.11/shutil.py", line 681, in _rmtree_safe_fd
FileNotFoundError: [Errno 2] No such file or directory: 'rng_state_6.pth'    os.unlink(entry.name, dir_fd=topfd)
    os.unlink(entry.name, dir_fd=topfd)

FileNotFoundError: [Errno 2] No such file or directory: 'rng_state_6.pth'
FileNotFoundError: [Errno 2] No such file or directory: 'rng_state_6.pth'    os.unlink(entry.name, dir_fd=topfd)
    os.unlink(entry.name, dir_fd=topfd)

FileNotFoundError: FileNotFoundError:     os.unlink(entry.name, dir_fd=topfd)
[Errno 2] No such file or directory: 'rng_state_6.pth'
[Errno 2] No such file or directory: 'rng_state_6.pth'
FileNotFoundError:     os.unlink(entry.name, dir_fd=topfd)
    os.unlink(entry.name, dir_fd=topfd)
    os.unlink(entry.name, dir_fd=topfd)
[Errno 2] No such file or directory: 'rng_state_6.pth'
FileNotFoundError: FileNotFoundError: FileNotFoundError:     os.unlink(entry.name, dir_fd=topfd)
    os.unlink(entry.name, dir_fd=topfd)
[Errno 2] No such file or directory: 'rng_state_6.pth'
[Errno 2] No such file or directory: 'rng_state_6.pth'
[Errno 2] No such file or directory: 'rng_state_6.pth'
FileNotFoundError: FileNotFoundError: [Errno 2] No such file or directory: 'rng_state_6.pth'    os.unlink(entry.name, dir_fd=topfd)
    os.unlink(entry.name, dir_fd=topfd)
    os.unlink(entry.name, dir_fd=topfd)
[Errno 2] No such file or directory: 'rng_state_6.pth'

FileNotFoundError: FileNotFoundError: [Errno 2] No such file or directory: 'rng_state_6.pth'FileNotFoundError: [Errno 2] No such file or directory: 'rng_state_6.pth'[Errno 2] No such file or directory: 'rng_state_6.pth'

[89c6481]

        # Then go through the rewriting process, only renaming and rotating from main process(es)
        if self.is_local_process_zero() if self.args.save_on_each_node else self.is_world_process_zero():
            if staging_output_dir != output_dir:
                if os.path.exists(staging_output_dir):
                    try:
                        os.rename(staging_output_dir, output_dir)
                    except Exception as e:
                        logger.error(
                            f"Error occurred when attempting to rename checkpoint folder: {e}\n"
                            "The checkpoint folder will not be renamed, but the training will proceed."
                        )

                    # Ensure rename completed in cases where os.rename is not atomic
                    # And can only happen on non-windows based systems
                    if os.name != "nt":
                        fd = os.open(output_dir, os.O_RDONLY)
                        os.fsync(fd)
                        os.close(fd)

            # Maybe delete some older checkpoints.
            if self.args.should_save:
                # Solely rely on numerical checkpoint id for rotation.
                # mtime is not reliable especially on some fuse fs in cloud environments.
                self._rotate_checkpoints(use_mtime=False, output_dir=run_dir)
        elif self.is_local_process_zero():
            # Clean up the remaining staging checkpoint folders on other nodes
            if staging_output_dir != output_dir and os.path.exists(staging_output_dir):
                shutil.rmtree(staging_output_dir) @L.2561
    
        self.args.distributed_state.wait_for_everyone()

Although os.path.exists(staging_output_dir) is used for verification, it seems that staging_output_dir does not exist when shutil.rmtree(staging_output_dir) is executed. It looks like a try-except block needs to be implemented here as well.

            if staging_output_dir != output_dir and os.path.exists(staging_output_dir):
                try:
                    shutil.rmtree(staging_output_dir) @L.2561
                except Exception as e:
                     logger.error(
                            f"Error occurred when attempting to delete checkpoint folder: {e}\n"
                        )

                  if os.name != "nt":
                      fd = os.open(staging_output_dir, os.O_RDONLY)
                      os.fsync(fd)
                      os.close(fd)

I encountered a similar error when using the trainer from DeepSpeed. The error occurs at the exact moment after if os.path.exists(staging_output_dir): is evaluated and another process finishes renaming.

I had no other choice, so I resorted to using a try block to get around it.

if staging_output_dir != output_dir:
    with self.args.main_process_first(
        desc="Renaming model checkpoint folder to true location", local=self.args.save_on_each_node
    ):
        if os.path.exists(staging_output_dir):
            try:
                os.rename(staging_output_dir, output_dir)
            except Exception as e:
                logger.info(f"Could not rename checkpoint directory from {staging_output_dir} to {output_dir}. Reason: {e}")
    

transformers-4.37.0.dev0

I also meet the same problem in 4,38.2. Using the 4.37,2 fix this issue.

I’m not sure if it fails or not. From what I understand, the network attached storage node might not actually complete the operation before the next process comes to check if the path exists. It will complete, just not in the timeframe allowed (sometimes). But that outlines the core issue here.

My suggestion is to use something like this: if self.args.distributed_state.is_local_main_process if self.args.save_on_each_node else self.args.distributed_state.is_main_process:

Then self.args.distributed_state.wait_for_everyone() to synchronize everyone afterwards.

This would only use the main process if save_on_each_node is false, otherwise only the local main processes. Which I think is the intended behavior. The part I’m not sure of is if the renamed file is used later downstream, then that could introduce a race condition there…

It would be nice if we could have an fsync for the shared filesystem to ensure the rename actually completed.

I’ve checked the main_process_first using the code snippet below: Number of nodes: 3 Processes per node (GPUs): 4 Total: 12 processes

import logging

import deepspeed
import transformers
import torch


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

if __name__ == "__main__":
    deepspeed.init_distributed()
    node_rank = torch.distributed.get_rank()   
    training_args = transformers.TrainingArguments(per_device_train_batch_size=8,
                                                   gradient_accumulation_steps=2,
                                                   num_train_epochs=3,
                                                   deepspeed="ds_config/ds_config_zero3.json",
                                                   output_dir="logs")

    with training_args.main_process_first():
        logger.info(f"Check `main_process_first`. Node rank {node_rank}")
Address family not supported by protocol).
[INFO:root:Check `main_process_first`. Node rank 8
INFO:root:Check `main_process_first`. Node rank 0
INFO:root:Check `main_process_first`. Node rank 4
INFO:root:Check `main_process_first`. Node rank 6
INFO:root:Check `main_process_first`. Node rank 10
INFO:root:Check `main_process_first`. Node rank 5
INFO:root:Check `main_process_first`. Node rank 9
INFO:root:Check `main_process_first`. Node rank 1
INFO:root:Check `main_process_first`. Node rank 2
INFO:root:Check `main_process_first`. Node rank 3
INFO:root:Check `main_process_first`. Node rank 7
INFO:root:Check `main_process_first`. Node rank 11

The node rankings appear to be correctly allocated, with Node rank 0 going to node 1, Node rank 4 to node 2, and Node rank 8 to node 3; however, there are inaccuracies with the global rankings. In the context of a shared filesystem, if we proceed without waiting for the result from global rank 0, it could cause conflicts during the os.rename operation.

if staging_output_dir != output_dir:
    with self.args.main_process_first(
        desc="Renaming model checkpoint folder to true location", local=self.args.save_on_each_node
    ):
        if os.path.exists(staging_output_dir):
            os.rename(staging_output_dir, output_dir)
        

@muellerzr @thundergolfer I still get the same issue of saving checkpoint using the latest version of transformers 4.36 and even with ‘4.37.0.dev0’

I used three workers each one has two GPUs, I tried fine-tuning to be saved on a shared storage and a non-shared storage, and for both cases I still got the same error!

FileNotFoundError: [Errno 2] No such file or directory: ‘model/tmp-checkpoint-49’ -> ‘model/checkpoint-49’

File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 1537, in train
  return inner_training_loop(
File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 1929, in _inner_training_loop
  self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval)
File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 2279, in _maybe_log_save_evaluate
  self._save_checkpoint(model, trial, metrics=metrics)
File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 2395, in _save_checkpoint
  os.rename(staging_output_dir, output_dir)
FileNotFoundError: [Errno 2] No such file or directory: 'model/tmp-checkpoint-49' -> 'model/checkpoint-49'

although the model/checkpoint-49 is already created!

I’ve been using a try-except approach for bypassing the issue, and it’s been working well for me. However, as xk-huang mentioned, it seems that the root cause is that self.args.main_process_first is not handling multi-node setups properly.

any solutions? facing the same issue on multinode training using deepspeed

Hi @snowyday - could you open a new issue, including all these details and linking to this issue? This way we can better track what’s been addressed and what’s a new issue

It is, so we could have a race condition. An fsync could be done certainly and your logic makes sense. @tblattner would you like to open a PR on this by chance?

I can get a start on a PR. Not sure what the best methodology for running fsync on a rename operation is, but I’ll give it a shot.

FYI, we tested and also experienced this without shared FS (accelerate/pdsh, simple two-node setup).

Also, if we rely on full fsync implementation in checkpoint folder, it might be good to explicitly call that out in docs as not all filesystems/mount options will fail hard on “fake” fsync calls.

@thundergolfer Rank 0 should pop up first, and the others should hang tight until the renaming wraps up. I should set args.save_on_each_node=False:

with self.args.main_process_first(
        desc="Renaming model checkpoint folder to true location", local=self.args.save_on_each_node
    ):

I encountered this issue with the trainer with the following command-line. This was after recently updating transformers with pip install transformers --upgrade

--save_strategy epoch --save_total_limit 1

transformers==4.36.2

Edit: One thing to note this was with 2 nodes with 8x A100s per node. Looking at the code around the error, I have a feeling this was because I may have used local=True when using with main_process_first. Going to try disabling save_on_each_node.

        if staging_output_dir != output_dir:
            with self.args.main_process_first(
                desc="Renaming model checkpoint folder to true location", local=self.args.save_on_each_node
            ):
                if os.path.exists(staging_output_dir):
                    os.rename(staging_output_dir, output_dir)

edit edit: Looks like its still not working even when specifying save_on_each_node to false.

Here is the full command, launched from a slurm sbatch job:

srun --kill-on-bad-exit=1 --jobid $SLURM_JOB_ID bash -c "accelerate launch --use_deepspeed --zero_stage 1 --deepspeed_hostfile hostfile --deepspeed_multinode_launcher openmpi --gradient_accumulation_steps 1 --num_processes $(( $NUM_GPUS * $COUNT_NODE )) --num_machines $COUNT_NODE --num_cpu_threads_per_process $CPU_COUNT --mixed_precision bf16 --machine_rank \$SLURM_PROCID --main_process_ip $MASTER_ADDR --main_process_port $MASTER_PORT main.py --source_datasets_filepath source_data/clm --output_dir testing_output_cluster --model_number 2 --overwrite_output_dir --dataloader_num_workers 10 --bf16 --data_fraction 0.1 --save_strategy steps --save_total_limit 1 --save_on_each_node false --dataloader_num_workers 2 --per_device_train_batch_size 1 --per_device_eval_batch_size 1 --max_token_length 1024 --num_train_epochs 1"

@hahmad2008 can you try doing either pip install transformers -U or reinstall from git? From the line numbers it’s not adding up that you’re using a version that includes the fix