assi1 / ELR /data_loader /cifar100.py
uthurumella's picture
Upload 69 files
72fc481 verified
raw
history blame contribute delete
11.1 kB
import sys
import numpy as np
from PIL import Image
import torchvision
from torch.utils.data.dataset import Subset
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
import torch
import torch.nn.functional as F
import random
import os
import json
from numpy.testing import assert_array_almost_equal
def get_cifar100(root, cfg_trainer, train=True,
transform_train=None, transform_val=None,
download=False, noise_file = ''):
base_dataset = torchvision.datasets.CIFAR100(root, train=train, download=download)
if train:
train_idxs, val_idxs = train_val_split(base_dataset.targets)
train_dataset = CIFAR100_train(root, cfg_trainer, train_idxs, train=True, transform=transform_train)
val_dataset = CIFAR100_val(root, cfg_trainer, val_idxs, train=train, transform=transform_val)
if cfg_trainer['asym']:
train_dataset.asymmetric_noise()
val_dataset.asymmetric_noise()
else:
train_dataset.symmetric_noise()
val_dataset.symmetric_noise()
print(f"Train: {len(train_dataset)} Val: {len(val_dataset)}") # Train: 45000 Val: 5000
else:
train_dataset = []
val_dataset = CIFAR100_val(root, cfg_trainer, None, train=train, transform=transform_val)
print(f"Test: {len(val_dataset)}")
return train_dataset, val_dataset
def train_val_split(base_dataset: torchvision.datasets.CIFAR100):
num_classes = 100
base_dataset = np.array(base_dataset)
train_n = int(len(base_dataset) * 0.9 / num_classes)
train_idxs = []
val_idxs = []
for i in range(num_classes):
idxs = np.where(base_dataset == i)[0]
np.random.shuffle(idxs)
train_idxs.extend(idxs[:train_n])
val_idxs.extend(idxs[train_n:])
np.random.shuffle(train_idxs)
np.random.shuffle(val_idxs)
return train_idxs, val_idxs
class CIFAR100_train(torchvision.datasets.CIFAR100):
def __init__(self, root, cfg_trainer, indexs, train=True,
transform=None, target_transform=None,
download=False):
super(CIFAR100_train, self).__init__(root, train=train,
transform=transform, target_transform=target_transform,
download=download)
self.num_classes = 100
self.cfg_trainer = cfg_trainer
self.train_data = self.data[indexs]
self.train_labels = np.array(self.targets)[indexs]
self.indexs = indexs
self.prediction = np.zeros((len(self.train_data), self.num_classes, self.num_classes), dtype=np.float32)
self.noise_indx = []
#self.all_refs_encoded = torch.zeros(self.num_classes,self.num_ref,1024, dtype=np.float32)
self.count = 0
def symmetric_noise(self):
self.train_labels_gt = self.train_labels.copy()
indices = np.random.permutation(len(self.train_data))
for i, idx in enumerate(indices):
if i < self.cfg_trainer['percent'] * len(self.train_data):
self.noise_indx.append(idx)
self.train_labels[idx] = np.random.randint(self.num_classes, dtype=np.int32)
def multiclass_noisify(self, y, P, random_state=0):
""" Flip classes according to transition probability matrix T.
It expects a number between 0 and the number of classes - 1.
"""
assert P.shape[0] == P.shape[1]
assert np.max(y) < P.shape[0]
# row stochastic matrix
assert_array_almost_equal(P.sum(axis=1), np.ones(P.shape[1]))
assert (P >= 0.0).all()
m = y.shape[0]
new_y = y.copy()
flipper = np.random.RandomState(random_state)
for idx in np.arange(m):
i = y[idx]
# draw a vector with only an 1
flipped = flipper.multinomial(1, P[i, :], 1)[0]
new_y[idx] = np.where(flipped == 1)[0]
return new_y
# def build_for_cifar100(self, size, noise):
# """ random flip between two random classes.
# """
# assert(noise >= 0.) and (noise <= 1.)
# P = np.eye(size)
# cls1, cls2 = np.random.choice(range(size), size=2, replace=False)
# P[cls1, cls2] = noise
# P[cls2, cls1] = noise
# P[cls1, cls1] = 1.0 - noise
# P[cls2, cls2] = 1.0 - noise
# assert_array_almost_equal(P.sum(axis=1), 1, 1)
# return P
def build_for_cifar100(self, size, noise):
""" The noise matrix flips to the "next" class with probability 'noise'.
"""
assert(noise >= 0.) and (noise <= 1.)
P = (1. - noise) * np.eye(size)
for i in np.arange(size - 1):
P[i, i + 1] = noise
# adjust last row
P[size - 1, 0] = noise
assert_array_almost_equal(P.sum(axis=1), 1, 1)
return P
def asymmetric_noise(self, asym=False, random_shuffle=False):
self.train_labels_gt = self.train_labels.copy()
P = np.eye(self.num_classes)
n = self.cfg_trainer['percent']
nb_superclasses = 20
nb_subclasses = 5
if n > 0.0:
for i in np.arange(nb_superclasses):
init, end = i * nb_subclasses, (i+1) * nb_subclasses
P[init:end, init:end] = self.build_for_cifar100(nb_subclasses, n)
y_train_noisy = self.multiclass_noisify(self.train_labels, P=P,
random_state=0)
actual_noise = (y_train_noisy != self.train_labels).mean()
assert actual_noise > 0.0
self.train_labels = y_train_noisy
def __getitem__(self, index):
"""
Args:
index (int): Index
Returns:
tuple: (image, target) where target is index of the target class.
"""
img, target, target_gt = self.train_data[index], self.train_labels[index], self.train_labels_gt[index]
# doing this so that it is consistent with all other datasets
# to return a PIL Image
img = Image.fromarray(img)
if self.transform is not None:
img = self.transform(img)
if self.target_transform is not None:
target = self.target_transform(target)
return img, target, index, target_gt
def __len__(self):
return len(self.train_data)
class CIFAR100_val(torchvision.datasets.CIFAR100):
def __init__(self, root, cfg_trainer, indexs, train=True,
transform=None, target_transform=None,
download=False):
super(CIFAR100_val, self).__init__(root, train=train,
transform=transform, target_transform=target_transform,
download=download)
# self.train_data = self.data[indexs]
# self.train_labels = np.array(self.targets)[indexs]
self.num_classes = 100
self.cfg_trainer = cfg_trainer
if train:
self.train_data = self.data[indexs]
self.train_labels = np.array(self.targets)[indexs]
else:
self.train_data = self.data
self.train_labels = np.array(self.targets)
self.train_labels_gt = self.train_labels.copy()
def symmetric_noise(self):
indices = np.random.permutation(len(self.train_data))
for i, idx in enumerate(indices):
if i < self.cfg_trainer['percent'] * len(self.train_data):
self.train_labels[idx] = np.random.randint(self.num_classes, dtype=np.int32)
def multiclass_noisify(self, y, P, random_state=0):
""" Flip classes according to transition probability matrix T.
It expects a number between 0 and the number of classes - 1.
"""
assert P.shape[0] == P.shape[1]
assert np.max(y) < P.shape[0]
# row stochastic matrix
assert_array_almost_equal(P.sum(axis=1), np.ones(P.shape[1]))
assert (P >= 0.0).all()
m = y.shape[0]
new_y = y.copy()
flipper = np.random.RandomState(random_state)
for idx in np.arange(m):
i = y[idx]
# draw a vector with only an 1
flipped = flipper.multinomial(1, P[i, :], 1)[0]
new_y[idx] = np.where(flipped == 1)[0]
return new_y
# def build_for_cifar100(self, size, noise):
# """ random flip between two random classes.
# """
# assert(noise >= 0.) and (noise <= 1.)
# P = np.eye(size)
# cls1, cls2 = np.random.choice(range(size), size=2, replace=False)
# P[cls1, cls2] = noise
# P[cls2, cls1] = noise
# P[cls1, cls1] = 1.0 - noise
# P[cls2, cls2] = 1.0 - noise
# assert_array_almost_equal(P.sum(axis=1), 1, 1)
# return P
def build_for_cifar100(self, size, noise):
""" The noise matrix flips to the "next" class with probability 'noise'.
"""
assert(noise >= 0.) and (noise <= 1.)
P = (1. - noise) * np.eye(size)
for i in np.arange(size - 1):
P[i, i + 1] = noise
# adjust last row
P[size - 1, 0] = noise
assert_array_almost_equal(P.sum(axis=1), 1, 1)
return P
def asymmetric_noise(self, asym=False, random_shuffle=False):
P = np.eye(self.num_classes)
n = self.cfg_trainer['percent']
nb_superclasses = 20
nb_subclasses = 5
if n > 0.0:
for i in np.arange(nb_superclasses):
init, end = i * nb_subclasses, (i+1) * nb_subclasses
P[init:end, init:end] = self.build_for_cifar100(nb_subclasses, n)
y_train_noisy = self.multiclass_noisify(self.train_labels, P=P,
random_state=0)
actual_noise = (y_train_noisy != self.train_labels).mean()
assert actual_noise > 0.0
self.train_labels = y_train_noisy
def __len__(self):
return len(self.train_data)
def __getitem__(self, index):
"""
Args:
index (int): Index
Returns:
tuple: (image, target) where target is index of the target class.
"""
img, target, target_gt = self.train_data[index], self.train_labels[index], self.train_labels_gt[index]
# doing this so that it is consistent with all other datasets
# to return a PIL Image
img = Image.fromarray(img)
if self.transform is not None:
img = self.transform(img)
if self.target_transform is not None:
target = self.target_transform(target)
return img, target, index, target_gt