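"""Sentiment and manipulation analysis for article text.

Combines Hugging Face transformer pipelines (emotion classification, zero-shot
manipulation detection, optional toxicity scoring) with a TextBlob-based
fallback when the models are unavailable.
"""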
import logging
from typing import Dict, Any, List, Optional

import numpy as np
from textblob import TextBlob
from transformers import pipeline

logger = logging.getLogger(__name__)

class SentimentAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize the sentiment analyzer with both traditional and LLM-based approaches.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional analysis (False)
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry
        self.toxicity_available = False

        # Traditional manipulation patterns
        self.manipulative_patterns = [
            "experts say",
            "sources claim",
            "many believe",
            "some say",
            "everyone knows",
            "clearly",
            "obviously",
            "without doubt",
            "certainly"
        ]
        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    # Use shared models
                    self.sentiment_pipeline = model_registry.sentiment
                    self.zero_shot = model_registry.zero_shot
                    self.toxicity_pipeline = getattr(model_registry, 'toxicity', None)
                    self.toxicity_available = self.toxicity_pipeline is not None
                    self.llm_available = True
                    logger.info("Using shared model pipelines for sentiment analysis")
                    if self.toxicity_available:
                        logger.info("Toxicity analysis enabled")
                    else:
                        logger.info("Toxicity analysis not available")
                else:
                    # Initialize own pipelines
                    self.sentiment_pipeline = pipeline(
                        "text-classification",
                        model="SamLowe/roberta-base-go_emotions",
                        device=-1,
                        batch_size=16
                    )
                    self.zero_shot = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    try:
                        self.toxicity_pipeline = pipeline(
                            "text-classification",
                            model="unitary/toxic-bert",
                            device=-1,
                            batch_size=16
                        )
                        self.toxicity_available = True
                        logger.info("Toxicity analysis enabled")
                    except Exception as tox_error:
                        logger.warning(f"Toxicity pipeline initialization failed: {str(tox_error)}")
                        self.toxicity_available = False
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipelines for sentiment analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipelines: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing sentiment analyzer in traditional mode")
    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Perform sentiment analysis using the LLM pipelines; returns None if analysis fails."""
        try:
            logger.info("\n" + "="*50)
            logger.info("SENTIMENT ANALYSIS STARTED")
            logger.info("="*50)

            # Clean the text of formatting markers
            logger.info("Cleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
                                     if not line.startswith('[') and not line.startswith('More on'))
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Split text into 2000-character chunks (a rough stand-in for the models' 512-token limit)
            chunks = [cleaned_text[i:i+2000] for i in range(0, len(cleaned_text), 2000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")

            # Initialize aggregation variables
            sentiment_scores = []
            toxicity_scores = []
            manipulation_scores = []
            flagged_phrases = []
            manipulation_categories = [
                "emotional manipulation",
                "fear mongering",
                "propaganda",
                "factual reporting",
                "balanced perspective"
            ]

            # Process each chunk
            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-'*30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")
                try:
                    # Get emotion scores
                    logger.info("Analyzing emotions...")
                    emotions = self.sentiment_pipeline(chunk)
                    logger.debug(f"Raw emotion response: {emotions}")

                    # Handle different response formats
                    if isinstance(emotions, list):
                        for emotion in emotions:
                            if isinstance(emotion, dict) and 'label' in emotion and 'score' in emotion:
                                sentiment_scores.append(emotion)
                                logger.info(f"Detected emotion: {emotion['label']} (score: {emotion['score']:.3f})")
                    elif isinstance(emotions, dict) and 'label' in emotions and 'score' in emotions:
                        sentiment_scores.append(emotions)
                        logger.info(f"Detected emotion: {emotions['label']} (score: {emotions['score']:.3f})")

                    # Get toxicity scores if available
                    if self.toxicity_available:
                        logger.info("Analyzing toxicity...")
                        try:
                            toxicity = self.toxicity_pipeline(chunk)
                            if isinstance(toxicity, list):
                                toxicity_scores.extend(toxicity)
                            else:
                                toxicity_scores.append(toxicity)
                            logger.info(f"Toxicity analysis complete for chunk {i}")
                            logger.debug(f"Toxicity scores: {toxicity_scores[-1]}")
                        except Exception as tox_error:
                            logger.warning(f"Toxicity analysis failed for chunk {i}: {str(tox_error)}")

                    # Get manipulation scores
                    logger.info("Analyzing manipulation patterns...")
                    manipulation = self.zero_shot(
                        chunk,
                        manipulation_categories,
                        multi_label=True
                    )
                    if isinstance(manipulation, dict) and 'labels' in manipulation and 'scores' in manipulation:
                        chunk_scores = {
                            label: score
                            for label, score in zip(manipulation['labels'], manipulation['scores'])
                        }
                        manipulation_scores.append(chunk_scores)
                        logger.info("Manipulation scores for chunk:")
                        for label, score in chunk_scores.items():
                            logger.info(f"  - {label}: {score:.3f}")

                    # Analyze sentences for manipulation
                    logger.info("Analyzing individual sentences for manipulation...")
                    sentences = chunk.split('.')
                    for sentence in sentences:
                        if len(sentence.strip()) > 10:
                            sent_result = self.zero_shot(
                                sentence.strip(),
                                manipulation_categories,
                                multi_label=False
                            )
                            if (sent_result['labels'][0] in ["emotional manipulation", "fear mongering", "propaganda"]
                                    and sent_result['scores'][0] > 0.7):
                                logger.info(f"Found manipulative content (score: {sent_result['scores'][0]:.3f}): {sentence.strip()}")
                                flagged_phrases.append({
                                    'text': sentence.strip(),
                                    'type': sent_result['labels'][0],
                                    'score': sent_result['scores'][0]
                                })
                except Exception as chunk_error:
                    logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
                    continue
            logger.info("\nAggregating final scores...")

            # Aggregate scores with error handling
            def aggregate_scores(scores_list, score_type: str):
                try:
                    if not scores_list:
                        logger.warning(f"No {score_type} scores to aggregate")
                        return {}
                    all_scores = {}
                    for scores in scores_list:
                        if isinstance(scores, dict):
                            if 'label' in scores and 'score' in scores:
                                label = scores['label']
                                score = scores['score']
                            else:
                                # Handle direct label-score mapping
                                for label, score in scores.items():
                                    if label not in all_scores:
                                        all_scores[label] = []
                                    if isinstance(score, (int, float)):
                                        all_scores[label].append(score)
                                continue
                        else:
                            logger.warning(f"Unexpected score format in {score_type}: {scores}")
                            continue
                        if isinstance(label, (str, bytes)):
                            if label not in all_scores:
                                all_scores[label] = []
                            if isinstance(score, (int, float)):
                                all_scores[label].append(score)
                    return {k: float(np.mean(v)) for k, v in all_scores.items() if v}
                except Exception as agg_error:
                    logger.error(f"Error aggregating {score_type} scores: {str(agg_error)}")
                    return {}
            emotion_scores = aggregate_scores(sentiment_scores, "emotion")
            toxicity_scores = aggregate_scores(toxicity_scores, "toxicity") if self.toxicity_available else {}

            logger.info("\nFinal emotion scores:")
            for emotion, score in emotion_scores.items():
                logger.info(f"  - {emotion}: {score:.3f}")

            if toxicity_scores:
                logger.info("\nFinal toxicity scores:")
                for category, score in toxicity_scores.items():
                    logger.info(f"  - {category}: {score:.3f}")

            # Aggregate manipulation scores
            manipulation_agg = {
                category: float(np.mean([
                    scores.get(category, 0)
                    for scores in manipulation_scores
                ]))
                for category in manipulation_categories
                if manipulation_scores
            }
            logger.info("\nFinal manipulation scores:")
            for category, score in manipulation_agg.items():
                logger.info(f"  - {category}: {score:.3f}")

            # Weight the manipulation-related categories; with toxicity enabled the weights
            # sum to more than 1, so the final score is capped at 100 below.
            manipulation_indicators = {
                'emotional manipulation': 0.4,
                'fear mongering': 0.3,
                'propaganda': 0.3
            }
            if self.toxicity_available:
                manipulation_indicators.update({
                    'toxic': 0.2,
                    'severe_toxic': 0.3,
                    'threat': 0.2
                })

            # Combine toxicity and manipulation scores
            combined_scores = {**toxicity_scores, **manipulation_agg}

            # Calculate manipulation score with fallback
            if combined_scores:
                manipulation_score = min(100, sum(
                    combined_scores.get(k, 0) * weight
                    for k, weight in manipulation_indicators.items()
                ) * 100)
            else:
                # Fall back to traditional analysis if no scores are available
                manipulation_score = len(self._detect_manipulative_phrases(text)) * 10

            logger.info(f"\nFinal manipulation score: {manipulation_score:.1f}")
            # Determine overall sentiment
            positive_emotions = ['admiration', 'joy', 'amusement', 'approval']
            negative_emotions = ['disgust', 'anger', 'disappointment', 'fear']
            neutral_emotions = ['neutral', 'confusion', 'realization']

            pos_score = sum(emotion_scores.get(emotion, 0) for emotion in positive_emotions)
            neg_score = sum(emotion_scores.get(emotion, 0) for emotion in negative_emotions)
            neu_score = sum(emotion_scores.get(emotion, 0) for emotion in neutral_emotions)

            logger.info("\nSentiment component scores:")
            logger.info(f"  - Positive: {pos_score:.3f}")
            logger.info(f"  - Negative: {neg_score:.3f}")
            logger.info(f"  - Neutral: {neu_score:.3f}")

            # Determine sentiment based on the highest score
            max_score = max(pos_score, neg_score, neu_score)
            if max_score == pos_score and pos_score > 0.3:
                sentiment = "Positive"
            elif max_score == neg_score and neg_score > 0.3:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"

            logger.info(f"\nFinal sentiment determination: {sentiment}")

            # Sort flagged phrases by manipulation score and keep the top unique ones
            sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                clean_text = phrase['text'].strip()
                if clean_text not in seen:
                    unique_phrases.append(clean_text)
                    seen.add(clean_text)
                if len(unique_phrases) >= 5:
                    break

            logger.info(f"\nFlagged {len(unique_phrases)} unique manipulative phrases")
            logger.info("\nSentiment analysis completed successfully")

            return {
                "sentiment": sentiment,
                "manipulation_score": round(manipulation_score, 1),
                "flagged_phrases": unique_phrases,
                "detailed_scores": {
                    "emotions": emotion_scores,
                    "manipulation": manipulation_agg,
                    "toxicity": toxicity_scores
                }
            }
        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}", exc_info=True)
            return None
    def analyze(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment using LLM models, with fallback to traditional methods.

        Args:
            text: The text to analyze

        Returns:
            Dict containing sentiment analysis results
        """
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional sentiment analysis")
            blob = TextBlob(text)
            sentiment_score = blob.sentiment.polarity
            manipulative_phrases = self._detect_manipulative_phrases(text)
            manipulation_score = len(manipulative_phrases) * 10

            if sentiment_score > 0.2:
                sentiment = "Positive"
            elif sentiment_score < -0.2:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"

            return {
                "sentiment": sentiment,
                "manipulation_score": min(manipulation_score, 100),
                "flagged_phrases": manipulative_phrases[:5]  # Limit to top 5 phrases
            }
        except Exception as e:
            logger.error(f"Error in sentiment analysis: {str(e)}")
            return {
                "sentiment": "Error",
                "manipulation_score": 0,
                "flagged_phrases": []
            }
    def _detect_manipulative_phrases(self, text: str) -> List[str]:
        """Detect potentially manipulative phrases."""
        found_phrases = []
        text_lower = text.lower()
        for pattern in self.manipulative_patterns:
            if pattern in text_lower:
                start = text_lower.find(pattern)
                context = text[max(0, start - 20):min(len(text), start + len(pattern) + 20)]
                found_phrases.append(context.strip())
        return found_phrases
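
# Illustrative usage sketch (not part of the original module): shows how SentimentAnalyzer
# might be exercised as a standalone script. The sample text and logging setup are
# assumptions for demonstration only; use_ai=False keeps the example free of model downloads.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Traditional mode: TextBlob polarity plus pattern matching, no transformer pipelines.
    analyzer = SentimentAnalyzer(use_ai=False)
    result = analyzer.analyze("Experts say this is clearly the only sensible choice.")
    print(result["sentiment"], result["manipulation_score"], result["flagged_phrases"])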