import logging
import os
from typing import Dict, Any, List, Optional
from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize

# sent_tokenize requires the NLTK "punkt" tokenizer data; fetch it once if missing
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt', quiet=True)
logger = logging.getLogger(__name__)
class BiasAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize bias analyzer with both LLM and traditional approaches.

        Args:
            use_ai: Whether to use AI-powered (LLM) analysis; falls back to the
                traditional keyword-based analysis when False or when the model
                cannot be loaded
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry

        # Load traditional keywords
        self.resources_dir = os.path.join(os.path.dirname(__file__), '..', 'resources')
        self.left_keywords = self._load_keywords('left_bias_words.txt')
        self.right_keywords = self._load_keywords('right_bias_words.txt')

        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    self.classifier = model_registry.zero_shot
                    self.llm_available = True
                    logger.info("Using shared model pipeline for bias analysis")
                else:
                    # Initialize own pipeline if no shared registry
                    self.classifier = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipeline for bias analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing bias analyzer in traditional mode")
    def analyze(self, text: str) -> Dict[str, Any]:
        """
        Analyze bias using LLM with fallback to traditional method.

        Args:
            text: The text to analyze

        Returns:
            Dict containing bias analysis results
        """
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional bias analysis")
            return self._analyze_traditional(text)
        except Exception as e:
            logger.error(f"Error in bias analysis: {str(e)}")
            return {
                "bias": "Error",
                "bias_score": 0.0,
                "bias_percentage": 0,
                "flagged_phrases": []
            }

    def _load_keywords(self, filename: str) -> List[str]:
        """Load keywords from file."""
        try:
            filepath = os.path.join(self.resources_dir, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                return [line.strip().lower() for line in f if line.strip() and not line.startswith('#')]
        except Exception as e:
            logger.error(f"Error loading {filename}: {str(e)}")
            return []
    def _analyze_traditional(self, text: str) -> Dict[str, Any]:
        """Traditional keyword-based bias analysis."""
        text_lower = text.lower()

        # Count matches and collect flagged phrases
        left_matches = [word for word in self.left_keywords if word in text_lower]
        right_matches = [word for word in self.right_keywords if word in text_lower]
        left_count = len(left_matches)
        right_count = len(right_matches)
        total_count = left_count + right_count

        if total_count == 0:
            return {
                "bias": "Neutral",
                "bias_score": 0.0,
                "bias_percentage": 0,
                "flagged_phrases": []
            }

        # Calculate bias score (-1 to 1)
        bias_score = (right_count - left_count) / total_count
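        # Worked example (illustrative): 3 right-leaning and 1 left-leaning keyword
        # match gives (3 - 1) / 4 = 0.5, which maps to "Moderately Right" below
        # and a bias percentage of 50.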
        # Calculate bias percentage
        bias_percentage = abs(bias_score * 100)

        # Determine bias label
        if bias_score < -0.6:
            bias = "Strongly Left"
        elif bias_score < -0.3:
            bias = "Moderately Left"
        elif bias_score < -0.1:
            bias = "Leaning Left"
        elif bias_score > 0.6:
            bias = "Strongly Right"
        elif bias_score > 0.3:
            bias = "Moderately Right"
        elif bias_score > 0.1:
            bias = "Leaning Right"
        else:
            bias = "Neutral"

        return {
            "bias": bias,
            "bias_score": round(bias_score, 2),
            "bias_percentage": round(bias_percentage, 1),
            "flagged_phrases": list(set(left_matches + right_matches))[:5]  # Limit to top 5 unique phrases
        }
    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze bias using LLM zero-shot classification with batch processing.

        Returns None if the LLM analysis fails, so the caller can fall back to
        the traditional method.
        """
        try:
            logger.info("\n" + "="*50)
            logger.info("BIAS ANALYSIS STARTED")
            logger.info("="*50)

            # Define bias categories
            bias_categories = [
                "left-wing bias",
                "right-wing bias",
                "neutral/balanced perspective"
            ]
            logger.info("Using categories for analysis:")
            for cat in bias_categories:
                logger.info(f" - {cat}")

            # Clean and prepare text
            logger.info("\nCleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
                                     if not line.startswith('[') and not line.startswith('More on'))
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Split into larger chunks (4000 chars) for fewer API calls
            chunks = [cleaned_text[i:i+4000] for i in range(0, len(cleaned_text), 4000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")
            # Process chunks in batches
            chunk_scores = []
            flagged_phrases = []

            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-'*30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")

                # Analyze chunk as a whole first
                logger.info("Analyzing chunk for overall bias...")
                chunk_result = self.classifier(
                    chunk,
                    bias_categories,
                    multi_label=True
                )

                chunk_scores.append({
                    label: score
                    for label, score in zip(chunk_result['labels'], chunk_result['scores'])
                })

                logger.info("Chunk bias scores:")
                for label, score in chunk_scores[-1].items():
                    logger.info(f" - {label}: {score:.3f}")

                # Only analyze individual sentences if chunk shows strong bias
                max_chunk_score = max(chunk_result['scores'])
                if max_chunk_score > 0.6:
                    logger.info(f"Strong bias detected (score: {max_chunk_score:.3f}), analyzing individual sentences...")
                    sentences = sent_tokenize(chunk)
                    logger.info(f"Found {len(sentences)} sentences to analyze")

                    # Filter sentences for analysis (longer, potentially more meaningful ones)
                    relevant_sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
                    logger.info(f"Filtered to {len(relevant_sentences)} relevant sentences")

                    # Process sentences in batches of 8
                    for j in range(0, len(relevant_sentences), 8):
                        batch = relevant_sentences[j:j+8]
                        try:
                            batch_results = self.classifier(
                                batch,
                                bias_categories,
                                multi_label=False
                            )

                            # Handle single or multiple results
                            if not isinstance(batch_results, list):
                                batch_results = [batch_results]

                            for sentence, result in zip(batch, batch_results):
                                max_score = max(result['scores'])
                                if max_score > 0.8 and result['labels'][0] != "neutral/balanced perspective":
                                    logger.info(f"Found biased sentence (score: {max_score:.3f}, type: {result['labels'][0]}):")
                                    logger.info(f" \"{sentence}\"")
                                    flagged_phrases.append({
                                        "text": sentence,
                                        "type": result['labels'][0],
                                        "score": max_score,
                                        "highlight": f"[{result['labels'][0].upper()}] (Score: {round(max_score * 100, 1)}%) \"{sentence}\""
                                    })
                        except Exception as batch_error:
                            logger.warning(f"Batch processing error: {str(batch_error)}")
                            continue

            # Aggregate scores across chunks
            logger.info("\nAggregating scores across all chunks...")
            aggregated_scores = {
                category: np.mean([
                    scores[category]
                    for scores in chunk_scores
                ])
                for category in bias_categories
            }

            logger.info("\nFinal aggregated scores:")
            for category, score in aggregated_scores.items():
                logger.info(f" - {category}: {score:.3f}")

            # Calculate bias metrics
            left_score = aggregated_scores["left-wing bias"]
            right_score = aggregated_scores["right-wing bias"]
            neutral_score = aggregated_scores["neutral/balanced perspective"]

            # Calculate bias score (-1 to 1)
            bias_score = (right_score - left_score) / max(right_score + left_score, 0.0001)
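            # Worked example (illustrative): right_score 0.50 and left_score 0.20
            # gives (0.50 - 0.20) / 0.70 = 0.43, i.e. "Moderately Right" at about 43%.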
logger.info(f"\nRaw bias score: {bias_score:.3f}")
# Determine bias label
if bias_score < -0.6:
bias = "Strongly Left"
elif bias_score < -0.3:
bias = "Moderately Left"
elif bias_score < -0.1:
bias = "Leaning Left"
elif bias_score > 0.6:
bias = "Strongly Right"
elif bias_score > 0.3:
bias = "Moderately Right"
elif bias_score > 0.1:
bias = "Leaning Right"
else:
bias = "Neutral"
logger.info(f"Determined bias label: {bias}")
# Calculate bias percentage (0-100)
bias_percentage = min(100, abs(bias_score * 100))
logger.info(f"Bias percentage: {bias_percentage:.1f}%")
# Sort and limit flagged phrases
sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
unique_phrases = []
seen = set()
for phrase in sorted_phrases:
if phrase['text'] not in seen:
unique_phrases.append(phrase)
seen.add(phrase['text'])
if len(unique_phrases) >= 5:
break
logger.info(f"\nFlagged {len(unique_phrases)} unique biased phrases")
logger.info("\nBias analysis completed successfully")
return {
"bias": bias,
"bias_score": round(bias_score, 2),
"bias_percentage": round(bias_percentage, 1),
"flagged_phrases": unique_phrases,
"detailed_scores": {
"left_bias": round(left_score * 100, 1),
"right_bias": round(right_score * 100, 1),
"neutral": round(neutral_score * 100, 1)
}
}
except Exception as e:
logger.error(f"LLM analysis failed: {str(e)}")
return None
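

# Minimal usage sketch (illustrative only): runs the analyzer in traditional
# keyword mode so no model download is required. The sample text below is
# hypothetical and not part of the original module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    analyzer = BiasAnalyzer(use_ai=False)
    result = analyzer.analyze(
        "The senator's speech drew praise from supporters and criticism from opponents."
    )
    print(result["bias"], result["bias_score"], result["bias_percentage"])
    print("Flagged phrases:", result["flagged_phrases"])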