"""Flask backend for the Figr Code Assistant chat application.

The module wires together:

* a locally loaded Hugging Face causal language model used to answer requests,
* a SQLite database holding chats, messages and "[IMPORTANT]" notes,
* JSON endpoints for chatting, listing chats, uploading a Python file for
  analysis, clearing stored memory and running a code snippet.
"""
import html
import json
import os
import re
import sqlite3
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, List

import torch
from flask import Flask, jsonify, render_template, request
from langchain.chains import LLMChain
from langchain.llms import Ollama
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from transformers import AutoModelForCausalLM, AutoTokenizer
from werkzeug.utils import secure_filename

app = Flask(__name__)

PORT = int(os.environ.get("PORT", 7860))
UPLOAD_FOLDER = '/tmp/uploads'  # tmp directory for Spaces
ALLOWED_EXTENSIONS = {'py'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Database configuration
DATABASE_PATH = '/tmp/chat_database.db'

# Generation limits: the prompt is truncated to MAX_PROMPT_TOKENS and the
# model may then add up to MAX_NEW_TOKENS on top of it.
MAX_PROMPT_TOKENS = 2048
MAX_NEW_TOKENS = 1024

# Wall-clock limit for snippets executed by /api/test-code.
CODE_TIMEOUT_SECONDS = 5

# Load model and tokenizer (done once, at import time).
model_name = "mistralai/Mistral-7B-Instruct-v0.1"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)


class HuggingFaceLLM:
    """Minimal text-in/text-out wrapper around a causal LM and its tokenizer."""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def predict(self, prompt):
        """Generate a completion for *prompt* and return only the new text.

        The prompt is truncated to ``MAX_PROMPT_TOKENS`` tokens. The tokens
        that make up the prompt are sliced off the generated sequence before
        decoding, so the returned string does not repeat the prompt.
        """
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            max_length=MAX_PROMPT_TOKENS,
            truncation=True,
        )
        prompt_length = inputs["input_ids"].shape[1]
        with torch.no_grad():
            outputs = self.model.generate(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                # max_new_tokens bounds only the completion; a total
                # max_length equal to the truncation limit would leave no
                # room to generate for long prompts.
                max_new_tokens=MAX_NEW_TOKENS,
                num_return_sequences=1,
                # temperature only takes effect when sampling is enabled.
                do_sample=True,
                temperature=0.7,
                pad_token_id=self.tokenizer.eos_token_id,
            )
        completion_ids = outputs[0][prompt_length:]
        return self.tokenizer.decode(completion_ids, skip_special_tokens=True)


llm = HuggingFaceLLM(model, tokenizer)


@contextmanager
def get_db_connection():
    """Yield a SQLite connection with name-addressable rows; always close it."""
    conn = sqlite3.connect(DATABASE_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
    finally:
        conn.close()


def init_db():
    """Create the chats, messages and important_info tables if missing."""
    with get_db_connection() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS chats (
                id TEXT PRIMARY KEY,
                title TEXT,
                date TEXT,
                last_message TEXT
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id TEXT,
                role TEXT,
                content TEXT,
                timestamp TEXT,
                FOREIGN KEY (chat_id) REFERENCES chats (id)
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS important_info (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id TEXT,
                content TEXT,
                FOREIGN KEY (chat_id) REFERENCES chats (id)
            )
        ''')
        conn.commit()


# Initialize database on startup (also covers WSGI servers that import `app`).
init_db()


class ChatSession:
    """In-memory view of one chat, backed by the SQLite database.

    On construction the stored messages and important-info notes for
    ``session_id`` are loaded; messages are also replayed into a LangChain
    ``ConversationBufferMemory``.
    """

    def __init__(self, session_id):
        self.session_id = session_id
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        self._load_chat_history()
        self._load_important_info()

    def _remember(self, role, content):
        """Mirror one message into the LangChain memory buffer."""
        if role == "user":
            self.memory.chat_memory.add_user_message(content)
        else:
            self.memory.chat_memory.add_ai_message(content)

    def _load_chat_history(self):
        """Load chat history from database"""
        with get_db_connection() as conn:
            # `id` breaks ties between messages sharing a timestamp.
            messages = conn.execute(
                'SELECT role, content, timestamp FROM messages '
                'WHERE chat_id = ? ORDER BY timestamp, id',
                (self.session_id,)
            ).fetchall()

        self.chat_history = []
        for msg in messages:
            self.chat_history.append({
                "role": msg['role'],
                "content": msg['content'],
                "timestamp": msg['timestamp']
            })
            self._remember(msg['role'], msg['content'])

    def _load_important_info(self):
        """Load important info from database"""
        with get_db_connection() as conn:
            info = conn.execute(
                'SELECT content FROM important_info WHERE chat_id = ?',
                (self.session_id,)
            ).fetchall()
        self.important_info = [row['content'] for row in info]

    def add_message(self, role, content):
        """Persist a message and append it to the in-memory history."""
        timestamp = datetime.now().isoformat()
        message = {
            "role": role,
            "content": content,
            "timestamp": timestamp
        }

        # Store in database
        with get_db_connection() as conn:
            conn.execute(
                'INSERT INTO messages (chat_id, role, content, timestamp) VALUES (?, ?, ?, ?)',
                (self.session_id, role, content, timestamp)
            )
            conn.commit()

        self.chat_history.append(message)
        self._remember(role, content)

    def add_important_info(self, content):
        """Add important information to database"""
        with get_db_connection() as conn:
            conn.execute(
                'INSERT INTO important_info (chat_id, content) VALUES (?, ?)',
                (self.session_id, content)
            )
            conn.commit()
        self.important_info.append(content)

    def get_memory_variables(self):
        """Return the variables exposed by the LangChain memory buffer."""
        return self.memory.load_memory_variables({})

    def clear_memory(self):
        """Clear all memory from database"""
        with get_db_connection() as conn:
            conn.execute('DELETE FROM messages WHERE chat_id = ?', (self.session_id,))
            conn.execute('DELETE FROM important_info WHERE chat_id = ?', (self.session_id,))
            conn.commit()

        self.memory.clear()
        self.chat_history = []
        self.important_info = []

    def clear_chat_history(self):
        """Clear chat history from database"""
        with get_db_connection() as conn:
            conn.execute('DELETE FROM messages WHERE chat_id = ?', (self.session_id,))
            conn.commit()

        self.chat_history = []
        self.memory.chat_memory.clear()

    def clear_important_info(self):
        """Clear important info from database"""
        with get_db_connection() as conn:
            conn.execute('DELETE FROM important_info WHERE chat_id = ?', (self.session_id,))
            conn.commit()

        self.important_info = []


prompt_template = """
Role: You are Figr Code Assistant, specializing in providing clear, error-free Python code solutions.

Context:
{important_info}

Previous Conversation:
{chat_history}

Current Request:
{user_request}

Output Guidelines:
1. Code Format:
   - Use ```python for code blocks
   - Use `code` for inline code references
   - Provide raw text without HTML formatting
   - Strictly include explanation only after code blocks

2. Code Organization:
   - Default to single, focused code snippets for clarity
   - Only split into multiple snippets(each individually runnable) if:
     a) Multiple distinct concepts are requested
     b) Complex functionality requires modular explanation
   - Mark critical information with [IMPORTANT] prefix and give small explanations with some bold headings if required and in white font always.
"""

prompt = PromptTemplate(
    input_variables=["user_request", "chat_history", "important_info"],
    template=prompt_template
)


def generate_reply(user_request, chat_history, important_info):
    """Render the prompt template and return the model's raw reply.

    ``HuggingFaceLLM`` is a plain class, not a LangChain language model, so
    the prompt is formatted here and sent to ``llm.predict`` directly instead
    of going through ``LLMChain``.
    """
    rendered = prompt.format(
        user_request=user_request,
        chat_history=chat_history,
        important_info=important_info,
    )
    return llm.predict(rendered)


def _wrap_code_block(escaped_code, language=''):
    """Return the HTML wrapper for one code block.

    *escaped_code* must already be HTML-escaped.

    NOTE(review): the original wrapper markup (including its custom buttons)
    was lost from this file. This is a minimal reconstruction; restore the
    class names and buttons the frontend expects.
    """
    language = language or 'plaintext'
    return (
        f'<div class="code-block" data-language="{language}">\n'
        f'<pre><code class="language-{language}">{escaped_code}</code></pre>\n'
        f'</div>\n'
    )


# pandoc (with --no-highlight) emits <pre class="LANG"><code>…</code></pre>
# for fenced blocks and <pre><code>…</code></pre> for unlabelled ones.
# NOTE(review): reconstructed pattern — the original was lost from this file.
_PANDOC_CODE_BLOCK = re.compile(
    r'<pre(?:\s+class="([^"]*)")?><code>(.*?)</code></pre>',
    re.DOTALL,
)


def convert_to_html(raw_text):
    """Convert markdown to HTML while preserving code blocks with custom wrappers.

    The text is piped through ``pandoc`` (stdin to stdout, so no temporary
    files are needed). On a non-zero exit status the returned string is
    ``"Error: <stderr>"``. Raises ``FileNotFoundError`` when pandoc is not
    installed.
    """
    cmd = [
        "pandoc",
        "-f", "markdown",
        "-t", "html",
        "--no-highlight",  # Disable pandoc's highlighting
    ]
    result = subprocess.run(cmd, input=raw_text, capture_output=True, text=True)
    if result.returncode != 0:
        return f"Error: {result.stderr}"

    def replace_code_block(match):
        # pandoc has already HTML-escaped the code content.
        return _wrap_code_block(match.group(2), match.group(1) or '')

    return _PANDOC_CODE_BLOCK.sub(replace_code_block, result.stdout)


def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def extract_important_info(response):
    """Return the lines of *response* tagged ``[IMPORTANT]``, tag removed."""
    return [
        line.replace('[IMPORTANT]', '').strip()
        for line in response.split('\n')
        if '[IMPORTANT]' in line
    ]


def create_new_chat(session_id: str):
    """Create a new chat session with metadata in database"""
    # One timestamp for both the stored row and the returned metadata.
    created = datetime.now().isoformat()
    with get_db_connection() as conn:
        conn.execute(
            'INSERT INTO chats (id, title, date, last_message) VALUES (?, ?, ?, ?)',
            (session_id, "New Chat", created, None)
        )
        conn.commit()

    return {
        "id": session_id,
        "title": "New Chat",
        "date": created,
        "last_message": None
    }


def update_chat_metadata(session_id: str, last_message: str):
    """Update chat metadata in database"""
    title = (last_message[:30] + "...") if len(last_message) > 30 else last_message
    with get_db_connection() as conn:
        conn.execute(
            'UPDATE chats SET title = ?, last_message = ? WHERE id = ?',
            (title, last_message, session_id)
        )
        conn.commit()


# Fenced blocks (with or without a language tag) and inline code, matched in
# a single pass so generated HTML is never re-scanned for backticks.
_MARKDOWN_CODE = re.compile(
    r'```([\w+#.-]*)[ \t]*\n(.*?)\n```|`([^`]+)`',
    re.DOTALL,
)


def format_response(response):
    """Format response with proper code block structure.

    Fenced code blocks become wrapped ``<pre><code>`` blocks and inline
    backtick spans become ``<code>`` elements. Code text is HTML-escaped so
    characters such as ``<`` display literally.
    """
    def render(match):
        inline = match.group(3)
        if inline is not None:
            return f'<code>{html.escape(inline, quote=False)}</code>'
        return _wrap_code_block(
            html.escape(match.group(2), quote=False),
            match.group(1),
        )

    return _MARKDOWN_CODE.sub(render, response)


@app.route("/api/chat-list", methods=["GET"])
def get_chat_list():
    """Get list of all chats from database"""
    with get_db_connection() as conn:
        chats = conn.execute('SELECT * FROM chats ORDER BY date DESC').fetchall()
    return jsonify({
        "chats": [dict(chat) for chat in chats]
    })


@app.route("/api/chat", methods=["POST"])
def chat():
    """Answer one user message and persist both sides of the exchange."""
    # get_json(silent=True) returns None rather than raising on a missing or
    # malformed JSON body.
    data = request.get_json(silent=True) or {}
    user_input = data.get("message", "")
    session_id = data.get("sessionId", "default")

    if not isinstance(user_input, str) or not user_input.strip():
        return jsonify({
            "response": "Message must not be empty.",
            "success": False
        })

    try:
        # Get or create session
        session = ChatSession(session_id)

        # Add user message
        session.add_message("user", user_input)
        update_chat_metadata(session_id, user_input)

        # Get memory variables
        memory_vars = session.get_memory_variables()

        # Generate response
        raw_response = generate_reply(
            user_request=user_input,
            chat_history=memory_vars.get("chat_history", ""),
            important_info="\n".join(session.important_info)
        )

        # Extract and store important information
        for info in extract_important_info(raw_response):
            session.add_important_info(info)

        # Format the response and store the formatted version
        formatted_response = format_response(raw_response)
        session.add_message("assistant", formatted_response)

        return jsonify({
            "response": formatted_response,
            "success": True,
            "important_info": session.important_info
        })

    except Exception as e:
        app.logger.exception("Chat request failed")
        return jsonify({
            "response": f"An error occurred: {str(e)}",
            "success": False
        })


@app.route("/api/new-chat", methods=["POST"])
def new_chat():
    """Create a new chat session"""
    session_id = str(datetime.now().timestamp())
    chat = create_new_chat(session_id)
    return jsonify({"success": True, "chat": chat})


@app.route("/api/chat-history", methods=["GET"])
def get_chat_history():
    """Get chat history for a specific session"""
    session_id = request.args.get("sessionId", "default")

    with get_db_connection() as conn:
        # Get messages
        messages = conn.execute(
            'SELECT role, content, timestamp FROM messages '
            'WHERE chat_id = ? ORDER BY timestamp, id',
            (session_id,)
        ).fetchall()

        # Get important info
        important_info = conn.execute(
            'SELECT content FROM important_info WHERE chat_id = ?',
            (session_id,)
        ).fetchall()

    # Format assistant messages if they aren't already formatted
    formatted_messages = []
    for msg in messages:
        message_dict = dict(msg)
        if message_dict['role'] == 'assistant' and '```' in message_dict['content']:
            message_dict['content'] = format_response(message_dict['content'])
        formatted_messages.append(message_dict)

    return jsonify({
        "history": formatted_messages,
        "important_info": [info['content'] for info in important_info]
    })


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Analyze an uploaded ``.py`` file with the language model."""
    if 'file' not in request.files:
        return jsonify({'success': False, 'error': 'No file part'})

    file = request.files['file']
    if file.filename == '':
        return jsonify({'success': False, 'error': 'No selected file'})

    if not allowed_file(file.filename):
        return jsonify({'success': False, 'error': 'Invalid file type'})

    filename = secure_filename(file.filename)

    try:
        # Read straight from the upload stream: nothing is written to disk,
        # so there is no file to clean up if analysis fails.
        content = file.stream.read().decode('utf-8', errors='replace')

        # Analyze the code using the LLM
        analysis_prompt = (
            "\n"
            "Please analyze this Python code:\n\n"
            f"{content}\n\n"
            "Provide:\n"
            "1. A clear, small explanation\n"
            "2. Any potential errors or improvements\n"
            "3. Suggestions for better practices\n"
            "- Each in separate neat paragraphs with highlighted headings.\n"
        )
        analysis = llm.predict(analysis_prompt)
    except Exception as e:
        app.logger.exception("Upload analysis failed")
        return jsonify({'success': False, 'error': str(e)})

    return jsonify({
        'success': True,
        'filename': filename,
        'content': content,
        'analysis': analysis
    })


# clearOption -> (DELETE statements to run, success message)
_CLEAR_ACTIONS = {
    "all": (
        (
            'DELETE FROM messages WHERE chat_id = ?',
            'DELETE FROM important_info WHERE chat_id = ?',
        ),
        "All memory cleared successfully",
    ),
    "chat": (
        ('DELETE FROM messages WHERE chat_id = ?',),
        "Chat history cleared successfully",
    ),
    "important": (
        ('DELETE FROM important_info WHERE chat_id = ?',),
        "Important information cleared successfully",
    ),
}


@app.route("/api/clear-memory", methods=["POST"])
def clear_memory():
    """Clear memory based on specified option"""
    data = request.get_json(silent=True) or {}
    session_id = data.get("sessionId", "default")
    clear_option = data.get("clearOption", "all")

    action = _CLEAR_ACTIONS.get(clear_option)
    if action is None:
        return jsonify({
            "success": False,
            "message": "Invalid clear option specified"
        })

    statements, message = action
    try:
        with get_db_connection() as conn:
            for statement in statements:
                conn.execute(statement, (session_id,))
            conn.commit()
    except Exception as e:
        app.logger.exception("Clearing memory failed")
        return jsonify({
            "success": False,
            "message": f"Error clearing memory: {str(e)}"
        })

    return jsonify({
        "success": True,
        "message": message
    })


@app.route("/api/test-code", methods=["POST"])
def test_code():
    """Run the submitted Python snippet and return its output.

    SECURITY: this executes arbitrary client-supplied code with the server's
    privileges; the timeout is the only restriction applied. Expose this
    endpoint only in a sandboxed deployment.
    """
    try:
        data = request.get_json(silent=True) or {}
        code = data.get("code", "")

        # Create a temporary file to store the code
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False, encoding='utf-8'
        ) as f:
            f.write(code)
            temp_file = f.name

        try:
            # sys.executable is the interpreter running this server; a bare
            # "python" may be missing from PATH or be a different version.
            result = subprocess.run(
                [sys.executable, temp_file],
                capture_output=True,
                text=True,
                timeout=CODE_TIMEOUT_SECONDS
            )
        finally:
            # Clean up the temporary file
            os.unlink(temp_file)

        success = result.returncode == 0
        return jsonify({
            "success": success,
            "output": result.stdout if success else result.stderr
        })

    except Exception as e:
        return jsonify({
            "success": False,
            "output": f"Error executing code: {str(e)}"
        })


@app.route("/")
def home():
    """Serve the main application page"""
    return render_template("index.html")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=PORT)