# app/data_manager.py
import hashlib
import json
import logging
import os
import pickle
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

class DataManager:
    def __init__(self, cache_manager=None):
        """Initialize the DataManager with an optional cache manager."""
        self.logger = logging.getLogger(__name__)
        self.cache_manager = cache_manager
        # In-memory data store for the current session
        self.data_store = {}
        # Track data transformations
        self.transformation_history = {}
        # Create data directory if it doesn't exist
        os.makedirs("data/transfers", exist_ok=True)

    def register_data(self, data_id: str, data: Any, source_agent: str,
                      data_type: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Register data from an agent into the data store.
        Returns a unique data reference ID.
        """
        # Generate a unique reference ID
        timestamp = datetime.now().isoformat()
        ref_hash = hashlib.md5(f"{data_id}:{timestamp}:{source_agent}".encode()).hexdigest()
        ref_id = f"{data_type}_{ref_hash[:10]}"
        # Store data with metadata
        self.data_store[ref_id] = {
            "data": data,
            "source_agent": source_agent,
            "data_type": data_type,
            "timestamp": timestamp,
            "metadata": metadata or {},
            "access_count": 0,
            "last_accessed": None
        }
        self.logger.info(f"Registered data {ref_id} from {source_agent} of type {data_type}")
        return ref_id
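
    # Illustrative shape of a data_store entry created by register_data(); the
    # reference ID combines the data_type with a short hash, and the hash value
    # and agent name shown here are hypothetical.
    #
    # "report_3f2a1b9c0d": {
    #     "data": <any Python object>,
    #     "source_agent": "research_agent",
    #     "data_type": "report",
    #     "timestamp": "2024-01-01T12:00:00",
    #     "metadata": {},
    #     "access_count": 0,
    #     "last_accessed": None
    # }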

    def get_data(self, ref_id: str, target_agent: str) -> Tuple[Any, Dict[str, Any]]:
        """
        Retrieve data by reference ID.
        Returns the data and its metadata.
        """
        if ref_id not in self.data_store:
            self.logger.warning(f"Data {ref_id} not found")
            return None, {}
        # Update access information
        data_entry = self.data_store[ref_id]
        data_entry["access_count"] += 1
        data_entry["last_accessed"] = datetime.now().isoformat()
        # Log the access
        self.logger.info(f"Agent {target_agent} accessed data {ref_id} from {data_entry['source_agent']}")
        # Track data flow
        flow_key = f"{data_entry['source_agent']}_to_{target_agent}"
        if flow_key not in self.transformation_history:
            self.transformation_history[flow_key] = []
        self.transformation_history[flow_key].append({
            "ref_id": ref_id,
            "timestamp": datetime.now().isoformat(),
            "data_type": data_entry["data_type"]
        })
        return data_entry["data"], data_entry["metadata"]

    def transform_data(self, input_ref_id: str, output_data: Any,
                       source_agent: str, target_agent: str,
                       transformation_type: str, output_type: str,
                       metadata: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """
        Register a data transformation from one agent to another.
        Returns a reference ID for the transformed data, or None if the input is unknown.
        """
        if input_ref_id not in self.data_store:
            self.logger.warning(f"Input data {input_ref_id} not found")
            return None
        # Get input metadata
        input_entry = self.data_store[input_ref_id]
        # Combine metadata
        combined_metadata = {
            **input_entry.get("metadata", {}),
            **(metadata or {}),
            "transformation_type": transformation_type,
            "input_ref_id": input_ref_id,
            "input_type": input_entry["data_type"]
        }
        # Register the transformed data
        output_ref_id = self.register_data(
            input_ref_id, output_data, source_agent, output_type, combined_metadata)
        # Track the transformation
        if "transformations" not in self.transformation_history:
            self.transformation_history["transformations"] = []
        self.transformation_history["transformations"].append({
            "input_ref_id": input_ref_id,
            "output_ref_id": output_ref_id,
            "source_agent": source_agent,
            "target_agent": target_agent,
            "transformation_type": transformation_type,
            "timestamp": datetime.now().isoformat()
        })
        self.logger.info(f"Transformed data {input_ref_id} to {output_ref_id} "
                         f"({transformation_type}: {input_entry['data_type']} → {output_type})")
        return output_ref_id

    def save_data_to_disk(self, ref_id: str, directory: str = "data/transfers") -> Optional[str]:
        """
        Save data to disk for persistence or for large objects.
        Returns the file path, or None if the reference ID is unknown.
        """
        if ref_id not in self.data_store:
            self.logger.warning(f"Data {ref_id} not found")
            return None
        data_entry = self.data_store[ref_id]
        # Create directory if it doesn't exist
        os.makedirs(directory, exist_ok=True)
        # Determine file extension based on data type
        data_type = data_entry["data_type"]
        if data_type in ["text", "json"]:
            ext = "json"
            file_path = os.path.join(directory, f"{ref_id}.{ext}")
            # Save as JSON
            with open(file_path, 'w') as f:
                if data_type == "json":
                    json.dump(data_entry["data"], f, indent=2)
                else:
                    json.dump({"data": data_entry["data"], "metadata": data_entry["metadata"]}, f, indent=2)
        else:
            # Use pickle for other data types
            ext = "pkl"
            file_path = os.path.join(directory, f"{ref_id}.{ext}")
            # Save as pickle
            with open(file_path, 'wb') as f:
                pickle.dump(data_entry, f)
        self.logger.info(f"Saved data {ref_id} to {file_path}")
        return file_path

    def load_data_from_disk(self, file_path: str) -> Optional[str]:
        """
        Load data from disk into the data store.
        Returns the reference ID for the loaded data, or None on failure.
        """
        if not os.path.exists(file_path):
            self.logger.warning(f"File {file_path} not found")
            return None
        # Determine file type from extension
        ext = os.path.splitext(file_path)[1].lower()
        try:
            if ext == '.json':
                # Load JSON data
                with open(file_path, 'r') as f:
                    data_dict = json.load(f)
                if isinstance(data_dict, dict) and "data" in data_dict and "metadata" in data_dict:
                    data = data_dict["data"]
                    metadata = data_dict["metadata"]
                    data_type = "text"
                else:
                    data = data_dict
                    metadata = {}
                    data_type = "json"
                # Generate a reference ID
                ref_id = f"loaded_{os.path.basename(file_path).split('.')[0]}"
                # Store in data store
                self.data_store[ref_id] = {
                    "data": data,
                    "source_agent": "file_system",
                    "data_type": data_type,
                    "timestamp": datetime.now().isoformat(),
                    "metadata": metadata,
                    "access_count": 0,
                    "last_accessed": None,
                    "file_path": file_path
                }
            elif ext == '.pkl':
                # Load pickle data
                with open(file_path, 'rb') as f:
                    data_entry = pickle.load(f)
                # Generate a reference ID
                ref_id = f"loaded_{os.path.basename(file_path).split('.')[0]}"
                # Store in data store
                self.data_store[ref_id] = data_entry
            else:
                self.logger.warning(f"Unsupported file extension: {ext}")
                return None
            self.logger.info(f"Loaded data from {file_path} with reference ID {ref_id}")
            return ref_id
        except Exception as e:
            self.logger.error(f"Error loading data from {file_path}: {e}")
            return None

    def get_data_flow_graph(self) -> Dict[str, Any]:
        """
        Generate a graph representation of data flows between agents.
        Useful for visualization and debugging.
        """
        nodes = []
        edges = []
        # Collect every agent mentioned in the transformation history
        agents = set()
        for flow_key, flows in self.transformation_history.items():
            if flow_key == "transformations":
                for flow in flows:
                    agents.add(flow["source_agent"])
                    agents.add(flow["target_agent"])
            elif "_to_" in flow_key:
                source, target = flow_key.split("_to_")
                agents.add(source)
                agents.add(target)
        # Create nodes
        for agent in agents:
            nodes.append({
                "id": agent,
                "label": agent.replace("_agent", "").title()
            })
        # Create edges from transformations
        if "transformations" in self.transformation_history:
            for transform in self.transformation_history["transformations"]:
                edges.append({
                    "from": transform["source_agent"],
                    "to": transform["target_agent"],
                    "label": transform["transformation_type"],
                    "data": {
                        "input_ref": transform["input_ref_id"],
                        "output_ref": transform["output_ref_id"],
                        "timestamp": transform["timestamp"]
                    }
                })
        return {
            "nodes": nodes,
            "edges": edges
        }
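
    # Illustrative sketch of the dictionary returned by get_data_flow_graph();
    # the agent names, reference IDs, and transformation type are hypothetical.
    #
    # {
    #     "nodes": [
    #         {"id": "research_agent", "label": "Research"},
    #         {"id": "summary_agent", "label": "Summary"}
    #     ],
    #     "edges": [
    #         {
    #             "from": "research_agent",
    #             "to": "summary_agent",
    #             "label": "summarize",
    #             "data": {
    #                 "input_ref": "json_1a2b3c4d5e",
    #                 "output_ref": "text_6f7e8d9c0b",
    #                 "timestamp": "2024-01-01T12:00:00"
    #             }
    #         }
    #     ]
    # }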

    def cleanup_data(self, older_than_hours: Optional[int] = None) -> int:
        """
        Clean up old data entries to free memory.
        Returns the number of entries removed.
        """
        if older_than_hours is None:
            # Clear all
            count = len(self.data_store)
            self.data_store = {}
            self.transformation_history = {}
            self.logger.info(f"Cleared all {count} data entries")
            return count
        # Calculate cutoff time
        cutoff = datetime.now() - timedelta(hours=older_than_hours)
        cutoff_str = cutoff.isoformat()
        # Find entries to remove
        to_remove = []
        for ref_id, entry in self.data_store.items():
            timestamp = entry.get("timestamp", "")
            if timestamp < cutoff_str:
                to_remove.append(ref_id)
        # Remove entries
        for ref_id in to_remove:
            del self.data_store[ref_id]
        self.logger.info(f"Removed {len(to_remove)} data entries older than {older_than_hours} hours")
        return len(to_remove)
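

# Minimal usage sketch exercising the DataManager end to end. The agent names,
# sample payload, and transformation type below are illustrative assumptions,
# not identifiers expected elsewhere in the application.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    manager = DataManager()

    # An upstream agent registers raw data and receives a reference ID.
    raw_ref = manager.register_data(
        data_id="doc_001",
        data={"title": "Quarterly report", "body": "Revenue grew this quarter."},
        source_agent="research_agent",
        data_type="json",
        metadata={"origin": "example"},
    )

    # A downstream agent retrieves the data; the access and flow are recorded.
    data, metadata = manager.get_data(raw_ref, target_agent="summary_agent")

    # The downstream agent registers its derived output as a transformation.
    summary_ref = manager.transform_data(
        input_ref_id=raw_ref,
        output_data="Revenue grew this quarter.",
        source_agent="summary_agent",
        target_agent="report_agent",
        transformation_type="summarize",
        output_type="text",
    )

    # Persist the derived result, then reload it under a new reference ID.
    path = manager.save_data_to_disk(summary_ref)
    reloaded_ref = manager.load_data_from_disk(path)
    print(f"Reloaded saved data as {reloaded_ref}")

    # Inspect the recorded data flows and clean up stale entries.
    print(manager.get_data_flow_graph())
    removed = manager.cleanup_data(older_than_hours=24)
    print(f"Removed {removed} stale entries")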