# NOTE: Hugging Face Space status when this file was captured: "Runtime error".
from huggingface_hub import HfApi, snapshot_download | |
from helper import download_hugging_face_embeddings | |
from url import md_files_url | |
from get_data import extract_repo_details, fetch_md_file_via_api, data_loader, chunk_text | |
from langchain_community.llms import Ollama | |
from langchain.chains import create_retrieval_chain | |
from langchain.chains.combine_documents import create_stuff_documents_chain | |
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain.chains import create_history_aware_retriever | |
from langchain.memory import ChatMessageHistory | |
from langchain_core.runnables.history import RunnableWithMessageHistory | |
from langchain.vectorstores import FAISS | |
from langchain.schema import Document | |
from dotenv import load_dotenv | |
from prompt import system_prompt, retriever_prompt | |
import os | |
import logging | |
import gradio as gr | |
import sqlite3 | |
import bcrypt | |
import uuid | |
from datetime import datetime | |
# ---- Logging and module-level configuration ---- #
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# In-memory chat histories, keyed by session id (see get_session_history).
base = {}
# Number of most recent messages kept in a session history when it is trimmed.
last_messages = 4
# Documents fetched from the markdown URLs further down in this file.
documents = []

load_dotenv()

AUTH_TOKEN_KEY = os.environ.get('AUTH_TOKEN_KEY')
BASE_URL = os.environ.get('BASE_URL')
HF_TOKEN = os.environ.get("HF_TOKEN")

# The previous code wrote each value straight back into os.environ, which is a
# no-op when the variable is set and an opaque TypeError when it is missing.
# Fail with a message that names the missing variables instead.
_missing_env = [
    name
    for name, value in (
        ("AUTH_TOKEN_KEY", AUTH_TOKEN_KEY),
        ("BASE_URL", BASE_URL),
        ("HF_TOKEN", HF_TOKEN),
    )
    if value is None
]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env)
    )

HF_USERNAME = "HumbleBeeAI"  # Replace with your HF username
DATASET_NAME = "faiss_index"
index_path = "faiss_index"
from pathlib import Path
# ---- chatbot.db bootstrap: create locally, upload, then download ---- #
# Use the /tmp directory in Hugging Face Spaces (to avoid filesystem restrictions).
db_path = "/tmp/chatbot.db"

# Ensure the database file exists.
# NOTE(review): when /tmp is empty this creates a blank database that is then
# uploaded below unconditionally; presumably that replaces any chatbot.db
# already stored in the dataset repo. Confirm this is intended.
if not os.path.exists(db_path):
    print("π΄ chatbot.db does not exist! Creating it now...")
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL
        )''')
        conn.commit()
    finally:
        # Close the handle even if table creation fails.
        conn.close()
    print("β chatbot.db created successfully!")

# Confirm file existence.
if os.path.exists(db_path):
    print(f"β File chatbot.db found at {db_path}")
else:
    raise FileNotFoundError("π¨ chatbot.db was not found!")

api = HfApi()

# Upload chatbot.db to the dataset repository.
api.upload_file(
    path_or_fileobj=db_path,  # Use the /tmp path
    path_in_repo="chatbot.db",  # How it will appear in the dataset
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",  # Your private dataset repo
    repo_type="dataset",
    token=HF_TOKEN,
)
print("β chatbot.db successfully uploaded to Hugging Face Dataset.")

# Download chatbot.db from the same dataset repository.
# repo_type must match the upload above; without it the Hub looks for a model
# repository of that name. `token` replaces the deprecated `use_auth_token`.
db_folder = snapshot_download(
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",
    repo_type="dataset",
    allow_patterns=["chatbot.db"],  # Only download the database
    token=HF_TOKEN,
)

# Path of the database inside the downloaded snapshot.
DB_PATH = os.path.join(db_folder, "chatbot.db")

# Confirm the database was downloaded.
if os.path.exists(DB_PATH):
    print(f"β Database downloaded at {DB_PATH}")
else:
    raise FileNotFoundError("π¨ Failed to download chatbot.db from Hugging Face.")
# ---- Database layer ---- #
def connect_db():
    """Open and return a new SQLite connection to the file at ``DB_PATH``."""
    connection = sqlite3.connect(DB_PATH)
    return connection
def create_tables():
    """Create the ``users`` and ``chat_history`` tables if they do not exist.

    The connection is closed explicitly: using a sqlite3 connection as a
    context manager only commits or rolls back the transaction, it does not
    close the connection.
    """
    conn = connect_db()
    try:
        with conn:  # commits on success, rolls back on error
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL
                )''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS chat_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    message TEXT NOT NULL,
                    response TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
                )''')
    finally:
        conn.close()


create_tables()
logger.info("Database tables created successfully!")
# Password hashing helpers
def hash_password(password):
    """Return the bcrypt hash of ``password`` decoded to ``str``.

    A fresh salt from ``bcrypt.gensalt()`` is used on every call.
    """
    raw = password.encode()
    digest = bcrypt.hashpw(raw, bcrypt.gensalt())
    return digest.decode()
def check_password(password, hashed):
    """Return the result of ``bcrypt.checkpw`` for ``password`` against ``hashed``.

    Both arguments are text and are encoded to bytes before the comparison.
    """
    candidate = password.encode()
    stored = hashed.encode()
    return bcrypt.checkpw(candidate, stored)
# Authenticate user
def authenticate(username, password):
    """Check ``username``/``password`` against the ``users`` table.

    Returns:
        ``(session_id, message, user_id)``. On success ``session_id`` is a new
        random hex string and ``user_id`` is the matching row id. On failure
        both are ``None`` and ``message`` reports invalid credentials.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, password_hash FROM users WHERE username = ?",
            (username,),
        )
        user = cursor.fetchone()
    finally:
        # The sqlite3 context manager does not close connections; do it here.
        conn.close()

    if user and check_password(password, user[1]):
        session_id = uuid.uuid4().hex  # Unique session ID
        return session_id, f"Welcome {username}!", user[0]  # user_id
    return None, "Invalid username or password.", None
# Signup function
def signup(username, password):
    """Register a new user and return a status message for the UI.

    The password is stored as a bcrypt hash. If the insert violates a table
    constraint (``sqlite3.IntegrityError``), the "username already exists"
    message is returned instead of raising.
    """
    hashed_pw = hash_password(password)
    conn = connect_db()
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO users (username, password_hash) VALUES (?, ?)",
                (username, hashed_pw),
            )
    except sqlite3.IntegrityError:
        return "Username already exists. Try another one."
    finally:
        # The sqlite3 context manager does not close connections; do it here.
        conn.close()
    return f"User {username} registered successfully! You can now log in."
# Store chat in database
def save_chat(user_id, message, response):
    """Insert one message/response pair for ``user_id`` into ``chat_history``.

    The timestamp is the current local time, written as text in the
    ``YYYY-MM-DD HH:MM:SS.ffffff`` form. It is converted explicitly because
    sqlite3's implicit ``datetime`` adapter is deprecated since Python 3.12;
    the stored value is the same one that adapter produced.
    """
    created_at = datetime.now().isoformat(sep=" ")
    conn = connect_db()
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO chat_history (user_id, message, response, created_at) VALUES (?, ?, ?, ?)",
                (user_id, message, response, created_at),
            )
    finally:
        # The sqlite3 context manager does not close connections; do it here.
        conn.close()
# Retrieve chat history (user-specific)
def get_chat_history(user_id):
    """Return the stored chat of ``user_id`` as a list of ``(message, response)`` tuples.

    Rows are ordered by ``created_at``; ``id`` breaks ties so that rows with
    equal timestamps still come back in insertion order.
    """
    conn = connect_db()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT message, response FROM chat_history WHERE user_id = ? ORDER BY created_at, id",
            (user_id,),
        )
        rows = cursor.fetchall()
    finally:
        # The sqlite3 context manager does not close connections; do it here.
        conn.close()
    return [(msg, resp) for msg, resp in rows]
# --------------------------------- #
# Option 2 - load documents directly from URLs.
# Each URL is resolved to (owner, repo, path), fetched through the API helper,
# and stored as a Document whose "source" metadata is the file path. URLs that
# raise ValueError are logged and skipped; empty content is skipped silently.
for url in md_files_url:
    try:
        repo_owner, repo_name, file_path = extract_repo_details(url)
        content = fetch_md_file_via_api(repo_owner, repo_name, file_path, AUTH_TOKEN_KEY)
    except ValueError as ve:
        # Use the module logger, like the rest of this file.
        logger.error("Error processing URL %s: %s", url, ve)
        continue
    if content:
        documents.append(Document(page_content=content, metadata={"source": file_path}))
print(f"Fetched {len(documents)} documents.")
# ---- FAISS index: build locally if missing, upload, download, load ---- #
# The previous version called the raw `faiss` module without importing it,
# wrote into a directory it never created, uploaded the files to the repo root
# while downloading "faiss_index/*", and called FAISS(...) with the wrong
# arguments. This version uses only the LangChain FAISS wrapper this file
# already imports.

# Use the /tmp directory in Spaces.
faiss_index_path = "/tmp/faiss_index"
faiss_file = os.path.join(faiss_index_path, "index.faiss")

# Embedding function shared by index creation and index loading.
embedding_function = download_hugging_face_embeddings()

# Ensure the FAISS index exists before uploading.
if not os.path.exists(faiss_file):
    print("π΄ FAISS index not found! Creating a new FAISS index...")
    if not documents:
        raise RuntimeError("No documents were fetched; cannot build a FAISS index.")
    os.makedirs(faiss_index_path, exist_ok=True)
    # NOTE(review): whole documents are embedded without chunking; consider
    # splitting them first (a `chunk_text` helper is imported but unused).
    FAISS.from_documents(documents, embedding_function).save_local(faiss_index_path)
    print("β FAISS index created successfully!")

# Confirm the FAISS index exists.
if os.path.exists(faiss_file):
    print(f"β FAISS index found at {faiss_file}")
else:
    raise FileNotFoundError("π¨ FAISS index was not found!")

api = HfApi()

# Upload the FAISS folder under "faiss_index/" so the download pattern below
# matches what was uploaded.
api.upload_folder(
    folder_path=faiss_index_path,  # Upload the FAISS folder
    path_in_repo="faiss_index",
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",  # Your private dataset repo
    repo_type="dataset",
    token=HF_TOKEN,
)
print("β FAISS index successfully uploaded to Hugging Face Dataset.")

# Download the FAISS index from the same dataset repository.
faiss_folder = snapshot_download(
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",
    repo_type="dataset",
    allow_patterns=["faiss_index/*"],  # Only download FAISS index
    token=HF_TOKEN,
)

# Location of the index inside the downloaded snapshot.
faiss_download_dir = os.path.join(faiss_folder, "faiss_index")
faiss_file_path = os.path.join(faiss_download_dir, "index.faiss")

# Ensure the FAISS index was downloaded.
if os.path.exists(faiss_file_path):
    print(f"β FAISS index downloaded at {faiss_file_path}")
else:
    raise FileNotFoundError("π¨ Failed to download FAISS index from Hugging Face.")

# Load the FAISS index together with its docstore.
# SECURITY: load_local unpickles index.pkl. This is only acceptable because
# the file comes from our own dataset repo; never point it at untrusted data.
docsearch = FAISS.load_local(
    faiss_download_dir,
    embedding_function,
    allow_dangerous_deserialization=True,
)
# Raw FAISS index object, kept under its previous module-level name.
index = docsearch.index
# ---- Retriever, LLM and RAG chain ---- #
# Similarity search returning the top 2 documents per query.
retriever = docsearch.as_retriever(search_type='similarity', search_kwargs={'k': 2})
llm = Ollama(model='llama3.2', base_url=BASE_URL)

# History-less prompt. It is no longer used to build the final chain but is
# kept because it is a module-level name.
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        ('human', '{input}'),
    ]
)

# Prompt given to the history-aware retriever together with the chat history.
context_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', retriever_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
history_aware_retriever = create_history_aware_retriever(llm, retriever, context_prompt)

# Answering prompt that also receives the chat history.
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)

# Fix: the answer chain was built with `prompt`, leaving `qa_prompt` unused, so
# the chat history never reached the answering step. The earlier history-less
# chain pair that was immediately overwritten has been dropped.
# NOTE(review): assumes `system_prompt` contains a `{context}` placeholder for
# the stuffed documents - confirm in prompt.py.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
def get_session_history(session_id):
    """Return the in-memory message history stored in ``base`` for ``session_id``.

    A new ``ChatMessageHistory`` is registered on first use. When the history
    already holds at least ``last_messages`` messages, it is cleared and
    re-filled with only its last ``last_messages`` messages before being
    returned.
    """
    if session_id not in base:
        base[session_id] = ChatMessageHistory()
    history = base[session_id]

    previous = history.messages
    if len(previous) < last_messages:
        return history

    history.clear()
    for kept in previous[-last_messages:]:
        history.add_message(kept)
    return history
# Wrap the RAG chain so each session's message history is loaded and saved
# through get_session_history.
# `rag_chain` is built with create_retrieval_chain, whose output is a dict;
# output_messages_key tells the wrapper which entry is the reply to record.
chat_with_msg_history = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key='input',
    history_messages_key='chat_history',
    output_messages_key='answer',
)
def get_response(message, chat_history, session_id, user_id):
    """Answer ``message`` for a logged-in user and return the updated chat.

    Args:
        message: Text entered by the user.
        chat_history: Current chatbot value from the UI. It is returned
            unchanged for a blank message; otherwise it is replaced by the
            history stored in the database.
        session_id: Session id used to select the in-memory message history.
        user_id: Database id of the logged-in user.

    Returns:
        ``(textbox_value, chat_history)``: an empty string and the updated
        history, or a "session expired" notice and an empty history when
        either ``session_id`` or ``user_id`` is missing.
    """
    if not session_id or not user_id:
        return "Session expired. Please log in again.", []

    # Do not send blank input to the LLM or store it in the database.
    if not message or not message.strip():
        return "", chat_history or []

    chat_history = get_chat_history(user_id)  # Load user's previous chat history
    response = chat_with_msg_history.invoke(
        {'input': message},
        {'configurable': {'session_id': session_id}},
    )
    # Log the response for debugging.
    print("LangChain Response:", response)

    # Use the 'answer' entry when present; otherwise fall back to an apology.
    if isinstance(response, dict) and 'answer' in response:
        chatbot_reply = response['answer']
    else:
        chatbot_reply = "I'm sorry, I couldn't process that request."

    save_chat(user_id, message, chatbot_reply)
    chat_history.append((message, chatbot_reply))  # Append instead of overwriting
    return "", chat_history
# Logout helper
def logout(session_id):
    """Drop the in-memory history stored in ``base`` for ``session_id``, if any.

    Returns ``(None, "Logged out successfully.", None)``.
    """
    base.pop(session_id, None)  # Clear session history
    return None, "Logged out successfully.", None
# Gradio UI
# NOTE(review): the source lost its indentation; the nesting of components
# inside the Row / Column below is reconstructed - confirm against the
# intended layout.
with gr.Blocks() as demo:
    gr.Markdown("## HumblebeeAI Customer Support Chatbot")

    with gr.Row():
        username = gr.Textbox(label="Username", interactive=True)
        password = gr.Textbox(label="Password", type="password", interactive=True)
        login_button = gr.Button("Login", interactive=False)  # Initially disabled
        signup_button = gr.Button("Signup", interactive=False)  # Initially disabled

    login_status = gr.Textbox(label="Status", interactive=False)
    session_state = gr.State(None)
    user_id_state = gr.State(None)

    # Chat widgets stay hidden until a login succeeds.
    with gr.Column(visible=False) as chat_interface:
        chatbot = gr.Chatbot()
        msg = gr.Textbox(label="Message", placeholder="Ask me anything...")
        send_button = gr.Button("Send")
        logout_button = gr.Button("Logout")

    # Enable buttons only when both username & password are filled.
    def enable_buttons(username, password):
        """Return interactivity updates for the login and signup buttons."""
        is_valid = bool(username.strip()) and bool(password.strip())
        return gr.update(interactive=is_valid), gr.update(interactive=is_valid)

    username.change(enable_buttons, [username, password], [login_button, signup_button])
    password.change(enable_buttons, [username, password], [login_button, signup_button])

    # Login logic (clears username & password on success).
    def login_user(username, password):
        """Authenticate and, on success, load the stored chat and show the chat UI."""
        session_id, message, user_id = authenticate(username, password)
        if session_id:
            return session_id, user_id, message, "", "", get_chat_history(user_id), gr.update(visible=True)
        return None, None, message, username, password, [], gr.update(visible=False)

    login_button.click(
        login_user,
        [username, password],
        [session_state, user_id_state, login_status, username, password, chatbot, chat_interface]
    )

    # Signup logic (clears username & password).
    def signup_user(username, password):
        """Register the user and return the status message plus cleared fields."""
        message = signup(username, password)
        return message, "", ""

    signup_button.click(
        signup_user,
        [username, password],
        [login_status, username, password]
    )

    # Sending messages (button click and Enter in the textbox behave the same).
    send_button.click(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot]
    )
    msg.submit(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot]
    )

    # Logout logic (clears session, user id, status and chat; hides the chat UI).
    # The two earlier duplicate definitions are merged into one. It now also
    # drops the server-side message history and resets user_id_state, which
    # previously kept the old user id after logout.
    def logout_user(session_id):
        """Forget the session and reset every login-related component."""
        logout(session_id)
        return None, None, "", "", "", [], gr.update(visible=False)

    logout_button.click(
        logout_user,
        [session_state],
        [session_state, user_id_state, username, password, login_status, chatbot, chat_interface]
    )

demo.launch(share=True)