|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import filecmp |
|
import os |
|
import shutil |
|
import tempfile |
|
from typing import Dict, Optional, Set, Union |
|
|
|
import pytest |
|
import torch |
|
from huggingface_hub.hf_api import ModelFilter |
|
from omegaconf import DictConfig, OmegaConf, open_dict |
|
|
|
from nemo.collections.asr.models import EncDecCTCModel, EncDecCTCModelBPE |
|
from nemo.collections.nlp.models import PunctuationCapitalizationModel |
|
from nemo.core.classes import ModelPT |
|
from nemo.core.connectors import save_restore_connector |
|
from nemo.utils.app_state import AppState |
|
from nemo.utils.exceptions import NeMoBaseException |
|
|
|
|
|
def classpath(cls):
    """Return the dotted ``<module>.<name>`` path of the class ``cls``."""
    module_name, class_name = cls.__module__, cls.__name__
    return f'{module_name}.{class_name}'
|
|
|
|
|
def get_dir_size(path='.'):
    """
    Return the total size in bytes of all files under directory ``path``.

    Sub-directories are traversed recursively; entries that are neither a
    file nor a directory contribute nothing to the total.
    """

    def _entry_size(entry):
        # Size of a single directory entry, recursing into sub-directories.
        if entry.is_file():
            return entry.stat().st_size
        if entry.is_dir():
            return get_dir_size(entry.path)
        return 0

    with os.scandir(path) as entries:
        return sum(_entry_size(entry) for entry in entries)
|
|
|
|
|
def get_size(path='.'):
    """
    Return the size in bytes of ``path``.

    A regular file is measured with ``os.path.getsize``; a directory is
    measured with ``get_dir_size``. Any other path yields ``None``.
    """
    if os.path.isdir(path):
        return get_dir_size(path)
    if os.path.isfile(path):
        return os.path.getsize(path)
    return None
|
|
|
|
|
def getattr2(object, attr):
    """
    Resolve a dotted attribute path on ``object``.

    ``attr`` may be a plain attribute name (``"weight"``) or a dot-separated
    chain (``"decoder._feat_in"``); each component is looked up with
    ``getattr`` on the result of the previous one.

    Args:
        object: the object to start the lookup from. The parameter name
            shadows the builtin but is kept so keyword callers keep working.
        attr: attribute name or dot-separated attribute path.

    Returns:
        The value found at the end of the path.

    Raises:
        AttributeError: if any component of the path does not exist.
    """
    target = object
    # Walk the path iteratively instead of recursing and re-joining the tail.
    for name in attr.split('.'):
        target = getattr(target, name)
    return target
|
|
|
|
|
class MockModel(ModelPT):
    """
    Minimal ``ModelPT`` subclass used by the save/restore tests.

    It owns a single ``Linear(10, 1)`` layer and, when the config provides
    ``temp_file``, registers that file as an artifact and loads its lines.
    """

    def __init__(self, cfg, trainer=None):
        super().__init__(cfg=cfg, trainer=trainer)
        self.w = torch.nn.Linear(10, 1)

        has_resource = 'temp_file' in self.cfg and self.cfg.temp_file is not None
        if not has_resource:
            # No resource configured: keep both attributes defined but empty.
            self.temp_file = None
            self.temp_data = None
        else:
            self.setup_data_from_file(self.cfg.temp_file)

    def setup_data_from_file(self, temp_file):
        """
        Point the model at ``temp_file`` and read its lines into `self.temp_data`.

        The path is written into the config, registered as the ``temp_file``
        artifact, and then read. Lets tests swap the resource after the model
        has been instantiated.
        """
        with open_dict(self.cfg):
            self.cfg.temp_file = temp_file
        self.temp_file = self.register_artifact('temp_file', self.cfg.temp_file)
        with open(self.temp_file, 'r', encoding='utf-8') as resource:
            self.temp_data = resource.readlines()

    def change_stub_number(self, new_number: int):
        """
        Overwrite ``stub_number`` in this model's config.

        Used by the nested-model tests, where a child mutates its own config
        independently of the parent.
        """
        self.cfg.stub_number = new_number

    def forward(self, x):
        """Apply the linear layer to ``x``; return the output and the configured ``temp_file``."""
        return self.w(x), self.cfg.temp_file

    def setup_training_data(self, train_data_config: Union[DictConfig, Dict]):
        """Stub: no training dataloader is created."""
        self._train_dl = None

    def setup_validation_data(self, val_data_config: Union[DictConfig, Dict]):
        """Stub: no validation dataloader is created."""
        self._validation_dl = None

    def setup_test_data(self, test_data_config: Union[DictConfig, Dict]):
        """Stub: no test dataloader is created."""
        self._test_dl = None

    @classmethod
    def list_available_models(cls):
        """Return an empty list: the mock has no pretrained checkpoints."""
        return []
|
|
|
|
|
class MockModelWithChildren(MockModel):
    """
    Mock model that can hold up to two nested NeMo models.

    Each child is built from its sub-config (``childN_model``) when present,
    otherwise restored from a checkpoint path (``childN_model_path``), and is
    left as ``None`` when neither is set.
    """

    def __init__(self, cfg, trainer=None):
        super().__init__(cfg=cfg, trainer=trainer)

        # Child 1 is a plain MockModel.
        self.child1_model: Optional[MockModel]
        if cfg.get("child1_model") is not None:
            first_child = MockModel(self.cfg.child1_model)
            self.register_nemo_submodule("child1_model", config_field="child1_model", model=first_child)
        elif cfg.get("child1_model_path") is not None:
            first_child = MockModel.restore_from(self.cfg.child1_model_path)
            self.register_nemo_submodule("child1_model", config_field="child1_model", model=first_child)
        else:
            self.child1_model = None

        # Child 2 can itself contain children.
        self.child2_model: Optional[MockModelWithChildren]
        if cfg.get("child2_model") is not None:
            second_child = MockModelWithChildren(self.cfg.child2_model)
            self.register_nemo_submodule("child2_model", config_field="child2_model", model=second_child)
        elif cfg.get("child2_model_path") is not None:
            second_child = MockModelWithChildren.restore_from(self.cfg.child2_model_path)
            self.register_nemo_submodule("child2_model", config_field="child2_model", model=second_child)
        else:
            self.child2_model = None
|
|
|
|
|
class MockModelWithChildEncDecCTCBPE(MockModel):
    """
    Mock model that always contains an ``EncDecCTCModelBPE`` child.

    Useful for testing nested models whose children are initialized from
    pretrained NeMo models.
    """

    def __init__(self, cfg, trainer=None):
        super().__init__(cfg=cfg, trainer=trainer)

        self.ctc_model: EncDecCTCModelBPE

        if cfg.get("ctc_model", None) is None:
            # No sub-config available: a pretrained model name is mandatory.
            assert cfg.get("ctc_model_pretrained", None) is not None
            ctc_child = EncDecCTCModelBPE.from_pretrained(self.cfg.ctc_model_pretrained)
        else:
            ctc_child = EncDecCTCModelBPE(self.cfg.ctc_model)
        self.register_nemo_submodule("ctc_model", config_field="ctc_model", model=ctc_child)
|
|
|
|
|
class MockModelWithChildCustomConfigPath(MockModel):
    """
    Mock model with at most one child whose config field name differs from
    the attribute name.

    The child's config lives under `child1_model_config`, while the child
    itself is stored in the `child1_model` attribute.
    NB: this layout is not recommended unless necessary; it exists here to
    test that it works.
    """

    def __init__(self, cfg, trainer=None):
        super().__init__(cfg=cfg, trainer=trainer)

        self.child1_model: Optional[MockModel]
        if cfg.get("child1_model_config") is None:
            self.child1_model = None
        else:
            child = MockModel(self.cfg.child1_model_config)
            self.register_nemo_submodule("child1_model", config_field="child1_model_config", model=child)
|
|
|
|
|
class MockModelIncorrectWithNemoArtifact(MockModel):
    """
    Deliberately incorrect model: registers a .nemo checkpoint as a plain
    artifact and then restores a model from it.

    This usage is not supported and is expected to fail.
    """

    def __init__(self, cfg, trainer=None):
        super().__init__(cfg=cfg, trainer=trainer)

        assert cfg.get("child_model_path") is not None

        registered_path = self.register_artifact("child_model_path", cfg.child_model_path)
        self.child_model = ModelPT.restore_from(registered_path)
|
|
|
|
|
def _mock_model_config():
    """Build a struct-mode config for a plain ``MockModel`` with no resource file."""
    model_section = {'temp_file': None, 'target': classpath(MockModel), 'stub_number': 1}
    config = OmegaConf.create({'model': model_section})
    OmegaConf.set_struct(config, True)
    return config
|
|
|
|
|
def _mock_model_with_children_config(
    child1_model_path: Optional[str] = None,
    child2_model_path: Optional[str] = None,
    child2_model_cfg: Optional[DictConfig] = None,
) -> DictConfig:
    """
    Build a struct-mode config for ``MockModelWithChildren``.

    Child 1 can only come from a .nemo checkpoint path (optional).
    Child 2 can come either from a sub-config (optional) or from a .nemo
    checkpoint path (optional).
    """
    model_section = {
        'temp_file': None,
        'target': classpath(MockModelWithChildren),
        'child1_model': None,
        'child1_model_path': child1_model_path,
        'child2_model': child2_model_cfg,
        'child2_model_path': child2_model_path,
        'stub_number': 1,
    }
    config = OmegaConf.create({'model': model_section})
    OmegaConf.set_struct(config, True)
    return config
|
|
|
|
|
def _mock_model_with_child_encdecctcbpe_config(pretrained_model_name: str) -> DictConfig:
    """Build a struct-mode config whose CTC child is named by ``pretrained_model_name``."""
    model_section = {'temp_file': None, 'ctc_model_pretrained': pretrained_model_name, 'stub_number': 1}
    config = OmegaConf.create({'model': model_section})
    OmegaConf.set_struct(config, True)
    return config
|
|
|
|
|
def _mock_model_with_child_custom_config_path_config():
    """
    Build a struct-mode config for ``MockModelWithChildCustomConfigPath``.

    The child's config (a plain mock model config) is stored under
    ``child1_model_config``.
    """
    child_section = _mock_model_config().model
    model_section = {
        'temp_file': None,
        'child1_model_config': child_section,
        'target': classpath(MockModelWithChildCustomConfigPath),
        'stub_number': 1,
    }
    config = OmegaConf.create({'model': model_section})
    OmegaConf.set_struct(config, True)
    return config
|
|
|
|
|
def _mock_model_incorrect_with_nemo_artifact_config(child_model_path: str):
    """Build a struct-mode config that carries ``child_model_path`` for the incorrect-artifact mock."""
    model_section = {'temp_file': None, 'child_model_path': child_model_path, 'stub_number': 1}
    config = OmegaConf.create({'model': model_section})
    OmegaConf.set_struct(config, True)
    return config
|
|
|
|
|
class TestSaveRestore: |
|
def __test_restore_elsewhere(
    self,
    model: ModelPT,
    attr_for_eq_check: Optional[Set[str]] = None,
    override_config_path: Optional[Union[str, DictConfig]] = None,
    map_location: Optional[Union[torch.device, str]] = None,
    strict: bool = False,
    return_config: bool = False,
):
    """Save ``model``, move the checkpoint, and restore it from the new location.

    Test's logic:
        1. Save model into temporary folder (save_folder)
        2. Copy .nemo file from save_folder to restore_folder
        3. Delete save_folder
        4. Attempt to restore from .nemo file in restore_folder and compare to original instance

    Args:
        model: model instance to save and restore.
        attr_for_eq_check: optional dotted attribute paths that must compare
            equal between the original and the restored model.
        override_config_path: forwarded to ``restore_from``.
        map_location: forwarded to ``restore_from``.
        strict: forwarded to ``restore_from``.
        return_config: forwarded to ``restore_from``; when true the result of
            ``restore_from`` is returned without further comparison.

    Returns:
        Whatever ``restore_from`` returned.
    """
    archive_name = f"{model.__class__.__name__}.nemo"

    with tempfile.TemporaryDirectory() as restore_folder:
        with tempfile.TemporaryDirectory() as save_folder:
            save_folder_path = save_folder

            # Where the model is saved.
            model_save_path = os.path.join(save_folder, archive_name)
            model.save_to(save_path=model_save_path)

            # Where the model will be restored from.
            model_restore_path = os.path.join(restore_folder, archive_name)
            shutil.copy(model_save_path, model_restore_path)

        # The save folder is gone now; only the copied checkpoint remains.
        assert not os.path.exists(save_folder_path)
        assert not os.path.exists(model_save_path)
        assert os.path.exists(model_restore_path)

        model_copy = model.__class__.restore_from(
            restore_path=model_restore_path,
            map_location=map_location,
            strict=strict,
            return_config=return_config,
            override_config_path=override_config_path,
        )

        if return_config:
            return model_copy

        assert model.num_weights == model_copy.num_weights
        # A missing or empty set means there is nothing extra to compare.
        for attr in attr_for_eq_check or ():
            assert getattr2(model, attr) == getattr2(model_copy, attr)

        return model_copy
|
|
|
@pytest.mark.with_downloads() |
|
@pytest.mark.unit |
|
def test_EncDecCTCModel(self):
    """Save/restore round trip for the pretrained QuartzNet15x5Base-En CTC model."""
    compared_attrs = {"decoder._feat_in", "decoder._num_classes"}
    quartznet = EncDecCTCModel.from_pretrained(model_name="QuartzNet15x5Base-En")
    self.__test_restore_elsewhere(model=quartznet, attr_for_eq_check=compared_attrs)
|
|
|
@pytest.mark.with_downloads() |
|
@pytest.mark.unit |
|
def test_EncDecCTCModelBPE(self):
    """Save/restore round trip for the pretrained stt_en_citrinet_256 BPE model."""
    compared_attrs = {"decoder._feat_in", "decoder._num_classes"}
    citrinet = EncDecCTCModelBPE.from_pretrained(model_name="stt_en_citrinet_256")
    self.__test_restore_elsewhere(model=citrinet, attr_for_eq_check=compared_attrs)
|
|
|
@pytest.mark.with_downloads() |
|
@pytest.mark.unit |
|
def test_EncDecCTCModelBPE_v2(self):
    """Save/restore round trip for the pretrained stt_en_conformer_ctc_small BPE model."""
    compared_attrs = {"decoder._feat_in", "decoder._num_classes"}
    conformer = EncDecCTCModelBPE.from_pretrained(model_name="stt_en_conformer_ctc_small")
    self.__test_restore_elsewhere(model=conformer, attr_for_eq_check=compared_attrs)
|
|
|
@pytest.mark.with_downloads() |
|
@pytest.mark.unit |
|
def test_EncDecCTCModelBPE_v3(self):
    """Save/restore round trip for the pretrained stt_en_squeezeformer_ctc_xsmall_ls BPE model."""
    compared_attrs = {"decoder._feat_in", "decoder._num_classes"}
    squeezeformer = EncDecCTCModelBPE.from_pretrained(model_name="stt_en_squeezeformer_ctc_xsmall_ls")
    self.__test_restore_elsewhere(model=squeezeformer, attr_for_eq_check=compared_attrs)
|
|
|
@pytest.mark.with_downloads() |
|
@pytest.mark.unit |
|
def test_EncDecCTCModelBPE_HF(self):
    """Save/restore round trip for nvidia/stt_en_citrinet_256_ls loaded through ``ModelPT``."""
    compared_attrs = {"decoder._feat_in", "decoder._num_classes"}
    hub_model = ModelPT.from_pretrained(model_name="nvidia/stt_en_citrinet_256_ls")
    self.__test_restore_elsewhere(model=hub_model, attr_for_eq_check=compared_attrs)
|
|
|
@pytest.mark.with_downloads() |
|
@pytest.mark.unit |
|
def test_PunctuationCapitalization(self):
    """Save/restore round trip for the pretrained punctuation_en_distilbert model.

    Both classification heads are compared. The original listed
    ``punct_classifier.log_softmax`` twice, so the capitalization head was
    never checked.
    """
    # NOTE(review): assumes the model exposes `capit_classifier.log_softmax`
    # alongside `punct_classifier.log_softmax` - confirm against the NeMo API.
    pn = PunctuationCapitalizationModel.from_pretrained(model_name='punctuation_en_distilbert')
    self.__test_restore_elsewhere(
        model=pn, attr_for_eq_check={"punct_classifier.log_softmax", "capit_classifier.log_softmax"}
    )
|
|
|
@pytest.mark.unit |
|
def test_mock_save_to_restore_from(self):
    """Round-trip a ``MockModel`` with one resource file and compare weights and data."""
    with tempfile.NamedTemporaryFile('w') as empty_file:
        # Write some data
        empty_file.writelines(["*****\n"])
        empty_file.flush()

        # Update config
        cfg = _mock_model_config()
        cfg.model.temp_file = empty_file.name

        # Create model
        model = MockModel(cfg=cfg.model, trainer=None)
        model = model.to('cpu')

        assert model.temp_file == empty_file.name

        # Save and restore
        model_copy = self.__test_restore_elsewhere(model, map_location='cpu')

        # The restored path may carry a prefix, so only the suffix is compared.
        assert os.path.basename(model_copy.temp_file).endswith(os.path.basename(model.temp_file))

        # Compare absolute differences: a signed mean lets negative or
        # cancelling deviations slip through.
        diff = model.w.weight - model_copy.w.weight
        assert diff.abs().max() <= 1e-9

        assert model_copy.temp_data == ["*****\n"]
|
|
|
@pytest.mark.unit |
|
def test_mock_restore_from_config_only(self):
    """Round-trip a ``MockModel`` whose config carries an extra ``xyz`` key and check it survives."""
    with tempfile.NamedTemporaryFile('w') as resource_file:
        resource_file.writelines(["*****\n"])
        resource_file.flush()

        config = _mock_model_config()
        config.model.temp_file = os.path.abspath(resource_file.name)

        # Struct mode forbids new keys, so open the node to add one.
        with open_dict(config.model):
            config.model.xyz = "abc"

        original = MockModel(cfg=config.model, trainer=None).to('cpu')
        assert original.temp_file == resource_file.name

        restored = self.__test_restore_elsewhere(original, map_location='cpu', return_config=False)

        original_basename = os.path.basename(original.temp_file)
        assert os.path.basename(restored.temp_file).endswith(original_basename)
        assert original.cfg.xyz == restored.cfg.xyz
|
|
|
@pytest.mark.unit |
|
def test_mock_restore_from_config_override_with_OmegaConf(self):
    """Restore a ``MockModel`` with an in-memory override config and check the override is applied."""
    with tempfile.NamedTemporaryFile('w') as empty_file:
        # Write some data
        empty_file.writelines(["*****\n"])
        empty_file.flush()

        # Update config
        cfg = _mock_model_config()
        cfg.model.temp_file = empty_file.name

        # Create model
        model = MockModel(cfg=cfg.model, trainer=None)
        model = model.to('cpu')

        assert model.temp_file == empty_file.name

        # Add the extra key after the model was created; it is passed to
        # restore through the override config.
        with open_dict(cfg.model):
            cfg.model.xyz = "abc"

        # Save and restore with the override
        model_copy = self.__test_restore_elsewhere(model, map_location='cpu', override_config_path=cfg)

        # Compare absolute differences: a signed mean lets negative or
        # cancelling deviations slip through.
        diff = model.w.weight - model_copy.w.weight
        assert diff.abs().max() <= 1e-9
        assert model_copy.temp_data == ["*****\n"]

        # The override must be visible on the restored model.
        assert model_copy.cfg.xyz == "abc"
|
|
|
@pytest.mark.unit |
|
def test_mock_restore_from_config_override_with_yaml(self):
    """Restore a ``MockModel`` with an override config read from a YAML file."""
    with tempfile.NamedTemporaryFile('w') as empty_file, tempfile.NamedTemporaryFile('w') as config_file:
        # Write some data
        empty_file.writelines(["*****\n"])
        empty_file.flush()

        # Update config
        cfg = _mock_model_config()
        cfg.model.temp_file = empty_file.name

        # Create model
        model = MockModel(cfg=cfg.model, trainer=None)
        model = model.to('cpu')

        assert model.temp_file == empty_file.name

        # Add the extra key after the model was created
        with open_dict(cfg.model):
            cfg.model.xyz = "abc"

        # Write the override config to disk
        OmegaConf.save(cfg, config_file)

        # Save and restore with the override file
        model_copy = self.__test_restore_elsewhere(
            model, map_location='cpu', override_config_path=config_file.name
        )

        # Compare absolute differences: a signed mean lets negative or
        # cancelling deviations slip through.
        diff = model.w.weight - model_copy.w.weight
        assert diff.abs().max() <= 1e-9
        assert filecmp.cmp(model.temp_file, model_copy.temp_file)
        assert model_copy.temp_data == ["*****\n"]

        # The override must be visible on the restored model.
        assert model_copy.cfg.xyz == "abc"
|
|
|
@pytest.mark.unit |
|
def test_mock_save_to_restore_from_with_target_class(self):
    """Restore through ``ModelPT.restore_from`` and check the result is a ``MockModel``."""
    with tempfile.NamedTemporaryFile('w') as empty_file:
        # Write some data
        empty_file.writelines(["*****\n"])
        empty_file.flush()

        # Update config
        cfg = _mock_model_config()
        cfg.model.temp_file = empty_file.name

        # Create model
        model = MockModel(cfg=cfg.model, trainer=None)
        model = model.to('cpu')

        assert model.temp_file == empty_file.name

        with tempfile.TemporaryDirectory() as save_folder:
            save_path = os.path.join(save_folder, "temp.nemo")
            model.save_to(save_path)

            # Restore through the base class while the checkpoint still exists.
            model_copy = ModelPT.restore_from(save_path, map_location='cpu')

        assert os.path.basename(model_copy.temp_file).endswith(os.path.basename(model.temp_file))

        # Compare absolute differences: a signed mean lets negative or
        # cancelling deviations slip through.
        diff = model.w.weight - model_copy.w.weight
        assert diff.abs().max() <= 1e-9
        assert isinstance(model_copy, MockModel)
        assert model_copy.temp_data == ["*****\n"]
|
|
|
@pytest.mark.unit |
|
def test_mock_save_to_restore_from_multiple_models(self):
    """Round-trip two independent ``MockModel`` instances, first model first."""
    with tempfile.NamedTemporaryFile('w') as first_file, tempfile.NamedTemporaryFile('w') as second_file:
        # Give each model its own resource content.
        first_file.writelines(["*****\n"])
        first_file.flush()
        second_file.writelines(["+++++\n"])
        second_file.flush()

        first_cfg = _mock_model_config()
        first_cfg.model.temp_file = first_file.name
        second_cfg = _mock_model_config()
        second_cfg.model.temp_file = second_file.name

        first_model = MockModel(cfg=first_cfg.model, trainer=None).to('cpu')
        second_model = MockModel(cfg=second_cfg.model, trainer=None).to('cpu')

        assert first_model.temp_file == first_file.name
        assert second_model.temp_file == second_file.name

        first_restored = self.__test_restore_elsewhere(first_model, map_location='cpu')
        second_restored = self.__test_restore_elsewhere(second_model, map_location='cpu')

        # Each restored model must see its own resource, not the other one's.
        assert first_restored.temp_data == ["*****\n"]
        assert second_restored.temp_data == ["+++++\n"]
|
|
|
@pytest.mark.unit |
|
def test_mock_save_to_restore_from_multiple_models_inverted_order(self):
    """Round-trip two independent ``MockModel`` instances, second model first."""
    with tempfile.NamedTemporaryFile('w') as first_file, tempfile.NamedTemporaryFile('w') as second_file:
        # Give each model its own resource content.
        first_file.writelines(["*****\n"])
        first_file.flush()
        second_file.writelines(["+++++\n"])
        second_file.flush()

        first_cfg = _mock_model_config()
        first_cfg.model.temp_file = first_file.name
        second_cfg = _mock_model_config()
        second_cfg.model.temp_file = second_file.name

        first_model = MockModel(cfg=first_cfg.model, trainer=None).to('cpu')
        second_model = MockModel(cfg=second_cfg.model, trainer=None).to('cpu')

        assert first_model.temp_file == first_file.name
        assert second_model.temp_file == second_file.name

        # The model created second is saved and restored first.
        second_restored = self.__test_restore_elsewhere(second_model, map_location='cpu')
        first_restored = self.__test_restore_elsewhere(first_model, map_location='cpu')

        assert first_restored.temp_data == ["*****\n"]
        assert second_restored.temp_data == ["+++++\n"]
|
|
|
@pytest.mark.unit |
|
def test_mock_save_to_restore_chained(self):
    """Save, copy and restore a ``MockModel`` three times in a row.

    Each save folder is deleted before the next restore, so every restore
    must work from the copied checkpoint alone. The unused second temporary
    file of the original has been dropped.
    """
    with tempfile.NamedTemporaryFile('w') as empty_file:
        # Write some data
        empty_file.writelines(["*****\n"])
        empty_file.flush()

        # Update config
        cfg = _mock_model_config()
        cfg.model.temp_file = empty_file.name

        # Create model
        model = MockModel(cfg=cfg.model, trainer=None)
        model = model.to('cpu')

        assert model.temp_file == empty_file.name

        def save_copy(model, save_folder, restore_folder):
            # Save into `save_folder`, copy the archive into `restore_folder`,
            # and return the path of the copy.
            archive_name = f"{model.__class__.__name__}.nemo"
            model_save_path = os.path.join(save_folder, archive_name)
            model.save_to(save_path=model_save_path)

            model_restore_path = os.path.join(restore_folder, archive_name)
            shutil.copy(model_save_path, model_restore_path)
            return model_restore_path

        # Each restore runs only after the `with` block of its source folder
        # has exited, i.e. after that folder has been removed.
        with tempfile.TemporaryDirectory() as level4:
            with tempfile.TemporaryDirectory() as level3:
                with tempfile.TemporaryDirectory() as level2:
                    with tempfile.TemporaryDirectory() as level1:
                        path = save_copy(model, level1, level2)
                    model_copy2 = model.__class__.restore_from(path)
                    path = save_copy(model_copy2, level2, level3)
                model_copy3 = model.__class__.restore_from(path)
                path = save_copy(model_copy3, level3, level4)
            model_copy = model.__class__.restore_from(path)

        # Restore test
        assert model_copy.temp_data == ["*****\n"]

        # AppState test: the last copy has its own guid and records the path
        # it was restored from.
        appstate = AppState()
        metadata = appstate.get_model_metadata_from_guid(model_copy.model_guid)
        assert metadata.guid != model.model_guid
        assert metadata.restoration_path == path
|
|
|
@pytest.mark.unit |
|
def test_mock_save_to_multiple_times(self):
    """Save the same ``MockModel`` three times into one directory; saving must not raise."""
    with tempfile.NamedTemporaryFile('w') as resource_file, tempfile.TemporaryDirectory() as out_dir:
        resource_file.writelines(["*****\n"])
        resource_file.flush()

        config = _mock_model_config()
        config.model.temp_file = resource_file.name

        mock = MockModel(cfg=config.model, trainer=None).to('cpu')
        assert mock.temp_file == resource_file.name

        # Three consecutive saves of the same instance.
        for archive_name in ('save_0.nemo', 'save_1.nemo', 'save_2.nemo'):
            mock.save_to(os.path.join(out_dir, archive_name))
|
|
|
@pytest.mark.unit |
|
def test_multiple_model_save_restore_connector(self):
    """A custom connector set on one instance must not leak to other instances or to the class."""

    class MySaveRestoreConnector(save_restore_connector.SaveRestoreConnector):
        def save_to(self, model, save_path: str):
            # Redirect the archive to a path with an `_XYZ` suffix.
            redirected = save_path.replace(".nemo", "_XYZ.nemo")
            super().save_to(model, redirected)

    base_connector_cls = save_restore_connector.SaveRestoreConnector

    with tempfile.TemporaryDirectory() as out_dir:
        config = _mock_model_config()

        plain_model = MockModel(cfg=config.model, trainer=None)
        custom_model = MockModel(cfg=config.model, trainer=None)
        custom_model._save_restore_connector = MySaveRestoreConnector()
        custom_model.save_to(os.path.join(out_dir, 'save_custom.nemo'))

        assert os.path.exists(os.path.join(out_dir, 'save_custom_XYZ.nemo'))
        assert isinstance(plain_model._save_restore_connector, base_connector_cls)
        assert isinstance(custom_model._save_restore_connector, MySaveRestoreConnector)

        # The class-level connector must still be exactly the base class.
        assert type(MockModel._save_restore_connector) is base_connector_cls
|
|
|
@pytest.mark.unit |
|
def test_restore_from_save_restore_connector(self):
    """Restoring with an explicit connector yields the requested subclass carrying that connector type."""

    class MySaveRestoreConnector(save_restore_connector.SaveRestoreConnector):
        def save_to(self, model, save_path: str):
            # Redirect the archive to a path with an `_XYZ` suffix.
            redirected = save_path.replace(".nemo", "_XYZ.nemo")
            super().save_to(model, redirected)

    class MockModelV2(MockModel):
        pass

    with tempfile.TemporaryDirectory() as out_dir:
        config = _mock_model_config()

        requested_path = os.path.join(out_dir, 'save_custom.nemo')
        custom_model = MockModel(cfg=config.model, trainer=None)
        custom_model._save_restore_connector = MySaveRestoreConnector()
        custom_model.save_to(requested_path)

        assert os.path.exists(os.path.join(out_dir, 'save_custom_XYZ.nemo'))

        actual_path = requested_path.replace(".nemo", "_XYZ.nemo")
        restored = MockModelV2.restore_from(actual_path, save_restore_connector=MySaveRestoreConnector())

        # Exact-type checks: a parent or child class would not be acceptable.
        assert type(restored) is MockModelV2
        assert type(restored._save_restore_connector) is MySaveRestoreConnector
|
|
|
@pytest.mark.unit |
|
def test_mock_model_model_collision(self):
    """A config that contains a nested ``model`` node must be rejected with ``ValueError``."""
    # Sanity check: the unmodified config builds a model.
    cfg = _mock_model_config()
    model = MockModel(cfg=cfg.model, trainer=None)
    model = model.to('cpu')

    # Add the forbidden `model` key; struct mode is lifted only while adding it.
    cfg = _mock_model_config()
    OmegaConf.set_struct(cfg, False)
    cfg.model.model = 'aaa'
    OmegaConf.set_struct(cfg, True)

    # Only the constructor call sits inside the context: anything after the
    # raising statement would never run.
    with pytest.raises(ValueError, match="Creating model config node is forbidden"):
        MockModel(cfg=cfg.model, trainer=None)
|
|
|
@pytest.mark.unit |
|
@pytest.mark.parametrize("change_child_number", [False, True]) |
|
@pytest.mark.parametrize("child2_model_from_path", [False, True]) |
|
def test_mock_model_nested(self, change_child_number: bool, child2_model_from_path: bool):
    """
    Test model with 2 children.
    The parent and each child can be saved/restored separately.
    The parent is constructed using saved child models (.nemo checkpoints).

    Args:
        change_child_number: if True, the child model changes its config
            without notifying the parent, and the saved parent must handle this correctly.
        child2_model_from_path: if True, child2 is restored from a .nemo checkpoint,
            otherwise it is constructed directly from config. Child1 is always loaded from checkpoint.
    """
    child1_cfg = _mock_model_config()
    child2_cfg = _mock_model_with_children_config()

    child1 = MockModel(cfg=child1_cfg.model, trainer=None).to('cpu')
    with tempfile.TemporaryDirectory() as parent_dir:
        parent_ckpt = os.path.join(parent_dir, "parent.nemo")
        with tempfile.TemporaryDirectory() as child_dir:
            child1_ckpt = os.path.join(child_dir, 'child1.nemo')
            child1.save_to(child1_ckpt)

            if not child2_model_from_path:
                # Child 2 is built directly from its sub-config.
                parent_cfg = _mock_model_with_children_config(
                    child1_model_path=child1_ckpt, child2_model_path=None, child2_model_cfg=child2_cfg.get("model")
                )
            else:
                # Child 2 is saved first and referenced by checkpoint path.
                child2 = MockModelWithChildren(cfg=child2_cfg.model, trainer=None).to('cpu')
                child2_ckpt = os.path.join(child_dir, 'child2.nemo')
                child2.save_to(child2_ckpt)
                parent_cfg = _mock_model_with_children_config(
                    child1_model_path=child1_ckpt, child2_model_path=child2_ckpt
                )

            parent = MockModelWithChildren(parent_cfg.model)
            if change_child_number:
                parent.child2_model.change_stub_number(10)
            parent.save_to(parent_ckpt)

        # The child checkpoints are gone; restore the parent on its own.
        parent = ModelPT.restore_from(parent_ckpt)

        # Children can be saved/restored independently of the parent.
        self.__test_restore_elsewhere(parent.child1_model, map_location='cpu')
        child2 = self.__test_restore_elsewhere(parent.child2_model, map_location='cpu')
        if change_child_number:
            assert child2.cfg.stub_number == 10

        # The parent itself can be saved/restored as well.
        parent = self.__test_restore_elsewhere(parent, map_location='cpu')
        if change_child_number:
            assert parent.child2_model.cfg.stub_number == 10
|
|
|
@pytest.mark.unit |
|
@pytest.mark.parametrize("change_child_resource", [False, True]) |
|
@pytest.mark.parametrize("child2_model_from_path", [False, True]) |
|
def test_mock_model_nested_with_resources(self, change_child_resource: bool, child2_model_from_path: bool):
    """
    Test nested model with 2 children: the parent and each child can be saved/restored separately.
    Child models and the parent model itself contain resources.

    Args:
        change_child_resource: if True, the child model's resource is changed
            after the parent model has been instantiated.
        child2_model_from_path: if True, child2 is restored from a .nemo checkpoint,
            otherwise it is constructed directly from config. Child1 is always loaded from checkpoint.
    """
    with tempfile.NamedTemporaryFile('w') as file_child1, tempfile.NamedTemporaryFile(
        'w'
    ) as file_child2, tempfile.NamedTemporaryFile('w') as file_child2_other, tempfile.NamedTemporaryFile(
        'w'
    ) as file_parent:
        parent_data = ["*****\n"]
        child1_data = ["+++++\n"]
        child2_data = ["-----\n"]
        child2_data_other = [".....\n"]

        # Write each resource and flush so it is readable by path.
        for handle, lines in (
            (file_parent, parent_data),
            (file_child1, child1_data),
            (file_child2, child2_data),
            (file_child2_other, child2_data_other),
        ):
            handle.writelines(lines)
            handle.flush()

        child1_cfg = _mock_model_config()
        child1_cfg.model.temp_file = file_child1.name
        child2_cfg = _mock_model_with_children_config()
        child2_cfg.model.temp_file = file_child2.name

        child1 = MockModel(cfg=child1_cfg.model, trainer=None).to('cpu')

        with tempfile.TemporaryDirectory() as parent_dir:
            parent_ckpt = os.path.join(parent_dir, "parent.nemo")
            with tempfile.TemporaryDirectory() as child_dir:
                child1_ckpt = os.path.join(child_dir, 'child1.nemo')
                child1.save_to(child1_ckpt)

                if not child2_model_from_path:
                    # Child 2 is built directly from its sub-config.
                    parent_cfg = _mock_model_with_children_config(
                        child1_model_path=child1_ckpt,
                        child2_model_path=None,
                        child2_model_cfg=child2_cfg.get("model"),
                    )
                else:
                    # Child 2 is saved first and referenced by checkpoint path.
                    child2 = MockModelWithChildren(cfg=child2_cfg.model, trainer=None).to('cpu')
                    child2_ckpt = os.path.join(child_dir, 'child2.nemo')
                    child2.save_to(child2_ckpt)
                    parent_cfg = _mock_model_with_children_config(
                        child1_model_path=child1_ckpt, child2_model_path=child2_ckpt
                    )

                parent_cfg.model.temp_file = file_parent.name
                parent = MockModelWithChildren(parent_cfg.model)
                if change_child_resource:
                    parent.child2_model.setup_data_from_file(file_child2_other.name)
                parent.save_to(parent_ckpt)

            # The child checkpoints are gone; restore the parent on its own.
            parent = ModelPT.restore_from(parent_ckpt)

            # Children first, then the parent itself.
            child1 = self.__test_restore_elsewhere(parent.child1_model, map_location='cpu')
            child2 = self.__test_restore_elsewhere(parent.child2_model, map_location='cpu')
            parent = self.__test_restore_elsewhere(parent, map_location='cpu')

            expected_child2_data = child2_data_other if change_child_resource else child2_data

            # Separately restored children.
            assert child1.temp_data == child1_data
            assert child2.temp_data == expected_child2_data

            # Restored parent and the children it carries.
            assert parent.temp_data == parent_data
            assert parent.child1_model.temp_data == child1_data
            assert parent.child2_model.temp_data == expected_child2_data
|
|
|
@pytest.mark.unit |
|
def test_mock_model_nested_with_resources_multiple_passes(self):
    """
    Test nested model with 2 children: multiple save-restore passes.
    Child models and the parent model itself contain resources.

    The parent is saved twice, both archives are restored and checked, then
    one restored parent is saved twice again and those archives are checked.
    The unused "other" child2 resource file of the original has been dropped.
    """
    with tempfile.NamedTemporaryFile('w') as file_child1, tempfile.NamedTemporaryFile(
        'w'
    ) as file_child2, tempfile.NamedTemporaryFile('w') as file_parent:
        # Write text data; these files are used as model resources.
        parent_data = ["*****\n"]
        child1_data = ["+++++\n"]
        child2_data = ["-----\n"]
        file_parent.writelines(parent_data)
        file_parent.flush()
        file_child1.writelines(child1_data)
        file_child1.flush()
        file_child2.writelines(child2_data)
        file_child2.flush()

        # Child configs and models, each with its own resource.
        cfg_child1 = _mock_model_config()
        cfg_child1.model.temp_file = file_child1.name
        cfg_child2 = _mock_model_with_children_config()
        cfg_child2.model.temp_file = file_child2.name

        child1 = MockModel(cfg=cfg_child1.model, trainer=None)
        child1 = child1.to('cpu')
        child2 = MockModelWithChildren(cfg=cfg_child2.model, trainer=None)
        child2 = child2.to('cpu')

        with tempfile.TemporaryDirectory() as tmpdir_parent1, tempfile.TemporaryDirectory() as tmpdir_parent2, tempfile.TemporaryDirectory() as tmpdir_parent3, tempfile.TemporaryDirectory() as tmpdir_parent4:
            parent_path1 = os.path.join(tmpdir_parent1, "parent.nemo")
            parent_path2 = os.path.join(tmpdir_parent2, "parent.nemo")
            with tempfile.TemporaryDirectory() as tmpdir_child:
                # Save child models
                child1_path = os.path.join(tmpdir_child, 'child1.nemo')
                child1.save_to(child1_path)
                child2_path = os.path.join(tmpdir_child, 'child2.nemo')
                child2.save_to(child2_path)

                # Build the parent from the saved child checkpoints
                cfg_parent = _mock_model_with_children_config(
                    child1_model_path=child1_path, child2_model_path=child2_path
                )
                cfg_parent.model.temp_file = file_parent.name
                parent = MockModelWithChildren(cfg_parent.model)

                # First pass: save the same parent twice
                parent.save_to(parent_path1)
                parent.save_to(parent_path2)

            # Restore both archives after the child checkpoints are gone
            parent1 = ModelPT.restore_from(parent_path1)
            parent2 = ModelPT.restore_from(parent_path2)

            for parent in (parent1, parent2):
                assert parent.temp_data == parent_data
                assert parent.child1_model.temp_data == child1_data
                assert parent.child2_model.temp_data == child2_data

            del parent2

            # Second pass: save one restored parent twice again
            parent_path3 = os.path.join(tmpdir_parent3, "parent.nemo")
            parent_path4 = os.path.join(tmpdir_parent4, "parent.nemo")
            parent1.save_to(parent_path3)
            parent1.save_to(parent_path4)

            parent3 = ModelPT.restore_from(parent_path3)
            parent4 = ModelPT.restore_from(parent_path4)

            for parent in (parent3, parent4):
                assert parent.temp_data == parent_data
                assert parent.child1_model.temp_data == child1_data
                assert parent.child2_model.temp_data == child2_data
|
|
|
@pytest.mark.unit
def test_mock_model_nested_double_with_resources(self):
    """
    Two levels of nesting: parent -> child_with_child -> child.

    Each of the three models registers its own text file as a resource. The test
    saves the chain bottom-up, restores the parent, then saves/restores the parent
    and every nested model separately and checks that the resource contents and the
    `named_nemo_modules()` listing survive the round trips.
    """
    parent_data = ["*****\n"]
    child_with_child_data = ["+++++\n"]
    child_data = ["-----\n"]

    with tempfile.NamedTemporaryFile('w') as file_child, \
            tempfile.NamedTemporaryFile('w') as file_child_with_child, \
            tempfile.NamedTemporaryFile('w') as file_parent:
        # Fill the resource files; flush so the models can read them back by name.
        for handle, lines in (
            (file_parent, parent_data),
            (file_child_with_child, child_with_child_data),
            (file_child, child_data),
        ):
            handle.writelines(lines)
            handle.flush()

        # Innermost model: a plain MockModel that uses `file_child` as its resource.
        cfg_child = _mock_model_config()
        cfg_child.model.temp_file = file_child.name
        child = MockModel(cfg=cfg_child.model, trainer=None).to('cpu')

        with tempfile.TemporaryDirectory() as tmpdir_parent:
            parent_path = os.path.join(tmpdir_parent, "parent.nemo")

            with tempfile.TemporaryDirectory() as tmpdir_child_with_child:
                child_with_child_path = os.path.join(tmpdir_child_with_child, 'child_with_child.nemo')

                with tempfile.TemporaryDirectory() as tmpdir_child:
                    # Save the innermost child to its own checkpoint.
                    child_path = os.path.join(tmpdir_child, 'child.nemo')
                    child.save_to(child_path)

                    # Middle model: takes the saved child as its `child2_model`.
                    cfg_child_with_child = _mock_model_with_children_config(
                        child1_model_path=None,
                        child2_model_path=child_path,
                    )
                    cfg_child_with_child.model.temp_file = file_child_with_child.name
                    child_with_child = MockModelWithChildren(cfg_child_with_child.model)
                    child_with_child.save_to(child_with_child_path)

                # `tmpdir_child` (and child.nemo with it) no longer exists here.
                # Outer model: takes the saved middle model as its `child2_model`.
                cfg_parent = _mock_model_with_children_config(
                    child1_model_path=None,
                    child2_model_path=child_with_child_path,
                )
                cfg_parent.model.temp_file = file_parent.name
                parent = MockModelWithChildren(cfg_parent.model)
                parent.save_to(parent_path)

            # `tmpdir_child_with_child` is gone as well; only parent.nemo remains on disk.
            parent = ModelPT.restore_from(parent_path)

            # Save/restore each level separately, innermost first. `parent` is
            # rebound last because the two nested models are taken from it.
            child = self.__test_restore_elsewhere(parent.child2_model.child2_model, map_location='cpu')
            child_with_child = self.__test_restore_elsewhere(parent.child2_model, map_location='cpu')
            parent = self.__test_restore_elsewhere(parent, map_location='cpu')

            # Resource contents at every level, reached through every restored root.
            assert child.temp_data == child_data

            assert child_with_child.temp_data == child_with_child_data
            assert child_with_child.child2_model.temp_data == child_data

            assert parent.temp_data == parent_data
            assert parent.child2_model.temp_data == child_with_child_data
            assert parent.child2_model.child2_model.temp_data == child_data

            # `named_nemo_modules()` must yield the parent and both nested models, in
            # this order; the first two tuple fields are compared as plain strings.
            actual_modules = list(parent.named_nemo_modules())
            expected_modules = [
                ("", "", parent),
                ("child2_model", "child2_model", parent.child2_model),
                ("child2_model.child2_model", "child2_model.child2_model", parent.child2_model.child2_model),
            ]
            assert len(actual_modules) == len(expected_modules)
            for expected, found in zip(expected_modules, actual_modules):
                assert expected[0] == found[0]
                assert expected[1] == found[1]
                assert expected[2] is found[2]
|
|
|
@pytest.mark.unit
@pytest.mark.with_downloads
def test_mock_model_nested_child_from_pretrained(self):
    """
    Nested model whose child is initialized from a pretrained model name.

    The parent is configured with "stt_en_conformer_ctc_small", saved to a
    temporary `.nemo` file and restored; then the child and the parent are each
    saved/restored separately, and the restored child must still be an
    `EncDecCTCModel`.
    """
    pretrained_cfg = _mock_model_with_child_encdecctcbpe_config("stt_en_conformer_ctc_small")
    parent = MockModelWithChildEncDecCTCBPE(cfg=pretrained_cfg.model, trainer=None)

    with tempfile.TemporaryDirectory() as tmpdir_parent:
        parent_path = os.path.join(tmpdir_parent, "parent.nemo")

        # Round-trip the whole nested model through a checkpoint.
        parent.save_to(parent_path)
        parent = ModelPT.restore_from(parent_path)

        # The child can be saved/restored separately; the returned copy is not needed.
        self.__test_restore_elsewhere(parent.ctc_model, map_location='cpu')

        # The parent can be saved/restored again and keeps a CTC child.
        parent = self.__test_restore_elsewhere(parent, map_location='cpu')
        assert isinstance(parent.ctc_model, EncDecCTCModel)
|
|
|
@pytest.mark.unit
def test_mock_model_nested_custom_config_field(self):
    """
    Nested model whose child config field name differs from the attribute name.

    The child's config is stored under `child1_model_config`, while the child
    model itself is exposed as the `child1_model` attribute. The test checks that
    resources survive save/restore and that `named_nemo_modules()` reports the
    attribute name and the config field name separately.
    """
    parent_data = ["*****\n"]
    child1_data = ["+++++\n"]

    with tempfile.NamedTemporaryFile('w') as file_child1, tempfile.NamedTemporaryFile('w') as file_parent:
        # Fill the resource files; flush so the models can read them back by name.
        for handle, lines in ((file_parent, parent_data), (file_child1, child1_data)):
            handle.writelines(lines)
            handle.flush()

        cfg = _mock_model_with_child_custom_config_path_config()
        cfg.model.temp_file = file_parent.name
        cfg.model.child1_model_config.temp_file = file_child1.name

        # Build the parent; its child is configured through `child1_model_config`.
        parent = MockModelWithChildCustomConfigPath(cfg=cfg.model, trainer=None)

        with tempfile.TemporaryDirectory() as tmpdir_parent:
            parent_path = os.path.join(tmpdir_parent, "parent.nemo")

            # Round-trip the whole nested model through a checkpoint.
            parent.save_to(parent_path)
            parent = ModelPT.restore_from(parent_path)

            # The child alone, then the parent, can be saved/restored again.
            self.__test_restore_elsewhere(parent.child1_model, map_location='cpu')
            parent = self.__test_restore_elsewhere(parent, map_location='cpu')

            # Resource contents are intact on both levels.
            assert parent.temp_data == parent_data
            assert parent.child1_model.temp_data == child1_data

            # Expected listing: the parent, then the child reported with attribute
            # name "child1_model" and config field "child1_model_config".
            actual_modules = list(parent.named_nemo_modules())
            expected_modules = [
                ("", "", parent),
                ("child1_model", "child1_model_config", parent.child1_model),
            ]
            assert len(actual_modules) == len(expected_modules)
            for expected, found in zip(expected_modules, actual_modules):
                assert expected[0] == found[0]
                assert expected[1] == found[1]
                assert expected[2] is found[2]
|
|
|
@pytest.mark.unit
def test_using_nemo_checkpoint_as_artifact_disallowed(self):
    """
    Using a `.nemo` checkpoint as an artifact must be rejected.

    A child model is saved to `child.nemo`; constructing
    `MockModelIncorrectWithNemoArtifact` from a config that points at that
    checkpoint is expected to raise `NeMoBaseException`.
    """
    child_cfg = _mock_model_config()
    child = MockModel(cfg=child_cfg.model, trainer=None)
    child = child.to("cpu")

    with tempfile.TemporaryDirectory() as tmpdir:
        # Produce a real .nemo checkpoint to reference from the parent config.
        child_path = os.path.join(tmpdir, "child.nemo")
        child.save_to(child_path)

        parent_cfg = _mock_model_incorrect_with_nemo_artifact_config(child_path)
        with pytest.raises(NeMoBaseException):
            # Construction itself must fail; the instance is never used.
            MockModelIncorrectWithNemoArtifact(cfg=parent_cfg.model, trainer=None)
|
|
|
@pytest.mark.unit
def test_restore_from_save_restore_connector_extracted_dir(self):
    """
    Restore a model from a pre-extracted checkpoint directory set on the connector.

    A model is saved through a custom connector, the resulting `.nemo` file is
    unpacked into a separate directory, and the directory holding the `.nemo` file
    is deleted. `restore_from` is then called with the stale `.nemo` path and a
    connector whose `model_extracted_dir` points at the unpacked copy. The test
    checks the restored class, connector type, recorded restoration path and
    parameter equality with the original model.
    """

    class MySaveRestoreConnector(save_restore_connector.SaveRestoreConnector):
        """Connector that redirects every save to a `*_XYZ.nemo` file name."""

        def save_to(self, model, save_path: str):
            save_path = save_path.replace(".nemo", "_XYZ.nemo")
            super().save_to(model, save_path)

    class MockModelV2(MockModel):
        """Subclass used to check which class `restore_from` instantiates."""

        pass

    with tempfile.TemporaryDirectory() as extracted_tempdir:
        with tempfile.TemporaryDirectory() as tmpdir:
            cfg = _mock_model_config()

            # Save through the custom connector, which rewrites the file name.
            save_path = os.path.join(tmpdir, 'save_custom.nemo')
            model_with_custom_connector = MockModel(cfg=cfg.model, trainer=None)
            model_with_custom_connector._save_restore_connector = MySaveRestoreConnector()
            model_with_custom_connector.save_to(save_path)

            nemo_filepath = os.path.join(tmpdir, 'save_custom_XYZ.nemo')
            assert os.path.exists(nemo_filepath)

            # Unpack the checkpoint ahead of time into the directory that outlives
            # `tmpdir`, simulating a checkpoint that was extracted beforehand.
            connector = MySaveRestoreConnector()
            MySaveRestoreConnector._unpack_nemo_file(nemo_filepath, extracted_tempdir)
            assert get_size(extracted_tempdir) > 0

        # `tmpdir` and the .nemo file are deleted now; only the extracted copy is left.
        connector.model_extracted_dir = extracted_tempdir

        # The stale `nemo_filepath` is passed on purpose: restoration has to come
        # from the connector's extracted directory instead.
        restored_model = MockModelV2.restore_from(nemo_filepath, save_restore_connector=connector)

        # Exact-type checks: a parent class or another connector type must not pass.
        assert type(restored_model) is MockModelV2
        assert type(restored_model._save_restore_connector) is MySaveRestoreConnector

        # The freshly constructed model has no restoration path; the restored one does.
        appstate = AppState()
        original_metadata = appstate.get_model_metadata_from_guid(model_with_custom_connector.model_guid)
        assert original_metadata.restoration_path is None

        restored_metadata = appstate.get_model_metadata_from_guid(restored_model.model_guid)
        assert restored_metadata.restoration_path is not None

        # The recorded path is the extracted directory, not the deleted .nemo file.
        assert extracted_tempdir in restored_metadata.restoration_path
        assert extracted_tempdir not in nemo_filepath
        assert not os.path.exists(nemo_filepath)

        # Parameter equality, matched by parameter name.
        model_with_custom_connector = model_with_custom_connector.to('cpu')
        restored_model = restored_model.to('cpu')

        original_state_dict = model_with_custom_connector.state_dict()
        restored_state_dict = restored_model.state_dict()

        # A positional zip over the keys would silently ignore missing or extra
        # entries, so require identical key sets before comparing tensors.
        assert original_state_dict.keys() == restored_state_dict.keys()
        for name, original_tensor in original_state_dict.items():
            assert (original_tensor - restored_state_dict[name]).abs().mean() < 1e-6
|
|
|
@pytest.mark.unit
def test_hf_model_filter(self):
    """
    The default Hugging Face filter is a `ModelFilter` whose library is 'nemo'.
    """
    default_filter = ModelPT.get_hf_model_filter()

    assert isinstance(default_filter, ModelFilter)
    assert default_filter.library == 'nemo'
|
|
|
@pytest.mark.with_downloads()
@pytest.mark.unit
def test_hf_model_info(self):
    """
    Searching with `model_filter=None` returns as many models as the default filter.
    """
    default_filter = ModelPT.get_hf_model_filter()

    # Search without an explicit filter; at least one model must come back.
    unfiltered_infos = ModelPT.search_huggingface_models(model_filter=None)
    assert len(unfiltered_infos) > 0

    # Search again with the default filter passed explicitly.
    # NOTE(review): presumably `None` falls back to this same filter — the test
    # only verifies that both searches return the same number of results.
    filtered_infos = ModelPT.search_huggingface_models(model_filter=default_filter)
    assert len(unfiltered_infos) == len(filtered_infos)
|
|
|
@pytest.mark.with_downloads()
@pytest.mark.unit
def test_hf_model_info_with_card_data(self):
    """
    Card data is absent by default and populated once `resolve_card_info` is set.
    """
    hf_filter = ModelPT.get_hf_model_filter()

    # Default search: the first result must not expose a `cardData` attribute.
    model_infos = ModelPT.search_huggingface_models(model_filter=hf_filter)
    assert len(model_infos) > 0
    assert not hasattr(model_infos[0], 'cardData')

    # Search again with card info resolution switched on.
    hf_filter.resolve_card_info = True
    model_infos = ModelPT.search_huggingface_models(model_filter=hf_filter)
    assert len(model_infos) > 0

    # Only the first result that carries `cardData` is inspected; if no result
    # carries it, nothing further is asserted.
    first_with_card = next((info for info in model_infos if hasattr(info, 'cardData')), None)
    if first_with_card is not None:
        assert first_with_card.cardData is not None
|
|
|
@pytest.mark.with_downloads()
@pytest.mark.unit
def test_hf_model_info_with_limited_results(self):
    """
    Setting `limit_results` on the filter caps the number of returned models.
    """
    hf_filter = ModelPT.get_hf_model_filter()

    # Baseline: search without a result limit.
    all_infos = ModelPT.search_huggingface_models(model_filter=hf_filter)
    assert len(all_infos) > 0

    # Limited search: at most 5 results, and strictly fewer than the baseline.
    hf_filter.limit_results = 5
    limited_infos = ModelPT.search_huggingface_models(model_filter=hf_filter)
    assert len(limited_infos) <= 5
    assert len(limited_infos) < len(all_infos)
|
|