# utils/metrics_calculator.py
import logging
import time
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta

import numpy as np


class MetricsCalculator:
    """Accumulate energy, token, cache and model-selection metrics.

    All energy figures are stored in watt-hours (Wh) after the configured
    PUE multiplier has been applied. Derived values (carbon footprint,
    efficiency rates, environmental equivalents) are computed on demand
    from the accumulated totals.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the MetricsCalculator with optional configuration.

        Args:
            config: Optional settings. Recognized keys are
                ``'carbon_intensity'`` (kg CO2 per kWh, default 0.5) and
                ``'pue'`` (Power Usage Effectiveness, default 1.2).
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # Initialize metrics storage
        self.energy_usage = {
            'total': 0.0,  # Total watt-hours
            'by_model': {},
            'by_agent': {},
            'by_operation': {},
        }

        self.token_metrics = {
            'total_tokens': 0,
            'by_model': {},
            'by_agent': {},
            'tokens_saved': 0,
        }

        self.cache_metrics = {
            'hits': 0,
            'misses': 0,
            'hit_rate': 0.0,
            'estimated_energy_saved': 0.0,
        }

        self.model_selection_metrics = {
            'downgrades': 0,  # Times smaller model was used
            'estimated_energy_saved': 0.0,
        }

        # Carbon intensity (kg CO2 per kWh) - can be updated based on region
        self.carbon_intensity = self.config.get('carbon_intensity', 0.5)

        # PUE (Power Usage Effectiveness) of the data center
        self.pue = self.config.get('pue', 1.2)

        # Timestamps for calculating rates. ``last_update`` is only set here;
        # no method in this class modifies it afterwards.
        self.start_time = datetime.now()
        self.last_update = self.start_time

    def log_energy_usage(self, watt_hours: float, model_name: str,
                         agent_name: str, operation_type: str) -> None:
        """Log energy usage for an operation.

        The raw figure is multiplied by the PUE before being added to the
        overall total and to the per-model, per-agent and per-operation
        buckets.

        Args:
            watt_hours: Energy consumed by the operation, before PUE.
            model_name: Key for the ``by_model`` bucket.
            agent_name: Key for the ``by_agent`` bucket.
            operation_type: Key for the ``by_operation`` bucket.
        """
        # Apply PUE to get total data center energy
        adjusted_watt_hours = watt_hours * self.pue

        self.energy_usage['total'] += adjusted_watt_hours

        # Each breakdown bucket starts at 0.0 the first time a key is seen.
        for bucket_name, key in (
            ('by_model', model_name),
            ('by_agent', agent_name),
            ('by_operation', operation_type),
        ):
            bucket = self.energy_usage[bucket_name]
            bucket[key] = bucket.get(key, 0.0) + adjusted_watt_hours

        # Lazy %-formatting: the message is only built if DEBUG is enabled.
        self.logger.debug(
            "Logged %.6f Wh for %s.%s using %s",
            adjusted_watt_hours, agent_name, operation_type, model_name,
        )

    def log_token_usage(self, token_count: int, model_name: str,
                        agent_name: str) -> None:
        """Log token usage.

        Args:
            token_count: Number of tokens to add.
            model_name: Key for the ``by_model`` bucket.
            agent_name: Key for the ``by_agent`` bucket.
        """
        self.token_metrics['total_tokens'] += token_count

        for bucket_name, key in (
            ('by_model', model_name),
            ('by_agent', agent_name),
        ):
            bucket = self.token_metrics[bucket_name]
            bucket[key] = bucket.get(key, 0) + token_count

    def log_tokens_saved(self, tokens_saved: int) -> None:
        """Log tokens saved through optimization techniques."""
        self.token_metrics['tokens_saved'] += tokens_saved

    def update_cache_metrics(self, hits: int, misses: int,
                             estimated_energy_saved: float) -> None:
        """Update cache performance metrics.

        Args:
            hits: Number of cache hits to add to the running count.
            misses: Number of cache misses to add to the running count.
            estimated_energy_saved: Energy saved, added to the running total.
        """
        self.cache_metrics['hits'] += hits
        self.cache_metrics['misses'] += misses

        # hit_rate keeps its previous value while no lookups are recorded.
        total = self.cache_metrics['hits'] + self.cache_metrics['misses']
        if total > 0:
            self.cache_metrics['hit_rate'] = self.cache_metrics['hits'] / total

        self.cache_metrics['estimated_energy_saved'] += estimated_energy_saved

    def log_model_downgrade(self, original_model: str, used_model: str,
                            estimated_energy_saved: float) -> None:
        """Log when a smaller model was used instead of a larger one.

        Args:
            original_model: Model that would otherwise have been used.
            used_model: Model that was actually used.
            estimated_energy_saved: Energy saved, added to the running total.
        """
        self.model_selection_metrics['downgrades'] += 1
        self.model_selection_metrics['estimated_energy_saved'] += estimated_energy_saved

        # The model names are not aggregated; record them for traceability.
        self.logger.debug(
            "Model downgrade %s -> %s (estimated %.6f Wh saved)",
            original_model, used_model, estimated_energy_saved,
        )

    def calculate_carbon_footprint(self) -> float:
        """Calculate carbon footprint in kg CO2 equivalent.

        Returns:
            Total logged energy converted to kWh, times ``carbon_intensity``.
        """
        # Convert watt-hours to kilowatt-hours
        kwh = self.energy_usage['total'] / 1000.0
        return kwh * self.carbon_intensity

    def calculate_efficiency_metrics(self) -> Dict[str, float]:
        """Calculate efficiency metrics.

        Returns:
            A dict with ``'tokens_per_watt_hour'``, ``'watt_hours_per_hour'``
            and ``'carbon_per_hour'``. When no energy has been logged,
            all three are 0.0 rather than being derived from a
            placeholder energy value.
        """
        elapsed_seconds = (datetime.now() - self.start_time).total_seconds()

        # Avoid division by zero; wall-clock time may also step backwards.
        if elapsed_seconds <= 0:
            elapsed_seconds = 0.001

        energy_wh = self.energy_usage['total']

        # tokens/Wh is undefined without energy data; report 0.0 instead of
        # dividing by an arbitrary tiny number.
        if energy_wh:
            tokens_per_wh = self.token_metrics['total_tokens'] / energy_wh
        else:
            tokens_per_wh = 0.0

        return {
            'tokens_per_watt_hour': tokens_per_wh,
            'watt_hours_per_hour': energy_wh * 3600 / elapsed_seconds,
            'carbon_per_hour': self.calculate_carbon_footprint() * 3600 / elapsed_seconds,
        }

    def get_environmental_equivalents(self) -> Dict[str, float]:
        """Convert energy usage to relatable environmental equivalents.

        Returns:
            A dict with ``'miles_driven'``, ``'smartphone_charges'`` and
            ``'trees_needed_day'``.
        """
        carbon_kg = self.calculate_carbon_footprint()

        # Approximate equivalents based on EPA and other sources
        return {
            'miles_driven': carbon_kg * 2.5,  # ~400g CO2/mile
            'smartphone_charges': self.energy_usage['total'] / 10.0,  # ~10 Wh per charge
            'trees_needed_day': carbon_kg / 0.04,  # ~40g CO2 absorbed per tree per day
        }

    def get_optimization_gains(self) -> Dict[str, Any]:
        """Calculate gains from various optimization techniques.

        Returns:
            A dict with the tokens saved, the share of tokens saved as a
            percentage of (used + saved) tokens, and the energy saved by
            caching, by model selection, and in total.
        """
        tokens_saved = self.token_metrics['tokens_saved']
        baseline_tokens = self.token_metrics['total_tokens'] + tokens_saved

        # Exact percentage; 0.0 when there is nothing to compare against.
        if baseline_tokens > 0:
            tokens_saved_pct = (tokens_saved / baseline_tokens) * 100
        else:
            tokens_saved_pct = 0.0

        cache_saved = self.cache_metrics['estimated_energy_saved']
        model_saved = self.model_selection_metrics['estimated_energy_saved']

        return {
            'tokens_saved': tokens_saved,
            'tokens_saved_pct': tokens_saved_pct,
            'cache_energy_saved': cache_saved,
            'model_selection_energy_saved': model_saved,
            'total_energy_saved': cache_saved + model_saved,
        }

    def get_all_metrics(self) -> Dict[str, Any]:
        """Get all metrics in a single dictionary.

        The four storage dicts are returned by reference, not copied, so
        they reflect later logging calls; the derived values are computed
        at call time.
        """
        return {
            'energy_usage': self.energy_usage,
            'token_metrics': self.token_metrics,
            'cache_metrics': self.cache_metrics,
            'model_selection_metrics': self.model_selection_metrics,
            'carbon_footprint_kg': self.calculate_carbon_footprint(),
            'efficiency_metrics': self.calculate_efficiency_metrics(),
            'environmental_equivalents': self.get_environmental_equivalents(),
            'optimization_gains': self.get_optimization_gains(),
            'runtime_seconds': (datetime.now() - self.start_time).total_seconds(),
        }