joe4ai's picture
Update app.py
cf1c576 verified
import logging
import os
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime

import bcrypt
import gradio as gr
from dotenv import load_dotenv
from langchain.chains import create_history_aware_retriever
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.memory import ChatMessageHistory
from langchain.schema import Document
from langchain_community.llms import Ollama
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_pinecone import PineconeVectorStore
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC as Pinecone

from get_data import extract_repo_details, fetch_md_file_via_api, data_loader, chunk_text, download_hugging_face_embeddings
from prompt import system_prompt, retriever_prompt
from url import md_files_url
# Send application logs through the standard logging module at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DB_PATH = "chatbot.db"  # SQLite file opened by connect_db()
base = {}               # session_id -> in-memory chat history for that session
last_messages = 10      # how many of the most recent messages are kept as context
documents = []          # filled below with Document objects built from the markdown sources

# Load settings from a local .env file (if present) into the process environment.
load_dotenv()


def _require_env(name):
    """Return environment variable ``name`` or fail fast with a clear error.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The variable's string value.

    Raises:
        RuntimeError: If the variable is not set.
    """
    value = os.environ.get(name)
    if value is None:
        # Writing None back into os.environ aborts startup with an opaque
        # "str expected, not NoneType" TypeError; name the missing setting.
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


AUTH_TOKEN_KEY = _require_env('AUTH_TOKEN_KEY')
os.environ['AUTH_TOKEN_KEY'] = AUTH_TOKEN_KEY
PINECONE_API_KEY = _require_env('PINECONE_API_KEY')
os.environ['PINECONE_API_KEY'] = PINECONE_API_KEY
# ---- Database part ----- #
# Database Connection
def connect_db(db_path=None):
    """Open a connection to the application's SQLite database.

    Args:
        db_path: Optional path of the database file. When omitted, the
            module-level ``DB_PATH`` is used.

    Returns:
        An open ``sqlite3.Connection``. The caller is responsible for
        closing it.

    Raises:
        sqlite3.Error: If the database cannot be opened.
    """
    target = DB_PATH if db_path is None else db_path
    try:
        return sqlite3.connect(target)
    except sqlite3.Error as e:
        # Callers enter the result in a ``with`` statement, so returning None
        # only replaced the real failure with an unrelated error at the call
        # site. Log the cause and let it propagate.
        logger.error(f"Database connection failed: {e}")
        raise
def create_tables():
    """Create the ``users`` and ``chat_history`` tables if they do not exist.

    Raises:
        sqlite3.Error: If the schema statements fail.
    """
    # ``with conn`` only commits or rolls back the transaction; closing() is
    # what actually releases the connection when the block exits.
    with closing(connect_db()) as conn:
        with conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL
                )''')
            # NOTE(review): SQLite enforces foreign keys (and therefore
            # ON DELETE CASCADE) only when ``PRAGMA foreign_keys = ON`` is set
            # on the connection; nothing in this file enables it.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS chat_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    message TEXT NOT NULL,
                    response TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
                )''')
# Runs at import time so the schema exists before any handler touches the
# database. The statements use IF NOT EXISTS, so the message below is also
# logged when the tables were already present.
create_tables()
logger.info("Database tables created successfully!")
# Secure Password Hashing
def hash_password(password):
    """Return a bcrypt hash of ``password`` (freshly salted) as a text string."""
    raw_password = password.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw_password, salt)
    return digest.decode()
def check_password(password, hashed):
    """Return True when ``password`` matches the stored bcrypt hash ``hashed``.

    Both arguments are text; they are encoded to bytes before the comparison.
    """
    candidate = password.encode()
    stored_hash = hashed.encode()
    return bcrypt.checkpw(candidate, stored_hash)
# Authenticate User
def authenticate(username, password):
    """Check a username/password pair against the ``users`` table.

    Args:
        username: Login name to look up.
        password: Plain-text password to verify against the stored hash.

    Returns:
        ``(session_id, message, user_id)``. On success ``session_id`` is a
        fresh random hex string and ``user_id`` is the matching row id; on
        failure both are ``None`` and ``message`` explains the rejection.
    """
    # closing() releases the connection; sqlite3's own context manager only
    # manages the transaction and would leave the handle open.
    with closing(connect_db()) as conn:
        row = conn.execute(
            "SELECT id, password_hash FROM users WHERE username = ?",
            (username,),
        ).fetchone()

    # The connection is already released here, before the bcrypt check runs.
    if row and check_password(password, row[1]):
        session_id = uuid.uuid4().hex  # Unique session ID
        return session_id, f"Welcome {username}!", row[0]  # user_id
    return None, "Invalid username or password.", None
# Signup Function
def signup(username, password):
    """Register a new user and return a status message for the UI.

    Args:
        username: Desired login name; must be unique in the ``users`` table.
        password: Plain-text password; only its bcrypt hash is stored.

    Returns:
        A success message, or a "username already exists" message when the
        UNIQUE constraint on ``username`` rejects the insert.
    """
    try:
        hashed_pw = hash_password(password)
        # closing() releases the connection; the inner ``with conn`` commits
        # on success and rolls back if the INSERT fails.
        with closing(connect_db()) as conn:
            with conn:
                conn.execute(
                    "INSERT INTO users (username, password_hash) VALUES (?, ?)",
                    (username, hashed_pw),
                )
        return f"User {username} registered successfully! You can now log in."
    except sqlite3.IntegrityError:
        return "Username already exists. Try another one."
# Store Chat in Database
def save_chat(user_id, message, response):
    """Persist one user message and the bot's reply in ``chat_history``.

    Args:
        user_id: Id of the ``users`` row the exchange belongs to.
        message: Text the user sent.
        response: Text the chatbot answered with.
    """
    # Format the timestamp explicitly: passing a datetime object relies on
    # sqlite3's implicit datetime adapter, which is deprecated since Python
    # 3.12. isoformat(sep=" ") yields the same text that adapter stored.
    created_at = datetime.now().isoformat(sep=" ")
    # closing() releases the connection; ``with conn`` commits the insert.
    with closing(connect_db()) as conn:
        with conn:
            conn.execute(
                "INSERT INTO chat_history (user_id, message, response, created_at) VALUES (?, ?, ?, ?)",
                (user_id, message, response, created_at),
            )
# Retrieve Chat History (User-specific)
def get_chat_history(user_id):
    """Return every stored exchange of a user, oldest first.

    Args:
        user_id: Id of the ``users`` row whose history is requested.

    Returns:
        A list of ``(message, response)`` tuples.
    """
    # closing() releases the connection once the rows have been fetched.
    with closing(connect_db()) as conn:
        rows = conn.execute(
            # ``id`` breaks ties between rows with identical timestamps so the
            # conversation order is deterministic.
            "SELECT message, response FROM chat_history WHERE user_id = ? ORDER BY created_at, id",
            (user_id,),
        ).fetchall()
    return [(msg, resp) for msg, resp in rows]
# --------------------------------- #
#Option 2-load directly from urls
# Fetch every configured markdown file and wrap it in a Document whose
# ``source`` metadata is the file's path inside its repository.
for url in md_files_url:
    try:
        repo_owner, repo_name, file_path = extract_repo_details(url)
        content = fetch_md_file_via_api(repo_owner, repo_name, file_path, AUTH_TOKEN_KEY)
        if content:
            document = Document(page_content=content, metadata={"source": file_path})
            documents.append(document)
    except ValueError as ve:
        # Report through the module logger, like the rest of the file, and
        # move on to the next URL instead of aborting startup.
        logger.error(f"Error processing URL {url}: {ve}")
print(f"Fetched {len(documents)} documents.")

# Split the fetched documents into chunks for indexing.
text_chunk = chunk_text(documents)
# Make sure the Pinecone index used by the retriever exists.
index_name = 'humblebeeai'
pc = Pinecone(api_key=PINECONE_API_KEY)
existing_indexes = [entry['name'] for entry in pc.list_indexes()]

if index_name not in existing_indexes:
    print(f"🔴 Index '{index_name}' not found. Creating it now...")
    serverless_spec = ServerlessSpec(cloud="aws", region="us-east-1")
    pc.create_index(
        name=index_name,
        dimension=384,  # Adjust the dimension based on your embeddings
        metric="cosine",
        spec=serverless_spec,
    )
    print(f"✅ Index '{index_name}' created successfully.")
else:
    print(f"🟢 Index '{index_name}' already exists. Skipping creation.")
# Load the embedding model once and share it between both vector-store
# calls; calling the loader twice repeated the expensive step for no benefit.
embeddings = download_hugging_face_embeddings()

# Push the chunked documents into the index.
# NOTE(review): this runs on every start; unless the vector ids are
# deterministic it presumably inserts the same chunks again. Confirm.
docsearch = PineconeVectorStore.from_documents(
    documents=text_chunk,
    index_name=index_name,
    embedding=embeddings,
)

# Re-bind to the populated index for querying.
docsearch = PineconeVectorStore.from_existing_index(
    index_name=index_name,
    embedding=embeddings,
)

# MMR retrieval returning 10 chunks, with lambda_mult=0.5.
retriever = docsearch.as_retriever(search_type='mmr', search_kwargs={'k': 10, 'lambda_mult': 0.5})
# History-free variant of the QA prompt. It is not used by the chain
# assembled below; the name is kept so existing references keep working.
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        ('human', '{input}'),
    ]
)

llm = Ollama(model='llama3.2', base_url='https://ollama-gcs-633059950484.us-central1.run.app')

# Prompt for the history-aware retriever: it sees the chat history and the
# new user input.
context_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', retriever_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
history_aware_retriever = create_history_aware_retriever(llm, retriever, context_prompt)

# Prompt for the answer step; it also receives the chat history.
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)

# Build the answer chain from qa_prompt. Using the history-free ``prompt``
# here meant the model never saw earlier turns when composing its answer,
# and qa_prompt was defined but never used.
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
def get_session_history(session_id):
    """Return the in-memory chat history for ``session_id``, creating it on demand.

    The history is trimmed to the ``last_messages`` most recent messages
    before it is returned.

    Args:
        session_id: Key identifying the session in the module-level ``base`` dict.

    Returns:
        The ``ChatMessageHistory`` stored for the session.
    """
    history = base.get(session_id)
    if history is None:
        history = ChatMessageHistory()
        base[session_id] = history

    if len(history.messages) > last_messages:
        # Copy the tail BEFORE clearing. Reading the old list after clear()
        # only works if clear() rebinds the list rather than emptying it in
        # place, and the trimmed history would otherwise be lost.
        recent = list(history.messages[-last_messages:])
        history.clear()
        for message in recent:
            history.add_message(message)
    return history
# Wrap the RAG chain so each session's messages are loaded before a call and
# recorded after it.
chat_with_msg_history = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key='input',
    history_messages_key='chat_history',
    # The retrieval chain returns a dict with several keys. Without naming
    # the answer key the wrapper looks for an 'output' key that is not
    # there, and the reply is never written to the session history.
    output_messages_key='answer',
)
from langchain.schema import AIMessage, HumanMessage
def get_combined_history(session_id, user_id):
    """Merge a user's persisted exchanges with the current session's messages.

    Args:
        session_id: Key of the in-memory history in ``base`` (may be absent).
        user_id: Id of the user whose stored exchanges are loaded.

    Returns:
        A list of at most ``last_messages`` LangChain message objects, oldest
        first.
    """
    # Each stored row is one full exchange, so it yields a human message AND
    # the AI reply. Alternating on the row index kept only one of them per row
    # and dropped half of the conversation.
    long_term_messages = []
    for msg, resp in get_chat_history(user_id):
        long_term_messages.append(HumanMessage(content=msg))
        long_term_messages.append(AIMessage(content=resp))

    # Messages recorded in memory for this session, if any.
    # NOTE(review): exchanges of the current session are also written to the
    # database by get_response, so the two sources may overlap. Confirm
    # whether de-duplication is wanted.
    recent_messages = base[session_id].messages if session_id in base else []

    combined_history = long_term_messages + recent_messages
    # Debug level keeps full conversations out of the default INFO output.
    logger.debug("Combined History for Debugging: %s", combined_history)
    return combined_history[-last_messages:]  # Keep only the most recent context
def get_response(message, chat_history, session_id, user_id):
    """Answer one user message and update the chat transcript.

    Args:
        message: Text typed by the user.
        chat_history: List of ``(message, reply)`` tuples currently shown in
            the chatbot widget; ``None`` is treated as an empty list.
        session_id: Id of the logged-in session, or a falsy value when nobody
            is logged in.
        user_id: Id of the logged-in user, or a falsy value.

    Returns:
        ``(textbox_value, chat_history)``: the new content of the message box
        and the updated transcript.
    """
    if not session_id or not user_id:
        return "Session expired. Please log in again.", []

    if chat_history is None:
        chat_history = []

    # Nothing to answer: skip the model call and do not store an empty row.
    if not message or not message.strip():
        return "", chat_history

    fallback_reply = "I'm sorry, I couldn't process that request."
    combined_history = get_combined_history(session_id, user_id)

    try:
        response = chat_with_msg_history.invoke(
            {
                'input': message,
                # NOTE(review): RunnableWithMessageHistory supplies its own
                # session history under the same key, so this value is likely
                # replaced before the chain runs. Confirm.
                'chat_history': combined_history,
            },
            {'configurable': {'session_id': session_id}},
        )
    except Exception:
        # UI boundary: the model endpoint is remote, so report the failure in
        # the chat instead of breaking the handler. The failed turn is not
        # persisted.
        logger.exception("Chat chain invocation failed")
        chat_history.append((message, fallback_reply))
        return "", chat_history

    # Debug level keeps full responses out of the default INFO output.
    logger.debug("LangChain Response: %s", response)

    # The chain is expected to return a dict carrying the reply under 'answer'.
    if isinstance(response, dict) and 'answer' in response:
        chatbot_reply = response['answer']
    else:
        chatbot_reply = fallback_reply

    save_chat(user_id, message, chatbot_reply)
    chat_history.append((message, chatbot_reply))  # Append instead of overwriting
    return "", chat_history
# Logout Function
def logout(session_id):
    """Forget the in-memory history of ``session_id`` and report the logout.

    Returns:
        ``(None, "Logged out successfully.", None)``.
    """
    # pop() with a default removes the entry when present and is a no-op
    # otherwise.
    base.pop(session_id, None)
    return None, "Logged out successfully.", None
import gradio as gr
def enable_buttons(username, password):
    """Enable the login and signup buttons only when both fields are non-blank.

    Returns:
        Two Gradio updates, one per button, carrying the same ``interactive``
        flag.
    """
    both_filled = all(field.strip() for field in (username, password))
    login_update = gr.update(interactive=both_filled)
    signup_update = gr.update(interactive=both_filled)
    return login_update, signup_update
def login_user(username, password):
    """Authenticate and return the values for the login click's outputs.

    The 11 returned values match the outputs of the login button's click
    handler, in order: session id, user id, status text, username value,
    password value, chatbot transcript, then visibility updates for the chat
    interface, username box, password box, send button and logout button.
    """
    session_id, message, user_id = authenticate(username, password)
    if session_id:
        # Clear and hide the credential fields, load the stored transcript,
        # and reveal the chat area with BOTH of its buttons. The send button
        # used to be hidden here, which left only the Enter key for sending.
        return (
            session_id,
            user_id,
            message,
            "",
            "",
            get_chat_history(user_id),
            gr.update(visible=True),   # chat_interface
            gr.update(visible=False),  # username
            gr.update(visible=False),  # password
            gr.update(visible=True),   # send_button
            gr.update(visible=True),   # logout_button
        )
    # Failed login: keep what the user typed and leave the chat area hidden.
    return (
        None,
        None,
        message,
        username,
        password,
        [],
        gr.update(visible=False),  # chat_interface
        gr.update(visible=True),   # username
        gr.update(visible=True),   # password
        gr.update(visible=False),  # send_button
        gr.update(visible=False),  # logout_button
    )
def signup_user(username, password):
    """Register the user and return the status text plus two cleared fields.

    Returns:
        ``(status_message, "", "")``; the empty strings reset the username
        and password boxes.
    """
    status = signup(username, password)
    cleared_username, cleared_password = "", ""
    return status, cleared_username, cleared_password
def logout_user():
    """Return the values that reset the UI to its logged-out state.

    The 10 values match the outputs of the logout button's click handler, in
    order: session id, username value, password value, status text, chatbot
    transcript, then visibility updates for the chat interface, username box,
    password box, send button and logout button.
    """
    cleared_session = None
    blank_username = blank_password = blank_status = ""
    empty_transcript = []
    return (
        cleared_session,
        blank_username,
        blank_password,
        blank_status,
        empty_transcript,
        gr.update(visible=False),  # chat_interface
        gr.update(visible=True),   # username
        gr.update(visible=True),   # password
        gr.update(visible=False),  # send_button
        gr.update(visible=False),  # logout_button
    )
def _logout_and_reset(session_id):
    """Drop the session's in-memory history, then reset the UI.

    The logout click used to reset only the widgets, so the history stored in
    ``base`` for the session was never released and ``logout`` was never
    called.
    """
    logout(session_id)
    return logout_user()


# NOTE(review): the source's indentation was lost; the nesting of widgets
# inside the Row/Column containers below is reconstructed. Confirm the layout.
with gr.Blocks() as demo:
    gr.Markdown("## HumblebeeAI Customer Support Chatbot")

    with gr.Row():
        username = gr.Textbox(label="Username", interactive=True)
        password = gr.Textbox(label="Password", type="password", interactive=True)
        login_button = gr.Button("Login", interactive=False)  # Initially disabled
        signup_button = gr.Button("Signup", interactive=False)  # Initially disabled

    login_status = gr.Textbox(label="Status", interactive=False)
    # Per-browser-session values set on login and read by the chat handlers.
    session_state = gr.State(None)
    user_id_state = gr.State(None)

    # Chat area; hidden until a login succeeds.
    with gr.Column(visible=False) as chat_interface:
        chatbot = gr.Chatbot()
        msg = gr.Textbox(label="Message", placeholder="Ask me anything...")
        send_button = gr.Button("Send")
        logout_button = gr.Button("Logout")

    # Enable buttons only when both username & password are filled
    username.change(enable_buttons, [username, password], [login_button, signup_button])
    password.change(enable_buttons, [username, password], [login_button, signup_button])

    # Login Logic (Clears Username & Password)
    # NOTE(review): username and password each appear twice in the outputs
    # (once for their value, once for visibility). Confirm how the installed
    # Gradio version resolves duplicate output components.
    login_button.click(
        login_user,
        [username, password],
        [session_state, user_id_state, login_status, username, password, chatbot, chat_interface, username, password, send_button, logout_button]
    )

    # Signup Logic (Clears Username & Password)
    signup_button.click(
        signup_user,
        [username, password],
        [login_status, username, password]
    )

    # Sending Messages (button click and Enter key share one handler)
    send_button.click(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot]
    )
    msg.submit(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot]
    )

    # Logout Logic (Clears Chat, Resets UI and frees the session's history)
    logout_button.click(
        _logout_and_reset,
        [session_state],
        [session_state, username, password, login_status, chatbot, chat_interface, username, password, send_button, logout_button]
    )

demo.launch(share=True)