|
from torch.utils.data import Dataset, DataLoader
|
|
import torchvision.transforms as transforms
|
|
import random
|
|
import numpy as np
|
|
from PIL import Image
|
|
import json
|
|
import torch
|
|
import os
|
|
import matplotlib
|
|
|
|
def unpickle(file, num_records=50000, image_size=64, num_classes=10):
    """Decode fixed-size binary image records and return the last one.

    Each record is one label byte followed by ``3 * image_size * image_size``
    pixel bytes stored channel-first. Every record is decoded, but only the
    final record's image and one-hot label are returned.

    Args:
        file: Path of the binary file to read.
        num_records: Number of consecutive records to decode (default 50000).
        image_size: Height and width of each square image (default 64).
        num_classes: Length of the one-hot label vector (default 10).

    Returns:
        ``(img, lab)`` where ``img`` is a writable ``uint8`` array of shape
        ``(image_size, image_size, 3)`` and ``lab`` is a float one-hot
        vector of length ``num_classes``.

    Raises:
        ValueError: If ``num_records`` is below 1 or the file is too short
            to hold ``num_records`` complete records.
        IndexError: If a label byte is not smaller than ``num_classes``.
    """
    if num_records < 1:
        raise ValueError("num_records must be at least 1")

    record_size = image_size * image_size * 3 + 1

    # Context manager so the handle is closed even if reading fails.
    with open(file, 'rb') as handle:
        raw = handle.read()

    # Build the one-hot lookup table once instead of once per record.
    one_hot = np.identity(num_classes)

    # NOTE(review): the original indentation was lost; the return is taken to
    # follow the loop, so only the last record reaches the caller. Confirm
    # whether accumulating all records was intended.
    for i in range(num_records):
        # frombuffer replaces the deprecated binary mode of np.fromstring and
        # reads the record in place, without copying the slice.
        arr = np.frombuffer(raw, dtype=np.uint8, count=record_size,
                            offset=i * record_size)
        lab = one_hot[arr[0]]
        # Channel-first (3, H, W) -> channel-last (H, W, 3).
        img = arr[1:].reshape((3, image_size, image_size)).transpose((1, 2, 0))

    # frombuffer yields read-only views; copy so the results are writable and
    # independent of the raw buffer, as they were with np.fromstring.
    return img.copy(), lab.copy()
|
|
|
|
class animal_dataset(Dataset):
    """Image dataset over ``<root>/training/`` and ``<root>/testing/``.

    The class label of an image is the integer value of the first character
    of its file name. If ``./training_batch.json`` or ``./testing_batch.json``
    exists in the current working directory, its ``'data'`` / ``'label'``
    entries are used instead of listing the directory. This class only reads
    those files; it never writes them.

    Modes:
        ``'test'``      -- items are ``(img, target)`` from the test split.
        ``'all'``       -- items are ``(img, target, img_path)`` for every
                           training image.
        ``'labeled'``   -- items are ``(img1, img2, target, prob)`` for the
                           entries of ``path`` where ``pred`` is non-zero.
        ``'unlabeled'`` -- items are ``(img1, img2)`` for the entries of
                           ``path`` where ``1 - pred`` is non-zero.
    """

    def __init__(self, root, transform, mode, pred=[], path=[], probability=[], num_class=10):
        """Collect the sample paths and labels needed for ``mode``.

        Args:
            root: Dataset root containing ``training/`` and ``testing/``.
            transform: Callable applied to each RGB PIL image.
            mode: One of ``'test'``, ``'all'``, ``'labeled'``, ``'unlabeled'``.
            pred: Per-sample selection mask; must support ``.nonzero()`` and
                ``1 - pred`` (presumably a NumPy 0/1 or boolean vector --
                confirm against the caller). Used by the labeled and
                unlabeled modes only.
            path: Sequence of image paths aligned with ``pred``.
            probability: Per-sample values aligned with ``pred``; indexed
                with an index array, so presumably a NumPy array.
            num_class: Unused; kept for interface compatibility.

        The list defaults are never mutated here, so sharing them between
        calls is harmless.
        """
        self.root = root
        self.transform = transform
        self.mode = mode

        self.train_dir = root + '/training/'
        self.test_dir = root + '/testing/'
        self.test_data = []
        self.test_labels = []
        noise_file1 = './training_batch.json'
        noise_file2 = './testing_batch.json'

        if mode == 'test':
            if os.path.exists(noise_file2):
                cached = self._read_json(noise_file2)
                # 'data' holds the paths and 'label' the labels, matching the
                # training cache layout (these two were previously swapped).
                self.test_data = cached['data']
                self.test_labels = cached['label']
            else:
                # The directory is listed only when it is actually needed.
                for img in os.listdir(self.test_dir):
                    self.test_data.append(self.test_dir + img)
                    self.test_labels.append(int(img[0]))
            return

        if os.path.exists(noise_file1):
            cached = self._read_json(noise_file1)
            train_data = cached['data']
            train_labels = cached['label']
        else:
            train_data = []
            train_labels = {}  # image path -> integer label
            for img in os.listdir(self.train_dir):
                img_path = self.train_dir + img
                train_data.append(img_path)
                train_labels[img_path] = int(img[0])

        if self.mode == "all":
            self.train_data = train_data
            self.train_labels = train_labels
        elif self.mode in ("labeled", "unlabeled"):
            # Labeled keeps the samples flagged by pred; unlabeled keeps the
            # complement.
            selector = pred if self.mode == "labeled" else 1 - pred
            pred_idx = selector.nonzero()[0]
            self.train_data = [path[i] for i in pred_idx]
            self.probability = probability[pred_idx]
            print("%s data has a size of %d" % (self.mode, len(self.train_data)))
            self.train_labels = train_labels

    @staticmethod
    def _read_json(json_path):
        """Load a JSON file, closing the handle afterwards."""
        with open(json_path, "r") as handle:
            return json.load(handle)

    @staticmethod
    def _load_rgb(img_path):
        """Open an image file and return an RGB copy, closing the file."""
        with Image.open(img_path) as opened:
            return opened.convert('RGB')

    def __getitem__(self, index):
        """Return the sample at ``index``; the tuple layout depends on the mode.

        Raises:
            ValueError: If the dataset was created with an unsupported mode.
        """
        if self.mode == 'labeled':
            img_path = self.train_data[index]
            target = self.train_labels[img_path]
            prob = self.probability[index]
            image = self._load_rgb(img_path)
            # The transform is applied twice to the same image, producing
            # two separately transformed views.
            img1 = self.transform(image)
            img2 = self.transform(image)
            return img1, img2, target, prob
        elif self.mode == 'unlabeled':
            img_path = self.train_data[index]
            image = self._load_rgb(img_path)
            img1 = self.transform(image)
            img2 = self.transform(image)
            return img1, img2
        elif self.mode == 'all':
            img_path = self.train_data[index]
            target = self.train_labels[img_path]
            img = self.transform(self._load_rgb(img_path))
            return img, target, img_path
        elif self.mode == 'test':
            img_path = self.test_data[index]
            target = self.test_labels[index]
            img = self.transform(self._load_rgb(img_path))
            return img, target
        # Fail loudly instead of silently handing None to the collate step.
        raise ValueError("unsupported mode: %r" % (self.mode,))

    def __len__(self):
        """Return the number of samples available in the current mode."""
        if self.mode == 'test':
            return len(self.test_data)
        return len(self.train_data)
|
|
|
|
|
|
class animal_dataloader():
    """Builds the DataLoaders for the warm-up, training, evaluation and test phases."""

    def __init__(self, root='E:/2_Dataset_All/Animal-10N', batch_size=32, num_workers=0):
        """Store the loader settings and prepare the train/test transforms.

        Args:
            root: Dataset root directory handed to ``animal_dataset``.
            batch_size: Base batch size; the warm-up loader uses twice this.
            num_workers: Worker count passed to every ``DataLoader``.
        """
        self.root = root
        self.num_workers = num_workers
        self.batch_size = batch_size

        # Per-channel statistics used by both pipelines.
        channel_mean = (0.6959, 0.6537, 0.6371)
        channel_std = (0.3113, 0.3192, 0.3214)

        train_steps = [
            transforms.Resize(64),
            transforms.RandomCrop(64),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(channel_mean, channel_std),
        ]
        self.transform_train = transforms.Compose(train_steps)

        # The test pipeline only converts to a tensor and normalizes.
        test_steps = [
            transforms.ToTensor(),
            transforms.Normalize(channel_mean, channel_std),
        ]
        self.transform_test = transforms.Compose(test_steps)

    def _make_loader(self, dataset, batch_size, shuffle):
        """Wrap ``dataset`` in a DataLoader with the shared worker/pinning settings."""
        return DataLoader(
            dataset=dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            pin_memory=True)

    def run(self, mode, pred=[], prob=[], paths=[]):
        """Return the loader(s) for ``mode``.

        Args:
            mode: ``'warmup'``, ``'train'``, ``'eval_train'`` or ``'test'``.
            pred: Selection mask forwarded to ``animal_dataset`` (train only).
            prob: Per-sample values forwarded as ``probability`` (train only).
            paths: Image paths forwarded as ``path`` (train only).

        Returns:
            One ``DataLoader`` for ``'warmup'``, ``'eval_train'`` and
            ``'test'``; a ``(labeled_loader, unlabeled_loader)`` pair for
            ``'train'``; ``None`` for any other mode.
        """
        if mode == 'warmup':
            # Warm-up shuffles the full training set at double batch size.
            everything = animal_dataset(self.root, transform=self.transform_train, mode='all')
            return self._make_loader(everything, self.batch_size * 2, True)

        if mode == 'train':
            labeled_part = animal_dataset(
                self.root, transform=self.transform_train, mode='labeled',
                pred=pred, path=paths, probability=prob)
            labeled_loader = self._make_loader(labeled_part, self.batch_size, True)

            unlabeled_part = animal_dataset(
                self.root, transform=self.transform_train, mode='unlabeled',
                pred=pred, path=paths, probability=prob)
            unlabeled_loader = self._make_loader(unlabeled_part, int(self.batch_size), True)

            return labeled_loader, unlabeled_loader

        if mode == 'eval_train':
            # Full training set with the test transform, in fixed order.
            everything = animal_dataset(self.root, transform=self.transform_test, mode='all')
            return self._make_loader(everything, self.batch_size, False)

        if mode == 'test':
            held_out = animal_dataset(self.root, transform=self.transform_test, mode='test')
            return self._make_loader(held_out, 1000, False)

        # Unrecognized modes produce no loader.
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|