File size: 7,784 Bytes
7de43ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# utils/token_manager.py
import logging
from typing import Dict, Optional, Tuple, Any
from transformers import AutoTokenizer

class TokenManager:
    """Track, budget, and optimize LLM token usage across agents and models.

    Responsibilities:
      * per-operation-type token budgets (configurable, with defaults)
      * per-agent/per-operation usage counters
      * tokenizer-backed (or heuristic) token estimation
      * prompt truncation to a token limit
      * rough energy-usage estimation per model
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the TokenManager with optional configuration.

        Args:
            config: Optional settings dict. A 'budgets' key, if present,
                maps operation-type names to per-request token budgets.
        """
        self.config = config or {}
        # Usage counters: agent name -> {operation type -> tokens used}.
        self.token_counters: Dict[str, Dict[str, int]] = {}
        self.token_budgets = self.config.get('budgets', {})
        # model name -> tokenizer instance, or None when loading failed
        # (None triggers the character-count fallback in estimate_tokens).
        self.tokenizer_cache: Dict[str, Any] = {}
        self.logger = logging.getLogger(__name__)

        # Default budgets if not specified in config.
        if not self.token_budgets:
            self.token_budgets = {
                'text_analysis': 5000,
                'image_captioning': 3000,
                'report_generation': 4000,
                'default': 2000
            }

    def register_model(self, model_name: str, model_type: str) -> None:
        """Register a model and load its tokenizer for accurate token counting.

        On load failure the cache entry is set to None so that
        estimate_tokens falls back to approximate counting instead of
        raising on every call.

        Args:
            model_name: Hugging Face model identifier to load a tokenizer for.
            model_type: Currently unused; kept for interface stability.
        """
        if model_name not in self.tokenizer_cache:
            try:
                self.tokenizer_cache[model_name] = AutoTokenizer.from_pretrained(model_name)
                self.logger.info("Registered tokenizer for model: %s", model_name)
            except Exception as e:
                self.logger.error("Failed to load tokenizer for %s: %s", model_name, e)
                # Fallback to approximate counting.
                self.tokenizer_cache[model_name] = None

    def estimate_tokens(self, text: str, model_name: str) -> int:
        """Estimate the token count of *text* for *model_name*.

        Uses the registered tokenizer when available; otherwise applies the
        common ~4-characters-per-token heuristic.

        Returns:
            Estimated token count (0 for empty text).
        """
        if not text:
            return 0

        tokenizer = self.tokenizer_cache.get(model_name)
        if tokenizer:
            # encode() yields the same ids as tokenizer(text).input_ids but
            # avoids pulling in torch just to read a tensor's shape.
            return len(tokenizer.encode(text))
        # Fallback: approximate token count (4 chars ≈ 1 token).
        return len(text) // 4 + 1

    def request_tokens(self, agent_name: str, operation_type: str,
                       text: str, model_name: str) -> Tuple[bool, str]:
        """Request token budget for an operation.

        Args:
            agent_name: Agent making the request (counter is initialized).
            operation_type: Budget category; falls back to 'default'.
            text: The text whose token cost is being requested.
            model_name: Model used for token estimation.

        Returns:
            (approved, reason) — approved is False when the estimate
            exceeds the budget for this operation type.
        """
        # Budget for this operation type, falling back to 'default'.
        budget = self.token_budgets.get(operation_type,
                                        self.token_budgets.get('default', 1000))

        estimated_tokens = self.estimate_tokens(text, model_name)

        if estimated_tokens > budget:
            reason = f"Token budget exceeded: {estimated_tokens} > {budget}"
            return False, reason

        # Ensure a counter exists so later log_usage calls have a bucket.
        self.token_counters.setdefault(agent_name, {})

        return True, "Token budget approved"

    def log_usage(self, agent_name: str, operation_type: str,
                  token_count: int, model_name: str) -> None:
        """Record actual token usage after an operation completes.

        Args:
            agent_name: Agent that consumed the tokens.
            operation_type: Category the usage is attributed to.
            token_count: Number of tokens actually consumed.
            model_name: Model used (currently informational only).
        """
        agent_ops = self.token_counters.setdefault(agent_name, {})
        agent_ops[operation_type] = agent_ops.get(operation_type, 0) + token_count
        self.logger.info("Logged %d tokens for %s.%s",
                         token_count, agent_name, operation_type)

    def get_usage_stats(self) -> Dict[str, Any]:
        """Return current token usage statistics.

        Returns:
            Dict with 'by_agent' (raw counters), 'total_usage' (sum over
            all agents/operations), and 'budgets' (current budget table).
        """
        total_usage = sum(
            sum(operations.values()) for operations in self.token_counters.values()
        )
        return {
            'by_agent': self.token_counters,
            'total_usage': total_usage,
            'budgets': self.token_budgets
        }

    def optimize_prompt(self, prompt: str, model_name: str,
                        max_tokens: Optional[int] = None) -> str:
        """Truncate *prompt* so it fits within *max_tokens* for *model_name*.

        Returns the prompt unchanged when no limit is given, no tokenizer
        is registered for the model, or the prompt already fits.

        Args:
            prompt: Prompt text to optimize.
            model_name: Model whose tokenizer defines token boundaries.
            max_tokens: Token limit; None disables optimization.
        """
        if not max_tokens:
            return prompt

        tokenizer = self.tokenizer_cache.get(model_name)
        if not tokenizer:
            # Can't optimize without a real tokenizer.
            return prompt

        current_tokens = self.estimate_tokens(prompt, model_name)
        if current_tokens <= max_tokens:
            return prompt

        # Simple truncation strategy (as a basic implementation).
        # In a real system, we'd use more sophisticated techniques.
        truncated_ids = tokenizer.encode(prompt)[:max_tokens]

        # skip_special_tokens keeps markers like [CLS]/</s> out of the
        # returned prompt text.
        optimized_prompt = tokenizer.decode(truncated_ids, skip_special_tokens=True)

        self.logger.info("Optimized prompt from %d to %d tokens",
                         current_tokens, max_tokens)
        return optimized_prompt

    def calculate_energy_usage(self, token_count: int, model_name: str) -> float:
        """Calculate approximate energy usage for a token count and model.

        Coefficients are rough research-based watt-hours-per-1K-tokens
        estimates; unknown models use a conservative default.

        Returns:
            Estimated energy usage in watt-hours.
        """
        # Model energy coefficients (approximate watt-hours per 1K tokens).
        energy_coefficients = {
            # Small models
            'sentence-transformers/all-MiniLM-L6-v2': 0.0001,
            'microsoft/deberta-v3-small': 0.0005,
            'google/flan-t5-small': 0.0007,

            # Medium models
            'Salesforce/blip-image-captioning-base': 0.003,
            't5-small': 0.001,

            # Large models
            'Salesforce/BLIP-2': 0.015,

            # Default for unknown models (conservative estimate)
            'default': 0.005
        }

        coefficient = energy_coefficients.get(model_name, energy_coefficients['default'])

        # Convert tokens to thousands before applying the per-1K coefficient.
        energy_usage = (token_count / 1000) * coefficient

        self.logger.info(
            "Estimated energy usage for %d tokens with %s: %.6f watt-hours",
            token_count, model_name, energy_usage)
        return energy_usage

    def adjust_budget(self, operation_type: str, new_budget: int) -> None:
        """Dynamically adjust the token budget for an operation type.

        Allows runtime optimization based on task priority or resource
        constraints. Non-positive budgets are rejected with a warning.

        Args:
            operation_type: Budget category to adjust (created if new).
            new_budget: New per-request token budget; must be positive.
        """
        if new_budget <= 0:
            self.logger.warning(
                "Invalid budget value: %d. Budget must be positive.", new_budget)
            return

        old_budget = self.token_budgets.get(operation_type, self.token_budgets.get('default', 0))
        self.token_budgets[operation_type] = new_budget

        # Log the change (100% reported when there was no prior budget).
        change_percent = ((new_budget - old_budget) / old_budget * 100) if old_budget else 100
        self.logger.info("Adjusted budget for %s: %d -> %d tokens (%.1f%% change)",
                         operation_type, old_budget, new_budget, change_percent)

        # Significant reductions may affect systems sized to the old budget.
        if new_budget < old_budget * 0.8:  # More than 20% reduction
            self.logger.warning(
                "Significant budget reduction for %s. Dependent operations may be affected.",
                operation_type)