from url import md_files_url from get_data import extract_repo_details, fetch_md_file_via_api, data_loader, chunk_text, download_hugging_face_embeddings from langchain_community.llms import Ollama from langchain.chains import create_retrieval_chain from langchain.chains.combine_documents import create_stuff_documents_chain from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain.chains import create_history_aware_retriever from langchain.memory import ChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_pinecone import PineconeVectorStore from pinecone.grpc import PineconeGRPC as Pinecone from pinecone import ServerlessSpec from langchain.schema import Document from dotenv import load_dotenv from prompt import system_prompt, retriever_prompt import os import logging import gradio as gr import sqlite3 import bcrypt import uuid from datetime import datetime logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) DB_PATH = "chatbot.db" base = {} last_messages = 10 documents = [] load_dotenv() AUTH_TOKEN_KEY = os.environ.get('AUTH_TOKEN_KEY') os.environ['AUTH_TOKEN_KEY'] = AUTH_TOKEN_KEY PINECONE_API_KEY = os.environ.get('PINECONE_API_KEY') os.environ['PINECONE_API_KEY'] = PINECONE_API_KEY # ---- Database part ----- # # Database Connection def connect_db(): try: return sqlite3.connect(DB_PATH) except sqlite3.Error as e: logger.error(f"Database connection failed: {e}") return None # Or raise a custom exception def create_tables(): with connect_db() as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL )''') cursor.execute(''' CREATE TABLE IF NOT EXISTS chat_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, message TEXT NOT NULL, response TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE )''') conn.commit() create_tables() logger.info("Database tables created successfully!") # Secure Password Hashing def hash_password(password): return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def check_password(password, hashed): return bcrypt.checkpw(password.encode(), hashed.encode()) # Authenticate User def authenticate(username, password): with connect_db() as conn: cursor = conn.cursor() cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,)) user = cursor.fetchone() if user and check_password(password, user[1]): session_id = uuid.uuid4().hex # Unique session ID return session_id, f"Welcome {username}!", user[0] # user_id return None, "Invalid username or password.", None # Signup Function def signup(username, password): try: hashed_pw = hash_password(password) with connect_db() as conn: cursor = conn.cursor() cursor.execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, hashed_pw)) conn.commit() return f"User {username} registered successfully! You can now log in." except sqlite3.IntegrityError: return "Username already exists. Try another one." # Store Chat in Database def save_chat(user_id, message, response): with connect_db() as conn: cursor = conn.cursor() cursor.execute("INSERT INTO chat_history (user_id, message, response, created_at) VALUES (?, ?, ?, ?)", (user_id, message, response, datetime.now())) conn.commit() # Retrieve Chat History (User-specific) def get_chat_history(user_id): with connect_db() as conn: cursor = conn.cursor() cursor.execute("SELECT message, response FROM chat_history WHERE user_id = ? ORDER BY created_at", (user_id,)) chat_history = cursor.fetchall() return [(msg, resp) for msg, resp in chat_history] # --------------------------------- # #Option 2-load directly from urls for url in md_files_url: try: repo_owner, repo_name, file_path = extract_repo_details(url) content = fetch_md_file_via_api(repo_owner, repo_name, file_path, AUTH_TOKEN_KEY) if content: document = Document(page_content=content, metadata={"source": file_path}) documents.append(document) except ValueError as ve: logging.error(f"Error processing URL {url}: {ve}") print(f"Fetched {len(documents)} documents.") text_chunk = chunk_text(documents) index_name = 'humblebeeai' pc = Pinecone(api_key=PINECONE_API_KEY) existing_indexes = [index['name'] for index in pc.list_indexes()] if index_name in existing_indexes: print(f"🟢 Index '{index_name}' already exists. Skipping creation.") else: print(f"🔴 Index '{index_name}' not found. Creating it now...") pc.create_index( name=index_name, dimension=384, # Adjust the dimension based on your embeddings metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) ) print(f"✅ Index '{index_name}' created successfully.") docsearch = PineconeVectorStore.from_documents( documents=text_chunk, index_name=index_name, embedding=download_hugging_face_embeddings() ) docsearch = PineconeVectorStore.from_existing_index( index_name=index_name, embedding=download_hugging_face_embeddings() ) retriever = docsearch.as_retriever(search_type='mmr', search_kwargs={'k': 10, 'lambda_mult': 0.5}) prompt = ChatPromptTemplate.from_messages( [ ('system', system_prompt), ('human', '{input}'), ] ) llm =Ollama(model='llama3.2', base_url='https://ollama-gcs-633059950484.us-central1.run.app') question_answer_chain = create_stuff_documents_chain(llm, prompt) rag_chain = create_retrieval_chain(retriever, question_answer_chain) context_prompt = ChatPromptTemplate.from_messages( [ ('system', retriever_prompt), MessagesPlaceholder(variable_name='chat_history'), ('human', '{input}'), ] ) history_aware_retriever = create_history_aware_retriever(llm, retriever, context_prompt) qa_prompt = ChatPromptTemplate.from_messages( [ ('system', system_prompt), MessagesPlaceholder(variable_name='chat_history'), ('human', '{input}'), ] ) question_answer_chain = create_stuff_documents_chain(llm, prompt) rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain) def get_session_history(session_id): if session_id not in base: base[session_id] = ChatMessageHistory() stored_msg = base[session_id].messages if len(stored_msg) >= last_messages: base[session_id].clear() for msg in stored_msg[-last_messages:]: base[session_id].add_message(msg) return base[session_id] chat_with_msg_history = RunnableWithMessageHistory( rag_chain, get_session_history, input_messages_key='input', history_messages_key='chat_history' ) from langchain.schema import AIMessage, HumanMessage def get_combined_history(session_id, user_id): persistent_history = get_chat_history(user_id) # Retrieve long-term memory # Convert stored chat history into messages long_term_messages = [ HumanMessage(content=msg) if i % 2 == 0 else AIMessage(content=resp) for i, (msg, resp) in enumerate(persistent_history) ] # Collect session-based recent messages recent_messages = base[session_id].messages if session_id in base else [] # Merge and return the combined context combined_history = long_term_messages + recent_messages print("Combined History for Debugging:", combined_history) # 🔹 Debugging step return combined_history[-last_messages:] # Optionally limit def get_response(message, chat_history, session_id, user_id): if not session_id or not user_id: return "Session expired. Please log in again.", [] combined_history = get_combined_history(session_id, user_id) response = chat_with_msg_history.invoke( { 'input': message, 'chat_history': combined_history # This should be a list of LangChain message objects }, {'configurable': {'session_id': session_id}}, ) # 🔹 Log the response for debugging print("LangChain Response:", response) # 🔹 Ensure response contains 'answer' (adjust this if needed) if isinstance(response, dict) and 'answer' in response: chatbot_reply = response['answer'] else: chatbot_reply = "I'm sorry, I couldn't process that request." save_chat(user_id, message, chatbot_reply) chat_history.append((message, chatbot_reply)) # Append instead of overwriting return "", chat_history # Logout Function def logout(session_id): if session_id in base: del base[session_id] # Clear session history return None, "Logged out successfully.", None import gradio as gr def enable_buttons(username, password): is_valid = bool(username.strip()) and bool(password.strip()) return gr.update(interactive=is_valid), gr.update(interactive=is_valid) def login_user(username, password): session_id, message, user_id = authenticate(username, password) if session_id: # Hide login, signup, and status, and show the chat interface and logout button return session_id, user_id, message, "", "", get_chat_history(user_id), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True) return None, None, message, username, password, [], gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) def signup_user(username, password): message = signup(username, password) return message, "", "" def logout_user(): return None, "", "", "", [], gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) with gr.Blocks() as demo: gr.Markdown("## HumblebeeAI Customer Support Chatbot") with gr.Row(): username = gr.Textbox(label="Username", interactive=True) password = gr.Textbox(label="Password", type="password", interactive=True) login_button = gr.Button("Login", interactive=False) # Initially disabled signup_button = gr.Button("Signup", interactive=False) # Initially disabled login_status = gr.Textbox(label="Status", interactive=False) session_state = gr.State(None) user_id_state = gr.State(None) with gr.Column(visible=False) as chat_interface: chatbot = gr.Chatbot() msg = gr.Textbox(label="Message", placeholder="Ask me anything...") send_button = gr.Button("Send") logout_button = gr.Button("Logout") # Enable buttons only when both username & password are filled username.change(enable_buttons, [username, password], [login_button, signup_button]) password.change(enable_buttons, [username, password], [login_button, signup_button]) # Login Logic (Clears Username & Password) login_button.click( login_user, [username, password], [session_state, user_id_state, login_status, username, password, chatbot, chat_interface, username, password, send_button, logout_button] ) # Signup Logic (Clears Username & Password) signup_button.click( signup_user, [username, password], [login_status, username, password] ) # Sending Messages send_button.click( get_response, [msg, chatbot, session_state, user_id_state], [msg, chatbot] ) msg.submit( get_response, [msg, chatbot, session_state, user_id_state], [msg, chatbot] ) # Logout Logic (Clears Chat and Resets UI) logout_button.click( logout_user, None, [session_state, username, password, login_status, chatbot, chat_interface, username, password, send_button, logout_button] ) demo.launch(share=True)