# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from dataclasses import dataclass from threading import Lock from typing import Dict, Optional from nemo.utils.metaclasses import Singleton @dataclass() class ModelMetadataRegistry: guid: str gidx: int restoration_path: Optional[str] = None class AppState(metaclass=Singleton): def __init__(self): # method call lock self.__lock = Lock() # TODO: should we store global config in hydra_runner? self._app_cfg = None # World info self._device_id = None self._local_rank = None self._global_rank = None self._tensor_model_parallel_rank = None self._pipeline_model_parallel_rank = None self._data_parallel_rank = None self._world_size = None self._model_parallel_size = None self._tensor_model_parallel_size = None self._tensor_model_parallel_group = None self._pipeline_model_parallel_size = None self._virtual_pipeline_model_parallel_size = None self._pipeline_model_parallel_group = None self._pipeline_model_parallel_split_rank = None self._is_megatron_initialized = False self._data_parallel_size = None self._data_parallel_group = None self._megatron_checkpoint_version = None self._use_fp8 = False self._random_seed = None # Logging info self._log_dir = None self._exp_dir = None self._name = None self._checkpoint_name = None self._version = None self._create_checkpoint_callback = None self._checkpoint_callback_params = None # Save and Restore (.nemo) self._tmpdir_name = None self._is_model_being_restored = False self._nemo_file_folder = None self._model_restore_path = None self._all_model_restore_paths = [] self._model_guid_map = {} # type: Dict[str, ModelMetadataRegistry] @property def device_id(self): """ Property returns the device_id Returns: device_id """ return self._device_id @device_id.setter def device_id(self, id): """ Property sets the device_id. Args: size (int): The device id. """ self._device_id = id @property def world_size(self): """ Property returns the total number of GPUs. Returns: Total number of GPUs. """ return self._world_size @world_size.setter def world_size(self, size): """ Property sets the total number of GPUs. Args: size (int): Total number of GPUs. """ self._world_size = size @property def model_parallel_size(self): """ Property returns the number of GPUs in each model parallel group. Returns: Number of GPUs in each model parallel group. """ return self._model_parallel_size @model_parallel_size.setter def model_parallel_size(self, size): """ Property sets the number of GPUs in each model parallel group. Args: size (int): Number of GPUs in each model parallel group. """ self._model_parallel_size = size @property def tensor_model_parallel_size(self): """ Property returns the number of GPUs in each model parallel group. Returns: Number of GPUs in each model parallel group. """ return self._tensor_model_parallel_size @tensor_model_parallel_size.setter def tensor_model_parallel_size(self, size): """ Property sets the number of GPUs in each model parallel group. Args: size (int): Number of GPUs in each model parallel group. """ self._tensor_model_parallel_size = size @property def pipeline_model_parallel_size(self): """ Property returns the number of GPUs in each model parallel group. Returns: Number of GPUs in each model parallel group. """ return self._pipeline_model_parallel_size @pipeline_model_parallel_size.setter def pipeline_model_parallel_size(self, size): """ Property sets the number of GPUs in each model parallel group. Args: size (int): Number of GPUs in each model parallel group. """ self._pipeline_model_parallel_size = size @property def virtual_pipeline_model_parallel_size(self): """ Property returns the number of GPUs in each model parallel group. Returns: Number of GPUs in each model parallel group. """ return self._virtual_pipeline_model_parallel_size @virtual_pipeline_model_parallel_size.setter def virtual_pipeline_model_parallel_size(self, size): """ Property sets the size of the virtual pipeline parallel model. Args: size (int): Number of modules in each pipeline parallel model. """ self._virtual_pipeline_model_parallel_size = size @property def data_parallel_size(self): """ Property returns the number of GPUs in each data parallel group. Returns: Number of GPUs in each data parallel group. """ return self._data_parallel_size @data_parallel_size.setter def data_parallel_size(self, size): """ Property sets the number of GPUs in each data parallel group. Args: size (int): Number of GPUs in each data parallel group. """ self._data_parallel_size = size @property def local_rank(self): """ Property returns the local rank. Returns: Local rank. """ return self._local_rank @local_rank.setter def local_rank(self, rank): """ Property sets the local rank. Args: rank (int): Local rank. """ self._local_rank = rank @property def global_rank(self): """ Property returns the global rank. Returns: Global rank. """ return self._global_rank @global_rank.setter def global_rank(self, rank): """ Property sets the global rank. Args: rank (int): Global rank. """ self._global_rank = rank @property def tensor_model_parallel_rank(self): """ Property returns the tensor model parallel rank. Returns: Tensor model parallel rank. """ return self._tensor_model_parallel_rank @tensor_model_parallel_rank.setter def tensor_model_parallel_rank(self, rank): """ Property sets the tensor model parallel rank. Args: rank (int): Tensor model parallel rank. """ self._tensor_model_parallel_rank = rank @property def tensor_model_parallel_group(self): """ Property returns the tensor model parallel group. Returns: Tensor model parallel group. """ return self._tensor_model_parallel_group @tensor_model_parallel_group.setter def tensor_model_parallel_group(self, group): """ Property sets the tensor model parallel group. Args: group: Tensor model parallel group. """ self._tensor_model_parallel_group = group @property def pipeline_model_parallel_rank(self): """ Property returns the pipeline model parallel rank. Returns: Pipeline model parallel rank. """ return self._pipeline_model_parallel_rank @pipeline_model_parallel_rank.setter def pipeline_model_parallel_rank(self, rank): """ Property sets the pipeline model parallel rank. Args: rank (int): Pipeline model parallel rank. """ self._pipeline_model_parallel_rank = rank @property def virtual_pipeline_model_parallel_rank(self): """ Property returns the virtual pipeline parallel rank. Returns: Model parallel rank. """ return self._virtual_pipeline_model_parallel_rank @virtual_pipeline_model_parallel_rank.setter def virtual_pipeline_model_parallel_rank(self, rank): """ Property sets the virtual pipeline parallel rank. Args: rank (int): Virtual pipeline parallel rank. """ self._virtual_pipeline_model_parallel_rank = rank @property def pipeline_model_parallel_split_rank(self): """ Property returns the rank at which Encoder and Decoder are split into different pipelines for Megatrron Encoder-Decoder models. Returns: Pipeline model parallel split rank. """ return self._pipeline_model_parallel_split_rank @pipeline_model_parallel_split_rank.setter def pipeline_model_parallel_split_rank(self, rank): """ Property sets the rank at which Encoder and Decoder are split into different pipelines for Megatrron Encoder-Decoder models. Args: rank (int): Model parallel split rank. """ self._pipeline_model_parallel_split_rank = rank @property def pipeline_model_parallel_group(self): """ Property returns the pipeline model parallel group. Returns: Pipeline model parallel group. """ return self._pipeline_model_parallel_group @pipeline_model_parallel_group.setter def pipeline_model_parallel_group(self, group): """ Property sets the pipeline model parallel group. Args: group: Pipeline model parallel group. """ self._pipeline_model_parallel_group = group @property def data_parallel_rank(self): """ Property returns the data parallel rank. Returns: Data parallel rank. """ return self._data_parallel_rank @data_parallel_rank.setter def data_parallel_rank(self, rank): """ Property sets the data parallel rank. Args: rank (int): Data parallel rank. """ self._data_parallel_rank = rank @property def data_parallel_group(self): """ Property returns the data parallel group. Returns: Data parallel group. """ return self._data_parallel_group @data_parallel_group.setter def data_parallel_group(self, group): """ Property sets the data parallel group. Args: group: Data parallel group. """ self._data_parallel_group = group @property def use_fp8(self): """ Property returns the use of fp8 precision. Returns: Use of FP8. """ return self._use_fp8 @use_fp8.setter def use_fp8(self, use_fp8): """ Property sets the use of fp8 precision. Args: use_fp8: Use of FP8. """ self._use_fp8 = use_fp8 @property def random_seed(self): """ Property returns the random seed. Returns: Random seed. """ return self._random_seed @random_seed.setter def random_seed(self, seed): """ Property sets the random seed. Args: seed (int): Random seed. """ self._random_seed = seed @property def log_dir(self): """Returns the log_dir set by exp_manager. """ return self._log_dir @log_dir.setter def log_dir(self, dir): """Sets the log_dir property. Args: dir (str): Log_dir set by exp_manager. """ self._log_dir = dir @property def exp_dir(self): """Returns the exp_dir set by exp_manager. """ return self._exp_dir @exp_dir.setter def exp_dir(self, dir): """Sets the log_dir property. Args: dir (str): Log_dir set by exp_manager. """ self._exp_dir = dir @property def name(self): """Returns the name set by exp_manager. """ return self._name @name.setter def name(self, name): """Sets the name property. Args: dir (str): name set by exp_manager. """ self._name = name @property def checkpoint_name(self): """Returns the name set by exp_manager. """ return self._checkpoint_name @checkpoint_name.setter def checkpoint_name(self, name): """Sets the name property. Args: dir (str): name set by exp_manager. """ self._checkpoint_name = name @property def version(self): """Returns the version set by exp_manager. """ return self._version @version.setter def version(self, version): """Sets the version property. Args: dir (str): version set by exp_manager. """ self._version = version @property def create_checkpoint_callback(self): """Returns the create_checkpoint_callback set by exp_manager. """ return self._create_checkpoint_callback @create_checkpoint_callback.setter def create_checkpoint_callback(self, create_checkpoint_callback): """Sets the create_checkpoint_callback property. Args: dir (bool): create_checkpoint_callback set by exp_manager. """ self._create_checkpoint_callback = create_checkpoint_callback @property def checkpoint_callback_params(self): """Returns the version set by exp_manager. """ return self._checkpoint_callback_params @checkpoint_callback_params.setter def checkpoint_callback_params(self, params): """Sets the name property. Args: params (dict): checkpoint_callback_params set by exp_manager. """ self._checkpoint_callback_params = params @property def model_restore_path(self): restore_path = self._all_model_restore_paths[-1] if len(self._all_model_restore_paths) > 0 else None return restore_path @model_restore_path.setter def model_restore_path(self, path): with self.__lock: self._model_restore_path = path self._all_model_restore_paths.append(path) def register_model_guid(self, guid: str, restoration_path: Optional[str] = None): # Maps a guid to its restore path (None or last absolute path) with self.__lock: if guid in self._model_guid_map: idx = self._model_guid_map[guid].gidx else: idx = len(self._model_guid_map) self._model_guid_map[guid] = ModelMetadataRegistry(guid, idx, restoration_path=restoration_path) def reset_model_guid_registry(self): # Reset the guid mapping with self.__lock: self._model_guid_map.clear() def get_model_metadata_from_guid(self, guid) -> ModelMetadataRegistry: # Returns the global model idx and restoration path metadata = self._model_guid_map[guid] return metadata @property def is_model_being_restored(self) -> bool: return self._is_model_being_restored @is_model_being_restored.setter def is_model_being_restored(self, is_restored: bool): self._is_model_being_restored = is_restored @property def nemo_file_folder(self) -> str: return self._nemo_file_folder @nemo_file_folder.setter def nemo_file_folder(self, path: str): self._nemo_file_folder = path