Spaces:
Sleeping
Sleeping
File size: 7,784 Bytes
7de43ca |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# utils/token_manager.py
import logging
from typing import Dict, Optional, Tuple, Any
from transformers import AutoTokenizer
class TokenManager:
    """Track token budgets, actual usage, and approximate energy cost per agent/model.

    The manager keeps three pieces of state:
      * ``token_budgets``  - per-operation-type token ceilings (from config or defaults),
      * ``token_counters`` - nested dict of actual usage: ``{agent: {operation: count}}``,
      * ``tokenizer_cache`` - loaded HF tokenizers (or ``None`` when loading failed,
        which triggers a chars/4 approximation instead).
    """

    # Fallback heuristic when no tokenizer is available: ~4 chars per token.
    _CHARS_PER_TOKEN = 4

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the TokenManager with optional configuration.

        Args:
            config: Optional dict; ``config['budgets']`` may map operation types
                to token ceilings. Missing/empty budgets fall back to defaults.
        """
        self.config = config or {}
        self.token_counters: Dict[str, Dict[str, int]] = {}  # usage by agent/operation
        self.token_budgets: Dict[str, int] = self.config.get('budgets', {})
        self.tokenizer_cache: Dict[str, Any] = {}  # model name -> tokenizer (or None)
        self.logger = logging.getLogger(__name__)

        # Default budgets if not specified in config.
        if not self.token_budgets:
            self.token_budgets = {
                'text_analysis': 5000,
                'image_captioning': 3000,
                'report_generation': 4000,
                'default': 2000,
            }

    def register_model(self, model_name: str, model_type: str) -> None:
        """Register a model and load its tokenizer for accurate token counting.

        Args:
            model_name: HF hub id used with ``AutoTokenizer.from_pretrained``.
            model_type: Accepted for interface compatibility; currently unused.

        On tokenizer load failure the model is still registered with ``None``,
        so later calls fall back to approximate counting rather than raising.
        """
        if model_name in self.tokenizer_cache:
            return  # already registered; keep the cached tokenizer
        try:
            self.tokenizer_cache[model_name] = AutoTokenizer.from_pretrained(model_name)
            self.logger.info("Registered tokenizer for model: %s", model_name)
        except Exception as e:
            self.logger.error("Failed to load tokenizer for %s: %s", model_name, e)
            # Sentinel: fall back to approximate counting for this model.
            self.tokenizer_cache[model_name] = None

    def estimate_tokens(self, text: str, model_name: str) -> int:
        """Estimate the token count of ``text`` for ``model_name``.

        Uses the cached tokenizer when available; otherwise approximates at
        ~4 characters per token. Empty/None text counts as 0 tokens.
        """
        if not text:
            return 0
        tokenizer = self.tokenizer_cache.get(model_name)
        if tokenizer is not None:
            # encode() returns a plain list of ids - no torch tensors needed
            # just to take a length.
            return len(tokenizer.encode(text))
        # Fallback: approximate token count (4 chars ≈ 1 token).
        return len(text) // self._CHARS_PER_TOKEN + 1

    def request_tokens(self, agent_name: str, operation_type: str,
                       text: str, model_name: str) -> Tuple[bool, str]:
        """Request token budget for an operation.

        Returns:
            ``(approved, reason)`` - ``approved`` is False when the estimated
            token count for ``text`` exceeds the budget for ``operation_type``
            (falling back to the ``'default'`` budget, then 1000).
        """
        budget = self.token_budgets.get(operation_type,
                                        self.token_budgets.get('default', 1000))
        estimated_tokens = self.estimate_tokens(text, model_name)

        if estimated_tokens > budget:
            return False, f"Token budget exceeded: {estimated_tokens} > {budget}"

        # Ensure the agent has a counter dict ready for later log_usage calls.
        self.token_counters.setdefault(agent_name, {})
        return True, "Token budget approved"

    def log_usage(self, agent_name: str, operation_type: str,
                  token_count: int, model_name: str) -> None:
        """Record ``token_count`` actual tokens used by ``agent_name.operation_type``.

        ``model_name`` is accepted for interface compatibility; usage is
        aggregated per agent/operation, not per model.
        """
        agent_counters = self.token_counters.setdefault(agent_name, {})
        agent_counters[operation_type] = agent_counters.get(operation_type, 0) + token_count
        self.logger.info("Logged %d tokens for %s.%s",
                         token_count, agent_name, operation_type)

    def get_usage_stats(self) -> Dict[str, Any]:
        """Return current token usage statistics.

        Returns:
            Dict with ``by_agent`` (the raw counters), ``total_usage`` (sum of
            all logged tokens), and ``budgets`` (current budget table).
        """
        total_usage = sum(
            sum(operations.values()) for operations in self.token_counters.values()
        )
        return {
            'by_agent': self.token_counters,
            'total_usage': total_usage,
            'budgets': self.token_budgets,
        }

    def optimize_prompt(self, prompt: str, model_name: str,
                        max_tokens: Optional[int] = None) -> str:
        """Shrink ``prompt`` to at most ``max_tokens`` tokens for ``model_name``.

        Returns the prompt unchanged when no limit is given, no tokenizer is
        cached for the model, or the prompt already fits. Otherwise applies a
        simple truncation strategy (token-level head truncation); a real system
        would use more sophisticated techniques (summarization, pruning).
        """
        if not max_tokens:
            return prompt
        tokenizer = self.tokenizer_cache.get(model_name)
        if tokenizer is None:
            # Can't optimize without an exact tokenizer.
            return prompt

        current_tokens = self.estimate_tokens(prompt, model_name)
        if current_tokens <= max_tokens:
            return prompt

        # encode() -> plain list of ids; slice and decode back to text.
        token_ids = tokenizer.encode(prompt)
        optimized_prompt = tokenizer.decode(token_ids[:max_tokens])
        self.logger.info("Optimized prompt from %d to %d tokens",
                         current_tokens, max_tokens)
        return optimized_prompt

    def calculate_energy_usage(self, token_count: int, model_name: str) -> float:
        """Calculate approximate energy usage for ``token_count`` tokens.

        Returns:
            Estimated energy in watt-hours, using per-model coefficients
            (approximate Wh per 1K tokens, based on research estimates);
            unknown models use a conservative default of 0.005 Wh/1K tokens.
        """
        energy_coefficients = {
            # Small models
            'sentence-transformers/all-MiniLM-L6-v2': 0.0001,
            'microsoft/deberta-v3-small': 0.0005,
            'google/flan-t5-small': 0.0007,
            # Medium models
            'Salesforce/blip-image-captioning-base': 0.003,
            't5-small': 0.001,
            # Large models
            'Salesforce/BLIP-2': 0.015,
            # Default for unknown models (conservative estimate)
            'default': 0.005,
        }
        coefficient = energy_coefficients.get(model_name, energy_coefficients['default'])
        # Coefficients are per 1K tokens, so scale the count down.
        energy_usage = (token_count / 1000) * coefficient
        self.logger.info(
            "Estimated energy usage for %d tokens with %s: %.6f watt-hours",
            token_count, model_name, energy_usage)
        return energy_usage

    def adjust_budget(self, operation_type: str, new_budget: int) -> None:
        """Dynamically adjust the token budget for an operation type.

        Allows runtime optimization based on task priority or resource
        constraints. Non-positive budgets are rejected with a warning and
        leave the existing budget untouched.
        """
        if new_budget <= 0:
            self.logger.warning(
                "Invalid budget value: %d. Budget must be positive.", new_budget)
            return

        old_budget = self.token_budgets.get(
            operation_type, self.token_budgets.get('default', 0))
        self.token_budgets[operation_type] = new_budget

        # Log the relative change (treat a previously-zero budget as +100%).
        change_percent = ((new_budget - old_budget) / old_budget * 100) if old_budget else 100
        self.logger.info("Adjusted budget for %s: %d → %d tokens (%.1f%% change)",
                         operation_type, old_budget, new_budget, change_percent)

        # A >20% reduction may starve dependent operations - surface it loudly.
        if new_budget < old_budget * 0.8:
            self.logger.warning(
                "Significant budget reduction for %s. Dependent operations may be affected.",
                operation_type)
|