import multiprocessing
import os
import random
import re
import shutil
import tempfile
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime

import cv2
import dlib
import gradio as gr
import torch
import torch.nn.functional as F
import torchaudio
from facenet_pytorch import MTCNN, InceptionResnetV1
from PIL import Image
from PIL.ExifTags import TAGS
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from tortoise.models.classifier import AudioMiniEncoderWithClassifierHead

warnings.filterwarnings("ignore")


def inputseparation(video, image, audio):
    # Dispatch to the handler matching whichever input was provided.
    if video is not None:
        return save_video(video)
    elif image is not None:
        return predictimage(image)
    else:
        return audiopredict(audio)


def load_audio(uploaded_file, sampling_rate=22000):
    # Handle MP3 (and other) files with torchaudio: copy the upload to a
    # temporary file, then decode it.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        with open(uploaded_file, 'rb') as audio_file:  # open in binary mode
            tmp.write(audio_file.read())
        tmp_path = tmp.name
    audio, sr = torchaudio.load(tmp_path)
    audio = audio.mean(dim=0)  # downmix to mono
    if sr != sampling_rate:
        audio = torchaudio.transforms.Resample(sr, sampling_rate)(audio)
    audio = audio.clamp_(-1, 1)
    return audio.unsqueeze(0)


def classify_audio_clip(clip):
    classifier = AudioMiniEncoderWithClassifierHead(
        2, spec_dim=1, embedding_dim=512, depth=5, downsample_factor=4,
        resnet_blocks=2, attn_blocks=4, num_attn_heads=4, base_channels=32,
        dropout=0, kernel_size=5, distribute_zero_label=False)
    state_dict = torch.load('classifier.pth', map_location=torch.device('cpu'))
    classifier.load_state_dict(state_dict)
    classifier.eval()
    clip = clip.cpu().unsqueeze(0)
    with torch.no_grad():
        results = classifier(clip)
        probabilities = F.softmax(results, dim=-1)
        ai_generated_probability = probabilities[0][1].item()
    return ai_generated_probability


def audiopredict(audio):
    if audio is not None:
        audio_clip = load_audio(audio)
        ai_generated_probability = classify_audio_clip(audio_clip)
        image = Image.open("./wave.jpg")
        if ai_generated_probability < 0.5:
            return "Real", "The audio is likely to be real", "No EXIF data found in the audio", image
        else:
            return "Deepfake", "The audio is likely to be AI generated", "No EXIF data found in the audio", image
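

# Hedged usage sketch (not part of the app flow): exercise the audio branch
# on a local file. "sample.wav" is a placeholder path, and classifier.pth
# must be present, as classify_audio_clip above assumes.
def _demo_audio_check(path="sample.wav"):
    prob = classify_audio_clip(load_audio(path))
    label = "Deepfake" if prob >= 0.5 else "Real"
    print(f"P(AI-generated) = {prob:.3f} -> {label}")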
# Video Input Code
def save_video(video_path):
    # Create a temporary directory to hold a copy of the video
    with tempfile.TemporaryDirectory() as temp_dir:
        # Extract the filename from the path and copy the video over
        filename = os.path.basename(video_path)
        temp_video_path = os.path.join(temp_dir, filename)
        shutil.copy(video_path, temp_video_path)
        # Process frames, select faces, and perform deepfake identification
        textoutput, exif, face_with_mask = process_video(temp_dir, filename)
    print(textoutput)
    # Extract the Real/Fake percentages from the summary string
    percentages = re.findall(r"(\d+\.\d+)%", textoutput)
    if len(percentages) < 2:
        # No per-face scores were produced (e.g. no faces detected)
        return "Neutral", textoutput, exif, face_with_mask
    real_percentage = float(percentages[0])
    fake_percentage = float(percentages[1])
    # The higher percentage decides the label
    if real_percentage > fake_percentage:
        val = "Real"
    else:
        val = "Deepfake"
    print(val)
    return val, textoutput, exif, face_with_mask


def process_video(video_folder, video_filename):
    # Additional processing: frames, faces, deepfake identification
    frames_base_dir = "./frames"
    faces_base_dir = "./faces"
    selected_faces_base_dir = "./selected_faces"
    # Build the path to the uploaded video
    video_path = os.path.join(video_folder, video_filename)
    # Create session folders
    session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    frames_session_dir = create_session_folder(frames_base_dir, session_name)
    faces_session_dir = create_session_folder(faces_base_dir, session_name)
    selected_faces_session_dir = create_session_folder(selected_faces_base_dir, session_name)
    # Extract frames and faces
    video_to_frames_and_extract_faces(video_path, frames_session_dir, faces_session_dir)
    # Select random faces
    select_random_faces(faces_session_dir, selected_faces_session_dir)
    # Perform deepfake identification
    textoutput, exif, face_with_mask = identify_deepfake(selected_faces_session_dir)
    return textoutput, exif, face_with_mask


def create_session_folder(parent_dir, session_name=None):
    if not session_name:
        session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_path = os.path.join(parent_dir, session_name)
    os.makedirs(session_path, exist_ok=True)
    return session_path


def extract_faces(frame_path, faces_dir):
    frame = cv2.imread(frame_path)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # The detector is created per call so this function stays picklable for
    # ProcessPoolExecutor workers.
    detector = dlib.get_frontal_face_detector()
    faces = detector(gray, 1)
    faces_extracted = 0
    for face in faces:
        (x, y, w, h) = (face.left(), face.top(), face.width(), face.height())
        face_image = frame[y:y + h, x:x + w]
        face_file_path = os.path.join(
            faces_dir, f"face_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.jpg")
        cv2.imwrite(face_file_path, face_image)
        faces_extracted += 1
    return faces_extracted


def video_to_frames_and_extract_faces(video_path, frames_dir, faces_dir):
    video_capture = cv2.VideoCapture(video_path)
    success, frame = video_capture.read()
    frame_count = 0
    processed_frame_count = 0
    futures = []
    num_workers = min(multiprocessing.cpu_count(), 8)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        while success:
            if frame_count % 2 == 0:  # keep every second frame
                frame_file = os.path.join(frames_dir, f"frame_{processed_frame_count}.jpg")
                cv2.imwrite(frame_file, frame)
                processed_frame_count += 1
                if processed_frame_count % 4 == 0:  # face-detect every fourth kept frame
                    futures.append(executor.submit(extract_faces, frame_file, faces_dir))
            success, frame = video_capture.read()
            frame_count += 1
        total_faces = sum(f.result() for f in as_completed(futures))
    print(f"Saved frames: {processed_frame_count}, Processed for face extraction: {len(futures)}, Extracted faces: {total_faces}")
    video_capture.release()
    return total_faces


def select_random_faces(faces_dir, selected_faces_dir):
    face_files = [os.path.join(faces_dir, f) for f in os.listdir(faces_dir) if f.endswith('.jpg')]
    selected_faces = random.sample(face_files, min(20, len(face_files)))
    for face_file in selected_faces:
        basename = os.path.basename(face_file)
        destination_file = os.path.join(selected_faces_dir, basename)
        shutil.copy(face_file, destination_file)
    print(f"Selected random faces: {len(selected_faces)}")
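

# Hedged usage sketch: run only the frame/face-extraction stage on a local
# video. "sample.mp4" is a placeholder path; session folders are created
# under the same base directories the app uses.
def _demo_extract_faces(video_path="sample.mp4"):
    frames_dir = create_session_folder("./frames")
    faces_dir = create_session_folder("./faces")
    total = video_to_frames_and_extract_faces(video_path, frames_dir, faces_dir)
    print(f"Extracted {total} face crops into {faces_dir}")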
# Find Deepfake or Not
def identify_deepfake(selected_faces_dir):
    # Set up device
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Initialize MTCNN and InceptionResnetV1 with pre-trained weights
    mtcnn = MTCNN(select_largest=False, post_process=False, device=DEVICE).to(DEVICE).eval()
    model = InceptionResnetV1(pretrained="vggface2", classify=True, num_classes=1, device=DEVICE)
    # Load the fine-tuned checkpoint
    checkpoint_path = "./resnetinceptionv1_epoch_32.pth"  # update this path if needed
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()

    # Define the per-face prediction function
    def predict(input_image: Image.Image):
        try:
            face = mtcnn(input_image)
            if face is None:
                raise Exception('No face detected')
            face = F.interpolate(face.unsqueeze(0), size=(256, 256), mode='bilinear', align_corners=False)
            face = face.to(DEVICE).to(torch.float32) / 255.0
            target_layers = [model.block8.branch1[-1]]
            cam = GradCAM(model=model, target_layers=target_layers)
            targets = [ClassifierOutputTarget(0)]
            grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
            grayscale_cam = grayscale_cam[0, :]
            face_image_np = face.squeeze().permute(1, 2, 0).cpu().detach().numpy()
            # show_cam_on_image already returns a uint8 image, so blend it as-is
            visualization = show_cam_on_image(face_image_np, grayscale_cam, use_rgb=True)
            face_with_mask = cv2.addWeighted((face_image_np * 255).astype('uint8'), 1,
                                             visualization, 0.5, 0)
            with torch.no_grad():
                output = torch.sigmoid(model(face)).item()
            prediction = "real" if output < 0.5 else "fake"
            confidences = {'real': 1 - output, 'fake': output}
            return confidences, prediction, face_with_mask
        except Exception as e:
            print(f"Prediction failed: {e}")
            return {'real': 0.0, 'fake': 1.0}, "fake", None

    # Process images in the selected folder
    image_files = sorted([f for f in os.listdir(selected_faces_dir)
                          if f.endswith(('.jpg', '.jpeg', '.png', '.bmp'))])
    results = {}  # per-image confidences and predictions
    face_with_mask = None  # holds the most recent CAM overlay for display
    for image_file in image_files:
        image_path = os.path.join(selected_faces_dir, image_file)
        input_image = Image.open(image_path)
        confidences, prediction, face_with_mask = predict(input_image)
        if face_with_mask is None:
            continue
        results[image_file] = {
            'Confidence': confidences,
            'Prediction': 'real' if confidences['real'] > confidences['fake'] else 'fake'
        }
        print(f"Image: {image_file}, Confidence: {confidences}, Prediction: {results[image_file]['Prediction']}")

    # Read EXIF metadata from the first selected face, if there is one
    exif_data = None
    if image_files:
        image = Image.open(os.path.join(selected_faces_dir, image_files[0]))
        exif_data = image.getexif()  # returns an Exif instance (possibly empty)
    if exif_data:
        exif = ""
        for tag_id in exif_data:
            tag = TAGS.get(tag_id, tag_id)  # map the tag id to its name
            value = exif_data[tag_id]
            exif += f"{tag}: {value}\n"
    else:
        exif = "No EXIF data or metadata found in the video"

    # Accumulate 'real' and 'fake' scores
    real_total = 0.0
    fake_total = 0.0
    count = 0
    for value in results.values():
        if 'Confidence' in value:
            real_total += value['Confidence']['real']
            fake_total += value['Confidence']['fake']
            count += 1
    # Report the consolidated score if any images were successfully processed
    if count > 0:
        real_avg = (real_total / count) * 100
        fake_avg = (fake_total / count) * 100
        textoutput = (f"Consolidated Score for the uploaded video - "
                      f"Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%")
    else:
        # Return a usable tuple instead of implicitly returning None
        textoutput = "No images were successfully processed to calculate a consolidated score."
    return textoutput, exif, face_with_mask
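

# Hedged usage sketch: score an existing folder of face crops directly,
# skipping the video stage. "./selected_faces" is a placeholder; any
# directory containing .jpg/.png face images will do.
def _demo_score_faces(faces_dir="./selected_faces"):
    textoutput, exif, face_with_mask = identify_deepfake(faces_dir)
    print(textoutput)
    print(exif)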
# Gradio Interface
def predictimage(input_image: Image.Image):
    DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    mtcnn = MTCNN(
        select_largest=False,
        post_process=False,
        device=DEVICE
    ).to(DEVICE).eval()
    model = InceptionResnetV1(
        pretrained="vggface2",
        classify=True,
        num_classes=1,
        device=DEVICE
    )
    checkpoint = torch.load("./resnetinceptionv1_epoch_32.pth", map_location=torch.device('cpu'))
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()

    face = mtcnn(input_image)

    # Read EXIF metadata from the uploaded image
    exif_data = input_image.getexif()  # returns an Exif instance (possibly empty)
    if exif_data:
        exif = ""
        for tag_id in exif_data:
            tag = TAGS.get(tag_id, tag_id)  # map the tag id to its name
            value = exif_data[tag_id]
            exif += f"{tag}: {value}\n"
    else:
        exif = "No EXIF data found in the image"

    if face is None:
        return "Neutral", "No face detected", exif, input_image

    face = face.unsqueeze(0)  # add the batch dimension
    face = F.interpolate(face, size=(256, 256), mode='bilinear', align_corners=False)
    # Keep a uint8 copy of the face so it can be blended with the CAM overlay
    prev_face = face.squeeze(0).permute(1, 2, 0).cpu().detach().int().numpy()
    prev_face = prev_face.astype('uint8')
    face = face.to(DEVICE)
    face = face.to(torch.float32)
    face = face / 255.0
    # show_cam_on_image expects a float image in [0, 1], so do not cast to int here
    face_image_to_plot = face.squeeze(0).permute(1, 2, 0).cpu().detach().numpy()

    target_layers = [model.block8.branch1[-1]]
    cam = GradCAM(model=model, target_layers=target_layers)
    targets = [ClassifierOutputTarget(0)]
    grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
    grayscale_cam = grayscale_cam[0, :]
    visualization = show_cam_on_image(face_image_to_plot, grayscale_cam, use_rgb=True)
    face_with_mask = cv2.addWeighted(prev_face, 1, visualization, 0.5, 0)

    with torch.no_grad():
        output = torch.sigmoid(model(face).squeeze(0))
    prediction = "Real" if output.item() < 0.5 else "Deepfake"
    real_avg = (1 - output.item()) * 100
    fake_avg = output.item() * 100
    textoutput = (f"Consolidated Score for the uploaded image - "
                  f"Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%")
    return prediction, textoutput, exif, face_with_mask


def main():
    # Single interface with three optional inputs: video, image, or audio
    video_input_interface = gr.Interface(
        fn=inputseparation,
        inputs=[
            gr.Video(label="Upload Video"),
            gr.Image(label="Input Image", type="pil"),
            gr.Audio(label="Upload Audio", type="filepath")
        ],
        outputs=[
            gr.Label(label="Output Result"),
            gr.Text(label="Explanation"),
            gr.Text(label="EXIF Data / Metadata"),
            gr.Image(label="Face with Mask")
        ],
        title="Veritrue.ai",
        description="Upload a video, an image, or an audio clip, and the app will tell you whether it looks real or deepfaked."
    )
    # Launch the interface
    video_input_interface.launch()


if __name__ == "__main__":
    main()
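
# Note: launch(share=True) would additionally expose a temporary public URL,
# which can be useful when testing the interface from another device.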