# app/data_manager.py

import logging
import os
import json
import pickle
from typing import Dict, List, Optional, Tuple, Union, Any
from datetime import datetime, timedelta
import hashlib


class DataManager:
    def __init__(self, cache_manager=None):
        """Initialize the DataManager with an optional cache manager."""
        self.logger = logging.getLogger(__name__)
        self.cache_manager = cache_manager

        # In-memory data store for the current session
        self.data_store = {}

        # Track data transformations
        self.transformation_history = {}

        # Create the data directory if it doesn't exist
        os.makedirs("data/transfers", exist_ok=True)

    def register_data(self, data_id: str, data: Any, source_agent: str,
                      data_type: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Register data from an agent into the data store.
        Returns a unique data reference ID.
        """
        # Generate a unique reference ID
        timestamp = datetime.now().isoformat()
        ref_hash = hashlib.md5(f"{data_id}:{timestamp}:{source_agent}".encode()).hexdigest()
        ref_id = f"{data_type}_{ref_hash[:10]}"

        # Store data with metadata
        self.data_store[ref_id] = {
            "data": data,
            "source_agent": source_agent,
            "data_type": data_type,
            "timestamp": timestamp,
            "metadata": metadata or {},
            "access_count": 0,
            "last_accessed": None
        }

        self.logger.info(f"Registered data {ref_id} from {source_agent} of type {data_type}")
        return ref_id

    def get_data(self, ref_id: str, target_agent: str) -> Tuple[Any, Dict[str, Any]]:
        """
        Retrieve data by reference ID.
        Returns the data and its metadata.
        """
        if ref_id not in self.data_store:
            self.logger.warning(f"Data {ref_id} not found")
            return None, {}

        # Update access information
        data_entry = self.data_store[ref_id]
        data_entry["access_count"] += 1
        data_entry["last_accessed"] = datetime.now().isoformat()

        # Log the access
        self.logger.info(f"Agent {target_agent} accessed data {ref_id} from {data_entry['source_agent']}")

        # Track data flow
        flow_key = f"{data_entry['source_agent']}_to_{target_agent}"
        if flow_key not in self.transformation_history:
            self.transformation_history[flow_key] = []
        self.transformation_history[flow_key].append({
            "ref_id": ref_id,
            "timestamp": datetime.now().isoformat(),
            "data_type": data_entry["data_type"]
        })

        return data_entry["data"], data_entry["metadata"]

    def transform_data(self, input_ref_id: str, output_data: Any, source_agent: str,
                       target_agent: str, transformation_type: str, output_type: str,
                       metadata: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """
        Register a data transformation from one agent to another.
        Returns a reference ID for the transformed data.
        """
        if input_ref_id not in self.data_store:
            self.logger.warning(f"Input data {input_ref_id} not found")
            return None

        # Get input metadata
        input_entry = self.data_store[input_ref_id]

        # Combine metadata
        combined_metadata = {
            **input_entry.get("metadata", {}),
            **(metadata or {}),
            "transformation_type": transformation_type,
            "input_ref_id": input_ref_id,
            "input_type": input_entry["data_type"]
        }

        # Register the transformed data
        output_ref_id = self.register_data(
            input_ref_id, output_data, source_agent, output_type, combined_metadata)

        # Track the transformation
        if "transformations" not in self.transformation_history:
            self.transformation_history["transformations"] = []
        self.transformation_history["transformations"].append({
            "input_ref_id": input_ref_id,
            "output_ref_id": output_ref_id,
            "source_agent": source_agent,
            "target_agent": target_agent,
            "transformation_type": transformation_type,
            "timestamp": datetime.now().isoformat()
        })

        self.logger.info(f"Transformed data {input_ref_id} to {output_ref_id} "
                         f"({transformation_type}: {input_entry['data_type']} → {output_type})")

        return output_ref_id

    def save_data_to_disk(self, ref_id: str, directory: str = "data/transfers") -> Optional[str]:
        """
        Save data to disk for persistence or for large objects.
        Returns the file path.
        """
        if ref_id not in self.data_store:
            self.logger.warning(f"Data {ref_id} not found")
            return None

        data_entry = self.data_store[ref_id]

        # Create the directory if it doesn't exist
        os.makedirs(directory, exist_ok=True)

        # Determine the file extension based on data type
        data_type = data_entry["data_type"]
        if data_type in ["text", "json"]:
            ext = "json"
            file_path = os.path.join(directory, f"{ref_id}.{ext}")

            # Save as JSON
            with open(file_path, 'w') as f:
                if data_type == "json":
                    json.dump(data_entry["data"], f, indent=2)
                else:
                    json.dump({"data": data_entry["data"], "metadata": data_entry["metadata"]}, f, indent=2)
        else:
            # Use pickle for other data types
            ext = "pkl"
            file_path = os.path.join(directory, f"{ref_id}.{ext}")

            # Save as pickle
            with open(file_path, 'wb') as f:
                pickle.dump(data_entry, f)

        self.logger.info(f"Saved data {ref_id} to {file_path}")
        return file_path

    def load_data_from_disk(self, file_path: str) -> Optional[str]:
        """
        Load data from disk into the data store.
        Returns the reference ID for the loaded data.
        """
        if not os.path.exists(file_path):
            self.logger.warning(f"File {file_path} not found")
            return None

        # Determine the file type from the extension
        ext = os.path.splitext(file_path)[1].lower()

        try:
            if ext == '.json':
                # Load JSON data
                with open(file_path, 'r') as f:
                    data_dict = json.load(f)

                if isinstance(data_dict, dict) and "data" in data_dict and "metadata" in data_dict:
                    data = data_dict["data"]
                    metadata = data_dict["metadata"]
                    data_type = "text"
                else:
                    data = data_dict
                    metadata = {}
                    data_type = "json"

                # Generate a reference ID from the file name (without extension)
                ref_id = f"loaded_{os.path.splitext(os.path.basename(file_path))[0]}"

                # Store in the data store
                self.data_store[ref_id] = {
                    "data": data,
                    "source_agent": "file_system",
                    "data_type": data_type,
                    "timestamp": datetime.now().isoformat(),
                    "metadata": metadata,
                    "access_count": 0,
                    "last_accessed": None,
                    "file_path": file_path
                }

            elif ext == '.pkl':
                # Load pickled data
                with open(file_path, 'rb') as f:
                    data_entry = pickle.load(f)

                # Generate a reference ID from the file name (without extension)
                ref_id = f"loaded_{os.path.splitext(os.path.basename(file_path))[0]}"

                # Store in the data store
                self.data_store[ref_id] = data_entry

            else:
                self.logger.warning(f"Unsupported file extension: {ext}")
                return None

            self.logger.info(f"Loaded data from {file_path} with reference ID {ref_id}")
            return ref_id

        except Exception as e:
            self.logger.error(f"Error loading data from {file_path}: {e}")
            return None

    def get_data_flow_graph(self) -> Dict[str, Any]:
        """
        Generate a graph representation of data flows between agents.
        Useful for visualization and debugging.
        """
        nodes = []
        edges = []

        # Add nodes for each agent mentioned in transformations
        agents = set()

        # Extract agents from the transformation history
        for flow_key, flows in self.transformation_history.items():
            if flow_key == "transformations":
                for flow in flows:
                    agents.add(flow["source_agent"])
                    agents.add(flow["target_agent"])
            elif "_to_" in flow_key:
                source, target = flow_key.split("_to_")
                agents.add(source)
                agents.add(target)

        # Create nodes
        for agent in agents:
            nodes.append({
                "id": agent,
                "label": agent.replace("_agent", "").title()
            })

        # Create edges from transformations
        if "transformations" in self.transformation_history:
            for transform in self.transformation_history["transformations"]:
                edges.append({
                    "from": transform["source_agent"],
                    "to": transform["target_agent"],
                    "label": transform["transformation_type"],
                    "data": {
                        "input_ref": transform["input_ref_id"],
                        "output_ref": transform["output_ref_id"],
                        "timestamp": transform["timestamp"]
                    }
                })

        return {
            "nodes": nodes,
            "edges": edges
        }

    def cleanup_data(self, older_than_hours: Optional[int] = None) -> int:
        """
        Clean up old data entries to free memory.
        Returns the number of entries removed.
        """
        if older_than_hours is None:
            # Clear everything
            count = len(self.data_store)
            self.data_store = {}
            self.transformation_history = {}
            self.logger.info(f"Cleared all {count} data entries")
            return count

        # Calculate the cutoff time
        cutoff = datetime.now() - timedelta(hours=older_than_hours)
        cutoff_str = cutoff.isoformat()

        # Find entries to remove (ISO-format timestamps compare correctly as strings)
        to_remove = []
        for ref_id, entry in self.data_store.items():
            timestamp = entry.get("timestamp", "")
            if timestamp < cutoff_str:
                to_remove.append(ref_id)

        # Remove entries
        for ref_id in to_remove:
            del self.data_store[ref_id]

        self.logger.info(f"Removed {len(to_remove)} data entries older than {older_than_hours} hours")
        return len(to_remove)