import logging import os from typing import Dict, Any, List, Optional from transformers import pipeline import numpy as np import nltk from nltk.tokenize import sent_tokenize logger = logging.getLogger(__name__) class BiasAnalyzer: def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None): """ Initialize bias analyzer with both LLM and traditional approaches. Args: use_ai: Boolean indicating whether to use AI-powered analysis (True) or traditional analysis (False) model_registry: Optional shared model registry for better performance """ self.use_ai = use_ai self.llm_available = False self.model_registry = model_registry # Load traditional keywords self.resources_dir = os.path.join(os.path.dirname(__file__), '..', 'resources') self.left_keywords = self._load_keywords('left_bias_words.txt') self.right_keywords = self._load_keywords('right_bias_words.txt') if use_ai: try: if model_registry and model_registry.is_available: self.classifier = model_registry.zero_shot self.llm_available = True logger.info("Using shared model pipeline for bias analysis") else: # Initialize own pipeline if no shared registry self.classifier = pipeline( "zero-shot-classification", model="facebook/bart-large-mnli", device=-1, batch_size=8 ) self.llm_available = True logger.info("Initialized dedicated model pipeline for bias analysis") except Exception as e: logger.warning(f"Failed to initialize LLM pipeline: {str(e)}") self.llm_available = False else: logger.info("Initializing bias analyzer in traditional mode") def analyze(self, text: str) -> Dict[str, Any]: """ Analyze bias using LLM with fallback to traditional method. Args: text: The text to analyze Returns: Dict containing bias analysis results """ try: # Try LLM analysis if enabled and available if self.use_ai and self.llm_available: llm_result = self._analyze_with_llm(text) if llm_result: return llm_result # Use traditional analysis logger.info("Using traditional bias analysis") return self._analyze_traditional(text) except Exception as e: logger.error(f"Error in bias analysis: {str(e)}") return { "bias": "Error", "bias_score": 0.0, "bias_percentage": 0, "flagged_phrases": [] } def _load_keywords(self, filename: str) -> List[str]: """Load keywords from file.""" try: filepath = os.path.join(self.resources_dir, filename) with open(filepath, 'r', encoding='utf-8') as f: return [line.strip().lower() for line in f if line.strip() and not line.startswith('#')] except Exception as e: logger.error(f"Error loading {filename}: {str(e)}") return [] def _analyze_traditional(self, text: str) -> Dict[str, Any]: """Traditional keyword-based bias analysis.""" text_lower = text.lower() # Count matches and collect flagged phrases left_matches = [word for word in self.left_keywords if word in text_lower] right_matches = [word for word in self.right_keywords if word in text_lower] left_count = len(left_matches) right_count = len(right_matches) total_count = left_count + right_count if total_count == 0: return { "bias": "Neutral", "bias_score": 0.0, "bias_percentage": 0, "flagged_phrases": [] } # Calculate bias score (-1 to 1) bias_score = (right_count - left_count) / total_count # Calculate bias percentage bias_percentage = abs(bias_score * 100) # Determine bias label if bias_score < -0.6: bias = "Strongly Left" elif bias_score < -0.3: bias = "Moderately Left" elif bias_score < -0.1: bias = "Leaning Left" elif bias_score > 0.6: bias = "Strongly Right" elif bias_score > 0.3: bias = "Moderately Right" elif bias_score > 0.1: bias = "Leaning Right" else: bias = "Neutral" return { "bias": bias, "bias_score": round(bias_score, 2), "bias_percentage": round(bias_percentage, 1), "flagged_phrases": list(set(left_matches + right_matches))[:5] # Limit to top 5 unique phrases } def _analyze_with_llm(self, text: str) -> Dict[str, Any]: """Analyze bias using LLM zero-shot classification with batch processing.""" try: logger.info("\n" + "="*50) logger.info("BIAS ANALYSIS STARTED") logger.info("="*50) # Define bias categories bias_categories = [ "left-wing bias", "right-wing bias", "neutral/balanced perspective" ] logger.info("Using categories for analysis:") for cat in bias_categories: logger.info(f" - {cat}") # Clean and prepare text logger.info("\nCleaning and preparing text...") cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '') cleaned_text = '\n'.join(line for line in cleaned_text.split('\n') if not line.startswith('[') and not line.startswith('More on')) logger.info(f"Text prepared - Length: {len(cleaned_text)} characters") # Split into larger chunks (4000 chars) for fewer API calls chunks = [cleaned_text[i:i+4000] for i in range(0, len(cleaned_text), 4000)] logger.info(f"Split text into {len(chunks)} chunks for processing") # Process chunks in batches chunk_scores = [] flagged_phrases = [] for i, chunk in enumerate(chunks, 1): logger.info(f"\n{'-'*30}") logger.info(f"Processing chunk {i}/{len(chunks)}") logger.info(f"Chunk length: {len(chunk)} characters") # Analyze chunk as a whole first logger.info("Analyzing chunk for overall bias...") chunk_result = self.classifier( chunk, bias_categories, multi_label=True ) chunk_scores.append({ label: score for label, score in zip(chunk_result['labels'], chunk_result['scores']) }) logger.info("Chunk bias scores:") for label, score in chunk_scores[-1].items(): logger.info(f" - {label}: {score:.3f}") # Only analyze individual sentences if chunk shows strong bias max_chunk_score = max(chunk_result['scores']) if max_chunk_score > 0.6: logger.info(f"Strong bias detected (score: {max_chunk_score:.3f}), analyzing individual sentences...") sentences = sent_tokenize(chunk) logger.info(f"Found {len(sentences)} sentences to analyze") # Filter sentences for analysis (longer, potentially more meaningful ones) relevant_sentences = [s.strip() for s in sentences if len(s.strip()) > 20] logger.info(f"Filtered to {len(relevant_sentences)} relevant sentences") # Process sentences in batches of 8 for j in range(0, len(relevant_sentences), 8): batch = relevant_sentences[j:j+8] try: batch_results = self.classifier( batch, bias_categories, multi_label=False ) # Handle single or multiple results if not isinstance(batch_results, list): batch_results = [batch_results] for sentence, result in zip(batch, batch_results): max_score = max(result['scores']) if max_score > 0.8 and result['labels'][0] != "neutral/balanced perspective": logger.info(f"Found biased sentence (score: {max_score:.3f}, type: {result['labels'][0]}):") logger.info(f" \"{sentence}\"") flagged_phrases.append({ "text": sentence, "type": result['labels'][0], "score": max_score, "highlight": f"[{result['labels'][0].upper()}] (Score: {round(max_score * 100, 1)}%) \"{sentence}\"" }) except Exception as batch_error: logger.warning(f"Batch processing error: {str(batch_error)}") continue # Aggregate scores across chunks logger.info("\nAggregating scores across all chunks...") aggregated_scores = { category: np.mean([ scores[category] for scores in chunk_scores ]) for category in bias_categories } logger.info("\nFinal aggregated scores:") for category, score in aggregated_scores.items(): logger.info(f" - {category}: {score:.3f}") # Calculate bias metrics left_score = aggregated_scores["left-wing bias"] right_score = aggregated_scores["right-wing bias"] neutral_score = aggregated_scores["neutral/balanced perspective"] # Calculate bias score (-1 to 1) bias_score = (right_score - left_score) / max(right_score + left_score, 0.0001) logger.info(f"\nRaw bias score: {bias_score:.3f}") # Determine bias label if bias_score < -0.6: bias = "Strongly Left" elif bias_score < -0.3: bias = "Moderately Left" elif bias_score < -0.1: bias = "Leaning Left" elif bias_score > 0.6: bias = "Strongly Right" elif bias_score > 0.3: bias = "Moderately Right" elif bias_score > 0.1: bias = "Leaning Right" else: bias = "Neutral" logger.info(f"Determined bias label: {bias}") # Calculate bias percentage (0-100) bias_percentage = min(100, abs(bias_score * 100)) logger.info(f"Bias percentage: {bias_percentage:.1f}%") # Sort and limit flagged phrases sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True) unique_phrases = [] seen = set() for phrase in sorted_phrases: if phrase['text'] not in seen: unique_phrases.append(phrase) seen.add(phrase['text']) if len(unique_phrases) >= 5: break logger.info(f"\nFlagged {len(unique_phrases)} unique biased phrases") logger.info("\nBias analysis completed successfully") return { "bias": bias, "bias_score": round(bias_score, 2), "bias_percentage": round(bias_percentage, 1), "flagged_phrases": unique_phrases, "detailed_scores": { "left_bias": round(left_score * 100, 1), "right_bias": round(right_score * 100, 1), "neutral": round(neutral_score * 100, 1) } } except Exception as e: logger.error(f"LLM analysis failed: {str(e)}") return None