import os
import sys
import cv2
import math
import copy
import hashlib
import imageio
import numpy as np
import pandas as pd
from scipy import interpolate
from PIL import Image, ImageEnhance, ImageFile
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset

ImageFile.LOAD_TRUNCATED_IMAGES = True

sys.path.append("./")
from external.landmark_detection.lib.dataset.augmentation import Augmentation
from external.landmark_detection.lib.dataset.encoder import get_encoder


class AlignmentDataset(Dataset):
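    """Facial-landmark alignment dataset backed by a tab-separated annotation file.

    Each TSV row provides an image path, 5-point landmarks, the full target
    landmarks, a face scale, and the face-center coordinates (plus optional
    integer tags). Samples are geometrically normalized via `Augmentation`
    and returned as CHW BGR tensors in [-1, 1].
    """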

    def __init__(self, tsv_file, image_dir="", transform=None,
                 width=256, height=256, channels=3,
                 means=(127.5, 127.5, 127.5), scale=1 / 127.5,
                 classes_num=None, crop_op=True, aug_prob=0.0, edge_info=None, flip_mapping=None, is_train=True,
                 encoder_type='default',
                 ):
        super(AlignmentDataset, self).__init__()
        self.use_AAM = True
        self.encoder_type = encoder_type
        self.encoder = get_encoder(height, width, encoder_type=encoder_type)
        self.items = pd.read_csv(tsv_file, sep="\t")
        self.image_dir = image_dir
        self.landmark_num = classes_num[0]
        self.transform = transform
        self.image_width = width
        self.image_height = height
        self.channels = channels
        assert self.image_width == self.image_height
        self.means = means
        self.scale = scale
        self.aug_prob = aug_prob
        self.edge_info = edge_info
        self.is_train = is_train

        # Canonical 5-point landmark template, normalized to [-1, 1].
        std_lmk_5pts = np.array([
            196.0, 226.0,
            316.0, 226.0,
            256.0, 286.0,
            220.0, 360.4,
            292.0, 360.4], np.float32) / 256.0 - 1.0
        std_lmk_5pts = np.reshape(std_lmk_5pts, (5, 2))  # [-1, 1]
        target_face_scale = 1.0 if crop_op else 1.25
        self.augmentation = Augmentation(
            is_train=self.is_train,
            aug_prob=self.aug_prob,
            image_size=self.image_width,
            crop_op=crop_op,
            std_lmk_5pts=std_lmk_5pts,
            target_face_scale=target_face_scale,
            flip_rate=0.5,
            flip_mapping=flip_mapping,
            random_shift_sigma=0.05,
            random_rot_sigma=math.pi / 180 * 18,
            random_scale_sigma=0.1,
            random_gray_rate=0.2,
            random_occ_rate=0.4,
            random_blur_rate=0.3,
            random_gamma_rate=0.2,
            random_nose_fusion_rate=0.2)

    def _circle(self, img, pt, sigma=1.0, label_type='Gaussian'):
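        """Draw a single landmark response centered at `pt` into `img`.

        `label_type='Gaussian'` writes an unnormalized Gaussian blob with peak
        value 255; any other value writes a Cauchy-like kernel. Points whose
        3-sigma window lies entirely outside the image leave `img` unchanged.
        """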
        # Check that any part of the Gaussian is in-bounds.
        tmp_size = sigma * 3
        ul = [int(pt[0] - tmp_size), int(pt[1] - tmp_size)]
        br = [int(pt[0] + tmp_size + 1), int(pt[1] + tmp_size + 1)]
        if (ul[0] > img.shape[1] - 1 or ul[1] > img.shape[0] - 1 or
                br[0] - 1 < 0 or br[1] - 1 < 0):
            # If not, just return the image as is.
            return img
        # Generate the Gaussian.
        size = 2 * tmp_size + 1
        x = np.arange(0, size, 1, np.float32)
        y = x[:, np.newaxis]
        x0 = y0 = size // 2
        # The Gaussian is not normalized; the center value should equal 1.
        if label_type == 'Gaussian':
            g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * sigma ** 2))
        else:
            g = sigma / (((x - x0) ** 2 + (y - y0) ** 2 + sigma ** 2) ** 1.5)
        # Usable Gaussian range.
        g_x = max(0, -ul[0]), min(br[0], img.shape[1]) - ul[0]
        g_y = max(0, -ul[1]), min(br[1], img.shape[0]) - ul[1]
        # Image range.
        img_x = max(0, ul[0]), min(br[0], img.shape[1])
        img_y = max(0, ul[1]), min(br[1], img.shape[0])
        img[img_y[0]:img_y[1], img_x[0]:img_x[1]] = 255 * g[g_y[0]:g_y[1], g_x[0]:g_x[1]]
        return img

    def _polylines(self, img, lmks, is_closed, color=255, thickness=1, draw_mode=cv2.LINE_AA,
                   interpolate_mode=cv2.INTER_AREA, scale=4):
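        """Rasterize a polyline at `scale`x resolution, then downsample for anti-aliasing."""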
        h, w = img.shape
        img_scale = cv2.resize(img, (w * scale, h * scale), interpolation=interpolate_mode)
        lmks_scale = (lmks * scale + 0.5).astype(np.int32)
        cv2.polylines(img_scale, [lmks_scale], is_closed, color, thickness * scale, draw_mode)
        img = cv2.resize(img_scale, (w, h), interpolation=interpolate_mode)
        return img

    def _generate_edgemap(self, points, scale=0.25, thickness=1):
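        """Render one anti-aliased boundary map per entry of `self.edge_info`.

        Each entry is an `(is_closed, indices)` pair; the selected landmarks
        are densified with a spline, drawn at full resolution, and the stacked
        maps are bilinearly resized by `scale`.
        """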
        h, w = self.image_height, self.image_width
        edgemaps = []
        for is_closed, indices in self.edge_info:
            edgemap = np.zeros([h, w], dtype=np.float32)
            # align_corners: False.
            part = copy.deepcopy(points[np.array(indices)])
            part = self._fit_curve(part, is_closed)
            part[:, 0] = np.clip(part[:, 0], 0, w - 1)
            part[:, 1] = np.clip(part[:, 1], 0, h - 1)
            edgemap = self._polylines(edgemap, part, is_closed, 255, thickness)
            edgemaps.append(edgemap)
        edgemaps = np.stack(edgemaps, axis=0) / 255.0
        edgemaps = torch.from_numpy(edgemaps).float().unsqueeze(0)
        # F.interpolate expects size as (H, W); width and height are asserted
        # equal in __init__, so the map is square in practice.
        edgemaps = F.interpolate(edgemaps, size=(int(h * scale), int(w * scale)), mode='bilinear',
                                 align_corners=False).squeeze()
        return edgemaps

    def _fit_curve(self, lmks, is_closed=False, density=5):
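        """Densify `lmks` with a cubic B-spline, adding `density` points per segment.

        Falls back to the raw landmarks when the spline fit fails (e.g. too
        few or degenerate points).
        """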
        try:
            x = lmks[:, 0].copy()
            y = lmks[:, 1].copy()
            if is_closed:
                x = np.append(x, x[0])
                y = np.append(y, y[0])
            tck, u = interpolate.splprep([x, y], s=0, per=is_closed, k=3)
            # bins = (x.shape[0] - 1) * density + 1
            # lmk_x, lmk_y = interpolate.splev(np.linspace(0, 1, bins), f)
            intervals = np.array([])
            for i in range(len(u) - 1):
                intervals = np.concatenate((intervals, np.linspace(u[i], u[i + 1], density, endpoint=False)))
            if not is_closed:
                intervals = np.concatenate((intervals, [u[-1]]))
            lmk_x, lmk_y = interpolate.splev(intervals, tck, der=0)
            # der_x, der_y = interpolate.splev(intervals, tck, der=1)
            curve_lmks = np.stack([lmk_x, lmk_y], axis=-1)
            # curve_ders = np.stack([der_x, der_y], axis=-1)
            # origin_indices = np.arange(0, curve_lmks.shape[0], density)
            return curve_lmks
        except Exception:
            return lmks

    def _image_id(self, image_path):
        if not os.path.exists(image_path):
            image_path = os.path.join(self.image_dir, image_path)
        # Hash the file contents so the id is stable across path changes.
        with open(image_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()

    def _load_image(self, image_path):
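        """Read an image as HWC BGR uint8, trying cv2, then imageio, then GIF frames.

        Returns None if every decoder fails.
        """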
        if not os.path.exists(image_path):
            image_path = os.path.join(self.image_dir, image_path)
        try:
            # img = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_COLOR)  # HWC, BGR, [0-255]
            img = cv2.imread(image_path, cv2.IMREAD_COLOR)  # HWC, BGR, [0-255]
            assert img is not None and len(img.shape) == 3 and img.shape[2] == 3
        except Exception:
            try:
                img = imageio.imread(image_path)  # HWC, RGB, [0-255]
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # HWC, BGR, [0-255]
                assert img is not None and len(img.shape) == 3 and img.shape[2] == 3
            except Exception:
                try:
                    gifImg = imageio.mimread(image_path)  # BHWC, RGB, [0-255]
                    img = gifImg[0]  # HWC, RGB, [0-255]
                    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # HWC, BGR, [0-255]
                    assert img is not None and len(img.shape) == 3 and img.shape[2] == 3
                except Exception:
                    img = None
        return img

    def _compose_rotate_and_scale(self, angle, scale, shift_xy, from_center, to_center):
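        """Build a 3x3 similarity transform that rotates by `angle` and scales,
        mapping `from_center` to `to_center` offset by `shift_xy`.
        """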
        cosv = math.cos(angle)
        sinv = math.sin(angle)
        fx, fy = from_center
        tx, ty = to_center
        acos = scale * cosv
        asin = scale * sinv
        a0 = acos
        a1 = -asin
        a2 = tx - acos * fx + asin * fy + shift_xy[0]
        b0 = asin
        b1 = acos
        b2 = ty - asin * fx - acos * fy + shift_xy[1]
        rot_scale_m = np.array([
            [a0, a1, a2],
            [b0, b1, b2],
            [0.0, 0.0, 1.0]
        ], np.float32)
        return rot_scale_m

    def _transformPoints2D(self, points, matrix):
        """
        points (nx2), matrix (3x3) -> points (nx2)
        """
        dtype = points.dtype
        # nx3 homogeneous coordinates
        points = np.concatenate([points, np.ones_like(points[:, [0]])], axis=1)
        points = points @ np.transpose(matrix)  # nx3
        points = points[:, :2] / points[:, [2, 2]]
        return points.astype(dtype)

    def _transformPerspective(self, image, matrix, target_shape):
        """
        image, matrix3x3 -> transformed_image
        """
        return cv2.warpPerspective(
            image, matrix,
            dsize=(target_shape[1], target_shape[0]),
            flags=cv2.INTER_LINEAR, borderValue=0)

    def _norm_points(self, points, h, w, align_corners=False):
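        """Map pixel coordinates into [-1, 1] under the chosen corner convention."""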
        if align_corners:
            # [0, SIZE-1] -> [-1, +1]
            des_points = points / torch.tensor([w - 1, h - 1]).to(points).view(1, 2) * 2 - 1
        else:
            # [-0.5, SIZE-0.5] -> [-1, +1]
            des_points = (points * 2 + 1) / torch.tensor([w, h]).to(points).view(1, 2) - 1
        des_points = torch.clamp(des_points, -1, 1)
        return des_points

    def _denorm_points(self, points, h, w, align_corners=False):
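        """Inverse of `_norm_points`: map [-1, 1] back to pixel coordinates."""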
        if align_corners:
            # [-1, +1] -> [0, SIZE-1]
            des_points = (points + 1) / 2 * torch.tensor([w - 1, h - 1]).to(points).view(1, 1, 2)
        else:
            # [-1, +1] -> [-0.5, SIZE-0.5]
            des_points = ((points + 1) * torch.tensor([w, h]).to(points).view(1, 1, 2) - 1) / 2
        return des_points

    def __len__(self):
        return len(self.items)

    def __getitem__(self, index):
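        """Load, align, and normalize one sample.

        TSV columns: image path, 5-point landmarks, full landmarks, face
        scale, center_w, center_h, plus optional comma-separated integer
        tags. Returns a dict with "image_path", "data" (CHW BGR in [-1, 1]),
        "label" (normalized landmarks, plus point and edge maps when
        use_AAM is set), "matrix", and "tags".
        """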
        sample = dict()
        image_path = self.items.iloc[index, 0]
        landmarks_5pts = self.items.iloc[index, 1]
        landmarks_5pts = np.array(list(map(float, landmarks_5pts.split(","))), dtype=np.float32).reshape(5, 2)
        landmarks_target = self.items.iloc[index, 2]
        landmarks_target = np.array(list(map(float, landmarks_target.split(","))), dtype=np.float32).reshape(
            self.landmark_num, 2)
        scale = float(self.items.iloc[index, 3])
        center_w, center_h = float(self.items.iloc[index, 4]), float(self.items.iloc[index, 5])
        if len(self.items.iloc[index]) > 6:
            tags = np.array(list(map(lambda x: int(float(x)), self.items.iloc[index, 6].split(","))))
        else:
            tags = np.array([])

        # image & keypoints alignment
        image_path = image_path.replace('\\', '/')
        # wflw testset
        image_path = image_path.replace(
            '//msr-facestore/Workspace/MSRA_EP_Allergan/users/yanghuan/training_data/wflw/rawImages/', '')
        # trainset
        image_path = image_path.replace('./rawImages/', '')
        image_path = os.path.join(self.image_dir, image_path)

        # image path
        sample["image_path"] = image_path
        img = self._load_image(image_path)  # HWC, BGR, [0, 255]
        assert img is not None

        # augmentation; landmarks_target ends up in [-0.5, edge - 0.5]
        img, landmarks_target, matrix = \
            self.augmentation.process(img, landmarks_target, landmarks_5pts, scale, center_w, center_h)

        landmarks = self._norm_points(torch.from_numpy(landmarks_target), self.image_height, self.image_width)
        sample["label"] = [landmarks, ]
        if self.use_AAM:
            pointmap = self.encoder.generate_heatmap(landmarks_target)
            edgemap = self._generate_edgemap(landmarks_target)
            sample["label"] += [pointmap, edgemap]
        sample['matrix'] = matrix

        # image normalization: (pixel - mean) * scale, per BGR channel
        img = img.transpose(2, 0, 1).astype(np.float32)  # CHW, BGR, [0, 255]
        img[0, :, :] = (img[0, :, :] - self.means[0]) * self.scale
        img[1, :, :] = (img[1, :, :] - self.means[1]) * self.scale
        img[2, :, :] = (img[2, :, :] - self.means[2]) * self.scale
        sample["data"] = torch.from_numpy(img)  # CHW, BGR, [-1, 1]
        sample["tags"] = tags
        return sample
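

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original file: every path and
    # setting below is a hypothetical placeholder. classes_num=[98] and the
    # face-contour indices follow the WFLW convention suggested by the path
    # rewriting in __getitem__; substitute your own annotation layout.
    toy_edge_info = [
        (False, list(range(0, 33))),  # assumed WFLW-style face contour (open curve)
    ]
    dataset = AlignmentDataset(
        tsv_file="data/train.tsv",   # hypothetical annotation file
        image_dir="data/images",     # hypothetical image root
        classes_num=[98],            # number of landmarks (assumed)
        edge_info=toy_edge_info,     # required: use_AAM is hardcoded to True
        aug_prob=0.0,
        is_train=False,
    )
    sample = dataset[0]
    print(sample["image_path"])
    print(sample["data"].shape)      # torch.Size([3, 256, 256])
    print(sample["label"][0].shape)  # (98, 2), normalized to [-1, 1]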