transformers: Running Trainer.train() with deepspeed throws OSError: handle is closed error when saving checkpoint
System Info
- transformers version: 4.27.0.dev0
- Platform: Linux-5.15.0-58-generic-x86_64-with-glibc2.35
- Python version: 3.10.6
- Huggingface_hub version: 0.12.0
- PyTorch version (GPU?): 1.12.0+cu113 (True)
- Tensorflow version (GPU?): not installed (NA)
- Flax version (CPU?/GPU?/TPU?): not installed (NA)
- Jax version: not installed
- JaxLib version: not installed
- Using GPU in script?: yes, via deepspeed
- Using distributed or parallel set-up in script?: yes, via deepspeed
Who can help?
Information
- The official example scripts
- My own modified scripts
Tasks
- An officially supported task in the examples folder (such as GLUE/SQuAD, …)
- My own task or dataset (give details below)
Reproduction
I’ve been trying to use the Trainer with deepspeed using the following guide: https://huggingface.co/docs/transformers/v4.25.1/en/main_classes/deepspeed#trainer-deepspeed-integration
Below is my python code:
#!/usr/bin/env python
# coding=utf-8
# Copyright The HuggingFace Team and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Fine-tuning the library models for sequence to sequence.
"""
# You can also adapt this script on your own sequence to sequence task. Pointers for this are left as comments.
import logging
import os
import sys
from dataclasses import dataclass, field
from typing import Optional
import datasets
import numpy as np
from datasets import Dataset, DatasetDict, load_dataset
import evaluate
import transformers
from transformers import (
    AutoConfig,
    AutoTokenizer,
    HfArgumentParser,
    M2M100Tokenizer,
    MBart50Tokenizer,
    MBart50TokenizerFast,
    MBartTokenizer,
    MBartTokenizerFast,
    Trainer,
    TrainingArguments,
    AutoModelForCausalLM,
    default_data_collator,
    set_seed,
)
from transformers.trainer_utils import get_last_checkpoint
from transformers.utils import check_min_version, send_example_telemetry
from transformers.utils.versions import require_version
import bittensor
from itertools import chain
from tqdm.auto import tqdm
# Will error if the minimal version of Transformers is not installed. Remove at your own risks.
check_min_version("4.27.0.dev0")
require_version("datasets>=1.8.0", "To fix: pip install -r examples/pytorch/translation/requirements.txt")
logger = logging.getLogger(__name__)
# A list of all multilingual tokenizer which require src_lang and tgt_lang attributes.
MULTILINGUAL_TOKENIZERS = [MBartTokenizer, MBartTokenizerFast, MBart50Tokenizer, MBart50TokenizerFast, M2M100Tokenizer]
@dataclass
class ModelArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune from.
    """
    model_name_or_path: str = field(
        metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"}
    )
    config_name: Optional[str] = field(
        default=None, metadata={"help": "Pretrained config name or path if not the same as model_name"}
    )
    tokenizer_name: Optional[str] = field(
        default=None, metadata={"help": "Pretrained tokenizer name or path if not the same as model_name"}
    )
    cache_dir: Optional[str] = field(
        default=None,
        metadata={"help": "Where to store the pretrained models downloaded from huggingface.co"},
    )
    use_fast_tokenizer: bool = field(
        default=True,
        metadata={"help": "Whether to use one of the fast tokenizer (backed by the tokenizers library) or not."},
    )
    model_revision: str = field(
        default="main",
        metadata={"help": "The specific model version to use (can be a branch name, tag name or commit id)."},
    )
    use_auth_token: bool = field(
        default=False,
        metadata={
            "help": (
                "Will use the token generated when running `huggingface-cli login` (necessary to use this script "
                "with private models)."
            )
        },
    )
@dataclass
class DataTrainingArguments:
    """
    Arguments pertaining to what data we are going to input our model for training and eval.
    """
    source_lang: str = field(default=None, metadata={"help": "Source language id for translation."})
    target_lang: str = field(default=None, metadata={"help": "Target language id for translation."})
    dataset_name: Optional[str] = field(
        default=None, metadata={"help": "The name of the dataset to use (via the datasets library)."}
    )
    dataset_config_name: Optional[str] = field(
        default=None, metadata={"help": "The configuration name of the dataset to use (via the datasets library)."}
    )
    train_file: Optional[str] = field(default=None, metadata={"help": "The input training data file (a jsonlines)."})
    validation_file: Optional[str] = field(
        default=None,
        metadata={
            "help": "An optional input evaluation data file to evaluate the metrics (sacrebleu) on a jsonlines file."
        },
    )
    test_file: Optional[str] = field(
        default=None,
        metadata={"help": "An optional input test data file to evaluate the metrics (sacrebleu) on a jsonlines file."},
    )
    overwrite_cache: bool = field(
        default=False, metadata={"help": "Overwrite the cached training and evaluation sets"}
    )
    preprocessing_num_workers: Optional[int] = field(
        default=None,
        metadata={"help": "The number of processes to use for the preprocessing."},
    )
    max_source_length: Optional[int] = field(
        default=1024,
        metadata={
            "help": (
                "The maximum total input sequence length after tokenization. Sequences longer "
                "than this will be truncated, sequences shorter will be padded."
            )
        },
    )
    max_target_length: Optional[int] = field(
        default=128,
        metadata={
            "help": (
                "The maximum total sequence length for target text after tokenization. Sequences longer "
                "than this will be truncated, sequences shorter will be padded."
            )
        },
    )
    val_max_target_length: Optional[int] = field(
        default=None,
        metadata={
            "help": (
                "The maximum total sequence length for validation target text after tokenization. Sequences longer "
                "than this will be truncated, sequences shorter will be padded. Will default to `max_target_length`."
                "This argument is also used to override the ``max_length`` param of ``model.generate``, which is used "
                "during ``evaluate`` and ``predict``."
            )
        },
    )
    pad_to_max_length: bool = field(
        default=False,
        metadata={
            "help": (
                "Whether to pad all samples to model maximum sentence length. "
                "If False, will pad the samples dynamically when batching to the maximum length in the batch. More "
                "efficient on GPU but very bad for TPU."
            )
        },
    )
    max_train_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": (
                "For debugging purposes or quicker training, truncate the number of training examples to this "
                "value if set."
            )
        },
    )
    max_eval_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": (
                "For debugging purposes or quicker training, truncate the number of evaluation examples to this "
                "value if set."
            )
        },
    )
    max_predict_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": (
                "For debugging purposes or quicker training, truncate the number of prediction examples to this "
                "value if set."
            )
        },
    )
    num_beams: Optional[int] = field(
        default=None,
        metadata={
            "help": (
                "Number of beams to use for evaluation. This argument will be passed to ``model.generate``, "
                "which is used during ``evaluate`` and ``predict``."
            )
        },
    )
    ignore_pad_token_for_loss: bool = field(
        default=True,
        metadata={
            "help": "Whether to ignore the tokens corresponding to padded labels in the loss computation or not."
        },
    )
    source_prefix: Optional[str] = field(
        default=None, metadata={"help": "A prefix to add before every source text (useful for T5 models)."}
    )
    forced_bos_token: Optional[str] = field(
        default=None,
        metadata={
            "help": (
                "The token to force as the first generated token after the :obj:`decoder_start_token_id`.Useful for"
                " multilingual models like :doc:`mBART <../model_doc/mbart>` where the first generated token needs to"
                " be the target language token.(Usually it is the target language token)"
            )
        },
    )
    def __post_init__(self):
        if self.dataset_name is None and self.train_file is None and self.validation_file is None:
            raise ValueError("Need either a dataset name or a training/validation file.")
        # accepting both json and jsonl file extensions, as
        # many jsonlines files actually have a .json extension
        valid_extensions = ["json", "jsonl"]
        if self.train_file is not None:
            extension = self.train_file.split(".")[-1]
            assert extension in valid_extensions, "`train_file` should be a jsonlines file."
        if self.validation_file is not None:
            extension = self.validation_file.split(".")[-1]
            assert extension in valid_extensions, "`validation_file` should be a jsonlines file."
        if self.val_max_target_length is None:
            self.val_max_target_length = self.max_target_length
def load_raw_datasets(name: str, confName: str) -> DatasetDict:
    if name == "bittensor":
        dataset = bittensor.dataset(
            no_tokenizer=True,
            # batch_size=cfg.training.train_batch_size,
            # block_size=cfg.dataset.block_size,
        )
        dataloader = dataset.dataloader(1000)
        bittensor_dataset = {"text": []}
        for batch in tqdm(dataloader, desc="Loading data from bittensor IPFS"):
            bittensor_dataset["text"].extend(batch)
        raw_datasets = Dataset.from_dict(bittensor_dataset)
        dataset.close()  # Avoid leaving threadqueue running.
        return raw_datasets
    if os.path.exists(name):
        data_files = {"text": name}
        dataset_args = {}
        extension = os.path.splitext(name)[-1].lstrip(".")
        if extension == "txt":
            extension = "text"
            dataset_args["keep_linebreaks"] = True
        raw_datasets = load_dataset(
            extension, data_files=data_files, **dataset_args)
        raw_datasets = raw_datasets["text"]
    else:
        raw_datasets = load_dataset(name, confName)
    return raw_datasets
def load_model_and_tokenizer(model_args: ModelArguments):
    config = AutoConfig.from_pretrained(
        model_args.config_name if model_args.config_name else model_args.model_name_or_path,
        cache_dir=model_args.cache_dir,
        revision=model_args.model_revision,
        use_auth_token=True if model_args.use_auth_token else None,
    )
    tokenizer = AutoTokenizer.from_pretrained(
        model_args.tokenizer_name if model_args.tokenizer_name else model_args.model_name_or_path,
        cache_dir=model_args.cache_dir,
        use_fast=model_args.use_fast_tokenizer,
        revision=model_args.model_revision,
        use_auth_token=True if model_args.use_auth_token else None,
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        from_tf=bool(".ckpt" in model_args.model_name_or_path),
        config=config,
        cache_dir=model_args.cache_dir,
        revision=model_args.model_revision,
        use_auth_token=True if model_args.use_auth_token else None,
    )
    # tokenizer.pad_token = cfg.tokenizer.pad_token
    if tokenizer.pad_token is None and tokenizer.eos_token is not None:
        tokenizer.pad_token = tokenizer.eos_token
    # model = AutoModelForCausalLM.from_pretrained(
    #     name,
    #     from_tf=bool(".ckpt" in name),
    #     config=config,
    # )
    # model.to('cuda')
    # model.resize_token_embeddings(len(tokenizer))
    # We resize the embeddings only when necessary to avoid index errors. If you are creating a model from scratch
    # on a small vocab and want a smaller embedding size, remove this test.
    embedding_size = model.get_input_embeddings().weight.shape[0]
    if len(tokenizer) > embedding_size:
        model.resize_token_embeddings(len(tokenizer))
    return tokenizer, model
def preprocess(blockSize, tokenizer, raw_datasets):
    # First we tokenize all the texts.
    column_names = raw_datasets.column_names
    text_column_name = "text" if "text" in column_names else column_names["train"][0]
    if True is True:
        pad = False
    else:
        pad = "max_length"
    def group_texts(examples):
        # print(examples)
        # Concatenate all texts.
        concatenated_examples = {
            k: list(chain(*examples[k])) for k in examples.keys()}
        # print(concatenated_examples)
        total_length = len(concatenated_examples[list(examples.keys())[0]])
        if total_length >= blockSize:
            total_length = (
                total_length // blockSize
            ) * blockSize
        # Split by chunks of max_len.
        result = {
            k: [
                t[i: i + blockSize]
                for i in range(0, total_length, blockSize)
            ]
            for k, t in concatenated_examples.items()
        }
        result["labels"] = result["input_ids"].copy()
        return result
    def tokenize_fn(examples):
        # result = tokenizer(
        #     examples[text_column_name],
        #     padding=pad,
        #     truncation=True,
        #     max_length=cfg.dataset.block_size,
        # )
        # result["labels"] = result["input_ids"].copy()
        # return result
        return tokenizer(examples[text_column_name])
    tokenized_datasets = raw_datasets.map(
        tokenize_fn,
        batched=True,
        remove_columns=text_column_name,
        load_from_cache_file=not False,
        desc="Running tokenizer on dataset",
    )
    lm_datasets = tokenized_datasets.map(
        group_texts,
        batched=True,
        num_proc=None,
        load_from_cache_file=not False,
        desc=f"Grouping texts in chunks of {blockSize}",
    )
    return lm_datasets
def main():
    # See all possible arguments in src/transformers/training_args.py
    # or by passing the --help flag to this script.
    # We now keep distinct sets of args, for a cleaner separation of concerns.
    parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
    if len(sys.argv) == 2 and sys.argv[1].endswith(".json"):
        # If we pass only one argument to the script and it's the path to a json file,
        # let's parse it to get our arguments.
        model_args, data_args, training_args = parser.parse_json_file(json_file=os.path.abspath(sys.argv[1]))
    else:
        model_args, data_args, training_args = parser.parse_args_into_dataclasses()
    # Sending telemetry. Tracking the example usage helps us better allocate resources to maintain them. The
    # information sent is the one passed as arguments along with your Python/PyTorch versions.
    send_example_telemetry("run_translation", model_args, data_args)
    # Setup logging
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    log_level = training_args.get_process_log_level()
    logger.setLevel(log_level)
    datasets.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.enable_default_handler()
    transformers.utils.logging.enable_explicit_format()
    # Log on each process the small summary:
    logger.warning(
        f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}"
        + f"distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}"
    )
    logger.info(f"Training/evaluation parameters {training_args}")
    tokenizer, model = load_model_and_tokenizer(model_args)
    # dataset = load_raw_datasets("bittensor", None)
    dataset = load_raw_datasets("wikitext", "wikitext-2-raw-v1")
    tokenized_datasets = preprocess(2, tokenizer, dataset)
    if "train" not in tokenized_datasets.column_names:
        tokenized_datasets = tokenized_datasets.train_test_split(
            test_size=5 / 100
        )
        tokenized_datasets_test_valid = tokenized_datasets["test"].train_test_split(
            test_size=0.5
        )
        tokenized_datasets["test"] = tokenized_datasets_test_valid["train"]
        tokenized_datasets["validation"] = tokenized_datasets_test_valid["test"]
    train_dataset = tokenized_datasets["train"]
    eval_dataset = tokenized_datasets["validation"]
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        # tokenizer=tokenizer,
        # compute_metrics=compute_metrics,
    )
    trainer.train()
if __name__ == "__main__":
    main()
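To make the effect of the block size concrete, here is a minimal standalone sketch of the same concatenate-and-chunk logic used by group_texts in the script, run on plain Python lists instead of a tokenized dataset. The sample token ids are made up for illustration. With a block size of 2, as passed to preprocess in main(), every training example ends up only two tokens long.

```python
from itertools import chain


def group_texts(examples, block_size):
    """Concatenate all sequences per key, then split them into fixed-size blocks."""
    concatenated = {k: list(chain(*examples[k])) for k in examples.keys()}
    total_length = len(concatenated[list(examples.keys())[0]])
    # Drop the tail that does not fill a whole block.
    if total_length >= block_size:
        total_length = (total_length // block_size) * block_size
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated.items()
    }
    # For causal language modeling the labels are a copy of the inputs.
    result["labels"] = result["input_ids"].copy()
    return result


# Hypothetical token ids standing in for real tokenizer output.
batch = {"input_ids": [[1, 2, 3], [4, 5], [6, 7]]}
blocks = group_texts(batch, block_size=2)
print(blocks["input_ids"])
```

The seven input tokens are concatenated, the leftover seventh token is dropped, and the rest is split into three blocks of two tokens each.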
The JSON config I'm using for deepspeed is:
{
    "fp16": {
        "enabled": "auto",
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1
    },
    "bf16": {
        "enabled": "auto"
    },
    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": "auto",
            "betas": "auto",
            "eps": "auto",
            "weight_decay": "auto"
        }
    },
    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": "auto",
            "warmup_max_lr": "auto",
            "warmup_num_steps": "auto"
        }
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": true
        },
        "overlap_comm": true,
        "contiguous_gradients": true,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
        "stage3_gather_16bit_weights_on_model_save": true
    },
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "steps_per_print": 2000,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": false
}
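Many values in this config are set to "auto", which the Trainer's DeepSpeed integration fills in from the corresponding TrainingArguments. As a quick sanity check, the sketch below (not part of my script) walks a config dict and lists the dotted path of every "auto" entry. The helper name find_auto_keys and the trimmed-down config are assumptions for illustration only.

```python
import json


def find_auto_keys(node, prefix=""):
    """Return dotted paths of all entries whose value is the string "auto"."""
    paths = []
    for key, value in node.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections such as "fp16" or "zero_optimization".
            paths.extend(find_auto_keys(value, path))
        elif value == "auto":
            paths.append(path)
    return paths


# A trimmed-down excerpt of the config above, used only for illustration.
config = json.loads("""
{
  "fp16": {"enabled": "auto", "loss_scale": 0},
  "zero_optimization": {"stage": 3, "reduce_bucket_size": "auto"},
  "train_batch_size": "auto",
  "steps_per_print": 2000
}
""")

auto_keys = find_auto_keys(config)
print(auto_keys)
```

To check the real file, replace the inline string with json.load on the path passed to --deepspeed.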
And the command I’m using is:
deepspeed examples/pytorch/translation/run-text-gen.py --deepspeed tests/deepspeed/ds_config_zero3.json --model_name_or_path EleutherAI/gpt-neo-1.3B --output_dir=bennyD --evaluation_strategy epoch --num_train_epochs 2 --dataset_name wikitext --dataset_config "wikitext-2-raw-v1"
The full stack trace:
Exception in thread MsgRouterThr:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/horza/.local/lib/python3.10/site-packages/wandb/sdk/interface/router.py", line 69, in message_loop
    msg = self._read_message()
  File "/home/horza/.local/lib/python3.10/site-packages/wandb/sdk/interface/router_queue.py", line 32, in _read_message
    msg = self._response_queue.get(timeout=1)
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 117, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 212, in recv_bytes
    self._check_closed()
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
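The last frames of the trace are in the standard library: multiprocessing.connection raises this error when something reads from a connection that has already been closed. The standalone sketch below produces the same message with a plain multiprocessing.Pipe. It only illustrates where the message comes from and is not a reproduction of the wandb or Trainer behavior.

```python
import multiprocessing

# Create a one-way pipe and close the receiving end before reading from it.
receiver, sender = multiprocessing.Pipe(duplex=False)
receiver.close()

try:
    receiver.recv_bytes()
    message = None
except OSError as err:
    # Reading from a closed connection fails the internal closed-handle check.
    message = str(err)

sender.close()
print(message)
```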
It’s worth noting that if I run the run_translation.py example used in the guide (https://github.com/huggingface/transformers/blob/main/examples/pytorch/translation/run_translation.py), modified to save checkpoints, I do not get this error.
Additionally, if I add --save_strategy no to my command, it completes with no errors, but I need the checkpoints.
Please help, been trying to figure this one out for a while.
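Since the traceback originates in wandb's message router thread rather than in Trainer or DeepSpeed code, one thing that could be tried while keeping checkpoints is rerunning the same command with wandb reporting turned off. The sketch below builds such a launch command without running it. --report_to none is a standard TrainingArguments option and WANDB_MODE=disabled is a wandb environment variable, but whether disabling wandb avoids the error here is an untested assumption.

```python
import os
import shlex

# The command from above, plus one extra flag that turns off logging integrations.
command = [
    "deepspeed",
    "examples/pytorch/translation/run-text-gen.py",
    "--deepspeed", "tests/deepspeed/ds_config_zero3.json",
    "--model_name_or_path", "EleutherAI/gpt-neo-1.3B",
    "--output_dir=bennyD",
    "--evaluation_strategy", "epoch",
    "--num_train_epochs", "2",
    "--dataset_name", "wikitext",
    "--dataset_config", "wikitext-2-raw-v1",
    "--report_to", "none",  # assumption: isolates wandb as the source of the error
]

# Copy the current environment and additionally disable wandb itself.
env = dict(os.environ, WANDB_MODE="disabled")

launch_line = shlex.join(command)
print(launch_line)
# To actually launch: subprocess.run(command, env=env, check=True)
```

If the run then completes with checkpoints, that would point at the wandb logging process shutting down early rather than at checkpoint saving itself.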
Expected behavior
The command runs with checkpoints and completes without errors.
About this issue
- Original URL
- State: closed
- Created a year ago
- Comments: 16 (8 by maintainers)
All super helpful pointers, thanks again.