import logging
from typing import Dict, Any, List, Optional
from transformers import pipeline
import numpy as np
import nltk
from nltk.tokenize import sent_tokenize

logger = logging.getLogger(__name__)
class EvidenceAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize the evidence analyzer with LLM and traditional approaches.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional analysis (False)
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry

        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    # Use shared models
                    self.classifier = model_registry.zero_shot
                    self.llm_available = True
                    logger.info("Using shared model pipeline for evidence analysis")
                else:
                    # Initialize own pipeline
                    self.classifier = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipeline for evidence analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipeline: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing evidence analyzer in traditional mode")

        # Traditional markers, always defined so the fallback analysis works
        # even when the LLM pipeline fails to initialize
        self.citation_markers = [
            "according to",
            "said",
            "reported",
            "stated",
            "shows",
            "found",
            "study",
            "research",
            "data",
            "evidence"
        ]

        self.vague_markers = [
            "some say",
            "many believe",
            "people think",
            "experts claim",
            "sources say",
            "it is believed",
            "reportedly",
            "allegedly"
        ]
    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Analyze evidence using the LLM, returning None if analysis fails."""
        try:
            logger.info("\n" + "=" * 50)
            logger.info("EVIDENCE ANALYSIS STARTED")
            logger.info("=" * 50)

            # Clean the text of formatting markers
            logger.info("Cleaning and preparing text...")
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(
                line for line in cleaned_text.split('\n')
                if not line.startswith('[') and not line.startswith('More on')
            )
            logger.info(f"Text prepared - Length: {len(cleaned_text)} characters")

            # Download NLTK sentence tokenizer data if needed
            try:
                nltk.data.find('tokenizers/punkt')
            except LookupError:
                logger.info("Downloading required NLTK data...")
                nltk.download('punkt')

            # Split text into 2000-character chunks
            chunks = [cleaned_text[i:i + 2000] for i in range(0, len(cleaned_text), 2000)]
            logger.info(f"Split text into {len(chunks)} chunks for processing")

            # Categories for evidence classification
            evidence_categories = [
                "factual statement with source",
                "verifiable claim",
                "expert opinion",
                "data-backed claim",
                "unsubstantiated claim",
                "opinion statement"
            ]
            logger.info("\nUsing evidence categories:")
            for cat in evidence_categories:
                logger.info(f"  - {cat}")

            chunk_scores = []
            flagged_phrases = []

            for i, chunk in enumerate(chunks, 1):
                logger.info(f"\n{'-' * 30}")
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                logger.info(f"Chunk length: {len(chunk)} characters")

                # Analyze each sentence in the chunk
                sentences = sent_tokenize(chunk)
                logger.info(f"Found {len(sentences)} sentences to analyze")

                sentence_count = 0
                strong_evidence_count = 0

                for sentence in sentences:
                    if len(sentence.strip()) > 10:
                        sentence_count += 1

                        # Classify the type of evidence
                        result = self.classifier(
                            sentence.strip(),
                            evidence_categories,
                            multi_label=True
                        )

                        # Map each category label to its score for this sentence
                        evidence_scores = {
                            label: score
                            for label, score in zip(result['labels'], result['scores'])
                        }

                        # Strong evidence indicators (average of the three strong categories)
                        strong_evidence = sum([
                            evidence_scores.get("factual statement with source", 0),
                            evidence_scores.get("data-backed claim", 0),
                            evidence_scores.get("expert opinion", 0)
                        ]) / 3

                        # Weak or no evidence indicators (average of the two weak categories)
                        weak_evidence = sum([
                            evidence_scores.get("unsubstantiated claim", 0),
                            evidence_scores.get("opinion statement", 0)
                        ]) / 2

                        # Store scores for the overall calculation
                        chunk_scores.append({
                            'strong_evidence': strong_evidence,
                            'weak_evidence': weak_evidence
                        })

                        # Flag high-quality evidence, skipping navigation boilerplate
                        if strong_evidence > 0.7 and not any(
                            marker in sentence.lower()
                            for marker in ['more on this story', 'click here', 'read more']
                        ):
                            strong_evidence_count += 1
                            logger.info(f"Found strong evidence (score: {strong_evidence:.3f}):")
                            logger.info(f"  \"{sentence.strip()}\"")
                            flagged_phrases.append({
                                'text': sentence.strip(),
                                'type': 'strong_evidence',
                                'score': strong_evidence
                            })

                logger.info(f"Processed {sentence_count} sentences in chunk {i}")
                logger.info(f"Found {strong_evidence_count} sentences with strong evidence")

            # Calculate overall evidence score
            logger.info("\nCalculating final evidence scores...")
            if chunk_scores:
                avg_strong = np.mean([s['strong_evidence'] for s in chunk_scores])
                avg_weak = np.mean([s['weak_evidence'] for s in chunk_scores])
                logger.info("Average evidence scores:")
                logger.info(f"  - Strong evidence: {avg_strong:.3f}")
                logger.info(f"  - Weak evidence: {avg_weak:.3f}")

                # Evidence score formula:
                # - Reward strong evidence (70% weight)
                # - Penalize weak/unsubstantiated claims (30% weight)
                # - Clamp the score to the 0-100 range
                evidence_score = min(100, (
                    (avg_strong * 0.7) +
                    ((1 - avg_weak) * 0.3)
                ) * 100)
            else:
                evidence_score = 0
                logger.warning("No scores available, defaulting to 0")

            logger.info(f"Final evidence score: {evidence_score:.1f}")

            # Sort flagged phrases by score, highest first
            sorted_phrases = sorted(
                flagged_phrases,
                key=lambda x: x['score'],
                reverse=True
            )

            # Filter out formatting text and duplicates, keeping at most five phrases
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                clean_text = phrase['text'].strip()
                if clean_text not in seen and not any(
                    marker in clean_text.lower()
                    for marker in ['more on this story', 'click here', 'read more']
                ):
                    unique_phrases.append(clean_text)
                    seen.add(clean_text)
                if len(unique_phrases) >= 5:
                    break

            logger.info(f"\nFlagged {len(unique_phrases)} unique evidence-based phrases")
            logger.info("\nEvidence analysis completed successfully")

            return {
                "evidence_based_score": round(evidence_score, 1),
                "flagged_phrases": unique_phrases
            }

        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}")
            return None
    def _analyze_traditional(self, text: str) -> Dict[str, Any]:
        """Traditional marker-based evidence analysis, used as a fallback."""
        try:
            text_lower = text.lower()

            # Find citations and evidence
            evidence_phrases = []
            for marker in self.citation_markers:
                index = text_lower.find(marker)
                while index != -1:
                    # Extract the sentence containing the marker
                    start = max(0, text_lower.rfind('.', 0, index) + 1)
                    end = text_lower.find('.', index)
                    if end == -1:
                        end = len(text_lower)
                    evidence_phrases.append(text[start:end].strip())
                    index = text_lower.find(marker, end)

            # Count vague references
            vague_count = sum(1 for marker in self.vague_markers if marker in text_lower)

            # Calculate score: each citation adds 20 points (capped at 100),
            # each vague reference subtracts 10 points (floored at 0)
            citation_count = len(evidence_phrases)
            base_score = min(citation_count * 20, 100)
            penalty = vague_count * 10
            evidence_score = max(0, base_score - penalty)

            return {
                "evidence_based_score": evidence_score,
                "flagged_phrases": list(set(evidence_phrases))[:5]  # Limit to 5 unique phrases
            }

        except Exception as e:
            logger.error(f"Traditional analysis failed: {str(e)}")
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }
    def analyze(self, text: str) -> Dict[str, Any]:
        """Analyze evidence using the LLM, with fallback to the traditional method."""
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Fall back to traditional analysis
            logger.info("Using traditional evidence analysis")
            return self._analyze_traditional(text)

        except Exception as e:
            logger.error(f"Error in evidence analysis: {str(e)}")
            return {
                "evidence_based_score": 0,
                "flagged_phrases": []
            }