Spaces:
Sleeping
Sleeping
File size: 7,558 Bytes
7de43ca |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# utils/cache_manager.py
import hashlib
import logging
import time
from typing import Dict, Any, Optional, Tuple, List, Union
from datetime import datetime, timedelta
import numpy as np
class CacheManager:
def __init__(self, config: Optional[Dict] = None):
"""Initialize the CacheManager with optional configuration."""
self.config = config or {}
self.logger = logging.getLogger(__name__)
# Main cache storage
self.cache = {}
# Cache statistics
self.stats = {
'hits': 0,
'misses': 0,
'entries': 0,
'evictions': 0
}
# Cache configuration
self.max_entries = self.config.get('max_entries', 1000)
self.ttl = self.config.get('ttl', 3600) # Time to live in seconds
self.semantic_threshold = self.config.get('semantic_threshold', 0.85)
# For semantic caching
self.embedding_cache = {}
def _generate_key(self, data: Union[str, bytes], namespace: str = '') -> str:
"""Generate a cache key for the given data."""
if isinstance(data, str):
data = data.encode('utf-8')
key = hashlib.md5(data).hexdigest()
if namespace:
key = f"{namespace}:{key}"
return key
def get(self, data: str, namespace: str = '') -> Tuple[bool, Any]:
"""
Try to retrieve data from cache.
Returns (hit, value) where hit is a boolean indicating cache hit/miss.
"""
key = self._generate_key(data, namespace)
# Check for exact match
if key in self.cache:
entry = self.cache[key]
# Check if entry has expired
if datetime.now() > entry['expiry']:
# Entry expired
del self.cache[key]
self.stats['evictions'] += 1
self.stats['entries'] -= 1
self.stats['misses'] += 1
return False, None
# Update last accessed time
entry['last_accessed'] = datetime.now()
self.stats['hits'] += 1
return True, entry['value']
# No exact match found
self.stats['misses'] += 1
return False, None
def get_semantic(self, data: str, embedding: np.ndarray,
namespace: str = '') -> Tuple[bool, Any]:
"""
Try to retrieve data from cache using semantic similarity.
Requires pre-computed embedding for the query.
"""
# First try exact match
hit, value = self.get(data, namespace)
if hit:
return hit, value
# No exact match, try semantic matching if we have embeddings
if namespace not in self.embedding_cache:
return False, None
# Find closest match
best_similarity = 0
best_key = None
for key, stored_embedding in self.embedding_cache[namespace].items():
similarity = np.dot(embedding, stored_embedding) / (
np.linalg.norm(embedding) * np.linalg.norm(stored_embedding))
if similarity > best_similarity:
best_similarity = similarity
best_key = key
# Check if best match exceeds threshold
if best_similarity >= self.semantic_threshold and best_key in self.cache:
entry = self.cache[best_key]
# Check expiry
if datetime.now() > entry['expiry']:
return False, None
# Update stats and return
self.stats['hits'] += 1
return True, entry['value']
return False, None
def put(self, data: str, value: Any, namespace: str = '',
ttl: Optional[int] = None, embedding: Optional[np.ndarray] = None) -> None:
"""
Store data in cache with optional embedding for semantic search.
"""
# Generate key
key = self._generate_key(data, namespace)
# Check if cache is full
if len(self.cache) >= self.max_entries and key not in self.cache:
self._evict_oldest()
# Set expiry time
expiry = datetime.now() + timedelta(seconds=ttl if ttl is not None else self.ttl)
# Store in cache
self.cache[key] = {
'value': value,
'expiry': expiry,
'last_accessed': datetime.now(),
'access_count': 1
}
# Store embedding if provided
if embedding is not None:
if namespace not in self.embedding_cache:
self.embedding_cache[namespace] = {}
self.embedding_cache[namespace][key] = embedding
# Update stats
if key not in self.cache:
self.stats['entries'] += 1
def _evict_oldest(self) -> None:
"""Evict the least recently used cache entry."""
if not self.cache:
return
oldest_time = datetime.now()
oldest_key = None
for key, entry in self.cache.items():
if entry['last_accessed'] < oldest_time:
oldest_time = entry['last_accessed']
oldest_key = key
if oldest_key:
# Remove from main cache
del self.cache[oldest_key]
# Remove from embedding cache if present
for namespace in self.embedding_cache:
if oldest_key in self.embedding_cache[namespace]:
del self.embedding_cache[namespace][oldest_key]
self.stats['evictions'] += 1
self.stats['entries'] -= 1
def clear(self, namespace: Optional[str] = None) -> None:
"""
Clear the cache, optionally only for a specific namespace.
"""
if namespace:
# Clear only specific namespace
keys_to_remove = []
for key in self.cache:
if key.startswith(f"{namespace}:"):
keys_to_remove.append(key)
for key in keys_to_remove:
del self.cache[key]
self.stats['entries'] -= 1
# Clear embeddings for namespace
if namespace in self.embedding_cache:
del self.embedding_cache[namespace]
else:
# Clear entire cache
self.cache = {}
self.embedding_cache = {}
self.stats['entries'] = 0
self.logger.info(f"Cleared cache{' for namespace: ' + namespace if namespace else ''}")
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
if self.stats['hits'] + self.stats['misses'] > 0:
hit_rate = self.stats['hits'] / (self.stats['hits'] + self.stats['misses'])
else:
hit_rate = 0
return {
**self.stats,
'hit_rate': hit_rate,
'current_size': len(self.cache),
'max_size': self.max_entries
}
|