import streamlit as st import interpreter import os from streamlit_extras.colored_header import colored_header from streamlit_lottie import st_lottie import json import requests import re from datetime import datetime, timezone from typing import Dict, Any from streamlit.runtime.scriptrunner import get_script_run_ctx import time from tenacity import retry, stop_after_attempt, wait_exponential import shutil from pathlib import Path import hashlib import streamlit_file_browser as sfb # Create interpreter instance interpreter_instance = interpreter.Interpreter() # Add retry decorator for API calls @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def load_lottieurl(url: str) -> Dict[str, Any]: try: r = requests.get(url, timeout=10) # Add timeout r.raise_for_status() # Raise exception for bad status codes return r.json() except requests.exceptions.RequestException as e: st.error(f"Failed to load animation: {str(e)}") return None # Add error handling for interpreter calls def safe_interpreter_call(func, *args, **kwargs): try: return func(*args, **kwargs) except Exception as e: error_msg = str(e) if "API key" in error_msg.lower(): st.error("❌ API key error. Please check your API key in settings.") elif "rate limit" in error_msg.lower(): st.error("⏳ Rate limit exceeded. Please wait a moment and try again.") else: st.error(f"❌ Error: {error_msg}") return None def get_session_id(): ctx = get_script_run_ctx() return ctx.session_id if ctx else None def save_settings(): """Save current settings to session state and ensure they persist""" settings = st.session_state.settings session_id = get_session_id() if session_id: # Save all relevant state st.session_state[f"persistent_settings_{session_id}"] = settings.copy() st.session_state[f"persistent_model_{session_id}"] = st.session_state.selected_model st.session_state[f"persistent_audio_{session_id}"] = st.session_state.selected_audio_track def load_settings(): """Load settings from persistent storage""" session_id = get_session_id() if session_id: # Load all saved state if f"persistent_settings_{session_id}" in st.session_state: st.session_state.settings = st.session_state[f"persistent_settings_{session_id}"].copy() if f"persistent_model_{session_id}" in st.session_state: st.session_state.selected_model = st.session_state[f"persistent_model_{session_id}"] if f"persistent_audio_{session_id}" in st.session_state: st.session_state.selected_audio_track = st.session_state[f"persistent_audio_{session_id}"] def init_session_state(): """Initialize session state with proper error handling and defaults""" try: # Load persistent settings first load_settings() # Define all required session state keys and their defaults default_state = { "settings": { "api_key": os.getenv("HF_API_KEY", ""), "api_base": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-Coder-32B-Instruct", "model": "huggingface/Qwen/Qwen2.5-Coder-32B-Instruct", "auto_run": True, "theme": "light", "code_style": "monokai", "custom_instructions": "", "safe_mode": "off", "conversation_history": False, "os_mode": False, "os_restricted_mode": False, "allowed_paths": ["/"], "use_workspace": True, }, "selected_model": None, # Will be set from settings["model"] "selected_audio_track": "Ambient", "uploaded_files": [], "settings_open": False, "messages": [], "original_system_message": interpreter_instance.system_message, "session_dir": None, "last_request_time": time.time() } # Initialize all required state variables for key, default_value in default_state.items(): if key not in st.session_state: st.session_state[key] = default_value # Ensure selected_model is set from settings if not already set if not st.session_state.selected_model: st.session_state.selected_model = st.session_state.settings["model"] # Always reset interpreter for fresh session interpreter_instance.reset() # Apply initial settings apply_interpreter_settings() except Exception as e: st.error(f"Error initializing session state: {str(e)}") # Provide fallback values for critical settings if "settings" not in st.session_state: st.session_state.settings = default_state["settings"] def apply_interpreter_settings(): """Apply interpreter settings""" interpreter_instance.api_key = st.session_state.settings["api_key"] interpreter_instance.api_base = st.session_state.settings["api_base"] interpreter_instance.model = st.session_state.settings["model"] interpreter_instance.auto_run = st.session_state.settings["auto_run"] interpreter_instance.temperature = 0.7 interpreter_instance.safe_mode = "off" interpreter_instance.conversation_history = True # Force this to False # Set allowed paths directly on interpreter if st.session_state.settings["use_workspace"]: interpreter_instance.allowed_paths = [get_session_folder()] else: interpreter_instance.allowed_paths = st.session_state.settings["allowed_paths"] # Update system message interpreter_instance.system_message = st.session_state.original_system_message if st.session_state.settings["custom_instructions"]: interpreter_instance.system_message += f"\n\nAdditional Instructions:\n{st.session_state.settings['custom_instructions']}" if st.session_state.settings["use_workspace"]: workspace_path = get_session_folder() interpreter_instance.system_message += f"\n\nWorkspace Path: {workspace_path}\nYou can only access files in this workspace directory." class OutputController: def __init__(self): self.loop_detection = { 'last_content': None, 'repeat_count': 0, 'last_timestamp': datetime.now(), 'number_pattern': re.compile(r'^[\d\s.]+$'), 'terminal_spam': re.compile(r'^[0-9\s.]{20,}$'), # Long number sequences 'max_repeats': 3, 'timeout_seconds': 2 } def is_loop_detected(self, content: str) -> bool: now = datetime.now() content = str(content).strip() # Immediately skip terminal spam if self.loop_detection['terminal_spam'].match(content): return True if (content != self.loop_detection['last_content'] or (now - self.loop_detection['last_timestamp']).seconds > self.loop_detection['timeout_seconds']): self.loop_detection.update({ 'repeat_count': 0, 'last_content': content, 'last_timestamp': now }) return False # More aggressive number detection if self.loop_detection['number_pattern'].match(content): self.loop_detection['repeat_count'] += 1 if self.loop_detection['repeat_count'] > 2: # Reduced tolerance return True return False # Move clear_chat_history outside main function def clear_chat_history(): """Completely reset the chat state""" interpreter_instance.messages = [] # Clear interpreter's message history interpreter_instance.reset() # Full reset of interpreter st.session_state.messages = [] # Clear UI message history save_settings() # Ensure settings persist after clear st.success("Chat history cleared!") st.rerun() # Move update_model_settings outside main function def update_model_settings(): st.session_state.selected_model = st.session_state.model_select st.session_state.settings.update({ "model": st.session_state.model_select, "api_base": model_options[st.session_state.model_select] }) save_settings() # Save settings after update # Define audio_tracks at module level audio_tracks = { "Ambient": "https://cdn.pixabay.com/download/audio/2022/02/22/audio_d1718ab41b.mp3", "Lo-Fi": "https://cdn.pixabay.com/download/audio/2022/03/10/audio_2d7b426f87.mp3", "Focus": "https://cdn.pixabay.com/download/audio/2022/01/18/audio_d0c6bf3c0e.mp3" } # Move to module level (top of file with other constants) model_options = { # Default and recommended model "huggingface/Qwen/Qwen2.5-Coder-32B-Instruct": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-Coder-32B-Instruct", # Default # Other Qwen 2.5 Coder Series "huggingface/Qwen/Qwen2.5-Coder-14B-Instruct": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-Coder-14B-Instruct", "huggingface/Qwen/Qwen2.5-Coder-7B-Instruct": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-Coder-7B-Instruct", # Qwen 2.5 General Series "huggingface/Qwen/Qwen2.5-72B-Instruct": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-72B-Instruct", "huggingface/Qwen/Qwen2.5-32B-Instruct": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-32B-Instruct", "huggingface/Qwen/Qwen2.5-7B-Instruct": "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-7B-Instruct", # Other verified top performers "huggingface/mistralai/Mixtral-8x7B-Instruct-v0.1": "https://api-inference.huggingface.co/models/mistralai/Mixtral-8x7B-Instruct-v0.1", "huggingface/mistralai/Mistral-7B-Instruct-v0.2": "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.2", "huggingface/codellama/CodeLlama-34b-Instruct-hf": "https://api-inference.huggingface.co/models/codellama/CodeLlama-34b-Instruct-hf", "huggingface/codellama/CodeLlama-13b-Instruct-hf": "https://api-inference.huggingface.co/models/codellama/CodeLlama-13b-Instruct-hf", "huggingface/deepseek-ai/deepseek-coder-6.7b-instruct": "https://api-inference.huggingface.co/models/deepseek-ai/deepseek-coder-6.7b-instruct", "huggingface/microsoft/phi-2": "https://api-inference.huggingface.co/models/microsoft/phi-2", "huggingface/bigcode/starcoder2-15b": "https://api-inference.huggingface.co/models/bigcode/starcoder2-15b", } def get_theme_styles(theme: str = "light") -> str: """Get theme styles based on Streamlit's native theming""" return """ /* Base styles */ body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif; line-height: 1.6; } /* Message container styles */ .stChatMessage { margin: 1rem 0; padding: 1rem; border-radius: 10px; border: 1px solid rgba(128, 128, 128, 0.2); } /* Code block styles */ pre { border-radius: 8px !important; padding: 1rem !important; margin: 1rem 0 !important; border: 1px solid rgba(128, 128, 128, 0.2) !important; overflow-x: auto !important; } code { font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, Courier, monospace !important; font-size: 0.9em !important; padding: 0.2em 0.4em !important; border-radius: 3px !important; } /* Output block styles */ .output-block { border-radius: 8px; padding: 1rem; margin: 0.5rem 0; font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, Courier, monospace; font-size: 0.9em; border-left: 4px solid #FF4B4B; opacity: 0.9; } /* Message spacing */ .stMarkdown { line-height: 1.6; margin: 0.5rem 0; } .stMarkdown p { margin: 0.75rem 0; } /* Button styles */ .stButton button { border-radius: 20px !important; padding: 0.4rem 1rem !important; border: 1px solid rgba(128, 128, 128, 0.2) !important; font-weight: 500 !important; transition: all 0.3s ease !important; } /* Header styles */ .stMarkdown h1, .stMarkdown h2, .stMarkdown h3 { margin: 1.5rem 0 1rem 0; font-weight: 600; } /* Input field styles */ .stTextInput > div > div > input { border-radius: 10px !important; border: 1px solid rgba(128, 128, 128, 0.2) !important; padding: 0.75rem 1rem !important; font-size: 1rem !important; } /* Chat message styles */ .chat-message { padding: 1.25rem; border-radius: 12px; margin: 1rem 0; border: 1px solid rgba(128, 128, 128, 0.2); box-shadow: 0 2px 4px rgba(0,0,0,0.05); } /* User message specific styles */ .user-message { margin-left: auto; max-width: 80%; } /* Assistant message specific styles */ .assistant-message { margin-right: auto; max-width: 80%; } /* Error and warning styles */ .error-message { border-left: 4px solid #DC2626; padding: 1rem; margin: 1rem 0; border-radius: 8px; opacity: 0.9; } .warning-message { border-left: 4px solid #F59E0B; padding: 1rem; margin: 1rem 0; border-radius: 8px; opacity: 0.9; } /* Success message styles */ .success-message { border-left: 4px solid #059669; padding: 1rem; margin: 1rem 0; border-radius: 8px; opacity: 0.9; } /* Floating audio player styles */ .floating-audio { position: fixed; bottom: 20px; right: 20px; z-index: 9999; padding: 1rem; border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); backdrop-filter: blur(10px); border: 1px solid rgba(128, 128, 128, 0.2); max-width: 300px; transition: all 0.3s ease; } .floating-audio:hover { transform: translateY(-2px); box-shadow: 0 6px 8px rgba(0,0,0,0.15); } .floating-audio audio { width: 250px; height: 40px; opacity: 0.9; border-radius: 8px; margin-bottom: 0.5rem; } .floating-audio select { width: 100%; padding: 0.5rem; border-radius: 8px; border: 1px solid rgba(128, 128, 128, 0.2); font-size: 0.9rem; cursor: pointer; } /* File browser styles */ .file-browser { border-radius: 12px; padding: 1rem; margin: 1rem 0; border: 1px solid rgba(128, 128, 128, 0.2); } .file-item { display: flex; align-items: center; padding: 0.75rem; border-bottom: 1px solid rgba(128, 128, 128, 0.2); transition: all 0.2s ease; } .file-item:hover { opacity: 0.8; } /* Settings panel styles */ .settings-panel { border-radius: 12px; padding: 1.5rem; margin: 1rem 0; border: 1px solid rgba(128, 128, 128, 0.2); } /* Tab styles */ .stTabs { border-radius: 8px; padding: 0.5rem; } .stTab { border-radius: 8px !important; padding: 0.5rem 1rem !important; } """ class ChatMessage: def __init__(self, content="", message_type="text", role="assistant"): self.content = content self.type = message_type self.role = role self.timestamp = datetime.now(timezone.utc) self.id = hashlib.md5(f"{self.timestamp.isoformat()}-{content}".encode()).hexdigest() self._processed_content = None # Cache for processed content def to_dict(self): return { "content": self.content, "type": self.type, "role": self.role, "timestamp": self.timestamp.isoformat(), "id": self.id } @classmethod def from_dict(cls, data): msg = cls( content=data["content"], message_type=data["type"], role=data["role"] ) msg.timestamp = datetime.fromisoformat(data["timestamp"]) msg.id = data["id"] return msg def get_formatted_content(self, force_refresh=False): """Get formatted content with caching""" if self._processed_content is None or force_refresh: self._processed_content = format_message(self) return self._processed_content def format_message(message: ChatMessage) -> str: """Format message with improved markdown and syntax highlighting for large responses""" try: if message.type == "code": # Default to Python for code blocks lang = "python" content = message.content.strip() # Enhanced language detection if content.startswith("```"): first_line = content.split("\n")[0] lang = first_line.replace("```", "").strip() or "python" content = "\n".join(content.split("\n")[1:]) if content.endswith("```"): content = content[:-3] # Extended language detection if "." in content: ext_match = re.search(r'\.(py|js|html|css|json|md|sql|sh|bash|yaml|yml|java|cpp|c|go|rs|ts)$', content.lower()) if ext_match: lang_map = { 'py': 'python', 'js': 'javascript', 'html': 'html', 'css': 'css', 'json': 'json', 'md': 'markdown', 'sql': 'sql', 'sh': 'bash', 'bash': 'bash', 'yaml': 'yaml', 'yml': 'yaml', 'java': 'java', 'cpp': 'cpp', 'c': 'c', 'go': 'go', 'rs': 'rust', 'ts': 'typescript' } lang = lang_map.get(ext_match.group(1), lang) # Format code with proper spacing and syntax formatted_content = f"```{lang}\n{content.strip()}\n```" # Add visual separator for multiple code blocks if "\n\n```" in message.content: formatted_content = f"\n{formatted_content}\n" return formatted_content elif message.type == "error": return f'
❌ **Error:** {message.content}
' elif message.type == "warning": return f'
⚠️ **Warning:** {message.content}
' elif message.type == "success": return f'
✅ {message.content}
' else: # Clean and format regular text content = message.content.strip() # Handle inline code with better spacing content = re.sub(r'(?\1 ', content) content = re.sub(r'\s+', ' ', content) # Normalize spaces # Handle markdown links with proper spacing content = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'\1', content) # Improve list formatting content = re.sub(r'(\n\s*[-*]\s+[^\n]+)(?=\n\s*[^-*]|\Z)', r'\1\n', content) # Enhanced console output formatting if "$ " in content or "%" in content: lines = content.split("\n") formatted_lines = [] in_output = False output_buffer = [] for line in lines: if line.strip().startswith(("$ ", "%")): # Format collected output if output_buffer: formatted_lines.append(f'
{"".join(output_buffer)}
') output_buffer = [] # Format command with proper styling formatted_lines.append(f'{line}') in_output = True elif in_output: # Collect output lines output_buffer.append(line + "\n") else: # Regular text line formatted_lines.append(line) # Handle any remaining output if output_buffer: formatted_lines.append(f'
{"".join(output_buffer)}
') content = "\n".join(formatted_lines) # Clean up excessive newlines content = re.sub(r'\n{3,}', '\n\n', content) return content except Exception as e: st.error(f"Error formatting message: {str(e)}") return message.content def handle_user_input(user_input: str): """Handle user input with improved streaming and chunking for large responses""" if not user_input.strip(): return # Rate limiting with exponential backoff current_time = time.time() if hasattr(st.session_state, 'last_request_time'): time_since_last = current_time - st.session_state.last_request_time min_interval = 1.0 # Base interval in seconds if hasattr(st.session_state, 'request_count'): st.session_state.request_count += 1 if st.session_state.request_count > 5: min_interval = min(5.0, min_interval * 1.5) else: st.session_state.request_count = 1 if time_since_last < min_interval: st.warning(f"Please wait {min_interval - time_since_last:.1f} seconds before sending another message...") time.sleep(min_interval - time_since_last) st.session_state.last_request_time = current_time st.session_state.request_count = 1 # Add user message user_message = ChatMessage(user_input, "text", "user") st.session_state.messages.append(user_message) with st.chat_message("user", avatar="🧑‍💻"): st.markdown(user_message.get_formatted_content()) # Process with interpreter try: with st.chat_message("assistant", avatar="🤖"): message_container = st.container() with st.spinner("Thinking..."): # Initialize buffers and state message_buffer = [] code_buffer = [] current_chunk = { 'type': 'message', 'content': '', 'language': None } # Create placeholder for streaming updates with message_container: response_placeholder = st.empty() # Enhanced streaming with chunking for chunk in interpreter_instance.chat(user_input, stream=True, display=False): if isinstance(chunk, dict): content = str(chunk.get('content', '')) # Convert content to string chunk_type = chunk.get('type', 'message') # Skip empty chunks if not content: continue # Handle different chunk types if chunk_type == 'message': # Flush code buffer if exists if code_buffer: code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string if code_text.strip(): message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n") code_buffer = [] # Add message content message_buffer.append(content) current_chunk = {'type': 'message', 'content': content} elif chunk_type in ['code', 'console']: # Start new code block if needed if current_chunk['type'] != 'code': if code_buffer: # Flush previous code buffer code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string if code_text.strip(): message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n") code_buffer = [] current_chunk = { 'type': 'code', 'language': chunk.get('format', 'python') } # Accumulate code content code_buffer.append(content) # Update display with proper chunking try: display_content = ''.join(str(item) for item in message_buffer) # Convert each item to string if code_buffer: # Add current code buffer if exists code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string if code_text.strip(): display_content += f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```" # Use markdown for display with proper formatting response_placeholder.markdown(display_content) except Exception as e: st.error(f"Error updating display: {str(e)}") # Final cleanup and display try: # Handle any remaining code buffer if code_buffer: code_text = ''.join(str(item) for item in code_buffer) # Convert each item to string if code_text.strip(): message_buffer.append(f"\n```{current_chunk['language'] or 'python'}\n{code_text.strip()}\n```\n") # Prepare final response final_response = ''.join(str(item) for item in message_buffer) # Convert each item to string # Create and store assistant message assistant_message = ChatMessage(final_response, "text", "assistant") st.session_state.messages.append(assistant_message) # Final display update response_placeholder.markdown(assistant_message.get_formatted_content()) except Exception as e: st.error(f"Error in final display update: {str(e)}") response_placeholder.markdown(final_response) except Exception as e: error_msg = ChatMessage(str(e), "error", "assistant") st.session_state.messages.append(error_msg) st.error(error_msg.get_formatted_content()) # Add file handling functions def get_session_folder() -> str: """Get or create the session folder with improved error handling and validation""" if not st.session_state.settings["use_workspace"]: return "/" try: # Use custom workspace path if set if st.session_state.get("session_dir"): workspace_dir = Path(st.session_state.session_dir) # Validate path is absolute and exists if not workspace_dir.is_absolute(): workspace_dir = workspace_dir.resolve() else: # Use default workspace directory workspace_dir = Path("autointerpreter-workspace").resolve() # Validate and create directory try: # Check if path is writable if workspace_dir.exists() and not os.access(str(workspace_dir), os.W_OK): raise PermissionError(f"No write permission for {workspace_dir}") workspace_dir.mkdir(parents=True, exist_ok=True, mode=0o755) # Verify directory was created successfully if not workspace_dir.is_dir(): raise OSError(f"Failed to create directory {workspace_dir}") except (PermissionError, OSError) as e: st.error(f"Error accessing workspace directory: {str(e)}") # Fallback to temporary directory import tempfile workspace_dir = Path(tempfile.mkdtemp(prefix="autointerpreter-")) st.warning(f"Using temporary workspace: {workspace_dir}") # Create session directory if needed if not st.session_state.get("session_dir"): timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H_%M") session_dir = workspace_dir / f"session-{timestamp}" try: session_dir.mkdir(exist_ok=True, mode=0o755) if not session_dir.is_dir(): raise OSError(f"Failed to create session directory {session_dir}") st.session_state.session_dir = str(session_dir) st.session_state.uploaded_files = [] update_custom_instructions() except Exception as e: st.error(f"Error creating session directory: {str(e)}") return str(workspace_dir) return st.session_state.session_dir except Exception as e: st.error(f"Error managing workspace: {str(e)}") return "/" def handle_file_upload(uploaded_files): """Enhanced file upload handling with better error handling and validation""" if not uploaded_files: return try: session_dir = Path(get_session_folder()) if str(session_dir) == "/": st.error("Invalid workspace configuration!") return if not session_dir.exists(): st.error("Session directory does not exist!") try: session_dir.mkdir(parents=True, exist_ok=True, mode=0o755) st.success("Created new session directory.") except Exception as e: st.error(f"Failed to create session directory: {str(e)}") return for uploaded_file in uploaded_files: try: # Validate file if uploaded_file.size == 0: st.warning(f"Skipping empty file: {uploaded_file.name}") continue # Sanitize filename safe_filename = Path(uploaded_file.name).name safe_filename = re.sub(r'[^a-zA-Z0-9._-]', '_', safe_filename) # Validate extension allowed_extensions = {'.txt', '.py', '.js', '.html', '.css', '.json', '.md', '.csv', '.yml', '.yaml'} file_ext = Path(safe_filename).suffix.lower() if file_ext not in allowed_extensions: st.warning(f"Unsupported file type: {file_ext}. Skipping {safe_filename}") continue file_path = session_dir / safe_filename # Handle file conflicts if file_path.exists(): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") name_parts = safe_filename.rsplit('.', 1) safe_filename = f"{name_parts[0]}_{timestamp}.{name_parts[1]}" if len(name_parts) > 1 else f"{safe_filename}_{timestamp}" file_path = session_dir / safe_filename # Check file size file_size = len(uploaded_file.getvalue()) if file_size > 100 * 1024 * 1024: # 100MB limit st.warning(f"File {safe_filename} is too large (>{file_size/(1024*1024):.1f}MB). Skipping...") continue # Save file with error handling try: with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) # Add to session state file_info = { "name": safe_filename, "path": str(file_path), "size": uploaded_file.size, "type": uploaded_file.type or "application/octet-stream", "timestamp": datetime.now(timezone.utc) } if "uploaded_files" not in st.session_state: st.session_state.uploaded_files = [] # Update or append file info existing_file = next((f for f in st.session_state.uploaded_files if f["path"] == str(file_path)), None) if existing_file: existing_file.update(file_info) else: st.session_state.uploaded_files.append(file_info) st.success(f"Successfully uploaded: {safe_filename}") except Exception as e: st.error(f"Error saving {safe_filename}: {str(e)}") if file_path.exists(): try: file_path.unlink() except Exception as cleanup_error: st.error(f"Error cleaning up partial file: {str(cleanup_error)}") continue except Exception as e: st.error(f"Error processing upload {uploaded_file.name}: {str(e)}") continue # Update instructions after successful uploads update_custom_instructions() except Exception as e: st.error(f"Error handling file uploads: {str(e)}") def update_custom_instructions(): """Update custom instructions with workspace info""" workspace_path = get_session_folder() files_info = "" if st.session_state.get("uploaded_files"): files = [f"{info['name']} ({info['size'] / 1024:.1f} KB)" for info in st.session_state.uploaded_files] files_info = "\nFiles: " + ", ".join(files) workspace_info = f"\nWorking Space: {workspace_path}{files_info}" # Update instructions current_instructions = st.session_state.settings.get("custom_instructions", "").strip() current_instructions = re.sub(r'\nWorking Space:.*?(?=\n|$)', '', current_instructions, flags=re.MULTILINE) current_instructions = re.sub(r'\nFiles:.*?(?=\n|$)', '', current_instructions, flags=re.MULTILINE) new_instructions = current_instructions + ("\n" if current_instructions else "") + workspace_info st.session_state.settings["custom_instructions"] = new_instructions apply_interpreter_settings() def get_file_icon(file_type: str) -> str: """Get appropriate icon for file type""" if file_type.startswith('image/'): return "🖼️" elif file_type.startswith('text/'): return "📄" elif file_type.startswith('application/pdf'): return "📑" elif file_type.startswith('application/json'): return "📋" elif 'python' in file_type.lower(): return "🐍" elif 'javascript' in file_type.lower(): return "📜" elif 'spreadsheet' in file_type.lower(): return "📊" else: return "📎" # Add audio track selection handler def update_audio_track(): """Update the selected audio track with error handling""" try: new_track = st.session_state.audio_select if new_track in audio_tracks: st.session_state.selected_audio_track = new_track save_settings() # Persist the audio track selection else: st.error(f"Invalid audio track selection: {new_track}") # Fallback to default track st.session_state.selected_audio_track = "Ambient" except Exception as e: st.error(f"Error updating audio track: {str(e)}") st.session_state.selected_audio_track = "Ambient" def create_audio_player(): """Create a more robust audio player with error handling""" return """
Your browser does not support the audio element.
""".format( audio_tracks=audio_tracks, st=st ) def cleanup_old_sessions(): """Clean up old session directories with improved error handling and format detection""" try: workspace_dir = Path("autointerpreter-workspace") if not workspace_dir.exists(): return current_time = datetime.now(timezone.utc) current_session = st.session_state.get("session_dir") # Define supported timestamp formats timestamp_formats = [ "%Y-%m-%d_%H_%M", # Standard format: 2024-01-20_14_30 "%Y%m%d_%H%M%S", # Compact format: 20240120_143000 ] for session_dir in workspace_dir.glob("session-*"): try: if str(session_dir) == current_session: continue dir_name = session_dir.name timestamp_str = None session_time = None # Try to extract timestamp based on different patterns if dir_name.startswith("session-"): # Try standard timestamp format first timestamp_str = dir_name.replace("session-", "") # Try each supported format for fmt in timestamp_formats: try: session_time = datetime.strptime(timestamp_str, fmt).replace(tzinfo=timezone.utc) break except ValueError: continue # If no supported format matches, check for UUID-like format if not session_time and ( len(timestamp_str) == 36 or # Standard UUID len(timestamp_str) == 32 or # Compact UUID '-' in timestamp_str # Any UUID-like string ): # For UUID-based sessions, use file modification time try: mtime = session_dir.stat().st_mtime session_time = datetime.fromtimestamp(mtime, tz=timezone.utc) except Exception: continue if not session_time: # Skip without warning for unrecognized formats continue # If older than 24 hours if (current_time - session_time).days >= 1: try: if session_dir.exists(): # Double check existence # Check if directory is empty has_files = any(session_dir.iterdir()) if has_files: # Move to archive instead of deleting if contains files archive_dir = workspace_dir / "archived_sessions" archive_dir.mkdir(exist_ok=True) new_name = f"archived_{dir_name}_{current_time.strftime('%Y%m%d_%H%M%S')}" shutil.move(str(session_dir), str(archive_dir / new_name)) print(f"Archived session with files: {session_dir}") else: # Delete if empty shutil.rmtree(session_dir) print(f"Cleaned up empty session: {session_dir}") except Exception as e: print(f"Error processing session directory {session_dir}: {str(e)}") except Exception as e: print(f"Error processing session directory {session_dir}: {str(e)}") continue except Exception as e: print(f"Error in cleanup: {str(e)}") def main(): # 1. Initialize session state init_session_state() # 2. Page config (should be at top) st.set_page_config( page_title="AutoInterpreter", layout="wide", initial_sidebar_state="collapsed", menu_items={ 'Get Help': 'https://github.com/samihalawa/autointerpreter', 'Report a bug': "https://github.com/samihalawa/autointerpreter/issues", 'About': "# AutoInterpreter\nThe Final AI Coding Experience" } ) # 3. Load and apply styles st.markdown(""" """, unsafe_allow_html=True) # 4. Setup UI components lottie_coding = load_lottieurl('https://assets5.lottiefiles.com/packages/lf20_fcfjwiyb.json') # 5. Create header and settings UI col1, col2, col3 = st.columns([0.2, 0.6, 0.2]) with col2: st_lottie(lottie_coding, height=200, key="coding") colored_header( label="AutoInterpreter", description="Run Any Code. The Final AI Coding Experience.", color_name="red-70" ) with col3: st.markdown('
', unsafe_allow_html=True) btn_col1, btn_col2 = st.columns([1, 1]) with btn_col1: # Toggle settings when button is clicked if st.button("⚙️", help="Configure AutoInterpreter", key="settings_btn"): st.session_state.settings_open = not st.session_state.settings_open with btn_col2: if st.button("❌", help="Clear chat history", key="clear_btn"): if st.button("✓", help="Confirm clear", key="confirm_btn"): clear_chat_history() st.markdown('
', unsafe_allow_html=True) # Move theme application before settings panel theme = st.session_state.settings.get("theme", "light") st.markdown(f"", unsafe_allow_html=True) # Enhanced Settings modal with tabs if st.session_state.settings_open: with st.expander("Settings Panel", expanded=True): # Add close button to top-right of settings panel col1, col2 = st.columns([0.9, 0.1]) with col2: if st.button("✖️", help="Close settings", key="close_settings"): st.session_state.settings_open = False save_settings() # Save settings before closing st.rerun() # Settings tabs tab1, tab2, tab3 = st.tabs(["API & Model", "Code Settings", "Assistant Settings"]) with tab1: # API Settings st.text_input( "API Key", value=st.session_state.settings["api_key"], type="password", key="api_key", help="Enter your HuggingFace API key" ) st.markdown("---") st.markdown("### 🤖 Model Selection") # Current model display current_model = st.session_state.selected_model.split('/')[-1] st.info(f"Current Model: **{current_model}**", icon="🤖") # Model Selection with categories model_category = st.radio( "Model Category", ["Qwen Coder Series", "Qwen General Series", "Other Models"], help="Select model category" ) filtered_models = { k: v for k, v in model_options.items() if ( (model_category == "Qwen Coder Series" and "Qwen2.5-Coder" in k) or (model_category == "Qwen General Series" and "Qwen2.5-" in k and "Coder" not in k) or (model_category == "Other Models" and "Qwen" not in k) ) } selected_model = st.selectbox( "Select Model", options=list(filtered_models.keys()), format_func=lambda x: x.split('/')[-1], key="model_select", help="Choose your preferred model" ) # Theme Selection st.markdown("---") st.markdown("### 🎨 Theme") theme_selection = st.selectbox( "UI Theme", options=["light", "dark"], index=0 if theme == "light" else 1, key="theme_select", help="Select the UI theme" ) # Save Settings Button - More prominent st.markdown("---") col1, col2 = st.columns([0.7, 0.3]) with col2: if st.button("💾 Save Changes", type="primary", use_container_width=True): # Update settings st.session_state.settings.update({ "api_key": st.session_state.api_key, "model": st.session_state.model_select, "api_base": model_options[st.session_state.model_select], "theme": st.session_state.theme_select }) st.session_state.selected_model = st.session_state.model_select save_settings() st.session_state.settings_open = False st.success("✅ Settings saved successfully!") time.sleep(0.5) st.rerun() with tab2: # Code Execution Settings col1, col2 = st.columns(2) with col1: st.toggle( "Auto Run Code", value=st.session_state.settings["auto_run"], key="auto_run", help="Automatically execute code without confirmation", on_change=lambda: st.session_state.settings.update({"auto_run": st.session_state.auto_run}) ) st.selectbox( "Code Style", options=["monokai", "github", "dracula"], index=0, key="code_style", help="Code highlighting theme", on_change=lambda: st.session_state.settings.update({"code_style": st.session_state.code_style}) ) with col2: st.selectbox( "Safe Mode", options=["off", "ask", "auto"], index=0, key="safe_mode", help="Code execution safety level", on_change=lambda: st.session_state.settings.update({"safe_mode": st.session_state.safe_mode}) ) with tab3: # Assistant Behavior Settings """ st.toggle( "Save Chat History", value=st.session_state.settings["conversation_history"], key="conversation_history", help="Preserve conversation between sessions", on_change=lambda: st.session_state.settings.update({"conversation_history": st.session_state.conversation_history}) ) """ # System Message Settings st.text_area( "Default System Message", value=interpreter_instance.system_message, disabled=True, help="Base instructions for the AI assistant", height=100 ) st.text_area( "Custom Instructions", value=st.session_state.settings["custom_instructions"], key="custom_instructions", help="Additional instructions for the assistant", height=100, on_change=lambda: st.session_state.settings.update({"custom_instructions": st.session_state.custom_instructions}) ) # OS Control Settings st.markdown("---") st.markdown("### System Access Settings") st.markdown("**Warning**: OS mode enables system control (mouse, keyboard, screen access)") st.toggle( "Enable System Control", value=st.session_state.settings["os_mode"], key="os_mode", help="Allow assistant to control system", on_change=lambda: st.session_state.settings.update({"os_mode": st.session_state.os_mode}) ) if st.session_state.settings["os_mode"]: st.toggle( "Restricted Access", value=st.session_state.settings["os_restricted_mode"], key="os_restricted_mode", help="Limit system access to specific paths", on_change=lambda: st.session_state.settings.update({"os_restricted_mode": st.session_state.os_restricted_mode}) ) if st.session_state.settings["os_restricted_mode"]: st.text_area( "Allowed Paths", value="\n".join(st.session_state.settings["allowed_paths"]), help="One path per line", key="allowed_paths", on_change=lambda: st.session_state.settings.update({ "allowed_paths": [p.strip() for p in st.session_state.allowed_paths.split("\n") if p.strip()] }) ) # Add current model display after header col1, col2, col3 = st.columns([0.2, 0.6, 0.2]) with col3: st.markdown( f"""
🤖 Model: {st.session_state.selected_model.split('/')[-1]}
""", unsafe_allow_html=True ) # Initialize selected track in session state if not exists if "selected_audio_track" not in st.session_state: st.session_state.selected_audio_track = "Ambient" # Create floating audio player st.markdown(create_audio_player(), unsafe_allow_html=True) # Simplified workspace control in sidebar with st.sidebar: st.markdown("### 🗂️ Workspace Control") # Single workspace toggle in sidebar st.toggle( "Use Workspace", value=st.session_state.settings["use_workspace"], key="use_workspace", help="Restrict file access to workspace directory only", on_change=lambda: ( st.session_state.settings.update({"use_workspace": st.session_state.use_workspace}), apply_interpreter_settings() ) ) if st.session_state.settings["use_workspace"]: st.markdown("### 📂 Workspace") # Add workspace path input workspace_path = st.text_input( "Workspace Path", value=str(Path(st.session_state.get("session_dir", "autointerpreter-workspace")).resolve()), help="Enter the full path to your workspace directory", key="workspace_path_input" ) # Add Set Path button if st.button("📁 Set Workspace Path", use_container_width=True): try: workspace_dir = Path(workspace_path) workspace_dir.mkdir(parents=True, exist_ok=True) st.session_state.session_dir = str(workspace_dir) apply_interpreter_settings() st.success(f"✅ Workspace set to: {workspace_path}") st.rerun() except Exception as e: st.error(f"❌ Error setting workspace path: {str(e)}") # File browser for workspace management event = sfb.st_file_browser( path=st.session_state.get("session_dir", "autointerpreter-workspace"), key="file_browser", show_choose_file=True, show_delete_file=True, show_new_folder=True, show_upload_file=True, show_preview=True ) if event: if event.get("type") == "file_selected": st.session_state.session_dir = event["path"] st.code(f"Current workspace: {event['path']}", language="bash") apply_interpreter_settings() elif event.get("type") == "folder_created": st.success(f"Created folder: {event['path']}") elif event.get("type") == "file_deleted": st.warning(f"Deleted: {event['path']}") if str(event['path']) == st.session_state.get('session_dir'): st.session_state.pop('session_dir', None) apply_interpreter_settings() # After settings panel and before chat history display # Apply interpreter settings apply_interpreter_settings() # Display chat history with enhanced formatting for message in st.session_state.messages: with st.chat_message(message.role, avatar="🧑‍💻" if message.role == "user" else "🤖"): st.markdown(message.get_formatted_content()) # Handle new user input user_input = st.chat_input("Ask me anything about coding...", key="chat_input") if user_input: handle_user_input(user_input) # Add ARIA labels to main UI components st.markdown("""
""", unsafe_allow_html=True) # Add cleanup of old sessions at the end if st.session_state.settings["use_workspace"]: cleanup_old_sessions() if __name__ == "__main__": main()