""" | |
# Copyright 2020 Adobe | |
# All Rights Reserved. | |
# NOTICE: Adobe permits you to use, modify, and distribute this file in | |
# accordance with the terms of the Adobe license agreement accompanying | |
# it. | |
""" | |
import numpy as np
import os
import ffmpeg
import cv2
import face_alignment
from src.dataset.utils.icp import icp

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y


class ShapeParts:
    def __init__(self, np_pts):
        self.data = np_pts

    def part(self, idx):
        return Point(self.data[idx, 0], self.data[idx, 1])
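
# Point / ShapeParts give FAN's numpy landmark output a dlib-style
# `shape.part(i)` interface, so the same drawing code can handle both
# representations (see __vis_landmark_on_img__ below).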

class Av2Flau_Convertor:
    """
    Any-video to facial-landmark and audio numpy data converter.
    """

    def __init__(self, video_dir, out_dir, idx=0):
        self.video_dir = video_dir
        if '\\' in video_dir:
            self.video_name = video_dir.split('\\')[-1]
        else:
            self.video_name = video_dir.split('/')[-1]
        self.out_dir = out_dir
        self.idx = idx
        self.input_format = self.video_dir[-4:]  # e.g. '.mp4'

        # make sure the expected output subfolders exist
        for sub in ('raw_fl3d', 'raw_wav', 'tmp_v', 'register_fl3d'):
            os.makedirs(os.path.join(out_dir, sub), exist_ok=True)

        # landmark predictor (FAN)
        self.predictor = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, device='cuda',
                                                      flip_input=True)
        # landmark register anchor indices
        self.t_shape_idx = (27, 28, 29, 30, 33, 36, 39, 42, 45)
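        # These are the indices of the more rigid landmarks in the 68-point
        # scheme (nose bridge 27-30, nose base 33, eye corners 36/39/42/45);
        # they serve as the ICP anchor points for registration in Step 5.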

    def convert(self, max_num_frames=250, save_audio=False, show=False, register=False):
        # Step 1: pre-clean video (re-encode to a fixed fps / audio sample rate)
        ret, wfn = self.__preclean_video__()
        if not ret:
            return

        # Step 2: detect facial landmarks
        ret, fl2d, fl3d = self.__video_facial_landmark_detection__(
            video_dir=wfn, display=False, max_num_frames=max_num_frames)
        if not ret:
            return
        if len(fl3d) < 9:
            print('The landmark sequence is too short, skip')
            return

        # Step 3: save raw landmarks / audio
        fl3d = np.array(fl3d)
        np.savetxt(os.path.join(self.out_dir, 'raw_fl3d/fan_{:05d}_{}_3d.txt'.format(
            self.idx, self.video_name[:-4])), fl3d, fmt='%.2f')
        if save_audio:
            self.__save_audio__(video_dir=self.video_dir.replace(self.input_format, '_preclean.mp4'), fl3d=fl3d)

        # Step 3.5 (optional): merge a/v together for visual inspection
        if show:
            sf, ef = (fl3d[0][0], fl3d[-1][0]) if fl3d.shape[0] > 0 else (0, 0)
            print(sf, ef)
            print(self.video_dir.replace(self.input_format, '_fl_detect.mp4'),
                  os.path.join(self.out_dir, 'tmp_v', '{:05d}_{}_fl_av.mp4'.format(
                      self.idx, self.video_name[:-4])))
            self.__ffmpeg_merge_av__(
                video_dir=self.video_dir.replace(self.input_format, '_fl_detect.mp4'),
                audio_dir=self.video_dir.replace(self.input_format, '_preclean.mp4'),
                WriteFileName=os.path.join(self.out_dir, 'tmp_v', '{:05d}_{}_fl_av.mp4'.format(
                    self.idx, self.video_name[:-4])),
                start_end_frame=(int(sf), int(ef)))

        # Step 4: remove tmp files
        os.remove(self.video_dir.replace(self.input_format, '_preclean.mp4'))
        if os.path.isfile(self.video_dir.replace(self.input_format, '_fl_detect.mp4')):
            os.remove(self.video_dir.replace(self.input_format, '_fl_detect.mp4'))

        # Step 5: register fl3d
        if register:
            self.__single_landmark_3d_register__(fl3d)
        # TODO: visualize registered fl3d
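
    # Each saved fl3d row is [frame_idx, x0, y0, z0, ..., x67, y67, z67],
    # i.e. 1 + 68 * 3 = 205 columns per detected frame.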

    ''' ========================================================================
                            STEP 1: Preclean video
    ======================================================================== '''

    def __preclean_video__(self, WriteFileName='_preclean.mp4', fps=25, sample_rate=16000):
        '''
        Pre-clean the downloaded video: re-encode to fps=25 and a 16 kHz audio
        sample rate. (A strict "exactly two a/v streams" check used to live
        here but has been disabled.)
        '''
        input_video_dir = self.video_dir if '_x_' not in self.video_dir else self.video_dir.replace('_x_', '/')
        probe = ffmpeg.probe(input_video_dir)
        # record the codecs of the first video/audio streams (informational only)
        codec = {'video': '', 'audio': ''}
        for stream in probe['streams'][0:2]:
            codec[stream['codec_type']] = stream['codec_name']
        # create the precleaned video
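        # Roughly the CLI equivalent of the chain below:
        #   ffmpeg -y -loglevel quiet -i <input> -r 25 -ar 16000 <name>_preclean.mp4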
        (
            ffmpeg
            .input(input_video_dir)
            .output(self.video_dir.replace(self.input_format, WriteFileName),
                    r=fps, ar=sample_rate)
            .overwrite_output().global_args('-loglevel', 'quiet')
            .run()
        )
        return True, self.video_dir.replace(self.input_format, WriteFileName)

    ''' ========================================================================
                            STEP 2: Detect facial landmark
    ======================================================================== '''

    def __video_facial_landmark_detection__(self, video_dir=None, display=False, WriteFileName='_fl_detect.mp4',
                                            max_num_frames=250, write=False):
        '''
        Detect per-frame facial landmarks in a video.
        '''
        # load video
        print('video_dir : ' + video_dir)
        video = cv2.VideoCapture(video_dir)
        # return False if the file cannot be opened
        if not video.isOpened():
            print('Unable to open video file')
            return False, None, None
        # display info
        length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = video.get(cv2.CAP_PROP_FPS)
        w = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print('Process Video {}, len: {}, FPS: {:.2f}, W X H: {} x {}'.format(video_dir, length, fps, w, h))
        if write:
            writer = cv2.VideoWriter(self.video_dir.replace(self.input_format, WriteFileName),
                                     cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'), fps, (w, h))

        video_facial_landmark = []     # one row per frame: frame_idx + [x, y] * 68
        video_facial_landmark_3d = []  # one row per frame: frame_idx + [x, y, z] * 68
        frame_id = 0
        not_detected_frames = 0
        while video.isOpened():
            ret, frame = video.read()
            # reached EOF
            if not ret:
                break
            # too many undetected frames in a row
            if not_detected_frames > 5:
                if len(video_facial_landmark) < 10:
                    # still at the beginning of the video: restart collection
                    video_facial_landmark = []
                    video_facial_landmark_3d = []
                else:
                    # mid-video: stop here
                    break
            # FAN facial landmark detection
            img_ret, shape, shape_3d = self.__image_facial_landmark_detection__(img=frame)
            # successfully detected
            if img_ret:
                # print('\t ==> frame {}/{}'.format(frame_id, length))
                # current frame xy coordinates
                xys = []
                for part_i in range(68):
                    xys.append(shape.part(part_i).x)
                    xys.append(shape.part(part_i).y)
                # if frames were missed, interpolate across the gap
                if not_detected_frames > 0 and len(video_facial_landmark) > 0:
                    def interp(last, cur, num, dims=68 * 2 + 1):
                        interp_xys_np = np.zeros((num, dims))
                        for dim in range(dims):
                            interp_xys_np[:, dim] = np.interp(np.arange(0, num), [-1, num], [last[dim], cur[dim]])
                        interp_xys_np = np.round(interp_xys_np).astype('int')
                        interp_xys = [list(xy) for xy in interp_xys_np]
                        return interp_xys
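                    # One synthesized row per missed frame, linearly blended
                    # between the last stored row and the current detection.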
                    interp_xys = interp(video_facial_landmark[-1], [frame_id] + xys, not_detected_frames)
                    video_facial_landmark += interp_xys
                    not_detected_frames = 0
                # save landmark / frame index
                video_facial_landmark.append([frame_id] + xys)
                if shape_3d.any():
                    video_facial_landmark_3d.append([frame_id] + list(np.reshape(shape_3d, -1)))
                if write:
                    frame = self.__vis_landmark_on_img__(frame, shape)
            else:
                print('\t ==> frame {}/{} Not detected'.format(frame_id, length))
                not_detected_frames += 1
            if display:
                cv2.imshow('Frame', frame)
                if cv2.waitKey(10) == ord('q'):
                    break
            if write:
                writer.write(frame)
            frame_id += 1
            if frame_id > max_num_frames:
                break

        video.release()
        if write:
            writer.release()
        cv2.destroyAllWindows()
        print('\t ==> Final processed frames {}/{}'.format(frame_id, length))
        return True, video_facial_landmark, video_facial_landmark_3d

    def __image_facial_landmark_detection__(self, img=None):
        '''
        Detect facial landmarks in a single image with FAN.
        '''
        shapes = self.predictor.get_landmarks(img)
        if not shapes:
            return False, None, None
        # take the first detected face
        max_size_idx = 0
        shape = ShapeParts(shapes[max_size_idx][:, 0:2])
        shape_3d = shapes[max_size_idx]
        # the 2D estimator returns (68, 2); pad a dummy z column
        shape_3d = np.concatenate([shape_3d, np.ones(shape=(68, 1))], axis=1)
        return True, shape, shape_3d
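
    # FAN with LandmarksType._2D returns a list of (68, 2) arrays, one per
    # detected face; the dummy z = 1 column added above lets the rest of the
    # pipeline treat every landmark as (x, y, z).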

    def __vis_landmark_on_img__(self, img, shape, linewidth=2):
        '''
        Visualize landmarks on an image.
        '''
        if type(shape) == ShapeParts:
            def draw_curve(idx_list, color=(0, 255, 0), loop=False, lineWidth=linewidth):
                for i in idx_list:
                    cv2.line(img, (shape.part(i).x, shape.part(i).y), (shape.part(i + 1).x, shape.part(i + 1).y),
                             color, lineWidth)
                if loop:
                    cv2.line(img, (shape.part(idx_list[0]).x, shape.part(idx_list[0]).y),
                             (shape.part(idx_list[-1] + 1).x, shape.part(idx_list[-1] + 1).y), color, lineWidth)

            draw_curve(list(range(0, 16)))              # jaw
            draw_curve(list(range(17, 21)))             # eyebrows
            draw_curve(list(range(22, 26)))
            draw_curve(list(range(27, 35)))             # nose
            draw_curve(list(range(36, 41)), loop=True)  # eyes
            draw_curve(list(range(42, 47)), loop=True)
            draw_curve(list(range(48, 59)), loop=True)  # mouth
            draw_curve(list(range(60, 67)), loop=True)
        else:
            def draw_curve(idx_list, color=(0, 255, 0), loop=False, lineWidth=linewidth):
                for i in idx_list:
                    cv2.line(img, (shape[i, 0], shape[i, 1]), (shape[i + 1, 0], shape[i + 1, 1]), color, lineWidth)
                if loop:
                    cv2.line(img, (shape[idx_list[0], 0], shape[idx_list[0], 1]),
                             (shape[idx_list[-1] + 1, 0], shape[idx_list[-1] + 1, 1]), color, lineWidth)

            draw_curve(list(range(0, 16)))              # jaw
            draw_curve(list(range(17, 21)))             # eyebrows
            draw_curve(list(range(22, 26)))
            draw_curve(list(range(27, 35)))             # nose
            draw_curve(list(range(36, 41)), loop=True)  # eyes
            draw_curve(list(range(42, 47)), loop=True)
            draw_curve(list(range(48, 59)), loop=True)  # mouth
            draw_curve(list(range(60, 67)), loop=True)
        return img
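
    # The ShapeParts branch serves the detection path; the numpy branch is used
    # by the registration visualizer below, where landmarks arrive as plain
    # (68, 3+) arrays.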

    def __ffmpeg_merge_av__(self, video_dir, audio_dir, WriteFileName, start_end_frame):
        probe = ffmpeg.probe(video_dir)
        fps = probe['streams'][0]['avg_frame_rate']
        spf = float(fps.split('/')[1]) / float(fps.split('/')[0])  # seconds per frame
        sf, ef = start_end_frame
        st, tt = sf * spf, ef * spf - sf * spf  # start time / duration in seconds
        vin = ffmpeg.input(video_dir).video
        # ain = ffmpeg.input(audio_dir).audio
        # out = ffmpeg.output(vin, ain, WriteFileName, codec='copy', ss=st, t=tt, shortest=None)
        out = ffmpeg.output(vin, WriteFileName, codec='copy', ss=st, t=tt, shortest=None)
        out = out.overwrite_output().global_args('-loglevel', 'quiet')
        out.run()
        # os.system('ffmpeg -i {} -codec copy -ss {} -t {} {}'.format(video_dir, st, tt, WriteFileName))
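
    # NOTE: with the audio input commented out above, this writes a video-only
    # trim; restoring the two `ain` lines would mux the audio from `audio_dir`
    # back in.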

    def __save_audio__(self, video_dir, fl3d):
        """
        Extract the audio track from the precleaned video, trimmed to the frame
        range covered by the detected landmarks. Used for creating the
        audio-aware dataset.
        """
        sf, ef = fl3d[0][0], fl3d[-1][0]
        probe = ffmpeg.probe(video_dir)
        fps = probe['streams'][0]['avg_frame_rate']
        spf = float(fps.split('/')[1]) / float(fps.split('/')[0])  # seconds per frame
        st, tt = sf * spf, ef * spf - sf * spf  # start time / duration in seconds
        audio_dir = os.path.join(self.out_dir, 'raw_wav', '{:05d}_{}_audio.wav'.format(self.idx, self.video_name[:-4]))
        (
            ffmpeg
            .input(video_dir)
            .output(audio_dir, ss=st, t=tt)
            .overwrite_output().global_args('-loglevel', 'quiet')
            .run()
        )
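
    # Roughly the CLI equivalent of the extraction above:
    #   ffmpeg -y -loglevel quiet -i <preclean.mp4> -ss <st> -t <tt> <..._audio.wav>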

    ''' ========================================================================
                            STEP 5: Landmark register
    ======================================================================== '''

    def __single_landmark_3d_register__(self, fl3d, display=False):
        """
        Register a single 3D landmark sequence against the anchor face.
        """
        # Step 1: load and smooth (Savitzky-Golay filter along the time axis)
        from scipy.signal import savgol_filter
        lines = savgol_filter(fl3d, 7, 3, axis=0)
        all_landmarks = lines[:, 1:].reshape((-1, 68, 3))  # drop the frame-idx column
        w, h = int(np.max(all_landmarks[:, :, 0])) + 20, int(np.max(all_landmarks[:, :, 1])) + 20

        # Step 2: set up the anchor face
        print('Using existing ' + 'dataset/utils/ANCHOR_T_SHAPE_{}.txt'.format(len(self.t_shape_idx)))
        anchor_t_shape = np.loadtxt('dataset/utils/ANCHOR_T_SHAPE_{}.txt'.format(len(self.t_shape_idx)))
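        # The anchor file stores the 9 reference points (self.t_shape_idx) of a
        # canonical face; aligning each frame's corresponding points to it via
        # ICP factors out global head motion before saving.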

        registered_landmarks_to_save = []
        registered_affine_mat_to_save = []
        # for each frame row
        for line in lines:
            frame_id = line[0]
            landmarks = line[1:].reshape(68, 3)

            # Step 3: ICP between this frame's anchor points and the anchor face
            frame_t_shape = landmarks[self.t_shape_idx, :]
            T, distance, itr = icp(frame_t_shape, anchor_t_shape)

            # Step 4: apply the affine transform in homogeneous coordinates
            landmarks = np.hstack((landmarks, np.ones((68, 1))))
            registered_landmarks = np.dot(T, landmarks.T).T
            err = np.mean(np.sqrt(np.sum((registered_landmarks[self.t_shape_idx, 0:3] - anchor_t_shape) ** 2, axis=1)))
            # print(err, distance, itr)

            # Step 5: save if requested
            registered_landmarks_to_save.append([frame_id] + list(registered_landmarks[:, 0:3].reshape(-1)))
            registered_affine_mat_to_save.append([frame_id] + list(T.reshape(-1)))

            # Step 5.5 (optional): visualize original / registered faces side by side
            if display:
                img = np.zeros((h, w * 2, 3), np.uint8)
                self.__vis_landmark_on_img__(img, landmarks.astype(int))
                registered_landmarks[:, 0] += w
                self.__vis_landmark_on_img__(img, registered_landmarks.astype(int))
                cv2.imshow('img', img)
                if cv2.waitKey(30) == ord('q'):
                    break

        np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_fl_sm.txt'
                                .format(self.idx, self.video_name[:-4])),
                   lines, fmt='%.6f')
        np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_fl_reg.txt'
                                .format(self.idx, self.video_name[:-4])),
                   np.array(registered_landmarks_to_save), fmt='%.6f')
        np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_mat_reg.txt'
                                .format(self.idx, self.video_name[:-4])),
                   np.array(registered_affine_mat_to_save), fmt='%.6f')

if __name__ == '__main__':
    video_dir = r'C:\Users\yangzhou\Videos\004_1.mp4'
    out_dir = r'C:\Users\yangzhou\Videos'
    c = Av2Flau_Convertor(video_dir, out_dir, idx=0)
    c.convert()
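
    # A minimal sketch of loading the saved landmarks afterwards, assuming
    # convert() detected a face and wrote the raw_fl3d file for this video:
    #   fl3d = np.loadtxt(os.path.join(out_dir, 'raw_fl3d', 'fan_00000_004_1_3d.txt'))
    #   frame_idx, pts = fl3d[:, 0], fl3d[:, 1:].reshape(-1, 68, 3)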