File size: 7,558 Bytes
7de43ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# utils/cache_manager.py
import hashlib
import logging
import time
from typing import Dict, Any, Optional, Tuple, List, Union
from datetime import datetime, timedelta
import numpy as np

class CacheManager:
    def __init__(self, config: Optional[Dict] = None):
        """Initialize the CacheManager with optional configuration."""
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        
        # Main cache storage
        self.cache = {}
        
        # Cache statistics
        self.stats = {
            'hits': 0,
            'misses': 0,
            'entries': 0,
            'evictions': 0
        }
        
        # Cache configuration
        self.max_entries = self.config.get('max_entries', 1000)
        self.ttl = self.config.get('ttl', 3600)  # Time to live in seconds
        self.semantic_threshold = self.config.get('semantic_threshold', 0.85)
        
        # For semantic caching
        self.embedding_cache = {}
        
    def _generate_key(self, data: Union[str, bytes], namespace: str = '') -> str:
        """Generate a cache key for the given data."""
        if isinstance(data, str):
            data = data.encode('utf-8')
            
        key = hashlib.md5(data).hexdigest()
        if namespace:
            key = f"{namespace}:{key}"
            
        return key
    
    def get(self, data: str, namespace: str = '') -> Tuple[bool, Any]:
        """

        Try to retrieve data from cache.

        Returns (hit, value) where hit is a boolean indicating cache hit/miss.

        """
        key = self._generate_key(data, namespace)
        
        # Check for exact match
        if key in self.cache:
            entry = self.cache[key]
            
            # Check if entry has expired
            if datetime.now() > entry['expiry']:
                # Entry expired
                del self.cache[key]
                self.stats['evictions'] += 1
                self.stats['entries'] -= 1
                self.stats['misses'] += 1
                return False, None
                
            # Update last accessed time
            entry['last_accessed'] = datetime.now()
            self.stats['hits'] += 1
            return True, entry['value']
            
        # No exact match found
        self.stats['misses'] += 1
        return False, None
        
    def get_semantic(self, data: str, embedding: np.ndarray, 

                     namespace: str = '') -> Tuple[bool, Any]:
        """

        Try to retrieve data from cache using semantic similarity.

        Requires pre-computed embedding for the query.

        """
        # First try exact match
        hit, value = self.get(data, namespace)
        if hit:
            return hit, value
            
        # No exact match, try semantic matching if we have embeddings
        if namespace not in self.embedding_cache:
            return False, None
            
        # Find closest match
        best_similarity = 0
        best_key = None
        
        for key, stored_embedding in self.embedding_cache[namespace].items():
            similarity = np.dot(embedding, stored_embedding) / (
                np.linalg.norm(embedding) * np.linalg.norm(stored_embedding))
            
            if similarity > best_similarity:
                best_similarity = similarity
                best_key = key
                
        # Check if best match exceeds threshold
        if best_similarity >= self.semantic_threshold and best_key in self.cache:
            entry = self.cache[best_key]
            
            # Check expiry
            if datetime.now() > entry['expiry']:
                return False, None
                
            # Update stats and return
            self.stats['hits'] += 1
            return True, entry['value']
            
        return False, None
    
    def put(self, data: str, value: Any, namespace: str = '', 

            ttl: Optional[int] = None, embedding: Optional[np.ndarray] = None) -> None:
        """

        Store data in cache with optional embedding for semantic search.

        """
        # Generate key
        key = self._generate_key(data, namespace)
        
        # Check if cache is full
        if len(self.cache) >= self.max_entries and key not in self.cache:
            self._evict_oldest()
            
        # Set expiry time
        expiry = datetime.now() + timedelta(seconds=ttl if ttl is not None else self.ttl)
        
        # Store in cache
        self.cache[key] = {
            'value': value,
            'expiry': expiry,
            'last_accessed': datetime.now(),
            'access_count': 1
        }
        
        # Store embedding if provided
        if embedding is not None:
            if namespace not in self.embedding_cache:
                self.embedding_cache[namespace] = {}
                
            self.embedding_cache[namespace][key] = embedding
            
        # Update stats
        if key not in self.cache:
            self.stats['entries'] += 1
    
    def _evict_oldest(self) -> None:
        """Evict the least recently used cache entry."""
        if not self.cache:
            return
            
        oldest_time = datetime.now()
        oldest_key = None
        
        for key, entry in self.cache.items():
            if entry['last_accessed'] < oldest_time:
                oldest_time = entry['last_accessed']
                oldest_key = key
                
        if oldest_key:
            # Remove from main cache
            del self.cache[oldest_key]
            
            # Remove from embedding cache if present
            for namespace in self.embedding_cache:
                if oldest_key in self.embedding_cache[namespace]:
                    del self.embedding_cache[namespace][oldest_key]
                    
            self.stats['evictions'] += 1
            self.stats['entries'] -= 1
            
    def clear(self, namespace: Optional[str] = None) -> None:
        """

        Clear the cache, optionally only for a specific namespace.

        """
        if namespace:
            # Clear only specific namespace
            keys_to_remove = []
            for key in self.cache:
                if key.startswith(f"{namespace}:"):
                    keys_to_remove.append(key)
                    
            for key in keys_to_remove:
                del self.cache[key]
                self.stats['entries'] -= 1
                
            # Clear embeddings for namespace
            if namespace in self.embedding_cache:
                del self.embedding_cache[namespace]
        else:
            # Clear entire cache
            self.cache = {}
            self.embedding_cache = {}
            self.stats['entries'] = 0
            
        self.logger.info(f"Cleared cache{' for namespace: ' + namespace if namespace else ''}")
    
    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        if self.stats['hits'] + self.stats['misses'] > 0:
            hit_rate = self.stats['hits'] / (self.stats['hits'] + self.stats['misses'])
        else:
            hit_rate = 0
            
        return {
            **self.stats,
            'hit_rate': hit_rate,
            'current_size': len(self.cache),
            'max_size': self.max_entries
        }