camenduru's picture
thanks to NVIDIA ❤
7934b29
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import hashlib
import os
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Sequence
import torch
from hydra.core.config_store import ConfigStore
from hydra.core.hydra_config import HydraConfig
from hydra.core.plugins import Plugins
from hydra.core.singleton import Singleton
from hydra.core.utils import JobReturn, JobStatus, configure_log, filter_overrides, setup_globals
from hydra.plugins.launcher import Launcher
from hydra.types import HydraContext, TaskFunction
from omegaconf import DictConfig, OmegaConf, open_dict
from nemo.utils import logging
# monkey-patch hydra func
def is_in_toplevel_plugins_module(*args, **kwargs) -> bool:
return True
# Monkey-patch Hydra
Plugins.instance().is_in_toplevel_plugins_module = is_in_toplevel_plugins_module
@dataclass
class ProcessLauncherConfig:
_target_: str = "nemo.core.utils.process_launcher.launcher.ProcessLauncher"
num_gpus: int = -1
jobs_per_gpu: int = 1
def execute_job(
idx: int,
overrides: Sequence[str],
hydra_context: HydraContext,
config: DictConfig,
singleton_state: Dict[Any, Any],
gpu_idx: int,
):
"""
Creates a process that launches a "single" job that is identical in config + updated with sweep hyperparams.
Since a different process is being used, CUDA can work in non-ddp mode without issue.
Attempting ddp when using this script will not work as ddp cannot be used in shared contexts.
Args:
idx: Global index of the job.
overrides: List of str overrides that correspond to this job
hydra_context: Hydra Context used to load the sweep params into the global config
config: Global config that will be updated with sweep hyper parameters.
singleton_state: Hydra state.
gpu_idx: The GPU ID on which this process will be run.
Returns:
- The Process object that corresponds to this sweep
- The JobReturn object holding some metadata about this run
"""
# Required by Hydra (lookup other Hydra Launchers for details)
setup_globals()
Singleton.set_state(singleton_state)
# Update base config with overrides to create sweep config
sweep_config = hydra_context.config_loader.load_sweep_config(config, list(overrides))
with open_dict(sweep_config):
sweep_config.hydra.job.id = "{}_{}".format(sweep_config.hydra.job.name, idx)
sweep_config.hydra.job.num = idx
HydraConfig.instance().set_config(sweep_config)
# Setup a directory where the config will temporarily be stored.
script_path = os.path.join(os.getcwd(), sys.argv[0])
script_path = os.path.abspath(script_path)
hash_salt = "|".join([script_path, str(OmegaConf.to_yaml(config))]).encode('utf-8')
hash_val = hashlib.sha256(hash_salt).hexdigest()
config_dir = os.path.join(os.getcwd(), "hydra_cfg", str(hash_val))
if not os.path.exists(config_dir):
os.makedirs(config_dir, exist_ok=True)
task_cfg = copy.deepcopy(sweep_config)
# Remove hydra from sweep config
# This is done to prevent recursive call to multirun launcher in Hydra.
with open_dict(task_cfg):
task_cfg.pop('hydra', '')
# Save the current jobs config to directory
temp_config_name = f"config_{idx}.yaml"
temp_config = os.path.join(config_dir, temp_config_name)
OmegaConf.save(task_cfg, temp_config)
# Compute the overides as a dict
overrides = OmegaConf.to_container(config.hydra.overrides.task)
# Check and replace trainer.devices in config with gpu_idx
found_devices = False
gpu_override = f'trainer.devices=[{gpu_idx}]'
for oidx, val in enumerate(overrides):
if 'trainer.devices' in val:
overrides[oidx] = gpu_override
found_devices = True
if not found_devices:
overrides.append(gpu_override)
# Build launch command
# Note: We depend on PTL doing the right thing since this command has global visibility of all CUDA_VISIBLE_DEVICES
cmd = [
'python',
script_path,
"--config-path",
config_dir,
"--config-name",
temp_config_name,
*overrides,
]
# Launch the subprocess; pipe the stderr
# NOTE: If this hangs due to some reason after prolonged training, it means that the stderr pipe buffer
# has become full at the OS level and we need to explicitly empty it (either parallel thread or manually
# call proc.communicate(). It should not happen in general case as stderr is filled only in case retcode != 0
# If it does happen though, implement the code here
# https://stackoverflow.com/questions/39607172/python-subprocess-popen-poll-seems-to-hang-but-communicate-works
proc = subprocess.Popen(cmd, stderr=subprocess.PIPE)
# Setup data thread for stderr
std_error_buffer = []
# Trivial thread just reads lines from stdout into the list
drainerthread = threading.Thread(target=std_error_buffer.extend, args=(proc.stderr,))
drainerthread.daemon = True
drainerthread.start()
# Construct JobReturn object for Hydra
res = JobReturn()
res.cfg = task_cfg
res.overrides = overrides
res.hydra_cfg = config
res.working_dir = os.getcwd()
res.return_value = None
return proc, res, (std_error_buffer, drainerthread)
def launch(launcher, job_overrides: Sequence[Sequence[str]], initial_job_idx: int,) -> Sequence[JobReturn]:
"""
Args:
launcher: Reference to the Launched subclass
job_overrides: A List of List<String>, where each inner list is the arguments for one job run
initial_job_idx: Initial job idx in batch
Returns:
A list of JobReturn objects.
"""
# Needed for Hydra, lookup JoblibLauncher in Hydra
setup_globals()
assert launcher.config is not None
assert launcher.task_function is not None
assert launcher.hydra_context is not None
configure_log(launcher.config.hydra.hydra_logging, launcher.config.hydra.verbose)
sweep_dir = Path(str(launcher.config.hydra.sweep.dir))
sweep_dir.mkdir(parents=True, exist_ok=True)
# Extraact the runner's config (its actually a DictConfig, but type is used for autocomplete)
runner_cfg = launcher.runner # type: ProcessLauncherConfig
logging.info(
"ProcessLauncher({}) is launching {} jobs".format(
",".join([f"{k}={v}" for k, v in runner_cfg.items()]), len(job_overrides),
)
)
logging.info("Launching jobs, sweep output dir : {}".format(sweep_dir))
for idx, overrides in enumerate(job_overrides):
logging.info("\t#{} : {}".format(idx, " ".join(filter_overrides(overrides))))
# Needed by Hydra
singleton_state = Singleton.get_state()
# Process the runner's config to build up the multiplex config
num_gpus = runner_cfg.get('num_gpus', -1)
jobs_per_gpu = runner_cfg.get('jobs_per_gpu', 1)
# Only GPUs are supported for now.
if num_gpus <= 0:
if torch.cuda.is_available():
num_gpus = torch.cuda.device_count()
else:
raise ValueError(f"{launcher.__class__.__name__} only supports GPU operations.")
# Setup arguments for multiplex runner
overrides = list(job_overrides)
num_overrides = len(overrides)
job_idx = 0
batch_size = num_gpus * jobs_per_gpu
gpu_idx = 0
ret = [] # List of returned JobResult
subprocess_list = [] # Buffer of subprocess
results = [] # Buffer of JobResult
# STD ERROR cache
std_error_buffers = [] # type: List[List[str]]
std_error_threads = [] # type: threading.Thread
# Run over all job combinations
while job_idx < num_overrides:
# Fill up subprocess buffer while its size is smaller than multiplex batch size
while len(subprocess_list) < batch_size:
# If we run out of jobs, stop trying to submit more jobs
if job_idx >= num_overrides:
break
# Submit a job as a new process
process, res, error_tup = execute_job(
initial_job_idx + job_idx,
overrides[job_idx],
launcher.hydra_context,
launcher.config,
singleton_state,
gpu_idx % num_gpus, # This will evenly distribute GPU load
)
# Store the subprocesses and JobResults
subprocess_list.append(process)
results.append(res)
# Manage stderror thread data
std_error_buffers.append(error_tup[0])
std_error_threads.append(error_tup[1])
job_idx += 1
gpu_idx += 1
# Poll for samples in batch to finish.
if len(subprocess_list) > 0:
finished_processes = [0] * len(subprocess_list)
# Check if all processes are completed or not
# This is busy waiting, this is actually quite necessary
# Turns out that when you do proc.communicate(), you block all other threads immediately.
# IE they may fill up their buffers entirely, and hang while they wait for the first thread
# who called communicate() to finish its work or crash.
# Effectively it entirely stops multiprocessing jobs or multiplexed runs.
# Must poll and busy wait to keep threads alive, along with drain the pipes with thread buffers.
while sum(finished_processes) < len(subprocess_list):
# Check all processes to make sure they have a retcode (doesnt matter yet if 0 or not)
for proc_idx, proc in enumerate(subprocess_list):
# poll() is cheaper op than communicate()
retcode = proc.poll()
if retcode is not None:
# Log that the process with some ID has finished
if finished_processes[proc_idx] == 0:
logging.info(f"Processed job : {len(ret) + proc_idx} :: Ret code = {retcode}")
finished_processes[proc_idx] = 1
# Join this thread and merge its stderror buffer
proc.wait()
std_error_threads[proc_idx].join()
error_data = std_error_buffers[proc_idx]
error_data = [
str(data, encoding='utf-8').encode('utf-8').decode('utf-8').encode('utf-8')
for data in error_data
]
std_error_buffers[proc_idx] = error_data
time.sleep(1.0)
# Process all the subprocess results
for proc_idx, (proc, res) in enumerate(zip(subprocess_list, results)):
# Wait until completion of process
output, error = proc.communicate()
# 0 is for successful run
if proc.returncode == 0:
res.status = JobStatus.COMPLETED
else:
# > 0 is for error, log the error.
# Note: For the sake of efficiency while we log the error and raise an exception,
# It will only raise the 1st wrong job in all the jobs.
# If multiple jobs fail, it will still try to execute every job first before
# raising the error for the first one.
# This is done so that even if some jobs fail (say OOM or something),
# other jobs can still run.
err_buffer = std_error_buffers[proc_idx]
if isinstance(err_buffer, (list, tuple)):
err_string = ""
for err_line in err_buffer:
err_string = (
err_string + f"{str(err_line, encoding='utf-8').encode('utf-8').decode('utf-8')}"
)
else:
err_string = err_buffer
error_msg = (
f"\nHyperparameter Arguments : {proc.args}\n"
f"Process Return code : {proc.returncode}\n"
f"Error Trace :\n"
f"{err_string}"
)
res.return_value = Exception(error_msg)
res.status = JobStatus.FAILED
logging.info(f"Finished executing job : {len(ret)}. Return Code = {proc.returncode}")
ret.append(res)
# Reset for next batch
subprocess_list.clear()
results.clear()
return ret
class ProcessLauncher(Launcher):
def __init__(self, **kwargs: Any) -> None:
"""Process Launcher
Based on the JoblibLauncher, but uses processes to scatter jobs in a multiplexed manner across
some number of GPUs on a single machine.
"""
self.config: Optional[DictConfig] = None
self.task_function: Optional[TaskFunction] = None
self.hydra_context: Optional[HydraContext] = None
self.runner = kwargs # type: ProcessLauncherConfig
def setup(self, *, hydra_context: HydraContext, task_function: TaskFunction, config: DictConfig,) -> None:
self.config = config
self.task_function = task_function
self.hydra_context = hydra_context
def launch(self, job_overrides: Sequence[Sequence[str]], initial_job_idx: int) -> Sequence[JobReturn]:
return launch(launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx)
ConfigStore.instance().store(
group="hydra/launcher", name="nemo_launcher", node=ProcessLauncherConfig, provider="nemo_process_launcher",
)
Plugins.instance().register(ProcessLauncher)