""" # Copyright 2020 Adobe # All Rights Reserved. # NOTICE: Adobe permits you to use, modify, and distribute this file in # accordance with the terms of the Adobe license agreement accompanying # it. """ import numpy as np import os import ffmpeg import cv2 import face_alignment from src.dataset.utils import icp class Point: def __init__(self, x, y): self.x = x self.y = y class ShapeParts: def __init__(self, np_pts): self.data = np_pts def part(self, idx): return Point(self.data[idx, 0], self.data[idx, 1]) class Av2Flau_Convertor(): """ Any video to facial landmark and audio numpy data converter. """ def __init__(self, video_dir, out_dir, idx=0): self.video_dir = video_dir if ('\\' in video_dir): self.video_name = video_dir.split('\\')[-1] else: self.video_name = video_dir.split('/')[-1] self.out_dir = out_dir self.idx = idx self.input_format = self.video_dir[-4:] # landmark predictor = FANet self.predictor = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, device='cuda', flip_input=True) # landmark register self.t_shape_idx = (27, 28, 29, 30, 33, 36, 39, 42, 45) def convert(self, max_num_frames=250, save_audio=False, show=False, register=False): # Step 1: preclean video: check stream==2, convert fps/sample_rate, ret, wfn = self.__preclean_video__() if (not ret): return # Step 2: detect facial landmark wfn = self.video_dir.replace(self.input_format, '_preclean.mp4') ret, fl2d, fl3d = self.__video_facial_landmark_detection__(video_dir=wfn, display=False, max_num_frames=max_num_frames) if (not ret): return if (len(fl3d) < 9): print('The length of the landmark is too short, skip') return # Step 3: raw save landmark / audio fl3d = np.array(fl3d) np.savetxt(os.path.join(self.out_dir, 'raw_fl3d/fan_{:05d}_{}_3d.txt'.format(self.idx, self.video_name[:-4])), fl3d, fmt='%.2f') if (save_audio): self.__save_audio__(video_dir=self.video_dir.replace(self.input_format, '_preclean.mp4'), fl3d=fl3d) # Step 3.5: merge a/v together (optional) if (show): sf, ef = (fl3d[0][0], fl3d[-1][0]) if fl3d.shape[0] > 0 else (0, 0) print(sf, ef) print(self.video_dir.replace(self.input_format, '_fl_detect.mp4'), os.path.join(self.out_dir, 'tmp_v', '{:05d}_{}_fl_av.mp4'.format( self.idx, self.video_name[:-4])) ) self.__ffmpeg_merge_av__( video_dir=self.video_dir.replace(self.input_format, '_fl_detect.mp4'), audio_dir=self.video_dir.replace(self.input_format, '_preclean.mp4'), WriteFileName=os.path.join(self.out_dir, 'tmp_v', '{:05d}_{}_fl_av.mp4'.format( self.idx, self.video_name[:-4])), start_end_frame=(int(sf), int(ef))) # Step 4: remove tmp files os.remove(self.video_dir.replace(self.input_format, '_preclean.mp4')) if(os.path.isfile(self.video_dir.replace(self.input_format, '_fl_detect.mp4'))): os.remove(self.video_dir.replace(self.input_format, '_fl_detect.mp4')) # Step 5: register fl3d if (register): self.__single_landmark_3d_register__(fl3d) # TODO: visualize register fl3d ''' ======================================================================== STEP 1: Preclean video ======================================================================== ''' def __preclean_video__(self, WriteFileName='_preclean.mp4', fps=25, sample_rate=16000): ''' Pre-clean downloaded videos. Return false if more than 2 streams found. 
    ''' ========================================================================
                            STEP 1: Preclean video
    ======================================================================== '''

    def __preclean_video__(self, WriteFileName='_preclean.mp4', fps=25, sample_rate=16000):
        '''
        Pre-clean downloaded videos: convert to fps=25, audio sample_rate=16kHz.
        (The original check rejecting videos without exactly 2 a/v streams is disabled below.)
        '''
        input_video_dir = self.video_dir if '_x_' not in self.video_dir else self.video_dir.replace('_x_', '/')

        probe = ffmpeg.probe(input_video_dir)
        # if len(probe['streams']) != 2:
        #     print('Error: not valid for # of a/v channel == 2.')
        #     return False, None

        codec = {'video': '', 'audio': ''}
        for i, stream in enumerate(probe['streams'][0:2]):
            codec[stream['codec_type']] = stream['codec_name']

        # create pre-cleaned video
        (
            ffmpeg
            .input(input_video_dir)
            .output(self.video_dir.replace(self.input_format, WriteFileName),
                    # vcodec=codec['video'], acodec=codec['audio'],
                    r=fps, ar=sample_rate)
            .overwrite_output().global_args('-loglevel', 'quiet')
            .run()
        )

        return True, self.video_dir.replace(self.input_format, WriteFileName)
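    # For reference, the pre-clean step above is roughly equivalent to the
    # following command line (a sketch; 'in.mp4' / 'out_preclean.mp4' are
    # placeholder names, not paths used by this script):
    #   ffmpeg -i in.mp4 -r 25 -ar 16000 -y -loglevel quiet out_preclean.mp4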
    ''' ========================================================================
                        STEP 2: Detect facial landmark
    ======================================================================== '''

    def __video_facial_landmark_detection__(self, video_dir=None, display=False,
                                            WriteFileName='_fl_detect.mp4',
                                            max_num_frames=250, write=False):
        ''' Get facial landmarks from video. '''
        # load video
        print('video_dir : ' + video_dir)
        video = cv2.VideoCapture(video_dir)
        if not video.isOpened():
            print('Unable to open video file')
            return False, None, None

        # display info
        length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = video.get(cv2.CAP_PROP_FPS)
        w = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print('Process Video {}, len: {}, FPS: {:.2f}, W X H: {} x {}'.format(video_dir, length, fps, w, h))

        if write:
            writer = cv2.VideoWriter(self.video_dir.replace(self.input_format, WriteFileName),
                                     cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'), fps, (w, h))

        video_facial_landmark = []     # per-frame landmarks: idx + [x, y] * 68
        video_facial_landmark_3d = []  # per-frame landmarks: idx + [x, y, z] * 68

        frame_id = 0
        not_detected_frames = 0
        while video.isOpened():
            ret, frame = video.read()
            if not ret:  # reached EOF
                break

            # too many undetected frames
            if not_detected_frames > 5:
                if len(video_facial_landmark) < 10:
                    # still at the beginning of the video: restart collection
                    video_facial_landmark = []
                    video_facial_landmark_3d = []
                else:
                    # in the middle of the video: stop here
                    break

            # FANet facial landmark detection
            img_ret, shape, shape_3d = self.__image_facial_landmark_detection__(img=frame)

            if img_ret:  # successfully detected
                # current frame xy coordinates
                xys = []
                for part_i in range(68):
                    xys.append(shape.part(part_i).x)
                    xys.append(shape.part(part_i).y)

                # linearly interpolate over any gap of undetected frames
                if not_detected_frames > 0 and len(video_facial_landmark) > 0:
                    def interp(last, cur, num, dims=68 * 2 + 1):
                        interp_xys_np = np.zeros((num, dims))
                        for dim in range(dims):
                            interp_xys_np[:, dim] = np.interp(np.arange(0, num),
                                                              [-1, num], [last[dim], cur[dim]])
                        interp_xys_np = np.round(interp_xys_np).astype('int')
                        return [list(xy) for xy in interp_xys_np]

                    video_facial_landmark += interp(video_facial_landmark[-1], [frame_id] + xys,
                                                    not_detected_frames)
                    not_detected_frames = 0

                # save landmarks / frame index
                video_facial_landmark.append([frame_id] + xys)
                if shape_3d.any():
                    video_facial_landmark_3d.append([frame_id] + list(np.reshape(shape_3d, -1)))

                if write:
                    frame = self.__vis_landmark_on_img__(frame, shape)
            else:
                print('\t ==> frame {}/{} Not detected'.format(frame_id, length))
                not_detected_frames += 1

            if display:
                cv2.imshow('Frame', frame)
                if cv2.waitKey(10) == ord('q'):
                    break

            if write:
                writer.write(frame)

            frame_id += 1
            if frame_id > max_num_frames:
                break

        video.release()
        if write:
            writer.release()
        cv2.destroyAllWindows()

        print('\t ==> Final processed frames {}/{}'.format(frame_id, length))
        return True, video_facial_landmark, video_facial_landmark_3d

    def __image_facial_landmark_detection__(self, img=None):
        ''' Get facial landmarks from a single image with FANet. '''
        shapes = self.predictor.get_landmarks(img)
        if not shapes:
            return False, None, None

        max_size_idx = 0  # take the first detected face
        shape = ShapeParts(shapes[max_size_idx][:, 0:2])
        shape_3d = shapes[max_size_idx]

        # pad z with ones when using the 2D estimator
        shape_3d = np.concatenate([shape_3d, np.ones(shape=(68, 1))], axis=1)

        return True, shape, shape_3d

    def __vis_landmark_on_img__(self, img, shape, linewidth=2):
        ''' Visualize landmarks on an image. '''
        if type(shape) == ShapeParts:
            def draw_curve(idx_list, color=(0, 255, 0), loop=False, lineWidth=linewidth):
                for i in idx_list:
                    cv2.line(img, (shape.part(i).x, shape.part(i).y),
                             (shape.part(i + 1).x, shape.part(i + 1).y), color, lineWidth)
                if loop:
                    cv2.line(img, (shape.part(idx_list[0]).x, shape.part(idx_list[0]).y),
                             (shape.part(idx_list[-1] + 1).x, shape.part(idx_list[-1] + 1).y),
                             color, lineWidth)
        else:
            def draw_curve(idx_list, color=(0, 255, 0), loop=False, lineWidth=linewidth):
                for i in idx_list:
                    cv2.line(img, (shape[i, 0], shape[i, 1]),
                             (shape[i + 1, 0], shape[i + 1, 1]), color, lineWidth)
                if loop:
                    cv2.line(img, (shape[idx_list[0], 0], shape[idx_list[0], 1]),
                             (shape[idx_list[-1] + 1, 0], shape[idx_list[-1] + 1, 1]),
                             color, lineWidth)

        draw_curve(list(range(0, 16)))              # jaw
        draw_curve(list(range(17, 21)))             # eyebrows
        draw_curve(list(range(22, 26)))
        draw_curve(list(range(27, 35)))             # nose
        draw_curve(list(range(36, 41)), loop=True)  # eyes
        draw_curve(list(range(42, 47)), loop=True)
        draw_curve(list(range(48, 59)), loop=True)  # mouth
        draw_curve(list(range(60, 67)), loop=True)

        return img

    def __ffmpeg_merge_av__(self, video_dir, audio_dir, WriteFileName, start_end_frame):
        probe = ffmpeg.probe(video_dir)
        fps = probe['streams'][0]['avg_frame_rate']
        spf = float(fps.split('/')[1]) / float(fps.split('/')[0])  # seconds per frame

        sf, ef = start_end_frame
        st, tt = sf * spf, ef * spf - sf * spf

        vin = ffmpeg.input(video_dir).video
        # ain = ffmpeg.input(audio_dir).audio
        # out = ffmpeg.output(vin, ain, WriteFileName, codec='copy', ss=st, t=tt, shortest=None)
        out = ffmpeg.output(vin, WriteFileName, codec='copy', ss=st, t=tt, shortest=None)
        out = out.overwrite_output().global_args('-loglevel', 'quiet')
        out.run()
        # os.system('ffmpeg -i {} -codec copy -ss {} -t {} {}'.format(video_dir, st, tt, WriteFileName))
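    # Worked example for the frame-to-time conversion above: with an
    # avg_frame_rate of '25/1', spf = 1/25 = 0.04 s per frame, so
    # start_end_frame = (10, 110) gives ss = 10 * 0.04 = 0.4 s and
    # t = (110 - 10) * 0.04 = 4.0 s of trimmed output.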
""" sf, ef = fl3d[0][0], fl3d[-1][0] probe = ffmpeg.probe(video_dir) fps = probe['streams'][0]['avg_frame_rate'] spf = float(fps.split('/')[1]) / float(fps.split('/')[0]) st, tt = sf * spf, ef * spf - sf * spf audio_dir = os.path.join(self.out_dir, 'raw_wav', '{:05d}_{}_audio.wav'.format(self.idx, self.video_name[:-4])) ( ffmpeg .input(video_dir) .output(audio_dir, ss=st, t=tt) .overwrite_output().global_args('-loglevel', 'quiet') .run() ) ''' ======================================================================== STEP 5: Landmark register ======================================================================== ''' def __single_landmark_3d_register__(self, fl3d, display=False): """ Register a single 3d landmark file """ # Step 1 : Load and Smooth from scipy.signal import savgol_filter lines = savgol_filter(fl3d, 7, 3, axis=0) all_landmarks = lines[:, 1:].reshape((-1, 68, 3)) # remove frame idx w, h = int(np.max(all_landmarks[:, :, 0])) + 20, int(np.max(all_landmarks[:, :, 1])) + 20 # Step 2 : setup anchor face print('Using exisiting ' + 'dataset/utils/ANCHOR_T_SHAPE_{}.txt'.format(len(self.t_shape_idx))) anchor_t_shape = np.loadtxt('dataset/utils/ANCHOR_T_SHAPE_{}.txt'.format(len(self.t_shape_idx))) registered_landmarks_to_save = [] registered_affine_mat_to_save = [] # for each line for line in lines: frame_id = line[0] landmarks = line[1:].reshape(68, 3) # Step 3 : ICP on (frame, anchor) frame_t_shape = landmarks[self.t_shape_idx, :] T, distance, itr = icp(frame_t_shape, anchor_t_shape) # Step 4 : Affine transform landmarks = np.hstack((landmarks, np.ones((68, 1)))) registered_landmarks = np.dot(T, landmarks.T).T err = np.mean(np.sqrt(np.sum((registered_landmarks[self.t_shape_idx, 0:3] - anchor_t_shape) ** 2, axis=1))) # print(err, distance, itr) # Step 5 : Save is requested registered_landmarks_to_save.append([frame_id] + list(registered_landmarks[:, 0:3].reshape(-1))) registered_affine_mat_to_save.append([frame_id] + list(T.reshape(-1))) # Step 5.5 (optional) : visualize ori / registered faces (Isolated in Black BG) if (display): img = np.zeros((h, w * 2, 3), np.uint8) self.__vis_landmark_on_img__(img, landmarks.astype(np.int)) registered_landmarks[:, 0] += w self.__vis_landmark_on_img__(img, registered_landmarks.astype(np.int)) cv2.imshow('img', img) if (cv2.waitKey(30) == ord('q')): break np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_fl_sm.txt' .format(self.idx, self.video_name[:-4])), lines, fmt='%.6f') np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_fl_reg.txt' .format(self.idx, self.video_name[:-4])), np.array(registered_landmarks_to_save), fmt='%.6f') np.savetxt(os.path.join(self.out_dir, 'register_fl3d', '{:05d}_{}_mat_reg.txt' .format(self.idx, self.video_name[:-4])), np.array(registered_affine_mat_to_save), fmt='%.6f') if __name__ == '__main__': video_dir = r'C:\Users\yangzhou\Videos\004_1.mp4' out_dir = r'C:\Users\yangzhou\Videos' c = Av2Flau_Convertor(video_dir, out_dir, idx=0) c.convert()