import os import gradio as gr import pandas as pd from datetime import datetime from pydantic import BaseModel, Field from typing import List, Dict, Any, Optional import numpy as np from mistralai import Mistral from openai import OpenAI import re import json import logging import time import concurrent.futures from concurrent.futures import ThreadPoolExecutor import threading import pymongo from pymongo import MongoClient from bson.objectid import ObjectId from dotenv import load_dotenv # Load environment variables load_dotenv() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class HallucinationJudgment(BaseModel): hallucination_detected: bool = Field(description="Whether a hallucination is detected across the responses") confidence_score: float = Field(description="Confidence score between 0-1 for the hallucination judgment") conflicting_facts: List[Dict[str, Any]] = Field(description="List of conflicting facts found in the responses") reasoning: str = Field(description="Detailed reasoning for the judgment") summary: str = Field(description="A summary of the analysis") class PAS2: """Paraphrase-based Approach for LLM Systems - Using llm-as-judge methods""" def __init__(self, mistral_api_key=None, openai_api_key=None, xai_api_key=None, qwen_api_key=None, deepseek_api_key=None, gemini_api_key=None, progress_callback=None): """Initialize the PAS2 with API keys""" # For Hugging Face Spaces, we prioritize getting API keys from HF_* environment variables # which are set from the Secrets tab in the Space settings self.mistral_api_key = mistral_api_key or os.environ.get("HF_MISTRAL_API_KEY") or os.environ.get("MISTRAL_API_KEY") self.openai_api_key = openai_api_key or os.environ.get("HF_OPENAI_API_KEY") or os.environ.get("OPENAI_API_KEY") self.xai_api_key = xai_api_key or os.environ.get("HF_XAI_API_KEY") or os.environ.get("XAI_API_KEY") self.qwen_api_key = qwen_api_key or os.environ.get("HF_QWEN_API_KEY") or os.environ.get("QWEN_API_KEY") self.deepseek_api_key = deepseek_api_key or os.environ.get("HF_DEEPSEEK_API_KEY") or os.environ.get("DEEPSEEK_API_KEY") self.gemini_api_key = gemini_api_key or os.environ.get("HF_GEMINI_API_KEY") or os.environ.get("GEMINI_API_KEY") self.progress_callback = progress_callback if not self.mistral_api_key: raise ValueError("Mistral API key is required. Set it via HF_MISTRAL_API_KEY in Hugging Face Spaces secrets or pass it as a parameter.") if not self.openai_api_key: raise ValueError("OpenAI API key is required. Set it via HF_OPENAI_API_KEY in Hugging Face Spaces secrets or pass it as a parameter.") self.mistral_client = Mistral(api_key=self.mistral_api_key) self.openai_client = OpenAI(api_key=self.openai_api_key) self.xai_client = OpenAI(api_key=self.xai_api_key, base_url="https://api.x.ai/v1") self.qwen_client = OpenAI(api_key=self.qwen_api_key, base_url="https://router.huggingface.co/nebius/v1") self.deepseek_client = OpenAI(api_key=self.deepseek_api_key, base_url="https://api.deepseek.com") self.gemini_client = OpenAI(api_key=self.gemini_api_key, base_url="https://generativelanguage.googleapis.com/v1beta/openai/") # Define model names self.mistral_model = "mistral-large-latest" self.openai_o4mini = "o4-mini" self.openai_4o = "gpt-4o" self.deepseek_model = "deepseek-reasoner" self.grok_model = "grok-3-beta" self.qwen_model = "Qwen/Qwen3-235B-A22B" self.gemini_model = "gemini-2.5-pro-preview-05-06" # Create a dictionary mapping model names to their clients and model identifiers self.model_configs = { "mistral-large": { "client": self.mistral_client, "model_id": self.mistral_model, "type": "mistral" }, "o4-mini": { "client": self.openai_client, "model_id": self.openai_o4mini, "type": "openai" }, "gpt-4o": { "client": self.openai_client, "model_id": self.openai_4o, "type": "openai" }, "deepseek-reasoner": { "client": self.deepseek_client, "model_id": self.deepseek_model, "type": "openai" }, "grok-3": { "client": self.xai_client, "model_id": self.grok_model, "type": "openai" }, "qwen-235b": { "client": self.qwen_client, "model_id": self.qwen_model, "type": "openai" }, "gemini-2.5-pro": { "client": self.gemini_client, "model_id": self.gemini_model, "type": "openai" } } # Set default models (will be randomized later) self.generator_model = "mistral-large" self.judge_model = "o4-mini" logger.info("PAS2 initialized with available models: %s", ", ".join(self.model_configs.keys())) def generate_paraphrases(self, query: str, n_paraphrases: int = 3) -> List[str]: """Generate paraphrases of the input query using Mistral API""" logger.info("Generating %d paraphrases for query: %s", n_paraphrases, query) start_time = time.time() messages = [ { "role": "system", "content": f"You are an expert at creating semantically equivalent paraphrases. Generate {n_paraphrases} different paraphrases of the given query that preserve the original meaning but vary in wording and structure. Return a JSON array of strings, each containing one paraphrase." }, { "role": "user", "content": query } ] try: logger.info("Sending paraphrase generation request to Mistral API...") response = self.mistral_client.chat.complete( model=self.mistral_model, messages=messages, response_format={"type": "json_object"} ) content = response.choices[0].message.content logger.debug("Received raw paraphrase response: %s", content) paraphrases_data = json.loads(content) # Handle different possible JSON structures if isinstance(paraphrases_data, dict) and "paraphrases" in paraphrases_data: paraphrases = paraphrases_data["paraphrases"] elif isinstance(paraphrases_data, dict) and "results" in paraphrases_data: paraphrases = paraphrases_data["results"] elif isinstance(paraphrases_data, list): paraphrases = paraphrases_data else: # Try to extract a list from any field for key, value in paraphrases_data.items(): if isinstance(value, list) and len(value) > 0: paraphrases = value break else: logger.warning("Could not extract paraphrases from response: %s", content) raise ValueError(f"Could not extract paraphrases from response: {content}") # Ensure we have the right number of paraphrases paraphrases = paraphrases[:n_paraphrases] # Add the original query as the first item all_queries = [query] + paraphrases elapsed_time = time.time() - start_time logger.info("Generated %d paraphrases in %.2f seconds", len(paraphrases), elapsed_time) for i, p in enumerate(paraphrases, 1): logger.info("Paraphrase %d: %s", i, p) return all_queries except Exception as e: logger.error("Error generating paraphrases: %s", str(e), exc_info=True) # Return original plus simple paraphrases as fallback fallback_paraphrases = [ query, f"Could you tell me about {query.strip('?')}?", f"I'd like to know: {query}", f"Please provide information on {query.strip('?')}." ][:n_paraphrases+1] logger.info("Using fallback paraphrases due to error") for i, p in enumerate(fallback_paraphrases[1:], 1): logger.info("Fallback paraphrase %d: %s", i, p) return fallback_paraphrases def set_random_model_pair(self): """Randomly select a pair of generator and judge models""" import random # Get list of available models available_models = list(self.model_configs.keys()) # Randomly select generator and judge models self.generator_model = random.choice(available_models) # Make sure judge is different from generator judge_options = [m for m in available_models if m != self.generator_model] self.judge_model = random.choice(judge_options) logger.info("Randomly selected model pair - Generator: %s, Judge: %s", self.generator_model, self.judge_model) return self.generator_model, self.judge_model def _get_single_response(self, query: str, index: int = None) -> str: """Get a single response from the selected generator model for a query""" try: query_description = f"Query {index}: {query}" if index is not None else f"Query: {query}" logger.info("Getting response for %s using %s", query_description, self.generator_model) start_time = time.time() # Get the model configuration model_config = self.model_configs[self.generator_model] client = model_config["client"] model_id = model_config["model_id"] model_type = model_config["type"] # Customize messages based on model system_content = "You are a helpful AI assistant. Provide accurate, factual information in response to questions." user_content = query # Special handling for deepseek-reasoner if model_id == "deepseek-reasoner": user_content = f"Extract the following information and format it as JSON:\n\n{query}" messages = [ { "role": "system", "content": system_content }, { "role": "user", "content": user_content } ] # Use the appropriate client and model based on the type if model_type == "mistral": response = client.chat.complete( model=model_id, messages=messages ) result = response.choices[0].message.content else: # openai-compatible API response = client.chat.completions.create( model=model_id, messages=messages ) result = response.choices[0].message.content elapsed_time = time.time() - start_time logger.info("Received response from %s for %s (%.2f seconds)", self.generator_model, query_description, elapsed_time) logger.debug("Response content for %s: %s", query_description, result[:100] + "..." if len(result) > 100 else result) return result except Exception as e: error_msg = f"Error getting response for query '{query}' with model {self.generator_model}: {e}" logger.error(error_msg, exc_info=True) return f"Error: Failed to get response for this query with model {self.generator_model}." def get_responses(self, queries: List[str]) -> List[str]: """Get responses from Mistral API for each query in parallel""" logger.info("Getting responses for %d queries in parallel", len(queries)) start_time = time.time() # Use ThreadPoolExecutor for parallel API calls with ThreadPoolExecutor(max_workers=min(len(queries), 5)) as executor: # Submit tasks and map them to their original indices future_to_index = { executor.submit(self._get_single_response, query, i): i for i, query in enumerate(queries) } # Prepare a list with the correct length responses = [""] * len(queries) # Counter for completed responses completed_count = 0 # Collect results as they complete for future in concurrent.futures.as_completed(future_to_index): index = future_to_index[future] try: responses[index] = future.result() # Update completion count and report progress completed_count += 1 if self.progress_callback: self.progress_callback("responses_progress", completed_responses=completed_count, total_responses=len(queries)) except Exception as e: logger.error("Error processing response for index %d: %s", index, str(e)) responses[index] = f"Error: Failed to get response for query {index}." # Still update completion count even for errors completed_count += 1 if self.progress_callback: self.progress_callback("responses_progress", completed_responses=completed_count, total_responses=len(queries)) elapsed_time = time.time() - start_time logger.info("Received all %d responses in %.2f seconds total", len(responses), elapsed_time) return responses def detect_hallucination(self, query: str, n_paraphrases: int = 3) -> Dict: """ Detect hallucinations by comparing responses to paraphrased queries using a judge model Returns: Dict containing hallucination judgment and all responses """ logger.info("Starting hallucination detection for query: %s", query) start_time = time.time() # Randomly select a model pair for this detection generator_model, judge_model = self.set_random_model_pair() logger.info("Using %s as generator and %s as judge for this detection", generator_model, judge_model) # Report progress if self.progress_callback: self.progress_callback("starting", query=query) # Generate paraphrases logger.info("Step 1: Generating paraphrases") if self.progress_callback: self.progress_callback("generating_paraphrases", query=query) all_queries = self.generate_paraphrases(query, n_paraphrases) if self.progress_callback: self.progress_callback("paraphrases_complete", query=query, count=len(all_queries)) # Get responses to all queries logger.info("Step 2: Getting responses to all %d queries using %s", len(all_queries), generator_model) if self.progress_callback: self.progress_callback("getting_responses", query=query, total=len(all_queries), model=generator_model) all_responses = [] for i, q in enumerate(all_queries): logger.info("Getting response %d/%d for query: %s", i+1, len(all_queries), q) if self.progress_callback: self.progress_callback("responses_progress", query=query, completed=i, total=len(all_queries)) response = self._get_single_response(q, index=i) all_responses.append(response) if self.progress_callback: self.progress_callback("responses_complete", query=query) # Judge the responses for hallucinations logger.info("Step 3: Judging for hallucinations using %s", judge_model) if self.progress_callback: self.progress_callback("judging", query=query, model=judge_model) # The first query is the original, rest are paraphrases original_query = all_queries[0] original_response = all_responses[0] paraphrased_queries = all_queries[1:] if len(all_queries) > 1 else [] paraphrased_responses = all_responses[1:] if len(all_responses) > 1 else [] # Judge the responses judgment = self.judge_hallucination( original_query=original_query, original_response=original_response, paraphrased_queries=paraphrased_queries, paraphrased_responses=paraphrased_responses ) # Assemble the results results = { "original_query": original_query, "original_response": original_response, "paraphrased_queries": paraphrased_queries, "paraphrased_responses": paraphrased_responses, "hallucination_detected": judgment.hallucination_detected, "confidence_score": judgment.confidence_score, "conflicting_facts": judgment.conflicting_facts, "reasoning": judgment.reasoning, "summary": judgment.summary, "generator_model": generator_model, "judge_model": judge_model } # Report completion if self.progress_callback: self.progress_callback("complete", query=query, generator=generator_model, judge=judge_model) logger.info("Hallucination detection completed in %.2f seconds using %s (generator) and %s (judge)", time.time() - start_time, generator_model, judge_model) return results def judge_hallucination(self, original_query: str, original_response: str, paraphrased_queries: List[str], paraphrased_responses: List[str]) -> HallucinationJudgment: """ Use the selected judge model to detect hallucinations in the responses """ logger.info("Judging hallucinations with %s model", self.judge_model) start_time = time.time() # Get the model configuration for the judge model_config = self.model_configs[self.judge_model] client = model_config["client"] model_id = model_config["model_id"] model_type = model_config["type"] # Prepare the context for the judge context = f""" Original Question: {original_query} Original Response: {original_response} Paraphrased Questions and their Responses: """ for i, (query, response) in enumerate(zip(paraphrased_queries, paraphrased_responses), 1): context += f"\nParaphrased Question {i}: {query}\n\nResponse {i}:\n{response}\n" system_prompt = """ You are a judge evaluating whether an AI is hallucinating across different responses to semantically equivalent questions. Analyze all responses carefully to identify any factual inconsistencies or contradictions. Focus on factual discrepancies, not stylistic differences. A hallucination is when the AI states different facts in response to questions that are asking for the same information. Your response should be a JSON with the following fields: - hallucination_detected: boolean indicating whether hallucinations were found - confidence_score: number between 0 and 1 representing your confidence in the judgment - conflicting_facts: an array of objects describing any conflicting information found - reasoning: detailed explanation for your judgment - summary: a concise summary of your analysis """ try: logger.info("Sending judgment request to %s...", self.judge_model) # Customize the system prompt for deepseek-reasoner customized_system_prompt = system_prompt user_content = f"Evaluate these responses for hallucinations:\n\n{context}" # Additional prompt engineering for deepseek-reasoner if model_id == "deepseek-reasoner": user_content = f"""Extract the following information and format it as JSON: Evaluate these responses for hallucinations:\n\n{context}\n\n - hallucination_detected: boolean indicating whether hallucinations were found - confidence_score: number between 0 and 1 representing your confidence in the judgment - conflicting_facts: an array of objects describing any conflicting information found - reasoning: detailed explanation for your judgment - summary: a concise summary of your analysis Respond ONLY with valid JSON and no other text. """ # Use the appropriate client and model based on the type if model_type == "mistral": response = client.chat.complete( model=model_id, messages=[ {"role": "system", "content": customized_system_prompt}, {"role": "user", "content": user_content} ], response_format={"type": "json_object"} ) content = response.choices[0].message.content # Normal JSON parsing for mistral result_json = json.loads(content) else: # openai-compatible API response = client.chat.completions.create( model=model_id, messages=[ {"role": "system", "content": customized_system_prompt}, {"role": "user", "content": user_content} ], response_format={"type": "json_object"} ) content = response.choices[0].message.content result_json = json.loads(content) logger.debug("Received judgment response from %s: %s", self.judge_model, result_json) # Create the HallucinationJudgment object from the JSON response judgment = HallucinationJudgment( hallucination_detected=result_json.get("hallucination_detected", False), confidence_score=result_json.get("confidence_score", 0.0), conflicting_facts=result_json.get("conflicting_facts", []), reasoning=result_json.get("reasoning", "No reasoning provided."), summary=result_json.get("summary", "No summary provided.") ) elapsed_time = time.time() - start_time logger.info("Judgment completed by %s in %.2f seconds", self.judge_model, elapsed_time) return judgment except Exception as e: logger.error("Error in hallucination judgment with %s: %s", self.judge_model, str(e), exc_info=True) # Return a fallback judgment return HallucinationJudgment( hallucination_detected=False, confidence_score=0.0, conflicting_facts=[], reasoning=f"Failed to obtain judgment from the {self.judge_model} model: {str(e)}", summary="Analysis failed due to API error." ) class HallucinationDetectorApp: def __init__(self): self.pas2 = None logger.info("Initializing HallucinationDetectorApp") self._initialize_database() self.progress_callback = None def _initialize_database(self): """Initialize MongoDB connection for persistent feedback storage""" try: # Get MongoDB connection string from environment variable mongo_uri = os.environ.get("MONGODB_URI") if not mongo_uri: logger.warning("MONGODB_URI not found in environment variables. Please set it in HuggingFace Spaces secrets.") logger.warning("Using a placeholder URI for now - connection will fail until proper URI is provided.") # Use a placeholder - this will fail but allows the app to initialize mongo_uri = "mongodb+srv://username:password@cluster.mongodb.net/?retryWrites=true&w=majority" # Connect to MongoDB self.mongo_client = MongoClient(mongo_uri) # Access or create database self.db = self.mongo_client["hallucination_detector"] # Access or create collection self.feedback_collection = self.db["feedback"] # Create index on timestamp for faster querying self.feedback_collection.create_index("timestamp") # Test connection self.mongo_client.admin.command('ping') logger.info("MongoDB connection successful") except Exception as e: logger.error(f"Error initializing MongoDB: {str(e)}", exc_info=True) logger.warning("Proceeding without database connection. Data will not be saved persistently.") self.mongo_client = None self.db = None self.feedback_collection = None def set_progress_callback(self, callback): """Set the progress callback function""" self.progress_callback = callback def initialize_api(self, mistral_api_key, openai_api_key): """Initialize the PAS2 with API keys""" try: logger.info("Initializing PAS2 with API keys") self.pas2 = PAS2( mistral_api_key=mistral_api_key, openai_api_key=openai_api_key, progress_callback=self.progress_callback ) logger.info("API initialization successful") return "API keys set successfully! You can now use the application." except Exception as e: logger.error("Error initializing API: %s", str(e), exc_info=True) return f"Error initializing API: {str(e)}" def process_query(self, query: str): """Process the query using PAS2""" if not self.pas2: logger.error("PAS2 not initialized") return { "error": "Please set API keys first before processing queries." } if not query.strip(): logger.warning("Empty query provided") return { "error": "Please enter a query." } try: # Set the progress callback if needed if self.progress_callback and self.pas2.progress_callback != self.progress_callback: self.pas2.progress_callback = self.progress_callback # Process the query logger.info("Processing query with PAS2: %s", query) results = self.pas2.detect_hallucination(query) logger.info("Query processing completed successfully") return results except Exception as e: logger.error("Error processing query: %s", str(e), exc_info=True) return { "error": f"Error processing query: {str(e)}" } def save_feedback(self, results, feedback): """Save results and user feedback to MongoDB""" try: logger.info("Saving user feedback: %s", feedback) if self.feedback_collection is None: logger.error("MongoDB connection not available. Cannot save feedback.") return "Database connection not available. Feedback not saved." # Prepare document for MongoDB document = { "timestamp": datetime.now(), "original_query": results.get('original_query', ''), "original_response": results.get('original_response', ''), "paraphrased_queries": results.get('paraphrased_queries', []), "paraphrased_responses": results.get('paraphrased_responses', []), "hallucination_detected": results.get('hallucination_detected', False), "confidence_score": results.get('confidence_score', 0.0), "conflicting_facts": results.get('conflicting_facts', []), "reasoning": results.get('reasoning', ''), "summary": results.get('summary', ''), "generator_model": results.get('generator_model', 'unknown'), "judge_model": results.get('judge_model', 'unknown'), "user_feedback": feedback } # Insert document into collection result = self.feedback_collection.insert_one(document) # Update model leaderboard scores self._update_model_scores( generator=results.get('generator_model', 'unknown'), judge=results.get('judge_model', 'unknown'), feedback=feedback, hallucination_detected=results.get('hallucination_detected', False) ) logger.info("Feedback saved successfully to MongoDB") return "Feedback saved successfully!" except Exception as e: logger.error("Error saving feedback: %s", str(e), exc_info=True) return f"Error saving feedback: {str(e)}" def _update_model_scores(self, generator, judge, feedback, hallucination_detected): """Update the ELO scores for the generator and judge models based on feedback""" try: if self.db is None: logger.error("MongoDB connection not available. Cannot update model scores.") return # Access or create the models collection models_collection = self.db.get_collection("model_scores") # Create indexes if they don't exist models_collection.create_index("model_name", unique=True) # Parse the feedback to determine scenario actual_hallucination = "Yes, there was a hallucination" in feedback no_hallucination = "No, there was no hallucination" in feedback judge_correct = "Yes, the judge was correct" in feedback judge_incorrect = "No, the judge was incorrect" in feedback # Determine scores based on different scenarios: # 1. Actual hallucination + Judge correct = positive for judge, negative for generator # 2. No hallucination + Judge correct = positive for both # 3. No hallucination + Judge incorrect = negative for judge, positive for generator # 4. Actual hallucination + Judge incorrect = negative for both if judge_correct: if actual_hallucination: # Scenario 1: Judge correctly detected hallucination judge_score = 1 # Positive for judge generator_score = 0 # Negative for generator (hallucinated) logger.info("Judge %s correctly detected hallucination from generator %s", judge, generator) elif no_hallucination: # Scenario 2: Judge correctly determined no hallucination judge_score = 1 # Positive for judge generator_score = 1 # Positive for generator (didn't hallucinate) logger.info("Judge %s correctly determined no hallucination from generator %s", judge, generator) else: # User unsure about hallucination, but confirmed judge was correct judge_score = 1 # Positive for judge generator_score = 0.5 # Neutral for generator (unclear) logger.info("User confirmed judge %s was correct, but unclear about hallucination from %s", judge, generator) elif judge_incorrect: if no_hallucination: # Scenario 3: Judge incorrectly claimed hallucination (false positive) judge_score = 0 # Negative for judge generator_score = 1 # Positive for generator (unfairly accused) logger.info("Judge %s incorrectly claimed hallucination from generator %s", judge, generator) elif actual_hallucination: # Scenario 4: Judge missed actual hallucination (false negative) judge_score = 0 # Negative for judge generator_score = 0 # Negative for generator (hallucination went undetected) logger.info("Judge %s missed actual hallucination from generator %s", judge, generator) else: # User unsure about hallucination, but confirmed judge was incorrect judge_score = 0 # Negative for judge generator_score = 0.5 # Neutral for generator (unclear) logger.info("User confirmed judge %s was incorrect, but unclear about hallucination from %s", judge, generator) else: # User unsure about judge correctness, don't update scores judge_score = 0.5 # Neutral for judge (unclear) generator_score = 0.5 # Neutral for generator (unclear) logger.info("User unsure about judge %s correctness and generator %s hallucination", judge, generator) # Update generator model stats with specific score self._update_model_stats(models_collection, generator, generator_score, "generator") # Update judge model stats with specific score self._update_model_stats(models_collection, judge, judge_score, "judge") # Determine if the detection was correct based on judge correctness detection_correct = judge_correct # Determine if there was actually hallucination based on user feedback actual_hallucination_present = actual_hallucination # Update model pair stats self._update_model_pair_stats(generator, judge, detection_correct, actual_hallucination_present, generator_score, judge_score) logger.info("Updated model scores based on feedback: generator(%s)=%s, judge(%s)=%s", generator, generator_score, judge, judge_score) except Exception as e: logger.error("Error updating model scores: %s", str(e), exc_info=True) def _update_model_stats(self, collection, model_name, score, role): """Update statistics for a single model""" # Simplified ELO calculation K_FACTOR = 32 # Standard K-factor for ELO # Get current model data or create if not exists model_data = collection.find_one({"model_name": model_name}) if model_data is None: # Initialize new model with default values model_data = { "model_name": model_name, "elo_score": 1500, # Starting ELO "total_samples": 0, "correct_predictions": 0, "accuracy": 0.0, "as_generator": 0, "as_judge": 0, "as_generator_correct": 0, "as_judge_correct": 0, "neutral_samples": 0 # Add a counter for neutral samples } # Skip counting for neutral feedback (0.5) if score == 0.5: # Increment neutral samples counter instead if "neutral_samples" not in model_data: model_data["neutral_samples"] = 0 model_data["neutral_samples"] += 1 # Expected score based on current rating (vs average rating) expected_score = 1 / (1 + 10**((1500 - model_data["elo_score"]) / 400)) # For neutral score, use a much smaller K factor to slightly adjust the ELO # This handles the "unsure" case with minimal impact model_data["elo_score"] = model_data["elo_score"] + (K_FACTOR/4) * (0.5 - expected_score) # Update or insert the model data collection.replace_one( {"model_name": model_name}, model_data, upsert=True ) return # Update sample counts for non-neutral cases model_data["total_samples"] += 1 if role == "generator": model_data["as_generator"] += 1 if score == 1: # Only count as correct if score is 1 (not 0) model_data["as_generator_correct"] += 1 else: # role == "judge" model_data["as_judge"] += 1 if score == 1: # Only count as correct if score is 1 (not 0) model_data["as_judge_correct"] += 1 # Update correct predictions based on score if score == 1: model_data["correct_predictions"] += 1 # Calculate new accuracy model_data["accuracy"] = model_data["correct_predictions"] / model_data["total_samples"] # Update ELO score based on the specific score value (0 or 1) # Expected score based on current rating (vs average rating) expected_score = 1 / (1 + 10**((1500 - model_data["elo_score"]) / 400)) # Use the provided score (0 or 1) actual_score = score # New ELO calculation model_data["elo_score"] = model_data["elo_score"] + K_FACTOR * (actual_score - expected_score) # Update or insert the model data collection.replace_one( {"model_name": model_name}, model_data, upsert=True ) def _update_model_pair_stats(self, generator, judge, detection_correct, hallucination_detected, generator_score, judge_score): """Update statistics for a model pair combination""" try: # Access or create the model pairs collection pairs_collection = self.db.get_collection("model_pairs") # Create compound index if it doesn't exist pairs_collection.create_index([("generator", 1), ("judge", 1)], unique=True) # Get current pair data or create if not exists pair_data = pairs_collection.find_one({ "generator": generator, "judge": judge }) if pair_data is None: # Initialize new pair with default values pair_data = { "generator": generator, "judge": judge, "elo_score": 1500, # Starting ELO "total_samples": 0, "correct_predictions": 0, "accuracy": 0.0, "hallucinations_detected": 0, "generator_performance": 0.0, "judge_performance": 0.0, "consistency_score": 0.0 } # Update sample counts pair_data["total_samples"] += 1 if detection_correct: pair_data["correct_predictions"] += 1 if hallucination_detected: pair_data["hallucinations_detected"] += 1 # Track model-specific performances within the pair if "generator_correct_count" not in pair_data: pair_data["generator_correct_count"] = 0 if "judge_correct_count" not in pair_data: pair_data["judge_correct_count"] = 0 # Update individual performance counters based on scores if generator_score == 1: pair_data["generator_correct_count"] += 1 if judge_score == 1: pair_data["judge_correct_count"] += 1 # Calculate individual performance rates within the pair pair_data["generator_performance"] = pair_data["generator_correct_count"] / pair_data["total_samples"] pair_data["judge_performance"] = pair_data["judge_correct_count"] / pair_data["total_samples"] # Calculate new accuracy for the pair (detection accuracy) pair_data["accuracy"] = pair_data["correct_predictions"] / pair_data["total_samples"] # Calculate consistency score - weighted average of individual performances # Gives more weight to the generator when hallucinations are detected if hallucination_detected: # When hallucination is detected, judge's role is more critical pair_data["consistency_score"] = (0.4 * pair_data["generator_performance"] + 0.6 * pair_data["judge_performance"]) else: # When no hallucination is detected, both roles are equally important pair_data["consistency_score"] = (0.5 * pair_data["generator_performance"] + 0.5 * pair_data["judge_performance"]) # Update ELO score (simplified version) K_FACTOR = 24 # Slightly lower K-factor for pairs # Expected score based on current rating expected_score = 1 / (1 + 10**((1500 - pair_data["elo_score"]) / 400)) # Actual score - use the average of both model scores (0-1 range) # This represents the pair's overall performance actual_score = (generator_score + judge_score) / 2 # New ELO calculation pair_data["elo_score"] = pair_data["elo_score"] + K_FACTOR * (actual_score - expected_score) # Update or insert the pair data pairs_collection.replace_one( {"generator": generator, "judge": judge}, pair_data, upsert=True ) logger.info("Updated model pair stats for %s (generator) and %s (judge)", generator, judge) except Exception as e: logger.error("Error updating model pair stats: %s", str(e), exc_info=True) return None def get_feedback_stats(self): """Get statistics about collected feedback from MongoDB""" try: if self.feedback_collection is None: logger.error("MongoDB connection not available. Cannot get feedback stats.") return None # Get total feedback count total_count = self.feedback_collection.count_documents({}) # Get accuracy stats based on user feedback correct_predictions = 0 # Fetch all feedback documents feedback_docs = list(self.feedback_collection.find({}, {"user_feedback": 1})) # Count correct predictions based on user feedback for doc in feedback_docs: if "user_feedback" in doc: # If feedback starts with "Yes", it's a correct prediction if doc["user_feedback"].startswith("Yes"): correct_predictions += 1 # Calculate accuracy percentage accuracy = correct_predictions / max(total_count, 1) return { "total_feedback": total_count, "correct_predictions": correct_predictions, "accuracy": accuracy } except Exception as e: logger.error("Error getting feedback stats: %s", str(e), exc_info=True) return None def get_model_leaderboard(self): """Get the current model leaderboard data""" try: if self.db is None: logger.error("MongoDB connection not available. Cannot get model leaderboard.") return None # Access models collection models_collection = self.db.get_collection("model_scores") # Get all models and sort by ELO score models = list(models_collection.find().sort("elo_score", pymongo.DESCENDING)) # Format percentages and convert ObjectId for model in models: model["_id"] = str(model["_id"]) model["accuracy"] = round(model["accuracy"] * 100, 1) if "as_generator" in model and model["as_generator"] > 0: model["generator_accuracy"] = round((model["as_generator_correct"] / model["as_generator"]) * 100, 1) else: model["generator_accuracy"] = 0.0 if "as_judge" in model and model["as_judge"] > 0: model["judge_accuracy"] = round((model["as_judge_correct"] / model["as_judge"]) * 100, 1) else: model["judge_accuracy"] = 0.0 return models except Exception as e: logger.error("Error getting model leaderboard: %s", str(e), exc_info=True) return [] def get_pair_leaderboard(self): """Get the current model pair leaderboard data""" try: if self.db is None: logger.error("MongoDB connection not available. Cannot get pair leaderboard.") return None # Access model pairs collection pairs_collection = self.db.get_collection("model_pairs") # Get all pairs and sort by ELO score pairs = list(pairs_collection.find().sort("elo_score", pymongo.DESCENDING)) # Format percentages and convert ObjectId for pair in pairs: pair["_id"] = str(pair["_id"]) pair["accuracy"] = round(pair["accuracy"] * 100, 1) pair["consistency_score"] = round(pair["consistency_score"] * 100, 1) return pairs except Exception as e: logger.error("Error getting pair leaderboard: %s", str(e), exc_info=True) return [] def export_data_to_csv(self, filepath=None): """Export all feedback data to a CSV file for analysis""" try: if self.feedback_collection is None: logger.error("MongoDB connection not available. Cannot export data.") return "Database connection not available. Cannot export data." # Query all feedback data cursor = self.feedback_collection.find({}) # Convert cursor to list of dictionaries records = list(cursor) # Convert MongoDB documents to pandas DataFrame # Handle nested arrays and complex objects for record in records: # Convert ObjectId to string record['_id'] = str(record['_id']) # Convert datetime objects to string if 'timestamp' in record: record['timestamp'] = record['timestamp'].strftime("%Y-%m-%d %H:%M:%S") # Convert lists to strings for CSV storage if 'paraphrased_queries' in record: record['paraphrased_queries'] = json.dumps(record['paraphrased_queries']) if 'paraphrased_responses' in record: record['paraphrased_responses'] = json.dumps(record['paraphrased_responses']) if 'conflicting_facts' in record: record['conflicting_facts'] = json.dumps(record['conflicting_facts']) # Create DataFrame df = pd.DataFrame(records) # Define default filepath if not provided if not filepath: filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), f"hallucination_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv") # Export to CSV df.to_csv(filepath, index=False) logger.info(f"Data successfully exported to {filepath}") return filepath except Exception as e: logger.error(f"Error exporting data: {str(e)}", exc_info=True) return f"Error exporting data: {str(e)}" def get_recent_queries(self, limit=10): """Get most recent queries for display in the UI""" try: if self.feedback_collection is None: logger.error("MongoDB connection not available. Cannot get recent queries.") return [] # Get most recent queries cursor = self.feedback_collection.find( {}, {"original_query": 1, "hallucination_detected": 1, "timestamp": 1} ).sort("timestamp", pymongo.DESCENDING).limit(limit) # Convert to list of dictionaries recent_queries = [] for doc in cursor: recent_queries.append({ "id": str(doc["_id"]), "query": doc["original_query"], "hallucination_detected": doc.get("hallucination_detected", False), "timestamp": doc["timestamp"].strftime("%Y-%m-%d %H:%M:%S") if isinstance(doc["timestamp"], datetime) else doc["timestamp"] }) return recent_queries except Exception as e: logger.error(f"Error getting recent queries: {str(e)}", exc_info=True) return [] def get_query_details(self, query_id): """Get full details for a specific query by ID""" try: if self.feedback_collection is None: logger.error("MongoDB connection not available. Cannot get query details.") return None # Convert string ID to ObjectId obj_id = ObjectId(query_id) # Find the query by ID doc = self.feedback_collection.find_one({"_id": obj_id}) if doc is None: logger.warning(f"No query found with ID {query_id}") return None # Convert ObjectId to string for JSON serialization doc["_id"] = str(doc["_id"]) # Convert timestamp to string if "timestamp" in doc and isinstance(doc["timestamp"], datetime): doc["timestamp"] = doc["timestamp"].strftime("%Y-%m-%d %H:%M:%S") return doc except Exception as e: logger.error(f"Error getting query details: {str(e)}", exc_info=True) return None # Progress tracking for UI updates class ProgressTracker: """Tracks progress of hallucination detection for UI updates""" STAGES = { "idle": {"status": "Ready", "progress": 0, "color": "#757575"}, "starting": {"status": "Starting process...", "progress": 5, "color": "#2196F3"}, "generating_paraphrases": {"status": "Generating paraphrases...", "progress": 15, "color": "#2196F3"}, "paraphrases_complete": {"status": "Paraphrases generated", "progress": 30, "color": "#2196F3"}, "getting_responses": {"status": "Getting responses using {model}...", "progress": 35, "color": "#2196F3"}, "responses_progress": {"status": "Getting responses ({completed}/{total})...", "progress": 40, "color": "#2196F3"}, "responses_complete": {"status": "All responses received", "progress": 65, "color": "#2196F3"}, "judging": {"status": "Analyzing responses for hallucinations using {model}...", "progress": 70, "color": "#2196F3"}, "complete": {"status": "Analysis complete! Using {generator} (generator) and {judge} (judge)", "progress": 100, "color": "#4CAF50"}, "error": {"status": "Error: {error_message}", "progress": 100, "color": "#F44336"} } def __init__(self): self.stage = "idle" self.stage_data = self.STAGES[self.stage].copy() self.query = "" self.completed_responses = 0 self.total_responses = 0 self.error_message = "" self.generator_model = "" self.judge_model = "" self.model = "" # For general model reference in status messages self._lock = threading.Lock() self._status_callback = None self._stop_event = threading.Event() self._update_thread = None def register_callback(self, callback_fn): """Register callback function to update UI""" self._status_callback = callback_fn def update_stage(self, stage, **kwargs): """Update the current stage and trigger callback""" with self._lock: if stage in self.STAGES: self.stage = stage self.stage_data = self.STAGES[stage].copy() # Update with any additional parameters for key, value in kwargs.items(): if key == 'query': self.query = value elif key == 'completed_responses': self.completed_responses = value elif key == 'total_responses': self.total_responses = value elif key == 'error_message': self.error_message = value elif key == 'model': self.model = value elif key == 'generator': self.generator_model = value elif key == 'judge': self.judge_model = value # Format status message if stage == 'responses_progress': self.stage_data['status'] = self.stage_data['status'].format( completed=self.completed_responses, total=self.total_responses ) elif stage == 'getting_responses' and 'model' in kwargs: self.stage_data['status'] = self.stage_data['status'].format( model=kwargs.get('model', 'selected model') ) elif stage == 'judging' and 'model' in kwargs: self.stage_data['status'] = self.stage_data['status'].format( model=kwargs.get('model', 'selected model') ) elif stage == 'complete' and 'generator' in kwargs and 'judge' in kwargs: self.stage_data['status'] = self.stage_data['status'].format( generator=self.generator_model, judge=self.judge_model ) elif stage == 'error': self.stage_data['status'] = self.stage_data['status'].format( error_message=self.error_message ) if self._status_callback: self._status_callback(self.get_html_status()) def get_html_status(self): """Get HTML representation of current status""" progress_width = f"{self.stage_data['progress']}%" status_text = self.stage_data['status'] color = self.stage_data['color'] query_info = f'
{summary}
Reasoning:
{reasoning_safe}
Conflicting Facts:
{conflicting_facts_text_safe}
This tool detects hallucinations in AI responses by comparing answers to semantically equivalent questions and using a specialized judge model.
Rank | " f"Generator Model | " f"Judge Model | " f"ELO Score | " f"Accuracy | " f"Generator Perf. | " f"Judge Perf. | " f"Consistency | " f"Sample Size | " f"
---|
Model Pair Performance Metrics:
" f"Our ELO rating system assigns scores to model pairs based on user feedback, using the following formula:
" + "ELO_new = ELO_old + K * (S - E)
The system randomly selects from these models for each hallucination detection:
" + "Rank | " f"Model | " f"ELO Score | " f"Overall Accuracy | " f"Generator Accuracy | " f"Judge Accuracy | " f"Sample Size | " f"Generator/Judge Ratio | " f"
---|
Our ELO rating system assigns scores to individual models based on user feedback, using the following formula:
" + "ELO_new = ELO_old + K * (S - E)
All models start with a base ELO of 1500. Scores are updated after each user evaluation.
" + "Note: ELO scores are comparative and reflect relative performance between models in our specific hallucination detection tasks.
" + "