# Veritrue.ai / app.py
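"""Veritrue.ai: a Gradio app that flags uploaded videos, images, and audio
clips as real or deepfake. Faces are detected with MTCNN/dlib, scored with a
fine-tuned InceptionResnetV1, and explained with Grad-CAM heatmaps; audio is
scored with a Tortoise AudioMiniEncoder classifier head."""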
import gradio as gr
import cv2
import dlib
import shutil
import random
import os
import re
import warnings
import tempfile
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime

import torch
import torch.nn.functional as F
import torchaudio
from facenet_pytorch import MTCNN, InceptionResnetV1
from PIL import Image
from PIL.ExifTags import TAGS
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image
from tortoise.models.classifier import AudioMiniEncoderWithClassifierHead

warnings.filterwarnings("ignore")
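
# Dispatch the Gradio inputs to the matching predictor. All three inputs are
# passed on every submit; video takes priority, then image, then audio.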
def inputseparation(video, image, audio):
    if video is not None:
        return save_video(video)
    elif image is not None:
        return predictimage(image)
    else:
        return audiopredict(audio)
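
# Audio branch: decode the upload with torchaudio, downmix and resample to
# 22,000 Hz mono, then score it with the Tortoise-based classifier below.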
def load_audio(uploaded_file, sampling_rate=22000):
    # Copy the upload into a temp file, keeping the original extension so
    # torchaudio can pick the right decoder (e.g. for MP3 files)
    suffix = os.path.splitext(uploaded_file)[1]
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        with open(uploaded_file, 'rb') as audio_file:  # open in binary mode
            tmp.write(audio_file.read())
        tmp_path = tmp.name
    audio, sr = torchaudio.load(tmp_path)
    os.remove(tmp_path)
    audio = audio.mean(dim=0)  # downmix to mono
    if sr != sampling_rate:
        audio = torchaudio.transforms.Resample(sr, sampling_rate)(audio)
    audio = audio.clamp_(-1, 1)
    return audio.unsqueeze(0)
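
# The audio classifier is Tortoise's AudioMiniEncoder with a two-class head
# (index 1 = AI-generated); the 'classifier.pth' checkpoint is expected to sit
# next to this script.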
def classify_audio_clip(clip):
    classifier = AudioMiniEncoderWithClassifierHead(
        2, spec_dim=1, embedding_dim=512, depth=5, downsample_factor=4,
        resnet_blocks=2, attn_blocks=4, num_attn_heads=4, base_channels=32,
        dropout=0, kernel_size=5, distribute_zero_label=False)
    state_dict = torch.load('classifier.pth', map_location=torch.device('cpu'))
    classifier.load_state_dict(state_dict)
    classifier.eval()
    clip = clip.cpu().unsqueeze(0)
    with torch.no_grad():
        results = classifier(clip)
        probabilities = F.softmax(results, dim=-1)
        ai_generated_probability = probabilities[0][1].item()
    return ai_generated_probability
def audiopredict(audio):
    if audio is None:
        return "Neutral", "No audio provided", "No EXIF data found in the audio", None
    audio_clip = load_audio(audio)
    ai_generated_probability = classify_audio_clip(audio_clip)
    # Static waveform illustration shown in place of a face overlay
    image = Image.open("./wave.jpg")
    if ai_generated_probability < 0.5:
        return "Real", "The audio is likely to be Real", "No EXIF data found in the audio", image
    else:
        return "Deepfake", "The audio is likely to be AI Generated", "No EXIF data found in the audio", image
# Video Input Code
def save_video(video_path):
    # Work on a copy of the upload inside a temporary directory
    with tempfile.TemporaryDirectory() as temp_dir:
        # Extract the filename from the path and copy the video over
        filename = os.path.basename(video_path)
        temp_video_path = os.path.join(temp_dir, filename)
        shutil.copyfile(video_path, temp_video_path)
        # Process frames, select faces, and perform deepfake identification
        textoutput, exif, face_with_mask = process_video(temp_dir, filename)
    print(textoutput)
    # Extract the real/fake percentages from the summary string
    percentages = re.findall(r"(\d+\.\d+)%", textoutput)
    if len(percentages) >= 2:
        real_percentage = float(percentages[0])
        fake_percentage = float(percentages[1])
        # The higher percentage decides the verdict
        val = "Real" if real_percentage > fake_percentage else "Deepfake"
    else:
        val = "Neutral"  # no faces were scored, so there are no percentages to compare
    print(val)
    return val, textoutput, exif, face_with_mask
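
# Video pipeline: sampled frames go to ./frames/<session>, cropped faces to
# ./faces/<session>, a random subset to ./selected_faces/<session>, and that
# subset is what gets scored.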
def process_video(video_folder, video_filename):
    # Additional processing (frames, faces, deepfake identification)
    frames_base_dir = "./frames"
    faces_base_dir = "./faces"
    selected_faces_base_dir = "./selected_faces"
    # Build the full path to the uploaded video
    video_path = os.path.join(video_folder, video_filename)
    # Create session folders
    session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    frames_session_dir = create_session_folder(frames_base_dir, session_name)
    faces_session_dir = create_session_folder(faces_base_dir, session_name)
    selected_faces_session_dir = create_session_folder(selected_faces_base_dir, session_name)
    # Extract frames and faces
    video_to_frames_and_extract_faces(video_path, frames_session_dir, faces_session_dir)
    # Select random faces
    select_random_faces(faces_session_dir, selected_faces_session_dir)
    # Perform deepfake identification
    textoutput, exif, face_with_mask = identify_deepfake(selected_faces_session_dir)
    return textoutput, exif, face_with_mask
def create_session_folder(parent_dir, session_name=None):
    if not session_name:
        session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_path = os.path.join(parent_dir, session_name)
    os.makedirs(session_path, exist_ok=True)
    return session_path
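
# dlib's HOG-based frontal face detector is constructed inside the worker
# function so each process in the pool gets its own instance.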
def extract_faces(frame_path, faces_dir):
    frame = cv2.imread(frame_path)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    detector = dlib.get_frontal_face_detector()
    faces = detector(gray, 1)
    faces_extracted = 0
    for face in faces:
        # Clamp the box to the frame so partially off-screen detections
        # don't produce empty crops
        x, y = max(face.left(), 0), max(face.top(), 0)
        w, h = face.width(), face.height()
        face_image = frame[y:y + h, x:x + w]
        if face_image.size == 0:
            continue
        face_file_path = os.path.join(faces_dir, f"face_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.jpg")
        cv2.imwrite(face_file_path, face_image)
        faces_extracted += 1
    return faces_extracted
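
# Frame sampling strategy: every 2nd frame is saved to disk, and every 4th
# saved frame is submitted to a process pool (capped at 8 workers) for face
# extraction, trading some recall for speed.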
def video_to_frames_and_extract_faces(video_path, frames_dir, faces_dir):
    video_capture = cv2.VideoCapture(video_path)
    success, frame = video_capture.read()
    frame_count = 0
    processed_frame_count = 0
    futures = []
    num_workers = min(multiprocessing.cpu_count(), 8)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        while success:
            if frame_count % 2 == 0:
                frame_file = os.path.join(frames_dir, f"frame_{processed_frame_count}.jpg")
                cv2.imwrite(frame_file, frame)
                processed_frame_count += 1
                if processed_frame_count % 4 == 0:
                    future = executor.submit(extract_faces, frame_file, faces_dir)
                    futures.append(future)
            success, frame = video_capture.read()
            frame_count += 1
        total_faces = sum(f.result() for f in as_completed(futures))
    print(f"Saved frames: {processed_frame_count}, Processed for face extraction: {len(futures)}, Extracted faces: {total_faces}")
    video_capture.release()
    return total_faces
def select_random_faces(faces_dir, selected_faces_dir):
    face_files = [os.path.join(faces_dir, f) for f in os.listdir(faces_dir) if f.endswith('.jpg')]
    selected_faces = random.sample(face_files, min(20, len(face_files)))
    for face_file in selected_faces:
        basename = os.path.basename(face_file)
        destination_file = os.path.join(selected_faces_dir, basename)
        shutil.copy(face_file, destination_file)
    print(f"Selected random faces: {len(selected_faces)}")
# Find Deepfake or Not
def identify_deepfake(selected_faces_dir):
    # Setup device
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Initialize MTCNN and InceptionResnetV1 with pre-trained models
    mtcnn = MTCNN(select_largest=False, post_process=False, device=DEVICE).to(DEVICE).eval()
    model = InceptionResnetV1(pretrained="vggface2", classify=True, num_classes=1, device=DEVICE)
    # Load the fine-tuned checkpoint
    checkpoint_path = "./resnetinceptionv1_epoch_32.pth"  # Update this path if needed
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()

    # Per-face prediction with a Grad-CAM heatmap overlay
    def predict(input_image: Image.Image):
        try:
            face = mtcnn(input_image)
            if face is None:
                raise Exception('No face detected')
            face = F.interpolate(face.unsqueeze(0), size=(256, 256), mode='bilinear', align_corners=False)
            face = face.to(DEVICE).to(torch.float32) / 255.0
            target_layers = [model.block8.branch1[-1]]
            cam = GradCAM(model=model, target_layers=target_layers)
            targets = [ClassifierOutputTarget(0)]
            grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
            grayscale_cam = grayscale_cam[0, :]
            face_image_np = face.squeeze().permute(1, 2, 0).cpu().detach().numpy()
            # show_cam_on_image already returns a uint8 image in [0, 255]
            visualization = show_cam_on_image(face_image_np, grayscale_cam, use_rgb=True)
            face_with_mask = cv2.addWeighted((face_image_np * 255).astype('uint8'), 1, visualization, 0.5, 0)
            with torch.no_grad():
                output = torch.sigmoid(model(face)).item()
            prediction = "real" if output < 0.5 else "fake"
            confidences = {'real': 1 - output, 'fake': output}
            return confidences, prediction, face_with_mask
        except Exception as e:
            print(f"Prediction failed: {e}")
            return {'real': 0.0, 'fake': 1.0}, "fake", None

    # Score every face image in the selected folder
    image_files = sorted([f for f in os.listdir(selected_faces_dir) if f.endswith(('.jpg', '.jpeg', '.png', '.bmp'))])
    if not image_files:
        return ("No images were successfully processed to calculate a consolidated score.",
                "No EXIF data or Metadata found in the video", None)
    results = {}
    last_face_with_mask = None
    for image_file in image_files:
        image_path = os.path.join(selected_faces_dir, image_file)
        input_image = Image.open(image_path)
        confidences, prediction, face_with_mask = predict(input_image)
        if face_with_mask is None:
            continue
        last_face_with_mask = face_with_mask
        # Store the results in the dictionary
        results[image_file] = {
            'Confidence': confidences,
            'Prediction': prediction
        }
        print(f"Image: {image_file}, Confidence: {confidences}, Prediction: {prediction}")
    # Read EXIF metadata from the first selected face (frame crops rarely carry any)
    image = Image.open(os.path.join(selected_faces_dir, image_files[0]))
    exif_data = image.getexif()
    if exif_data:
        exif = ""
        for tag_id in exif_data:
            tag = TAGS.get(tag_id, tag_id)  # map the numeric tag id to its name
            value = exif_data[tag_id]
            exif += f"{tag}: {value}\n"
    else:
        exif = "No EXIF data or Metadata found in the video"
    # Accumulate 'real' and 'fake' scores
    real_total = 0.0
    fake_total = 0.0
    count = 0
    for value in results.values():
        real_total += value['Confidence']['real']
        fake_total += value['Confidence']['fake']
        count += 1
    # Report a consolidated score if any faces were successfully processed
    if count > 0:
        real_avg = (real_total / count) * 100
        fake_avg = (fake_total / count) * 100
        textoutput = f"Consolidated Score for the uploaded video - Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%"
    else:
        textoutput = "No images were successfully processed to calculate a consolidated score."
        print(textoutput)
    return textoutput, exif, last_face_with_mask
# Image Input Code
def predictimage(input_image: Image.Image):
    DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    mtcnn = MTCNN(
        select_largest=False,
        post_process=False,
        device=DEVICE
    ).to(DEVICE).eval()
    model = InceptionResnetV1(
        pretrained="vggface2",
        classify=True,
        num_classes=1,
        device=DEVICE
    )
    checkpoint = torch.load("./resnetinceptionv1_epoch_32.pth", map_location=torch.device('cpu'))
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()
    face = mtcnn(input_image)
    # Read EXIF metadata from the uploaded image
    exif_data = input_image.getexif()
    if exif_data:
        exif = ""
        for tag_id in exif_data:
            tag = TAGS.get(tag_id, tag_id)  # map the numeric tag id to its name
            value = exif_data[tag_id]
            exif += f"{tag}: {value}\n"
    else:
        exif = "No EXIF data found in the image"
    if face is None:
        return "Neutral", "No face detected", exif, input_image
    face = face.unsqueeze(0)  # add the batch dimension
    face = F.interpolate(face, size=(256, 256), mode='bilinear', align_corners=False)
    # Keep an unnormalised uint8 copy of the face for the overlay
    prev_face = face.squeeze(0).permute(1, 2, 0).cpu().detach().int().numpy().astype('uint8')
    face = face.to(DEVICE).to(torch.float32) / 255.0
    # Float copy in [0, 1], as expected by show_cam_on_image
    face_image_to_plot = face.squeeze(0).permute(1, 2, 0).cpu().detach().numpy()
    target_layers = [model.block8.branch1[-1]]
    cam = GradCAM(model=model, target_layers=target_layers)
    targets = [ClassifierOutputTarget(0)]
    grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
    grayscale_cam = grayscale_cam[0, :]
    visualization = show_cam_on_image(face_image_to_plot, grayscale_cam, use_rgb=True)
    face_with_mask = cv2.addWeighted(prev_face, 1, visualization, 0.5, 0)
    with torch.no_grad():
        output = torch.sigmoid(model(face).squeeze(0))
    prediction = "Real" if output.item() < 0.5 else "Deepfake"
    real_avg = (1 - output.item()) * 100
    fake_avg = output.item() * 100
    textoutput = f"Consolidated Score for the uploaded image - Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%"
    return prediction, textoutput, exif, face_with_mask
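
# Gradio entry point: one interface with an upload slot for each modality.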
def main():
    # Combined video / image / audio interface
    interface = gr.Interface(
        fn=inputseparation,
        inputs=[
            gr.Video(label="Upload Video"),
            gr.Image(label="Input Image", type="pil"),
            gr.Audio(label="Upload Audio", type="filepath")
        ],
        outputs=[
            gr.Label(label="Output Result"),
            gr.Text(label="Explanation"),
            gr.Text(label="EXIF Data / Metadata"),
            gr.Image(label="Face with Mask")
        ],
        title="Veritrue.ai",
        description="Upload a video, an image, or an audio clip, and the app will report whether it is a deepfake or real."
    )
    interface.launch()


if __name__ == "__main__":
    main()