Detection_hate_speech / Implementation.py
Lorissss's picture
Upload 7 files
7eeab7d verified
from pydub import AudioSegment
import noisereduce as nr
import librosa
import soundfile as sf
import torchaudio
from silero_vad import get_speech_timestamps, collect_chunks
import io
import numpy as np
from pyannote.audio.pipelines import SpeakerDiarization
from pyannote.core import Segment
from tqdm import tqdm
import pandas as pd
import subprocess
import sys
import os
import torch
import torch.nn as nn
import cv2
from datetime import timedelta
import easyocr
from transformers import pipeline
from codecarbon import EmissionsTracker
from transformers import CLIPProcessor, CLIPModel, AutoTokenizer, BertModel
from PIL import Image
import re
import ast
import tempfile
from model import SenseVoiceSmall
from funasr.utils.postprocess_utils import rich_transcription_postprocess
import csv
import whisper
from datetime import datetime
# 📌 Chargement du modèle Silero VAD
model_and_utils = torch.hub.load(
repo_or_dir='snakers4/silero-vad',
model='silero_vad',
force_reload=True,
trust_repo=True # Évite l'avertissement "untrusted repository"
)
# 📌 Extraction correcte des éléments du tuple
model = model_and_utils[0] # Le modèle PyTorch
utils_tuple = model_and_utils[1] # Tuple contenant les fonctions utilitaires
# 📌 Assignation des fonctions utiles
get_speech_timestamps = utils_tuple[0] # Fonction de détection des segments parlés
save_audio = utils_tuple[1] # Fonction de sauvegarde audio (optionnelle)
read_audio = utils_tuple[2] # Fonction de lecture de l'audio
VADIterator = utils_tuple[3] # Classe pour gérer le VAD
collect_chunks = utils_tuple[4] # Fonction pour extraire les morceaux de speech
# FONCTION D'EXTRACTION DE L'AUDIO
def extract_audio(video_path, output_audio_path):
'''
Explication des options ffmpeg
-ac 1 → Convertit l’audio en mono
-ar 16000 → Définit la fréquence d’échantillonnage à 16 kHz (utile pour certaines applications)
-q:a 0 → Qualité audio maximale
-map a → Extrait uniquement la piste audio
-vn → Désactive la vidéo
'''
command = f'ffmpeg -i "{video_path}" -vn -ac 1 -ar 16000 -q:a 0 -map a "{output_audio_path}"'
subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if os.path.exists(output_audio_path):
print(f"✅ Audio extracted successfully : {output_audio_path}")
else:
print(f"❌ Echec of audio extraction : {video_path}")
def extract_all_audio(Video_folder, Audio_folder):
print("##########################################")
for video in os.listdir(Video_folder):
if video.endswith(".mp4"):
video_path = os.path.join(Video_folder, video)
audio_path = os.path.join(Audio_folder, video.replace(".mp4", ".wav"))
extract_audio(video_path , audio_path)
print("Extraction de l'audio terminée !")
print("##########################################")
def time_to_seconds(time_str):
"""Convertit une chaîne de temps HH:MM:SS en secondes."""
h, m, s = map(int, time_str.split(":"))
return h * 3600 + m * 60 + s
def extract_snippets(audio_path, output_path, snippets):
"""
Extrait et concatène des parties spécifiques d'un fichier audio.
:param audio_path: Chemin du fichier audio d'entrée
:param output_path: Chemin du fichier audio de sortie
:param snippets: Liste de listes [["HH:MM:SS", "HH:MM:SS"]]
"""
audio = AudioSegment.from_file(audio_path)
extracted_audio = AudioSegment.empty()
for start, end in snippets:
start_sec = time_to_seconds(start)
end_sec = time_to_seconds(end)
extracted_audio += audio[start_sec * 1000:end_sec * 1000]
# Sauvegarde du fichier final
extracted_audio.export(output_path, format="wav")
def get_video_duration(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Erreur ouverture vidéo")
return None
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
duration = frame_count / fps
cap.release()
return duration
# PREPROCESSING AUDIOS
def format_time(seconds):
"""Convertit un temps en secondes vers HH:MM:SS"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
return f"{h:02}:{m:02}:{s:02}"
def reduce_noise(audio, sr):
"""
Applique une réduction de bruit sur l'audio.
"""
return nr.reduce_noise(y=audio, sr=sr)
def save_audio(audio, sr, output_path):
"""
Sauvegarde un fichier audio au format WAV.
"""
sf.write(output_path, audio, sr)
def detect_music_and_voice(audio, sr):
"""
Détecte si l'audio contient de la musique et identifie si une voix est présente avec.
Utilise MFCCs, Zero Crossing Rate (ZCR) et analyse spectrale pour différencier :
- Musique seule
- Voix seule
- Voix + Musique
"""
# 🔹 Analyse des MFCCs (signature musique vs voix)
mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
mfcc_var = np.var(mfcc, axis=1)
# 🔹 Calcul du Zero Crossing Rate (ZCR) → Détecte les transitions rapides dans la musique
zcr = librosa.feature.zero_crossing_rate(y=audio)
avg_zcr = np.mean(zcr)
# 🔹 Analyse du spectrogramme → Formants vocaux
spec = librosa.amplitude_to_db(librosa.stft(audio), ref=np.max)
vocal_energy = np.mean(spec[50:300, :]) # 50Hz-300Hz = fréquences vocales
# 🔹 Détection de musique pure vs voix
is_music = np.mean(mfcc_var) < 50 and avg_zcr > 0.05
is_voice = vocal_energy > -20 # Plus de -20dB dans les fréquences vocales = voix présente
if is_music and is_voice:
return "Voix + Musique"
elif is_music:
return "Musique seule"
elif is_voice:
return "Voix seule"
else:
return "Silence"
def expand_and_merge_speech_timestamps(speech_timestamps, sr=16000, margin=1.5):
"""
Élargit chaque segment parlé de ±margin (en secondes), puis fusionne les chevauchements.
Fonctionne directement sur les échantillons.
"""
# Étape 1 : élargir
expanded = []
margin_samples = int(margin * sr)
for seg in speech_timestamps:
start = max(seg['start'] - margin_samples, 0)
end = seg['end'] + margin_samples
expanded.append([start, end])
# Étape 2 : fusionner
expanded.sort()
merged = []
for seg in expanded:
if not merged or seg[0] > merged[-1][1]:
merged.append(seg)
else:
merged[-1][1] = max(merged[-1][1], seg[1])
# Étape 3 : retransformer en format [{'start': x, 'end': y}]
return [{'start': start, 'end': end} for start, end in merged]
def preprocess_audio(input_path, output_path, threshold_CDA = 0.2):
"""
Nettoie l'audio et conserve la même durée en remplaçant les silences par du silence audio.
- input_path : Chemin du fichier audio en entrée (16 kHz, mono)
- output_path : Chemin du fichier nettoyé
- vad_threshold : Sensibilité du VAD (0.3 = sensible, 0.5 = normal, 0.7 = strict)
Retourne :
- Un fichier audio nettoyé avec la même durée
- Une liste des timestamps des parties parlées
"""
# 🔹 1. Chargement de l'audio /music ...
audio, sr = librosa.load(input_path, sr=16000) # Assure un échantillonnage à 16kHz
original_duration = len(audio) # Nombre d'échantillons
"""
# Détection de musique et voix
category = detect_music_and_voice(audio, sr)
if category == "Voix + Musique":
threshold = 0.4 # 🎵 Voix dans la musique → Capture bien la parole
elif category == "Musique seule":
threshold = 0.8 # 🎵 Musique seule → Ignorer
elif category == "Voix seule":
threshold = 0.3 # 🎙️ Seulement Voix → Capturer toute la parole
else:
threshold = 0.7 # Silence ou bruit → Ignorer
"""
threshold = threshold_CDA
# 🔹 2. Réduction du bruit
audio = nr.reduce_noise(y=audio, sr=sr)
# 🔹 3. Détection des segments parlés
speech_timestamps = get_speech_timestamps(audio, model,sampling_rate=sr, threshold=threshold)
# 🔹 4. Création d'un nouvel audio avec silences à la place des blancs
cleaned_audio = np.zeros(original_duration, dtype=np.float32) # Commence par du silence total
speech_ranges = []
for seg in expand_and_merge_speech_timestamps(speech_timestamps):
start_sample, end_sample = seg['start'], seg['end']
cleaned_audio[start_sample:end_sample] = audio[start_sample:end_sample] # Remet les parties parlées
speech_ranges.append([format_time(start_sample / sr), format_time(end_sample / sr)]) # Sauvegarde timestamps
# 🔹 5. Sauvegarde de l'audio nettoyé avec silences
sf.write(output_path, cleaned_audio, sr)
print(f"✅ Audio cleaned : {output_path}")
#print(f"🎵 Catégorie détectée : {category} → Threshold = {threshold}")
#print(f"🎙️ Segments parlés détectés : {speech_ranges}")
return speech_ranges
import os
import pandas as pd
def preprocess_all_audio(audio_path, output_audio_clean_path):
data = []
for i, audio_file in enumerate(os.listdir(audio_path)):
if audio_file.endswith(".wav"):
input_audio_path = os.path.join(audio_path, audio_file)
output_clean_path = os.path.join(output_audio_clean_path, audio_file)
speech_ranges = preprocess_audio(input_audio_path, output_clean_path)
data.append({"audio_name": audio_file, "speech_ranges": speech_ranges})
df = pd.DataFrame(data)
if data: # Vérifie si au moins un fichier a été traité
print(f"✅ {len(data)} fichiers audio nettoyés avec succès.")
else:
print(f"❌ Aucun fichier audio n'a été traité.")
return df
# FIRST FILTER : Hate speech detection in audio
def load_whisper_model(model_name: str = "base"):
return whisper.load_model(model_name)
def extract_audi_range(audio_path, start, end):
audio = AudioSegment.from_wav(audio_path)
segment = audio[start * 1000:end * 1000] # convert to milliseconds
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
segment.export(temp_file.name, format="wav")
return temp_file.name
def parse_timstamp(ts):
if isinstance(ts, (float, int)):
return float(ts)
if isinstance(ts, str) and ":" in ts:
h, m, s = ts.split(":")
return int(h) * 3600 + int(m) * 60 + float(s)
return float(ts)
def transcribe_audio(model, audio_path: str, speech_ranges=None) -> dict:
if not os.path.exists(audio_path):
raise FileNotFoundError(f"Audio file not found: {audio_path}")
if not speech_ranges:
return whisper.transcribe(model, audio_path)
all_segments = []
for start, end in speech_ranges:
if end - start < 1.0:
print(f"Skipped short segment: {start}-{end} (less than 1 second)")
continue
temp_path = extract_audi_range(audio_path, start, end)
try:
partial_result = whisper.transcribe(
model, temp_path,
condition_on_previous_text=False,
no_speech_threshold=0.0
)
for seg in partial_result.get("segments", []):
seg["start"] += start
seg["end"] += start
all_segments.extend(partial_result.get("segments", []))
except Exception as e:
print(f"Error transcribing segment {start}-{end} of {audio_path}: {e}")
finally:
os.remove(temp_path)
return {"segments": all_segments}
def process_dataset(dataset_path: str, model, input_csv: str, output_csv: str) -> None:
with open(input_csv, mode='r', newline='', encoding='utf-8') as infile:
reader = csv.reader(infile)
header = next(reader)
rows = list(reader)
with open(output_csv, mode='w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(header + ["Timestamps", "Texts"])
for row in rows:
video_file_name = row[0].replace(".mp4", ".wav")
video_hate_speech = row[1]
speech_ranges_str = row[5]
print(f"Speech ranges: {speech_ranges_str} for {video_file_name}")
print(f"Row data: {row}")
try:
if not speech_ranges_str.strip():
raise ValueError("Empty speech range")
raw_ranges = ast.literal_eval(speech_ranges_str)
speech_ranges = [(parse_timstamp(start), parse_timstamp(end)) for start, end in raw_ranges]
except Exception as e:
print(f"Invalid speech_ranges for {video_file_name}: {speech_ranges_str}{e}")
continue
folder = "hate_audios_clean" if video_hate_speech == "Hate" else "non_hate_audios_clean"
audio_path = os.path.abspath(os.path.join(dataset_path, folder, video_file_name))
print(f"Processing: {audio_path}")
try:
result = transcribe_audio(model, audio_path, speech_ranges)
segments = result.get("segments", [])
timestamps = [[
f"{int(seg['start'] // 3600):02}:{int((seg['start'] % 3600) // 60):02}:{int(seg['start'] % 60):02}",
f"{int(seg['end'] // 3600):02}:{int((seg['end'] % 3600) // 60):02}:{int(seg['end'] % 60):02}"
] for seg in segments]
texts = [seg.get("text", "") for seg in segments]
writer.writerow(row + [timestamps, texts])
except Exception as e:
print(f"Error processing {video_file_name}: {e}")
print(f"Transcription results saved to {output_csv}")
def speech_ranges_to_timestamps(audio_path, speech_ranges, model_name="base"):
"""
Transcribe only the specified speech_ranges from the given WAV file
and return aligned timestamps and texts.
Args:
audio_path (str): Path to the .wav audio file.
speech_ranges (list of tuple): List of (start, end) times in seconds or "HH:MM:SS" strings.
model_name (str): Whisper model size to load (default "base").
Returns:
timestamps (list of [str, str]): List of [start_ts, end_ts] strings "HH:MM:SS".
texts (list of str): List of transcribed text for each segment.
"""
# load model
model = load_whisper_model(model_name)
# parse any string timestamps into floats
parsed_ranges = [
(parse_timstamp(start), parse_timstamp(end))
for start, end in speech_ranges
]
# run transcription on each segment
result = transcribe_audio(model, audio_path, parsed_ranges)
segments = result.get("segments", [])
# format output
timestamps = [
[
f"{int(seg['start'] // 3600):02}:{int((seg['start'] % 3600) // 60):02}:{int(seg['start'] % 60):02}",
f"{int(seg['end'] // 3600):02}:{int((seg['end'] % 3600) // 60):02}:{int(seg['end'] % 60):02}"
]
for seg in segments
]
texts = [seg.get("text", "").strip() for seg in segments]
return timestamps, texts
def tosec(t):
h, m, s = map(float, t.split(":"))
return h * 3600 + m * 60 + s
def extract_wavv(audio_path, start_sec, end_sec, out_path):
waveform, sr = torchaudio.load(audio_path)
start_frame = int(sr * start_sec)
end_frame = int(sr * end_sec)
segment = waveform[:, start_frame:end_frame]
torchaudio.save(out_path, segment, sample_rate=sr)
def get_emotion_from_segment(wav_path, model, kwargs):
try:
res = model.inference(
data_in=wav_path,
language="en",
use_itn=True,
ban_emo_unk=True,
use_emo=True,
output_emo=True,
output_emo_prob=True,
output_timestamp=False,
**kwargs
)
return res[0][0]['text'].split('|')[3]
except Exception as e:
return f"error: {e}"
def Audio_to_emotion(audio_path, timestamps):
"""
➡️ Donne les émotions pour chaque segment défini par timestamps dans un audio donné.
Args:
audio_path (str): chemin vers le fichier audio (.wav)
timestamps (list): liste de paires ['start', 'end'] en format 'hh:mm:ss'
Returns:
list: liste des émotions détectées
"""
# Charger le modèle une seule fois ici
print("🚀 Chargement du modèle SenseVoiceSmall...")
model_dir = "iic/SenseVoiceSmall"
model, kwargs = SenseVoiceSmall.from_pretrained(model=model_dir, device="cuda:0") # 'cuda:0' for GPU, 'cpu' for CPU
model.eval()
emotions = []
for t_start, t_end in timestamps:
start_sec = tosec(t_start)
end_sec = tosec(t_end)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
temp_wav_path = temp_wav.name
try:
extract_wavv(audio_path, start_sec, end_sec, temp_wav_path)
res = model.inference(
data_in=temp_wav_path,
language="en",
use_itn=True,
ban_emo_unk=True,
use_emo=True,
output_emo=True,
output_emo_prob=True,
output_timestamp=False,
**kwargs
)
emotion = res[0][0]['text'].split('|')[3]
except Exception as e:
emotion = f"error: {e}"
finally:
if os.path.exists(temp_wav_path):
os.remove(temp_wav_path)
emotions.append(emotion)
return emotions
def detect_hate_speech_in_audio(audio_path , include_intervals,Co2_release):
## TODO : Implement the hate speech detection in audio
speech_ranges = include_intervals
timestamps = []
texts = []
emotions = []
# Speech_ranges_to_timestamps
timestamps, texts = speech_ranges_to_timestamps(audio_path, speech_ranges)
# audio_to_emotion
emotions = Audio_to_emotion(audio_path,timestamps)
exploded_df = pd.DataFrame({
"timestamp": timestamps,
"text": texts,
"emotion": emotions
})
exploded_df.head()
exploded_df["text"] = exploded_df["text"].apply(clean_text_light)
if Co2_release == "low":
df = EmoHateBert_predict(exploded_df, "student_distilled_EmoHateBERT.pt", device="cpu")
elif Co2_release == "medium":
df = EmoHateBert_predict(exploded_df, "EmoHateBert_teacher.pt", device="cpu")
elif Co2_release == "high":
df = EmoHateBert_predict(exploded_df, "EmoHateBert_teacher.pt", device="cpu")
hate_speech_time_audio = [timestamp for timestamp, text, emotion, label in zip(df["timestamp"], df["text"], df["emotion"], df["predicted_label"]) if label == 1]
return hate_speech_time_audio
def merge_consecutive(group):
merged = []
current_start = group['timestamp'].iloc[0][0]
current_end = group['timestamp'].iloc[0][1]
current_text = group['text'].iloc[0]
for i in range(1, len(group)):
prev_end = group['timestamp'].iloc[i - 1][1]
curr_start, curr_end_val = group['timestamp'].iloc[i]
if prev_end == curr_start:
current_end = curr_end_val
current_text += ' ' + group['text'].iloc[i]
else:
merged.append({
'timestamp': f"{current_start} - {current_end}",
'text': current_text,
'emotion': group['emotion'].iloc[i - 1],
'hate_snippet': group['hate_snippet'].iloc[i - 1]
})
current_start = curr_start
current_end = curr_end_val
current_text = group['text'].iloc[i]
merged.append({
'timestamp': f"{current_start} - {current_end}",
'text': current_text,
'emotion': group['emotion'].iloc[-1],
'hate_snippet': group['hate_snippet'].iloc[-1]
})
return pd.DataFrame(merged)
def clean_text_light(text):
# Supprime les caractères très spéciaux, mais garde les lettres, chiffres et ponctuation classique
return re.sub(r"[^\w\s.,!?'-]", "", text)
def get_label_hate(timestamp, snippets):
t_start, t_end = map(time_to_seconds, timestamp)
label = 0
if snippets is None:
return 0
for snippet in snippets:
s_start, s_end = map(time_to_seconds, snippet)
if t_start >= s_start and t_end <= s_end:
return 1 # entièrement inclus
elif t_start < s_end and t_end > s_start:
label = 2 # partiellement inclus
return label
def explode_row(row):
timestamps = eval(row['Timestamps'])
texts = eval(row['Texts'])
emotions = eval(row['emotion'])
hate_snippet = eval(row['hate_snippet']) if pd.notna(row['hate_snippet']) else [None] * len(timestamps)
return pd.DataFrame({
"hate_snippet": [hate_snippet] * len(timestamps),
"timestamp": timestamps,
"text": texts,
"emotion": emotions
})
def clean_hate_snippet(snippet):
if isinstance(snippet, list) and snippet and snippet[0] is None:
return None
return snippet
from torch.utils.data import DataLoader, Dataset
class FocalLoss(nn.Module):
def __init__(self, alpha=1, gamma=2):
super().__init__()
self.alpha = alpha
self.gamma = gamma
self.ce = nn.CrossEntropyLoss(reduction='none') # important pour Focal
def forward(self, logits, targets):
ce_loss = self.ce(logits, targets)
pt = torch.exp(-ce_loss)
focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
return focal_loss.mean()
class BertWithEmotion(nn.Module):
def __init__(self, emotion_vocab_size=5, emotion_dim=16, num_labels=2,
class_weights=None, use_focal=False, focal_alpha=1, focal_gamma=2):
super().__init__()
self.bert = BertModel.from_pretrained("bert-base-uncased")
self.bert_hidden = self.bert.config.hidden_size
self.emotion_embed = nn.Embedding(emotion_vocab_size, emotion_dim)
self.dropout = nn.Dropout(0.3)
self.classifier = nn.Linear(self.bert_hidden + emotion_dim, num_labels)
if use_focal:
self.criterion = FocalLoss(alpha=focal_alpha, gamma=focal_gamma)
else:
if class_weights is not None:
self.criterion = nn.CrossEntropyLoss(weight=class_weights)
else:
self.criterion = nn.CrossEntropyLoss()
def forward(self, input_ids, attention_mask, emotion_id, labels=None):
bert_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
cls_vector = bert_out.last_hidden_state[:, 0, :]
emotion_vector = self.emotion_embed(emotion_id)
fusion = torch.cat([cls_vector, emotion_vector], dim=1)
fusion = self.dropout(fusion)
logits = self.classifier(fusion)
if labels is not None:
loss = self.criterion(logits, labels)
return loss, logits
return logits
def EmoHateBert_predict(df, model_path, emotion2id=None, device='cpu'):
# Vérification et valeurs par défaut
if emotion2id is None:
emotion2id = {'ANGRY': 0, 'DISGUSTED': 1, 'FEARFUL': 2,
'HAPPY': 3, 'NEUTRAL': 4, 'SAD': 5,
'SURPRISED': 6, 'UNKNOWN': 7}
# Nettoyer les données
df = df[["timestamp", "text", "emotion"]].dropna()
df["emotion"] = df["emotion"].fillna("").astype(str).str.upper()
# Tokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
# Dataset sans besoin de labels
class HateSpeechDatasetPredict(torch.utils.data.Dataset):
def __init__(self, texts, emotions, tokenizer, emotion2id):
self.texts = texts
self.emotions = emotions
self.tokenizer = tokenizer
self.emotion2id = emotion2id
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
emotion = self.emotions[idx]
tokens = self.tokenizer(text, padding='max_length', truncation=True, max_length=128, return_tensors='pt')
return {
'input_ids': tokens['input_ids'].squeeze(0),
'attention_mask': tokens['attention_mask'].squeeze(0),
'emotion_id': torch.tensor(self.emotion2id.get(emotion, self.emotion2id["UNKNOWN"]))
}
dataset = HateSpeechDatasetPredict(
texts=df["text"].tolist(),
emotions=df["emotion"].tolist(),
tokenizer=tokenizer,
emotion2id=emotion2id
)
loader = DataLoader(dataset, batch_size=16)
# Charger le modèle
model = BertWithEmotion(emotion_vocab_size=len(emotion2id), emotion_dim=16, num_labels=2)
model.load_state_dict(torch.load(model_path, map_location=device), strict=False)
model.eval()
model.to(device)
# Prédictions
all_preds = []
with torch.no_grad():
for batch in loader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
emotion_id = batch["emotion_id"].to(device)
logits = model(input_ids, attention_mask, emotion_id)
preds = torch.argmax(logits, dim=1)
all_preds.extend(preds.cpu().numpy())
df["predicted_label"] = all_preds
return df
# SECOND FILTER : Hate Speech Detection CLIP (symbole, geste obscene ... etc)
# Charger le modèle CLIP et son processor
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# Descriptions (prompts) de classes visuelles possibles
visual_labels = [
# 🟥 Hateful (13 labels bien distincts)
"symbol of racial hatred",
"violence against minorities",
"racist or extremist graffiti",
"homophobic hate gesture",
"nazi salute",
"white supremacist protest",
"burning of religious symbols",
"aggressive crowd with hate signs",
"physical attack during a livestream",
"threatening gesture on webcam",
"explicit insult written on a sign",
"harassment of LGBTQ individuals",
"extremist propaganda banner",
"showing the middle finger",
# 🟧 Visually intense but not necessarily hateful (8)
"chaotic concert crowd",
"people shouting in a protest",
"police intervention in a public place",
"fire on a stage during performance",
"public fight in the street",
"crowd mosh pit",
"person yelling in a video call",
"people arguing loudly",
# 🟩 Safe / Non-hateful (19)
"group of friends taking selfies",
"people dancing together",
"family celebration",
"peaceful protest",
"friendly street interview",
"musician playing at a concert",
"teenagers laughing on a call",
"people hugging",
"family dinner at home",
"children playing outside",
"teacher explaining to students",
"Snapchat selfie with filter",
"artistic mural in the street",
"volunteers helping each other",
"public event with diverse people",
"sports activity with teammates",
"respectful online conversation",
"people cheering at a show",
"cultural dance performance"
]
def detect_visual_hate_clip(image_path):
image = Image.open(image_path).convert("RGB")
# Préparer les entrées pour CLIP
inputs = clip_processor(text=visual_labels, images=image, return_tensors="pt", padding=True)
# Obtenir les similarités image ↔ texte
with torch.no_grad():
outputs = clip_model(**inputs)
logits_per_image = outputs.logits_per_image
probs = logits_per_image.softmax(dim=1).squeeze()
results = {label: float(probs[i]) for i, label in enumerate(visual_labels)}
hateful_labels = visual_labels[:14]
safe_labels = visual_labels[14:]
hate_scores = [results[label] for label in hateful_labels]
safe_scores = [results[label] for label in safe_labels]
# Moyenne pondérée (plus stable que max)
avg_hate = sum(hate_scores) / len(hate_scores)
avg_safe = sum(safe_scores) / len(safe_scores)
# Meilleur score absolu (pour justifier le label final)
top_label = max(results, key=results.get)
top_score = results[top_label]
# Analyse : marge de différence
delta = abs(avg_hate - avg_safe)
# Définir le label selon logique avancée
if delta < 0.05 and top_score < 0.3:
final_label = "Uncertain"
elif avg_hate * 0.85 > avg_safe :
final_label = "Hate"
else:
final_label = "Safe"
return {
"label": final_label,
"confidence_gap": round(delta, 4),
"top_label": top_label,
"top_score": round(top_score, 4),
"avg_hate_score": round(avg_hate, 4),
"avg_safe_score": round(avg_safe, 4),
"all_scores": results
}
def detect_hate_speech_CLIP(
video_path: str,
sampling_time_froid: float,
sampling_time_chaud: float,
time_to_recover: float,
merge_final_snippet_time: float,
detect_visual_hate_clip=None,
skip_intervals=None
):
if detect_visual_hate_clip is None:
raise ValueError("You must provide a detect_visual_hate_clip function")
if skip_intervals is None:
skip_intervals = []
def is_skipped(time_sec):
for start_str, end_str in skip_intervals:
start = sum(int(x) * 60 ** i for i, x in enumerate(reversed(start_str.split(":"))))
end = sum(int(x) * 60 ** i for i, x in enumerate(reversed(end_str.split(":"))))
if start <= time_sec <= end:
return True
return False
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError("Could not open video file")
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
current_time = 0.0
state = "froid"
time_in_chaud = 0.0
hate_timestamps = []
while current_time < duration:
if is_skipped(current_time):
current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid
continue
cap.set(cv2.CAP_PROP_POS_MSEC, current_time * 1000)
ret, frame = cap.read()
if not ret:
break
temp_image_path = "/tmp/temp_frame.jpg"
cv2.imwrite(temp_image_path, frame)
result = detect_visual_hate_clip(temp_image_path)
os.remove(temp_image_path)
if result.get("label") == "Hate":
hate_timestamps.append(current_time)
state = "chaud"
time_in_chaud = 0.0
elif state == "chaud":
time_in_chaud += sampling_time_chaud
if time_in_chaud >= time_to_recover:
state = "froid"
current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid
cap.release()
# Étendre et fusionner les intervalles
intervals = [(max(0, t - merge_final_snippet_time), min(duration, t + merge_final_snippet_time)) for t in hate_timestamps]
merged_intervals = []
for start, end in sorted(intervals):
if not merged_intervals or start > merged_intervals[-1][1]:
merged_intervals.append([start, end])
else:
merged_intervals[-1][1] = max(merged_intervals[-1][1], end)
formatted_intervals = [[seconds_to_hhmmss(start), seconds_to_hhmmss(end)] for start, end in merged_intervals]
return formatted_intervals
# THIRD FILTER : Hate Speech Detection in text from image
def seconds_to_hhmmss(seconds):
return str(timedelta(seconds=int(seconds)))
reader = easyocr.Reader(['en']) # detects the language of the text
nlp_classifier = pipeline("text-classification", model="Hate-speech-CNERG/dehatebert-mono-english")
def detect_hate_speech_in_image(image_path):
# 🖼️ OCR
text_blocks = reader.readtext(image_path, detail=0)
full_text = " ".join(text_blocks).strip()
if not full_text:
return {
"text": None,
"hate_detected": False,
"score": 0.0,
"reason": "No text detected"
}
# 🧠 NLP (classification hate speech)
prediction = nlp_classifier(full_text)[0]
return {
"text": full_text,
"hate_detected": prediction['label'].lower() == 'hate',
"score": float(prediction['score']),
"reason": prediction['label']
}
def detect_hate_speech_OCR(
video_path: str,
sampling_time_froid: float,
sampling_time_chaud: float,
time_to_recover: float,
merge_final_snippet_time: float,
detect_hate_speech_in_image=None,
skip_intervals=None # nouvelle option : intervalles à ignorer
):
if detect_hate_speech_in_image is None:
raise ValueError("You must provide a detect_hate_speech_in_image function")
if skip_intervals is None:
skip_intervals = []
def seconds_to_hhmmss(seconds):
from datetime import timedelta
return str(timedelta(seconds=int(seconds)))
def is_skipped(time_sec):
for start_str, end_str in skip_intervals:
start = sum(int(x) * 60 ** i for i, x in enumerate(reversed(start_str.split(":"))))
end = sum(int(x) * 60 ** i for i, x in enumerate(reversed(end_str.split(":"))))
if start <= time_sec <= end:
return True
return False
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError("Could not open video file")
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
current_time = 0.0
state = "froid"
time_in_chaud = 0.0
hate_timestamps = []
while current_time < duration:
if is_skipped(current_time):
current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid
continue
cap.set(cv2.CAP_PROP_POS_MSEC, current_time * 1000)
ret, frame = cap.read()
if not ret:
break
temp_image_path = "/tmp/temp_frame.jpg"
cv2.imwrite(temp_image_path, frame)
result = detect_hate_speech_in_image(temp_image_path)
os.remove(temp_image_path)
if result.get("hate_detected", False):
hate_timestamps.append(current_time)
state = "chaud"
time_in_chaud = 0.0
elif state == "chaud":
time_in_chaud += sampling_time_chaud
if time_in_chaud >= time_to_recover:
state = "froid"
current_time += sampling_time_chaud if state == "chaud" else sampling_time_froid
cap.release()
# Étendre et fusionner les intervalles
intervals = [(max(0, t - merge_final_snippet_time), min(duration, t + merge_final_snippet_time)) for t in hate_timestamps]
merged_intervals = []
for start, end in sorted(intervals):
if not merged_intervals or start > merged_intervals[-1][1]:
merged_intervals.append([start, end])
else:
merged_intervals[-1][1] = max(merged_intervals[-1][1], end)
formatted_intervals = [[seconds_to_hhmmss(start), seconds_to_hhmmss(end)] for start, end in merged_intervals]
return formatted_intervals
# FINAL FUNCTION
def merge_all_snippet_groups(list_of_snippet_lists):
all_segments = []
# Aplatir et convertir en secondes
for snippet_list in list_of_snippet_lists:
for start, end in snippet_list:
all_segments.append([time_to_seconds(start), time_to_seconds(end)])
# Trier et fusionner
all_segments.sort()
merged = []
for seg in all_segments:
if not merged or seg[0] > merged[-1][1]:
merged.append(seg)
else:
merged[-1][1] = max(merged[-1][1], seg[1])
# Reformat en HH:MM:SS
return [[format_time(start), format_time(end)] for start, end in merged]
def merge_and_expand_timestamps(timestamps, expand_seconds=1, max_gap=1):
"""
- Élargit chaque timestamp de 'expand_seconds' secondes de chaque côté.
- Puis fusionne les timestamps qui se touchent (gap <= max_gap).
timestamps : liste de [start, end] au format 'HH:MM:SS'
expand_seconds : nombre de secondes à ajouter avant et après chaque intervalle
max_gap : gap maximum pour merger
"""
if not timestamps:
return []
# Convertir string -> datetime
def str_to_time(s):
return datetime.strptime(s, "%H:%M:%S")
# Convertir datetime -> string
def time_to_str(t):
return t.strftime("%H:%M:%S")
# Étendre chaque intervalle
expanded = []
for start_str, end_str in timestamps:
start = str_to_time(start_str) - timedelta(seconds=expand_seconds)
end = str_to_time(end_str) + timedelta(seconds=expand_seconds)
start = max(start, datetime.strptime("00:00:00", "%H:%M:%S")) # éviter temps négatif
expanded.append([start, end])
# Maintenant fusionner
merged = []
current_start, current_end = expanded[0]
for start, end in expanded[1:]:
if (start - current_end).total_seconds() <= max_gap:
current_end = max(current_end, end)
else:
merged.append([time_to_str(current_start), time_to_str(current_end)])
current_start, current_end = start, end
merged.append([time_to_str(current_start), time_to_str(current_end)])
return merged
def better_normalized_duration(video_duration):
"""
Vidéo courte → précision maximale.
Vidéo longue → analyse relâchée.
Normalisation douce selon la durée.
"""
# On travaille directement en minutes
duration_min = video_duration / 60
# Normalisation progressive :
# 0 min → 0
# 5 min → 0.2
# 10 min → 0.4
# 20 min → 0.7
# 30 min ou + → 1
if duration_min <= 5:
return duration_min / 25 # max 0.2 pour 5 min
elif duration_min <= 10:
return 0.2 + (duration_min - 5) / 25 # ajoute jusqu’à 0.4
elif duration_min <= 20:
return 0.4 + (duration_min - 10) / 25 # ajoute jusqu’à 0.8
else:
return min(1, 0.8 + (duration_min - 20) / 20) # plafonne à 1 après 30 min
def adjust_parameters(base_params, video_duration, min_factor=0.6, max_factor=1.6):
"""
Ajuste les paramètres :
- petite vidéo → plus précis (params plus petits)
- grande vidéo → moins précis (params plus grands)
base_params = [sampling_froid, sampling_chaud, time_to_recover, merge_time]
"""
# Normaliser la durée entre 0 et 1 (ex: 0 min → 0, 30 min → ~0.5, 60 min → 1)
normalized_duration = better_normalized_duration(video_duration)
# Calcul du facteur d'ajustement entre min_factor et max_factor
# Petite vidéo → facteur proche de min_factor
# Grande vidéo → facteur proche de max_factor
factor = min_factor + (max_factor - min_factor) * normalized_duration
sampling_froid = max(1, int(base_params[0] * factor))
sampling_chaud = max(1, int(base_params[1] * (0.5 * factor + 0.5)))
time_to_recover = int(base_params[2] * (0.5 * factor + 0.5))
merge_final = base_params[3]
return [sampling_froid, sampling_chaud, time_to_recover, merge_final]
def detectHateSpeechSmartFilter(Video_path, Co2_release = "low"):
tracker = EmissionsTracker(log_level="error" , allow_multiple_runs=True)
tracker.start()
video_duration = get_video_duration(Video_path)
if video_duration is None:
raise Exception("Impossible de lire la vidéo.")
if Co2_release == "low":
CRC = [4, 2, 5, 4]
Clip = [11, 3, 10, 3]
elif Co2_release == "medium":
CRC = [3, 2, 10, 3]
Clip = [9, 4, 10, 2]
elif Co2_release == "high":
CRC = [2, 1, 20, 1]
Clip = [7,1, 10, 2]
CRC = adjust_parameters(CRC, video_duration, min_factor=0.6, max_factor=3)
Clip = adjust_parameters(Clip, video_duration, min_factor=0.4, max_factor=1.2)
# Name of the video
Name = os.path.splitext(os.path.basename(Video_path))[0]
# Extraction de l'audio
extract_audio(Video_path, Name +".wav")
# Prétraitement de l'audio
speech_ranges = preprocess_audio(Name +".wav", Name +"clean.wav")
# first filter : hate speech detection in audio
os.remove(Name +".wav")
hate_speech_time_audio = detect_hate_speech_in_audio(Name +"clean.wav", include_intervals = speech_ranges , Co2_release = Co2_release)
os.remove(Name +"clean.wav")
print("✅ Filter 1 : Hate speech detection in audio done !" , hate_speech_time_audio)
# second filter : hate speech detection CLIP (obscene gesture, symbol ... etc)
hate_speech_time_CLIP = detect_hate_speech_CLIP(
video_path=Video_path,
sampling_time_froid= Clip[0],
sampling_time_chaud= Clip[1],
time_to_recover= Clip[2],
merge_final_snippet_time= Clip[3],
detect_visual_hate_clip= detect_visual_hate_clip,
skip_intervals=hate_speech_time_audio
)
print("✅ Filter 2 : Hate speech detection using text embedding done !" , hate_speech_time_CLIP)
# third filter : hate speech detection in text from image
hate_speech_time_image_text = detect_hate_speech_OCR(
video_path=Video_path,
sampling_time_froid= CRC[0],
sampling_time_chaud= CRC[1],
time_to_recover= CRC[2],
merge_final_snippet_time= CRC[3],
detect_hate_speech_in_image=detect_hate_speech_in_image,
skip_intervals= merge_all_snippet_groups([hate_speech_time_CLIP, hate_speech_time_audio])
)
print("✅ Filter 3 : Hate speech detection using text from image done !", hate_speech_time_image_text)
hate_speech_time = merge_all_snippet_groups([hate_speech_time_audio, hate_speech_time_CLIP, hate_speech_time_image_text])
print("✅ All filters done !" , hate_speech_time , "Hate speech detected !" , "C02 emissions : " , tracker.stop())
C02_emissions = tracker.stop()
return merge_and_expand_timestamps(hate_speech_time), C02_emissions
def Detect_hate_speech_emo_hate_bert(audio_path, Co2_release="low"):
tracker = EmissionsTracker(log_level="error", allow_multiple_runs=True)
tracker.start()
# Nom de la vidéo/audio
Name = os.path.splitext(os.path.basename(audio_path))[0]
# Conversion de l'audio en audio.wav
audio, sr = librosa.load(audio_path, sr=16000)
sf.write(Name + ".wav", audio, sr)
# Prétraitement de l'audio
speech_ranges = preprocess_audio(Name + ".wav", Name + "clean.wav")
# Détection de discours haineux dans l'audio
hate_speech_time_audio = detect_hate_speech_in_audio(
Name + "clean.wav",
include_intervals=speech_ranges,
Co2_release=Co2_release
)
os.remove(Name + "clean.wav")
# Arrêt du tracker et récupération des émissions CO₂
CO2_emissions = tracker.stop()
print("Hate speech detection in audio done :", hate_speech_time_audio,
"Hate speech detected ! / CO₂ emissions :", CO2_emissions)
return merge_and_expand_timestamps(hate_speech_time_audio), CO2_emissions