import torch
import torch.multiprocessing as mp
from apex.transformer import parallel_state
from omegaconf import OmegaConf
from omegaconf.omegaconf import open_dict
from pytorch_lightning.trainer.trainer import Trainer

from nemo.collections.nlp.models.language_modeling.megatron_t5_adapter_model import MegatronT5InfusedAdapterModel
from nemo.collections.nlp.modules.common.megatron.megatron_init import fake_initialize_model_parallel
from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy
from nemo.core.config import hydra_runner
from nemo.utils.app_state import AppState

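# Use the "spawn" start method so worker processes do not inherit CUDA state through fork.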
mp.set_start_method("spawn", force=True)

""" |
|
This is the script to run an Adapter Tuned GPT Model for text generation. |
|
|
|
Usage: |
|
Assume the model has TP=1, PP=1 in the following use cases. |
|
a. run greedy inference using a base gpt nemo file, and an adapter nemo file: |
|
python megatron_gpt_ia3_eval.py \ |
|
gpt_model_file=PATH TO GPT MODEL NEMO FILE \ |
|
adapter_model_file=PATH TO ADAPTER MODEL NEMO FILE (generated by training script: ./megatron_gpt_ia3_tuning.py) \ |
|
data_paths=[PATH TO A JSONL FILE CONTAINING PROMPTS], \ |
|
pred_file_path=PATH TO OUTPUT FILE TO DUMP PREDICTIONS |
|
""" |
|
|
|
if not torch.cuda.is_available():
    raise EnvironmentError("GPU is needed for the inference")


@hydra_runner(config_path="conf", config_name="megatron_t5_ia3_inference")
def main(cfg) -> None:
    trainer = Trainer(strategy=NLPDDPStrategy(), **cfg.trainer)

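    # Populate AppState with this rank's tensor/pipeline-parallel coordinates. fake_initialize_model_parallel
    # computes the ranks without creating distributed process groups, which is enough to restore a sharded checkpoint.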
|
    app_state = AppState()
    if cfg.tensor_model_parallel_size > 1 or cfg.pipeline_model_parallel_size > 1:
        app_state.model_parallel_size = cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size
        (
            app_state.tensor_model_parallel_rank,
            app_state.pipeline_model_parallel_rank,
            app_state.model_parallel_size,
            app_state.data_parallel_size,
            app_state.pipeline_model_parallel_split_rank,
            app_state.virtual_pipeline_model_parallel_rank,
        ) = fake_initialize_model_parallel(
            world_size=app_state.model_parallel_size,
            rank=trainer.global_rank,
            tensor_model_parallel_size_=cfg.tensor_model_parallel_size,
            pipeline_model_parallel_size_=cfg.pipeline_model_parallel_size,
            pipeline_model_parallel_split_rank_=cfg.pipeline_model_parallel_split_rank,
        )

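    # Restore only the config from the IA3 adapter checkpoint first, then patch in the base language model
    # path and the evaluation batch sizes before fully restoring the adapter-tuned model.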
|
    if cfg.get("adapter_model_file", None) is not None and cfg.get("language_model_path", None) is not None:
        ia3_tuning_cfg = MegatronT5InfusedAdapterModel.restore_from(
            cfg.adapter_model_file, trainer=trainer, return_config=True
        )
        with open_dict(ia3_tuning_cfg):
            ia3_tuning_cfg.language_model_path = cfg.language_model_path
            ia3_tuning_cfg.pretrained_language_model_path = cfg.language_model_path
            ia3_tuning_cfg.micro_batch_size = cfg.data.micro_batch_size
            ia3_tuning_cfg.global_batch_size = cfg.data.global_batch_size

        model = MegatronT5InfusedAdapterModel.restore_from(
            restore_path=cfg.adapter_model_file, trainer=trainer, override_config_path=ia3_tuning_cfg
        )
|
    else:
        raise NotImplementedError(
            "This script is meant for inference from an Infused Adapter Tuned T5 model; the config should contain an adapter_model_file and a language_model_path"
        )

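    # If the model-parallel state has not been initialized yet, launch a no-op function through the trainer's
    # strategy to spin up the distributed environment (devices and process groups) before running inference.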
|
    if parallel_state.is_unitialized():

        def dummy():
            return

        if trainer.strategy.launcher is not None:
            trainer.strategy.launcher.launch(dummy, trainer=trainer)
        trainer.strategy.setup_environment()

|
    model.freeze()

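    # Activation checkpointing is only useful for training, so disable it for inference. The attribute path
    # differs between model variants, so try both and ignore whichever is absent.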
|
    try:
        model.model.language_model.encoder.activations_checkpoint_method = None
    except AttributeError:
        pass

    try:
        model.frozen_model.model.language_model.encoder.activations_checkpoint_method = None
    except AttributeError:
        pass

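    # Build the evaluation dataset and dataloader from the JSONL prompt files listed in cfg.data.test_ds.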
|
    test_ds, test_dl = model.build_virtual_prompt_dataset(
        dataset_paths=cfg.data.test_ds,
        batch_size=cfg.data.global_batch_size,
        for_train=False,
        drop_last=False,
        shuffle=False,
        num_workers=cfg.data.num_workers,
        pin_memory=True,
    )

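    # Attach the generation settings from cfg.inference to the model, then run prediction with the trainer.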
|
    config = OmegaConf.to_container(cfg.inference)
    model.set_inference_config(config)
    response = trainer.predict(model, test_dl)

    print("***************************")
    if cfg.pred_file_path is not None:
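        # Write one "<input> <prediction>" pair per line; newlines within each field are flattened to spaces.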
        with open(cfg.pred_file_path, "w", encoding="utf-8") as f:
            for batch in response:
                for inp, pred in zip(batch['input_text'], batch['preds_text']):
                    inp = ' '.join(inp.split('\n'))
                    pred = ' '.join(pred.split('\n'))
                    f.write(f'{inp} {pred}\n')
        print("predictions saved to {}".format(cfg.pred_file_path))
    else:
        print(response)
    print("***************************")


if __name__ == '__main__':
    main()