import copy
import hashlib
import os
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Sequence

import torch
from hydra.core.config_store import ConfigStore
from hydra.core.hydra_config import HydraConfig
from hydra.core.plugins import Plugins
from hydra.core.singleton import Singleton
from hydra.core.utils import JobReturn, JobStatus, configure_log, filter_overrides, setup_globals
from hydra.plugins.launcher import Launcher
from hydra.types import HydraContext, TaskFunction
from omegaconf import DictConfig, OmegaConf, open_dict

from nemo.utils import logging


def is_in_toplevel_plugins_module(*args, **kwargs) -> bool:
    return True


# Patch Hydra's plugin discovery check so that this launcher can be registered
# even though it does not live inside a `hydra_plugins` package.
Plugins.instance().is_in_toplevel_plugins_module = is_in_toplevel_plugins_module


@dataclass
class ProcessLauncherConfig:
    _target_: str = "nemo.core.utils.process_launcher.launcher.ProcessLauncher"
    num_gpus: int = -1  # non-positive values mean "use every GPU visible to torch.cuda"
    jobs_per_gpu: int = 1  # number of jobs multiplexed onto each GPU concurrently
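

# When this launcher is selected, the composed Hydra config contains a `hydra.launcher` node
# built from ProcessLauncherConfig. As a rough sketch (the values shown are the defaults above):
#
#   hydra:
#     launcher:
#       _target_: nemo.core.utils.process_launcher.launcher.ProcessLauncher
#       num_gpus: -1
#       jobs_per_gpu: 1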


def execute_job(
    idx: int,
    overrides: Sequence[str],
    hydra_context: HydraContext,
    config: DictConfig,
    singleton_state: Dict[Any, Any],
    gpu_idx: int,
):
    """
    Creates a process that launches a "single" job whose config is identical to the global config,
    updated with the sweep hyperparameters for this run.
    Since a separate process is used, CUDA can work in non-DDP mode without issue.
    Attempting DDP with this launcher will not work, as DDP cannot be used in shared contexts.

    Args:
        idx: Global index of the job.
        overrides: List of str overrides that correspond to this job.
        hydra_context: Hydra context used to load the sweep params into the global config.
        config: Global config that will be updated with the sweep hyperparameters.
        singleton_state: Hydra state.
        gpu_idx: The GPU ID on which this process will be run.

    Returns:
        - The Process object that corresponds to this sweep
        - The JobReturn object holding some metadata about this run
        - A tuple of (stderr buffer, drainer thread) used to capture the subprocess' stderr stream
    """
    # Required by Hydra: restore global state inside the launched job.
    setup_globals()
    Singleton.set_state(singleton_state)

    # Compose the sweep config for this job and register it with Hydra.
    sweep_config = hydra_context.config_loader.load_sweep_config(config, list(overrides))
    with open_dict(sweep_config):
        sweep_config.hydra.job.id = "{}_{}".format(sweep_config.hydra.job.name, idx)
        sweep_config.hydra.job.num = idx
    HydraConfig.instance().set_config(sweep_config)

    # Resolve the absolute path of the script being executed.
    script_path = os.path.join(os.getcwd(), sys.argv[0])
    script_path = os.path.abspath(script_path)

    # Build a unique config directory name from the script path and the config contents.
    hash_salt = "|".join([script_path, str(OmegaConf.to_yaml(config))]).encode('utf-8')
    hash_val = hashlib.sha256(hash_salt).hexdigest()

    config_dir = os.path.join(os.getcwd(), "hydra_cfg", str(hash_val))
    if not os.path.exists(config_dir):
        os.makedirs(config_dir, exist_ok=True)

    task_cfg = copy.deepcopy(sweep_config)

    # Remove the hydra node from the task config before saving it for the subprocess.
    with open_dict(task_cfg):
        task_cfg.pop('hydra', '')

    # Save the per-job config so the subprocess can load it via --config-path / --config-name.
    temp_config_name = f"config_{idx}.yaml"
    temp_config = os.path.join(config_dir, temp_config_name)
    OmegaConf.save(task_cfg, temp_config)

    # Collect the task overrides requested for this sweep.
    overrides = OmegaConf.to_container(config.hydra.overrides.task)

    # Pin this job to a single GPU by forcing a `trainer.devices` override.
    found_devices = False
    gpu_override = f'trainer.devices=[{gpu_idx}]'
    for oidx, val in enumerate(overrides):
        if 'trainer.devices' in val:
            overrides[oidx] = gpu_override
            found_devices = True

    if not found_devices:
        overrides.append(gpu_override)

    # Assemble the command that re-runs the original script against the saved config.
    cmd = [
        'python',
        script_path,
        "--config-path",
        config_dir,
        "--config-name",
        temp_config_name,
        *overrides,
    ]

    # Launch the subprocess; stderr is piped so it can be drained without blocking the child.
    proc = subprocess.Popen(cmd, stderr=subprocess.PIPE)

    # Drain stderr on a daemon thread so the pipe does not fill up and stall the subprocess.
    std_error_buffer = []
    drainerthread = threading.Thread(target=std_error_buffer.extend, args=(proc.stderr,))
    drainerthread.daemon = True
    drainerthread.start()

    # Build a placeholder JobReturn; its status and return value are filled in by `launch`.
    res = JobReturn()
    res.cfg = task_cfg
    res.overrides = overrides
    res.hydra_cfg = config
    res.working_dir = os.getcwd()
    res.return_value = None

    return proc, res, (std_error_buffer, drainerthread)
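

# For reference, the command assembled by `execute_job` has roughly the following shape
# (the script path, hash, job index, and override values here are illustrative only):
#
#   python /path/to/train.py \
#       --config-path <working dir>/hydra_cfg/<sha256 of script path + config> \
#       --config-name config_3.yaml \
#       model.optim.lr=0.001 trainer.devices=[1]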


def launch(launcher, job_overrides: Sequence[Sequence[str]], initial_job_idx: int,) -> Sequence[JobReturn]:
    """
    Args:
        launcher: Reference to the Launcher subclass.
        job_overrides: A list of lists of strings, where each inner list is the arguments for one job run.
        initial_job_idx: Initial job idx in batch.

    Returns:
        A list of JobReturn objects.
    """
    # Required by Hydra (see other Hydra launchers for reference).
    setup_globals()
    assert launcher.config is not None
    assert launcher.task_function is not None
    assert launcher.hydra_context is not None

    configure_log(launcher.config.hydra.hydra_logging, launcher.config.hydra.verbose)
    sweep_dir = Path(str(launcher.config.hydra.sweep.dir))
    sweep_dir.mkdir(parents=True, exist_ok=True)

    # Launcher config (num_gpus, jobs_per_gpu) as passed through the Hydra config.
    runner_cfg = launcher.runner

    logging.info(
        "ProcessLauncher({}) is launching {} jobs".format(
            ",".join([f"{k}={v}" for k, v in runner_cfg.items()]), len(job_overrides),
        )
    )
    logging.info("Launching jobs, sweep output dir : {}".format(sweep_dir))
    for idx, overrides in enumerate(job_overrides):
        logging.info("\t#{} : {}".format(idx, " ".join(filter_overrides(overrides))))

    # Hydra global state that must be forwarded to every job.
    singleton_state = Singleton.get_state()

    # Number of GPUs to use and number of jobs multiplexed onto each GPU.
    num_gpus = runner_cfg.get('num_gpus', -1)
    jobs_per_gpu = runner_cfg.get('jobs_per_gpu', 1)

    # If num_gpus is not positive, use every available GPU; this launcher requires CUDA devices.
    if num_gpus <= 0:
        if torch.cuda.is_available():
            num_gpus = torch.cuda.device_count()
        else:
            raise ValueError(f"{launcher.__class__.__name__} only supports GPU operations.")

    overrides = list(job_overrides)
    num_overrides = len(overrides)

    # At most `num_gpus * jobs_per_gpu` subprocesses run concurrently in each batch.
    job_idx = 0
    batch_size = num_gpus * jobs_per_gpu
    gpu_idx = 0

    ret = []
    subprocess_list = []
    results = []

    # Per-job stderr buffers and the daemon threads draining them.
    std_error_buffers = []
    std_error_threads = []

    while job_idx < num_overrides:
        # Fill a batch of up to `batch_size` concurrent subprocesses.
        while len(subprocess_list) < batch_size:
            if job_idx >= num_overrides:
                break

            # Launch one job, assigning GPUs in round-robin order.
            process, res, error_tup = execute_job(
                initial_job_idx + job_idx,
                overrides[job_idx],
                launcher.hydra_context,
                launcher.config,
                singleton_state,
                gpu_idx % num_gpus,
            )

            subprocess_list.append(process)
            results.append(res)

            std_error_buffers.append(error_tup[0])
            std_error_threads.append(error_tup[1])

            job_idx += 1
            gpu_idx += 1

        # Poll the batch until every subprocess has finished.
        if len(subprocess_list) > 0:
            finished_processes = [0] * len(subprocess_list)

            while sum(finished_processes) < len(subprocess_list):
                for proc_idx, proc in enumerate(subprocess_list):
                    retcode = proc.poll()

                    if retcode is not None:
                        # Log each process only once, when it first reports a return code.
                        if finished_processes[proc_idx] == 0:
                            logging.info(f"Processed job : {len(ret) + proc_idx} :: Ret code = {retcode}")

                        finished_processes[proc_idx] = 1

                        # Wait for the process to exit and its stderr drainer thread to finish,
                        # then normalize the captured stderr lines to utf-8 encoded bytes.
                        proc.wait()
                        std_error_threads[proc_idx].join()
                        error_data = std_error_buffers[proc_idx]
                        error_data = [str(data, encoding='utf-8').encode('utf-8') for data in error_data]

                        std_error_buffers[proc_idx] = error_data

                time.sleep(1.0)

            # Gather the results of the completed batch.
            for proc_idx, (proc, res) in enumerate(zip(subprocess_list, results)):
                output, error = proc.communicate()

                if proc.returncode == 0:
                    res.status = JobStatus.COMPLETED
                else:
                    # On failure, surface the captured stderr as the job's exception.
                    err_buffer = std_error_buffers[proc_idx]
                    if isinstance(err_buffer, (list, tuple)):
                        err_string = ""
                        for err_line in err_buffer:
                            err_string = err_string + str(err_line, encoding='utf-8')
                    else:
                        err_string = err_buffer

                    error_msg = (
                        f"\nHyperparameter Arguments : {proc.args}\n"
                        f"Process Return code : {proc.returncode}\n"
                        f"Error Trace :\n"
                        f"{err_string}"
                    )
                    res.return_value = Exception(error_msg)
                    res.status = JobStatus.FAILED

                logging.info(f"Finished executing job : {len(ret)}. Return Code = {proc.returncode}")
                ret.append(res)

            # Reset the batch before scheduling the next set of jobs.
            subprocess_list.clear()
            results.clear()

    return ret


class ProcessLauncher(Launcher):
    def __init__(self, **kwargs: Any) -> None:
        """Process Launcher

        Based on the JoblibLauncher, but uses processes to scatter jobs in a multiplexed manner across
        some number of GPUs on a single machine.
        """
        self.config: Optional[DictConfig] = None
        self.task_function: Optional[TaskFunction] = None
        self.hydra_context: Optional[HydraContext] = None

        self.runner = kwargs  # launcher config (num_gpus, jobs_per_gpu)

    def setup(self, *, hydra_context: HydraContext, task_function: TaskFunction, config: DictConfig,) -> None:
        self.config = config
        self.task_function = task_function
        self.hydra_context = hydra_context

    def launch(self, job_overrides: Sequence[Sequence[str]], initial_job_idx: int) -> Sequence[JobReturn]:
        return launch(launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx)


# Register the launcher config with Hydra's ConfigStore so it can be selected via
# `hydra/launcher=nemo_launcher`.
ConfigStore.instance().store(
    group="hydra/launcher", name="nemo_launcher", node=ProcessLauncherConfig, provider="nemo_process_launcher",
)

Plugins.instance().register(ProcessLauncher)
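
# Minimal usage sketch: select the launcher in a Hydra multirun and sweep a hyperparameter.
# The script name and the swept `model.optim.lr` values below are illustrative; only the
# launcher name and its two options come from this module:
#
#   python train.py --multirun \
#       hydra/launcher=nemo_launcher \
#       hydra.launcher.num_gpus=-1 \
#       hydra.launcher.jobs_per_gpu=1 \
#       model.optim.lr=0.001,0.0001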