# joe4ai's picture
# Update app.py
# e30aa81 verified
# raw
# history blame
# 13.8 kB
import logging
import os
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime

import bcrypt
import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import HfApi, snapshot_download
from langchain.chains import create_history_aware_retriever
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.memory import ChatMessageHistory
from langchain.schema import Document
from langchain.vectorstores import FAISS
from langchain_community.llms import Ollama
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory

from get_data import extract_repo_details, fetch_md_file_via_api, data_loader, chunk_text
from helper import download_hugging_face_embeddings
from prompt import system_prompt, retriever_prompt
from url import md_files_url
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Per-session in-memory chat histories, keyed by session id.
base = {}
# How many of the most recent messages a session history keeps.
last_messages = 4
# Markdown documents fetched at start-up; appended to further down the file.
documents = []

load_dotenv()


def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is not set. Failing here names the
            missing variable instead of surfacing later as an opaque error.
    """
    value = os.environ.get(name)
    if value is None:
        raise RuntimeError(f"Required environment variable {name!r} is not set.")
    return value


# The values are read straight from the environment, so there is no need to
# write them back into os.environ (doing so with None raises TypeError).
AUTH_TOKEN_KEY = _require_env('AUTH_TOKEN_KEY')
BASE_URL = _require_env('BASE_URL')
HF_TOKEN = _require_env('HF_TOKEN')

HF_USERNAME = "HumbleBeeAI"  # Replace with your HF username
DATASET_NAME = "faiss_index"
index_path = "faiss_index"
from pathlib import Path
# πŸ”Ή Use /tmp directory in Hugging Face Spaces (to avoid filesystem restrictions)
db_path = "/tmp/chatbot.db"

# Make sure a local database file exists before it is uploaded.
if not os.path.exists(db_path):
    print("🔴 chatbot.db does not exist! Creating it now...")
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL
        )''')
        conn.commit()
    finally:
        # Close even if table creation fails so the file handle is released.
        conn.close()
    print("✅ chatbot.db created successfully!")

# Confirm the file is really there.
if os.path.exists(db_path):
    print(f"✅ File chatbot.db found at {db_path}")
else:
    raise FileNotFoundError("🚨 chatbot.db was not found!")

api = HfApi()

# Upload chatbot.db to the dataset repository.
# NOTE(review): this upload runs on every start-up. When /tmp/chatbot.db did
# not exist it was just created empty above, so the copy already stored in the
# dataset gets replaced by an empty database - confirm this is intended.
api.upload_file(
    path_or_fileobj=db_path,                  # Use the /tmp path
    path_in_repo="chatbot.db",                # How it will appear in the dataset
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",  # Your private dataset repo
    repo_type="dataset",
    token=HF_TOKEN,
)
print("✅ chatbot.db successfully uploaded to Hugging Face Dataset.")

# Download chatbot.db again. repo_type must match the upload: without it
# snapshot_download looks in the *model* namespace and cannot find the repo.
db_folder = snapshot_download(
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",
    repo_type="dataset",
    allow_patterns=["chatbot.db"],  # Only download the database
    token=HF_TOKEN,                 # `use_auth_token` is the deprecated spelling
)

# Path of the database used by the rest of the application.
DB_PATH = os.path.join(db_folder, "chatbot.db")

# Confirm the database was downloaded.
if os.path.exists(DB_PATH):
    print(f"✅ Database downloaded at {DB_PATH}")
else:
    raise FileNotFoundError("🚨 Failed to download chatbot.db from Hugging Face.")
# ---- Database part ----- #
def connect_db():
    """Open and return a new SQLite connection to the file at ``DB_PATH``."""
    connection = sqlite3.connect(DB_PATH)
    return connection
def create_tables():
    """Create the ``users`` and ``chat_history`` tables if they are missing.

    The connection is closed explicitly: sqlite3's own context manager only
    commits or rolls back the transaction, it never closes the connection.
    """
    with closing(connect_db()) as conn, conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL
            )''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS chat_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                message TEXT NOT NULL,
                response TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
            )''')


# Make sure the schema exists before any request is served.
create_tables()
logger.info("Database tables created successfully!")
# Secure Password Hashing
def hash_password(password):
    """Return a freshly salted bcrypt hash of ``password`` as a ``str``."""
    raw = password.encode()
    digest = bcrypt.hashpw(raw, bcrypt.gensalt())
    return digest.decode()
def check_password(password, hashed):
    """Return whether ``password`` matches the bcrypt hash string ``hashed``."""
    candidate = password.encode()
    stored = hashed.encode()
    return bcrypt.checkpw(candidate, stored)
# Authenticate User
def authenticate(username, password):
    """Check a username/password pair against the ``users`` table.

    Returns:
        ``(session_id, message, user_id)``. On success ``session_id`` is a new
        random hex string and ``user_id`` is the row id of the user; on
        failure both are ``None`` and ``message`` explains the rejection.
    """
    failure = (None, "Invalid username or password.", None)

    # Missing credentials can never match; bail out before touching the
    # database (and before calling .encode() on None).
    if not username or not password:
        return failure

    # closing() releases the connection; sqlite3's own context manager would
    # only end the transaction and leave the connection open.
    with closing(connect_db()) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,))
        user = cursor.fetchone()

    if user and check_password(password, user[1]):
        session_id = uuid.uuid4().hex  # Unique session ID
        return session_id, f"Welcome {username}!", user[0]  # user_id
    return failure
# Signup Function
def signup(username, password):
    """Register a new user and return a status message for the UI.

    Blank or whitespace-only credentials are rejected, mirroring the rule the
    UI applies before enabling the Signup button. A duplicate username yields
    a message instead of an exception.
    """
    if not (username and username.strip()) or not (password and password.strip()):
        return "Username and password are required."

    try:
        hashed_pw = hash_password(password)
        # closing() releases the connection; the inner `conn` context commits
        # on success and rolls back on error.
        with closing(connect_db()) as conn, conn:
            conn.execute(
                "INSERT INTO users (username, password_hash) VALUES (?, ?)",
                (username, hashed_pw),
            )
        return f"User {username} registered successfully! You can now log in."
    except sqlite3.IntegrityError:
        # Raised by the UNIQUE constraint on users.username.
        return "Username already exists. Try another one."
# Store Chat in Database
def save_chat(user_id, message, response):
    """Insert one message/response exchange for ``user_id`` into ``chat_history``."""
    # Serialise the timestamp explicitly. This is the same text the implicit
    # sqlite3 datetime adapter produced, but that adapter is deprecated since
    # Python 3.12.
    created_at = datetime.now().isoformat(sep=" ")

    # closing() releases the connection; the inner `conn` context commits.
    with closing(connect_db()) as conn, conn:
        conn.execute(
            "INSERT INTO chat_history (user_id, message, response, created_at) VALUES (?, ?, ?, ?)",
            (user_id, message, response, created_at),
        )
# Retrieve Chat History (User-specific)
def get_chat_history(user_id):
    """Return the stored exchanges of ``user_id`` ordered by ``created_at``.

    Returns:
        A new list of ``(message, response)`` tuples; empty when the user has
        no history.
    """
    # closing() releases the connection once the rows have been fetched.
    with closing(connect_db()) as conn:
        rows = conn.execute(
            "SELECT message, response FROM chat_history WHERE user_id = ? ORDER BY created_at",
            (user_id,),
        ).fetchall()
    return [(message, response) for message, response in rows]
# --------------------------------- #
# Option 2: load the markdown files directly from their URLs.
for url in md_files_url:
    try:
        repo_owner, repo_name, file_path = extract_repo_details(url)
        content = fetch_md_file_via_api(repo_owner, repo_name, file_path, AUTH_TOKEN_KEY)
        if not content:
            # Nothing came back for this file; skip it.
            continue
        document = Document(page_content=content, metadata={"source": file_path})
        documents.append(document)
    except ValueError as ve:
        logging.error(f"Error processing URL {url}: {ve}")

print(f"Fetched {len(documents)} documents.")
# πŸ”Ή Use /tmp directory in Spaces
faiss_index_path = "/tmp/faiss_index"
faiss_file = os.path.join(faiss_index_path, "index.faiss")

# Embedding model shared by index construction and index loading.
# NOTE(review): assumes download_hugging_face_embeddings() returns a LangChain
# Embeddings object - confirm against helper.py.
embedding_function = download_hugging_face_embeddings()

# Build the index from the fetched documents when no local copy exists.
# This goes through LangChain's FAISS wrapper, which also writes the docstore
# (index.pkl) next to index.faiss; a bare, empty faiss index has no documents
# to return and cannot be wrapped by the FAISS vector store on its own.
if not os.path.exists(faiss_file):
    print("🔴 FAISS index not found! Creating a new FAISS index...")
    if not documents:
        raise RuntimeError("Cannot build the FAISS index: no documents were fetched.")
    os.makedirs(faiss_index_path, exist_ok=True)
    # NOTE(review): documents are embedded whole; consider chunking them first.
    FAISS.from_documents(documents, embedding_function).save_local(faiss_index_path)
    print("✅ FAISS index created successfully!")

# Confirm the FAISS index exists.
if os.path.exists(faiss_file):
    print(f"✅ FAISS index found at {faiss_file}")
else:
    raise FileNotFoundError("🚨 FAISS index was not found!")

api = HfApi()

# Upload the index folder. path_in_repo places the files under faiss_index/
# so that the download pattern below actually matches them (without it the
# files land in the repository root).
api.upload_folder(
    folder_path=faiss_index_path,             # Upload the FAISS folder
    path_in_repo="faiss_index",
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",  # Your private dataset repo
    repo_type="dataset",
    token=HF_TOKEN,
)
print("✅ FAISS index successfully uploaded to Hugging Face Dataset.")

# Download the index again. repo_type must match the upload: without it
# snapshot_download looks in the *model* namespace.
faiss_folder = snapshot_download(
    repo_id=f"{HF_USERNAME}/{DATASET_NAME}",
    repo_type="dataset",
    allow_patterns=["faiss_index/*"],  # Only download FAISS index
    token=HF_TOKEN,                    # `use_auth_token` is the deprecated spelling
)

# Location of the downloaded index file inside the snapshot.
faiss_file_path = os.path.join(faiss_folder, "faiss_index", "index.faiss")

# Ensure the FAISS index was downloaded.
if os.path.exists(faiss_file_path):
    print(f"✅ FAISS index downloaded at {faiss_file_path}")
else:
    raise FileNotFoundError("🚨 Failed to download FAISS index from Hugging Face.")

# Load the vector store.
# SECURITY: load_local unpickles index.pkl. That is acceptable only because
# the files come from this project's own dataset repository; never point this
# at data from an untrusted source.
docsearch = FAISS.load_local(
    os.path.dirname(faiss_file_path),
    embedding_function,
    allow_dangerous_deserialization=True,
)
# Raw faiss index, kept available under its previous module-level name.
index = docsearch.index
# Retriever returning the 2 most similar documents for a query.
retriever = docsearch.as_retriever(search_type='similarity', search_kwargs={'k': 2})
llm = Ollama(model='llama3.2', base_url=BASE_URL)

# History-less QA prompt. Kept as a module-level name; the chain assembled
# below uses `qa_prompt`, which additionally carries the chat history.
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        ('human', '{input}'),
    ]
)

# Prompt used to turn the latest question plus the chat history into the
# query that is sent to the retriever.
context_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', retriever_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
history_aware_retriever = create_history_aware_retriever(llm, retriever, context_prompt)

# Answering prompt that also receives the chat history.
# NOTE(review): create_stuff_documents_chain needs a `{context}` placeholder;
# presumably system_prompt provides it - confirm in prompt.py.
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)

# Use qa_prompt here; with the history-less `prompt` the answering step never
# sees the earlier turns of the conversation.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
def get_session_history(session_id):
    """Return the chat history of ``session_id``, creating it on first use.

    The history is trimmed to its ``last_messages`` most recent messages
    before it is returned.
    """
    history = base.get(session_id)
    if history is None:
        history = base[session_id] = ChatMessageHistory()

    if len(history.messages) > last_messages:
        # Copy the tail *before* clearing: if clear() empties the list in
        # place rather than rebinding it, an un-copied reference would be
        # emptied as well and the whole history would be lost.
        recent = list(history.messages[-last_messages:])
        history.clear()
        for msg in recent:
            history.add_message(msg)
    return history
# Wrap the RAG chain so each call reads and updates the history of its session.
# The retrieval chain returns a dict, so the key that holds the reply must be
# named explicitly via output_messages_key for it to be recorded.
chat_with_msg_history = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key='input',
    history_messages_key='chat_history',
    output_messages_key='answer',
)
def get_response(message, chat_history, session_id, user_id):
    """Answer ``message`` for a logged-in user and persist the exchange.

    Args:
        message: Text typed by the user.
        chat_history: Current content of the chat widget; only used when the
            message is blank, otherwise the history is reloaded from the
            database.
        session_id: Session key used to look up the conversation history.
        user_id: Database id of the logged-in user.

    Returns:
        ``(textbox_value, chat_history)`` for the message box and the chat
        widget.
    """
    if not session_id or not user_id:
        return "Session expired. Please log in again.", []

    # A blank message has nothing to answer: skip the LLM call and do not
    # store an empty row in the database.
    if not message or not message.strip():
        return "", chat_history if chat_history is not None else []

    chat_history = get_chat_history(user_id)  # Load user's previous chat history

    response = chat_with_msg_history.invoke(
        {'input': message},
        {'configurable': {'session_id': session_id}},
    )

    # Log the raw response for debugging.
    logger.info("LangChain Response: %s", response)

    # Fall back to an apology when the chain did not return an 'answer' key.
    if isinstance(response, dict) and 'answer' in response:
        chatbot_reply = response['answer']
    else:
        chatbot_reply = "I'm sorry, I couldn't process that request."

    save_chat(user_id, message, chatbot_reply)
    chat_history.append((message, chatbot_reply))  # Append instead of overwriting
    return "", chat_history
# Logout Function
def logout(session_id):
    """Drop the stored history of ``session_id`` (if any) and return the logout result tuple."""
    base.pop(session_id, None)  # Clear session history
    return None, "Logged out successfully.", None
# Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("## HumblebeeAI Customer Support Chatbot")

    with gr.Row():
        username = gr.Textbox(label="Username", interactive=True)
        password = gr.Textbox(label="Password", type="password", interactive=True)
        login_button = gr.Button("Login", interactive=False)    # Initially disabled
        signup_button = gr.Button("Signup", interactive=False)  # Initially disabled

    login_status = gr.Textbox(label="Status", interactive=False)
    session_state = gr.State(None)
    user_id_state = gr.State(None)

    # The chat area stays hidden until a login succeeds.
    with gr.Column(visible=False) as chat_interface:
        chatbot = gr.Chatbot()
        msg = gr.Textbox(label="Message", placeholder="Ask me anything...")
        send_button = gr.Button("Send")
        logout_button = gr.Button("Logout")

    def enable_buttons(username, password):
        """Enable Login/Signup only when both fields contain non-blank text."""
        is_valid = bool((username or "").strip()) and bool((password or "").strip())
        return gr.update(interactive=is_valid), gr.update(interactive=is_valid)

    username.change(enable_buttons, [username, password], [login_button, signup_button])
    password.change(enable_buttons, [username, password], [login_button, signup_button])

    def login_user(username, password):
        """Authenticate; on success clear the credential fields and show the chat."""
        session_id, message, user_id = authenticate(username, password)
        if session_id:
            return session_id, user_id, message, "", "", get_chat_history(user_id), gr.update(visible=True)
        return None, None, message, username, password, [], gr.update(visible=False)

    login_button.click(
        login_user,
        [username, password],
        [session_state, user_id_state, login_status, username, password, chatbot, chat_interface],
    )

    def signup_user(username, password):
        """Register the user and clear the credential fields."""
        message = signup(username, password)
        return message, "", ""

    signup_button.click(
        signup_user,
        [username, password],
        [login_status, username, password],
    )

    # Sending messages: the button and the Enter key share one handler.
    send_button.click(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot],
    )
    msg.submit(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot],
    )

    def logout_user(session_id):
        """Drop the server-side session and reset every piece of UI state.

        Returns one value per output component: session id, user id,
        username, password, status text, chat content, chat visibility.
        """
        # Release the in-memory history kept for this session; without this
        # call the entry would stay in memory after the user logs out.
        logout(session_id)
        return None, None, "", "", "", [], gr.update(visible=False)

    logout_button.click(
        logout_user,
        [session_state],
        [session_state, user_id_state, username, password, login_status, chatbot, chat_interface],
    )

demo.launch(share=True)