diff --git a/configs/iterative_dpo.yaml b/configs/iterative_dpo.yaml index 1b18e1d03..d70a41ead 100644 --- a/configs/iterative_dpo.yaml +++ b/configs/iterative_dpo.yaml @@ -17,7 +17,6 @@ preprocessing_num_workers: 16 output_dir: ./output_models/iterative_dpo run_name: iterative_dpo random_seed: 42 -use_accelerator: True enable_distributed_inference: True distributed_inference_num_instances: 8 initial_iter_idx: 0 # 0 refers to the first dataset in dataset_path_list diff --git a/examples/benchmarking.py b/examples/benchmarking.py index 2b1d57741..3becf2e91 100644 --- a/examples/benchmarking.py +++ b/examples/benchmarking.py @@ -214,7 +214,7 @@ def main(): dataset_name = benchmarking_args.dataset_name # metric = pipeline_args.metric if is_lmflow_local_benchmarking(dataset_name): # TODO (@Jipeng) - model = AutoModel.get_model(model_args, tune_strategy='none', ds_config=ds_config) + model = AutoModel.get_model(model_args, do_train=False, ds_config=ds_config) run_lmflow_local_benchmarking(dataset_name,pipeline_name,model_args,pipeline_args,model) # Pass args TODO (@Jipeng) elif is_lm_evaluation_benchmarking(dataset_name): model = model_args.model_name_or_path diff --git a/examples/chatbot.py b/examples/chatbot.py index b6d8138e7..34e1587de 100644 --- a/examples/chatbot.py +++ b/examples/chatbot.py @@ -64,10 +64,9 @@ def main(): model = AutoModel.get_model( model_args, - tune_strategy='none', + do_train=False, ds_config=ds_config, device=pipeline_args.device, - use_accelerator=True, ) # We don't need input data, we will read interactively from stdin diff --git a/examples/chatbot_gradio.py b/examples/chatbot_gradio.py index d522497a1..b3259221e 100644 --- a/examples/chatbot_gradio.py +++ b/examples/chatbot_gradio.py @@ -110,7 +110,7 @@ class ChatbotArguments: model = AutoModel.get_model( model_args, - tune_strategy='none', + do_train=False, ds_config=ds_config, device=pipeline_args.device, torch_dtype=torch.float16 diff --git a/examples/evaluation.py b/examples/evaluation.py index b9360763c..a1224769a 100644 --- a/examples/evaluation.py +++ b/examples/evaluation.py @@ -36,9 +36,8 @@ model = AutoModel.get_model( model_args, - tune_strategy='none', + do_train=False, ds_config=ds_config, - use_accelerator=pipeline_args.use_accelerator_for_evaluator ) dataset = Dataset(data_args) diff --git a/examples/finetune_multi_modal.py b/examples/finetune_multi_modal.py index 90fecc2a3..cacab7144 100644 --- a/examples/finetune_multi_modal.py +++ b/examples/finetune_multi_modal.py @@ -59,7 +59,7 @@ def main(): # do not resiger deepspeed in the model. # with_deepspeed flag may be removed # by modifying the tune strategy in the future. 
- model = AutoModel.get_model(model_args, tune_strategy='none', + model = AutoModel.get_model(model_args, do_train=True, ds_config=pipeline_args.deepspeed, custom_model=True, with_deepspeed=False, diff --git a/examples/inference.py b/examples/inference.py index 6381aa86d..cbee351ba 100644 --- a/examples/inference.py +++ b/examples/inference.py @@ -39,10 +39,9 @@ def main(): model = AutoModel.get_model( model_args, - tune_strategy='none', + do_train=False, ds_config=ds_config, device=pipeline_args.device, - use_accelerator=True, ) # We don't need input data, we will read interactively from stdin diff --git a/examples/merge_lora.py b/examples/merge_lora.py index cd52363a8..3bbdb1dde 100644 --- a/examples/merge_lora.py +++ b/examples/merge_lora.py @@ -62,7 +62,7 @@ def main(): model_args.use_lora = True model = AutoModel.get_model( model_args, - tune_strategy='none', + do_train=False, device=merge_lora_args.device, ds_config=merge_lora_args.ds_config ) diff --git a/examples/rm_inference.py b/examples/rm_inference.py index 0f8acf7ce..9c999e988 100644 --- a/examples/rm_inference.py +++ b/examples/rm_inference.py @@ -40,7 +40,7 @@ def main(): model_args, data_args, pipeline_args = parser.parse_args_into_dataclasses() dataset = Dataset(data_args) - model = AutoModel.get_model(model_args, tune_strategy='none', use_accelerator=pipeline_args.use_accelerator) + model = AutoModel.get_model(model_args, do_train=False) inferencer = AutoPipeline.get_pipeline( pipeline_name=pipeline_name, model_args=model_args, diff --git a/examples/vis_chatbot.py b/examples/vis_chatbot.py index d313ef9f9..3cb2491fd 100644 --- a/examples/vis_chatbot.py +++ b/examples/vis_chatbot.py @@ -105,7 +105,7 @@ def main(): ds_config = json.load(f) model = AutoModel.get_model( model_args, - tune_strategy='none', + do_train=False, ds_config=ds_config, device=pipeline_args.device, custom_model=model_args.custom_model, diff --git a/examples/vis_chatbot_gradio.py b/examples/vis_chatbot_gradio.py index e86a29c6a..4326d7950 100644 --- a/examples/vis_chatbot_gradio.py +++ b/examples/vis_chatbot_gradio.py @@ -245,7 +245,7 @@ def start_inferencer( model = AutoModel.get_model( model_args, - tune_strategy='none', + do_train=False, ds_config=ds_config, device=pipeline_args.device, custom_model=model_args.custom_model, diff --git a/examples/vllm_inference.py b/examples/vllm_inference.py index d6e4b7859..1e06f9db9 100644 --- a/examples/vllm_inference.py +++ b/examples/vllm_inference.py @@ -40,7 +40,7 @@ def main(): model_args, data_args, pipeline_args = parser.parse_args_into_dataclasses() dataset = Dataset(data_args) - model = AutoModel.get_model(model_args, tune_strategy='none') + model = AutoModel.get_model(model_args, do_train=False) inferencer = AutoPipeline.get_pipeline( pipeline_name=pipeline_name, model_args=model_args, diff --git a/requirements.txt b/requirements.txt index 66e885d6c..3db38164d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,6 @@ tokenizers>=0.13.3 peft>=0.10.0 torch>=2.0.1 wandb -deepspeed>=0.14.4 sentencepiece transformers>=4.31.0 cpm_kernels==1.0.11 diff --git a/scripts/run_chatbot.sh b/scripts/run_chatbot.sh index 8e1c49be0..a5a58f77b 100755 --- a/scripts/run_chatbot.sh +++ b/scripts/run_chatbot.sh @@ -16,7 +16,6 @@ accelerate launch --config_file configs/accelerator_multigpu_config.yaml \ examples/chatbot.py \ --deepspeed configs/ds_config_chatbot.json \ --model_name_or_path ${model} \ - --use_accelerator True \ --max_new_tokens 256 \ --temperature 1.0 \ --end_string "#" \ diff --git 
a/scripts/run_evaluation_accelerator.sh b/scripts/run_evaluation_accelerator.sh index 8959f6f4b..f5411398c 100644 --- a/scripts/run_evaluation_accelerator.sh +++ b/scripts/run_evaluation_accelerator.sh @@ -13,5 +13,4 @@ CUDA_VISIBLE_DEVICES=0 accelerate launch --config_file configs/accelerator_singl --metric accuracy \ --output_dir output_dir/accelerator_1_card \ --inference_batch_size_per_device 1 \ - --use_accelerator_for_evaluator True \ --torch_dtype bfloat16 diff --git a/scripts/run_inference.sh b/scripts/run_inference.sh index 1a3241c1f..03fc60a94 100755 --- a/scripts/run_inference.sh +++ b/scripts/run_inference.sh @@ -15,7 +15,6 @@ accelerate launch --config_file configs/accelerator_multigpu_config.yaml \ examples/inference.py \ --deepspeed configs/ds_config_chatbot.json \ --model_name_or_path ${model} \ - --use_accelerator True \ --max_new_tokens 256 \ --temperature 1.0 \ ${lora_args} diff --git a/scripts/run_rm_inference.sh b/scripts/run_rm_inference.sh index 701daf120..32d68dff1 100644 --- a/scripts/run_rm_inference.sh +++ b/scripts/run_rm_inference.sh @@ -61,7 +61,6 @@ accelerate launch --config_file configs/accelerator_multigpu_config.yaml \ --trust_remote_code ${trust_remote_code} \ --model_name_or_path ${model_name_or_path} \ --arch_type text_regression \ - --use_accelerator True \ --block_size 4096 \ --inference_batch_size 16 \ --dataset_path ${dataset_path} \ diff --git a/service/app.py b/service/app.py index 7580ac39e..29b92daa9 100644 --- a/service/app.py +++ b/service/app.py @@ -54,7 +54,7 @@ class AppArguments: local_rank = int(os.getenv("LOCAL_RANK", "0")) world_size = int(os.getenv("WORLD_SIZE", "1")) torch.cuda.set_device(local_rank) -model = AutoModel.get_model(model_args, tune_strategy='none', ds_config=ds_config, use_accelerator=True) +model = AutoModel.get_model(model_args, do_train=False, ds_config=ds_config) accelerator = Accelerator() def stream_generate(inputs,context_len = 1024, max_new_tokens=128, end_string="##"): diff --git a/setup.py b/setup.py index 37ee0e965..e6d894e49 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,8 @@ "gradio": ["gradio"], "flask": ["flask", "flask_cors"], "flash_attn": ["flash-attn>=2.0.2"], - "trl": ["trl==0.8.0"] + "trl": ["trl==0.8.0"], + "deepspeed": ["deepspeed>=0.14.4"], } readme_path = os.path.join(folder, "README.md") diff --git a/src/lmflow/args.py b/src/lmflow/args.py index 518957db6..22425e2bf 100644 --- a/src/lmflow/args.py +++ b/src/lmflow/args.py @@ -924,8 +924,8 @@ class EvaluatorArguments: ), }, ) - use_accelerator_for_evaluator: bool = field( - default=False, metadata={"help": "Whether to use Huggingface Accelerator instead of Deepspeed"}, + use_accelerator_for_evaluator: Optional[bool] = field( + default=None, metadata={"help": "[Deprecated] Whether to use Huggingface Accelerator instead of Deepspeed"}, ) temperature: float = field( @@ -942,6 +942,14 @@ class EvaluatorArguments: default=100, metadata={"help": "Maximum length during inference."}, ) + + def __post_init__(self): + if self.use_accelerator_for_evaluator is not None: + logger.warning( + "You've specified `use_accelerator_for_evaluator`. This argument is deprecated. " + "It will not take effect and will be removed in a future version, " + "since LMFlow can now automatically detect whether it is running in an Accelerate or Deepspeed environment." + ) @dataclass @@ -1061,8 +1069,8 @@ class InferencerArguments: "help": "whether turn on true random sampling during inference."
}, ) - use_accelerator: bool = field( - default=False, metadata={"help": "Whether to use Huggingface Accelerator instead of Deepspeed"}, + use_accelerator: Optional[bool] = field( + default=None, metadata={"help": "[Deprecated] Whether to use Huggingface Accelerator instead of Deepspeed"}, ) use_beam_search: Optional[bool] = field( default=False, @@ -1131,6 +1139,13 @@ ) def __post_init__(self): + if self.use_accelerator is not None: + logger.warning( + "You've specified `use_accelerator`. This argument is deprecated. " + "It will not take effect and will be removed in a future version, " + "since LMFlow can now automatically detect whether it is running in an Accelerate or Deepspeed environment." + ) + if self.save_results: if self.results_path is None: raise ValueError("Need to specify results_path when save_results is True.") diff --git a/src/lmflow/models/hf_decoder_model.py b/src/lmflow/models/hf_decoder_model.py index 0fd6da2f0..30c70e73d 100644 --- a/src/lmflow/models/hf_decoder_model.py +++ b/src/lmflow/models/hf_decoder_model.py @@ -3,16 +3,12 @@ """This is a class called HFDecoderModel which is a wrapper around transformers model and tokenizer classes. It has several methods such as __init__, tokenize, and train that are used for training and fine-tuning the model. The __init__ method takes in several arguments -such as model_args, tune_strategy, and ds_config, which are used to load the pretrained +such as model_args, which is used to load the pretrained model and tokenizer, and initialize the training settings. The tokenize method is used to tokenize the input text and return the input IDs and attention masks that can be fed to the model for training or inference. -This class supports different tune_strategy options such as 'normal', 'none', 'lora', and -'adapter', which allow for different fine-tuning settings of the model. However, the 'lora' -and 'adapter' strategies are not yet implemented. - Overall, this class provides a convenient interface for loading and fine-tuning transformer models and can be used for various NLP tasks such as language modeling, text classification, and question answering. @@ -46,6 +42,7 @@ conversation_tokenize_function ) from lmflow.utils.versioning import is_ray_available, is_vllm_available, is_flash_attn_available +from lmflow.utils.envs import is_accelerate_env logger = logging.getLogger(__name__) @@ -74,11 +71,9 @@ class HFDecoderModel(DecoderModel, HFModelMixin, Tunable): model_args : Model arguments such as model name, path, revision, etc. - tune_strategy : str or none, default="normal". - A string representing the dataset backend. Defaults to "huggingface". - - ds_config : - Deepspeed configuations. + do_train : bool, default True + Determines whether to prepare the model for training, including distributed env, model placement, quantization, + lora, etc. args : Optional. Positional arguments. @@ -90,26 +85,16 @@ class HFDecoderModel(DecoderModel, HFModelMixin, Tunable): def __init__( self, model_args, - tune_strategy='normal', - ds_config=None, + do_train=True, device="gpu", - use_accelerator=False, *args, **kwargs ): - """ - Initializes a HFDecoderModel instance. - :param model_args: dictionary with model arguments such as model name, path, revision, etc.
- :param tune_strategy: tuning strategy: normal, none, lora or adapter - :param ds_config: deepspeed configuration for distributed training - """ HFModelMixin.__init__( self, model_args=model_args, - do_train=True if tune_strategy == "normal" else False, - ds_config=ds_config, + do_train=do_train, device=device, - use_accelerator=use_accelerator, *args, **kwargs ) @@ -384,7 +369,7 @@ def __inference(self, inputs, *args, **kwargs): The generated sequence output """ with torch.no_grad(): - if self.use_accelerator: + if is_accelerate_env(): outputs = self.backend_model.generate( input_ids=inputs, pad_token_id=self.tokenizer.pad_token_id, diff --git a/src/lmflow/models/hf_model_mixin.py b/src/lmflow/models/hf_model_mixin.py index 45f414292..251761e2e 100644 --- a/src/lmflow/models/hf_model_mixin.py +++ b/src/lmflow/models/hf_model_mixin.py @@ -1,14 +1,13 @@ #!/usr/bin/env python # coding=utf-8 # Copyright 2024 Statistics and Machine Learning Research Group. All rights reserved. +import copy import gc -import os import logging +from contextlib import nullcontext from typing import Union, Optional, Dict, List -import copy import torch -import deepspeed from transformers import ( CONFIG_MAPPING, AutoConfig, @@ -33,7 +32,8 @@ LMFLOW_LORA_TARGET_MODULES_MAPPING ) from lmflow.args import ModelArguments -from lmflow.utils.versioning import is_vllm_available +from lmflow.utils.versioning import is_vllm_available, is_deepspeed_available +from lmflow.utils.envs import is_accelerate_env if is_vllm_available(): from vllm import LLM, SamplingParams @@ -61,9 +61,7 @@ def __init__( self, model_args: ModelArguments, do_train: bool, - ds_config=None, device: Optional[str]="gpu", - use_accelerator: bool=False, hf_auto_model_additional_args: Optional[Dict]=None, *args, **kwargs @@ -76,12 +74,8 @@ def __init__( Dictionary with model arguments such as model name, path, revision, etc. do_train : bool To prepare the model for training or inference. - ds_config : optional - Deepspeed configuration for distributed training, by default None device : str, optional By default "gpu" - use_accelerator : bool, optional - By default False """ # See more about loading any type of standard or custom dataset (from @@ -96,8 +90,6 @@ def __init__( self.device = device self.model_args = model_args self.hf_auto_model = HF_AUTOMODEL_MAPPING[model_args.arch_type] - self.use_accelerator = use_accelerator - self.ds_config = ds_config self.do_train = do_train self.tokenizer = self.__prepare_tokenizer(model_args) @@ -380,9 +372,15 @@ def __prepare_model_for_training( # We resize the embeddings only when necessary to avoid index errors. # If you are creating a model from scratch on a small vocab and want a # smaller embedding size, remove this test. 
- with deepspeed.zero.GatheredParameters(model.get_input_embeddings().weight, modifier_rank=None): + resize_embedding_context = nullcontext() + if is_deepspeed_available() and not is_accelerate_env(): + import deepspeed + resize_embedding_context = deepspeed.zero.GatheredParameters(model.get_input_embeddings().weight, modifier_rank=None) + + with resize_embedding_context: weights = model.get_input_embeddings().weight embedding_size = weights.shape[0] + if len(self.tokenizer) > embedding_size: model.resize_token_embeddings(len(self.tokenizer)) @@ -394,8 +392,6 @@ def __prepare_model_for_inference( self, model_args: ModelArguments, hf_auto_model: HF_AUTOMODEL_TYPE, - use_accelerator: bool, - ds_config ): logger.info(f"Backend model already initialized, moving to device: {self.device}") if hasattr(self, "backend_model"): @@ -413,12 +409,8 @@ def __prepare_model_for_inference( "offload_state_dict": True, } - if use_accelerator or model_args.use_ram_optimized_load: + if model_args.use_ram_optimized_load: inference_load_kwargs.update(ram_optimized_load_kwargs) - - if not use_accelerator: - from transformers.integrations import HfDeepSpeedConfig - dschf = HfDeepSpeedConfig(ds_config) try: self.backend_model = hf_auto_model.from_pretrained( @@ -448,10 +440,14 @@ def __prepare_model_for_inference( model_args.lora_model_path, ) - if (not use_accelerator) and self.device == "gpu": - deepspeed.init_distributed() - self.ds_engine = deepspeed.initialize(model=self.backend_model, config_params=ds_config)[0] - self.ds_engine.module.eval() + if self.device == "gpu" and not is_accelerate_env(): + if is_deepspeed_available(): + import deepspeed + deepspeed.init_distributed() + self.ds_engine = deepspeed.initialize(model=self.backend_model)[0] + self.ds_engine.module.eval() + else: + raise ImportError("Deepspeed is not available. Please install via `pip install -e '.[deepspeed]'`.") self.__prepare_model_post_process() @@ -517,8 +513,6 @@ def activate_model_for_inference( self.__prepare_model_for_inference( model_args=self.model_args, hf_auto_model=self.hf_auto_model, - use_accelerator=self.use_accelerator, - ds_config=self.ds_config, ) self._activated = True diff --git a/src/lmflow/models/hf_text_regression_model.py b/src/lmflow/models/hf_text_regression_model.py index 46904cdbd..8ab95cb75 100644 --- a/src/lmflow/models/hf_text_regression_model.py +++ b/src/lmflow/models/hf_text_regression_model.py @@ -1,22 +1,12 @@ #!/usr/bin/env python # coding=utf-8 # Copyright 2024 Statistics and Machine Learning Research Group. All rights reserved. -import os import copy import hashlib import logging -from pathlib import Path from typing import List, Union, Dict, Optional import torch -from peft import ( - LoraConfig, - PeftModel, - TaskType, - get_peft_config, - get_peft_model, - prepare_model_for_kbit_training -) from transformers.modeling_outputs import SequenceClassifierOutputWithPast from lmflow.args import ModelArguments @@ -40,6 +30,7 @@ ) from lmflow.utils.data_utils import RewardModelInferenceResultWithInput from lmflow.utils.versioning import is_ray_available, is_vllm_available +from lmflow.utils.envs import is_accelerate_env if is_ray_available(): import ray @@ -62,11 +53,9 @@ class HFTextRegressionModel(TextRegressionModel, HFModelMixin, Tunable): model_args : Model arguments such as model name, path, revision, etc. - tune_strategy : str or none, default="normal". - A string representing the dataset backend. Defaults to "huggingface". - - ds_config : - Deepspeed configuations. 
+ do_train : bool, default False + Determines whether to prepare the model for training, including distributed env, model placement, quantization, + lora, etc. args : Optional. Positional arguments. @@ -78,19 +67,11 @@ def __init__( self, model_args: ModelArguments, - tune_strategy: str='normal', - ds_config=None, - device="gpu", - use_accelerator=False, + do_train: bool = False, + device = "gpu", *args, **kwargs ): - """ - Initializes a HFTextRegressionModel instance. - :param model_args: dictionary with model arguments such as model name, path, revision, etc. - :param tune_strategy: tuning strategy: normal, none, lora or adapter - :param ds_config: deepspeed configuration for distributed training - """ assert model_args.arch_type == "text_regression", ( f"Invalid model architecture type: {model_args.arch_type}. " f"Expected: text_regression" ) @@ -99,10 +80,8 @@ def __init__( HFModelMixin.__init__( self, model_args=model_args, - do_train=True if tune_strategy == "normal" else False, - ds_config=ds_config, + do_train=do_train, device=device, - use_accelerator=use_accelerator, hf_auto_model_additional_args=config_additional_args, *args, **kwargs @@ -331,13 +310,14 @@ def __inference( The generated sequence output """ with torch.no_grad(): - if self.use_accelerator: + if is_accelerate_env(): outputs = self.backend_model( input_ids=inputs, **kwargs, ) else: - if self.device == "gpu": + if self.device == "gpu": + # for scripts that run using 'deepspeed script.py' outputs = self.ds_engine.module( input_ids=inputs, synced_gpus=True, diff --git a/src/lmflow/pipeline/evaluator.py b/src/lmflow/pipeline/evaluator.py index 6ca98fbf8..fc144e03b 100644 --- a/src/lmflow/pipeline/evaluator.py +++ b/src/lmflow/pipeline/evaluator.py @@ -2,23 +2,29 @@ The class has two methods: create_dataloader() that loads the data from the test file, creates a data loader, and returns it with the size of the data, and evaluate(model) that generates output text given input text. It uses the create_dataloader() method to load the data, iterates over the data in mini-batches, and encodes the input text with the encode() method of the HFDecoderModel class. Then, it generates output text using the evaluate() method of the HFDecoderModel class, decodes the generated output text using the decode() method of the HFDecoderModel class, and writes the output to a file in the output directory. The method also logs some information to the console and Weights and Biases if the use_wandb argument is True.
""" +import datetime +import json import os + +import numpy as np import torch +import torch.distributed as dist import wandb -import deepspeed -import sys -import numpy as np -import datetime -import json # TODO: remove later from accelerate import Accelerator from transformers import AutoConfig -import torch.distributed as dist +from lmflow.args import ( + ModelArguments, + DatasetArguments, + EvaluatorArguments +) from lmflow.datasets.dataset import Dataset from lmflow.pipeline.base_pipeline import BasePipeline from lmflow.models.hf_decoder_model import HFDecoderModel from lmflow.utils.data_utils import set_random_seed, batchlize, answer_extraction +from lmflow.utils.versioning import is_deepspeed_available +from lmflow.utils.envs import is_accelerate_env os.environ["TOKENIZERS_PARALLELISM"] = "false" # To avoid warnings about parallelism in tokenizers class Evaluator(BasePipeline): @@ -38,7 +44,12 @@ class Evaluator(BasePipeline): """ - def __init__(self, model_args, data_args, evaluator_args): + def __init__( + self, + model_args: ModelArguments, + data_args: DatasetArguments, + evaluator_args: EvaluatorArguments, + ): # our method self.data_args = data_args self.evaluator_args = evaluator_args @@ -53,10 +64,14 @@ def __init__(self, model_args, data_args, evaluator_args): self.world_size = int(os.getenv("WORLD_SIZE", "1")) torch.cuda.set_device(self.local_rank) # NOTE: cpu-only machine will have error - if evaluator_args.use_accelerator_for_evaluator: + if is_accelerate_env(): self.accelerator = Accelerator() self.accelerator.wait_for_everyone() else: + if is_deepspeed_available(): + import deepspeed + else: + raise ImportError("Deepspeed is not available, please install using `pip install -e \".[deepspeed]\"`") deepspeed.init_distributed() self.config = AutoConfig.from_pretrained(model_args.model_name_or_path) @@ -135,8 +150,8 @@ def evaluate( """ if metric in ["acc", "accuracy"]: - if self.evaluator_args.use_accelerator_for_evaluator: - acc = self._evaluate_acc_with_accelerator(model, dataset, verbose=verbose) + if is_accelerate_env(): + acc = self._evaluate_acc_with_accelerate(model, dataset, verbose=verbose) else: acc = self._evaluate_acc_with_deepspeed(model, dataset, verbose=verbose) print(f"Evaluating final accuracy: {acc}") @@ -153,7 +168,7 @@ def evaluate( raise NotImplementedError(f"metric {metric} is not supported") - def _evaluate_acc_with_accelerator(self, model, dataset, verbose=True): + def _evaluate_acc_with_accelerate(self, model, dataset, verbose=True): dataloader, data_size = self.create_dataloader(dataset) if self.accelerator.is_local_main_process: if not os.path.exists(self.evaluator_args.output_dir): @@ -176,7 +191,13 @@ def _evaluate_acc_with_accelerator(self, model, dataset, verbose=True): inputs = batch_input['input_ids'] mask = batch_input['attention_mask'] with self.accelerator.autocast(): - outputs = model.inference(inputs, max_new_tokens=self.evaluator_args.max_new_tokens,attention_mask=mask,temperature=self.evaluator_args.temperature, repetition_penalty=self.evaluator_args.repetition_penalty,use_accelerator=self.evaluator_args.use_accelerator_for_evaluator) + outputs = model.inference( + inputs, + max_new_tokens=self.evaluator_args.max_new_tokens, + attention_mask=mask, + temperature=self.evaluator_args.temperature, + repetition_penalty=self.evaluator_args.repetition_penalty, + ) text_out = model.decode(outputs, skip_special_tokens=True) decoded_input = model.decode(inputs, skip_special_tokens=True,) prompt_length = [len(i) for i in decoded_input] diff 
--git a/src/lmflow/pipeline/finetuner.py b/src/lmflow/pipeline/finetuner.py index a19886189..2d132d0ee 100644 --- a/src/lmflow/pipeline/finetuner.py +++ b/src/lmflow/pipeline/finetuner.py @@ -10,15 +10,16 @@ from typing import Any, Iterable, Optional, Tuple, Union import datasets -import transformers import evaluate +import numpy as np +import transformers +from copy import deepcopy from itertools import chain from transformers import ( Trainer, default_data_collator, set_seed, ) -from copy import deepcopy from transformers import PreTrainedModel, TrainingArguments from transformers.trainer_utils import get_last_checkpoint from transformers.trainer_callback import ( @@ -30,7 +31,6 @@ is_sagemaker_mp_enabled, send_example_telemetry, ) -import numpy as np import lmflow.optim.optimizers as optim from lmflow.args import OptimizerNames, DatasetArguments, ModelArguments, FinetunerArguments diff --git a/src/lmflow/pipeline/inferencer.py b/src/lmflow/pipeline/inferencer.py index 855284fa7..e74bd1683 100644 --- a/src/lmflow/pipeline/inferencer.py +++ b/src/lmflow/pipeline/inferencer.py @@ -6,7 +6,6 @@ import os import torch import wandb -import deepspeed import sys import numpy as np import datetime @@ -22,13 +21,19 @@ import torch.distributed as dist import torch.nn.functional as F -from lmflow.args import DatasetArguments +from lmflow.args import ( + DatasetArguments, + ModelArguments, + InferencerArguments, +) from lmflow.datasets.dataset import Dataset from lmflow.pipeline.base_pipeline import BasePipeline from lmflow.models.hf_decoder_model import HFDecoderModel from lmflow.utils.data_utils import (set_random_seed, batchlize, answer_extraction, process_image_flag) from lmflow.utils.constants import IMAGE_TOKEN_INDEX +from lmflow.utils.versioning import is_deepspeed_available +from lmflow.utils.envs import is_accelerate_env os.environ["TOKENIZERS_PARALLELISM"] = "false" # To avoid warnings about parallelism in tokenizers def rstrip_partial_utf8(string): return string.replace("\ufffd", "") @@ -57,7 +62,12 @@ class Inferencer(BasePipeline): """ - def __init__(self, model_args, data_args, inferencer_args): + def __init__( + self, + model_args: ModelArguments, + data_args: DatasetArguments, + inferencer_args: InferencerArguments, + ): self.data_args = data_args self.inferencer_args = inferencer_args self.model_args = model_args @@ -66,9 +76,11 @@ def __init__(self, model_args, data_args, inferencer_args): self.local_rank = int(os.getenv("LOCAL_RANK", "0")) self.world_size = int(os.getenv("WORLD_SIZE", "1")) - if inferencer_args.device == "gpu": + if inferencer_args.device == "gpu": # FIXME: a bit weird here torch.cuda.set_device(self.local_rank) # NOTE: cpu-only machine will have error - deepspeed.init_distributed() + if not is_accelerate_env() and is_deepspeed_available(): + import deepspeed + deepspeed.init_distributed() else: os.environ["MASTER_ADDR"] = "localhost" os.environ["MASTER_PORT"] = "15000" @@ -83,7 +95,7 @@ def __init__(self, model_args, data_args, inferencer_args): print("Error in setting hidden size, use the default size 1024") self.model_hidden_size = 1024 # gpt2 seems do not have hidden_size in config - if inferencer_args.use_accelerator: + if is_accelerate_env(): self.accelerator = Accelerator() self.accelerator.wait_for_everyone() @@ -226,7 +238,7 @@ def inference( f"device \"{self.inferencer_args.device}\" is not supported" ) - if self.inferencer_args.use_accelerator: + if is_accelerate_env(): inputs = inputs.to(self.accelerator.device) @@ -234,7 +246,7 @@ def inference( 
inputs["image_token_indexes"] = image_token_indexes inputs["one_sample_multiple_images"] = True - if self.inferencer_args.use_accelerator: + if is_accelerate_env(): with self.accelerator.autocast(): outputs = model.inference( inputs, @@ -242,7 +254,6 @@ def inference( temperature=self.inferencer_args.temperature, repetition_penalty=self.inferencer_args.repetition_penalty, do_sample=self.inferencer_args.do_sample, - use_accelerator=True, ) else: outputs = model.inference( @@ -413,7 +424,6 @@ def predict_next_token(model: HFDecoderModel, input_ids: torch.Tensor, num_new_t """Predict the next token given the input_ids. """ output = model.inference(input_ids, - use_accelerator=True, max_new_tokens=num_new_tokens, return_dict_in_generate=True, output_scores=True, @@ -631,7 +641,6 @@ def inference( input_length = input_id.shape[1] output_id = model.inference( input_id, - use_accelerator=True, max_new_tokens=max_new_tokens, # pad_token_id=model.tokenizer.eos_token_id, ) diff --git a/src/lmflow/pipeline/iterative_dpo_aligner.py b/src/lmflow/pipeline/iterative_dpo_aligner.py index 59d122545..f0d0510cc 100644 --- a/src/lmflow/pipeline/iterative_dpo_aligner.py +++ b/src/lmflow/pipeline/iterative_dpo_aligner.py @@ -83,7 +83,7 @@ def _align_single_iteration( print_banner(f'Iterative DPO {iteration_name}: Generate responses') model = HFDecoderModel( model_args=target_model_args, - tune_strategy='none' + do_train=False ) self._do_target_model_inference( model=model, @@ -97,8 +97,7 @@ def _align_single_iteration( print_banner(f'Iterative DPO {iteration_name}: Reward model scoring') reward_model = HFTextRegressionModel( model_args=reward_model_args, - tune_strategy='none', - use_accelerator=self.aligner_args.use_accelerator, + do_train=False, ) target_model_inference_result_data_args = copy.deepcopy(dataset.data_args) target_model_inference_result_data_args.dataset_path = str(self.workspace_path/iteration_name/"target_model_inference_result") diff --git a/src/lmflow/pipeline/rm_inferencer.py b/src/lmflow/pipeline/rm_inferencer.py index 890e4ee20..160f83b77 100644 --- a/src/lmflow/pipeline/rm_inferencer.py +++ b/src/lmflow/pipeline/rm_inferencer.py @@ -5,7 +5,6 @@ import os import torch import wandb -import deepspeed import sys import numpy as np import datetime @@ -36,7 +35,8 @@ RewardModelInferenceResultWithInput, ) from lmflow.datasets.dataset import KEY_SCORE -from lmflow.utils.versioning import is_ray_available +from lmflow.utils.versioning import is_ray_available, is_deepspeed_available +from lmflow.utils.envs import is_accelerate_env if is_ray_available(): import ray @@ -78,15 +78,17 @@ def __init__( self.local_rank = int(os.getenv("LOCAL_RANK", "0")) self.world_size = int(os.getenv("WORLD_SIZE", "1")) - if inferencer_args.device == "gpu": + if inferencer_args.device == "gpu": # FIXME: a bit weird here torch.cuda.set_device(self.local_rank) # NOTE: cpu-only machine will have error - deepspeed.init_distributed() + if not is_accelerate_env() and is_deepspeed_available(): + import deepspeed + deepspeed.init_distributed() else: dist.init_process_group( "gloo", rank=self.local_rank, world_size=self.world_size ) - if inferencer_args.use_accelerator: + if is_accelerate_env(): self.accelerator: Accelerator = kwargs.get('accelerator', Accelerator()) @@ -194,7 +196,7 @@ def __inference( ): # len(batch) = batch_size, and batch element is dataset sample model_input_tensor = torch.LongTensor(batched_input_ids).to("cpu" if model.device == "cpu" else "cuda") - if self.inferencer_args.use_accelerator: + if 
is_accelerate_env(): with self.accelerator.autocast(): batch_output = model.inference( inputs=model_input_tensor, @@ -256,8 +258,7 @@ def __init__( ): self.model = HFTextRegressionModel( model_args=model_args, - tune_strategy='none', - use_accelerator=True + do_train=False, ) self.model.activate_model_for_inference(use_vllm=False) diff --git a/src/lmflow/pipeline/utils/memory_safe_vllm_inference.py b/src/lmflow/pipeline/utils/memory_safe_vllm_inference.py index d7859d9a1..59ab69eba 100644 --- a/src/lmflow/pipeline/utils/memory_safe_vllm_inference.py +++ b/src/lmflow/pipeline/utils/memory_safe_vllm_inference.py @@ -47,7 +47,7 @@ def main(): model_args, data_args, pipeline_args = parser.parse_args_into_dataclasses() dataset = Dataset(data_args) - model = AutoModel.get_model(model_args, tune_strategy='none') + model = AutoModel.get_model(model_args, do_train=False) inferencer = VLLMInferencer(model_args, data_args, pipeline_args) res = inferencer.inference( diff --git a/src/lmflow/pipeline/utils/raft_trainer.py b/src/lmflow/pipeline/utils/raft_trainer.py index c3a5253f7..a28c659d7 100644 --- a/src/lmflow/pipeline/utils/raft_trainer.py +++ b/src/lmflow/pipeline/utils/raft_trainer.py @@ -62,7 +62,6 @@ from transformers.configuration_utils import PretrainedConfig from transformers.data.data_collator import DataCollator, DataCollatorWithPadding, default_data_collator from transformers.debug_utils import DebugOption, DebugUnderflowOverflow -from transformers.deepspeed import deepspeed_init, is_deepspeed_zero3_enabled from transformers.dependency_versions_check import dep_version_check from transformers.modelcard import TrainingSummary from transformers.modeling_utils import PreTrainedModel, load_sharded_checkpoint, unwrap_model @@ -150,6 +149,12 @@ ) from transformers.utils.generic import ContextManagers +from lmflow.utils.versioning import is_package_version_at_least +if is_package_version_at_least("transformers", "4.46.0"): + from transformers.integrations.deepspeed import deepspeed_init, is_deepspeed_zero3_enabled +else: + from transformers.deepspeed import deepspeed_init, is_deepspeed_zero3_enabled + _is_native_cpu_amp_available = is_torch_greater_or_equal_than_1_10 diff --git a/src/lmflow/utils/envs.py b/src/lmflow/utils/envs.py new file mode 100644 index 000000000..3736f7433 --- /dev/null +++ b/src/lmflow/utils/envs.py @@ -0,0 +1,7 @@ +import os + +def is_accelerate_env(): + for key, _ in os.environ.items(): + if key.startswith('ACCELERATE_'): + return True + return False \ No newline at end of file diff --git a/src/lmflow/utils/versioning.py b/src/lmflow/utils/versioning.py index bce50b6b8..d57567eba 100644 --- a/src/lmflow/utils/versioning.py +++ b/src/lmflow/utils/versioning.py @@ -75,4 +75,8 @@ def is_trl_available(): def is_multimodal_available(): - return _is_packages_available(["PIL"]) \ No newline at end of file + return _is_packages_available(["PIL"]) + + +def is_deepspeed_available(): + return _is_package_available("deepspeed") \ No newline at end of file diff --git a/tests/models/test_hf_decoder_model.py b/tests/models/test_hf_decoder_model.py index c6e8d096d..4a07c32ff 100644 --- a/tests/models/test_hf_decoder_model.py +++ b/tests/models/test_hf_decoder_model.py @@ -18,7 +18,6 @@ import json import os from pathlib import Path -from transformers.deepspeed import HfDeepSpeedConfig from lmflow.args import DatasetArguments, ModelArguments from lmflow.datasets.dataset import Dataset @@ -653,13 +652,12 @@ def test_inference(self): ds_config_path = "examples/ds_config.json" with open 
(ds_config_path, "r") as f: ds_config = json.load(f) - dschf = HfDeepSpeedConfig(ds_config) model_name = 'gpt2' model_args = ModelArguments( model_name_or_path=model_name, use_ram_optimized_load=False ) - model = HFDecoderModel(model_args, tune_strategy='none', ds_config=ds_config) + model = HFDecoderModel(model_args, do_train=False, ds_config=ds_config) self.local_rank = int(os.getenv("LOCAL_RANK", "0")) self.world_size = int(os.getenv("WORLD_SIZE", "1")) torch.cuda.set_device(self.local_rank)
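Migration note, not part of the patch above: a minimal sketch of the call pattern after this change, assuming the `lmflow.models.auto_model` import path used by the example scripts and a placeholder "gpt2" model; the Accelerate-vs-DeepSpeed choice is no longer passed as a flag but is inferred from the launcher environment by the new `is_accelerate_env()` helper.

# Sketch only: placeholder model name; launch with `accelerate launch` or `deepspeed` as before.
from lmflow.args import ModelArguments
from lmflow.models.auto_model import AutoModel   # assumed import path, as in the example scripts
from lmflow.utils.envs import is_accelerate_env  # helper added by this patch

model_args = ModelArguments(model_name_or_path="gpt2")

# Before: AutoModel.get_model(model_args, tune_strategy='none', use_accelerator=True, ds_config=...)
# After: a single do_train flag; inference-only callers pass do_train=False.
model = AutoModel.get_model(model_args, do_train=False)

# Backend selection now keys on the environment: `accelerate launch` exports
# ACCELERATE_*-prefixed variables, which the helper detects, while a plain
# `deepspeed script.py` launch does not, so the DeepSpeed branch is taken instead.
print("Accelerate environment detected:", is_accelerate_env())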