import gc

import pytest
import torch
from datasets import load_dataset
from safetensors.torch import load_file
from transformers import AutoImageProcessor, AutoModelForImageClassification

from peft import LoHaConfig, LoKrConfig, LoraConfig, OFTConfig, PeftModel, get_peft_model
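

# One PEFT config per adapter type under test; "convolution" targets the ResNet conv layers, while the
# classifier and normalization (batch norm) layers are fully trained and saved via modules_to_save.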
CONFIGS = {
    "lora": LoraConfig(target_modules=["convolution"], modules_to_save=["classifier", "normalization"]),
    "loha": LoHaConfig(target_modules=["convolution"], modules_to_save=["classifier", "normalization"]),
    "lokr": LoKrConfig(target_modules=["convolution"], modules_to_save=["classifier", "normalization"]),
    "oft": OFTConfig(target_modules=["convolution"], modules_to_save=["classifier", "normalization"]),
}


class TestResnet:
    model_id = "microsoft/resnet-18"

    @pytest.fixture(autouse=True)
    def teardown(self):
        r"""
        Efficient mechanism to free GPU memory after each test. Based on
        https://github.com/huggingface/transformers/issues/21094
        """
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()

    @pytest.fixture(scope="class")
    def image_processor(self):
        image_processor = AutoImageProcessor.from_pretrained(self.model_id)
        return image_processor

    @pytest.fixture(scope="class")
    def data(self, image_processor):
        dataset = load_dataset("huggingface/cats-image", trust_remote_code=True)
        image = dataset["test"]["image"][0]
        return image_processor(image, return_tensors="pt")

    @pytest.mark.parametrize("config", CONFIGS.values(), ids=CONFIGS.keys())
    def test_model_with_batchnorm_reproducibility(self, config, tmp_path, data):
        torch.manual_seed(0)
        model = AutoModelForImageClassification.from_pretrained(self.model_id)
        model = get_peft_model(model, config)
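
        # record the outputs before training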
        model.eval()
        with torch.inference_mode():
            output_before = model(**data)
        model.train()
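
        # train the model for a few steps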
        optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
        batch_size = 4
        max_steps = 5 * batch_size
        labels = torch.zeros(1, 1000)
        labels[0, 283] = 1
        for i in range(0, max_steps, batch_size):
            optimizer.zero_grad()
            outputs = model(**data, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
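
        # outputs after training should be finite and differ from the pre-training outputs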
        model.eval()
        with torch.inference_mode():
            output_after = model(**data)
        assert torch.isfinite(output_after.logits).all()
        atol, rtol = 1e-4, 1e-4
        assert not torch.allclose(output_before.logits, output_after.logits, atol=atol, rtol=rtol)
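
        # saving and loading the adapter into a fresh base model should reproduce the trained outputs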
        model.save_pretrained(tmp_path)
        del model

        torch.manual_seed(0)
        model = AutoModelForImageClassification.from_pretrained(self.model_id)
        model = PeftModel.from_pretrained(model, tmp_path).eval()
        with torch.inference_mode():
            output_loaded = model(**data)
        assert torch.allclose(output_after.logits, output_loaded.logits, atol=atol, rtol=rtol)
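
        # ensure that the checkpoint file contains the batch norm buffers; the PEFT model keeps two copies of
        # each "running_mean" (the original module plus its modules_to_save copy), the checkpoint only one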
        model_running_mean = len([k for k in model.state_dict().keys() if "running_mean" in k])
        state_dict = load_file(tmp_path / "adapter_model.safetensors")
        checkpoint_running_mean = len([k for k in state_dict.keys() if "running_mean" in k])
        assert model_running_mean == checkpoint_running_mean * 2