# File size: 10,886 Bytes
# File-viewer revision id: 6aee514
# (the file viewer's line-number gutter, 1-275, was removed here)
import os
import logging
import time
import functools
import inspect
import re
from typing import Dict, Any, List, Tuple, Callable, Optional
import queue
# Create a singleton log queue that can be shared between modules
class LogQueueSingleton:
    """Holder for a single ``queue.Queue`` that every importing module shares."""

    # The shared queue; stays None until get_instance() is first called.
    _instance = None

    @classmethod
    def get_instance(cls):
        """Return the shared queue, creating it on the first call."""
        shared = cls._instance
        if shared is None:
            shared = queue.Queue()
            cls._instance = shared
        return shared
# Custom logging wrapper for tools
def log_tool_usage(func):
    """
    Decorator to log when an async tool is being used.

    Each call logs, on the root logger at INFO level, that the tool was
    triggered, how many arguments it received, and that it completed.
    If the tool raises, the error is logged at ERROR level and the same
    exception is re-raised. Argument values and the result are never
    logged (they might be large or sensitive) -- only the argument count.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An ``async def`` wrapper that carries ``func``'s metadata.
    """
    # Callables such as functools.partial objects have no __name__;
    # fall back to the type name so decorated calls cannot fail on logging.
    tool_name = getattr(func, "__name__", type(func).__name__)

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logging.info("🔧 TOOL TRIGGERED: %s", tool_name)
        # Count positional and keyword arguments alike; counting only
        # kwargs reported "0 parameters" for purely positional calls.
        logging.info(
            "📋 TOOL ARGS: %s called with %d parameters",
            tool_name,
            len(args) + len(kwargs),
        )
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            logging.error("❌ TOOL ERROR: %s - %s", tool_name, e)
            raise
        # Log completion but not the actual result content.
        logging.info("✅ TOOL COMPLETED: %s", tool_name)
        return result

    return wrapper
# Non-async version for synchronous functions
def log_tool_usage_sync(func):
    """
    Decorator to log when a synchronous tool is being used.

    Each call logs, on the root logger at INFO level, that the tool was
    triggered, how many arguments it received, and that it completed.
    If the tool raises, the error is logged at ERROR level and the same
    exception is re-raised. Argument values and the result are never
    logged (they might be large or sensitive) -- only the argument count.

    Args:
        func: The synchronous callable to wrap.

    Returns:
        A wrapper that carries ``func``'s metadata.
    """
    # Callables such as functools.partial objects have no __name__;
    # fall back to the type name so decorated calls cannot fail on logging.
    tool_name = getattr(func, "__name__", type(func).__name__)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logging.info("🔧 TOOL TRIGGERED: %s", tool_name)
        # Count positional and keyword arguments alike; counting only
        # kwargs reported "0 parameters" for purely positional calls.
        logging.info(
            "📋 TOOL ARGS: %s called with %d parameters",
            tool_name,
            len(args) + len(kwargs),
        )
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            logging.error("❌ TOOL ERROR: %s - %s", tool_name, e)
            raise
        # Log completion but not the actual result content.
        logging.info("✅ TOOL COMPLETED: %s", tool_name)
        return result

    return wrapper
def sanitize_args(args):
    """Return a list with the log-safe form of every positional argument."""
    return [sanitize_value(item) for item in args]
def sanitize_value(value):
    """
    Produce a short, log-safe description of a value.

    Strings are returned as-is when they are at most 50 characters long and
    otherwise cut to 47 characters plus "...". Lists, tuples and dicts are
    summarised by their size; anything else by its type name only.
    """
    if isinstance(value, dict):
        return f"dict with {len(value)} items"
    if isinstance(value, (list, tuple)):
        return f"{type(value).__name__} with {len(value)} items"
    if not isinstance(value, str):
        return type(value).__name__
    # Only strings reach this point.
    return value if len(value) <= 50 else value[:47] + "..."
class LoggingToolkitWrapper:
    """
    Wrapper class to add logging to toolkit methods.

    Attribute access is forwarded to the wrapped toolkit. Public callables
    (names not starting with an underscore) come back wrapped so that each
    call is logged on the root logger; every other attribute is returned
    unchanged.
    """

    def __init__(self, toolkit):
        """Store ``toolkit`` and log its class name."""
        self.toolkit = toolkit
        self.toolkit_name = toolkit.__class__.__name__
        logging.info("📦 TOOLKIT INITIALIZED: %s", self.toolkit_name)

    def __getattr__(self, name):
        """
        Look ``name`` up on the wrapped toolkit, adding logging to callables.

        Raises:
            AttributeError: if the toolkit has no such attribute, or if the
                wrapper itself has not been initialised yet.
        """
        # __getattr__ only runs when normal lookup fails. If "toolkit" itself
        # is missing (instance built via __new__, copy or unpickling), the
        # self.toolkit access below would re-enter this method without end.
        if name == "toolkit":
            raise AttributeError(name)

        attr = getattr(self.toolkit, name)
        if not callable(attr) or name.startswith('_'):
            return attr

        if inspect.iscoroutinefunction(attr):
            # It's an async function, wrap it with our async decorator
            return log_tool_usage(attr)

        # For non-async functions
        @functools.wraps(attr)
        def wrapper(*args, **kwargs):
            qualified_name = f"{self.toolkit_name}.{name}"
            logging.info("🔧 TOOL TRIGGERED: %s", qualified_name)
            # Only the argument count is logged, never the values. Count
            # positional and keyword arguments alike.
            logging.info(
                "📋 TOOL ARGS: %s called with %d parameters",
                name,
                len(args) + len(kwargs),
            )
            try:
                result = attr(*args, **kwargs)
            except Exception as e:
                logging.error("❌ TOOL ERROR: %s - %s", qualified_name, e)
                raise
            logging.info("✅ TOOL COMPLETED: %s", qualified_name)
            return result

        return wrapper
def wrap_toolkits(toolkits_list):
    """
    Return a new list in which every toolkit is wrapped in a
    LoggingToolkitWrapper, in the same order as the input.
    """
    return [LoggingToolkitWrapper(item) for item in toolkits_list]
# Corrected version of the run_society wrapper; it supersedes the earlier
# version of this function in logging_utils.py.
# Enhanced run_society function with logging
def enhanced_run_society(society, verbose=True):
    """
    Enhanced wrapper around the OWL run_society function with detailed logging.

    Logs the society's role names and task prompt, temporarily replaces the
    assistant agent's ``send_message`` with a logging version, lists the
    agent's tools, runs ``owl.utils.run_society(society)`` and logs token
    usage and duration. The original ``send_message`` is always restored.

    Args:
        society: Object handed to ``run_society``. These attributes are read
            when present: ``user_role_name``, ``assistant_role_name``,
            ``task_prompt`` and ``assistant_agent`` (with ``send_message``
            and ``tools``).
        verbose: Kept for backward compatibility; it is not forwarded to
            ``run_society`` and has no effect.

    Returns:
        The ``(answer, chat_history, token_count)`` tuple from ``run_society``.

    Raises:
        ImportError: if ``owl.utils`` cannot be imported.
        Exception: anything raised while running the society is logged at
            ERROR level and re-raised unchanged.
    """
    from owl.utils import run_society as original_run_society

    # Log the society setup
    user_role = getattr(society, 'user_role_name', 'User')
    assistant_role = getattr(society, 'assistant_role_name', 'Assistant')
    logging.info("🚀 STARTING AGENT SOCIETY: %s & %s", user_role, assistant_role)
    # task_prompt is read defensively like the role names above, so a missing
    # or None prompt cannot abort the run just because of a log line.
    task_prompt = str(getattr(society, 'task_prompt', None) or '')
    logging.info("📝 TASK: %s...", task_prompt[:100])

    # Log agent initialization
    logging.info("🤖 INITIALIZING AGENT: %s", assistant_role)

    agent = getattr(society, 'assistant_agent', None)
    original_send_message = getattr(agent, 'send_message', None)

    # Everything after this point runs under try/finally so the send_message
    # hook is removed even when a later setup step fails.
    try:
        # Add hooks to log message exchanges if possible
        if original_send_message is not None:
            @functools.wraps(original_send_message)
            def logged_send_message(*args, **kwargs):
                logging.info("💬 AGENT MESSAGE: %s is processing...", assistant_role)
                result = original_send_message(*args, **kwargs)
                logging.info("📨 AGENT RESPONSE RECEIVED from %s", assistant_role)
                return result

            agent.send_message = logged_send_message

        # Try to log tool usage if possible
        if agent is not None and hasattr(agent, 'tools'):
            tools = agent.tools or []  # tolerate tools=None
            logging.info("🧰 AGENT HAS %d TOOLS AVAILABLE", len(tools))
            for i, tool in enumerate(tools):
                if callable(tool):
                    tool_name = getattr(tool, '__name__', f"tool_{i}")
                    logging.info("🔧 TOOL AVAILABLE: %s", tool_name)

        # Run the original function
        start_time = time.time()
        logging.info("⏳ RUNNING SOCIETY...")
        # The verbose parameter is intentionally not passed on.
        answer, chat_history, token_count = original_run_society(society)
        duration = time.time() - start_time

        # Log prompt and completion tokens separately if available
        if isinstance(token_count, dict):
            prompt_tokens = token_count.get('prompt_token_count', 0)
            completion_tokens = token_count.get('completion_token_count', 0)
            logging.info(
                "💰 TOKEN USAGE: Prompt=%s, Completion=%s, Total=%s",
                prompt_tokens,
                completion_tokens,
                prompt_tokens + completion_tokens,
            )
        else:
            logging.info("💰 TOKEN USAGE: %s", token_count)

        logging.info("✅ AGENT SOCIETY COMPLETED: Duration %.2fs", duration)
        return answer, chat_history, token_count
    except Exception as e:
        logging.error("❌ AGENT SOCIETY ERROR: %s", e)
        raise
    finally:
        # Restore the original method on the agent we modified
        if original_send_message is not None:
            agent.send_message = original_send_message
# Function to sanitize logs to avoid exposing sensitive information
# Patterns used by sanitize_log, compiled once at import time because the
# function runs for every log record.
# Simple IP address pattern matching
_IP_ADDRESS_RE = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')
# API keys (common patterns). The value class includes "_", "-" and "." so
# that keys such as "sk-..." and dotted tokens such as JWTs are matched in
# full instead of being skipped or only partly redacted.
_API_KEY_RE = re.compile(
    r'(api[_-]?key|apikey|key|token)["\']?\s*[:=]\s*["\']?([A-Za-z0-9_.\-]{20,})["\']?',
    re.IGNORECASE,
)
# URLs with authentication information (scheme://user:password@host)
_URL_AUTH_RE = re.compile(r'(https?://)([^:@/]+:[^@/]+@)([^\s/]+)')


def sanitize_log(log_message):
    """
    Sanitize log messages to avoid exposing sensitive information like IPs.

    Applied in this order:
      1. dotted-quad IP addresses become ``[REDACTED_IP]``;
      2. ``key``/``token``/``api_key``-style assignments whose value is at
         least 20 characters become ``<name>: [REDACTED_API_KEY]``;
      3. credentials embedded in http(s) URLs become ``[REDACTED_AUTH]``.

    Args:
        log_message: The formatted log text.

    Returns:
        The text with the patterns above redacted.
    """
    sanitized = _IP_ADDRESS_RE.sub('[REDACTED_IP]', log_message)
    sanitized = _API_KEY_RE.sub(r'\1: [REDACTED_API_KEY]', sanitized)
    sanitized = _URL_AUTH_RE.sub(r'\1[REDACTED_AUTH]@\3', sanitized)
    return sanitized
# Enhanced StreamlitLogHandler that sanitizes logs
class EnhancedStreamlitLogHandler(logging.Handler):
    """
    Logging handler that formats each record, sanitizes the text with
    ``sanitize_log`` and puts the result on ``log_queue``.
    """

    def __init__(self, log_queue):
        """
        Args:
            log_queue: Object with a ``put`` method (for example a
                ``queue.Queue``) that receives the sanitized log lines.
        """
        super().__init__()
        self.log_queue = log_queue
        self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    def emit(self, record):
        """Format, sanitize and enqueue ``record``."""
        # A failure while formatting or enqueueing must not propagate into
        # the code that issued the log call; hand it to handleError().
        try:
            log_entry = self.format(record)
            # Sanitize the log to remove sensitive information
            sanitized_log = sanitize_log(log_entry)
            self.log_queue.put(sanitized_log)
        except Exception:
            self.handleError(record)
# Add logging to specific OWL functions if possible
# (updated version of this helper for logging_utils.py)
def patch_owl_logging():
    """
    Try to patch specific OWL functions to add logging.

    Replaces ``owl.utils.run_society`` with a wrapper that logs when it is
    called, when it completes and when it raises (the exception is re-raised).
    Calling this function again is a no-op: an already patched
    ``run_society`` is recognised and not wrapped a second time.

    Returns:
        True if ``owl.utils`` could be imported and no error occurred,
        False otherwise (a warning is logged in that case).
    """
    try:
        from owl import utils

        # If run_society exists in utils, patch it to log
        if hasattr(utils, 'run_society'):
            original_run = utils.run_society

            # Without this check every call would wrap the previous wrapper,
            # so each run would be logged once per earlier call.
            if not getattr(original_run, '_owl_logging_patched', False):
                @functools.wraps(original_run)
                def logged_run_society(*args, **kwargs):
                    logging.info("🦉 OWL run_society called")
                    try:
                        result = original_run(*args, **kwargs)
                    except Exception as e:
                        logging.error("🦉 OWL run_society error: %s", e)
                        raise
                    logging.info("🦉 OWL run_society completed")
                    return result

                logged_run_society._owl_logging_patched = True

                # Replace the original function
                utils.run_society = logged_run_society
                logging.info("🦉 OWL run_society patched with logging")

        # NOTE(review): the original indentation was lost; True is assumed to
        # be returned whenever the import succeeded -- confirm with callers.
        return True
    except ImportError:
        logging.warning("⚠️ Could not patch OWL logging - module not found")
        return False
    except Exception as e:
        logging.warning("⚠️ Error patching OWL logging: %s", e)
        return False