# agents/metrics_agent.py
"""Metrics agent: collects, derives and renders sustainability metrics."""
import base64
import copy
import html
import io
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple, Union

try:
    import matplotlib.pyplot as plt
except ImportError:  # plotting is optional; only the image dashboard needs it
    plt = None


# Page skeleton for the HTML dashboard. Literal CSS braces are doubled because
# the template is filled in with ``str.format``.
_DASHBOARD_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Sustainability Metrics Dashboard</title>
<style>
.metric.highlight {{ font-weight: bold; }}
</style>
</head>
<body>
<h1>Sustainability Metrics Dashboard</h1>
<p>Generated on: {timestamp}</p>
<h2>Energy Usage</h2>
<div class="section">
{energy_metrics}</div>
<h2>Token Usage</h2>
<div class="section">
{token_metrics}</div>
<h2>Optimization Gains</h2>
<div class="section">
{optimization_metrics}</div>
<h2>Environmental Impact</h2>
<div class="section">
{environmental_metrics}</div>
<h2>Recommendations</h2>
<div class="section">
{recommendations}</div>
</body>
</html>
"""


class MetricsAgent:
    """Collect raw metrics from a calculator, derive indicators and render reports.

    The agent reads from ``metrics_calculator.get_all_metrics()`` and, when a
    cache manager is supplied, from ``cache_manager.get_stats()``. Everything
    else is computed from those dictionaries.
    """

    def __init__(
        self,
        metrics_calculator: Any,
        token_manager: Optional[Any] = None,
        cache_manager: Optional[Any] = None,
    ) -> None:
        """Initialize the MetricsAgent with required utilities.

        Args:
            metrics_calculator: Object exposing ``get_all_metrics()``. May be
                falsy, in which case tracking returns an error dictionary.
            token_manager: Stored on the instance; not used by this class.
            cache_manager: Optional object exposing ``get_stats()``.
        """
        self.logger = logging.getLogger(__name__)
        self.metrics_calculator = metrics_calculator
        self.token_manager = token_manager
        self.cache_manager = cache_manager

        # Track start time
        self.start_time = datetime.now()

        # Store historical metrics for trend analysis
        self.metrics_history: List[Dict[str, Any]] = []
        self.history_interval = 10  # seconds between history points
        self.last_history_time = self.start_time

        # Agent name for logging
        self.agent_name = "metrics_agent"

    def track_agent_performance(self) -> Dict[str, Any]:
        """Track and report performance metrics for all agents.

        Returns:
            The calculator's metrics extended with ``derived_metrics``,
            ``timestamp`` and ``elapsed_time``; or ``{"error": ...}`` when no
            calculator is configured.
        """
        if not self.metrics_calculator:
            return {"error": "No metrics calculator available"}

        # Copy the top level so the keys added below are not written into the
        # mapping object handed out by the calculator.
        all_metrics = dict(self.metrics_calculator.get_all_metrics())
        all_metrics["derived_metrics"] = self._calculate_derived_metrics(all_metrics)

        # One clock reading keeps timestamp, elapsed time and the history
        # bookkeeping consistent with each other.
        now = datetime.now()
        all_metrics["timestamp"] = now.isoformat()
        all_metrics["elapsed_time"] = (now - self.start_time).total_seconds()

        if (now - self.last_history_time).total_seconds() >= self.history_interval:
            # Deep copy: a history point must not change when the caller or
            # the calculator later mutates nested dictionaries.
            self.metrics_history.append(copy.deepcopy(all_metrics))
            self.last_history_time = now

        return all_metrics

    def _calculate_derived_metrics(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate additional derived metrics from raw metrics.

        Args:
            metrics: Raw metrics; the keys read are ``token_metrics``,
                ``energy_usage`` and ``optimization_gains``.

        Returns:
            A dictionary that may contain ``energy_per_token``,
            ``tokens_per_watt_hour``, ``optimization_effectiveness``,
            ``optimization_breakdown`` and ``cache_effectiveness``.
        """
        derived: Dict[str, Any] = {}
        energy_usage = metrics.get("energy_usage", {})

        # Energy efficiency
        tokens_total = metrics.get("token_metrics", {}).get("total_tokens", 0)
        if tokens_total > 0:
            energy_total = energy_usage.get("total", 0.001)
            derived["energy_per_token"] = energy_total / tokens_total
            # A reported total of exactly 0 would otherwise divide by zero;
            # the 0.001 default only applies when the key is missing.
            if energy_total > 0:
                derived["tokens_per_watt_hour"] = tokens_total / energy_total

        # Optimization effectiveness
        opt_gains = metrics.get("optimization_gains", {})
        if opt_gains:
            total_energy = energy_usage.get("total", 0)
            if total_energy > 0:
                energy_saved = opt_gains.get("total_energy_saved", 0)
                derived["optimization_effectiveness"] = energy_saved / (
                    total_energy + energy_saved
                )

            # Breakdown by optimization type
            derived["optimization_breakdown"] = {
                "caching": opt_gains.get("cache_energy_saved", 0),
                "model_selection": opt_gains.get("model_selection_energy_saved", 0),
            }

        # Cache effectiveness
        if self.cache_manager:
            cache_stats = self.cache_manager.get_stats()
            derived["cache_effectiveness"] = {
                "hit_rate": cache_stats.get("hit_rate", 0),
                "memory_efficiency": cache_stats.get("current_size", 0)
                / max(cache_stats.get("max_size", 1), 1),
            }

        return derived

    def generate_sustainability_report(self) -> Dict[str, Any]:
        """Generate a comprehensive sustainability report.

        Returns:
            A dictionary with ``timestamp``, ``runtime_seconds``,
            ``sustainability_metrics``, ``optimization_results``,
            ``potential_future_savings`` and ``recommendations``.
        """
        metrics = self.track_agent_performance()
        optimization_gains = metrics.get("optimization_gains", {})

        return {
            "timestamp": datetime.now().isoformat(),
            # Prefer the calculator's own figure; otherwise use the elapsed
            # time recorded by track_agent_performance.
            "runtime_seconds": metrics.get(
                "runtime_seconds", metrics.get("elapsed_time", 0)
            ),
            "sustainability_metrics": {
                "energy_usage_wh": metrics.get("energy_usage", {}).get("total", 0),
                "carbon_footprint_kg": metrics.get("carbon_footprint_kg", 0),
                "environmental_equivalents": metrics.get(
                    "environmental_equivalents", {}
                ),
            },
            "optimization_results": {
                "energy_saved_wh": optimization_gains.get("total_energy_saved", 0),
                "tokens_saved": optimization_gains.get("tokens_saved", 0),
                "tokens_saved_percent": optimization_gains.get("tokens_saved_pct", 0),
            },
            "potential_future_savings": self._calculate_potential_savings(metrics),
            "recommendations": self._generate_optimization_recommendations(metrics),
        }

    def _calculate_potential_savings(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate potential future energy savings based on current patterns.

        Args:
            metrics: Metrics including ``derived_metrics`` and ``energy_usage``.

        Returns:
            ``improved_caching_wh``, ``improved_model_selection_wh``,
            ``total_potential_wh`` and ``percent_improvement``.
        """
        derived = metrics.get("derived_metrics", {})

        # Assume the hit rate can improve by up to 0.2, capped at 0.95, but
        # never below the current rate: a cache already above the cap must not
        # be reported as a negative saving.
        cache_hit_rate = derived.get("cache_effectiveness", {}).get("hit_rate", 0)
        potential_hit_rate = max(cache_hit_rate, min(cache_hit_rate + 0.2, 0.95))

        # NOTE(review): the relative-improvement term divides by the current
        # hit rate (floored at 0.01), so the estimate grows large for very low
        # hit rates -- confirm this heuristic is intended.
        energy_usage = metrics.get("energy_usage", {}).get("total", 0)
        potential_cache_savings = (
            energy_usage
            * 0.3
            * (potential_hit_rate - cache_hit_rate)
            / max(cache_hit_rate, 0.01)
        )

        # Assume model selection savings can improve by 50%.
        model_selection_current = derived.get("optimization_breakdown", {}).get(
            "model_selection", 0
        )
        model_selection_gain = model_selection_current * 1.5 - model_selection_current

        total_potential = potential_cache_savings + model_selection_gain
        return {
            "improved_caching_wh": potential_cache_savings,
            "improved_model_selection_wh": model_selection_gain,
            "total_potential_wh": total_potential,
            "percent_improvement": total_potential / max(energy_usage, 0.001) * 100,
        }

    def _generate_optimization_recommendations(
        self, metrics: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Generate actionable recommendations for improving efficiency.

        Args:
            metrics: Metrics including ``token_metrics``, ``derived_metrics``
                and ``energy_usage``.

        Returns:
            A list of recommendation dictionaries, possibly empty.
        """
        recommendations: List[Dict[str, Any]] = []

        # Flag the model with the highest token count when it exceeds 1000.
        by_model = metrics.get("token_metrics", {}).get("by_model", {})
        if by_model:
            model_name, token_count = max(by_model.items(), key=lambda item: item[1])
            if token_count > 1000:
                recommendations.append({
                    "type": "token_optimization",
                    "target": model_name,
                    "description": f"Optimize prompts for {model_name} to reduce token usage",
                    "potential_impact": "Medium",
                })

        # A missing hit rate defaults to 1, so no cache advice is given when
        # there are no cache statistics.
        derived = metrics.get("derived_metrics", {})
        if derived.get("cache_effectiveness", {}).get("hit_rate", 1) < 0.5:
            recommendations.append({
                "type": "cache_optimization",
                "description": "Increase cache size or improve cache key generation for better hit rates",
                "potential_impact": "High",
            })

        # Model selection savings below 10% of total energy use.
        model_selection = derived.get("optimization_breakdown", {}).get(
            "model_selection", 0
        )
        energy_usage = metrics.get("energy_usage", {}).get("total", 0)
        if model_selection < 0.1 * energy_usage:
            recommendations.append({
                "type": "model_selection",
                "description": "Refine complexity detection to use smaller models more frequently",
                "potential_impact": "High",
            })

        return recommendations

    def generate_metrics_dashboard(self, format: str = "json") -> Dict[str, Any]:
        """Generate a visual or structured representation of system metrics.

        Args:
            format: One of ``"json"``, ``"html"`` or ``"image_base64"``.

        Returns:
            The metrics themselves for ``"json"``; a ``{"format", "data",
            "timestamp"}`` dictionary for the rendered formats; or
            ``{"error": ...}`` for an unsupported format or when matplotlib is
            unavailable for ``"image_base64"``.
        """
        metrics = self.track_agent_performance()

        if format == "json":
            return metrics

        if format == "image_base64":
            if plt is None:
                return {"error": "matplotlib is required for image_base64 output"}
            data = self._generate_dashboard_visualization(metrics)
        elif format == "html":
            data = self._generate_html_dashboard(metrics)
        else:
            return {"error": f"Unsupported format: {format}"}

        return {
            "format": format,
            "data": data,
            "timestamp": datetime.now().isoformat(),
        }

    @staticmethod
    def _draw_bar_panel(ax: Any, series: Dict[str, Any], title: str, ylabel: str) -> None:
        """Draw one labelled bar chart on ``ax``; leave it untouched if empty."""
        if not series:
            return
        ax.bar(list(series.keys()), list(series.values()))
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.tick_params(axis="x", rotation=45)

    def _generate_dashboard_visualization(self, metrics: Dict[str, Any]) -> str:
        """Generate a visualization of metrics and return as base64 image.

        Args:
            metrics: Metrics as returned by ``track_agent_performance``.

        Returns:
            A base64-encoded PNG of a 2x2 grid of bar charts.

        Raises:
            RuntimeError: If matplotlib could not be imported.
        """
        if plt is None:
            raise RuntimeError("matplotlib is required to render the dashboard image")

        fig, axs = plt.subplots(2, 2, figsize=(12, 10))
        try:
            # 1. Energy Usage by Model
            self._draw_bar_panel(
                axs[0, 0],
                metrics.get("energy_usage", {}).get("by_model", {}),
                "Energy Usage by Model",
                "Watt-hours",
            )

            # 2. Token Usage by Agent
            self._draw_bar_panel(
                axs[0, 1],
                metrics.get("token_metrics", {}).get("by_agent", {}),
                "Token Usage by Agent",
                "Tokens",
            )

            # 3. Optimization Gains
            opt_gains = metrics.get("optimization_gains", {})
            if opt_gains:
                gains = {
                    # Scale down for visualization
                    "Tokens Saved": opt_gains.get("tokens_saved", 0) / 100,
                    "Cache Energy Saved": opt_gains.get("cache_energy_saved", 0),
                    "Model Selection Energy Saved": opt_gains.get(
                        "model_selection_energy_saved", 0
                    ),
                }
                self._draw_bar_panel(axs[1, 0], gains, "Optimization Gains", "Value")

            # 4. Environmental Impact
            self._draw_bar_panel(
                axs[1, 1],
                metrics.get("environmental_equivalents", {}),
                "Environmental Equivalents",
                "Value",
            )

            fig.tight_layout()
            buffer = io.BytesIO()
            fig.savefig(buffer, format="png")
        finally:
            # Always release the figure, including when plotting or saving fails.
            plt.close(fig)

        return base64.b64encode(buffer.getvalue()).decode("utf-8")

    def _generate_html_dashboard(self, metrics: Dict[str, Any]) -> str:
        """Generate an HTML representation of the metrics dashboard.

        Args:
            metrics: Metrics as returned by ``track_agent_performance``. When
                it has no ``recommendations`` key, recommendations are
                generated from the metrics.

        Returns:
            A complete HTML document as a string.
        """
        fmt = self._format_metric

        # Energy section
        energy_usage = metrics.get("energy_usage", {})
        energy_parts: List[str] = []
        if energy_usage:
            energy_parts.append(
                fmt("Total Energy", f"{energy_usage.get('total', 0):.6f} Wh")
            )
            energy_parts.append("<h3>By Model</h3>\n")
            energy_parts.extend(
                fmt(model, f"{usage:.6f} Wh")
                for model, usage in energy_usage.get("by_model", {}).items()
            )

        # Token section
        token_metrics = metrics.get("token_metrics", {})
        token_parts: List[str] = []
        if token_metrics:
            token_parts.append(fmt("Total Tokens", token_metrics.get("total_tokens", 0)))
            token_parts.append(fmt("Tokens Saved", token_metrics.get("tokens_saved", 0)))
            token_parts.append("<h3>By Agent</h3>\n")
            token_parts.extend(
                fmt(agent, tokens)
                for agent, tokens in token_metrics.get("by_agent", {}).items()
            )

        # Optimization section
        opt_gains = metrics.get("optimization_gains", {})
        optimization_parts: List[str] = []
        if opt_gains:
            optimization_parts = [
                fmt("Tokens Saved", opt_gains.get("tokens_saved", 0)),
                fmt("Tokens Saved %", f"{opt_gains.get('tokens_saved_pct', 0):.2f}%"),
                fmt("Cache Energy Saved", f"{opt_gains.get('cache_energy_saved', 0):.6f} Wh"),
                fmt(
                    "Model Selection Saved",
                    f"{opt_gains.get('model_selection_energy_saved', 0):.6f} Wh",
                ),
                fmt(
                    "Total Energy Saved",
                    f"{opt_gains.get('total_energy_saved', 0):.6f} Wh",
                    highlight=True,
                ),
            ]

        # Environmental section
        env_equiv = metrics.get("environmental_equivalents", {})
        environmental_parts: List[str] = []
        if env_equiv:
            environmental_parts.append(
                fmt("Carbon Footprint", f"{metrics.get('carbon_footprint_kg', 0):.6f} kg CO₂")
            )
            environmental_parts.append("<h3>Equivalents</h3>\n")
            environmental_parts.extend(
                fmt(impact_type.replace("_", " ").title(), f"{value:.2f}")
                for impact_type, value in env_equiv.items()
            )

        # Recommendations section: use supplied ones, otherwise derive them so
        # the section is not left permanently empty.
        recommendations = metrics.get("recommendations")
        if recommendations is None:
            recommendations = self._generate_optimization_recommendations(metrics)
        if recommendations:
            items = "".join(
                "<li><strong>{impact}</strong>: {description}</li>\n".format(
                    impact=html.escape(str(rec.get("potential_impact", ""))),
                    description=html.escape(str(rec.get("description", ""))),
                )
                for rec in recommendations
            )
            recommendations_html = f"<ul>\n{items}</ul>\n"
        else:
            recommendations_html = "<p>No recommendations.</p>\n"

        return _DASHBOARD_TEMPLATE.format(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            energy_metrics="".join(energy_parts),
            token_metrics="".join(token_parts),
            optimization_metrics="".join(optimization_parts),
            environmental_metrics="".join(environmental_parts),
            recommendations=recommendations_html,
        )

    def _format_metric(self, name: str, value: Any, highlight: bool = False) -> str:
        """Helper method to format a metric as HTML.

        Args:
            name: Metric label.
            value: Metric value; converted with ``str``.
            highlight: Adds the ``highlight`` CSS class when true.

        Returns:
            One ``<div>`` element followed by a newline. Name and value are
            HTML-escaped because model and agent names come from the metrics.
        """
        css_class = "metric highlight" if highlight else "metric"
        return (
            f'<div class="{css_class}">'
            f"{html.escape(str(name))}: {html.escape(str(value))}</div>\n"
        )