# Copyright (C) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# This work is made available under the Nvidia Source Code License-NC.
# To view a copy of this license, check out LICENSE.md
"""
Modified from https://github.com/abdulfatir/gan-metrics-pytorch
Copyright 2018 Institute of Bioinformatics, JKU Linz

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import warnings

import numpy as np
import torch

from imaginaire.evaluation.common import get_activations, \
    load_or_compute_activations
from imaginaire.utils.distributed import is_master
from imaginaire.utils.distributed import master_only_print as print


def compute_kid(kid_path, data_loader, net_G,
                key_real='images', key_fake='fake_images',
                real_act=None, fake_act=None,
                sample_size=None, preprocess=None, is_video=False,
                save_act=True, num_subsets=1, subset_size=None, **kwargs):
    r"""Compute the KID score.

    Args:
        kid_path (str): Location for storing feature activations.
        data_loader (obj): PyTorch dataloader object.
        net_G (obj): For image generation models, net_G is the generator
            network. For video generation models, net_G is the trainer
            because video generation requires more complicated processing.
        key_real (str): Dictionary key for the real data.
        key_fake (str): Dictionary key for the fake data.
        real_act (torch.Tensor or None): Feature activations of real data.
        fake_act (torch.Tensor or None): Feature activations of fake data.
        sample_size (int): How many samples to use for computing the
            feature activations.
        preprocess (func): The preprocess function to be applied to the data.
        is_video (bool): Whether we are handling video sequences.
        save_act (bool): If ``True``, saves real activations to disk and
            reloads them in the future. This saves some computation at the
            cost of storage.
        num_subsets (int): Number of subsets to sample from all the samples.
        subset_size (int): Number of samples in each subset.
    Returns:
        kid (float): KID value.
    """
    print('Computing KID.')
    act_path = os.path.join(
        os.path.dirname(kid_path), 'activations_real.npy'
    ) if save_act else None

    # Get the fake activations.
    if fake_act is None:
        fake_act = load_or_compute_activations(None,
                                               data_loader,
                                               key_real, key_fake, net_G,
                                               sample_size, preprocess,
                                               is_video=is_video, **kwargs)
    else:
        print(f"Using precomputed activations of size {fake_act.shape}.")

    # Get the ground truth activations.
    if real_act is None:
        real_act = load_or_compute_activations(act_path,
                                               data_loader,
                                               key_real, key_fake, None,
                                               sample_size, preprocess,
                                               is_video=is_video, **kwargs)
    else:
        print(f"Using precomputed activations of size {real_act.shape}.")

    if is_master():
        return _polynomial_mmd_averages(fake_act, real_act,
                                        num_subsets,
                                        subset_size,
                                        ret_var=True)["KID"]


def compute_kid_data(kid_path, data_loader_a, data_loader_b,
                     key_a='images', key_b='images', sample_size=None,
                     is_video=False, num_subsets=1, subset_size=None,
                     **kwargs):
    r"""Compute the KID score between two datasets.

    Args:
        kid_path (str): Location for storing feature activations.
        data_loader_a (obj): PyTorch dataloader object for dataset a.
        data_loader_b (obj): PyTorch dataloader object for dataset b.
        key_a (str): Dictionary key for images in dataset a.
        key_b (str): Dictionary key for images in dataset b.
        sample_size (int): How many samples to use for computing the KID.
        is_video (bool): Whether we are handling video sequences.
        num_subsets (int): Number of subsets to sample from the whole data.
        subset_size (int): Number of samples in each subset.
    Returns:
        kid (float): KID value.
    """
    min_data_size = min(len(data_loader_a.dataset),
                        len(data_loader_b.dataset))
    if sample_size is None:
        sample_size = min_data_size
    else:
        sample_size = min(sample_size, min_data_size)
    print('Computing KID using {} images from both distributions.'.
          format(sample_size))
    path_a = os.path.join(os.path.dirname(kid_path),
                          'activations_a.npy')
    act_a = load_or_compute_activations(path_a, data_loader_a,
                                        key_a, key_a,
                                        sample_size=sample_size,
                                        is_video=is_video, **kwargs)
    act_b = get_activations(data_loader_b, key_b, key_b,
                            None, sample_size, None, **kwargs)

    if is_master():
        return _polynomial_mmd_averages(act_a, act_b,
                                        num_subsets,
                                        subset_size,
                                        ret_var=True)["KID"]


def _polynomial_mmd_averages(codes_g, codes_r, n_subsets, subset_size,
                             ret_var=True, **kernel_args):
    r"""Computes MMD between two sets of features using polynomial kernels.
    It performs a number of repetitions of subset sampling without
    replacement.

    Args:
        codes_g (Tensor): Feature activations of generated images.
        codes_r (Tensor): Feature activations of real images.
        n_subsets (int): The number of subsets.
        subset_size (int): The number of samples in each subset.
        ret_var (bool): If ``True``, each subset estimate includes both the
            mean and variance of the MMD, otherwise only the mean.
    Returns:
        (dict): The KID value, i.e. the MMD estimate averaged over all
            subsets, stored under the key ``'KID'``.
    """
    mmds = np.zeros(n_subsets)
    if ret_var:
        mmd_vars = np.zeros(n_subsets)
    choice = np.random.choice

    if subset_size is None:
        subset_size = min(len(codes_g), len(codes_r))
        print("Subset size not provided, "
              "setting it to the data size ({}).".format(subset_size))
    if subset_size > len(codes_g) or subset_size > len(codes_r):
        subset_size = min(len(codes_g), len(codes_r))
        warnings.warn(
            "Subset size is larger than the actual data size, "
            "setting it to the data size ({}).".format(subset_size))

    for i in range(n_subsets):
        g = codes_g[choice(len(codes_g), subset_size, replace=False)]
        r = codes_r[choice(len(codes_r), subset_size, replace=False)]
        o = _polynomial_mmd(g, r, **kernel_args, ret_var=ret_var)
        if ret_var:
            # noinspection PyUnboundLocalVariable
            mmds[i], mmd_vars[i] = o
        else:
            mmds[i] = o
    return {'KID': mmds.mean()}
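
# The averaging over random subsets above follows the KID protocol of
# Binkowski et al., "Demystifying MMD GANs" (ICLR 2018), which reports the
# mean of unbiased MMD^2 estimates over blocks of the features. A hedged,
# self-contained example with random stand-in features (hypothetical values,
# not from the original code):
#
#     feats_real = torch.randn(5000, 2048)   # e.g. InceptionV3 activations
#     feats_fake = torch.randn(5000, 2048)
#     kid = _polynomial_mmd_averages(feats_real, feats_fake,
#                                    n_subsets=10, subset_size=1000)['KID']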


def _polynomial_kernel(X, Y=None, degree=3, gamma=None, coef0=1.):
    r"""Compute the polynomial kernel between X and Y:
    k(x, y) = (gamma <x, y> + coef0) ** degree.
    """
    if gamma is None:
        gamma = 1.0 / X.shape[1]
    if Y is None:
        Y = X
    # Torch equivalent of sklearn's safe_sparse_dot(X, Y.T, dense_output=True).
    K = torch.matmul(X, Y.t())
    K *= gamma
    K += coef0
    K = K ** degree
    return K
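
# Worked check of the kernel above (hypothetical values, not from the
# original code): for X = torch.ones(1, 2), gamma defaults to 1/2, so
#     k(x, x) = (0.5 * <x, x> + 1) ** 3 = (0.5 * 2 + 1) ** 3 = 8.0
# and _polynomial_kernel(torch.ones(1, 2)) returns tensor([[8.]]).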


def _polynomial_mmd(codes_g, codes_r, degree=3, gamma=None, coef0=1,
                    ret_var=True):
    r"""Computes a single MMD estimate between two sets of features using a
    polynomial kernel, k(x, y) = (gamma <x, y> + coef0) ** degree, with
    gamma defaulting to 1 / dim.

    Args:
        codes_g (torch.Tensor): Feature activations of generated images.
        codes_r (torch.Tensor): Feature activations of real images.
        degree (int): The degree of the polynomial kernel.
        gamma (float or None): Scale of the polynomial kernel.
        coef0 (float or None): Bias of the polynomial kernel.
        ret_var (bool): If ``True``, returns both mean and variance of the
            MMD, otherwise only the mean.
    Returns:
        (tuple):
          - mmd2 (np.ndarray): The squared MMD estimate.
          - var_est (np.ndarray): Variance of the estimate (only returned
            if ``ret_var`` is ``True``).
    """
    X = codes_g
    Y = codes_r

    K_XX = _polynomial_kernel(X, degree=degree, gamma=gamma, coef0=coef0)
    K_YY = _polynomial_kernel(Y, degree=degree, gamma=gamma, coef0=coef0)
    K_XY = _polynomial_kernel(X, Y, degree=degree, gamma=gamma, coef0=coef0)

    return _mmd2_and_variance(K_XX, K_XY, K_YY, ret_var=ret_var)
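
# For reference, the default 'unbiased' estimator in _mmd2_and_variance
# below is
#     MMD^2 = sum_{i != j} k(x_i, x_j) / (m (m - 1))
#           + sum_{i != j} k(y_i, y_j) / (m (m - 1))
#           - 2 sum_{i, j} k(x_i, y_j) / m^2
# following Gretton et al., "A Kernel Two-Sample Test" (JMLR 2012).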


def _mmd2_and_variance(K_XX, K_XY, K_YY, unit_diagonal=False,
                       mmd_est='unbiased', ret_var=True):
    r"""Based on
    https://github.com/dougalsutherland/opt-mmd/blob/master/two_sample/mmd.py
    but changed to not compute the full kernel matrix at once.
    """
    m = K_XX.shape[0]
    assert K_XX.shape == (m, m)
    assert K_XY.shape == (m, m)
    assert K_YY.shape == (m, m)
    var_at_m = m

    # Get the various sums of kernels that we'll use.
    # Kts drop the diagonal, but we don't need to compute them explicitly.
    if unit_diagonal:
        diag_X = diag_Y = 1
        sum_diag_X = sum_diag_Y = m
        sum_diag2_X = sum_diag2_Y = m
    else:
        diag_X = torch.diagonal(K_XX)
        diag_Y = torch.diagonal(K_YY)
        sum_diag_X = diag_X.sum()
        sum_diag_Y = diag_Y.sum()
        sum_diag2_X = _sqn(diag_X)
        sum_diag2_Y = _sqn(diag_Y)

    Kt_XX_sums = K_XX.sum(dim=1) - diag_X
    Kt_YY_sums = K_YY.sum(dim=1) - diag_Y
    K_XY_sums_0 = K_XY.sum(dim=0)
    K_XY_sums_1 = K_XY.sum(dim=1)

    Kt_XX_sum = Kt_XX_sums.sum()
    Kt_YY_sum = Kt_YY_sums.sum()
    K_XY_sum = K_XY_sums_0.sum()

    if mmd_est == 'biased':
        mmd2 = ((Kt_XX_sum + sum_diag_X) / (m * m)
                + (Kt_YY_sum + sum_diag_Y) / (m * m)
                - 2 * K_XY_sum / (m * m))
    else:
        assert mmd_est in {'unbiased', 'u-statistic'}
        mmd2 = (Kt_XX_sum + Kt_YY_sum) / (m * (m - 1))
        if mmd_est == 'unbiased':
            mmd2 -= 2 * K_XY_sum / (m * m)
        else:
            mmd2 -= 2 * (K_XY_sum - torch.trace(K_XY)) / (m * (m - 1))

    if not ret_var:
        return mmd2

    Kt_XX_2_sum = _sqn(K_XX) - sum_diag2_X
    Kt_YY_2_sum = _sqn(K_YY) - sum_diag2_Y
    K_XY_2_sum = _sqn(K_XY)

    dot_XX_XY = Kt_XX_sums.dot(K_XY_sums_1)
    dot_YY_YX = Kt_YY_sums.dot(K_XY_sums_0)

    m1 = m - 1
    m2 = m - 2
    # Variance estimate of the MMD u-statistic; see the opt-mmd reference
    # in the docstring above.
    zeta1_est = (
        1 / (m * m1 * m2) *
        (_sqn(Kt_XX_sums) - Kt_XX_2_sum + _sqn(Kt_YY_sums) - Kt_YY_2_sum)
        - 1 / (m * m1) ** 2 * (Kt_XX_sum ** 2 + Kt_YY_sum ** 2)
        + 1 / (m * m * m1) * (
            _sqn(K_XY_sums_1) + _sqn(K_XY_sums_0) - 2 * K_XY_2_sum)
        - 2 / m ** 4 * K_XY_sum ** 2
        - 2 / (m * m * m1) * (dot_XX_XY + dot_YY_YX)
        + 2 / (m ** 3 * m1) * (Kt_XX_sum + Kt_YY_sum) * K_XY_sum
    )
    zeta2_est = (
        1 / (m * m1) * (Kt_XX_2_sum + Kt_YY_2_sum)
        - 1 / (m * m1) ** 2 * (Kt_XX_sum ** 2 + Kt_YY_sum ** 2)
        + 2 / (m * m) * K_XY_2_sum
        - 2 / m ** 4 * K_XY_sum ** 2
        - 4 / (m * m * m1) * (dot_XX_XY + dot_YY_YX)
        + 4 / (m ** 3 * m1) * (Kt_XX_sum + Kt_YY_sum) * K_XY_sum
    )
    var_est = (4 * (var_at_m - 2) / (var_at_m * (var_at_m - 1)) * zeta1_est
               + 2 / (var_at_m * (var_at_m - 1)) * zeta2_est)

    return mmd2.cpu().numpy(), var_est.cpu().numpy()


def _sqn(arr):
    r"""Squared norm of a flattened tensor."""
    flat = arr.view(-1)
    return flat.dot(flat)
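

if __name__ == '__main__':
    # A minimal smoke test, not part of the original module. With both
    # feature sets drawn from the same Gaussian, the unbiased KID estimate
    # should be close to zero; shifting one set should increase it. Assumes
    # a non-distributed run so that master_only_print actually prints.
    torch.manual_seed(0)
    feats_a = torch.randn(1000, 64)
    feats_b = torch.randn(1000, 64)
    kid_same = _polynomial_mmd_averages(
        feats_a, feats_b, n_subsets=10, subset_size=500)['KID']
    kid_shifted = _polynomial_mmd_averages(
        feats_a, feats_b + 1.0, n_subsets=10, subset_size=500)['KID']
    print('KID (same distribution):    {:.6f}'.format(kid_same))
    print('KID (shifted distribution): {:.6f}'.format(kid_shifted))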