import logging
import os
from typing import Dict, Any, List, Optional
from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize

# sent_tokenize requires the NLTK "punkt" tokenizer data; fetch it once if it is missing.
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt', quiet=True)

logger = logging.getLogger(__name__)

class BiasAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize the bias analyzer with both LLM and traditional approaches.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional keyword analysis (False)
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry

        # Load traditional keywords
        self.resources_dir = os.path.join(os.path.dirname(__file__), '..', 'resources')
        self.left_keywords = self._load_keywords('left_bias_words.txt')
        self.right_keywords = self._load_keywords('right_bias_words.txt')

        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    self.classifier = model_registry.zero_shot
                    self.llm_available = True
                    logger.info("Using shared model pipeline for bias analysis")
                else:
                    # Initialize our own pipeline if no shared registry is provided
                    self.classifier = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,  # CPU
                        batch_size=8
                    )
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipeline for bias analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing bias analyzer in traditional mode")

    def analyze(self, text: str) -> Dict[str, Any]:
        """
        Analyze bias using LLM with fallback to the traditional method.

        Args:
            text: The text to analyze

        Returns:
            Dict containing bias analysis results
        """
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional bias analysis")
            return self._analyze_traditional(text)

        except Exception as e:
            logger.error(f"Error in bias analysis: {str(e)}")
            return {
                "bias": "Error",
                "bias_score": 0.0,
                "bias_percentage": 0,
                "flagged_phrases": []
            }

    def _load_keywords(self, filename: str) -> List[str]:
        """Load keywords from file."""
        try:
            filepath = os.path.join(self.resources_dir, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                return [line.strip().lower() for line in f if line.strip() and not line.startswith('#')]
        except Exception as e:
            logger.error(f"Error loading {filename}: {str(e)}")
            return []

    def _analyze_traditional(self, text: str) -> Dict[str, Any]:
        """Traditional keyword-based bias analysis."""
        text_lower = text.lower()

        # Count matches and collect flagged phrases
        left_matches = [word for word in self.left_keywords if word in text_lower]
        right_matches = [word for word in self.right_keywords if word in text_lower]

        left_count = len(left_matches)
        right_count = len(right_matches)
        total_count = left_count + right_count

        if total_count == 0:
            return {
                "bias": "Neutral",
                "bias_score": 0.0,
                "bias_percentage": 0,
                "flagged_phrases": []
            }

        # Calculate bias score (-1 to 1)
        bias_score = (right_count - left_count) / total_count

        # Calculate bias percentage
        bias_percentage = abs(bias_score * 100)

        # Determine bias label
        if bias_score < -0.6:
            bias = "Strongly Left"
        elif bias_score < -0.3:
            bias = "Moderately Left"
        elif bias_score < -0.1:
            bias = "Leaning Left"
        elif bias_score > 0.6:
            bias = "Strongly Right"
        elif bias_score > 0.3:
            bias = "Moderately Right"
        elif bias_score > 0.1:
            bias = "Leaning Right"
        else:
            bias = "Neutral"

        return {
            "bias": bias,
            "bias_score": round(bias_score, 2),
            "bias_percentage": round(bias_percentage, 1),
            "flagged_phrases": list(set(left_matches + right_matches))[:5]  # Limit to 5 unique phrases
        }

    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze bias using LLM zero-shot classification with batch processing.

        Returns None if the LLM analysis fails, so the caller can fall back to
        the traditional method.
        """
        try:
            logger.info("\n" + "="*50)
            logger.info("BIAS ANALYSIS STARTED")
            logger.info("="*50)

            # Define bias categories
            bias_categories = [
                "left-wing bias",
                "right-wing bias",
                "neutral/balanced perspective"
            ]
            logger.info("Using categories for analysis:")
            for cat in bias_categories:
                logger.info(f" - {cat}")

            # Clean and prepare text
            logger.info("\nCleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
                                     if not line.startswith('[') and not line.startswith('More on'))
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Split into larger chunks (4000 chars) for fewer classifier calls
            chunks = [cleaned_text[i:i+4000] for i in range(0, len(cleaned_text), 4000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")

            # Process chunks in batches
            chunk_scores = []
            flagged_phrases = []

            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-'*30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")

                # Analyze chunk as a whole first
                logger.info("Analyzing chunk for overall bias...")
                chunk_result = self.classifier(
                    chunk,
                    bias_categories,
                    multi_label=True
                )
                chunk_scores.append({
                    label: score
                    for label, score in zip(chunk_result['labels'], chunk_result['scores'])
                })
                logger.info("Chunk bias scores:")
                for label, score in chunk_scores[-1].items():
                    logger.info(f" - {label}: {score:.3f}")

                # Only analyze individual sentences if the chunk shows strong bias
                max_chunk_score = max(chunk_result['scores'])
                if max_chunk_score > 0.6:
                    logger.info(f"Strong bias detected (score: {max_chunk_score:.3f}), analyzing individual sentences...")
                    sentences = sent_tokenize(chunk)
                    logger.info(f"Found {len(sentences)} sentences to analyze")

                    # Filter sentences for analysis (longer, potentially more meaningful ones)
                    relevant_sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
                    logger.info(f"Filtered to {len(relevant_sentences)} relevant sentences")

                    # Process sentences in batches of 8
                    for j in range(0, len(relevant_sentences), 8):
                        batch = relevant_sentences[j:j+8]
                        try:
                            batch_results = self.classifier(
                                batch,
                                bias_categories,
                                multi_label=False
                            )
                            # The pipeline returns a dict for a single input and a list for multiple
                            if not isinstance(batch_results, list):
                                batch_results = [batch_results]

                            for sentence, result in zip(batch, batch_results):
                                max_score = max(result['scores'])
                                if max_score > 0.8 and result['labels'][0] != "neutral/balanced perspective":
                                    logger.info(f"Found biased sentence (score: {max_score:.3f}, type: {result['labels'][0]}):")
                                    logger.info(f" \"{sentence}\"")
                                    flagged_phrases.append({
                                        "text": sentence,
                                        "type": result['labels'][0],
                                        "score": max_score,
                                        "highlight": f"[{result['labels'][0].upper()}] (Score: {round(max_score * 100, 1)}%) \"{sentence}\""
                                    })
                        except Exception as batch_error:
                            logger.warning(f"Batch processing error: {str(batch_error)}")
                            continue

            # Aggregate scores across chunks
            logger.info("\nAggregating scores across all chunks...")
            aggregated_scores = {
                category: np.mean([
                    scores[category]
                    for scores in chunk_scores
                ])
                for category in bias_categories
            }
            logger.info("\nFinal aggregated scores:")
            for category, score in aggregated_scores.items():
                logger.info(f" - {category}: {score:.3f}")

            # Calculate bias metrics
            left_score = aggregated_scores["left-wing bias"]
            right_score = aggregated_scores["right-wing bias"]
            neutral_score = aggregated_scores["neutral/balanced perspective"]

            # Calculate bias score (-1 to 1), guarding against division by zero
            bias_score = (right_score - left_score) / max(right_score + left_score, 0.0001)
            logger.info(f"\nRaw bias score: {bias_score:.3f}")

            # Determine bias label
            if bias_score < -0.6:
                bias = "Strongly Left"
            elif bias_score < -0.3:
                bias = "Moderately Left"
            elif bias_score < -0.1:
                bias = "Leaning Left"
            elif bias_score > 0.6:
                bias = "Strongly Right"
            elif bias_score > 0.3:
                bias = "Moderately Right"
            elif bias_score > 0.1:
                bias = "Leaning Right"
            else:
                bias = "Neutral"
            logger.info(f"Determined bias label: {bias}")

            # Calculate bias percentage (0-100)
            bias_percentage = min(100, abs(bias_score * 100))
            logger.info(f"Bias percentage: {bias_percentage:.1f}%")

            # Sort flagged phrases by score and keep the top 5 unique ones
            sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                if phrase['text'] not in seen:
                    unique_phrases.append(phrase)
                    seen.add(phrase['text'])
                if len(unique_phrases) >= 5:
                    break

            logger.info(f"\nFlagged {len(unique_phrases)} unique biased phrases")
            logger.info("\nBias analysis completed successfully")

            return {
                "bias": bias,
                "bias_score": round(bias_score, 2),
                "bias_percentage": round(bias_percentage, 1),
                "flagged_phrases": unique_phrases,
                "detailed_scores": {
                    "left_bias": round(left_score * 100, 1),
                    "right_bias": round(right_score * 100, 1),
                    "neutral": round(neutral_score * 100, 1)
                }
            }

        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}")
            return None
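

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): exercises the
    # analyzer on a short illustrative sample. Traditional keyword mode is used
    # here to avoid downloading the zero-shot model; if the keyword resource
    # files are absent, _load_keywords logs an error and returns an empty list,
    # so the result falls back to "Neutral".
    logging.basicConfig(level=logging.INFO)

    analyzer = BiasAnalyzer(use_ai=False)
    sample_text = "The senator's plan slashes regulations while expanding tax breaks for corporations."
    result = analyzer.analyze(sample_text)
    print(result)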