wozwize's picture
updating logging
1360e33
raw
history blame contribute delete
18.4 kB
import logging
from typing import Dict, Any, List, Optional
from textblob import TextBlob
from transformers import pipeline
import numpy as np
logger = logging.getLogger(__name__)
class SentimentAnalyzer:
def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
"""
Initialize sentiment analyzer with both traditional and LLM-based approaches.
Args:
use_ai: Boolean indicating whether to use AI-powered analysis (True) or traditional analysis (False)
model_registry: Optional shared model registry for better performance
"""
self.use_ai = use_ai
self.llm_available = False
self.model_registry = model_registry
self.toxicity_available = False
# Traditional manipulation patterns
self.manipulative_patterns = [
"experts say",
"sources claim",
"many believe",
"some say",
"everyone knows",
"clearly",
"obviously",
"without doubt",
"certainly"
]
if use_ai:
try:
if model_registry and model_registry.is_available:
# Use shared models
self.sentiment_pipeline = model_registry.sentiment
self.zero_shot = model_registry.zero_shot
self.toxicity_pipeline = getattr(model_registry, 'toxicity', None)
self.toxicity_available = self.toxicity_pipeline is not None
self.llm_available = True
logger.info("Using shared model pipelines for sentiment analysis")
if self.toxicity_available:
logger.info("Toxicity analysis enabled")
else:
logger.info("Toxicity analysis not available")
else:
# Initialize own pipelines
self.sentiment_pipeline = pipeline(
"text-classification",
model="SamLowe/roberta-base-go_emotions",
device=-1,
batch_size=16
)
self.zero_shot = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli",
device=-1,
batch_size=8
)
try:
self.toxicity_pipeline = pipeline(
"text-classification",
model="unitary/toxic-bert",
device=-1,
batch_size=16
)
self.toxicity_available = True
logger.info("Toxicity analysis enabled")
except Exception as tox_error:
logger.warning(f"Toxicity pipeline initialization failed: {str(tox_error)}")
self.toxicity_available = False
self.llm_available = True
logger.info("Initialized dedicated model pipelines for sentiment analysis")
except Exception as e:
logger.warning(f"Failed to initialize LLM pipelines: {str(e)}")
self.llm_available = False
else:
logger.info("Initializing sentiment analyzer in traditional mode")
def _analyze_with_llm(self, text: str) -> Dict[str, Any]:
"""Perform sentiment analysis using LLM models."""
try:
logger.info("\n" + "="*50)
logger.info("SENTIMENT ANALYSIS STARTED")
logger.info("="*50)
# Clean the text of formatting markers
logger.info("Cleaning and preparing text...")
cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
if not line.startswith('[') and not line.startswith('More on'))
logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")
# Split text into chunks of 512 tokens (approximate)
chunks = [cleaned_text[i:i+2000] for i in range(0, len(cleaned_text), 2000)]
logger.info(f"Split text into {len(chunks)} chunks for processing")
# Initialize aggregation variables
sentiment_scores = []
toxicity_scores = []
manipulation_scores = []
flagged_phrases = []
manipulation_categories = [
"emotional manipulation",
"fear mongering",
"propaganda",
"factual reporting",
"balanced perspective"
]
# Process each chunk
for i, chunk in enumerate(chunks, 1):
logger.info(f"\n{'-'*30}")
logger.info(f"Processing chunk {i}/{len(chunks)}")
logger.info(f"Chunk length: {len(chunk)} characters")
try:
# Get emotion scores
logger.info("Analyzing emotions...")
emotions = self.sentiment_pipeline(chunk)
logger.debug(f"Raw emotion response: {emotions}")
# Handle different response formats
if isinstance(emotions, list):
for emotion in emotions:
if isinstance(emotion, dict) and 'label' in emotion and 'score' in emotion:
sentiment_scores.append(emotion)
logger.info(f"Detected emotion: {emotion['label']} (score: {emotion['score']:.3f})")
elif isinstance(emotions, dict) and 'label' in emotions and 'score' in emotions:
sentiment_scores.append(emotions)
logger.info(f"Detected emotion: {emotions['label']} (score: {emotions['score']:.3f})")
# Get toxicity scores if available
if self.toxicity_available:
logger.info("Analyzing toxicity...")
try:
toxicity = self.toxicity_pipeline(chunk)
if isinstance(toxicity, list):
toxicity_scores.extend(toxicity)
else:
toxicity_scores.append(toxicity)
logger.info(f"Toxicity analysis complete for chunk {i}")
logger.debug(f"Toxicity scores: {toxicity_scores[-1]}")
except Exception as tox_error:
logger.warning(f"Toxicity analysis failed for chunk {i}: {str(tox_error)}")
# Get manipulation scores
logger.info("Analyzing manipulation patterns...")
manipulation = self.zero_shot(
chunk,
manipulation_categories,
multi_label=True
)
if isinstance(manipulation, dict) and 'labels' in manipulation and 'scores' in manipulation:
chunk_scores = {
label: score
for label, score in zip(manipulation['labels'], manipulation['scores'])
}
manipulation_scores.append(chunk_scores)
logger.info("Manipulation scores for chunk:")
for label, score in chunk_scores.items():
logger.info(f" - {label}: {score:.3f}")
# Analyze sentences for manipulation
logger.info("Analyzing individual sentences for manipulation...")
sentences = chunk.split('.')
for sentence in sentences:
if len(sentence.strip()) > 10:
sent_result = self.zero_shot(
sentence.strip(),
manipulation_categories,
multi_label=False
)
if (sent_result['labels'][0] in ["emotional manipulation", "fear mongering", "propaganda"]
and sent_result['scores'][0] > 0.7):
logger.info(f"Found manipulative content (score: {sent_result['scores'][0]:.3f}): {sentence.strip()}")
flagged_phrases.append({
'text': sentence.strip(),
'type': sent_result['labels'][0],
'score': sent_result['scores'][0]
})
except Exception as chunk_error:
logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
continue
logger.info("\nAggregating final scores...")
# Aggregate scores with error handling
def aggregate_scores(scores_list, score_type: str):
try:
if not scores_list:
logger.warning(f"No {score_type} scores to aggregate")
return {}
all_scores = {}
for scores in scores_list:
if isinstance(scores, dict):
if 'label' in scores and 'score' in scores:
label = scores['label']
score = scores['score']
else:
# Handle direct label-score mapping
for label, score in scores.items():
if label not in all_scores:
all_scores[label] = []
if isinstance(score, (int, float)):
all_scores[label].append(score)
continue
else:
logger.warning(f"Unexpected score format in {score_type}: {scores}")
continue
if isinstance(label, (str, bytes)):
if label not in all_scores:
all_scores[label] = []
if isinstance(score, (int, float)):
all_scores[label].append(score)
return {k: float(np.mean(v)) for k, v in all_scores.items() if v}
except Exception as agg_error:
logger.error(f"Error aggregating {score_type} scores: {str(agg_error)}")
return {}
emotion_scores = aggregate_scores(sentiment_scores, "emotion")
toxicity_scores = aggregate_scores(toxicity_scores, "toxicity") if self.toxicity_available else {}
logger.info("\nFinal emotion scores:")
for emotion, score in emotion_scores.items():
logger.info(f" - {emotion}: {score:.3f}")
if toxicity_scores:
logger.info("\nFinal toxicity scores:")
for category, score in toxicity_scores.items():
logger.info(f" - {category}: {score:.3f}")
# Aggregate manipulation scores
manipulation_agg = {
category: float(np.mean([
scores.get(category, 0)
for scores in manipulation_scores
]))
for category in manipulation_categories
if manipulation_scores
}
logger.info("\nFinal manipulation scores:")
for category, score in manipulation_agg.items():
logger.info(f" - {category}: {score:.3f}")
# Calculate manipulation score based on multiple factors
manipulation_indicators = {
'emotional manipulation': 0.4,
'fear mongering': 0.3,
'propaganda': 0.3
}
if self.toxicity_available:
manipulation_indicators.update({
'toxic': 0.2,
'severe_toxic': 0.3,
'threat': 0.2
})
# Combine toxicity and manipulation scores
combined_scores = {**toxicity_scores, **manipulation_agg}
# Calculate manipulation score with fallback
if combined_scores:
manipulation_score = min(100, sum(
combined_scores.get(k, 0) * weight
for k, weight in manipulation_indicators.items()
) * 100)
else:
# Fallback to traditional analysis if no scores available
manipulation_score = len(self._detect_manipulative_phrases(text)) * 10
logger.info(f"\nFinal manipulation score: {manipulation_score:.1f}")
# Determine overall sentiment
positive_emotions = ['admiration', 'joy', 'amusement', 'approval']
negative_emotions = ['disgust', 'anger', 'disappointment', 'fear']
neutral_emotions = ['neutral', 'confusion', 'realization']
pos_score = sum(emotion_scores.get(emotion, 0) for emotion in positive_emotions)
neg_score = sum(emotion_scores.get(emotion, 0) for emotion in negative_emotions)
neu_score = sum(emotion_scores.get(emotion, 0) for emotion in neutral_emotions)
logger.info(f"\nSentiment component scores:")
logger.info(f" - Positive: {pos_score:.3f}")
logger.info(f" - Negative: {neg_score:.3f}")
logger.info(f" - Neutral: {neu_score:.3f}")
# Determine sentiment based on highest score
max_score = max(pos_score, neg_score, neu_score)
if max_score == pos_score and pos_score > 0.3:
sentiment = "Positive"
elif max_score == neg_score and neg_score > 0.3:
sentiment = "Negative"
else:
sentiment = "Neutral"
logger.info(f"\nFinal sentiment determination: {sentiment}")
# Sort and limit flagged phrases by manipulation score
sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
unique_phrases = []
seen = set()
for phrase in sorted_phrases:
clean_text = phrase['text'].strip()
if clean_text not in seen:
unique_phrases.append(clean_text)
seen.add(clean_text)
if len(unique_phrases) >= 5:
break
logger.info(f"\nFlagged {len(unique_phrases)} unique manipulative phrases")
logger.info("\nSentiment analysis completed successfully")
return {
"sentiment": sentiment,
"manipulation_score": round(manipulation_score, 1),
"flagged_phrases": unique_phrases,
"detailed_scores": {
"emotions": emotion_scores,
"manipulation": manipulation_agg,
"toxicity": toxicity_scores
}
}
except Exception as e:
logger.error(f"LLM analysis failed: {str(e)}", exc_info=True)
return None
def analyze(self, text: str) -> Dict[str, Any]:
"""
Analyze sentiment using LLM with fallback to traditional methods.
Args:
text: The text to analyze
Returns:
Dict containing sentiment analysis results
"""
try:
# Try LLM analysis if enabled and available
if self.use_ai and self.llm_available:
llm_result = self._analyze_with_llm(text)
if llm_result:
return llm_result
# Use traditional analysis
logger.info("Using traditional sentiment analysis")
blob = TextBlob(text)
sentiment_score = blob.sentiment.polarity
manipulative_phrases = self._detect_manipulative_phrases(text)
manipulation_score = len(manipulative_phrases) * 10
if sentiment_score > 0.2:
sentiment = "Positive"
elif sentiment_score < -0.2:
sentiment = "Negative"
else:
sentiment = "Neutral"
return {
"sentiment": sentiment,
"manipulation_score": min(manipulation_score, 100),
"flagged_phrases": manipulative_phrases[:5] # Limit to top 5 phrases
}
except Exception as e:
logger.error(f"Error in sentiment analysis: {str(e)}")
return {
"sentiment": "Error",
"manipulation_score": 0,
"flagged_phrases": []
}
def _detect_manipulative_phrases(self, text: str) -> List[str]:
"""Detect potentially manipulative phrases."""
found_phrases = []
text_lower = text.lower()
for pattern in self.manipulative_patterns:
if pattern in text_lower:
start = text_lower.find(pattern)
context = text[max(0, start-20):min(len(text), start+len(pattern)+20)]
found_phrases.append(context.strip())
return found_phrases