|
import os |
|
import logging |
|
import time |
|
import functools |
|
import inspect |
|
import re |
|
from typing import Dict, Any, List, Tuple, Callable, Optional |
|
import queue |
|
|
|
|
|
class LogQueueSingleton:
    """Holder for one lazily created ``queue.Queue`` shared through the class."""

    # Cached queue; stays None until get_instance() is first called.
    _instance = None

    @classmethod
    def get_instance(cls):
        """Return the cached queue, creating it on the first call."""
        shared = cls._instance
        if shared is None:
            shared = queue.Queue()
            cls._instance = shared
        return shared
|
|
|
|
|
def log_tool_usage(func):
    """
    Decorator to log when an asynchronous tool is being used.

    Around every awaited call to ``func`` it logs a "triggered" line, the
    number of arguments supplied and a "completed" line. If the call raises,
    the error is logged and the same exception is re-raised.

    Args:
        func: Coroutine function to wrap; its result is awaited.

    Returns:
        An ``async`` wrapper carrying ``func``'s metadata (``functools.wraps``).
    """
    # Resolved once; objects without __name__ fall back to their type name.
    tool_name = getattr(func, "__name__", type(func).__name__)

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logging.info(f"🔧 TOOL TRIGGERED: {tool_name}")
        try:
            # Only the argument count is logged, not the values. Positional
            # arguments are parameters too, so they are included in the count.
            param_count = len(args) + len(kwargs)
            logging.info(f"📋 TOOL ARGS: {tool_name} called with {param_count} parameters")

            result = await func(*args, **kwargs)

            logging.info(f"✅ TOOL COMPLETED: {tool_name}")
            return result
        except Exception as e:
            logging.error(f"❌ TOOL ERROR: {tool_name} - {str(e)}")
            raise

    return wrapper
|
|
|
|
|
def log_tool_usage_sync(func):
    """
    Decorator to log when a synchronous tool is being used.

    Around every call to ``func`` it logs a "triggered" line, the number of
    arguments supplied and a "completed" line. If the call raises, the error
    is logged and the same exception is re-raised.

    Args:
        func: Plain (non-async) callable to wrap.

    Returns:
        A wrapper carrying ``func``'s metadata (``functools.wraps``).
    """
    # Resolved once; objects without __name__ fall back to their type name.
    tool_name = getattr(func, "__name__", type(func).__name__)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logging.info(f"🔧 TOOL TRIGGERED: {tool_name}")
        try:
            # Only the argument count is logged, not the values. Positional
            # arguments are parameters too, so they are included in the count.
            param_count = len(args) + len(kwargs)
            logging.info(f"📋 TOOL ARGS: {tool_name} called with {param_count} parameters")

            result = func(*args, **kwargs)

            logging.info(f"✅ TOOL COMPLETED: {tool_name}")
            return result
        except Exception as e:
            logging.error(f"❌ TOOL ERROR: {tool_name} - {str(e)}")
            raise

    return wrapper
|
|
|
def sanitize_args(args):
    """Return a list with ``sanitize_value`` applied to each item of ``args``, in order."""
    return [sanitize_value(item) for item in args]
|
|
|
def sanitize_value(value):
    """
    Return a short, log-friendly description of ``value``.

    Strings of up to 50 characters come back unchanged; longer ones are cut
    to their first 47 characters followed by ``"..."``. Lists, tuples and
    dicts are summarised by type name and length. Anything else is reduced
    to its type name.
    """
    if isinstance(value, str):
        return value if len(value) <= 50 else value[:47] + "..."
    if isinstance(value, (list, tuple)):
        kind = type(value).__name__
        return f"{kind} with {len(value)} items"
    if isinstance(value, dict):
        return f"dict with {len(value)} items"
    return type(value).__name__
|
|
|
class LoggingToolkitWrapper:
    """
    Wrapper class to add logging to toolkit methods.

    Attribute lookups that the wrapper cannot satisfy itself are forwarded to
    the wrapped toolkit. Public callables come back wrapped so each call is
    logged; private names and non-callables are returned untouched.
    """

    def __init__(self, toolkit):
        """Store ``toolkit``, remember its class name and log the initialization."""
        self.toolkit = toolkit
        self.toolkit_name = toolkit.__class__.__name__
        logging.info(f"📦 TOOLKIT INITIALIZED: {self.toolkit_name}")

    def __getattr__(self, name):
        """
        Forward ``name`` to the wrapped toolkit, adding logging to public callables.

        Coroutine functions are wrapped with ``log_tool_usage``; other public
        callables get a synchronous logging wrapper. Names starting with ``_``
        and non-callable attributes are returned as-is.

        Raises:
            AttributeError: If the toolkit has no such attribute, or if the
                wrapper has no toolkit yet.
        """
        # __getattr__ only runs after normal lookup has failed. If "toolkit"
        # itself is missing (copy/pickle build the instance without calling
        # __init__), reading self.toolkit would re-enter this method until
        # RecursionError, so read the instance dict and fail cleanly instead.
        try:
            toolkit = self.__dict__["toolkit"]
        except KeyError:
            raise AttributeError(name) from None

        attr = getattr(toolkit, name)
        if not callable(attr) or name.startswith('_'):
            return attr

        if inspect.iscoroutinefunction(attr):
            return log_tool_usage(attr)

        @functools.wraps(attr)
        def wrapper(*args, **kwargs):
            logging.info(f"🔧 TOOL TRIGGERED: {self.toolkit_name}.{name}")
            try:
                # Only the argument count is logged, not the values.
                # Positional arguments are included in the count.
                param_count = len(args) + len(kwargs)
                logging.info(f"📋 TOOL ARGS: {name} called with {param_count} parameters")

                result = attr(*args, **kwargs)

                logging.info(f"✅ TOOL COMPLETED: {self.toolkit_name}.{name}")
                return result
            except Exception as e:
                logging.error(f"❌ TOOL ERROR: {self.toolkit_name}.{name} - {str(e)}")
                raise

        return wrapper
|
|
|
def wrap_toolkits(toolkits_list):
    """
    Return a new list in which every toolkit from ``toolkits_list`` is
    wrapped in a ``LoggingToolkitWrapper``, keeping the original order.
    """
    return [LoggingToolkitWrapper(item) for item in toolkits_list]
|
|
|
|
|
|
|
def enhanced_run_society(society, verbose=True):
    """
    Enhanced wrapper around the OWL run_society function with detailed logging.

    Logs the role names, the first 100 characters of the task prompt and the
    assistant agent's tools, then calls ``owl.utils.run_society(society)`` and
    logs token usage and elapsed time. While the society runs, the assistant
    agent's ``send_message`` (if it has one) is replaced by a logging wrapper;
    the original is put back when the run returns or raises.

    Args:
        society: Object passed to ``run_society``. ``task_prompt`` is read
            directly; ``user_role_name``, ``assistant_role_name`` and
            ``assistant_agent`` are used only if present.
        verbose: Not read by this function; kept so existing callers keep
            working.

    Returns:
        The ``(answer, chat_history, token_count)`` tuple from ``run_society``.

    Raises:
        Exception: Any error raised while running the society is logged and
            re-raised unchanged.
    """
    # Imported inside the function, so this module can be imported without OWL.
    from owl.utils import run_society as original_run_society

    user_role = getattr(society, 'user_role_name', 'User')
    assistant_role = getattr(society, 'assistant_role_name', 'Assistant')

    logging.info(f"🚀 STARTING AGENT SOCIETY: {user_role} & {assistant_role}")
    logging.info(f"📝 TASK: {society.task_prompt[:100]}...")
    logging.info(f"🤖 INITIALIZING AGENT: {assistant_role}")

    agent = getattr(society, 'assistant_agent', None)

    # List the tools BEFORE patching the agent: if this step fails, no
    # patched send_message is left behind on the agent.
    if hasattr(agent, 'tools'):
        tools = agent.tools
        if tools is None:  # an unset tool list counts as empty
            tools = []
        logging.info(f"🧰 AGENT HAS {len(tools)} TOOLS AVAILABLE")
        for i, tool in enumerate(tools):
            if callable(tool):
                tool_name = getattr(tool, '__name__', f"tool_{i}")
                logging.info(f"🔧 TOOL AVAILABLE: {tool_name}")

    original_send_message = getattr(agent, 'send_message', None)
    patched = original_send_message is not None
    if patched:
        @functools.wraps(original_send_message)
        def logged_send_message(*args, **kwargs):
            logging.info(f"💬 AGENT MESSAGE: {assistant_role} is processing...")
            result = original_send_message(*args, **kwargs)
            logging.info(f"📨 AGENT RESPONSE RECEIVED from {assistant_role}")
            return result

        agent.send_message = logged_send_message

    start_time = time.time()
    try:
        logging.info("⏳ RUNNING SOCIETY...")

        answer, chat_history, token_count = original_run_society(society)
        duration = time.time() - start_time

        if isinstance(token_count, dict):
            prompt_tokens = token_count.get('prompt_token_count', 0)
            completion_tokens = token_count.get('completion_token_count', 0)
            logging.info(f"💰 TOKEN USAGE: Prompt={prompt_tokens}, Completion={completion_tokens}, Total={prompt_tokens + completion_tokens}")
        else:
            logging.info(f"💰 TOKEN USAGE: {token_count}")

        logging.info(f"✅ AGENT SOCIETY COMPLETED: Duration {duration:.2f}s")

        return answer, chat_history, token_count
    except Exception as e:
        logging.error(f"❌ AGENT SOCIETY ERROR: {str(e)}")
        raise
    finally:
        # Restore on the same agent object that was patched, even if the run
        # swapped society.assistant_agent for something else.
        if patched:
            agent.send_message = original_send_message
|
|
|
|
|
|
|
|
|
def sanitize_log(log_message):
    """
    Sanitize log messages to avoid exposing sensitive information like IPs.

    Three substitutions are applied, in this order:

    1. Dotted-quad numbers (``d.d.d.d``, 1-3 digits each) become
       ``[REDACTED_IP]``.
    2. ``api_key`` / ``api-key`` / ``apikey`` / ``key`` / ``token``
       (case-insensitive) followed by ``:`` or ``=`` and a value of at least
       20 characters become ``<name>: [REDACTED_API_KEY]``.
    3. ``user:password@`` credentials inside ``http(s)://`` URLs become
       ``[REDACTED_AUTH]@``.

    Args:
        log_message: Text to sanitize.

    Returns:
        The text with the matches above replaced.
    """
    ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
    sanitized = re.sub(ip_pattern, '[REDACTED_IP]', log_message)

    # The value class includes "-", "_" and "." so that prefixed keys such as
    # "sk-..." are matched at all, and hyphenated or dotted tokens are
    # redacted in full instead of leaking everything after the first symbol.
    api_key_pattern = r'(api[_-]?key|apikey|key|token)["\']?\s*[:=]\s*["\']?([a-zA-Z0-9._\-]{20,})["\']?'
    sanitized = re.sub(api_key_pattern, r'\1: [REDACTED_API_KEY]', sanitized, flags=re.IGNORECASE)

    url_auth_pattern = r'(https?://)([^:@/]+:[^@/]+@)([^\s/]+)'
    sanitized = re.sub(url_auth_pattern, r'\1[REDACTED_AUTH]@\3', sanitized)

    return sanitized
|
|
|
|
|
class EnhancedStreamlitLogHandler(logging.Handler):
    """Logging handler that formats each record, sanitizes it and puts it on a queue."""

    def __init__(self, log_queue):
        """
        Args:
            log_queue: Object with a ``put`` method; receives one sanitized,
                formatted string per emitted record.
        """
        super().__init__()
        self.log_queue = log_queue
        self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    def emit(self, record):
        """Format ``record``, pass the text through ``sanitize_log`` and enqueue it."""
        try:
            log_entry = self.format(record)
            sanitized_log = sanitize_log(log_entry)
            self.log_queue.put(sanitized_log)
        except Exception:
            # A failure while logging must not propagate into the code that
            # called logging.*; hand it to the standard Handler error hook.
            self.handleError(record)
|
|
|
|
|
|
|
|
|
|
|
def patch_owl_logging():
    """
    Try to patch specific OWL functions to add logging.

    Replaces ``owl.utils.run_society`` (when it exists) with a wrapper that
    logs the start, completion or failure of every call. Calling this
    function again does not add a second logging layer.

    Returns:
        True if ``owl.utils`` was imported and no error occurred; False if
        the import failed or patching raised.
    """
    try:
        from owl import utils

        original_run = getattr(utils, 'run_society', None)
        # Skip if run_society is already our wrapper, so repeated calls do
        # not stack wrappers and duplicate every log line.
        already_patched = getattr(original_run, '_owl_logging_patched', False)

        if original_run is not None and not already_patched:
            @functools.wraps(original_run)
            def logged_run_society(*args, **kwargs):
                logging.info("🦉 OWL run_society called")
                try:
                    result = original_run(*args, **kwargs)
                    logging.info("🦉 OWL run_society completed")
                    return result
                except Exception as e:
                    logging.error(f"🦉 OWL run_society error: {str(e)}")
                    raise

            logged_run_society._owl_logging_patched = True
            utils.run_society = logged_run_society
            logging.info("🦉 OWL run_society patched with logging")

        return True
    except ImportError:
        logging.warning("⚠️ Could not patch OWL logging - module not found")
        return False
    except Exception as e:
        logging.warning(f"⚠️ Error patching OWL logging: {str(e)}")
        return False