import logging
import os
from typing import Dict, Any, List, Optional

from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize

logger = logging.getLogger(__name__)

# sent_tokenize needs the NLTK "punkt" tokenizer data; fetch it once if missing.
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt', quiet=True)
class BiasAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize bias analyzer with both LLM and traditional approaches.

        Args:
            use_ai: Boolean indicating whether to use AI-powered analysis (True) or traditional analysis (False)
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry

        # Load traditional keywords
        self.resources_dir = os.path.join(os.path.dirname(__file__), '..', 'resources')
        self.left_keywords = self._load_keywords('left_bias_words.txt')
        self.right_keywords = self._load_keywords('right_bias_words.txt')

        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    self.classifier = model_registry.zero_shot
                    self.llm_available = True
                    logger.info("Using shared model pipeline for bias analysis")
                else:
                    # Initialize own pipeline if no shared registry
                    self.classifier = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipeline for bias analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing bias analyzer in traditional mode")
    def analyze(self, text: str) -> Dict[str, Any]:
        """
        Analyze bias using LLM with fallback to traditional method.

        Args:
            text: The text to analyze

        Returns:
            Dict containing bias analysis results
        """
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional bias analysis")
            return self._analyze_traditional(text)

        except Exception as e:
            logger.error(f"Error in bias analysis: {str(e)}")
            return {
                "bias": "Error",
                "bias_score": 0.0,
                "bias_percentage": 0,
                "flagged_phrases": []
            }
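
    # Shape note: the "flagged_phrases" value differs by path. The traditional
    # analyzer returns a list of matched keyword strings, while the LLM path
    # returns a list of dicts with "text", "type", "score", and "highlight"
    # keys, so downstream consumers should handle both forms.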
    def _load_keywords(self, filename: str) -> List[str]:
        """Load keywords from file."""
        try:
            filepath = os.path.join(self.resources_dir, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                return [line.strip().lower() for line in f if line.strip() and not line.startswith('#')]
        except Exception as e:
            logger.error(f"Error loading {filename}: {str(e)}")
            return []
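
    # The keyword files are expected to be plain text with one term per line;
    # lines starting with '#' are treated as comments and blank lines are
    # skipped. A hypothetical left_bias_words.txt might look like:
    #   # emotionally loaded terms
    #   far-right
    #   corporate greed
    # (the actual resource files are not part of this listing).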
    def _analyze_traditional(self, text: str) -> Dict[str, Any]:
        """Traditional keyword-based bias analysis."""
        text_lower = text.lower()

        # Count matches and collect flagged phrases
        left_matches = [word for word in self.left_keywords if word in text_lower]
        right_matches = [word for word in self.right_keywords if word in text_lower]

        left_count = len(left_matches)
        right_count = len(right_matches)
        total_count = left_count + right_count

        if total_count == 0:
            return {
                "bias": "Neutral",
                "bias_score": 0.0,
                "bias_percentage": 0,
                "flagged_phrases": []
            }

        # Calculate bias score (-1 to 1)
        bias_score = (right_count - left_count) / total_count

        # Calculate bias percentage
        bias_percentage = abs(bias_score * 100)

        # Determine bias label
        if bias_score < -0.6:
            bias = "Strongly Left"
        elif bias_score < -0.3:
            bias = "Moderately Left"
        elif bias_score < -0.1:
            bias = "Leaning Left"
        elif bias_score > 0.6:
            bias = "Strongly Right"
        elif bias_score > 0.3:
            bias = "Moderately Right"
        elif bias_score > 0.1:
            bias = "Leaning Right"
        else:
            bias = "Neutral"

        return {
            "bias": bias,
            "bias_score": round(bias_score, 2),
            "bias_percentage": round(bias_percentage, 1),
            "flagged_phrases": list(set(left_matches + right_matches))[:5]  # Limit to top 5 unique phrases
        }
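
    # Worked example of the traditional scoring above (illustrative numbers):
    # with 3 left-keyword matches and 1 right-keyword match,
    #   bias_score      = (1 - 3) / 4 = -0.5        -> "Moderately Left"
    #   bias_percentage = abs(-0.5 * 100) = 50.0
    # A text with equal left and right matches scores 0.0 and is labeled "Neutral".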
    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze bias using LLM zero-shot classification with batch processing.

        Returns None if the analysis fails, so the caller can fall back to the
        traditional method.
        """
        try:
            logger.info("\n" + "=" * 50)
            logger.info("BIAS ANALYSIS STARTED")
            logger.info("=" * 50)

            # Define bias categories
            bias_categories = [
                "left-wing bias",
                "right-wing bias",
                "neutral/balanced perspective"
            ]
            logger.info("Using categories for analysis:")
            for cat in bias_categories:
                logger.info(f" - {cat}")

            # Clean and prepare text
            logger.info("\nCleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
                                     if not line.startswith('[') and not line.startswith('More on'))
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Split into larger chunks (4000 chars) for fewer classifier calls
            chunks = [cleaned_text[i:i + 4000] for i in range(0, len(cleaned_text), 4000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")

            # Process chunks in batches
            chunk_scores = []
            flagged_phrases = []

            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-' * 30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")

                # Analyze chunk as a whole first
                logger.info("Analyzing chunk for overall bias...")
                chunk_result = self.classifier(
                    chunk,
                    bias_categories,
                    multi_label=True
                )

                chunk_scores.append({
                    label: score
                    for label, score in zip(chunk_result['labels'], chunk_result['scores'])
                })

                logger.info("Chunk bias scores:")
                for label, score in chunk_scores[-1].items():
                    logger.info(f" - {label}: {score:.3f}")

                # Only analyze individual sentences if chunk shows strong bias
                max_chunk_score = max(chunk_result['scores'])
                if max_chunk_score > 0.6:
                    logger.info(f"Strong bias detected (score: {max_chunk_score:.3f}), analyzing individual sentences...")
                    sentences = sent_tokenize(chunk)
                    logger.info(f"Found {len(sentences)} sentences to analyze")

                    # Filter sentences for analysis (longer, potentially more meaningful ones)
                    relevant_sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
                    logger.info(f"Filtered to {len(relevant_sentences)} relevant sentences")

                    # Process sentences in batches of 8
                    for j in range(0, len(relevant_sentences), 8):
                        batch = relevant_sentences[j:j + 8]
                        try:
                            batch_results = self.classifier(
                                batch,
                                bias_categories,
                                multi_label=False
                            )

                            # The pipeline returns a single dict for a one-item batch
                            if not isinstance(batch_results, list):
                                batch_results = [batch_results]

                            for sentence, result in zip(batch, batch_results):
                                max_score = max(result['scores'])
                                if max_score > 0.8 and result['labels'][0] != "neutral/balanced perspective":
                                    logger.info(f"Found biased sentence (score: {max_score:.3f}, type: {result['labels'][0]}):")
                                    logger.info(f" \"{sentence}\"")
                                    flagged_phrases.append({
                                        "text": sentence,
                                        "type": result['labels'][0],
                                        "score": max_score,
                                        "highlight": f"[{result['labels'][0].upper()}] (Score: {round(max_score * 100, 1)}%) \"{sentence}\""
                                    })
                        except Exception as batch_error:
                            logger.warning(f"Batch processing error: {str(batch_error)}")
                            continue

            # Aggregate scores across chunks
            logger.info("\nAggregating scores across all chunks...")
            aggregated_scores = {
                category: np.mean([
                    scores[category]
                    for scores in chunk_scores
                ])
                for category in bias_categories
            }

            logger.info("\nFinal aggregated scores:")
            for category, score in aggregated_scores.items():
                logger.info(f" - {category}: {score:.3f}")

            # Calculate bias metrics
            left_score = aggregated_scores["left-wing bias"]
            right_score = aggregated_scores["right-wing bias"]
            neutral_score = aggregated_scores["neutral/balanced perspective"]

            # Calculate bias score (-1 to 1)
            bias_score = (right_score - left_score) / max(right_score + left_score, 0.0001)
            logger.info(f"\nRaw bias score: {bias_score:.3f}")

            # Determine bias label
            if bias_score < -0.6:
                bias = "Strongly Left"
            elif bias_score < -0.3:
                bias = "Moderately Left"
            elif bias_score < -0.1:
                bias = "Leaning Left"
            elif bias_score > 0.6:
                bias = "Strongly Right"
            elif bias_score > 0.3:
                bias = "Moderately Right"
            elif bias_score > 0.1:
                bias = "Leaning Right"
            else:
                bias = "Neutral"
            logger.info(f"Determined bias label: {bias}")

            # Calculate bias percentage (0-100)
            bias_percentage = min(100, abs(bias_score * 100))
            logger.info(f"Bias percentage: {bias_percentage:.1f}%")

            # Sort flagged phrases by score and keep the top 5 unique ones
            sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                if phrase['text'] not in seen:
                    unique_phrases.append(phrase)
                    seen.add(phrase['text'])
                if len(unique_phrases) >= 5:
                    break

            logger.info(f"\nFlagged {len(unique_phrases)} unique biased phrases")
            logger.info("\nBias analysis completed successfully")

            return {
                "bias": bias,
                "bias_score": round(bias_score, 2),
                "bias_percentage": round(bias_percentage, 1),
                "flagged_phrases": unique_phrases,
                "detailed_scores": {
                    "left_bias": round(left_score * 100, 1),
                    "right_bias": round(right_score * 100, 1),
                    "neutral": round(neutral_score * 100, 1)
                }
            }

        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}")
            return None
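

# Minimal usage sketch (not part of the original module). It assumes the keyword
# resource files exist under ../resources and, for AI mode, that the
# facebook/bart-large-mnli weights can be downloaded.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Traditional mode avoids loading the transformer model entirely.
    analyzer = BiasAnalyzer(use_ai=False)
    result = analyzer.analyze(
        "The radical left agenda is destroying traditional values, critics say."
    )
    print(result["bias"], result["bias_score"], result["flagged_phrases"])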