# app/data_manager.py
import logging
import os
import json
import pickle
from typing import Dict, List, Optional, Tuple, Union, Any
from datetime import datetime, timedelta
import hashlib


class DataManager:
    def __init__(self, cache_manager=None):
        """Initialize the DataManager with optional cache manager."""
        self.logger = logging.getLogger(__name__)
        self.cache_manager = cache_manager

        # In-memory data store for current session
        self.data_store = {}

        # Track data transformations
        self.transformation_history = {}

        # Create data directory if it doesn't exist
        os.makedirs("data/transfers", exist_ok=True)
    def register_data(self, data_id: str, data: Any, source_agent: str,
                      data_type: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Register data from an agent into the data store.
        Returns a unique data reference ID.
        """
        # Generate a unique reference ID (MD5 is used only to derive an ID,
        # not for any security purpose)
        timestamp = datetime.now().isoformat()
        ref_hash = hashlib.md5(f"{data_id}:{timestamp}:{source_agent}".encode()).hexdigest()
        ref_id = f"{data_type}_{ref_hash[:10]}"

        # Store data with metadata
        self.data_store[ref_id] = {
            "data": data,
            "source_agent": source_agent,
            "data_type": data_type,
            "timestamp": timestamp,
            "metadata": metadata or {},
            "access_count": 0,
            "last_accessed": None
        }

        self.logger.info(f"Registered data {ref_id} from {source_agent} of type {data_type}")
        return ref_id
    def get_data(self, ref_id: str, target_agent: str) -> Tuple[Any, Dict[str, Any]]:
        """
        Retrieve data by reference ID.
        Returns the data and its metadata.
        """
        if ref_id not in self.data_store:
            self.logger.warning(f"Data {ref_id} not found")
            return None, {}

        # Update access information
        data_entry = self.data_store[ref_id]
        data_entry["access_count"] += 1
        data_entry["last_accessed"] = datetime.now().isoformat()

        # Log the access
        self.logger.info(f"Agent {target_agent} accessed data {ref_id} from {data_entry['source_agent']}")

        # Track data flow
        flow_key = f"{data_entry['source_agent']}_to_{target_agent}"
        if flow_key not in self.transformation_history:
            self.transformation_history[flow_key] = []

        self.transformation_history[flow_key].append({
            "ref_id": ref_id,
            "timestamp": datetime.now().isoformat(),
            "data_type": data_entry["data_type"]
        })

        return data_entry["data"], data_entry["metadata"]
    def transform_data(self, input_ref_id: str, output_data: Any,
                       source_agent: str, target_agent: str,
                       transformation_type: str, output_type: str,
                       metadata: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """
        Register a data transformation from one agent to another.
        Returns a reference ID for the transformed data, or None if the input is unknown.
        """
        if input_ref_id not in self.data_store:
            self.logger.warning(f"Input data {input_ref_id} not found")
            return None

        # Get input metadata
        input_entry = self.data_store[input_ref_id]

        # Combine metadata
        combined_metadata = {
            **input_entry.get("metadata", {}),
            **(metadata or {}),
            "transformation_type": transformation_type,
            "input_ref_id": input_ref_id,
            "input_type": input_entry["data_type"]
        }

        # Register the transformed data
        output_ref_id = self.register_data(
            input_ref_id, output_data, source_agent, output_type, combined_metadata)

        # Track the transformation
        if "transformations" not in self.transformation_history:
            self.transformation_history["transformations"] = []

        self.transformation_history["transformations"].append({
            "input_ref_id": input_ref_id,
            "output_ref_id": output_ref_id,
            "source_agent": source_agent,
            "target_agent": target_agent,
            "transformation_type": transformation_type,
            "timestamp": datetime.now().isoformat()
        })

        self.logger.info(f"Transformed data {input_ref_id} to {output_ref_id} "
                         f"({transformation_type}: {input_entry['data_type']} -> {output_type})")

        return output_ref_id
    def save_data_to_disk(self, ref_id: str, directory: str = "data/transfers") -> Optional[str]:
        """
        Save data to disk for persistence or for large objects.
        Returns the file path, or None if the reference ID is unknown.
        """
        if ref_id not in self.data_store:
            self.logger.warning(f"Data {ref_id} not found")
            return None

        data_entry = self.data_store[ref_id]

        # Create directory if it doesn't exist
        os.makedirs(directory, exist_ok=True)

        # Determine file extension based on data type
        data_type = data_entry["data_type"]
        if data_type in ["text", "json"]:
            ext = "json"
            file_path = os.path.join(directory, f"{ref_id}.{ext}")

            # Save as JSON; plain text is wrapped together with its metadata
            with open(file_path, 'w') as f:
                if data_type == "json":
                    json.dump(data_entry["data"], f, indent=2)
                else:
                    json.dump({"data": data_entry["data"], "metadata": data_entry["metadata"]}, f, indent=2)
        else:
            # Use pickle for other data types
            ext = "pkl"
            file_path = os.path.join(directory, f"{ref_id}.{ext}")

            # Save as pickle
            with open(file_path, 'wb') as f:
                pickle.dump(data_entry, f)

        self.logger.info(f"Saved data {ref_id} to {file_path}")
        return file_path
    def load_data_from_disk(self, file_path: str) -> Optional[str]:
        """
        Load data from disk into the data store.
        Returns the reference ID for the loaded data, or None on failure.
        """
        if not os.path.exists(file_path):
            self.logger.warning(f"File {file_path} not found")
            return None

        # Determine file type from extension
        ext = os.path.splitext(file_path)[1].lower()

        try:
            if ext == '.json':
                # Load JSON data
                with open(file_path, 'r') as f:
                    data_dict = json.load(f)

                # A dict with "data" and "metadata" keys is the wrapped text format
                # written by save_data_to_disk; anything else is treated as raw JSON.
                if isinstance(data_dict, dict) and "data" in data_dict and "metadata" in data_dict:
                    data = data_dict["data"]
                    metadata = data_dict["metadata"]
                    data_type = "text"
                else:
                    data = data_dict
                    metadata = {}
                    data_type = "json"

                # Generate a reference ID
                ref_id = f"loaded_{os.path.basename(file_path).split('.')[0]}"

                # Store in data store
                self.data_store[ref_id] = {
                    "data": data,
                    "source_agent": "file_system",
                    "data_type": data_type,
                    "timestamp": datetime.now().isoformat(),
                    "metadata": metadata,
                    "access_count": 0,
                    "last_accessed": None,
                    "file_path": file_path
                }
            elif ext == '.pkl':
                # Load pickle data
                with open(file_path, 'rb') as f:
                    data_entry = pickle.load(f)

                # Generate a reference ID
                ref_id = f"loaded_{os.path.basename(file_path).split('.')[0]}"

                # Store in data store
                self.data_store[ref_id] = data_entry
            else:
                self.logger.warning(f"Unsupported file extension: {ext}")
                return None

            self.logger.info(f"Loaded data from {file_path} with reference ID {ref_id}")
            return ref_id
        except Exception as e:
            self.logger.error(f"Error loading data from {file_path}: {e}")
            return None
    def get_data_flow_graph(self) -> Dict[str, Any]:
        """
        Generate a graph representation of data flows between agents.
        Useful for visualization and debugging.
        """
        nodes = []
        edges = []

        # Add nodes for each agent mentioned in transformations
        agents = set()

        # Extract agents from transformation history
        for flow_key, flows in self.transformation_history.items():
            if flow_key == "transformations":
                for flow in flows:
                    agents.add(flow["source_agent"])
                    agents.add(flow["target_agent"])
            elif "_to_" in flow_key:
                source, target = flow_key.split("_to_")
                agents.add(source)
                agents.add(target)

        # Create nodes
        for agent in agents:
            nodes.append({
                "id": agent,
                "label": agent.replace("_agent", "").title()
            })

        # Create edges from transformations
        if "transformations" in self.transformation_history:
            for transform in self.transformation_history["transformations"]:
                edges.append({
                    "from": transform["source_agent"],
                    "to": transform["target_agent"],
                    "label": transform["transformation_type"],
                    "data": {
                        "input_ref": transform["input_ref_id"],
                        "output_ref": transform["output_ref_id"],
                        "timestamp": transform["timestamp"]
                    }
                })

        return {
            "nodes": nodes,
            "edges": edges
        }
    def cleanup_data(self, older_than_hours: Optional[int] = None) -> int:
        """
        Clean up old data entries to free memory.
        Returns the number of entries removed.
        """
        if older_than_hours is None:
            # Clear all
            count = len(self.data_store)
            self.data_store = {}
            self.transformation_history = {}
            self.logger.info(f"Cleared all {count} data entries")
            return count

        # Calculate cutoff time
        cutoff = datetime.now() - timedelta(hours=older_than_hours)
        cutoff_str = cutoff.isoformat()

        # Find entries to remove; ISO-8601 timestamps sort chronologically,
        # so a plain string comparison against the cutoff is sufficient
        to_remove = []
        for ref_id, entry in self.data_store.items():
            timestamp = entry.get("timestamp", "")
            if timestamp < cutoff_str:
                to_remove.append(ref_id)

        # Remove entries
        for ref_id in to_remove:
            del self.data_store[ref_id]

        self.logger.info(f"Removed {len(to_remove)} data entries older than {older_than_hours} hours")
        return len(to_remove)
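

# Minimal usage sketch of the DataManager flow. The agent names
# ("research_agent", "summary_agent", "report_agent") and the sample payloads
# below are illustrative assumptions, not part of the API; any identifiers work.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    manager = DataManager()

    # Register raw data produced by one agent.
    raw_ref = manager.register_data(
        data_id="topic_query",
        data={"query": "solar panel efficiency", "results": ["result A", "result B"]},
        source_agent="research_agent",
        data_type="json",
        metadata={"topic": "solar"})

    # Another agent reads it back; access count and flow tracking are updated.
    raw_data, raw_meta = manager.get_data(raw_ref, target_agent="summary_agent")

    # Register the summarized output as a transformation of the raw data.
    summary_ref = manager.transform_data(
        input_ref_id=raw_ref,
        output_data="Short summary of the retrieved results.",
        source_agent="summary_agent",
        target_agent="report_agent",
        transformation_type="summarization",
        output_type="text")

    # Persist the summary to disk and reload it under a new reference ID.
    saved_path = manager.save_data_to_disk(summary_ref)
    reloaded_ref = manager.load_data_from_disk(saved_path)

    # Inspect the agent-to-agent flow graph, then clear the session store.
    print(manager.get_data_flow_graph())
    manager.cleanup_data()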