"""
Performance tracking utility for the Fake News Detector application.

This module provides functionality to track and analyze the
performance of the application, including processing times,
evidence retrieval success rates, confidence scores, and
temporal relevance of evidence.
"""

import time
import logging

logger = logging.getLogger("misinformation_detector")

class PerformanceTracker:
    """
    Tracks and logs performance metrics for the fact-checking system.
    
    This class maintains counters and statistics for various performance
    metrics, such as processing times, evidence retrieval success rates,
    and confidence scores.
    """
    
    def __init__(self):
        """Initialize the performance tracker with empty metrics."""
        self.metrics = {
            "claims_processed": 0,
            "evidence_retrieval_success_rate": [],
            "processing_times": [],
            "confidence_scores": [],
            "source_types_used": {},
            "temporal_relevance": []
        }

    def log_claim_processed(self):
        """
        Increment the counter for processed claims.
        This should be called whenever a claim is processed successfully.
        """
        self.metrics["claims_processed"] += 1

    def log_evidence_retrieval(self, success, sources_count):
        """
        Log the success or failure of evidence retrieval.
        
        Args:
            success (bool): Whether evidence retrieval was successful
            sources_count (dict): Count of evidence items by source type
        """
        # Record success as 1 or 0 so the list can be averaged into a success rate
        success_value = 1 if success else 0
        self.metrics["evidence_retrieval_success_rate"].append(success_value)

        # Safely process source types
        if isinstance(sources_count, dict):
            for source_type, count in sources_count.items():
                # Ensure source_type is a string and count is an integer
                source_type = str(source_type)
                try:
                    count = int(count)
                except (ValueError, TypeError):
                    count = 1

                # Update source types used
                self.metrics["source_types_used"][source_type] = \
                    self.metrics["source_types_used"].get(source_type, 0) + count

    def log_processing_time(self, start_time):
        """
        Log the processing time for an operation.
        
        Args:
            start_time (float): Start time obtained from time.time()
        """
        end_time = time.time()
        processing_time = end_time - start_time
        self.metrics["processing_times"].append(processing_time)

    def log_confidence_score(self, score):
        """
        Log a confidence score.
        
        Args:
            score (float): Confidence score between 0 and 1
        """
        # Ensure score is a float between 0 and 1
        try:
            score = float(score)
            if 0 <= score <= 1:
                self.metrics["confidence_scores"].append(score)
        except (ValueError, TypeError):
            logger.warning(f"Invalid confidence score: {score}")

    def log_temporal_relevance(self, relevance_score):
        """
        Log a temporal relevance score.
        
        Args:
            relevance_score (float): Temporal relevance score between 0 and 1
        """
        # Ensure relevance score is a float between 0 and 1
        try:
            relevance_score = float(relevance_score)
            if 0 <= relevance_score <= 1:
                self.metrics["temporal_relevance"].append(relevance_score)
        except (ValueError, TypeError):
            logger.warning(f"Invalid temporal relevance score: {relevance_score}")

    def get_summary(self):
        """
        Get a summary of all performance metrics.
        
        Returns:
            dict: Summary of performance metrics
        """
        # Safely calculate averages with error handling
        def safe_avg(metric_list):
            try:
                return sum(metric_list) / max(len(metric_list), 1)
            except (TypeError, ValueError):
                return 0.0

        return {
            "claims_processed": self.metrics["claims_processed"],
            "avg_evidence_retrieval_success_rate": safe_avg(self.metrics["evidence_retrieval_success_rate"]),
            "avg_processing_time": safe_avg(self.metrics["processing_times"]),
            "avg_confidence_score": safe_avg(self.metrics["confidence_scores"]),
            "source_types_used": dict(self.metrics["source_types_used"]),
            "avg_temporal_relevance": safe_avg(self.metrics["temporal_relevance"])
        }

    def reset(self):
        """Reset all performance metrics."""
        self.__init__()
        logger.info("Performance metrics have been reset")
        return "Performance metrics reset successfully"