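"""Evidence analyzer: scores how well article text supports its claims.

Uses a zero-shot LLM classifier when available, with a marker-based
traditional analysis as fallback.
"""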
import logging
from typing import Dict, Any, Optional

import nltk
import numpy as np
from nltk.tokenize import sent_tokenize
from transformers import pipeline

logger = logging.getLogger(__name__)
class EvidenceAnalyzer:
def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
"""
Initialize evidence analyzer with LLM and traditional approaches.
Args:
use_ai: Boolean indicating whether to use AI-powered analysis (True) or traditional analysis (False)
model_registry: Optional shared model registry for better performance
"""
self.use_ai = use_ai
self.llm_available = False
self.model_registry = model_registry
if use_ai:
try:
if model_registry and model_registry.is_available:
# Use shared models
self.classifier = model_registry.zero_shot
self.llm_available = True
logger.info("Using shared model pipeline for evidence analysis")
else:
# Initialize own pipeline
self.classifier = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli",
device=-1,
batch_size=8
)
self.llm_available = True
logger.info("Initialized dedicated model pipeline for evidence analysis")
except Exception as e:
logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
self.llm_available = False
else:
logger.info("Initializing evidence analyzer in traditional mode")
# Traditional markers for fallback
self.citation_markers = [
"according to",
"said",
"reported",
"stated",
"shows",
"found",
"study",
"research",
"data",
"evidence"
]
self.vague_markers = [
"some say",
"many believe",
"people think",
"experts claim",
"sources say",
"it is believed",
"reportedly",
"allegedly"
]
    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze evidence using the LLM; returns None on failure so the
        caller can fall back to traditional analysis."""
try:
logger.info("\n" + "="*50)
logger.info("EVIDENCE ANALYSIS STARTED")
logger.info("="*50)
# Clean the text of formatting markers
logger.info("Cleaning and preparing text...")
cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
cleaned_text = '\n'.join(line for line in cleaned_text.split('\n')
if not line.startswith('[') and not line.startswith('More on'))
logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")
# Download NLTK data if needed
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
logger.info("Downloading required NLTK data...")
nltk.download('punkt')
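            # Note: newer NLTK releases ship this tokenizer as 'punkt_tab'; if
            # sent_tokenize still raises a LookupError, download that resource too.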
            # Split text into fixed-width 2000-character chunks; a sentence may span a chunk boundary
chunks = [cleaned_text[i:i+2000] for i in range(0, len(cleaned_text), 2000)]
logger.info(f"Split text into {len(chunks)} chunks for processing")
# Categories for evidence classification
evidence_categories = [
"factual statement with source",
"verifiable claim",
"expert opinion",
"data-backed claim",
"unsubstantiated claim",
"opinion statement"
]
logger.info("\nUsing evidence categories:")
for cat in evidence_categories:
logger.info(f" - {cat}")
chunk_scores = []
flagged_phrases = []
for i, chunk in enumerate(chunks, 1):
logger.info(f"\n{'-'*30}")
logger.info(f"Processing chunk {i}/{len(chunks)}")
logger.info(f"Chunk length: {len(chunk)} characters")
# Analyze each sentence in the chunk
sentences = sent_tokenize(chunk)
logger.info(f"Found {len(sentences)} sentences to analyze")
sentence_count = 0
strong_evidence_count = 0
for sentence in sentences:
if len(sentence.strip()) > 10:
sentence_count += 1
# Classify the type of evidence
result = self.classifier(
sentence.strip(),
evidence_categories,
multi_label=True
)
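                        # The zero-shot pipeline returns a dict with 'sequence',
                        # 'labels', and 'scores' (labels sorted by descending score);
                        # with multi_label=True each label is scored independently.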
# Calculate evidence score for the sentence
evidence_scores = {
label: score
for label, score in zip(result['labels'], result['scores'])
}
# Strong evidence indicators
strong_evidence = sum([
evidence_scores.get("factual statement with source", 0),
evidence_scores.get("data-backed claim", 0),
evidence_scores.get("expert opinion", 0)
]) / 3 # Average the strong evidence scores
# Weak or no evidence indicators
weak_evidence = sum([
evidence_scores.get("unsubstantiated claim", 0),
evidence_scores.get("opinion statement", 0)
]) / 2 # Average the weak evidence scores
# Store scores for overall calculation
chunk_scores.append({
'strong_evidence': strong_evidence,
'weak_evidence': weak_evidence
})
# Flag high-quality evidence
if strong_evidence > 0.7 and not any(
marker in sentence.lower()
for marker in ['more on this story', 'click here', 'read more']
):
strong_evidence_count += 1
logger.info(f"Found strong evidence (score: {strong_evidence:.3f}):")
logger.info(f" \"{sentence.strip()}\"")
flagged_phrases.append({
'text': sentence.strip(),
'type': 'strong_evidence',
'score': strong_evidence
})
logger.info(f"Processed {sentence_count} sentences in chunk {i}")
logger.info(f"Found {strong_evidence_count} sentences with strong evidence")
# Calculate overall evidence score
logger.info("\nCalculating final evidence scores...")
if chunk_scores:
avg_strong = np.mean([s['strong_evidence'] for s in chunk_scores])
avg_weak = np.mean([s['weak_evidence'] for s in chunk_scores])
logger.info("Average evidence scores:")
logger.info(f" - Strong evidence: {avg_strong:.3f}")
logger.info(f" - Weak evidence: {avg_weak:.3f}")
# Evidence score formula:
# - Reward strong evidence (70% weight)
# - Penalize weak/unsubstantiated claims (30% weight)
# - Ensure score is between 0 and 100
evidence_score = min(100, (
(avg_strong * 0.7) +
((1 - avg_weak) * 0.3)
) * 100)
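                # Worked example: avg_strong=0.6, avg_weak=0.2
                #   -> (0.6 * 0.7 + 0.8 * 0.3) * 100 = 66.0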
else:
evidence_score = 0
logger.warning("No scores available, defaulting to 0")
logger.info(f"Final evidence score: {evidence_score:.1f}")
# Sort and select top evidence phrases
sorted_phrases = sorted(
flagged_phrases,
key=lambda x: x['score'],
reverse=True
)
# Filter out formatting text and duplicates
unique_phrases = []
seen = set()
for phrase in sorted_phrases:
clean_text = phrase['text'].strip()
if clean_text not in seen and not any(
marker in clean_text.lower()
for marker in ['more on this story', 'click here', 'read more']
):
unique_phrases.append(clean_text)
seen.add(clean_text)
if len(unique_phrases) >= 5:
break
logger.info(f"\nFlagged {len(unique_phrases)} unique evidence-based phrases")
logger.info("\nEvidence analysis completed successfully")
return {
"evidence_based_score": round(evidence_score, 1),
"flagged_phrases": unique_phrases
}
except Exception as e:
logger.error(f"LLM analysis failed: {str(e)}")
return None
def _analyze_traditional(self, text: str) -> Dict[str, Any]:
"""Traditional evidence analysis as fallback."""
try:
text_lower = text.lower()
# Find citations and evidence
evidence_phrases = []
for marker in self.citation_markers:
index = text_lower.find(marker)
while index != -1:
# Get the sentence containing the marker
start = max(0, text_lower.rfind('.', 0, index) + 1)
end = text_lower.find('.', index)
if end == -1:
end = len(text_lower)
evidence_phrases.append(text[start:end].strip())
index = text_lower.find(marker, end)
# Count vague references
vague_count = sum(1 for marker in self.vague_markers if marker in text_lower)
            # Calculate score: each cited phrase is worth 20 points (capped
            # at 100), and each vague reference costs a 10-point penalty.
            citation_count = len(evidence_phrases)
            base_score = min(citation_count * 20, 100)
            penalty = vague_count * 10
            evidence_score = max(0, base_score - penalty)
            # Deduplicate while preserving document order; list(set(...)) would
            # return phrases in arbitrary order, not the leading ones.
            unique_phrases = list(dict.fromkeys(evidence_phrases))
            return {
                "evidence_based_score": evidence_score,
                "flagged_phrases": unique_phrases[:5]  # limit to first 5 unique phrases
            }
except Exception as e:
logger.error(f"Traditional analysis failed: {str(e)}")
return {
"evidence_based_score": 0,
"flagged_phrases": []
}
def analyze(self, text: str) -> Dict[str, Any]:
"""Analyze evidence using LLM with fallback to traditional method."""
try:
# Try LLM analysis if enabled and available
if self.use_ai and self.llm_available:
llm_result = self._analyze_with_llm(text)
if llm_result:
return llm_result
# Use traditional analysis
logger.info("Using traditional evidence analysis")
return self._analyze_traditional(text)
except Exception as e:
logger.error(f"Error in evidence analysis: {str(e)}")
return {
"evidence_based_score": 0,
"flagged_phrases": []
}
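

# Minimal usage sketch (an illustration, not part of the original module):
# use_ai=False exercises the traditional marker-based path, so the example
# runs offline without downloading the zero-shot model.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    analyzer = EvidenceAnalyzer(use_ai=False)
    sample = (
        "According to a 2023 survey, 40% of respondents changed their habits. "
        "Some say the effect is overstated, but the study found it persisted."
    )
    result = analyzer.analyze(sample)
    print(result["evidence_based_score"], result["flagged_phrases"])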