import gradio as gr
import cv2
import dlib
import shutil
import numpy as np
import random
from datetime import datetime
import torch
import torch.nn.functional as F
from facenet_pytorch import MTCNN, InceptionResnetV1
from PIL import Image
from PIL.ExifTags import TAGS
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image
import os
import warnings
import tempfile
import re
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import torchaudio
from tortoise.models.classifier import AudioMiniEncoderWithClassifierHead

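# Suppress noisy library warnings in the console output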
warnings.filterwarnings("ignore")

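# Route each upload to its pipeline: video -> frame/face analysis,
# image -> single-face Grad-CAM check, audio -> Tortoise-based classifier.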
def inputseparation(video, image, audio):
    if video is not None:
        return save_video(video)
    elif image is not None:
        return predictimage(image)
    else:
        return audiopredict(audio)

def load_audio(uploaded_file, sampling_rate=22000):
    # Copy the upload to a temporary file (keeping its extension so the
    # backend can detect the format), then decode it with torchaudio.
    suffix = os.path.splitext(uploaded_file)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        with open(uploaded_file, 'rb') as audio_file:  # open in binary mode
            tmp.write(audio_file.read())
        tmp_path = tmp.name
    try:
        audio, sr = torchaudio.load(tmp_path)
        audio = audio.mean(dim=0)  # downmix stereo to mono
    finally:
        os.remove(tmp_path)  # delete=False means we must clean up ourselves

    # Resample to the rate the classifier expects
    if sr != sampling_rate:
        audio = torchaudio.transforms.Resample(sr, sampling_rate)(audio)

    audio = audio.clamp_(-1, 1)

    return audio.unsqueeze(0)  # add a channel dimension


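# Binary classifier (real vs. AI-generated) built on Tortoise's
# AudioMiniEncoderWithClassifierHead; weights are reloaded from
# classifier.pth on every call.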
def classify_audio_clip(clip):
    classifier = AudioMiniEncoderWithClassifierHead(
        2, spec_dim=1, embedding_dim=512, depth=5, downsample_factor=4,
        resnet_blocks=2, attn_blocks=4, num_attn_heads=4, base_channels=32,
        dropout=0, kernel_size=5, distribute_zero_label=False)
    state_dict = torch.load('classifier.pth', map_location=torch.device('cpu'))
    classifier.load_state_dict(state_dict)
    classifier.eval()
    clip = clip.cpu().unsqueeze(0)
    with torch.no_grad():
        results = classifier(clip)
        probabilities = F.softmax(results, dim=-1)
    ai_generated_probability = probabilities[0][1].item()  # P(AI-generated)
    return ai_generated_probability

def audiopredict(audio):
    if audio is not None:
        audio_clip = load_audio(audio)
        ai_generated_probability = classify_audio_clip(audio_clip)
        # Static waveform illustration for the image output slot
        image = Image.open("./wave.jpg")
        if ai_generated_probability < 0.5:
            return "Real", "The audio is likely to be Real", "No EXIF data found in the audio", image
        else:
            return "Deepfake", "The audio is likely to be AI Generated", "No EXIF data found in the audio", image
    # Nothing was uploaded at all
    return "Neutral", "No audio provided", "No EXIF data found in the audio", None
    
# Video Input Code
def save_video(video_path):
    # Copy the upload into a temporary directory before processing
    with tempfile.TemporaryDirectory() as temp_dir:
        filename = os.path.basename(video_path)
        temp_video_path = os.path.join(temp_dir, filename)
        shutil.copyfile(video_path, temp_video_path)

        # Extract frames, sample faces, and run deepfake identification
        textoutput, exif, face_with_mask = process_video(temp_dir, filename)
        print(textoutput)

        # Pull the two percentages ("Real: x%, Fake: y%") out of the summary
        percentages = re.findall(r"(\d+\.\d+)%", textoutput)
        if len(percentages) >= 2:
            val = "Real" if float(percentages[0]) > float(percentages[1]) else "Deepfake"
        else:
            val = "Neutral"  # no faces could be scored
        print(val)

    return val, textoutput, exif, face_with_mask

def process_video(video_folder, video_filename):
    # Additional Processing (Frames, Faces, Deepfake Identification)
    frames_base_dir = "./frames"
    faces_base_dir = "./faces"
    selected_faces_base_dir = "./selected_faces"

    # Full path of the copied upload
    video_path = os.path.join(video_folder, video_filename)
    
    # Create session folders
    session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    frames_session_dir = create_session_folder(frames_base_dir, session_name)
    faces_session_dir = create_session_folder(faces_base_dir, session_name)
    selected_faces_session_dir = create_session_folder(selected_faces_base_dir, session_name)

    # Extract frames and faces
    video_to_frames_and_extract_faces(video_path, frames_session_dir, faces_session_dir)
    
    # Select random faces
    select_random_faces(faces_session_dir, selected_faces_session_dir)

    # Perform deepfake identification
    textoutput, exif, face_with_mask = identify_deepfake(selected_faces_session_dir)
    return textoutput, exif, face_with_mask

def create_session_folder(parent_dir, session_name=None):
    if not session_name:
        session_name = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_path = os.path.join(parent_dir, session_name)
    os.makedirs(session_path, exist_ok=True)
    return session_path

def extract_faces(frame_path, faces_dir):
    frame = cv2.imread(frame_path)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # The detector is constructed per call so the function stays picklable
    # for the ProcessPoolExecutor workers
    detector = dlib.get_frontal_face_detector()
    faces = detector(gray, 1)

    faces_extracted = 0
    for face in faces:
        # dlib can report boxes that extend past the frame edge; clamp the origin
        x, y = max(face.left(), 0), max(face.top(), 0)
        w, h = face.width(), face.height()
        face_image = frame[y:y+h, x:x+w]
        if face_image.size == 0:
            continue
        face_file_path = os.path.join(faces_dir, f"face_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.jpg")
        cv2.imwrite(face_file_path, face_image)
        faces_extracted += 1

    return faces_extracted

def video_to_frames_and_extract_faces(video_path, frames_dir, faces_dir):
    video_capture = cv2.VideoCapture(video_path)
    success, frame = video_capture.read()
    frame_count = 0
    processed_frame_count = 0  
    futures = []
   
    num_workers = min(multiprocessing.cpu_count(), 8)  

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        while success:
           
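            # Keep every 2nd frame to cut disk and CPU cost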
            if frame_count % 2 == 0:
                frame_file = os.path.join(frames_dir, f"frame_{processed_frame_count}.jpg")
                cv2.imwrite(frame_file, frame)
                processed_frame_count += 1

                
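                # Queue every 4th saved frame (every 8th overall) for face extraction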
                if processed_frame_count % 4 == 0:
                    future = executor.submit(extract_faces, frame_file, faces_dir)
                    futures.append(future)

            success, frame = video_capture.read()
            frame_count += 1

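    # Leaving the executor context waits for every worker; sum their face counts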
    total_faces = sum(f.result() for f in as_completed(futures))
    print(f"Saved frames: {processed_frame_count}, Processed for face extraction: {len(futures)}, Extracted faces: {total_faces}")

    video_capture.release()
    return total_faces

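# Randomly sample up to 20 faces so scoring time stays bounded regardless of video length.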
def select_random_faces(faces_dir, selected_faces_dir):
    face_files = [os.path.join(faces_dir, f) for f in os.listdir(faces_dir) if f.endswith('.jpg')]
    selected_faces = random.sample(face_files, min(20, len(face_files)))  
    for face_file in selected_faces:
        basename = os.path.basename(face_file)
        destination_file = os.path.join(selected_faces_dir, basename)
        shutil.copy(face_file, destination_file)  

    print(f"Selected random faces: {len(selected_faces)}")

# Deepfake Identification Code
def identify_deepfake(selected_faces_dir):
    # Set up device
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

    # Initialize MTCNN and InceptionResnetV1 with pre-trained models
    mtcnn = MTCNN(select_largest=False, post_process=False, device=DEVICE).to(DEVICE).eval()
    model = InceptionResnetV1(pretrained="vggface2", classify=True, num_classes=1, device=DEVICE)

    # Load the model checkpoint
    checkpoint_path = "./resnetinceptionv1_epoch_32.pth"  # Update this path
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()

    # Define prediction function
    def predict(input_image: Image.Image):
        try:
            face = mtcnn(input_image)
            if face is None:
                raise Exception('No face detected')
            
            face = F.interpolate(face.unsqueeze(0), size=(256, 256), mode='bilinear', align_corners=False)
            face = face.to(DEVICE).to(torch.float32) / 255.0

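            # Grad-CAM over the final Block8 branch shows which regions drive the decision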
            target_layers = [model.block8.branch1[-1]]
            cam = GradCAM(model=model, target_layers=target_layers)
            targets = [ClassifierOutputTarget(0)]

            grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
            grayscale_cam = grayscale_cam[0, :]
            face_image_np = face.squeeze().permute(1, 2, 0).cpu().detach().numpy()
            visualization = show_cam_on_image(face_image_np, grayscale_cam, use_rgb=True)
            # show_cam_on_image already returns a uint8 image, so overlay it directly
            face_with_mask = cv2.addWeighted((face_image_np * 255).astype('uint8'), 1, visualization, 0.5, 0)
            
            with torch.no_grad():
                output = torch.sigmoid(model(face)).item()
                prediction = "real" if output < 0.5 else "fake"
                confidences = {'real': 1 - output, 'fake': output}
            
            return confidences, prediction, face_with_mask

        except Exception as e:
            print(f"Prediction failed: {e}")
            return {'real': 0.0, 'fake': 1.0}, "fake", None

    # Process images in the selected folder
    image_files = sorted([f for f in os.listdir(selected_faces_dir) if f.endswith(('.jpg', '.jpeg', '.png', '.bmp'))])
    results = {}  # per-image confidences and predictions
    last_face_with_mask = None

    for image_file in image_files:
        image_path = os.path.join(selected_faces_dir, image_file)
        input_image = Image.open(image_path)

        confidences, prediction, face_with_mask = predict(input_image)
        if face_with_mask is None:
            continue
        last_face_with_mask = face_with_mask

        results[image_file] = {
            'Confidence': confidences,
            'Prediction': prediction
        }
        print(f"Image: {image_file}, Confidence: {confidences}, Prediction: {prediction}")

    if not image_files:
        return "No faces were extracted from the video.", "No EXIF data or Metadata found in the video", None

    # Read EXIF metadata from the first sampled face
    image = Image.open(os.path.join(selected_faces_dir, image_files[0]))
    exif_data = image.getexif()  # returns an Exif instance (may be empty)

    if exif_data:
        exif = ""
        for tag_id in exif_data:
            tag = TAGS.get(tag_id, tag_id)  # human-readable tag name
            exif += f"{tag}: {exif_data[tag_id]}\n"
    else:
        exif = "No EXIF data or Metadata found in the video"

    # Average the per-face 'real' and 'fake' scores
    real_total = 0.0
    fake_total = 0.0
    count = 0

    for value in results.values():
        real_total += value['Confidence']['real']
        fake_total += value['Confidence']['fake']
        count += 1

    # Report the consolidated score if any faces were successfully scored
    if count > 0:
        real_avg = (real_total / count) * 100
        fake_avg = (fake_total / count) * 100
        textoutput = f"Consolidated Score for the uploaded video - Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%"
        return textoutput, exif, last_face_with_mask

    print("No images were successfully processed to calculate a consolidated score.")
    return "No faces could be scored for this video.", exif, None

# Image Input Code
def predictimage(input_image: Image.Image):
    DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'

    mtcnn = MTCNN(
        select_largest=False,
        post_process=False,
        device=DEVICE
    ).to(DEVICE).eval()

    model = InceptionResnetV1(
        pretrained="vggface2",
        classify=True,
        num_classes=1,
        device=DEVICE
    )

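    # Load the fine-tuned deepfake-detection weights (epoch-32 checkpoint)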
    checkpoint = torch.load("./resnetinceptionv1_epoch_32.pth", map_location=torch.device('cpu'))
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(DEVICE)
    model.eval()
    face = mtcnn(input_image)
    image = input_image
    exif_data = image.getexif()  # Returns an Exif instance or None

    if exif_data:
        exif = ""
        for tag_id in exif_data:
            # Get the tag name
            tag = TAGS.get(tag_id, tag_id)
            value = exif_data[tag_id]
            # Print the tag and value in a human-readable format
            exif += f"{tag}: {value}\n"
    else:
        exif = "No EXIF data found in the image"
    if face is None:
        return "Neutral", "No face detected", exif, input_image
    face = face.unsqueeze(0) # add the batch dimension
    face = F.interpolate(face, size=(256, 256), mode='bilinear', align_corners=False)
    
    # convert the face into a numpy array to be able to plot it
    prev_face = face.squeeze(0).permute(1, 2, 0).cpu().detach().int().numpy()
    prev_face = prev_face.astype('uint8')

    face = face.to(DEVICE)
    face = face.to(torch.float32)
    face = face / 255.0
    # Normalized float image in [0, 1] for the Grad-CAM overlay
    face_image_to_plot = face.squeeze(0).permute(1, 2, 0).cpu().detach().numpy()

    target_layers = [model.block8.branch1[-1]]
    cam = GradCAM(model=model, target_layers=target_layers)
    targets = [ClassifierOutputTarget(0)]

    grayscale_cam = cam(input_tensor=face, targets=targets, eigen_smooth=True)
    grayscale_cam = grayscale_cam[0, :]
    visualization = show_cam_on_image(face_image_to_plot, grayscale_cam, use_rgb=True)
    face_with_mask = cv2.addWeighted(prev_face, 1, visualization, 0.5, 0)

    with torch.no_grad():
        output = torch.sigmoid(model(face).squeeze(0))
        prediction = "Real" if output.item() < 0.5 else "Deepfake"
        
        real_prediction = 1 - output.item()
        fake_prediction = output.item()
        
        real_avg = real_prediction * 100
        fake_avg = fake_prediction * 100
        
        textoutput = f"Consolidated Score for the uploaded image - Real: {real_avg:.2f}%, Fake: {fake_avg:.2f}%"

    return prediction, textoutput, exif, face_with_mask

def main():
    # Combined interface: exactly one of the three inputs is expected per run
    interface = gr.Interface(
        fn=inputseparation,
        inputs=[
            gr.Video(label="Upload Video"),
            gr.Image(label="Input Image", type="pil"),
            gr.Audio(label="Upload Audio", type="filepath")
        ],
        outputs=[
            gr.Label(label="Output Result"),
            gr.Text(label="Explanation"),
            gr.Text(label="EXIF Data / Metadata"),  
            gr.Image(label="Face with Mask")         
        ],
        title="Veritrue.ai",
        description="Upload a video, an image, or an audio clip, and the app will report whether it is a deepfake or real."
    )

    # Launch the Gradio app
    interface.launch()

if __name__ == "__main__":
    main()