import logging
import re

from utils.models import get_llm_model, get_nlp_model
from utils.performance import PerformanceTracker

logger = logging.getLogger("misinformation_detector")
performance_tracker = PerformanceTracker()
def classify_with_llm(query, evidence):
    """
    Classify each evidence item against a claim, determining whether it
    supports, contradicts, or is insufficient to verify the claim.

    The function implements:
    - Strict output formatting requirements for the LLM
    - Evidence source validation
    - Confidence scoring based on confirmation strength
    - Flexible regex pattern matching
    - Detailed debug logging
    - Fallback parsing for non-standard responses

    Args:
        query (str): The factual claim being verified
        evidence (list): Evidence items to evaluate against the claim

    Returns:
        list: Classification results with labels and confidence scores
    """
logger.info(f"Classifying evidence for claim: {query}") | |
# Get the LLM model | |
llm_model = get_llm_model() | |
# Skip if no evidence | |
if not evidence: | |
logger.warning("No evidence provided for classification") | |
return [] | |
    # Normalize evidence to a list; the guard above guarantees it is non-empty,
    # so a bare scalar can simply be wrapped
    if not isinstance(evidence, list):
        evidence = [evidence]
    # Extract essential claim components for improved keyword detection
    claim_components = extract_claim_keywords(query)
    essential_keywords = claim_components.get("keywords", [])
    essential_entities = claim_components.get("entities", [])

    # Limit processing to the top 10 evidence items to reduce token usage
    evidence = evidence[:10]
    # Validate evidence for verifiable sources
    validated_evidence = []
    for idx, chunk in enumerate(evidence):
        # Basic evidence validation
        if not isinstance(chunk, str) or not chunk.strip():
            continue

        # Check if evidence contains source information
        has_valid_source = False
        if "URL:" in chunk and ("http://" in chunk or "https://" in chunk):
            has_valid_source = True
        elif "Source:" in chunk and len(chunk.split("Source:")[1].strip()) > 3:
            has_valid_source = True

        # Add validation flag to evidence
        validated_evidence.append({
            "text": chunk,
            "index": idx + 1,
            "has_valid_source": has_valid_source
        })

    # If no valid evidence remains, return early
    if not validated_evidence:
        logger.warning("No valid evidence items to classify")
        return []
    try:
        # Format evidence items with validation information
        evidence_text = ""
        for item in validated_evidence:
            # Truncate long evidence
            chunk_text = item["text"]
            if len(chunk_text) > 1000:
                chunk_text = chunk_text[:1000] + "..."

            # Include validation status in the prompt
            source_status = "WITH VALID SOURCE" if item["has_valid_source"] else "WARNING: NO CLEAR SOURCE"
            evidence_text += f"EVIDENCE {item['index']}:\n{chunk_text}\n[{source_status}]\n\n"

        # Create a structured prompt with explicit format instructions and validation requirements
        prompt = f"""
CLAIM: {query}

EVIDENCE:
{evidence_text}
TASK: Evaluate if each evidence supports, contradicts, or is insufficient/irrelevant to the claim.

INSTRUCTIONS:
1. For each evidence, provide your analysis in EXACTLY this format:
   EVIDENCE [number] ANALYSIS:
   Classification: [Choose exactly one: support/contradict/insufficient]
   Confidence: [number between 0-100]
   Reason: [brief explanation]
2. Support = Evidence EXPLICITLY confirms ALL parts of the claim are true
3. Contradict = Evidence EXPLICITLY confirms the claim is false
4. Insufficient = Evidence is irrelevant, ambiguous, or doesn't provide enough information

CRITICAL VALIDATION RULES:
- Mark as "support" ONLY when evidence EXPLICITLY mentions ALL key entities AND actions from the claim
- Do not label evidence as "support" if it only discusses the same topic without confirming the specific claim
- Do not make inferential leaps - if the evidence doesn't explicitly state the claim, mark as "insufficient"
- Assign LOW confidence (0-50) when evidence doesn't explicitly mention all claim elements
- Assign ZERO confidence (0) to evidence without valid sources
- If evidence describes similar but different events, mark as "insufficient", not "support" or "contradict"
- If evidence describes the same topic as the claim but does not confirm or contradict the claim, mark as "insufficient", not "support" or "contradict"
- If evidence is in a different language or unrelated topic, mark as "insufficient" with 0 confidence
- Check that all entities (names, places, dates, numbers) in the claim are explicitly confirmed

FOCUS ON THE EXACT CLAIM ONLY.

ESSENTIAL KEYWORDS TO LOOK FOR: {', '.join(essential_keywords)}
ESSENTIAL ENTITIES TO VERIFY: {', '.join(essential_entities)}

IMPORTANT NOTE ABOUT VERB TENSES: When analyzing this claim, treat present tense verbs (like "unveils")
and perfect form verbs (like "has unveiled") as equivalent to their simple past tense forms
(like "unveiled"). The tense variation should not affect your classification decision.
"""
        # Get response with temperature=0 for consistency
        result = llm_model.invoke(prompt, temperature=0)
        result_text = result.content.strip()

        # Log the raw LLM response for debugging
        logger.debug(f"Raw LLM classification response:\n{result_text}")

        # Define a flexible regex pattern matching the requested format;
        # it accommodates variations in whitespace and formatting
        analysis_pattern = r'EVIDENCE\s+(\d+)\s+ANALYSIS:[\s\n]*Classification:[\s\n]*(support|contradict|insufficient)[\s\n]*Confidence:[\s\n]*(\d+)[\s\n]*Reason:[\s\n]*(.*?)(?=[\s\n]*EVIDENCE\s+\d+\s+ANALYSIS:|[\s\n]*$)'
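
        # For orientation, a response the pattern above parses cleanly would
        # look like this (hypothetical model output, shown for illustration):
        #
        #   EVIDENCE 1 ANALYSIS:
        #   Classification: support
        #   Confidence: 85
        #   Reason: The article explicitly confirms all entities in the claim.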
        # Parse each evidence analysis
        classification_results = []

        # Try matching with our pattern
        matches = list(re.finditer(analysis_pattern, result_text, re.IGNORECASE | re.DOTALL))

        # Log match information for debugging
        logger.debug(f"Found {len(matches)} structured evidence analyses in response")

        # Process matches
        for match in matches:
            try:
                evidence_idx = int(match.group(1)) - 1
                classification = match.group(2).lower()
                confidence = int(match.group(3)) / 100.0  # Convert to 0-1 scale
                reason = match.group(4).strip()

                # Check that this evidence item exists in our original list
                if 0 <= evidence_idx < len(evidence):
                    # Get the original evidence text
                    evidence_text = evidence[evidence_idx]

                    # Check for a valid source
                    source_valid = False
                    if "URL:" in evidence_text and ("http://" in evidence_text or "https://" in evidence_text):
                        source_valid = True
                    elif "Source:" in evidence_text:
                        source_valid = True

                    # Reduce confidence for evidence without valid sources
                    if not source_valid and confidence > 0.3:
                        confidence = 0.3
                        reason += " (Confidence reduced due to lack of verifiable source)"

                    # Create result entry
                    classification_results.append({
                        "label": classification,
                        "confidence": confidence,
                        "evidence": evidence_text,
                        "reason": reason
                    })
            except (ValueError, IndexError) as e:
                logger.error(f"Error parsing evidence analysis: {e}")
        # If no structured matches were found, fall back to a simpler approach
        if not classification_results:
            logger.warning("No structured evidence analysis found, using fallback method")

            # Log detailed information about the failure
            logger.warning(f"Expected format not found in response. Response excerpt: {result_text[:200]}...")

            # Simple fallback parsing based on keywords
            for idx, ev in enumerate(evidence):
                # Check for a mention of this evidence item in the LLM response
                ev_mention = f"EVIDENCE {idx+1}"
                if ev_mention in result_text:
                    # Find the section for this evidence
                    parts = result_text.split(ev_mention)
                    if len(parts) > 1:
                        # Everything up to the next EVIDENCE header (or end of text)
                        analysis_text = parts[1].split("EVIDENCE")[0]

                        # Defaults: insufficient with zero confidence for fallback parsing
                        label = "insufficient"
                        confidence = 0.0

                        # Check for support indicators
                        if "support" in analysis_text.lower() or "confirms" in analysis_text.lower():
                            label = "support"
                            confidence = 0.4  # Lower confidence for fallback support
                        # Check for contradict indicators
                        elif "contradict" in analysis_text.lower() or "false" in analysis_text.lower():
                            label = "contradict"
                            confidence = 0.4  # Lower confidence for fallback contradict

                        # Check for a valid source to adjust confidence
                        source_valid = False
                        if "URL:" in ev and ("http://" in ev or "https://" in ev):
                            source_valid = True
                        elif "Source:" in ev:
                            source_valid = True

                        if not source_valid:
                            confidence = min(confidence, 0.3)

                        # Create basic result
                        classification_results.append({
                            "label": label,
                            "confidence": confidence,
                            "evidence": ev,
                            "reason": f"Determined via fallback parsing. {'Valid source found.' if source_valid else 'Warning: No clear source identified.'}"
                        })
                        logger.debug(f"Fallback parsing for evidence {idx+1}: {label} with confidence {confidence}")

        logger.info(f"Classified {len(classification_results)} evidence items")
        return classification_results
    except Exception as e:
        logger.error(f"Error in evidence classification: {str(e)}")

        # Provide a basic fallback
        fallback_results = []
        for ev in evidence:
            fallback_results.append({
                "label": "insufficient",
                "confidence": 0.5,
                "evidence": ev,
                "reason": "Classification failed with error, using fallback"
            })
        return fallback_results
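
# A hypothetical usage sketch for classify_with_llm (the claim, evidence
# string, and URL below are illustrative only; real calls depend on the LLM
# configured in utils.models):
#
#   results = classify_with_llm(
#       "Apple unveiled the Vision Pro headset",
#       ["Apple unveiled the Vision Pro headset at WWDC.\nURL: https://example.com/article"],
#   )
#   # -> [{"label": "support", "confidence": 0.9, "evidence": "...", "reason": "..."}]
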
def normalize_tense(claim):
    """
    Normalize verb tenses in claims to ensure consistent classification.

    This function standardizes verb forms by converting present simple tense
    verbs (e.g., "unveils") and perfect forms (e.g., "has unveiled") to their
    past tense equivalents (e.g., "unveiled"). This ensures that semantically
    equivalent claims are processed consistently regardless of verb tense
    variations.

    Args:
        claim (str): The original claim text to normalize

    Returns:
        str: The normalized claim with consistent tense handling

    Note:
        This function specifically targets present simple and perfect forms,
        preserving the semantic differences of continuous forms ("is unveiling")
        and future tense ("will unveil").
    """
    # Define patterns to normalize common verb forms.
    # Each tuple contains (regex_pattern, replacement_text)
    tense_patterns = [
        # Present simple to past tense conversions
        (r'\bunveils\b', r'unveiled'),
        (r'\blaunches\b', r'launched'),
        (r'\breleases\b', r'released'),
        (r'\bannounces\b', r'announced'),
        (r'\binvites\b', r'invited'),
        (r'\bretaliates\b', r'retaliated'),
        (r'\bends\b', r'ended'),
        (r'\bbegins\b', r'began'),
        (r'\bstarts\b', r'started'),
        (r'\bcompletes\b', r'completed'),
        (r'\bfinishes\b', r'finished'),
        (r'\bintroduces\b', r'introduced'),
        (r'\bcreates\b', r'created'),
        (r'\bdevelops\b', r'developed'),
        (r'\bpublishes\b', r'published'),
        (r'\bacquires\b', r'acquired'),
        (r'\bbuys\b', r'bought'),
        (r'\bsells\b', r'sold'),

        # Perfect forms (has/have/had + past participle) to simple past
        (r'\b(has|have|had)\s+unveiled\b', r'unveiled'),
        (r'\b(has|have|had)\s+launched\b', r'launched'),
        (r'\b(has|have|had)\s+released\b', r'released'),
        (r'\b(has|have|had)\s+announced\b', r'announced'),
        (r'\b(has|have|had)\s+invited\b', r'invited'),
        (r'\b(has|have|had)\s+retaliated\b', r'retaliated'),
        (r'\b(has|have|had)\s+ended\b', r'ended'),
        (r'\b(has|have|had)\s+begun\b', r'began'),
        (r'\b(has|have|had)\s+started\b', r'started'),
        (r'\b(has|have|had)\s+introduced\b', r'introduced'),
        (r'\b(has|have|had)\s+created\b', r'created'),
        (r'\b(has|have|had)\s+developed\b', r'developed'),
        (r'\b(has|have|had)\s+published\b', r'published'),
        (r'\b(has|have|had)\s+acquired\b', r'acquired'),
        (r'\b(has|have|had)\s+bought\b', r'bought'),
        (r'\b(has|have|had)\s+sold\b', r'sold')
    ]
    # Apply normalization patterns
    normalized = claim
    for pattern, replacement in tense_patterns:
        normalized = re.sub(pattern, replacement, normalized, flags=re.IGNORECASE)

    # Log if normalization occurred, for debugging purposes
    if normalized != claim:
        logger.info(f"Normalized claim from: '{claim}' to: '{normalized}'")

    return normalized
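
# Illustrative before/after pairs (hypothetical claims; see the docstring Note
# for which forms are deliberately left untouched):
#
#   normalize_tense("Apple unveils a new headset")       # -> "Apple unveiled a new headset"
#   normalize_tense("Apple has unveiled a new headset")  # -> "Apple unveiled a new headset"
#   normalize_tense("Apple is unveiling a new headset")  # unchanged (continuous form)
#   normalize_tense("Apple will unveil a new headset")   # unchanged (future tense)
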
def aggregate_evidence(classification_results):
    """
    Aggregate evidence classifications into an overall verdict using a
    weighted scoring system over evidence count and quality.

    Args:
        classification_results (list): List of evidence classification results

    Returns:
        tuple: (verdict, confidence) - The final verdict and confidence score
    """
logger.info(f"Aggregating evidence from {len(classification_results) if classification_results else 0} results") | |
if not classification_results: | |
logger.warning("No classification results to aggregate") | |
return "Uncertain", 0.0 # Default with zero confidence | |
# Only consider support and contradict evidence items | |
support_items = [item for item in classification_results if item.get("label") == "support"] | |
contradict_items = [item for item in classification_results if item.get("label") == "contradict"] | |
# Count number of support and contradict items | |
support_count = len(support_items) | |
contradict_count = len(contradict_items) | |
# Calculate confidence scores for support and contradict items | |
support_confidence_sum = sum(item.get("confidence", 0) for item in support_items) | |
contradict_confidence_sum = sum(item.get("confidence", 0) for item in contradict_items) | |
# Apply weights: 55% for count, 45% for quality (confidence) | |
# Normalize counts to avoid division by zero | |
max_count = max(1, max(support_count, contradict_count)) | |
# Calculate weighted scores | |
count_support_score = (support_count / max_count) * 0.55 | |
count_contradict_score = (contradict_count / max_count) * 0.55 | |
# Normalize confidence scores to avoid division by zero | |
max_confidence_sum = max(1, max(support_confidence_sum, contradict_confidence_sum)) | |
quality_support_score = (support_confidence_sum / max_confidence_sum) * 0.45 | |
quality_contradict_score = (contradict_confidence_sum / max_confidence_sum) * 0.45 | |
# Total scores | |
total_support = count_support_score + quality_support_score | |
total_contradict = count_contradict_score + quality_contradict_score | |
# Check if all evidence is irrelevant/insufficient | |
if support_count == 0 and contradict_count == 0: | |
logger.info("All evidence items are irrelevant/insufficient") | |
return "Uncertain", 0.0 | |
    # Determine verdict based on the higher total score
    if total_support > total_contradict:
        verdict = "True (Based on Evidence)"
        min_score = total_contradict
        max_score = total_support
    else:
        verdict = "False (Based on Evidence)"
        min_score = total_support
        max_score = total_contradict

    # Final confidence on a 0-1 scale: 1 - (min_score / max_score), so an
    # uncontested verdict scores 1.0 and an even split scores 0.0
    if max_score > 0:
        final_confidence = 1.0 - (min_score / max_score)
    else:
        final_confidence = 0.0

    # Handle cases where confidence is very low
    if final_confidence == 0.0:
        return "Uncertain", 0.0
    elif final_confidence < 0.1:  # Less than 10%
        # Keep the verdict but flag the very low confidence
        logger.info(f"Very low confidence verdict: {verdict} with {final_confidence:.2f} confidence")

    logger.info(f"Final verdict: {verdict}, confidence: {final_confidence:.2f}")
    return verdict, final_confidence
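
# A worked example of the weighting, using hypothetical inputs: two "support"
# items with confidences 0.9 and 0.8 against one "contradict" item at 0.6.
#
#   count scores:   support = (2/2) * 0.55 = 0.550,   contradict = (1/2) * 0.55 = 0.275
#   quality scores: support = (1.7/1.7) * 0.45 = 0.450,  contradict = (0.6/1.7) * 0.45 ~= 0.159
#   totals:         support = 1.000,  contradict ~= 0.434
#
# Verdict: "True (Based on Evidence)" with confidence 1 - 0.434/1.000 ~= 0.57.
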
def extract_claim_keywords(claim):
    """
    Extract important keywords and other components from a claim using NLP.

    Args:
        claim (str): The claim text

    Returns:
        dict: Dictionary containing keywords and other claim components
    """
    try:
        # Get NLP model
        nlp = get_nlp_model()

        # Process claim with NLP
        doc = nlp(claim)

        # Extract entities
        entities = [ent.text for ent in doc.ents]

        # Extract important keywords: non-stopword nouns, adjectives, and verbs
        # longer than 3 characters
        keywords = []
        for token in doc:
            if token.pos_ in ["NOUN", "PROPN", "ADJ", "VERB"] and not token.is_stop and len(token.text) > 3:
                keywords.append(token.text.lower())
            # Also include some important modifiers and quantifiers
            elif token.pos_ in ["NUM", "ADV"] and not token.is_stop and len(token.text) > 1:
                keywords.append(token.text.lower())

        # Extract verb lemmas separately
        verbs = [token.lemma_.lower() for token in doc if token.pos_ == "VERB" and not token.is_stop]

        # Also extract multi-word noun phrases that might be important
        noun_phrases = []
        for chunk in doc.noun_chunks:
            if len(chunk.text) > 3 and not all(token.is_stop for token in chunk):
                noun_phrases.append(chunk.text.lower())
        # Add phrases to keywords if not already included (both lists are
        # already lowercased, so a direct membership test suffices)
        for phrase in noun_phrases:
            if phrase not in keywords:
                keywords.append(phrase)
        # Return all components
        return {
            "entities": entities,
            "keywords": keywords,
            "verbs": verbs,
            "noun_phrases": noun_phrases
        }

    except Exception as e:
        logger.error(f"Error extracting claim keywords: {e}")
        # Return basic fallback using simple word extraction
        words = [word.lower() for word in claim.split() if len(word) > 3]
        return {"keywords": words, "entities": [], "verbs": [], "noun_phrases": []}