"""HumblebeeAI customer-support chatbot.

Start-up sequence (all executed at import time):

1. Read configuration from the environment / ``.env``.
2. Make sure a SQLite database exists, push it to a Hugging Face dataset
   repo and pull it back; the pulled copy is the one used at runtime.
3. Fetch the markdown knowledge-base files listed in ``md_files_url``.
4. Make sure a FAISS index exists, push it to the same dataset repo, pull it
   back and wrap it in a LangChain retriever.
5. Build a history-aware RAG chain on top of an Ollama model.
6. Serve a Gradio UI with signup / login / chat / logout.
"""
import logging
import os
import sqlite3
import uuid
from contextlib import closing, contextmanager
from datetime import datetime
from pathlib import Path

import bcrypt
import faiss  # was used below but never imported (NameError at start-up)
import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import HfApi, snapshot_download
from langchain.chains import create_history_aware_retriever
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.memory import ChatMessageHistory
from langchain.schema import Document
from langchain.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.llms import Ollama
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory

from get_data import extract_repo_details, fetch_md_file_via_api, data_loader, chunk_text
from helper import download_hugging_face_embeddings
from prompt import system_prompt, retriever_prompt
from url import md_files_url

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# In-memory conversation store: session_id -> ChatMessageHistory.
base = {}
# Number of most recent messages kept per session when trimming history.
last_messages = 4
# Knowledge-base documents fetched from the URLs in ``md_files_url``.
documents = []

load_dotenv()


def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is not set. The original code crashed
            here too, but with an opaque ``TypeError`` from assigning
            ``None`` back into ``os.environ``.
    """
    value = os.environ.get(name)
    if value is None:
        raise RuntimeError(f"Required environment variable {name} is not set.")
    return value


AUTH_TOKEN_KEY = _require_env('AUTH_TOKEN_KEY')
BASE_URL = _require_env('BASE_URL')
HF_TOKEN = _require_env('HF_TOKEN')

HF_USERNAME = "HumbleBeeAI"  # Replace with your HF username
DATASET_NAME = "faiss_index"
DATASET_REPO_ID = f"{HF_USERNAME}/{DATASET_NAME}"
index_path = "faiss_index"

# ---- Database bootstrap ---- #

# /tmp is used because it is writable in Hugging Face Spaces.
db_path = "/tmp/chatbot.db"

# Create a fresh database containing only the users table if none exists.
if not os.path.exists(db_path):
    print("🔴 chatbot.db does not exist! Creating it now...")
    # closing() guarantees the handle is released; sqlite3's own context
    # manager only commits/rolls back and leaves the connection open.
    with closing(sqlite3.connect(db_path)) as bootstrap_conn:
        bootstrap_conn.execute('''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL
        )''')
        bootstrap_conn.commit()
    print("✅ chatbot.db created successfully!")

# Confirm file existence.
if os.path.exists(db_path):
    print(f"✅ File chatbot.db found at {db_path}")
else:
    raise FileNotFoundError("🚨 chatbot.db was not found!")

api = HfApi()

# Upload chatbot.db to the dataset repo.
api.upload_file(
    path_or_fileobj=db_path,       # Local file under /tmp
    path_in_repo="chatbot.db",     # Name inside the dataset repo
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    token=HF_TOKEN,
)
print("✅ chatbot.db successfully uploaded to Hugging Face Dataset.")

# Download chatbot.db. repo_type must be "dataset" to match the upload above
# (the default is a model repo); `token` replaces deprecated `use_auth_token`.
db_folder = snapshot_download(
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    allow_patterns=["chatbot.db"],  # Only download the database
    token=HF_TOKEN,
)

# Runtime database path (inside the downloaded snapshot).
# NOTE(review): nothing in this file uploads the database again after users
# or chats are written to it — confirm whether persistence is expected.
DB_PATH = os.path.join(db_folder, "chatbot.db")

if os.path.exists(DB_PATH):
    print(f"✅ Database downloaded at {DB_PATH}")
else:
    raise FileNotFoundError("🚨 Failed to download chatbot.db from Hugging Face.")


# ---- Database part ----- #

def connect_db():
    """Open and return a new SQLite connection to ``DB_PATH``."""
    return sqlite3.connect(DB_PATH)


@contextmanager
def _db_connection():
    """Yield a connection that is committed (or rolled back) and then closed."""
    conn = connect_db()
    try:
        with conn:  # commit on success, roll back on exception
            yield conn
    finally:
        conn.close()


def create_tables():
    """Create the ``users`` and ``chat_history`` tables if they are missing."""
    with _db_connection() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL
            )''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS chat_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                message TEXT NOT NULL,
                response TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
            )''')


create_tables()
logger.info("Database tables created successfully!")


def hash_password(password):
    """Return a salted bcrypt hash of *password* as a text string."""
    return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()


def check_password(password, hashed):
    """Return True if *password* matches the bcrypt hash *hashed*."""
    return bcrypt.checkpw(password.encode(), hashed.encode())


def authenticate(username, password):
    """Check credentials against the ``users`` table.

    Returns:
        ``(session_id, message, user_id)``. On success ``session_id`` is a
        fresh random hex string; on failure ``session_id`` and ``user_id``
        are ``None``.
    """
    with _db_connection() as conn:
        user = conn.execute(
            "SELECT id, password_hash FROM users WHERE username = ?",
            (username,),
        ).fetchone()

    if user and check_password(password, user[1]):
        session_id = uuid.uuid4().hex  # Unique session ID
        return session_id, f"Welcome {username}!", user[0]
    return None, "Invalid username or password.", None


def signup(username, password):
    """Register a new user and return a status message for the UI."""
    try:
        hashed_pw = hash_password(password)
        with _db_connection() as conn:
            conn.execute(
                "INSERT INTO users (username, password_hash) VALUES (?, ?)",
                (username, hashed_pw),
            )
        return f"User {username} registered successfully! You can now log in."
    except sqlite3.IntegrityError:
        # Raised by the UNIQUE constraint on users.username.
        return "Username already exists. Try another one."


def save_chat(user_id, message, response):
    """Persist one message/response exchange for *user_id*."""
    # Store the timestamp as text explicitly: sqlite3's implicit datetime
    # adapter is deprecated since Python 3.12. isoformat(sep=" ") yields the
    # same text that adapter produced.
    created_at = datetime.now().isoformat(sep=" ")
    with _db_connection() as conn:
        conn.execute(
            "INSERT INTO chat_history (user_id, message, response, created_at) "
            "VALUES (?, ?, ?, ?)",
            (user_id, message, response, created_at),
        )


def get_chat_history(user_id):
    """Return all ``(message, response)`` pairs of *user_id*, oldest first."""
    with _db_connection() as conn:
        rows = conn.execute(
            "SELECT message, response FROM chat_history WHERE user_id = ? ORDER BY created_at",
            (user_id,),
        ).fetchall()
    return [(msg, resp) for msg, resp in rows]


# --------------------------------- #

# Option 2 - load directly from URLs.
for url in md_files_url:
    try:
        repo_owner, repo_name, file_path = extract_repo_details(url)
        content = fetch_md_file_via_api(repo_owner, repo_name, file_path, AUTH_TOKEN_KEY)
        if content:
            documents.append(Document(page_content=content, metadata={"source": file_path}))
    except ValueError as ve:
        logger.error(f"Error processing URL {url}: {ve}")

print(f"Fetched {len(documents)} documents.")

# ---- FAISS index bootstrap ---- #

# /tmp is used because it is writable in Hugging Face Spaces.
faiss_index_path = "/tmp/faiss_index"
faiss_file = os.path.join(faiss_index_path, "index.faiss")

# Ensure a FAISS index file exists before uploading.
if not os.path.exists(faiss_file):
    print("🔴 FAISS index not found! Creating a new FAISS index...")
    # The directory must exist before faiss can write into it.
    os.makedirs(faiss_index_path, exist_ok=True)
    # Create a dummy FAISS index (you should replace this with real embeddings)
    d = 768  # Embedding dimension
    index = faiss.IndexFlatL2(d)  # Empty index
    faiss.write_index(index, faiss_file)
    print("✅ FAISS index created successfully!")

# Confirm the FAISS index exists.
if os.path.exists(faiss_file):
    print(f"✅ FAISS index found at {faiss_file}")
else:
    raise FileNotFoundError("🚨 FAISS index was not found!")

# Upload the FAISS folder; its files land at the root of the dataset repo.
api.upload_folder(
    folder_path=faiss_index_path,
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    token=HF_TOKEN,
)
print("✅ FAISS index successfully uploaded to Hugging Face Dataset.")

# Download the FAISS index. The pattern targets the repo root because that is
# where upload_folder placed the file and where it is read from below.
faiss_folder = snapshot_download(
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    allow_patterns=["index.faiss"],
    token=HF_TOKEN,
)

faiss_file_path = os.path.join(faiss_folder, "index.faiss")

if os.path.exists(faiss_file_path):
    print(f"✅ FAISS index downloaded at {faiss_file_path}")
else:
    raise FileNotFoundError("🚨 Failed to download FAISS index from Hugging Face.")

# Load the FAISS index.
index = faiss.read_index(faiss_file_path)

# Integrate FAISS with LangChain. The vector store needs a docstore and an
# index-position -> docstore-id mapping in addition to the raw index.
# NOTE(review): a raw index file carries no document texts, and the fetched
# `documents` are never added to the store, so retrieval has nothing to
# return until the index is built from real embeddings — confirm intent.
embedding_function = download_hugging_face_embeddings()
docsearch = FAISS(
    embedding_function=embedding_function,
    index=index,
    docstore=InMemoryDocstore(),
    index_to_docstore_id={},
)

retriever = docsearch.as_retriever(search_type='similarity', search_kwargs={'k': 2})

llm = Ollama(model='llama3.2', base_url=BASE_URL)

# Single-turn prompt (no chat history); not used by the chain built below.
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        ('human', '{input}'),
    ]
)

# Prompt used to rewrite the user question given the conversation so far.
context_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', retriever_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
history_aware_retriever = create_history_aware_retriever(llm, retriever, context_prompt)

# Prompt used to answer from the retrieved documents plus the chat history.
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
# Use qa_prompt here: the original passed the history-less `prompt`, so the
# answering step never saw the conversation.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)


def get_session_history(session_id):
    """Return the message history for *session_id*, trimmed to recent turns.

    A new empty history is created on first use. When at least
    ``last_messages`` messages are stored, only the newest ``last_messages``
    are kept.
    """
    if session_id not in base:
        base[session_id] = ChatMessageHistory()

    history = base[session_id]
    if len(history.messages) >= last_messages:
        # Copy the tail before clearing so the result does not depend on
        # whether clear() rebinds or empties the underlying list.
        recent = list(history.messages[-last_messages:])
        history.clear()
        for stored in recent:
            history.add_message(stored)
    return history


chat_with_msg_history = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key='input',
    history_messages_key='chat_history',
    # The retrieval chain returns a dict with several keys; name the one
    # holding the reply so it is recorded in the session history.
    output_messages_key='answer',
)


def get_response(message, chat_history, session_id, user_id):
    """Answer *message* and return ``("", updated_history)`` for the UI.

    The incoming *chat_history* (the chatbot widget value) is ignored; the
    history is reloaded from the database for *user_id*. Without a session
    or user id an expiry notice and an empty history are returned.
    """
    if not session_id or not user_id:
        return "Session expired. Please log in again.", []

    chat_history = get_chat_history(user_id)  # Load user's previous chat history

    response = chat_with_msg_history.invoke(
        {'input': message},
        {'configurable': {'session_id': session_id}},
    )

    # Log the response for debugging.
    print("LangChain Response:", response)

    # The retrieval chain is expected to return a dict with an 'answer' key.
    if isinstance(response, dict) and 'answer' in response:
        chatbot_reply = response['answer']
    else:
        chatbot_reply = "I'm sorry, I couldn't process that request."

    save_chat(user_id, message, chatbot_reply)

    chat_history.append((message, chatbot_reply))  # Append instead of overwriting
    return "", chat_history


def logout(session_id):
    """Drop the in-memory history of *session_id* and return reset values."""
    base.pop(session_id, None)  # Clear session history
    return None, "Logged out successfully.", None


# Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("## HumblebeeAI Customer Support Chatbot")

    with gr.Row():
        username = gr.Textbox(label="Username", interactive=True)
        password = gr.Textbox(label="Password", type="password", interactive=True)
        login_button = gr.Button("Login", interactive=False)    # Initially disabled
        signup_button = gr.Button("Signup", interactive=False)  # Initially disabled

    login_status = gr.Textbox(label="Status", interactive=False)
    session_state = gr.State(None)
    user_id_state = gr.State(None)

    with gr.Column(visible=False) as chat_interface:
        chatbot = gr.Chatbot()
        msg = gr.Textbox(label="Message", placeholder="Ask me anything...")
        send_button = gr.Button("Send")
        logout_button = gr.Button("Logout")

    def enable_buttons(username, password):
        """Enable Login/Signup only when both fields are non-blank."""
        is_valid = bool(username.strip()) and bool(password.strip())
        return gr.update(interactive=is_valid), gr.update(interactive=is_valid)

    username.change(enable_buttons, [username, password], [login_button, signup_button])
    password.change(enable_buttons, [username, password], [login_button, signup_button])

    def login_user(username, password):
        """Authenticate; on success clear the credentials and show the chat."""
        session_id, message, user_id = authenticate(username, password)
        if session_id:
            return session_id, user_id, message, "", "", get_chat_history(user_id), gr.update(visible=True)
        return None, None, message, username, password, [], gr.update(visible=False)

    login_button.click(
        login_user,
        [username, password],
        [session_state, user_id_state, login_status, username, password, chatbot, chat_interface],
    )

    def signup_user(username, password):
        """Register the user and clear the credential fields."""
        message = signup(username, password)
        return message, "", ""

    signup_button.click(
        signup_user,
        [username, password],
        [login_status, username, password],
    )

    # Sending messages (button click and Enter key behave the same).
    send_button.click(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot],
    )
    msg.submit(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot],
    )

    def logout_user(session_id=None):
        """Clear session, credentials, status and chat, and hide the chat.

        Also releases the session's in-memory history via ``logout`` so
        ``base`` does not grow with every login.
        """
        if session_id:
            logout(session_id)
        return None, "", "", "", [], gr.update(visible=False)

    logout_button.click(
        logout_user,
        [session_state],
        [session_state, username, password, login_status, chatbot, chat_interface],
    )

demo.launch(share=True)