joe4ai's picture
Update app.py
1bef60b verified
raw
history blame
14.5 kB
import logging
import os
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime

import bcrypt
import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import HfApi, snapshot_download
from langchain.chains import create_history_aware_retriever
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.memory import ChatMessageHistory
from langchain.schema import Document
from langchain_community.llms import Ollama
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_pinecone import PineconeVectorStore
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC as Pinecone

from get_data import extract_repo_details, fetch_md_file_via_api, data_loader, chunk_text, download_hugging_face_embeddings
from prompt import system_prompt, retriever_prompt
from url import md_files_url
# Hugging Face Hub client shared by the authentication check and the
# dataset file operations further down.
api = HfApi()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# In-memory chat histories, keyed by session id (see get_session_history).
base = {}
# Maximum number of messages kept as conversational context.
last_messages = 10
# Documents built from the fetched markdown files.
documents = []

HF_ORG_NAME = 'HumbleBeeAI'
DATASET_NAME = 'faiss_index'
# Dataset repository that holds chatbot.db.
repo_id = f"{HF_ORG_NAME}/{DATASET_NAME}"

load_dotenv()

AUTH_TOKEN_KEY = os.environ.get('AUTH_TOKEN_KEY')
BASE_URL = os.environ.get("BASE_URL")
# Fail fast with an actionable message: writing None back into os.environ
# would otherwise abort startup with an opaque TypeError.
if not AUTH_TOKEN_KEY:
    raise ValueError("AUTH_TOKEN_KEY is not set! It is required to fetch the markdown sources.")

HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("🚨 HF_TOKEN is not set! Ensure you have the correct permissions.")

# Verify that the token is accepted before any dataset operation is attempted.
try:
    user_info = api.whoami(token=HF_TOKEN)
    print(f"βœ… Authenticated as: {user_info['name']}")
except Exception as e:
    # Chain the cause so the original Hub error stays visible in the traceback.
    raise ValueError(f"🚨 Hugging Face authentication failed: {e}") from e
# Working-directory copy of the user database used for the initial upload.
db_path = "chatbot.db"

# Seed an empty database containing only the `users` table when no local
# file is present yet.
if not os.path.exists(db_path):
    print("πŸ”΄ chatbot.db does not exist! Creating it now...")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL
)''')
    conn.commit()
    conn.close()
    print("βœ… chatbot.db created successfully.")

# Publish the database to the dataset repository unless one is already there.
repo_files = api.list_repo_files(repo_id, repo_type="dataset", token=HF_TOKEN)
if "chatbot.db" not in repo_files:
    print("πŸ“€ Uploading `chatbot.db` to Hugging Face...")
    api.upload_file(
        path_or_fileobj=db_path,
        path_in_repo="chatbot.db",
        repo_id=repo_id,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    print("βœ… `chatbot.db` successfully uploaded to the organization's private dataset.")
else:
    print("🟒 `chatbot.db` already exists in the dataset. Skipping upload.")
# Local path checked before falling back to a download from the Hub.
DB_LOCAL_PATH = "/tmp/chatbot.db"

if os.path.exists(DB_LOCAL_PATH):
    print(f"🟒 `chatbot.db` already exists at {DB_LOCAL_PATH}, skipping download.")
    # connect_db() reads DB_PATH, so it must be bound on this branch as well;
    # leaving it undefined makes the first database call fail with NameError.
    DB_PATH = DB_LOCAL_PATH
else:
    print("πŸ“₯ Downloading `chatbot.db` from Hugging Face...")
    try:
        db_folder = snapshot_download(
            repo_id=repo_id,
            repo_type="dataset",  # the database lives in a dataset repository
            allow_patterns=["chatbot.db"],
            token=HF_TOKEN,  # same keyword the other Hub calls in this file use
        )
        DB_PATH = os.path.join(db_folder, "chatbot.db")
        if os.path.exists(DB_PATH):
            print(f"βœ… Database downloaded at {DB_PATH}")
        else:
            raise FileNotFoundError("🚨 Failed to download chatbot.db from the organization's dataset.")
    except Exception as e:
        # Every failure (including the missing-file case above) is surfaced as
        # FileNotFoundError; the cause is chained for debugging.
        raise FileNotFoundError(f"🚨 Error downloading chatbot.db: {e}") from e
# ---- Database part ----- #
# Database Connection
def connect_db():
    """Open a connection to the SQLite database at ``DB_PATH``.

    Returns:
        sqlite3.Connection: A new connection; the caller is responsible for
        closing it.

    Raises:
        sqlite3.Error: If the connection cannot be opened. The failure is
            logged and re-raised, because every caller uses the result as a
            context manager and a ``None`` return would only fail later with
            an unrelated error.
    """
    try:
        return sqlite3.connect(DB_PATH)
    except sqlite3.Error as e:
        logger.error(f"Database connection failed: {e}")
        raise
def create_tables():
    """Create the ``users`` and ``chat_history`` tables if they are missing."""
    # The connection's own context manager only commits or rolls back the
    # transaction; closing() is what actually releases the connection.
    with closing(connect_db()) as conn, conn:
        cursor = conn.cursor()
        cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL
)''')
        cursor.execute('''
CREATE TABLE IF NOT EXISTS chat_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
message TEXT NOT NULL,
response TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)''')
        conn.commit()


# Ensure the schema exists before any request handler touches the database.
create_tables()
logger.info("Database tables created successfully!")
# Secure Password Hashing
def hash_password(password):
    """Return a bcrypt hash of ``password``, salted with a fresh salt, as text."""
    raw = password.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw, salt)
    return digest.decode()
def check_password(password, hashed):
    """Return whether ``password`` matches the stored bcrypt hash ``hashed``."""
    candidate = password.encode()
    stored = hashed.encode()
    return bcrypt.checkpw(candidate, stored)
# Authenticate User
def authenticate(username, password):
    """Check a username/password pair against the ``users`` table.

    Args:
        username: Login name to look up.
        password: Plain-text password to verify against the stored hash.

    Returns:
        tuple: ``(session_id, message, user_id)``. On success ``session_id``
        is a fresh random hex string and ``user_id`` is the row id of the
        user; on failure both are ``None`` and ``message`` explains why.
    """
    # closing() releases the connection; sqlite3's own context manager only
    # handles commit/rollback.
    with closing(connect_db()) as conn, conn:
        cursor = conn.cursor()
        cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,))
        user = cursor.fetchone()

    # The hash comparison needs no database access, so it runs after the
    # connection has been released.
    if user and check_password(password, user[1]):
        session_id = uuid.uuid4().hex  # unique id for this login session
        return session_id, f"Welcome {username}!", user[0]
    return None, "Invalid username or password.", None
# Signup Function
def signup(username, password):
    """Register a new user and return a status message for the UI.

    Args:
        username: Desired login name; must be unique in the ``users`` table.
        password: Plain-text password; only its bcrypt hash is stored.

    Returns:
        str: A success message, or a notice that the username is taken.
    """
    try:
        hashed_pw = hash_password(password)
        # closing() releases the connection; the inner context manager rolls
        # the transaction back if the INSERT violates the UNIQUE constraint.
        with closing(connect_db()) as conn, conn:
            cursor = conn.cursor()
            cursor.execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, hashed_pw))
            conn.commit()
        return f"User {username} registered successfully! You can now log in."
    except sqlite3.IntegrityError:
        return "Username already exists. Try another one."
# Store Chat in Database
def save_chat(user_id, message, response):
    """Persist one user message and the bot's reply to ``chat_history``.

    Args:
        user_id: Id of the user the exchange belongs to.
        message: Text the user sent.
        response: Text the chatbot answered with.
    """
    # Serialise the timestamp explicitly instead of relying on sqlite3's
    # default datetime adapter (deprecated since Python 3.12); the stored
    # text is the same "YYYY-MM-DD HH:MM:SS.ffffff" form.
    created_at = datetime.now().isoformat(" ")
    # closing() releases the connection; sqlite3's own context manager only
    # handles commit/rollback.
    with closing(connect_db()) as conn, conn:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO chat_history (user_id, message, response, created_at) VALUES (?, ?, ?, ?)",
            (user_id, message, response, created_at),
        )
        conn.commit()
# Retrieve Chat History (User-specific)
def get_chat_history(user_id):
    """Return every stored exchange of a user, ordered by ``created_at``.

    Args:
        user_id: Id of the user whose history is requested.

    Returns:
        list[tuple]: ``(message, response)`` pairs; empty if none are stored.
    """
    # closing() releases the connection; sqlite3's own context manager only
    # handles commit/rollback.
    with closing(connect_db()) as conn, conn:
        cursor = conn.cursor()
        cursor.execute("SELECT message, response FROM chat_history WHERE user_id = ? ORDER BY created_at", (user_id,))
        # fetchall() already yields a list of (message, response) tuples.
        return cursor.fetchall()
# --------------------------------- #
# Option 2: load the documents directly from URLs
# Fetch every configured markdown file and wrap non-empty content in a
# Document whose metadata records the file path it came from.
for url in md_files_url:
    try:
        repo_owner, repo_name, file_path = extract_repo_details(url)
        content = fetch_md_file_via_api(repo_owner, repo_name, file_path, AUTH_TOKEN_KEY)
        if content:
            document = Document(page_content=content, metadata={"source": file_path})
            documents.append(document)
    except ValueError as ve:
        # A bad URL must not abort startup: report it through the module
        # logger (as the rest of the file does) and carry on with the next one.
        logger.error("Error processing URL %s: %s", url, ve)

print(f"Fetched {len(documents)} documents.")

# Split the fetched documents into chunks for indexing.
text_chunk = chunk_text(documents)
PINECONE_API_KEY = os.environ.get('PINECONE_API_KEY')
# Fail fast with an actionable message: writing None back into os.environ
# would otherwise abort startup with an opaque TypeError.
if not PINECONE_API_KEY:
    raise ValueError("PINECONE_API_KEY is not set! It is required to reach the Pinecone index.")

index_name = 'humblebeeai'
pc = Pinecone(api_key=PINECONE_API_KEY)

# Create the serverless index only when it is not present yet.
existing_indexes = [index['name'] for index in pc.list_indexes()]
if index_name in existing_indexes:
    print(f"🟒 Index '{index_name}' already exists. Skipping creation.")
else:
    print(f"πŸ”΄ Index '{index_name}' not found. Creating it now...")
    pc.create_index(
        name=index_name,
        dimension=384,  # must match the dimension of the embedding model
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
        ),
    )
    print(f"βœ… Index '{index_name}' created successfully.")

# Load the embedding model once and reuse it.
embeddings = download_hugging_face_embeddings()

# from_documents() upserts the chunks and returns a store bound to the index,
# so a second from_existing_index() call is unnecessary.
docsearch = PineconeVectorStore.from_documents(
    documents=text_chunk,
    index_name=index_name,
    embedding=embeddings,
)
# MMR retrieval over the Pinecone index.
retriever = docsearch.as_retriever(search_type='mmr', search_kwargs={'k': 10, 'lambda_mult': 0.5})
llm = Ollama(model='llama3.2')

# Prompt handed to the history-aware retriever together with the chat
# history and the latest user input.
context_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', retriever_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
history_aware_retriever = create_history_aware_retriever(llm, retriever, context_prompt)

# Answering prompt: system instructions, prior turns, then the new question.
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)

# The answer chain must be built from qa_prompt: a prompt without the
# chat_history placeholder would never show prior turns to the LLM.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
def get_session_history(session_id):
    """Return the in-memory history for a session, creating it on first use.

    The history is trimmed to the most recent ``last_messages`` entries on
    every call.

    Args:
        session_id: Key identifying the session in ``base``.

    Returns:
        ChatMessageHistory: The (possibly trimmed) history object stored in
        ``base`` for this session.
    """
    if session_id not in base:
        base[session_id] = ChatMessageHistory()

    history = base[session_id]
    if len(history.messages) > last_messages:
        # Copy the tail *before* clearing so the trim does not depend on
        # whether clear() rebinds the list or empties it in place.
        recent = list(history.messages[-last_messages:])
        history.clear()
        for kept in recent:
            history.add_message(kept)
    return history
# Wrap the RAG chain so each call loads and stores the per-session history
# returned by get_session_history.
chat_with_msg_history = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key='input',
    history_messages_key='chat_history',
    # get_response reads the reply from the 'answer' key of the chain's
    # result dict; naming that key here tells the wrapper which value to
    # record as the AI turn.
    output_messages_key='answer',
)
from langchain.schema import AIMessage, HumanMessage
def get_combined_history(session_id, user_id):
    """Merge a user's persisted chat history with the live session messages.

    Args:
        session_id: Key of the in-memory session history in ``base``.
        user_id: Id of the user whose stored exchanges are loaded.

    Returns:
        list: The last ``last_messages`` message objects, stored history
        first, followed by the current session's messages.
    """
    persistent_history = get_chat_history(user_id)

    # Each stored row holds one full exchange, so it contributes BOTH a human
    # and an AI message; alternating by row index would drop half of each.
    long_term_messages = []
    for msg, resp in persistent_history:
        long_term_messages.append(HumanMessage(content=msg))
        long_term_messages.append(AIMessage(content=resp))

    # Messages accumulated in memory for the current session, if any.
    recent_messages = base[session_id].messages if session_id in base else []

    combined_history = long_term_messages + recent_messages
    print("Combined History for Debugging:", combined_history)
    return combined_history[-last_messages:]
def get_response(message, chat_history, session_id, user_id):
    """Answer a user message through the RAG chain and record the exchange.

    Args:
        message: Text entered by the user.
        chat_history: List of ``(message, reply)`` pairs shown in the chat
            widget; the new pair is appended to it.
        session_id: Current session id, falsy when not logged in.
        user_id: Current user id, falsy when not logged in.

    Returns:
        tuple: ``("", chat_history)`` after a successful turn, or an
        expiry notice plus an empty list when there is no active login.
    """
    if not (session_id and user_id):
        return "Session expired. Please log in again.", []

    payload = {
        'input': message,
        # List of LangChain message objects built from stored + live history.
        'chat_history': get_combined_history(session_id, user_id),
    }
    run_config = {'configurable': {'session_id': session_id}}
    response = chat_with_msg_history.invoke(payload, run_config)

    # Dump the raw chain output to stdout for debugging.
    print("LangChain Response:", response)

    # Use the chain's 'answer' field when available, otherwise apologise.
    chatbot_reply = "I'm sorry, I couldn't process that request."
    if isinstance(response, dict):
        chatbot_reply = response.get('answer', chatbot_reply)

    save_chat(user_id, message, chatbot_reply)
    chat_history.append((message, chatbot_reply))
    return "", chat_history
# Logout Function
def logout(session_id):
    """Drop the in-memory history of ``session_id`` and report the logout.

    Returns:
        tuple: ``(None, "Logged out successfully.", None)``.
    """
    # Removing an unknown session id is a no-op.
    base.pop(session_id, None)
    return None, "Logged out successfully.", None
# Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("## HumblebeeAI Customer Support Chatbot")

    # NOTE(review): which widgets sit inside this row is inferred; confirm
    # against the intended layout.
    with gr.Row():
        username = gr.Textbox(label="Username", interactive=True)
        password = gr.Textbox(label="Password", type="password", interactive=True)
        # Both buttons start disabled; enable_buttons turns them on.
        login_button = gr.Button("Login", interactive=False)
        signup_button = gr.Button("Signup", interactive=False)

    login_status = gr.Textbox(label="Status", interactive=False)
    # Per-browser-session login state.
    session_state = gr.State(None)
    user_id_state = gr.State(None)

    # The chat area stays hidden until a login succeeds.
    with gr.Column(visible=False) as chat_interface:
        chatbot = gr.Chatbot()
        msg = gr.Textbox(label="Message", placeholder="Ask me anything...")
        send_button = gr.Button("Send")
        logout_button = gr.Button("Logout")

    def enable_buttons(username, password):
        """Enable Login/Signup only when both fields contain non-blank text."""
        is_valid = bool(username.strip()) and bool(password.strip())
        return gr.update(interactive=is_valid), gr.update(interactive=is_valid)

    username.change(enable_buttons, [username, password], [login_button, signup_button])
    password.change(enable_buttons, [username, password], [login_button, signup_button])

    def login_user(username, password):
        """Authenticate; on success clear the form, load history, show chat."""
        session_id, message, user_id = authenticate(username, password)
        if session_id:
            return session_id, user_id, message, "", "", get_chat_history(user_id), gr.update(visible=True)
        # On failure keep what the user typed so it can be corrected.
        return None, None, message, username, password, [], gr.update(visible=False)

    login_button.click(
        login_user,
        [username, password],
        [session_state, user_id_state, login_status, username, password, chatbot, chat_interface],
    )

    def signup_user(username, password):
        """Register the user, show the outcome and clear both fields."""
        message = signup(username, password)
        return message, "", ""

    signup_button.click(
        signup_user,
        [username, password],
        [login_status, username, password],
    )

    # The Send button and pressing Enter in the message box do the same thing.
    send_button.click(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot],
    )
    msg.submit(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot],
    )

    def logout_user(session_id):
        """End the session server-side and reset every piece of UI state."""
        # Drop the in-memory history for this session; resetting only the
        # widgets would leave it in `base`.
        logout(session_id)
        # Clear the user id together with the session id so no login state
        # outlives the logout.
        return None, None, "", "", "", [], gr.update(visible=False)

    logout_button.click(
        logout_user,
        [session_state],
        [session_state, user_id_state, username, password, login_status, chatbot, chat_interface],
    )

demo.launch(share=True)