import collections
import os
from inspect import signature
from typing import Any, Callable, Optional, Union

import torch
import torch.nn as nn
import torch.nn.functional as F
|
__all__ = [
    "is_parallel",
    "get_device",
    "get_dtype",
    "get_same_padding",
    "resize",
    "build_kwargs_from_config",
    "load_state_dict_from_file",
    "get_submodule_weights",
    "get_dtype_from_str",
]
|
|
def is_parallel(model: nn.Module) -> bool:
    """Return True if the model is wrapped in DataParallel or DistributedDataParallel."""
    return isinstance(model, (nn.parallel.DataParallel, nn.parallel.DistributedDataParallel))
|
|
def get_device(model: nn.Module) -> torch.device:
    """Return the device of the model's first parameter."""
    return next(model.parameters()).device
|
|
def get_dtype(model: nn.Module) -> torch.dtype:
    """Return the dtype of the model's first parameter."""
    return next(model.parameters()).dtype
|
|
def get_same_padding(kernel_size: Union[int, tuple[int, ...]]) -> Union[int, tuple[int, ...]]:
    """Return the padding that preserves spatial size for a stride-1 convolution."""
    if isinstance(kernel_size, tuple):
        return tuple(get_same_padding(ks) for ks in kernel_size)
    else:
        assert kernel_size % 2 > 0, "kernel size should be an odd number"
        return kernel_size // 2
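# Example (illustrative): a 3x3 kernel needs padding 1 to keep the spatial size,
# and tuple kernel sizes are handled per dimension:
#   get_same_padding(3)       -> 1
#   get_same_padding((3, 5))  -> (1, 2)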
|
|
def resize(
    x: torch.Tensor,
    size: Optional[Any] = None,
    scale_factor: Optional[list[float]] = None,
    mode: str = "bicubic",
    align_corners: Optional[bool] = False,
) -> torch.Tensor:
    """Resize a tensor with F.interpolate, passing align_corners only to modes that support it."""
    if mode in {"bilinear", "bicubic"}:
        return F.interpolate(
            x,
            size=size,
            scale_factor=scale_factor,
            mode=mode,
            align_corners=align_corners,
        )
    elif mode in {"nearest", "area"}:
        return F.interpolate(x, size=size, scale_factor=scale_factor, mode=mode)
    else:
        raise NotImplementedError(f"resize(mode={mode}) not implemented.")
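# Example (illustrative sketch; the tensor shape is an assumption, not taken from this
# module): upsample a feature map to twice its spatial size.
#   feat = torch.randn(1, 16, 32, 32)
#   up = resize(feat, scale_factor=[2.0, 2.0], mode="bicubic")  # shape (1, 16, 64, 64)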
|
|
def build_kwargs_from_config(config: dict, target_func: Callable) -> dict[str, Any]:
    """Keep only the entries of `config` whose keys match parameter names of `target_func`."""
    valid_keys = list(signature(target_func).parameters)
    kwargs = {}
    for key in config:
        if key in valid_keys:
            kwargs[key] = config[key]
    return kwargs
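# Example (illustrative sketch): keys that do not match the target's parameter names
# are dropped silently, so oversized config dicts can be passed around safely.
#   cfg = {"in_channels": 3, "out_channels": 64, "unused_key": 1}
#   kwargs = build_kwargs_from_config(cfg, nn.Conv2d)  # {"in_channels": 3, "out_channels": 64}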
|
|
def load_state_dict_from_file(file: str, only_state_dict: bool = True) -> dict[str, torch.Tensor]:
    """Load a checkpoint onto CPU, optionally unwrapping its "state_dict" entry."""
    file = os.path.realpath(os.path.expanduser(file))
    checkpoint = torch.load(file, map_location="cpu", weights_only=True)
    if only_state_dict and "state_dict" in checkpoint:
        checkpoint = checkpoint["state_dict"]
    return checkpoint
|
|
def get_submodule_weights(weights: collections.OrderedDict, prefix: str) -> collections.OrderedDict:
    """Extract the weights whose keys start with `prefix`, stripping the prefix from the keys."""
    submodule_weights = collections.OrderedDict()
    len_prefix = len(prefix)
    for key, weight in weights.items():
        if key.startswith(prefix):
            submodule_weights[key[len_prefix:]] = weight
    return submodule_weights
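# Example (illustrative sketch; the checkpoint path and the "backbone." prefix are
# hypothetical): load a checkpoint and pull out the weights of one submodule.
#   state_dict = load_state_dict_from_file("checkpoint.pt")
#   backbone_weights = get_submodule_weights(state_dict, "backbone.")
#   model.backbone.load_state_dict(backbone_weights)  # assumes a `model` with a `backbone` attribute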
|
|
def get_dtype_from_str(dtype: str) -> torch.dtype:
    """Map a dtype short name ("fp32", "fp16", "bf16") to the corresponding torch.dtype."""
    if dtype == "fp32":
        return torch.float32
    if dtype == "fp16":
        return torch.float16
    if dtype == "bf16":
        return torch.bfloat16
    raise NotImplementedError(f"dtype {dtype} is not supported")