# Copyright 2025 the LlamaFactory team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import os import signal from collections import defaultdict from datetime import datetime from typing import Any, Optional, Union from psutil import Process from yaml import safe_dump, safe_load from ..extras import logging from ..extras.constants import ( DATA_CONFIG, DEFAULT_TEMPLATE, MULTIMODAL_SUPPORTED_MODELS, SUPPORTED_MODELS, TRAINING_ARGS, DownloadSource, ) from ..extras.misc import use_modelscope, use_openmind logger = logging.get_logger(__name__) DEFAULT_CACHE_DIR = "cache" DEFAULT_CONFIG_DIR = "config" DEFAULT_DATA_DIR = "data" DEFAULT_SAVE_DIR = "saves" USER_CONFIG = "user_config.yaml" def abort_process(pid: int) -> None: r"""Abort the processes recursively in a bottom-up way.""" try: children = Process(pid).children() if children: for child in children: abort_process(child.pid) os.kill(pid, signal.SIGABRT) except Exception: pass def get_save_dir(*paths: str) -> os.PathLike: r"""Get the path to saved model checkpoints.""" if os.path.sep in paths[-1]: logger.warning_rank0("Found complex path, some features may be not available.") return paths[-1] paths = (path.replace(" ", "").strip() for path in paths) return os.path.join(DEFAULT_SAVE_DIR, *paths) def _get_config_path() -> os.PathLike: r"""Get the path to user config.""" return os.path.join(DEFAULT_CACHE_DIR, USER_CONFIG) def load_config() -> dict[str, Union[str, dict[str, Any]]]: r"""Load user config if exists.""" try: with open(_get_config_path(), encoding="utf-8") as f: return safe_load(f) except Exception: return {"lang": None, "last_model": None, "path_dict": {}, "cache_dir": None} def save_config(lang: str, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None: r"""Save user config.""" os.makedirs(DEFAULT_CACHE_DIR, exist_ok=True) user_config = load_config() user_config["lang"] = lang or user_config["lang"] if model_name: user_config["last_model"] = model_name if model_name and model_path: user_config["path_dict"][model_name] = model_path with open(_get_config_path(), "w", encoding="utf-8") as f: safe_dump(user_config, f) def get_model_path(model_name: str) -> str: r"""Get the model path according to the model name.""" user_config = load_config() path_dict: dict[DownloadSource, str] = SUPPORTED_MODELS.get(model_name, defaultdict(str)) model_path = user_config["path_dict"].get(model_name, "") or path_dict.get(DownloadSource.DEFAULT, "") if ( use_modelscope() and path_dict.get(DownloadSource.MODELSCOPE) and model_path == path_dict.get(DownloadSource.DEFAULT) ): # replace hf path with ms path model_path = path_dict.get(DownloadSource.MODELSCOPE) if ( use_openmind() and path_dict.get(DownloadSource.OPENMIND) and model_path == path_dict.get(DownloadSource.DEFAULT) ): # replace hf path with om path model_path = path_dict.get(DownloadSource.OPENMIND) return model_path def get_template(model_name: str) -> str: r"""Get the template name if the model is a chat/distill/instruct model.""" return DEFAULT_TEMPLATE.get(model_name, "default") def get_time() -> str: r"""Get current date and time.""" return datetime.now().strftime(r"%Y-%m-%d-%H-%M-%S") def is_multimodal(model_name: str) -> bool: r"""Judge if the model is a vision language model.""" return model_name in MULTIMODAL_SUPPORTED_MODELS def load_dataset_info(dataset_dir: str) -> dict[str, dict[str, Any]]: r"""Load dataset_info.json.""" if dataset_dir == "ONLINE" or dataset_dir.startswith("REMOTE:"): logger.info_rank0(f"dataset_dir is {dataset_dir}, using online dataset.") return {} try: with open(os.path.join(dataset_dir, DATA_CONFIG), encoding="utf-8") as f: return json.load(f) except Exception as err: logger.warning_rank0(f"Cannot open {os.path.join(dataset_dir, DATA_CONFIG)} due to {str(err)}.") return {} def load_args(config_path: str) -> Optional[dict[str, Any]]: r"""Load the training configuration from config path.""" try: with open(config_path, encoding="utf-8") as f: return safe_load(f) except Exception: return None def save_args(config_path: str, config_dict: dict[str, Any]) -> None: r"""Save the training configuration to config path.""" with open(config_path, "w", encoding="utf-8") as f: safe_dump(config_dict, f) def _clean_cmd(args: dict[str, Any]) -> dict[str, Any]: r"""Remove args with NoneType or False or empty string value.""" no_skip_keys = ["packing"] return {k: v for k, v in args.items() if (k in no_skip_keys) or (v is not None and v is not False and v != "")} def gen_cmd(args: dict[str, Any]) -> str: r"""Generate CLI commands for previewing.""" cmd_lines = ["llamafactory-cli train "] for k, v in _clean_cmd(args).items(): if isinstance(v, dict): cmd_lines.append(f" --{k} {json.dumps(v, ensure_ascii=False)} ") elif isinstance(v, list): cmd_lines.append(f" --{k} {' '.join(map(str, v))} ") else: cmd_lines.append(f" --{k} {str(v)} ") if os.name == "nt": cmd_text = "`\n".join(cmd_lines) else: cmd_text = "\\\n".join(cmd_lines) cmd_text = f"```bash\n{cmd_text}\n```" return cmd_text def save_cmd(args: dict[str, Any]) -> str: r"""Save CLI commands to launch training.""" output_dir = args["output_dir"] os.makedirs(output_dir, exist_ok=True) with open(os.path.join(output_dir, TRAINING_ARGS), "w", encoding="utf-8") as f: safe_dump(_clean_cmd(args), f) return os.path.join(output_dir, TRAINING_ARGS) def load_eval_results(path: os.PathLike) -> str: r"""Get scores after evaluation.""" with open(path, encoding="utf-8") as f: result = json.dumps(json.load(f), indent=4) return f"```json\n{result}\n```\n" def create_ds_config() -> None: r"""Create deepspeed config in the current directory.""" os.makedirs(DEFAULT_CACHE_DIR, exist_ok=True) ds_config = { "train_batch_size": "auto", "train_micro_batch_size_per_gpu": "auto", "gradient_accumulation_steps": "auto", "gradient_clipping": "auto", "zero_allow_untested_optimizer": True, "fp16": { "enabled": "auto", "loss_scale": 0, "loss_scale_window": 1000, "initial_scale_power": 16, "hysteresis": 2, "min_loss_scale": 1, }, "bf16": {"enabled": "auto"}, } offload_config = { "device": "cpu", "pin_memory": True, } ds_config["zero_optimization"] = { "stage": 2, "allgather_partitions": True, "allgather_bucket_size": 5e8, "overlap_comm": True, "reduce_scatter": True, "reduce_bucket_size": 5e8, "contiguous_gradients": True, "round_robin_gradients": True, } with open(os.path.join(DEFAULT_CACHE_DIR, "ds_z2_config.json"), "w", encoding="utf-8") as f: json.dump(ds_config, f, indent=2) ds_config["zero_optimization"]["offload_optimizer"] = offload_config with open(os.path.join(DEFAULT_CACHE_DIR, "ds_z2_offload_config.json"), "w", encoding="utf-8") as f: json.dump(ds_config, f, indent=2) ds_config["zero_optimization"] = { "stage": 3, "overlap_comm": True, "contiguous_gradients": True, "sub_group_size": 1e9, "reduce_bucket_size": "auto", "stage3_prefetch_bucket_size": "auto", "stage3_param_persistence_threshold": "auto", "stage3_max_live_parameters": 1e9, "stage3_max_reuse_distance": 1e9, "stage3_gather_16bit_weights_on_model_save": True, } with open(os.path.join(DEFAULT_CACHE_DIR, "ds_z3_config.json"), "w", encoding="utf-8") as f: json.dump(ds_config, f, indent=2) ds_config["zero_optimization"]["offload_optimizer"] = offload_config ds_config["zero_optimization"]["offload_param"] = offload_config with open(os.path.join(DEFAULT_CACHE_DIR, "ds_z3_offload_config.json"), "w", encoding="utf-8") as f: json.dump(ds_config, f, indent=2)