# agents/metrics_agent.py
import logging
from typing import Dict, List, Any
import json
import io
import base64
from datetime import datetime

import matplotlib
matplotlib.use("Agg")  # non-interactive backend so charts can render without a display
import matplotlib.pyplot as plt


class MetricsAgent:
    def __init__(self, metrics_calculator, token_manager=None, cache_manager=None):
        """Initialize the MetricsAgent with required utilities."""
        self.logger = logging.getLogger(__name__)
        self.metrics_calculator = metrics_calculator
        self.token_manager = token_manager
        self.cache_manager = cache_manager

        # Track start time
        self.start_time = datetime.now()

        # Store historical metrics for trend analysis
        self.metrics_history = []
        self.history_interval = 10  # seconds between history points
        self.last_history_time = self.start_time

        # Agent name for logging
        self.agent_name = "metrics_agent"

    def track_agent_performance(self) -> Dict[str, Any]:
        """
        Track and report performance metrics for all agents.

        Returns a comprehensive metrics report.
        """
        if not self.metrics_calculator:
            return {"error": "No metrics calculator available"}

        # Get all metrics from calculator
        all_metrics = self.metrics_calculator.get_all_metrics()

        # Add additional derived metrics
        derived_metrics = self._calculate_derived_metrics(all_metrics)
        all_metrics["derived_metrics"] = derived_metrics

        # Add timestamp
        all_metrics["timestamp"] = datetime.now().isoformat()
        all_metrics["elapsed_time"] = (datetime.now() - self.start_time).total_seconds()

        # Add to history if enough time has passed
        if (datetime.now() - self.last_history_time).total_seconds() >= self.history_interval:
            self.metrics_history.append(all_metrics)
            self.last_history_time = datetime.now()

        return all_metrics
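
    # Assumed shape of metrics_calculator.get_all_metrics(), inferred from the lookups
    # in this class; the real calculator may expose more keys than listed here.
    #
    #   {
    #       "token_metrics": {"total_tokens": int, "tokens_saved": int,
    #                         "by_model": {model: tokens}, "by_agent": {agent: tokens}},
    #       "energy_usage": {"total": float, "by_model": {model: wh}},   # watt-hours
    #       "optimization_gains": {"tokens_saved": int, "tokens_saved_pct": float,
    #                              "cache_energy_saved": float,
    #                              "model_selection_energy_saved": float,
    #                              "total_energy_saved": float},
    #       "carbon_footprint_kg": float,
    #       "environmental_equivalents": {impact_type: float},
    #   }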

    def _calculate_derived_metrics(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate additional derived metrics from raw metrics."""
        derived = {}

        # Calculate energy efficiency
        if metrics.get("token_metrics", {}).get("total_tokens", 0) > 0:
            # Guard against a reported total of exactly 0 Wh to avoid division by zero
            energy_total = metrics.get("energy_usage", {}).get("total", 0) or 0.001
            tokens_total = metrics.get("token_metrics", {}).get("total_tokens", 0)
            derived["energy_per_token"] = energy_total / tokens_total
            derived["tokens_per_watt_hour"] = tokens_total / energy_total

        # Calculate optimization effectiveness
        if metrics.get("optimization_gains", {}):
            opt_gains = metrics.get("optimization_gains", {})
            total_energy = metrics.get("energy_usage", {}).get("total", 0)
            if total_energy > 0:
                energy_saved = opt_gains.get("total_energy_saved", 0)
                derived["optimization_effectiveness"] = energy_saved / (total_energy + energy_saved)

            # Breakdown by optimization type
            derived["optimization_breakdown"] = {
                "caching": opt_gains.get("cache_energy_saved", 0),
                "model_selection": opt_gains.get("model_selection_energy_saved", 0)
            }

        # Calculate cache effectiveness
        if self.cache_manager:
            cache_stats = self.cache_manager.get_stats()
            derived["cache_effectiveness"] = {
                "hit_rate": cache_stats.get("hit_rate", 0),
                "memory_efficiency": cache_stats.get("current_size", 0) / max(cache_stats.get("max_size", 1), 1)
            }

        return derived

    def generate_sustainability_report(self) -> Dict[str, Any]:
        """
        Generate a comprehensive sustainability report.

        Includes environmental impact metrics and recommendations.
        """
        # Get current metrics
        metrics = self.track_agent_performance()

        # Calculate key sustainability indicators
        carbon_footprint = metrics.get("carbon_footprint_kg", 0)
        energy_usage = metrics.get("energy_usage", {}).get("total", 0)
        optimization_gains = metrics.get("optimization_gains", {})

        # Generate environmental equivalents
        env_equivalents = metrics.get("environmental_equivalents", {})

        # Calculate potential future savings
        potential_savings = self._calculate_potential_savings(metrics)

        # Generate recommendations
        recommendations = self._generate_optimization_recommendations(metrics)

        # Prepare report
        report = {
            "timestamp": datetime.now().isoformat(),
            # Fall back to the elapsed_time recorded in track_agent_performance if the
            # calculator does not report runtime_seconds itself
            "runtime_seconds": metrics.get("runtime_seconds", metrics.get("elapsed_time", 0)),
            "sustainability_metrics": {
                "energy_usage_wh": energy_usage,
                "carbon_footprint_kg": carbon_footprint,
                "environmental_equivalents": env_equivalents
            },
            "optimization_results": {
                "energy_saved_wh": optimization_gains.get("total_energy_saved", 0),
                "tokens_saved": optimization_gains.get("tokens_saved", 0),
                "tokens_saved_percent": optimization_gains.get("tokens_saved_pct", 0)
            },
            "potential_future_savings": potential_savings,
            "recommendations": recommendations
        }

        return report

    def _calculate_potential_savings(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate potential future energy savings based on current patterns."""
        # Get current optimization effectiveness
        derived = metrics.get("derived_metrics", {})
        current_effectiveness = derived.get("optimization_effectiveness", 0)

        # Calculate potential improvements
        cache_hit_rate = derived.get("cache_effectiveness", {}).get("hit_rate", 0)
        potential_hit_rate = min(cache_hit_rate + 0.2, 0.95)  # Assume the hit rate can improve by up to 20 points

        # Calculate energy that could be saved with improved caching
        # (heuristic: assumes roughly 30% of total energy is spent on cacheable requests)
        energy_usage = metrics.get("energy_usage", {}).get("total", 0)
        potential_cache_savings = energy_usage * 0.3 * (potential_hit_rate - cache_hit_rate) / max(cache_hit_rate, 0.01)

        # Calculate potential model selection improvements
        model_selection_current = derived.get("optimization_breakdown", {}).get("model_selection", 0)
        potential_model_selection = model_selection_current * 1.5  # Assume a 50% improvement
        additional_model_savings = potential_model_selection - model_selection_current

        return {
            "improved_caching_wh": potential_cache_savings,
            "improved_model_selection_wh": additional_model_savings,
            "total_potential_wh": potential_cache_savings + additional_model_savings,
            "percent_improvement": (potential_cache_savings + additional_model_savings) / max(energy_usage, 0.001) * 100
        }
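
    # Worked example of the caching heuristic above (illustrative numbers, not measured data):
    # with 0.42 Wh total energy and a 0.40 hit rate, the projected hit rate is 0.60 and
    # improved_caching_wh = 0.42 * 0.3 * (0.60 - 0.40) / 0.40 = 0.063 Wh.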

    def _generate_optimization_recommendations(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate actionable recommendations for improving efficiency."""
        recommendations = []

        # Check token usage by model
        token_metrics = metrics.get("token_metrics", {})
        by_model = token_metrics.get("by_model", {})

        # Find the most token-heavy model
        if by_model:
            heaviest_model = max(by_model.items(), key=lambda x: x[1])
            if heaviest_model[1] > 1000:
                recommendations.append({
                    "type": "token_optimization",
                    "target": heaviest_model[0],
                    "description": f"Optimize prompts for {heaviest_model[0]} to reduce token usage",
                    "potential_impact": "Medium"
                })

        # Check cache effectiveness
        derived = metrics.get("derived_metrics", {})
        cache_effectiveness = derived.get("cache_effectiveness", {})
        if cache_effectiveness.get("hit_rate", 1) < 0.5:
            recommendations.append({
                "type": "cache_optimization",
                "description": "Increase cache size or improve cache key generation for better hit rates",
                "potential_impact": "High"
            })

        # Check model selection
        model_selection = derived.get("optimization_breakdown", {}).get("model_selection", 0)
        energy_usage = metrics.get("energy_usage", {}).get("total", 0)
        if model_selection < 0.1 * energy_usage:
            recommendations.append({
                "type": "model_selection",
                "description": "Refine complexity detection to use smaller models more frequently",
                "potential_impact": "High"
            })

        return recommendations

    def generate_metrics_dashboard(self, format: str = "json") -> Dict[str, Any]:
        """
        Generate a visual or structured representation of system metrics.

        Returns data in the specified format (json, html, image_base64).
        """
        # Get current metrics
        metrics = self.track_agent_performance()

        if format == "json":
            return metrics
        elif format == "image_base64":
            # Generate visualization
            img_data = self._generate_dashboard_visualization(metrics)
            return {
                "format": "image_base64",
                "data": img_data,
                "timestamp": datetime.now().isoformat()
            }
        elif format == "html":
            # Generate HTML representation
            html_data = self._generate_html_dashboard(metrics)
            return {
                "format": "html",
                "data": html_data,
                "timestamp": datetime.now().isoformat()
            }
        else:
            return {"error": f"Unsupported format: {format}"}

    def _generate_dashboard_visualization(self, metrics: Dict[str, Any]) -> str:
        """Generate a visualization of metrics and return it as a base64 PNG image."""
        # Create figure with subplots
        fig, axs = plt.subplots(2, 2, figsize=(12, 10))

        # 1. Energy Usage by Model
        energy_by_model = metrics.get("energy_usage", {}).get("by_model", {})
        if energy_by_model:
            models = list(energy_by_model.keys())
            energy_values = list(energy_by_model.values())
            axs[0, 0].bar(models, energy_values)
            axs[0, 0].set_title('Energy Usage by Model')
            axs[0, 0].set_ylabel('Watt-hours')
            axs[0, 0].tick_params(axis='x', rotation=45)

        # 2. Token Usage by Agent
        token_by_agent = metrics.get("token_metrics", {}).get("by_agent", {})
        if token_by_agent:
            agents = list(token_by_agent.keys())
            token_values = list(token_by_agent.values())
            axs[0, 1].bar(agents, token_values)
            axs[0, 1].set_title('Token Usage by Agent')
            axs[0, 1].set_ylabel('Tokens')
            axs[0, 1].tick_params(axis='x', rotation=45)

        # 3. Optimization Gains
        opt_gains = metrics.get("optimization_gains", {})
        if opt_gains:
            gain_types = ['Tokens Saved', 'Cache Energy Saved', 'Model Selection Energy Saved']
            gain_values = [
                opt_gains.get("tokens_saved", 0) / 100,  # Scale down for visualization
                opt_gains.get("cache_energy_saved", 0),
                opt_gains.get("model_selection_energy_saved", 0)
            ]
            axs[1, 0].bar(gain_types, gain_values)
            axs[1, 0].set_title('Optimization Gains')
            axs[1, 0].set_ylabel('Value')
            axs[1, 0].tick_params(axis='x', rotation=45)

        # 4. Environmental Impact
        env_equiv = metrics.get("environmental_equivalents", {})
        if env_equiv:
            impact_types = list(env_equiv.keys())
            impact_values = list(env_equiv.values())
            axs[1, 1].bar(impact_types, impact_values)
            axs[1, 1].set_title('Environmental Equivalents')
            axs[1, 1].set_ylabel('Value')
            axs[1, 1].tick_params(axis='x', rotation=45)

        # Adjust layout and convert to base64
        plt.tight_layout()

        # Save to bytes buffer
        buffer = io.BytesIO()
        plt.savefig(buffer, format='png')
        buffer.seek(0)

        # Convert to base64
        img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')

        # Close plot to free memory
        plt.close(fig)

        return img_str
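
    # The string returned above is a bare base64 payload; a caller could, for example,
    # embed it in HTML as <img src="data:image/png;base64,{img_str}"> to display the chart.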

    def _generate_html_dashboard(self, metrics: Dict[str, Any]) -> str:
        """Generate an HTML representation of the metrics dashboard."""
        # Simple HTML template (literal CSS braces are doubled so str.format leaves them intact)
        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Sustainability Metrics Dashboard</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .dashboard {{ display: grid; grid-template-columns: 1fr 1fr; gap: 20px; }}
                .card {{ border: 1px solid #ddd; border-radius: 5px; padding: 15px; }}
                .card h2 {{ margin-top: 0; color: #333; }}
                .metric {{ margin: 10px 0; }}
                .metric-name {{ font-weight: bold; }}
                .metric-value {{ float: right; }}
                .highlight {{ color: #2a6496; font-weight: bold; }}
                .recommendations {{ grid-column: span 2; }}
            </style>
        </head>
        <body>
            <h1>Sustainability Metrics Dashboard</h1>
            <p>Generated on: {timestamp}</p>
            <div class="dashboard">
                <div class="card">
                    <h2>Energy Usage</h2>
                    {energy_metrics}
                </div>
                <div class="card">
                    <h2>Token Usage</h2>
                    {token_metrics}
                </div>
                <div class="card">
                    <h2>Optimization Gains</h2>
                    {optimization_metrics}
                </div>
                <div class="card">
                    <h2>Environmental Impact</h2>
                    {environmental_metrics}
                </div>
                <div class="card recommendations">
                    <h2>Recommendations</h2>
                    {recommendations}
                </div>
            </div>
        </body>
        </html>
        """

        # Generate energy metrics HTML
        energy_usage = metrics.get("energy_usage", {})
        energy_metrics_html = ""
        if energy_usage:
            energy_metrics_html += self._format_metric("Total Energy", f"{energy_usage.get('total', 0):.6f} Wh")
            energy_metrics_html += "<h3>By Model</h3>"
            for model, usage in energy_usage.get("by_model", {}).items():
                energy_metrics_html += self._format_metric(model, f"{usage:.6f} Wh")

        # Generate token metrics HTML
        token_metrics = metrics.get("token_metrics", {})
        token_metrics_html = ""
        if token_metrics:
            token_metrics_html += self._format_metric("Total Tokens", token_metrics.get("total_tokens", 0))
            token_metrics_html += self._format_metric("Tokens Saved", token_metrics.get("tokens_saved", 0))
            token_metrics_html += "<h3>By Agent</h3>"
            for agent, tokens in token_metrics.get("by_agent", {}).items():
                token_metrics_html += self._format_metric(agent, tokens)

        # Generate optimization metrics HTML
        opt_gains = metrics.get("optimization_gains", {})
        optimization_metrics_html = ""
        if opt_gains:
            optimization_metrics_html += self._format_metric("Tokens Saved", opt_gains.get("tokens_saved", 0))
            optimization_metrics_html += self._format_metric("Tokens Saved %", f"{opt_gains.get('tokens_saved_pct', 0):.2f}%")
            optimization_metrics_html += self._format_metric("Cache Energy Saved", f"{opt_gains.get('cache_energy_saved', 0):.6f} Wh")
            optimization_metrics_html += self._format_metric("Model Selection Saved", f"{opt_gains.get('model_selection_energy_saved', 0):.6f} Wh")
            optimization_metrics_html += self._format_metric("Total Energy Saved", f"{opt_gains.get('total_energy_saved', 0):.6f} Wh", highlight=True)

        # Generate environmental metrics HTML
        env_equiv = metrics.get("environmental_equivalents", {})
        environmental_metrics_html = ""
        if env_equiv:
            environmental_metrics_html += self._format_metric("Carbon Footprint", f"{metrics.get('carbon_footprint_kg', 0):.6f} kg CO₂")
            environmental_metrics_html += "<h3>Equivalents</h3>"
            for impact_type, value in env_equiv.items():
                label = impact_type.replace('_', ' ').title()
                environmental_metrics_html += self._format_metric(label, f"{value:.2f}")

        # Generate recommendations HTML (the raw metrics dict does not carry
        # recommendations, so generate them here when they are missing)
        recommendations = metrics.get("recommendations") or self._generate_optimization_recommendations(metrics)
        recommendations_html = "<ul>"
        if recommendations:
            for rec in recommendations:
                impact = rec.get("potential_impact", "")
                impact_class = "highlight" if impact == "High" else ""
                recommendations_html += f"<li><span class='{impact_class}'>{rec.get('description', '')}</span> (Impact: {impact})</li>"
        else:
            recommendations_html += "<li>No recommendations available</li>"
        recommendations_html += "</ul>"

        # Format final HTML
        html = html_template.format(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            energy_metrics=energy_metrics_html,
            token_metrics=token_metrics_html,
            optimization_metrics=optimization_metrics_html,
            environmental_metrics=environmental_metrics_html,
            recommendations=recommendations_html
        )

        return html

    def _format_metric(self, name: str, value: Any, highlight: bool = False) -> str:
        """Helper method to format a metric as HTML."""
        highlight_class = "highlight" if highlight else ""
        return f"<div class='metric'><span class='metric-name'>{name}:</span> <span class='metric-value {highlight_class}'>{value}</span></div>"