# askveracity/utils/performance.py
"""
Performance tracking utility for the Fake News Detector application.
This module provides functionality to track and analyze the
performance of the application, including processing times,
success rates, and resource utilization.
"""
import time
import logging
logger = logging.getLogger("misinformation_detector")
class PerformanceTracker:
"""
Tracks and logs performance metrics for the fact-checking system.
This class maintains counters and statistics for various performance
metrics, such as processing times, evidence retrieval success rates,
and confidence scores.
"""
def __init__(self):
"""Initialize the performance tracker with empty metrics."""
self.metrics = {
"claims_processed": 0,
"evidence_retrieval_success_rate": [],
"processing_times": [],
"confidence_scores": [],
"source_types_used": {},
"temporal_relevance": []
        }

    def log_claim_processed(self):
"""
Increment the counter for processed claims.
This should be called whenever a claim is processed successfully.
"""
self.metrics["claims_processed"] += 1
def log_evidence_retrieval(self, success, sources_count):
"""
Log the success or failure of evidence retrieval.
Args:
success (bool): Whether evidence retrieval was successful
sources_count (dict): Count of evidence items by source type
"""
        # Record success as 1/0 so averaging the list yields a success rate
        success_value = 1 if success else 0
self.metrics["evidence_retrieval_success_rate"].append(success_value)
# Safely process source types
if isinstance(sources_count, dict):
for source_type, count in sources_count.items():
# Ensure source_type is a string and count is an integer
source_type = str(source_type)
try:
count = int(count)
except (ValueError, TypeError):
count = 1
# Update source types used
self.metrics["source_types_used"][source_type] = \
self.metrics["source_types_used"].get(source_type, 0) + count
def log_processing_time(self, start_time):
"""
Log the processing time for an operation.
Args:
start_time (float): Start time obtained from time.time()
"""
end_time = time.time()
processing_time = end_time - start_time
self.metrics["processing_times"].append(processing_time)
def log_confidence_score(self, score):
"""
Log a confidence score.
Args:
score (float): Confidence score between 0 and 1
"""
        # Record only scores that parse as floats within [0, 1]; warn otherwise
        try:
            score = float(score)
            if 0 <= score <= 1:
                self.metrics["confidence_scores"].append(score)
            else:
                logger.warning(f"Confidence score out of range [0, 1]: {score}")
        except (ValueError, TypeError):
            logger.warning(f"Invalid confidence score: {score}")

    def log_temporal_relevance(self, relevance_score):
"""
Log a temporal relevance score.
Args:
relevance_score (float): Temporal relevance score between 0 and 1
"""
        # Record only scores that parse as floats within [0, 1]; warn otherwise
        try:
            relevance_score = float(relevance_score)
            if 0 <= relevance_score <= 1:
                self.metrics["temporal_relevance"].append(relevance_score)
            else:
                logger.warning(f"Temporal relevance score out of range [0, 1]: {relevance_score}")
        except (ValueError, TypeError):
            logger.warning(f"Invalid temporal relevance score: {relevance_score}")

    def get_summary(self):
"""
Get a summary of all performance metrics.
Returns:
dict: Summary of performance metrics
"""
# Safely calculate averages with error handling
def safe_avg(metric_list):
try:
return sum(metric_list) / max(len(metric_list), 1)
except (TypeError, ValueError):
return 0.0
return {
"claims_processed": self.metrics["claims_processed"],
"avg_evidence_retrieval_success_rate": safe_avg(self.metrics["evidence_retrieval_success_rate"]),
"avg_processing_time": safe_avg(self.metrics["processing_times"]),
"avg_confidence_score": safe_avg(self.metrics["confidence_scores"]),
"source_types_used": dict(self.metrics["source_types_used"]),
"avg_temporal_relevance": safe_avg(self.metrics["temporal_relevance"])
        }

    def reset(self):
"""Reset all performance metrics."""
self.__init__()
logger.info("Performance metrics have been reset")
return "Performance metrics reset successfully"