from url import md_files_url
from get_data import extract_repo_details, fetch_md_file_via_api, data_loader, chunk_text, download_hugging_face_embeddings
from langchain_community.llms import Ollama
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_history_aware_retriever
from langchain.memory import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_pinecone import PineconeVectorStore
from pinecone.grpc import PineconeGRPC as Pinecone
from pinecone import ServerlessSpec
from langchain.schema import Document
from dotenv import load_dotenv
from prompt import system_prompt, retriever_prompt
import os
import logging
import gradio as gr
import sqlite3
import bcrypt
import uuid
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DB_PATH = "chatbot.db"
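# In-memory session store: maps session IDs to ChatMessageHistory objects.
# `last_messages` caps how many recent messages are kept per session, and
# `documents` accumulates the Markdown files fetched below.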
base = {}
last_messages = 10
documents = []
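# Load secrets from .env: the auth token passed to fetch_md_file_via_api and the Pinecone API key.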
load_dotenv()
# load_dotenv() already exports these values into os.environ; just read them here.
AUTH_TOKEN_KEY = os.environ.get('AUTH_TOKEN_KEY')
PINECONE_API_KEY = os.environ.get('PINECONE_API_KEY')
# ---- Database part ----- #
# Database Connection
def connect_db():
    try:
        return sqlite3.connect(DB_PATH)
    except sqlite3.Error as e:
        logger.error(f"Database connection failed: {e}")
        raise  # Callers use `with connect_db() as conn:`, so re-raise rather than return None
def create_tables():
    with connect_db() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL
            )''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS chat_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                message TEXT NOT NULL,
                response TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
            )''')
        conn.commit()

create_tables()
logger.info("Database tables created successfully!")
# Secure Password Hashing
def hash_password(password):
    return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()

def check_password(password, hashed):
    return bcrypt.checkpw(password.encode(), hashed.encode())

# Authenticate User
def authenticate(username, password):
    with connect_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,))
        user = cursor.fetchone()
        if user and check_password(password, user[1]):
            session_id = uuid.uuid4().hex  # Unique session ID
            return session_id, f"Welcome {username}!", user[0]  # user_id
    return None, "Invalid username or password.", None

# Signup Function
def signup(username, password):
    try:
        hashed_pw = hash_password(password)
        with connect_db() as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", (username, hashed_pw))
            conn.commit()
        return f"User {username} registered successfully! You can now log in."
    except sqlite3.IntegrityError:
        return "Username already exists. Try another one."

# Store Chat in Database
def save_chat(user_id, message, response):
    with connect_db() as conn:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO chat_history (user_id, message, response, created_at) VALUES (?, ?, ?, ?)",
                       (user_id, message, response, datetime.now()))
        conn.commit()

# Retrieve Chat History (User-specific)
def get_chat_history(user_id):
    with connect_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT message, response FROM chat_history WHERE user_id = ? ORDER BY created_at", (user_id,))
        return cursor.fetchall()  # List of (message, response) tuples

# --------------------------------- #
# Option 2: load the Markdown files directly from their URLs
for url in md_files_url:
    try:
        repo_owner, repo_name, file_path = extract_repo_details(url)
        content = fetch_md_file_via_api(repo_owner, repo_name, file_path, AUTH_TOKEN_KEY)
        if content:
            document = Document(page_content=content, metadata={"source": file_path})
            documents.append(document)
    except ValueError as ve:
        logger.error(f"Error processing URL {url}: {ve}")

print(f"Fetched {len(documents)} documents.")
text_chunk = chunk_text(documents)
index_name = 'humblebeeai'
pc = Pinecone(api_key=PINECONE_API_KEY)
existing_indexes = [index['name'] for index in pc.list_indexes()]
embeddings = download_hugging_face_embeddings()
if index_name in existing_indexes:
    print(f"🟢 Index '{index_name}' already exists. Skipping creation.")
    # Connect to the existing index without re-embedding and re-uploading the chunks.
    docsearch = PineconeVectorStore.from_existing_index(
        index_name=index_name,
        embedding=embeddings
    )
else:
    print(f"🔴 Index '{index_name}' not found. Creating it now...")
    pc.create_index(
        name=index_name,
        dimension=384,  # Must match the embedding model's output dimension
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    print(f"✅ Index '{index_name}' created successfully.")
    # Embed the chunks and upsert them into the newly created index.
    docsearch = PineconeVectorStore.from_documents(
        documents=text_chunk,
        index_name=index_name,
        embedding=embeddings
    )
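# Retrieve with Maximal Marginal Relevance (MMR): fetch the top matches and
# re-rank them to balance relevance against diversity (lambda_mult=0.5).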
retriever = docsearch.as_retriever(search_type='mmr', search_kwargs={'k': 10, 'lambda_mult': 0.5})
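# Basic RAG chain (no chat history); it is superseded below by the history-aware chain.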
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        ('human', '{input}'),
    ]
)
llm = Ollama(model='llama3.2', base_url='https://ollama-gcs-633059950484.us-central1.run.app')
question_answer_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(retriever, question_answer_chain)
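# History-aware retrieval: rewrite the user's question using prior chat turns
# before querying the vector store, then answer with the retrieved context.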
context_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', retriever_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
history_aware_retriever = create_history_aware_retriever(llm, retriever, context_prompt)
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', system_prompt),
        MessagesPlaceholder(variable_name='chat_history'),
        ('human', '{input}'),
    ]
)
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)  # Use the history-aware qa_prompt here
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
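# Short-term memory: keep only the most recent `last_messages` messages per session.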
def get_session_history(session_id):
    if session_id not in base:
        base[session_id] = ChatMessageHistory()
    stored_msg = base[session_id].messages
    if len(stored_msg) >= last_messages:
        base[session_id].clear()
        for msg in stored_msg[-last_messages:]:
            base[session_id].add_message(msg)
    return base[session_id]
chat_with_msg_history = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key='input',
    history_messages_key='chat_history'
)
from langchain.schema import AIMessage, HumanMessage

def get_combined_history(session_id, user_id):
    persistent_history = get_chat_history(user_id)  # Retrieve long-term memory from SQLite
    # Convert each stored (message, response) pair into a Human/AI message pair
    long_term_messages = []
    for msg, resp in persistent_history:
        long_term_messages.append(HumanMessage(content=msg))
        long_term_messages.append(AIMessage(content=resp))
    # Collect session-based recent messages
    recent_messages = base[session_id].messages if session_id in base else []
    # Merge and return the combined context, limited to the most recent messages
    combined_history = long_term_messages + recent_messages
    print("Combined History for Debugging:", combined_history)  # Debugging step
    return combined_history[-last_messages:]
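# Run one chat turn: merge histories, invoke the RAG chain, persist the exchange,
# and return the updated chat history for the Gradio Chatbot component.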
def get_response(message, chat_history, session_id, user_id):
    if not session_id or not user_id:
        return "Session expired. Please log in again.", []
    combined_history = get_combined_history(session_id, user_id)
    response = chat_with_msg_history.invoke(
        {
            'input': message,
            'chat_history': combined_history  # This should be a list of LangChain message objects
        },
        {'configurable': {'session_id': session_id}},
    )
    # Log the response for debugging
    print("LangChain Response:", response)
    # Ensure response contains 'answer' (adjust this if needed)
    if isinstance(response, dict) and 'answer' in response:
        chatbot_reply = response['answer']
    else:
        chatbot_reply = "I'm sorry, I couldn't process that request."
    save_chat(user_id, message, chatbot_reply)
    chat_history.append((message, chatbot_reply))  # Append instead of overwriting
    return "", chat_history
# Logout Function
def logout(session_id):
    if session_id in base:
        del base[session_id]  # Clear the in-memory session history
    return None, "Logged out successfully.", None
def enable_buttons(username, password):
    is_valid = bool(username.strip()) and bool(password.strip())
    return gr.update(interactive=is_valid), gr.update(interactive=is_valid)
def login_user(username, password):
    session_id, message, user_id = authenticate(username, password)
    if session_id:
        # On success: clear and hide the login fields, load the user's history, show the chat UI
        return session_id, user_id, message, gr.update(value="", visible=False), gr.update(value="", visible=False), get_chat_history(user_id), gr.update(visible=True)
    # On failure: keep the login fields visible and leave the chat UI hidden
    return None, None, message, gr.update(value=username, visible=True), gr.update(value=password, visible=True), [], gr.update(visible=False)

def signup_user(username, password):
    message = signup(username, password)
    return message, "", ""

def logout_user(session_id):
    _, message, _ = logout(session_id)  # Also clears the in-memory session history
    return None, None, gr.update(value="", visible=True), gr.update(value="", visible=True), message, [], gr.update(visible=False)
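# Build the interface: a login/signup row, a status box, and a hidden chat column.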
with gr.Blocks() as demo:
    gr.Markdown("## HumblebeeAI Customer Support Chatbot")
    with gr.Row():
        username = gr.Textbox(label="Username", interactive=True)
        password = gr.Textbox(label="Password", type="password", interactive=True)
        login_button = gr.Button("Login", interactive=False)  # Initially disabled
        signup_button = gr.Button("Signup", interactive=False)  # Initially disabled
    login_status = gr.Textbox(label="Status", interactive=False)
    session_state = gr.State(None)
    user_id_state = gr.State(None)
    with gr.Column(visible=False) as chat_interface:
        chatbot = gr.Chatbot()
        msg = gr.Textbox(label="Message", placeholder="Ask me anything...")
        send_button = gr.Button("Send")
        logout_button = gr.Button("Logout")
    # Enable buttons only when both username & password are filled
    username.change(enable_buttons, [username, password], [login_button, signup_button])
    password.change(enable_buttons, [username, password], [login_button, signup_button])
    # Login Logic (clears and hides the login fields on success)
    login_button.click(
        login_user,
        [username, password],
        [session_state, user_id_state, login_status, username, password, chatbot, chat_interface]
    )
    # Signup Logic (Clears Username & Password)
    signup_button.click(
        signup_user,
        [username, password],
        [login_status, username, password]
    )
    # Sending Messages
    send_button.click(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot]
    )
    msg.submit(
        get_response,
        [msg, chatbot, session_state, user_id_state],
        [msg, chatbot]
    )
    # Logout Logic (Clears Chat and Resets UI)
    logout_button.click(
        logout_user,
        [session_state],
        [session_state, user_id_state, username, password, login_status, chatbot, chat_interface]
    )

demo.launch(share=True)