from pydub import AudioSegment import noisereduce as nr import librosa import soundfile as sf import torchaudio from silero_vad import get_speech_timestamps, collect_chunks import io import numpy as np from pyannote.audio.pipelines import SpeakerDiarization from pyannote.core import Segment from tqdm import tqdm import pandas as pd import subprocess import sys import os import torch import torch.nn as nn import cv2 from datetime import timedelta import easyocr from transformers import pipeline from codecarbon import EmissionsTracker from transformers import CLIPProcessor, CLIPModel, AutoTokenizer, BertModel from PIL import Image import re import ast import tempfile from model import SenseVoiceSmall from funasr.utils.postprocess_utils import rich_transcription_postprocess import csv import whisper from datetime import datetime # 📌 Chargement du modèle Silero VAD model_and_utils = torch.hub.load( repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=True, trust_repo=True # Évite l'avertissement "untrusted repository" ) # 📌 Extraction correcte des éléments du tuple model = model_and_utils[0] # Le modèle PyTorch utils_tuple = model_and_utils[1] # Tuple contenant les fonctions utilitaires # 📌 Assignation des fonctions utiles get_speech_timestamps = utils_tuple[0] # Fonction de détection des segments parlés save_audio = utils_tuple[1] # Fonction de sauvegarde audio (optionnelle) read_audio = utils_tuple[2] # Fonction de lecture de l'audio VADIterator = utils_tuple[3] # Classe pour gérer le VAD collect_chunks = utils_tuple[4] # Fonction pour extraire les morceaux de speech # FONCTION D'EXTRACTION DE L'AUDIO def extract_audio(video_path, output_audio_path): ''' Explication des options ffmpeg -ac 1 → Convertit l’audio en mono -ar 16000 → Définit la fréquence d’échantillonnage à 16 kHz (utile pour certaines applications) -q:a 0 → Qualité audio maximale -map a → Extrait uniquement la piste audio -vn → Désactive la vidéo ''' command = f'ffmpeg -i "{video_path}" -vn -ac 1 -ar 16000 -q:a 0 -map a "{output_audio_path}"' subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) if os.path.exists(output_audio_path): print(f"✅ Audio extracted successfully : {output_audio_path}") else: print(f"❌ Echec of audio extraction : {video_path}") def extract_all_audio(Video_folder, Audio_folder): print("##########################################") for video in os.listdir(Video_folder): if video.endswith(".mp4"): video_path = os.path.join(Video_folder, video) audio_path = os.path.join(Audio_folder, video.replace(".mp4", ".wav")) extract_audio(video_path , audio_path) print("Extraction de l'audio terminée !") print("##########################################") def time_to_seconds(time_str): """Convertit une chaîne de temps HH:MM:SS en secondes.""" h, m, s = map(int, time_str.split(":")) return h * 3600 + m * 60 + s def extract_snippets(audio_path, output_path, snippets): """ Extrait et concatène des parties spécifiques d'un fichier audio. :param audio_path: Chemin du fichier audio d'entrée :param output_path: Chemin du fichier audio de sortie :param snippets: Liste de listes [["HH:MM:SS", "HH:MM:SS"]] """ audio = AudioSegment.from_file(audio_path) extracted_audio = AudioSegment.empty() for start, end in snippets: start_sec = time_to_seconds(start) end_sec = time_to_seconds(end) extracted_audio += audio[start_sec * 1000:end_sec * 1000] # Sauvegarde du fichier final extracted_audio.export(output_path, format="wav") def get_video_duration(video_path): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print("Erreur ouverture vidéo") return None fps = cap.get(cv2.CAP_PROP_FPS) frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) duration = frame_count / fps cap.release() return duration # PREPROCESSING AUDIOS def format_time(seconds): """Convertit un temps en secondes vers HH:MM:SS""" h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = int(seconds % 60) return f"{h:02}:{m:02}:{s:02}" def reduce_noise(audio, sr): """ Applique une réduction de bruit sur l'audio. """ return nr.reduce_noise(y=audio, sr=sr) def save_audio(audio, sr, output_path): """ Sauvegarde un fichier audio au format WAV. """ sf.write(output_path, audio, sr) def detect_music_and_voice(audio, sr): """ Détecte si l'audio contient de la musique et identifie si une voix est présente avec. Utilise MFCCs, Zero Crossing Rate (ZCR) et analyse spectrale pour différencier : - Musique seule - Voix seule - Voix + Musique """ # 🔹 Analyse des MFCCs (signature musique vs voix) mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13) mfcc_var = np.var(mfcc, axis=1) # 🔹 Calcul du Zero Crossing Rate (ZCR) → Détecte les transitions rapides dans la musique zcr = librosa.feature.zero_crossing_rate(y=audio) avg_zcr = np.mean(zcr) # 🔹 Analyse du spectrogramme → Formants vocaux spec = librosa.amplitude_to_db(librosa.stft(audio), ref=np.max) vocal_energy = np.mean(spec[50:300, :]) # 50Hz-300Hz = fréquences vocales # 🔹 Détection de musique pure vs voix is_music = np.mean(mfcc_var) < 50 and avg_zcr > 0.05 is_voice = vocal_energy > -20 # Plus de -20dB dans les fréquences vocales = voix présente if is_music and is_voice: return "Voix + Musique" elif is_music: return "Musique seule" elif is_voice: return "Voix seule" else: return "Silence" def expand_and_merge_speech_timestamps(speech_timestamps, sr=16000, margin=1.5): """ Élargit chaque segment parlé de ±margin (en secondes), puis fusionne les chevauchements. Fonctionne directement sur les échantillons. """ # Étape 1 : élargir expanded = [] margin_samples = int(margin * sr) for seg in speech_timestamps: start = max(seg['start'] - margin_samples, 0) end = seg['end'] + margin_samples expanded.append([start, end]) # Étape 2 : fusionner expanded.sort() merged = [] for seg in expanded: if not merged or seg[0] > merged[-1][1]: merged.append(seg) else: merged[-1][1] = max(merged[-1][1], seg[1]) # Étape 3 : retransformer en format [{'start': x, 'end': y}] return [{'start': start, 'end': end} for start, end in merged] def preprocess_audio(input_path, output_path, threshold_CDA = 0.2): """ Nettoie l'audio et conserve la même durée en remplaçant les silences par du silence audio. - input_path : Chemin du fichier audio en entrée (16 kHz, mono) - output_path : Chemin du fichier nettoyé - vad_threshold : Sensibilité du VAD (0.3 = sensible, 0.5 = normal, 0.7 = strict) Retourne : - Un fichier audio nettoyé avec la même durée - Une liste des timestamps des parties parlées """ # 🔹 1. Chargement de l'audio /music ... audio, sr = librosa.load(input_path, sr=16000) # Assure un échantillonnage à 16kHz original_duration = len(audio) # Nombre d'échantillons """ # Détection de musique et voix category = detect_music_and_voice(audio, sr) if category == "Voix + Musique": threshold = 0.4 # 🎵 Voix dans la musique → Capture bien la parole elif category == "Musique seule": threshold = 0.8 # 🎵 Musique seule → Ignorer elif category == "Voix seule": threshold = 0.3 # 🎙️ Seulement Voix → Capturer toute la parole else: threshold = 0.7 # Silence ou bruit → Ignorer """ threshold = threshold_CDA # 🔹 2. Réduction du bruit audio = nr.reduce_noise(y=audio, sr=sr) # 🔹 3. Détection des segments parlés speech_timestamps = get_speech_timestamps(audio, model,sampling_rate=sr, threshold=threshold) # 🔹 4. Création d'un nouvel audio avec silences à la place des blancs cleaned_audio = np.zeros(original_duration, dtype=np.float32) # Commence par du silence total speech_ranges = [] for seg in expand_and_merge_speech_timestamps(speech_timestamps): start_sample, end_sample = seg['start'], seg['end'] cleaned_audio[start_sample:end_sample] = audio[start_sample:end_sample] # Remet les parties parlées speech_ranges.append([format_time(start_sample / sr), format_time(end_sample / sr)]) # Sauvegarde timestamps # 🔹 5. Sauvegarde de l'audio nettoyé avec silences sf.write(output_path, cleaned_audio, sr) print(f"✅ Audio cleaned : {output_path}") #print(f"🎵 Catégorie détectée : {category} → Threshold = {threshold}") #print(f"🎙️ Segments parlés détectés : {speech_ranges}") return speech_ranges import os import pandas as pd def preprocess_all_audio(audio_path, output_audio_clean_path): data = [] for i, audio_file in enumerate(os.listdir(audio_path)): if audio_file.endswith(".wav"): input_audio_path = os.path.join(audio_path, audio_file) output_clean_path = os.path.join(output_audio_clean_path, audio_file) speech_ranges = preprocess_audio(input_audio_path, output_clean_path) data.append({"audio_name": audio_file, "speech_ranges": speech_ranges}) df = pd.DataFrame(data) if data: # Vérifie si au moins un fichier a été traité print(f"✅ {len(data)} fichiers audio nettoyés avec succès.") else: print(f"❌ Aucun fichier audio n'a été traité.") return df # FIRST FILTER : Hate speech detection in audio def load_whisper_model(model_name: str = "base"): return whisper.load_model(model_name) def extract_audi_range(audio_path, start, end): audio = AudioSegment.from_wav(audio_path) segment = audio[start * 1000:end * 1000] # convert to milliseconds temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") segment.export(temp_file.name, format="wav") return temp_file.name def parse_timstamp(ts): if isinstance(ts, (float, int)): return float(ts) if isinstance(ts, str) and ":" in ts: h, m, s = ts.split(":") return int(h) * 3600 + int(m) * 60 + float(s) return float(ts) def transcribe_audio(model, audio_path: str, speech_ranges=None) -> dict: if not os.path.exists(audio_path): raise FileNotFoundError(f"Audio file not found: {audio_path}") if not speech_ranges: return whisper.transcribe(model, audio_path) all_segments = [] for start, end in speech_ranges: if end - start < 1.0: print(f"Skipped short segment: {start}-{end} (less than 1 second)") continue temp_path = extract_audi_range(audio_path, start, end) try: partial_result = whisper.transcribe( model, temp_path, condition_on_previous_text=False, no_speech_threshold=0.0 ) for seg in partial_result.get("segments", []): seg["start"] += start seg["end"] += start all_segments.extend(partial_result.get("segments", [])) except Exception as e: print(f"Error transcribing segment {start}-{end} of {audio_path}: {e}") finally: os.remove(temp_path) return {"segments": all_segments} def process_dataset(dataset_path: str, model, input_csv: str, output_csv: str) -> None: with open(input_csv, mode='r', newline='', encoding='utf-8') as infile: reader = csv.reader(infile) header = next(reader) rows = list(reader) with open(output_csv, mode='w', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) writer.writerow(header + ["Timestamps", "Texts"]) for row in rows: video_file_name = row[0].replace(".mp4", ".wav") video_hate_speech = row[1] speech_ranges_str = row[5] print(f"Speech ranges: {speech_ranges_str} for {video_file_name}") print(f"Row data: {row}") try: if not speech_ranges_str.strip(): raise ValueError("Empty speech range") raw_ranges = ast.literal_eval(speech_ranges_str) speech_ranges = [(parse_timstamp(start), parse_timstamp(end)) for start, end in raw_ranges] except Exception as e: print(f"Invalid speech_ranges for {video_file_name}: {speech_ranges_str} — {e}") continue folder = "hate_audios_clean" if video_hate_speech == "Hate" else "non_hate_audios_clean" audio_path = os.path.abspath(os.path.join(dataset_path, folder, video_file_name)) print(f"Processing: {audio_path}") try: result = transcribe_audio(model, audio_path, speech_ranges) segments = result.get("segments", []) timestamps = [[ f"{int(seg['start'] // 3600):02}:{int((seg['start'] % 3600) // 60):02}:{int(seg['start'] % 60):02}", f"{int(seg['end'] // 3600):02}:{int((seg['end'] % 3600) // 60):02}:{int(seg['end'] % 60):02}" ] for seg in segments] texts = [seg.get("text", "") for seg in segments] writer.writerow(row + [timestamps, texts]) except Exception as e: print(f"Error processing {video_file_name}: {e}") print(f"Transcription results saved to {output_csv}") def speech_ranges_to_timestamps(audio_path, speech_ranges, model_name="base"): """ Transcribe only the specified speech_ranges from the given WAV file and return aligned timestamps and texts. Args: audio_path (str): Path to the .wav audio file. speech_ranges (list of tuple): List of (start, end) times in seconds or "HH:MM:SS" strings. model_name (str): Whisper model size to load (default "base"). Returns: timestamps (list of [str, str]): List of [start_ts, end_ts] strings "HH:MM:SS". texts (list of str): List of transcribed text for each segment. """ # load model model = load_whisper_model(model_name) # parse any string timestamps into floats parsed_ranges = [ (parse_timstamp(start), parse_timstamp(end)) for start, end in speech_ranges ] # run transcription on each segment result = transcribe_audio(model, audio_path, parsed_ranges) segments = result.get("segments", []) # format output timestamps = [ [ f"{int(seg['start'] // 3600):02}:{int((seg['start'] % 3600) // 60):02}:{int(seg['start'] % 60):02}", f"{int(seg['end'] // 3600):02}:{int((seg['end'] % 3600) // 60):02}:{int(seg['end'] % 60):02}" ] for seg in segments ] texts = [seg.get("text", "").strip() for seg in segments] return timestamps, texts def tosec(t): h, m, s = map(float, t.split(":")) return h * 3600 + m * 60 + s def extract_wavv(audio_path, start_sec, end_sec, out_path): waveform, sr = torchaudio.load(audio_path) start_frame = int(sr * start_sec) end_frame = int(sr * end_sec) segment = waveform[:, start_frame:end_frame] torchaudio.save(out_path, segment, sample_rate=sr) def get_emotion_from_segment(wav_path, model, kwargs): try: res = model.inference( data_in=wav_path, language="en", use_itn=True, ban_emo_unk=True, use_emo=True, output_emo=True, output_emo_prob=True, output_timestamp=False, **kwargs ) return res[0][0]['text'].split('|')[3] except Exception as e: return f"error: {e}" def Audio_to_emotion(audio_path, timestamps): """ ➡️ Donne les émotions pour chaque segment défini par timestamps dans un audio donné. Args: audio_path (str): chemin vers le fichier audio (.wav) timestamps (list): liste de paires ['start', 'end'] en format 'hh:mm:ss' Returns: list: liste des émotions détectées """ # Charger le modèle une seule fois ici print("🚀 Chargement du modèle SenseVoiceSmall...") model_dir = "iic/SenseVoiceSmall" model, kwargs = SenseVoiceSmall.from_pretrained(model=model_dir, device="cuda:0") # 'cuda:0' for GPU, 'cpu' for CPU model.eval() emotions = [] for t_start, t_end in timestamps: start_sec = tosec(t_start) end_sec = tosec(t_end) with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav: temp_wav_path = temp_wav.name try: extract_wavv(audio_path, start_sec, end_sec, temp_wav_path) res = model.inference( data_in=temp_wav_path, language="en", use_itn=True, ban_emo_unk=True, use_emo=True, output_emo=True, output_emo_prob=True, output_timestamp=False, **kwargs ) emotion = res[0][0]['text'].split('|')[3] except Exception as e: emotion = f"error: {e}" finally: if os.path.exists(temp_wav_path): os.remove(temp_wav_path) emotions.append(emotion) return emotions def detect_hate_speech_in_audio(audio_path , include_intervals,Co2_release): ## TODO : Implement the hate speech detection in audio speech_ranges = include_intervals timestamps = [] texts = [] emotions = [] # Speech_ranges_to_timestamps timestamps, texts = speech_ranges_to_timestamps(audio_path, speech_ranges) # audio_to_emotion emotions = Audio_to_emotion(audio_path,timestamps) exploded_df = pd.DataFrame({ "timestamp": timestamps, "text": texts, "emotion": emotions }) exploded_df.head() exploded_df["text"] = exploded_df["text"].apply(clean_text_light) if Co2_release == "low": df = EmoHateBert_predict(exploded_df, "student_distilled_EmoHateBERT.pt", device="cpu") elif Co2_release == "medium": df = EmoHateBert_predict(exploded_df, "EmoHateBert_teacher.pt", device="cpu") elif Co2_release == "high": df = EmoHateBert_predict(exploded_df, "EmoHateBert_teacher.pt", device="cpu") hate_speech_time_audio = [timestamp for timestamp, text, emotion, label in zip(df["timestamp"], df["text"], df["emotion"], df["predicted_label"]) if label == 1] return hate_speech_time_audio def merge_consecutive(group): merged = [] current_start = group['timestamp'].iloc[0][0] current_end = group['timestamp'].iloc[0][1] current_text = group['text'].iloc[0] for i in range(1, len(group)): prev_end = group['timestamp'].iloc[i - 1][1] curr_start, curr_end_val = group['timestamp'].iloc[i] if prev_end == curr_start: current_end = curr_end_val current_text += ' ' + group['text'].iloc[i] else: merged.append({ 'timestamp': f"{current_start} - {current_end}", 'text': current_text, 'emotion': group['emotion'].iloc[i - 1], 'hate_snippet': group['hate_snippet'].iloc[i - 1] }) current_start = curr_start current_end = curr_end_val current_text = group['text'].iloc[i] merged.append({ 'timestamp': f"{current_start} - {current_end}", 'text': current_text, 'emotion': group['emotion'].iloc[-1], 'hate_snippet': group['hate_snippet'].iloc[-1] }) return pd.DataFrame(merged) def clean_text_light(text): # Supprime les caractères très spéciaux, mais garde les lettres, chiffres et ponctuation classique return re.sub(r"[^\w\s.,!?'-]", "", text) def get_label_hate(timestamp, snippets): t_start, t_end = map(time_to_seconds, timestamp) label = 0 if snippets is None: return 0 for snippet in snippets: s_start, s_end = map(time_to_seconds, snippet) if t_start >= s_start and t_end <= s_end: return 1 # entièrement inclus elif t_start < s_end and t_end > s_start: label = 2 # partiellement inclus return label def explode_row(row): timestamps = eval(row['Timestamps']) texts = eval(row['Texts']) emotions = eval(row['emotion']) hate_snippet = eval(row['hate_snippet']) if pd.notna(row['hate_snippet']) else [None] * len(timestamps) return pd.DataFrame({ "hate_snippet": [hate_snippet] * len(timestamps), "timestamp": timestamps, "text": texts, "emotion": emotions }) def clean_hate_snippet(snippet): if isinstance(snippet, list) and snippet and snippet[0] is None: return None return snippet from torch.utils.data import DataLoader, Dataset class FocalLoss(nn.Module): def __init__(self, alpha=1, gamma=2): super().__init__() self.alpha = alpha self.gamma = gamma self.ce = nn.CrossEntropyLoss(reduction='none') # important pour Focal def forward(self, logits, targets): ce_loss = self.ce(logits, targets) pt = torch.exp(-ce_loss) focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss return focal_loss.mean() class BertWithEmotion(nn.Module): def __init__(self, emotion_vocab_size=5, emotion_dim=16, num_labels=2, class_weights=None, use_focal=False, focal_alpha=1, focal_gamma=2): super().__init__() self.bert = BertModel.from_pretrained("bert-base-uncased") self.bert_hidden = self.bert.config.hidden_size self.emotion_embed = nn.Embedding(emotion_vocab_size, emotion_dim) self.dropout = nn.Dropout(0.3) self.classifier = nn.Linear(self.bert_hidden + emotion_dim, num_labels) if use_focal: self.criterion = FocalLoss(alpha=focal_alpha, gamma=focal_gamma) else: if class_weights is not None: self.criterion = nn.CrossEntropyLoss(weight=class_weights) else: self.criterion = nn.CrossEntropyLoss() def forward(self, input_ids, attention_mask, emotion_id, labels=None): bert_out = self.bert(input_ids=input_ids, attention_mask=attention_mask) cls_vector = bert_out.last_hidden_state[:, 0, :] emotion_vector = self.emotion_embed(emotion_id) fusion = torch.cat([cls_vector, emotion_vector], dim=1) fusion = self.dropout(fusion) logits = self.classifier(fusion) if labels is not None: loss = self.criterion(logits, labels) return loss, logits return logits def EmoHateBert_predict(df, model_path, emotion2id=None, device='cpu'): # Vérification et valeurs par défaut if emotion2id is None: emotion2id = {'ANGRY': 0, 'DISGUSTED': 1, 'FEARFUL': 2, 'HAPPY': 3, 'NEUTRAL': 4, 'SAD': 5, 'SURPRISED': 6, 'UNKNOWN': 7} # Nettoyer les données df = df[["timestamp", "text", "emotion"]].dropna() df["emotion"] = df["emotion"].fillna("").astype(str).str.upper() # Tokenizer tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") # Dataset sans besoin de labels class HateSpeechDatasetPredict(torch.utils.data.Dataset): def __init__(self, texts, emotions, tokenizer, emotion2id): self.texts = texts self.emotions = emotions self.tokenizer = tokenizer self.emotion2id = emotion2id def __len__(self): return len(self.texts) def __getitem__(self, idx): text = self.texts[idx] emotion = self.emotions[idx] tokens = self.tokenizer(text, padding='max_length', truncation=True, max_length=128, return_tensors='pt') return { 'input_ids': tokens['input_ids'].squeeze(0), 'attention_mask': tokens['attention_mask'].squeeze(0), 'emotion_id': torch.tensor(self.emotion2id.get(emotion, self.emotion2id["UNKNOWN"])) } dataset = HateSpeechDatasetPredict( texts=df["text"].tolist(), emotions=df["emotion"].tolist(), tokenizer=tokenizer, emotion2id=emotion2id ) loader = DataLoader(dataset, batch_size=16) # Charger le modèle model = BertWithEmotion(emotion_vocab_size=len(emotion2id), emotion_dim=16, num_labels=2) model.load_state_dict(torch.load(model_path, map_location=device), strict=False) model.eval() model.to(device) # Prédictions all_preds = [] with torch.no_grad(): for batch in loader: input_ids = batch["input_ids"].to(device) attention_mask = batch["attention_mask"].to(device) emotion_id = batch["emotion_id"].to(device) logits = model(input_ids, attention_mask, emotion_id) preds = torch.argmax(logits, dim=1) all_preds.extend(preds.cpu().numpy()) df["predicted_label"] = all_preds return df # SECOND FILTER : Hate Speech Detection CLIP (symbole, geste obscene ... etc) # Charger le modèle CLIP et son processor clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") # Descriptions (prompts) de classes visuelles possibles visual_labels = [ # 🟥 Hateful (13 labels bien distincts) "symbol of racial hatred", "violence against minorities", "racist or extremist graffiti", "homophobic hate gesture", "nazi salute", "white supremacist protest", "burning of religious symbols", "aggressive crowd with hate signs", "physical attack during a livestream", "threatening gesture on webcam", "explicit insult written on a sign", "harassment of LGBTQ individuals", "extremist propaganda banner", "showing the middle finger", # 🟧 Visually intense but not necessarily hateful (8) "chaotic concert crowd", "people shouting in a protest", "police intervention in a public place", "fire on a stage during performance", "public fight in the street", "crowd mosh pit", "person yelling in a video call", "people arguing loudly", # 🟩 Safe / Non-hateful (19) "group of friends taking selfies", "people dancing together", "family celebration", "peaceful protest", "friendly street interview", "musician playing at a concert", "teenagers laughing on a call", "people hugging", "family dinner at home", "children playing outside", "teacher explaining to students", "Snapchat selfie with filter", "artistic mural in the street", "volunteers helping each other", "public event with diverse people", "sports activity with teammates", "respectful online conversation", "people cheering at a show", "cultural dance performance" ] def detect_visual_hate_clip(image_path): image = Image.open(image_path).convert("RGB") # Préparer les entrées pour CLIP inputs = clip_processor(text=visual_labels, images=image, return_tensors="pt", padding=True) # Obtenir les similarités image ↔ texte with torch.no_grad(): outputs = clip_model(**inputs) logits_per_image = outputs.logits_per_image probs = logits_per_image.softmax(dim=1).squeeze() results = {label: float(probs[i]) for i, label in enumerate(visual_labels)} hateful_labels = visual_labels[:14] safe_labels = visual_labels[14:] hate_scores = [results[label] for label in hateful_labels] safe_scores = [results[label] for label in safe_labels] # Moyenne pondérée (plus stable que max) avg_hate = sum(hate_scores) / len(hate_scores) avg_safe = sum(safe_scores) / len(safe_scores) # Meilleur score absolu (pour justifier le label final) top_label = max(results, key=results.get) top_score = results[top_label] # Analyse : marge de différence delta = abs(avg_hate - avg_safe) # Définir le label selon logique avancée if delta < 0.05 and top_score < 0.3: final_label = "Uncertain" elif avg_hate * 0.85 > avg_safe : final_label = "Hate" else: final_label = "Safe" return { "label": final_label, "confidence_gap": round(delta, 4), "top_label": top_label, "top_score": round(top_score, 4), "avg_hate_score": round(avg_hate, 4), "avg_safe_score": round(avg_safe, 4), "all_scores": results } def detect_hate_speech_CLIP( video_path: str, sampling_time_froid: float, sampling_time_chaud: float, time_to_recover: float, merge_final_snippet_time: float, detect_visual_hate_clip=None, skip_intervals=None ): if detect_visual_hate_clip is None: raise ValueError("You must provide a detect_visual_hate_clip function") if skip_intervals is None: skip_intervals = [] def is_skipped(time_sec): for start_str, end_str in skip_intervals: start = sum(int(x) * 60 ** i for i, x in enumerate(reversed(start_str.split(":")))) end = sum(int(x) * 60 ** i for i, x in enumerate(reversed(end_str.split(":")))) if start <= time_sec <= end: return True return False cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError("Could not open video file") fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps current_time = 0.0 state = "froid" time_in_chaud = 0.0 hate_timestamps = [] while current_time < duration: if is_skipped(current_time): current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid continue cap.set(cv2.CAP_PROP_POS_MSEC, current_time * 1000) ret, frame = cap.read() if not ret: break temp_image_path = "/tmp/temp_frame.jpg" cv2.imwrite(temp_image_path, frame) result = detect_visual_hate_clip(temp_image_path) os.remove(temp_image_path) if result.get("label") == "Hate": hate_timestamps.append(current_time) state = "chaud" time_in_chaud = 0.0 elif state == "chaud": time_in_chaud += sampling_time_chaud if time_in_chaud >= time_to_recover: state = "froid" current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid cap.release() # Étendre et fusionner les intervalles intervals = [(max(0, t - merge_final_snippet_time), min(duration, t + merge_final_snippet_time)) for t in hate_timestamps] merged_intervals = [] for start, end in sorted(intervals): if not merged_intervals or start > merged_intervals[-1][1]: merged_intervals.append([start, end]) else: merged_intervals[-1][1] = max(merged_intervals[-1][1], end) formatted_intervals = [[seconds_to_hhmmss(start), seconds_to_hhmmss(end)] for start, end in merged_intervals] return formatted_intervals # THIRD FILTER : Hate Speech Detection in text from image def seconds_to_hhmmss(seconds): return str(timedelta(seconds=int(seconds))) reader = easyocr.Reader(['en']) # detects the language of the text nlp_classifier = pipeline("text-classification", model="Hate-speech-CNERG/dehatebert-mono-english") def detect_hate_speech_in_image(image_path): # 🖼️ OCR text_blocks = reader.readtext(image_path, detail=0) full_text = " ".join(text_blocks).strip() if not full_text: return { "text": None, "hate_detected": False, "score": 0.0, "reason": "No text detected" } # 🧠 NLP (classification hate speech) prediction = nlp_classifier(full_text)[0] return { "text": full_text, "hate_detected": prediction['label'].lower() == 'hate', "score": float(prediction['score']), "reason": prediction['label'] } def detect_hate_speech_OCR( video_path: str, sampling_time_froid: float, sampling_time_chaud: float, time_to_recover: float, merge_final_snippet_time: float, detect_hate_speech_in_image=None, skip_intervals=None # nouvelle option : intervalles à ignorer ): if detect_hate_speech_in_image is None: raise ValueError("You must provide a detect_hate_speech_in_image function") if skip_intervals is None: skip_intervals = [] def seconds_to_hhmmss(seconds): from datetime import timedelta return str(timedelta(seconds=int(seconds))) def is_skipped(time_sec): for start_str, end_str in skip_intervals: start = sum(int(x) * 60 ** i for i, x in enumerate(reversed(start_str.split(":")))) end = sum(int(x) * 60 ** i for i, x in enumerate(reversed(end_str.split(":")))) if start <= time_sec <= end: return True return False cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError("Could not open video file") fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps current_time = 0.0 state = "froid" time_in_chaud = 0.0 hate_timestamps = [] while current_time < duration: if is_skipped(current_time): current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid continue cap.set(cv2.CAP_PROP_POS_MSEC, current_time * 1000) ret, frame = cap.read() if not ret: break temp_image_path = "/tmp/temp_frame.jpg" cv2.imwrite(temp_image_path, frame) result = detect_hate_speech_in_image(temp_image_path) os.remove(temp_image_path) if result.get("hate_detected", False): hate_timestamps.append(current_time) state = "chaud" time_in_chaud = 0.0 elif state == "chaud": time_in_chaud += sampling_time_chaud if time_in_chaud >= time_to_recover: state = "froid" current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid cap.release() # Étendre et fusionner les intervalles intervals = [(max(0, t - merge_final_snippet_time), min(duration, t + merge_final_snippet_time)) for t in hate_timestamps] merged_intervals = [] for start, end in sorted(intervals): if not merged_intervals or start > merged_intervals[-1][1]: merged_intervals.append([start, end]) else: merged_intervals[-1][1] = max(merged_intervals[-1][1], end) formatted_intervals = [[seconds_to_hhmmss(start), seconds_to_hhmmss(end)] for start, end in merged_intervals] return formatted_intervals # FINAL FUNCTION def merge_all_snippet_groups(list_of_snippet_lists): all_segments = [] # Aplatir et convertir en secondes for snippet_list in list_of_snippet_lists: for start, end in snippet_list: all_segments.append([time_to_seconds(start), time_to_seconds(end)]) # Trier et fusionner all_segments.sort() merged = [] for seg in all_segments: if not merged or seg[0] > merged[-1][1]: merged.append(seg) else: merged[-1][1] = max(merged[-1][1], seg[1]) # Reformat en HH:MM:SS return [[format_time(start), format_time(end)] for start, end in merged] def merge_and_expand_timestamps(timestamps, expand_seconds=1, max_gap=1): """ - Élargit chaque timestamp de 'expand_seconds' secondes de chaque côté. - Puis fusionne les timestamps qui se touchent (gap <= max_gap). timestamps : liste de [start, end] au format 'HH:MM:SS' expand_seconds : nombre de secondes à ajouter avant et après chaque intervalle max_gap : gap maximum pour merger """ if not timestamps: return [] # Convertir string -> datetime def str_to_time(s): return datetime.strptime(s, "%H:%M:%S") # Convertir datetime -> string def time_to_str(t): return t.strftime("%H:%M:%S") # Étendre chaque intervalle expanded = [] for start_str, end_str in timestamps: start = str_to_time(start_str) - timedelta(seconds=expand_seconds) end = str_to_time(end_str) + timedelta(seconds=expand_seconds) start = max(start, datetime.strptime("00:00:00", "%H:%M:%S")) # éviter temps négatif expanded.append([start, end]) # Maintenant fusionner merged = [] current_start, current_end = expanded[0] for start, end in expanded[1:]: if (start - current_end).total_seconds() <= max_gap: current_end = max(current_end, end) else: merged.append([time_to_str(current_start), time_to_str(current_end)]) current_start, current_end = start, end merged.append([time_to_str(current_start), time_to_str(current_end)]) return merged def better_normalized_duration(video_duration): """ Vidéo courte → précision maximale. Vidéo longue → analyse relâchée. Normalisation douce selon la durée. """ # On travaille directement en minutes duration_min = video_duration / 60 # Normalisation progressive : # 0 min → 0 # 5 min → 0.2 # 10 min → 0.4 # 20 min → 0.7 # 30 min ou + → 1 if duration_min <= 5: return duration_min / 25 # max 0.2 pour 5 min elif duration_min <= 10: return 0.2 + (duration_min - 5) / 25 # ajoute jusqu’à 0.4 elif duration_min <= 20: return 0.4 + (duration_min - 10) / 25 # ajoute jusqu’à 0.8 else: return min(1, 0.8 + (duration_min - 20) / 20) # plafonne à 1 après 30 min def adjust_parameters(base_params, video_duration, min_factor=0.6, max_factor=1.6): """ Ajuste les paramètres : - petite vidéo → plus précis (params plus petits) - grande vidéo → moins précis (params plus grands) base_params = [sampling_froid, sampling_chaud, time_to_recover, merge_time] """ # Normaliser la durée entre 0 et 1 (ex: 0 min → 0, 30 min → ~0.5, 60 min → 1) normalized_duration = better_normalized_duration(video_duration) # Calcul du facteur d'ajustement entre min_factor et max_factor # Petite vidéo → facteur proche de min_factor # Grande vidéo → facteur proche de max_factor factor = min_factor + (max_factor - min_factor) * normalized_duration sampling_froid = max(1, int(base_params[0] * factor)) sampling_chaud = max(1, int(base_params[1] * (0.5 * factor + 0.5))) time_to_recover = int(base_params[2] * (0.5 * factor + 0.5)) merge_final = base_params[3] return [sampling_froid, sampling_chaud, time_to_recover, merge_final] def detectHateSpeechSmartFilter(Video_path, Co2_release = "low"): tracker = EmissionsTracker(log_level="error" , allow_multiple_runs=True) tracker.start() video_duration = get_video_duration(Video_path) if video_duration is None: raise Exception("Impossible de lire la vidéo.") if Co2_release == "low": CRC = [4, 2, 5, 4] Clip = [11, 3, 10, 3] elif Co2_release == "medium": CRC = [3, 2, 10, 3] Clip = [9, 4, 10, 2] elif Co2_release == "high": CRC = [2, 1, 20, 1] Clip = [7,1, 10, 2] CRC = adjust_parameters(CRC, video_duration, min_factor=0.6, max_factor=3) Clip = adjust_parameters(Clip, video_duration, min_factor=0.4, max_factor=1.2) # Name of the video Name = os.path.splitext(os.path.basename(Video_path))[0] # Extraction de l'audio extract_audio(Video_path, Name +".wav") # Prétraitement de l'audio speech_ranges = preprocess_audio(Name +".wav", Name +"clean.wav") # first filter : hate speech detection in audio os.remove(Name +".wav") hate_speech_time_audio = detect_hate_speech_in_audio(Name +"clean.wav", include_intervals = speech_ranges , Co2_release = Co2_release) os.remove(Name +"clean.wav") print("✅ Filter 1 : Hate speech detection in audio done !" , hate_speech_time_audio) # second filter : hate speech detection CLIP (obscene gesture, symbol ... etc) hate_speech_time_CLIP = detect_hate_speech_CLIP( video_path=Video_path, sampling_time_froid= Clip[0], sampling_time_chaud= Clip[1], time_to_recover= Clip[2], merge_final_snippet_time= Clip[3], detect_visual_hate_clip= detect_visual_hate_clip, skip_intervals=hate_speech_time_audio ) print("✅ Filter 2 : Hate speech detection using text embedding done !" , hate_speech_time_CLIP) # third filter : hate speech detection in text from image hate_speech_time_image_text = detect_hate_speech_OCR( video_path=Video_path, sampling_time_froid= CRC[0], sampling_time_chaud= CRC[1], time_to_recover= CRC[2], merge_final_snippet_time= CRC[3], detect_hate_speech_in_image=detect_hate_speech_in_image, skip_intervals= merge_all_snippet_groups([hate_speech_time_CLIP, hate_speech_time_audio]) ) print("✅ Filter 3 : Hate speech detection using text from image done !", hate_speech_time_image_text) hate_speech_time = merge_all_snippet_groups([hate_speech_time_audio, hate_speech_time_CLIP, hate_speech_time_image_text]) print("✅ All filters done !" , hate_speech_time , "Hate speech detected !" , "C02 emissions : " , tracker.stop()) C02_emissions = tracker.stop() return merge_and_expand_timestamps(hate_speech_time), C02_emissions def Detect_hate_speech_emo_hate_bert(audio_path, Co2_release="low"): tracker = EmissionsTracker(log_level="error", allow_multiple_runs=True) tracker.start() # Nom de la vidéo/audio Name = os.path.splitext(os.path.basename(audio_path))[0] # Conversion de l'audio en audio.wav audio, sr = librosa.load(audio_path, sr=16000) sf.write(Name + ".wav", audio, sr) # Prétraitement de l'audio speech_ranges = preprocess_audio(Name + ".wav", Name + "clean.wav") # Détection de discours haineux dans l'audio hate_speech_time_audio = detect_hate_speech_in_audio( Name + "clean.wav", include_intervals=speech_ranges, Co2_release=Co2_release ) os.remove(Name + "clean.wav") # Arrêt du tracker et récupération des émissions CO₂ CO2_emissions = tracker.stop() print("Hate speech detection in audio done :", hate_speech_time_audio, "Hate speech detected ! / CO₂ emissions :", CO2_emissions) return merge_and_expand_timestamps(hate_speech_time_audio), CO2_emissions