# utils/cache_manager.py
import hashlib
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple, List, Union

import numpy as np
class CacheManager:
    """In-memory cache with per-entry TTL, LRU eviction and semantic lookup.

    Entries live in ``self.cache`` keyed by an MD5 digest of the input data,
    prefixed with ``"<namespace>:"`` when a namespace is given.  Embeddings
    passed to :meth:`put` are stored per namespace in
    ``self.embedding_cache`` under the same key, which lets
    :meth:`get_semantic` fall back to a cosine-similarity search when there
    is no exact match.

    Recognised ``config`` keys: ``max_entries`` (default 1000), ``ttl`` in
    seconds (default 3600) and ``semantic_threshold`` (default 0.85).
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the CacheManager with optional configuration."""
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # Main cache storage: key -> {'value', 'expiry', 'last_accessed',
        # 'access_count'}.
        self.cache: Dict[str, Dict[str, Any]] = {}

        # Cache statistics. 'entries' is kept equal to len(self.cache).
        self.stats = {
            'hits': 0,
            'misses': 0,
            'entries': 0,
            'evictions': 0,
        }

        # Cache configuration
        self.max_entries = self.config.get('max_entries', 1000)
        self.ttl = self.config.get('ttl', 3600)  # Time to live in seconds
        self.semantic_threshold = self.config.get('semantic_threshold', 0.85)

        # For semantic caching: namespace -> {key -> embedding}.
        self.embedding_cache: Dict[str, Dict[str, Any]] = {}

    def _generate_key(self, data: Union[str, bytes], namespace: str = '') -> str:
        """Generate a cache key for the given data.

        The key is the MD5 hex digest of ``data`` (``str`` input is encoded
        as UTF-8 first), prefixed with ``"<namespace>:"`` when ``namespace``
        is non-empty.
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        key = hashlib.md5(data).hexdigest()
        if namespace:
            key = f"{namespace}:{key}"
        return key

    def _remove_entry(self, key: str) -> None:
        """Delete ``key`` from the cache and from every embedding namespace."""
        self.cache.pop(key, None)
        # Drop the embedding too, otherwise it would linger as a stale
        # semantic candidate pointing at a value that no longer exists.
        for embeddings in self.embedding_cache.values():
            embeddings.pop(key, None)
        self.stats['entries'] = len(self.cache)

    def _live_entry(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the unexpired entry for ``key`` and mark it as accessed.

        An expired entry is removed (counted as an eviction) and ``None`` is
        returned.  Hit/miss counters are left to the caller so that one
        public lookup is counted exactly once.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None
        if datetime.now() > entry['expiry']:
            self._remove_entry(key)
            self.stats['evictions'] += 1
            return None
        entry['last_accessed'] = datetime.now()
        entry['access_count'] = entry.get('access_count', 0) + 1
        return entry

    def _best_semantic_key(self, embedding: np.ndarray,
                           namespace: str) -> Optional[str]:
        """Return the key whose embedding is most similar to ``embedding``.

        Uses cosine similarity over the embeddings stored for ``namespace``.
        Returns ``None`` when the namespace has no embeddings, when the query
        has zero norm, or when the best similarity is below
        ``self.semantic_threshold``.
        """
        candidates = self.embedding_cache.get(namespace)
        if not candidates:
            return None

        query = np.asarray(embedding)
        query_norm = np.linalg.norm(query)  # loop-invariant, computed once
        if query_norm == 0:
            # Cosine similarity is undefined for a zero vector.
            return None

        best_similarity = 0
        best_key = None
        for key, stored_embedding in candidates.items():
            denominator = query_norm * np.linalg.norm(stored_embedding)
            if denominator == 0:
                continue  # skip zero-norm stored vectors instead of dividing by 0
            similarity = np.dot(query, stored_embedding) / denominator
            if similarity > best_similarity:
                best_similarity = similarity
                best_key = key

        if best_key is not None and best_similarity >= self.semantic_threshold:
            return best_key
        return None

    def get(self, data: str, namespace: str = '') -> Tuple[bool, Any]:
        """
        Try to retrieve data from cache.

        Returns (hit, value) where hit is a boolean indicating cache hit/miss.
        An expired entry is evicted and reported as a miss.
        """
        entry = self._live_entry(self._generate_key(data, namespace))
        if entry is None:
            self.stats['misses'] += 1
            return False, None
        self.stats['hits'] += 1
        return True, entry['value']

    def get_semantic(self, data: str, embedding: np.ndarray,
                     namespace: str = '') -> Tuple[bool, Any]:
        """
        Try to retrieve data from cache using semantic similarity.

        Requires pre-computed embedding for the query.  An exact key match is
        tried first; otherwise the most similar stored embedding in
        ``namespace`` is used if its cosine similarity reaches
        ``self.semantic_threshold``.  Each call records exactly one hit or
        one miss.

        Returns (hit, value).
        """
        # First try exact match
        entry = self._live_entry(self._generate_key(data, namespace))

        # No exact match, try semantic matching if we have embeddings
        if entry is None:
            best_key = self._best_semantic_key(embedding, namespace)
            if best_key is not None:
                entry = self._live_entry(best_key)

        if entry is None:
            self.stats['misses'] += 1
            return False, None
        self.stats['hits'] += 1
        return True, entry['value']

    def put(self, data: str, value: Any, namespace: str = '',
            ttl: Optional[int] = None, embedding: Optional[np.ndarray] = None) -> None:
        """
        Store data in cache with optional embedding for semantic search.

        ``ttl`` (seconds) overrides the default ``self.ttl`` for this entry.
        When the cache is full and the key is new, the least recently used
        entry is evicted first.
        """
        key = self._generate_key(data, namespace)

        # Only a brand-new key can grow the cache; overwrites never evict.
        if key not in self.cache and len(self.cache) >= self.max_entries:
            self._evict_oldest()

        now = datetime.now()
        lifetime = self.ttl if ttl is None else ttl
        self.cache[key] = {
            'value': value,
            'expiry': now + timedelta(seconds=lifetime),
            'last_accessed': now,
            'access_count': 1,
        }
        self.stats['entries'] = len(self.cache)

        # Store embedding if provided
        if embedding is not None:
            self.embedding_cache.setdefault(namespace, {})[key] = embedding

    def _evict_oldest(self) -> None:
        """Evict the least recently used cache entry."""
        if not self.cache:
            return
        # min() always selects an entry, even when every entry shares the
        # same timestamp, so a full cache is guaranteed to shrink.
        oldest_key = min(self.cache,
                         key=lambda k: self.cache[k]['last_accessed'])
        self._remove_entry(oldest_key)
        self.stats['evictions'] += 1

    def clear(self, namespace: Optional[str] = None) -> None:
        """
        Clear the cache, optionally only for a specific namespace.

        Keys are matched by the ``"<namespace>:"`` prefix, so nested
        namespaces such as ``"a:b"`` are cleared together with ``"a"``; their
        embeddings are removed as well.  A falsy ``namespace`` clears
        everything.
        """
        if namespace:
            prefix = f"{namespace}:"
            for key in [k for k in self.cache if k.startswith(prefix)]:
                del self.cache[key]
            # Remove embeddings for the namespace and anything nested in it,
            # mirroring the prefix match used for the cache keys above.
            for name in [n for n in self.embedding_cache
                         if n == namespace or n.startswith(prefix)]:
                del self.embedding_cache[name]
            self.stats['entries'] = len(self.cache)
        else:
            # Clear entire cache
            self.cache = {}
            self.embedding_cache = {}
            self.stats['entries'] = 0
        self.logger.info(
            "Cleared cache%s",
            ' for namespace: ' + namespace if namespace else '',
        )

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns the raw counters plus ``hit_rate`` (0 when no lookups have
        been made), ``current_size`` and ``max_size``.
        """
        lookups = self.stats['hits'] + self.stats['misses']
        hit_rate = self.stats['hits'] / lookups if lookups else 0
        return {
            **self.stats,
            'hit_rate': hit_rate,
            'current_size': len(self.cache),
            'max_size': self.max_entries,
        }