from typing import Dict, Any, Literal
import logging
from transformers import pipeline
import torch
import numpy as np
from .headline_analyzer import HeadlineAnalyzer
from .sentiment_analyzer import SentimentAnalyzer
from .bias_analyzer import BiasAnalyzer
from .evidence_analyzer import EvidenceAnalyzer
logger = logging.getLogger(__name__)
# Define analysis mode type
AnalysisMode = Literal['ai', 'traditional']
class ModelRegistry:
"""Singleton class to manage shared model pipelines."""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super(ModelRegistry, cls).__new__(cls)
return cls._instance
    def __init__(self):
        if not self._initialized:
            try:
                # Use GPU if available
                self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
                logger.info(f"Using device: {self.device}")

                # Initialize shared models with larger batch sizes
                self.zero_shot = pipeline(
                    "zero-shot-classification",
                    model="facebook/bart-large-mnli",
                    device=self.device,
                    batch_size=8
                )
                self.sentiment = pipeline(
                    "text-classification",
                    model="SamLowe/roberta-base-go_emotions",
                    device=self.device,
                    batch_size=16
                )
                self.nli = pipeline(
                    "text-classification",
                    model="roberta-large-mnli",
                    device=self.device,
                    batch_size=16
                )
                # Add toxicity pipeline
                self.toxicity = pipeline(
                    "text-classification",
                    model="unitary/toxic-bert",
                    device=self.device,
                    batch_size=16
                )
                logger.info("Successfully initialized shared model pipelines")
                self._initialized = True
            except Exception as e:
                logger.error(f"Failed to initialize shared models: {str(e)}")
                self._initialized = False
    @property
    def is_available(self):
        return self._initialized
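
# A minimal sketch of how the registry behaves (hypothetical calls, not part
# of the original module): repeated construction returns the same instance,
# so the pipelines above are only loaded once per process.
#
#     registry_a = ModelRegistry()
#     registry_b = ModelRegistry()
#     assert registry_a is registry_b
#     if registry_a.is_available:
#         result = registry_a.zero_shot(
#             "The senator announced a new bill today.",
#             candidate_labels=["politics", "sports"],
#         )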
class MediaScorer:
    def __init__(self, use_ai: bool = True):
        """
        Initialize the MediaScorer with the required analyzers.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional analysis (False).
        """
        self.use_ai = use_ai
        self.analysis_mode: AnalysisMode = 'ai' if use_ai else 'traditional'
        logger.info(f"Initializing MediaScorer with {self.analysis_mode} analysis")

        # Initialize the shared model registry if using AI; fall back to
        # traditional analysis when the shared models cannot be loaded.
        self.model_registry = None
        if use_ai:
            self.model_registry = ModelRegistry()
            if not self.model_registry.is_available:
                logger.warning("Shared models not available, falling back to traditional analysis")
                self.use_ai = False
                self.analysis_mode = 'traditional'

        # Initialize analyzers with the analysis mode preference and shared models
        registry = self.model_registry if self.use_ai else None
        self.headline_analyzer = HeadlineAnalyzer(use_ai=self.use_ai, model_registry=registry)
        self.sentiment_analyzer = SentimentAnalyzer(use_ai=self.use_ai, model_registry=registry)
        self.bias_analyzer = BiasAnalyzer(use_ai=self.use_ai, model_registry=registry)
        self.evidence_analyzer = EvidenceAnalyzer(use_ai=self.use_ai, model_registry=registry)

        logger.info(f"All analyzers initialized in {self.analysis_mode} mode")
    def calculate_media_score(self, headline: str, content: str) -> Dict[str, Any]:
        """Calculate the final media credibility score."""
        try:
            logger.info("\n" + "=" * 50)
            logger.info("MEDIA SCORE CALCULATION STARTED")
            logger.info("=" * 50)
            logger.info(f"Analysis Mode: {self.analysis_mode}")

            # Headline Analysis
            logger.info("\n" + "-" * 30)
            logger.info("HEADLINE ANALYSIS")
            logger.info("-" * 30)
            headline_analysis = self.headline_analyzer.analyze(headline, content)
            logger.info(f"Headline Score: {headline_analysis.get('headline_vs_content_score', 0)}")
            logger.info(f"Flagged Phrases: {headline_analysis.get('flagged_phrases', [])}")

            # Sentiment Analysis
            logger.info("\n" + "-" * 30)
            logger.info("SENTIMENT ANALYSIS")
            logger.info("-" * 30)
            sentiment_analysis = self.sentiment_analyzer.analyze(content)
            logger.info(f"Sentiment: {sentiment_analysis.get('sentiment', 'Unknown')}")
            logger.info(f"Manipulation Score: {sentiment_analysis.get('manipulation_score', 0)}")
            logger.info(f"Flagged Phrases: {sentiment_analysis.get('flagged_phrases', [])}")

            # Bias Analysis
            logger.info("\n" + "-" * 30)
            logger.info("BIAS ANALYSIS")
            logger.info("-" * 30)
            bias_analysis = self.bias_analyzer.analyze(content)
            logger.info(f"""Bias Results:
                Label: {bias_analysis.get('bias', 'Unknown')}
                Score: {bias_analysis.get('bias_score', 0)}
                Percentage: {bias_analysis.get('bias_percentage', 0)}%
                Flagged Phrases: {bias_analysis.get('flagged_phrases', [])}
            """)

            # Evidence Analysis
            logger.info("\n" + "-" * 30)
            logger.info("EVIDENCE ANALYSIS")
            logger.info("-" * 30)
            evidence_analysis = self.evidence_analyzer.analyze(content)
            logger.info(f"Evidence Score: {evidence_analysis.get('evidence_based_score', 0)}")
            logger.info(f"Flagged Phrases: {evidence_analysis.get('flagged_phrases', [])}")
            # Calculate component scores with NaN handling.
            # Headline: 20% contradiction = 20% score (don't invert).
            headline_score = headline_analysis.get("headline_vs_content_score", 0)
            if isinstance(headline_score, (int, float)) and not np.isnan(headline_score):
                headline_score = headline_score / 100
            else:
                headline_score = 0.5  # Default to neutral if the score is invalid
                logger.warning("Invalid headline score, using default value of 0.5")

            # Manipulation: 0% = good, 100% = bad, so invert.
            manipulation_score = sentiment_analysis.get("manipulation_score", 0)
            if isinstance(manipulation_score, (int, float)) and not np.isnan(manipulation_score):
                manipulation_score = (100 - manipulation_score) / 100
            else:
                manipulation_score = 0.5
                logger.warning("Invalid manipulation score, using default value of 0.5")

            # Bias: 0% = good, 100% = bad, so invert.
            bias_percentage = bias_analysis.get("bias_percentage", 0)
            if isinstance(bias_percentage, (int, float)) and not np.isnan(bias_percentage):
                bias_score = (100 - bias_percentage) / 100
            else:
                bias_score = 0.5
                logger.warning("Invalid bias score, using default value of 0.5")

            # Evidence: higher is better, use directly.
            evidence_score = evidence_analysis.get("evidence_based_score", 0)
            if isinstance(evidence_score, (int, float)) and not np.isnan(evidence_score):
                evidence_score = evidence_score / 100
            else:
                evidence_score = 0.5
                logger.warning("Invalid evidence score, using default value of 0.5")
logger.info(f"""Component Scores:
Headline: {headline_score * 100:.1f}% (from {headline_analysis.get("headline_vs_content_score", 0)})
Evidence: {evidence_score * 100:.1f}%
Manipulation: {manipulation_score * 100:.1f}% (100 - {sentiment_analysis.get("manipulation_score", 0)}%)
Bias: {bias_score * 100:.1f}% (100 - {bias_analysis.get("bias_percentage", 0)}%)
""")
# Calculate final score
final_score = float((
(headline_score * 0.25) +
(manipulation_score * 0.25) +
(bias_score * 0.25) +
(evidence_score * 0.25)
) * 100)
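            # Worked example (illustrative numbers, not from the source):
            # component scores of 0.8 (headline), 0.9 (manipulation), 0.7
            # (bias) and 0.6 (evidence) give
            # (0.8 + 0.9 + 0.7 + 0.6) * 0.25 * 100 = 75.0,
            # which falls in the "Bias Present" band below.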
            # Ensure the final score is valid (np.isfinite is False for NaN too)
            if not np.isfinite(final_score):
                final_score = 50.0  # Default to neutral
                logger.warning("Invalid final score calculated, using default value of 50.0")

            # Determine the rating from the final score
            if final_score >= 80:
                rating = "Trustworthy"
            elif final_score >= 50:
                rating = "Bias Present"
            else:
                rating = "Misleading"
            result = {
                "media_unmasked_score": round(float(final_score), 1),
                "rating": rating,
                "analysis_mode": self.analysis_mode,
                "details": {
                    "headline_analysis": {
                        "headline_vs_content_score": float(headline_analysis.get("headline_vs_content_score", 0)),
                        "flagged_phrases": headline_analysis.get("flagged_phrases", [])
                    },
                    "sentiment_analysis": {
                        "sentiment": str(sentiment_analysis.get("sentiment", "Neutral")),
                        "manipulation_score": float(sentiment_analysis.get("manipulation_score", 0)),
                        "flagged_phrases": sentiment_analysis.get("flagged_phrases", [])
                    },
                    "bias_analysis": {
                        "bias": str(bias_analysis.get("bias", "Neutral")),
                        "bias_score": float(bias_analysis.get("bias_score", 0)),
                        "bias_percentage": float(bias_analysis.get("bias_percentage", 0)),
                        "flagged_phrases": bias_analysis.get("flagged_phrases", [])
                    },
                    "evidence_analysis": {
                        "evidence_based_score": float(evidence_analysis.get("evidence_based_score", 0)),
                        "flagged_phrases": evidence_analysis.get("flagged_phrases", [])
                    }
                }
            }

            logger.info("\n=== Final Score Result ===")
            logger.info(f"Result: {result}")
            return result
        except Exception as e:
            logger.error(f"Error calculating media score: {str(e)}")
            return {
                "media_unmasked_score": 0,
                "rating": "Error",
                "analysis_mode": self.analysis_mode,
                "details": {
                    "headline_analysis": {"headline_vs_content_score": 0, "flagged_phrases": []},
                    "sentiment_analysis": {"sentiment": "Error", "manipulation_score": 0, "flagged_phrases": []},
                    "bias_analysis": {"bias": "Error", "bias_score": 0.0, "bias_percentage": 0, "flagged_phrases": []},
                    "evidence_analysis": {"evidence_based_score": 0, "flagged_phrases": []}
                }
            }
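
# Minimal usage sketch (hypothetical; assumes this module is imported as part
# of the package that provides the analyzer modules above):
#
#     scorer = MediaScorer(use_ai=True)
#     report = scorer.calculate_media_score(
#         headline="City council approves new budget",
#         content="The council voted 7-2 on Tuesday to approve...",
#     )
#     print(report["media_unmasked_score"], report["rating"])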