import logging
from typing import Any, Dict, Literal

import numpy as np
import torch
from transformers import pipeline

from .headline_analyzer import HeadlineAnalyzer
from .sentiment_analyzer import SentimentAnalyzer
from .bias_analyzer import BiasAnalyzer
from .evidence_analyzer import EvidenceAnalyzer

logger = logging.getLogger(__name__)

# Define analysis mode type
AnalysisMode = Literal['ai', 'traditional']


class ModelRegistry:
    """Singleton class to manage shared model pipelines."""

    _instance = None
    _initialized = False

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ModelRegistry, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        if not self._initialized:
            try:
                # Use GPU if available
                self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
                logger.info(f"Using device: {self.device}")

                # Initialize shared models with larger batch sizes
                self.zero_shot = pipeline(
                    "zero-shot-classification",
                    model="facebook/bart-large-mnli",
                    device=self.device,
                    batch_size=8
                )

                self.sentiment = pipeline(
                    "text-classification",
                    model="SamLowe/roberta-base-go_emotions",
                    device=self.device,
                    batch_size=16
                )

                self.nli = pipeline(
                    "text-classification",
                    model="roberta-large-mnli",
                    device=self.device,
                    batch_size=16
                )

                # Add toxicity pipeline
                self.toxicity = pipeline(
                    "text-classification",
                    model="unitary/toxic-bert",
                    device=self.device,
                    batch_size=16
                )

                logger.info("Successfully initialized shared model pipelines")
                self._initialized = True

            except Exception as e:
                logger.error(f"Failed to initialize shared models: {str(e)}")
                self._initialized = False

    def is_available(self):
        return self._initialized
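
# Illustrative usage sketch (assumed, not exercised by this module): because
# __new__ caches the first instance, repeated construction yields the same
# object, so every analyzer shares one set of loaded pipelines, e.g.:
#
#     registry_a = ModelRegistry()
#     registry_b = ModelRegistry()
#     assert registry_a is registry_b
#     if registry_a.is_available():
#         # standard transformers zero-shot call; labels here are made up
#         registry_a.zero_shot("Some article text", candidate_labels=["news", "opinion"])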


class MediaScorer:
    def __init__(self, use_ai: bool = True):
        """
        Initialize the MediaScorer with required analyzers.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional analysis (False).
        """
        self.use_ai = use_ai
        self.analysis_mode: AnalysisMode = 'ai' if use_ai else 'traditional'
        logger.info(f"Initializing MediaScorer with {self.analysis_mode} analysis")

        # Initialize shared model registry if using AI
        if use_ai:
            self.model_registry = ModelRegistry()
            if not self.model_registry.is_available():
                logger.warning("Shared models not available, falling back to traditional analysis")
                self.use_ai = False
                self.analysis_mode = 'traditional'

        # Initialize analyzers with analysis mode preference and shared models
        self.headline_analyzer = HeadlineAnalyzer(
            use_ai=self.use_ai,
            model_registry=self.model_registry if self.use_ai else None
        )
        self.sentiment_analyzer = SentimentAnalyzer(
            use_ai=self.use_ai,
            model_registry=self.model_registry if self.use_ai else None
        )
        self.bias_analyzer = BiasAnalyzer(
            use_ai=self.use_ai,
            model_registry=self.model_registry if self.use_ai else None
        )
        self.evidence_analyzer = EvidenceAnalyzer(
            use_ai=self.use_ai,
            model_registry=self.model_registry if self.use_ai else None
        )

        logger.info(f"All analyzers initialized in {self.analysis_mode} mode")

    def calculate_media_score(self, headline: str, content: str) -> Dict[str, Any]:
        """
        Calculate the final media credibility score.

        Args:
            headline: The article headline.
            content: The article body text.

        Returns:
            A dictionary with the overall score, rating, analysis mode, and
            per-analyzer details.
        """
        try:
            logger.info("\n" + "=" * 50)
            logger.info("MEDIA SCORE CALCULATION STARTED")
            logger.info("=" * 50)
            logger.info(f"Analysis Mode: {self.analysis_mode}")

            # Headline Analysis
            logger.info("\n" + "-" * 30)
            logger.info("HEADLINE ANALYSIS")
            logger.info("-" * 30)
            headline_analysis = self.headline_analyzer.analyze(headline, content)
            logger.info(f"Headline Score: {headline_analysis.get('headline_vs_content_score', 0)}")
            logger.info(f"Flagged Phrases: {headline_analysis.get('flagged_phrases', [])}")

            # Sentiment Analysis
            logger.info("\n" + "-" * 30)
            logger.info("SENTIMENT ANALYSIS")
            logger.info("-" * 30)
            sentiment_analysis = self.sentiment_analyzer.analyze(content)
            logger.info(f"Sentiment: {sentiment_analysis.get('sentiment', 'Unknown')}")
            logger.info(f"Manipulation Score: {sentiment_analysis.get('manipulation_score', 0)}")
            logger.info(f"Flagged Phrases: {sentiment_analysis.get('flagged_phrases', [])}")

            # Bias Analysis
            logger.info("\n" + "-" * 30)
            logger.info("BIAS ANALYSIS")
            logger.info("-" * 30)
            bias_analysis = self.bias_analyzer.analyze(content)
            logger.info(f"""Bias Results:
                Label: {bias_analysis.get('bias', 'Unknown')}
                Score: {bias_analysis.get('bias_score', 0)}
                Percentage: {bias_analysis.get('bias_percentage', 0)}%
                Flagged Phrases: {bias_analysis.get('flagged_phrases', [])}
            """)

            # Evidence Analysis
            logger.info("\n" + "-" * 30)
            logger.info("EVIDENCE ANALYSIS")
            logger.info("-" * 30)
            evidence_analysis = self.evidence_analyzer.analyze(content)
            logger.info(f"Evidence Score: {evidence_analysis.get('evidence_based_score', 0)}")
            logger.info(f"Flagged Phrases: {evidence_analysis.get('flagged_phrases', [])}")

            # Calculate component scores with NaN handling
            # For headline: 20% contradiction = 20% score (don't invert)
            headline_score = headline_analysis.get("headline_vs_content_score", 0)
            if isinstance(headline_score, (int, float)) and not np.isnan(headline_score):
                headline_score = headline_score / 100
            else:
                headline_score = 0.5  # Default to neutral if score is invalid
                logger.warning("Invalid headline score, using default value of 0.5")

            # For manipulation: 0% = good (use directly), 100% = bad
            manipulation_score = sentiment_analysis.get("manipulation_score", 0)
            if isinstance(manipulation_score, (int, float)) and not np.isnan(manipulation_score):
                manipulation_score = (100 - manipulation_score) / 100
            else:
                manipulation_score = 0.5
                logger.warning("Invalid manipulation score, using default value of 0.5")

            # For bias: 0% = good (use directly), 100% = bad
            bias_percentage = bias_analysis.get("bias_percentage", 0)
            if isinstance(bias_percentage, (int, float)) and not np.isnan(bias_percentage):
                bias_score = (100 - bias_percentage) / 100
            else:
                bias_score = 0.5
                logger.warning("Invalid bias score, using default value of 0.5")

            # For evidence: higher is better
            evidence_score = evidence_analysis.get("evidence_based_score", 0)
            if isinstance(evidence_score, (int, float)) and not np.isnan(evidence_score):
                evidence_score = evidence_score / 100
            else:
                evidence_score = 0.5
                logger.warning("Invalid evidence score, using default value of 0.5")

            logger.info(f"""Component Scores:
                Headline: {headline_score * 100:.1f}% (from {headline_analysis.get("headline_vs_content_score", 0)})
                Evidence: {evidence_score * 100:.1f}%
                Manipulation: {manipulation_score * 100:.1f}% (100 - {sentiment_analysis.get("manipulation_score", 0)}%)
                Bias: {bias_score * 100:.1f}% (100 - {bias_analysis.get("bias_percentage", 0)}%)
            """)

            # Calculate final score
            final_score = float((
                (headline_score * 0.25) +
                (manipulation_score * 0.25) +
                (bias_score * 0.25) +
                (evidence_score * 0.25)
            ) * 100)
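
            # Worked example with illustrative numbers (not taken from a real article):
            # headline_vs_content_score = 80, manipulation_score = 20,
            # bias_percentage = 30, evidence_based_score = 70 give
            #   headline = 0.80, manipulation = 0.80, bias = 0.70, evidence = 0.70
            #   final_score = (0.80 + 0.80 + 0.70 + 0.70) * 0.25 * 100 = 75.0
            # which falls in the 50-79 band and is rated "Bias Present" below.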

            # Ensure final score is valid
            if np.isnan(final_score) or not np.isfinite(final_score):
                final_score = 50.0  # Default to neutral
                logger.warning("Invalid final score calculated, using default value of 50.0")

            # Determine rating
            if final_score >= 80:
                rating = "Trustworthy"
            elif final_score >= 50:
                rating = "Bias Present"
            else:
                rating = "Misleading"

            result = {
                "media_unmasked_score": round(float(final_score), 1),
                "rating": rating,
                "analysis_mode": self.analysis_mode,
                "details": {
                    "headline_analysis": {
                        "headline_vs_content_score": float(headline_analysis.get("headline_vs_content_score", 0)),
                        "flagged_phrases": headline_analysis.get("flagged_phrases", [])
                    },
                    "sentiment_analysis": {
                        "sentiment": str(sentiment_analysis.get("sentiment", "Neutral")),
                        "manipulation_score": float(sentiment_analysis.get("manipulation_score", 0)),
                        "flagged_phrases": sentiment_analysis.get("flagged_phrases", [])
                    },
                    "bias_analysis": {
                        "bias": str(bias_analysis.get("bias", "Neutral")),
                        "bias_score": float(bias_analysis.get("bias_score", 0)),
                        "bias_percentage": float(bias_analysis.get("bias_percentage", 0)),
                        "flagged_phrases": bias_analysis.get("flagged_phrases", [])
                    },
                    "evidence_analysis": {
                        "evidence_based_score": float(evidence_analysis.get("evidence_based_score", 0)),
                        "flagged_phrases": evidence_analysis.get("flagged_phrases", [])
                    }
                }
            }

            logger.info("\n=== Final Score Result ===")
            logger.info(f"Result: {result}")
            return result
        except Exception as e:
            logger.error(f"Error calculating media score: {str(e)}")
            return {
                "media_unmasked_score": 0,
                "rating": "Error",
                "analysis_mode": self.analysis_mode,
                "details": {
                    "headline_analysis": {"headline_vs_content_score": 0, "flagged_phrases": []},
                    "sentiment_analysis": {"sentiment": "Error", "manipulation_score": 0, "flagged_phrases": []},
                    "bias_analysis": {"bias": "Error", "bias_score": 0.0, "bias_percentage": 0, "flagged_phrases": []},
                    "evidence_analysis": {"evidence_based_score": 0, "flagged_phrases": []}
                }
            }
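

# Minimal usage sketch. It assumes the analyzer packages imported above resolve
# and, for AI mode, that the Hugging Face models can be downloaded; the headline
# and article text are placeholders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    scorer = MediaScorer(use_ai=True)  # falls back to 'traditional' if models fail to load
    report = scorer.calculate_media_score(
        headline="Example headline",
        content="Example article body used only to exercise the scoring pipeline.",
    )
    print(report["media_unmasked_score"], report["rating"])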