# EnlightenQalb — Hugging Face Spaces app.
# (Removed non-code page residue "Spaces:" / "Running" status-badge text that
# had been captured along with the source and is not valid Python.)
# Standard library
import os
from time import perf_counter as timer

# Third-party
import numpy as np
import pandas as pd
import streamlit as st
import torch as t
from datasets import Dataset, load_dataset
from huggingface_hub import login
from openai import OpenAI
from sentence_transformers import SentenceTransformer
# Cache the model loading
@st.cache_resource
def load_sentence_transformer():
    """Load and cache the all-mpnet-base-v2 SentenceTransformer on CPU.

    The original comment promised caching, but without the decorator the
    model was re-instantiated on every Streamlit rerun (every widget
    interaction). ``st.cache_resource`` keeps one shared instance for the
    process, which is the recommended cache for unserializable resources
    like models.
    """
    return SentenceTransformer(model_name_or_path="all-mpnet-base-v2", device="cpu")
# Cache the database loading
@st.cache_data
def load_data(database_file):
    """Read the pre-embedded parquet database into a DataFrame.

    ``st.cache_data`` memoizes the result per ``database_file`` path, so the
    parquet is parsed once instead of on every Streamlit rerun (the comment
    above claimed caching that the original code never implemented).
    """
    return pd.read_parquet(database_file)
def load_credentials():
    """Collect login credentials from environment variables.

    Scans ``login_1``..``login_50`` / ``password_1``..``password_50``; a
    pair is kept only when both values are present and non-empty.

    Returns:
        dict: mapping of username -> password.
    """
    pairs = (
        (os.environ.get(f"login_{slot}"), os.environ.get(f"password_{slot}"))
        for slot in range(1, 51)
    )
    return {user: secret for user, secret in pairs if user and secret}
def authenticate(username, password, credentials):
    """Return True when *username* is known and its stored password matches."""
    stored = credentials.get(username)
    return stored == password
def save_reactions_to_dataset(user_type, username, query, results_mpnet, results_openai):
    """Append the user's reactions for both models to the evaluation dataset
    on the Hugging Face Hub.

    Args:
        user_type: self-declared expertise level of the rater.
        username: authenticated username of the rater.
        query: the query string that produced the results.
        results_mpnet: list of {"text": ..., "reaction": ...} dicts for the
            MPNet model.
        results_openai: same shape, for the OpenAI model.

    Side effects: downloads the existing dataset split (if any) and pushes
    the merged rows back with ``push_to_hub``.
    """
    data = {
        "user_type": [],
        "username": [],
        "query": [],
        "retrieved_text": [],
        "model_type": [],
        "reaction": []
    }

    def _append_results(results, model_type):
        # One row per retrieved passage; both result lists share this shape,
        # so the previously duplicated loops are collapsed into one helper.
        for result in results:
            data["user_type"].append(user_type)
            data["username"].append(username)
            data["query"].append(query)
            data["retrieved_text"].append(result["text"])
            data["model_type"].append(model_type)
            data["reaction"].append(result["reaction"])

    _append_results(results_mpnet, "all-mpnet-base-v2")
    _append_results(results_openai, "openai")
    try:
        # Try to load existing dataset
        dataset = load_dataset("HumbleBeeAI/al-ghazali-rag-retrieval-evaluation", split="train")
        existing_data = dataset.to_dict()
        # Handle missing columns in existing data
        for key in data:
            if key not in existing_data:
                # Backfill prior rows so all columns stay the same length.
                existing_data[key] = ["" if key in ["username", "model_type"] else None] * len(existing_data.get(next(iter(existing_data)), []))
            existing_data[key].extend(data[key])
    except Exception:
        # Deliberate best-effort: if the dataset doesn't exist yet (or the
        # Hub is unreachable), start from the fresh rows only.
        existing_data = data
    updated_dataset = Dataset.from_dict(existing_data)
    updated_dataset.push_to_hub("HumbleBeeAI/al-ghazali-rag-retrieval-evaluation")
def update_reaction(model_type, idx):
    """on_change callback: mirror a radio widget's current value into the
    persistent ``reactions`` dict so it survives Streamlit reruns."""
    state_key = f"reaction_{model_type}_{idx}"
    st.session_state.reactions[state_key] = st.session_state[state_key]
def cosine_similarity(embedding_0, embedding_1):
    """Return the cosine similarity between two equal-length vectors.

    Vectorized with NumPy (already imported file-wide): this is called once
    per DataFrame row per search, and the previous pure-Python generator
    loops were needlessly slow on ~768/1536-dimensional embeddings.
    Raises ZeroDivisionError for a zero-norm input, matching the original
    pure-Python behavior.
    """
    v0 = np.asarray(embedding_0, dtype=np.float64)
    v1 = np.asarray(embedding_1, dtype=np.float64)
    denom = float(np.linalg.norm(v0) * np.linalg.norm(v1))
    if denom == 0.0:
        raise ZeroDivisionError("cosine similarity undefined for zero-norm vectors")
    return float(np.dot(v0, v1)) / denom
def generate_embedding(model, text, model_type="all-mpnet-base-v2"):
    """Embed *text* with the selected backend.

    Args:
        model: a SentenceTransformer instance (for "all-mpnet-base-v2") or
            an OpenAI client (for "openai").
        text: the string to embed.
        model_type: which backend *model* is; defaults to "all-mpnet-base-v2".

    Returns:
        A 1-D embedding (NumPy array for MPNet, list of floats for OpenAI).

    Raises:
        ValueError: for an unrecognized *model_type* — previously this fell
            through and silently returned None, which surfaced later as a
            confusing failure inside search_query.
    """
    if model_type == "all-mpnet-base-v2":
        chunk_embedding = model.encode(
            text,
            convert_to_tensor=True
        )
        # Move to CPU before conversion (idiomatic replacement for the
        # original np.array(t.Tensor.cpu(...)) spelling).
        return chunk_embedding.cpu().numpy()
    if model_type == "openai":
        response = model.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        return response.data[0].embedding
    raise ValueError(f"Unsupported model_type: {model_type!r}")
def search_query(model, query, df, model_type, n=3):
    """Rank the database rows by cosine similarity to *query*.

    Args:
        model: embedding backend passed through to generate_embedding.
        query: user query string.
        df: DataFrame holding precomputed embedding columns.
        model_type: "all-mpnet-base-v2" or "openai".
        n: number of top rows to return.

    Returns:
        The top-*n* rows of *df*, most similar first.

    Side effect (kept from the original): writes/overwrites a
    'similarities' column on the caller's DataFrame.

    Raises:
        ValueError: for an unknown *model_type* — previously that case fell
            through both branches and failed later with an opaque KeyError
            on 'similarities'.
    """
    # Dispatch table replaces the duplicated if/elif branches.
    embedding_columns = {
        "all-mpnet-base-v2": "all_mpnet_embedding",
        "openai": "openai_embedding",
    }
    try:
        column = embedding_columns[model_type]
    except KeyError:
        raise ValueError(f"Unsupported model_type: {model_type!r}") from None
    embedding = generate_embedding(model, query, model_type=model_type)
    df['similarities'] = df[column].apply(lambda x: cosine_similarity(x, embedding))
    res = df.sort_values('similarities', ascending=False).head(n)
    return res
def clear_search_state():
    """Reset all search-related session state to its pre-search defaults."""
    defaults = {
        "search_performed": False,
        "top_results_mpnet": [],
        "top_results_openai": [],
        "reactions": {},
        "results_saved": False,
    }
    # st.session_state supports item assignment identically to attribute
    # assignment, so this loop mirrors the original field-by-field resets.
    for state_key, default in defaults.items():
        st.session_state[state_key] = default
def main():
    """Streamlit entry point: login gate, dual-model retrieval over the
    Al-Ghazali parquet database, and collection of user reactions that are
    pushed to a Hugging Face dataset."""
    st.title("EnlightenQalb (Alchemy of Happiness)")
    # Initialize session state variables (first run of a session only —
    # 'authenticated' doubles as the "already initialized" sentinel).
    if 'authenticated' not in st.session_state:
        st.session_state.authenticated = False
        st.session_state.username = None
        st.session_state.search_performed = False
        st.session_state.top_results_mpnet = []
        st.session_state.top_results_openai = []
        st.session_state.reactions = {}
        st.session_state.results_saved = False
        st.session_state.current_query = ""
    # Load the model at startup (will be cached)
    embedding_model = load_sentence_transformer()
    # Load credentials
    credentials = load_credentials()
    # Authentication handling
    if not st.session_state.authenticated:
        st.sidebar.title("Login")
        username = st.sidebar.text_input("Username")
        password = st.sidebar.text_input("Password", type="password")
        if st.sidebar.button("Login"):
            if authenticate(username, password, credentials):
                st.session_state.authenticated = True
                st.session_state.username = username
                st.sidebar.success("Logged in successfully!")
            else:
                st.sidebar.error("Invalid username or password")
    # Re-check: the button-click branch above may have just authenticated;
    # otherwise stop rendering until the user logs in.
    if not st.session_state.authenticated:
        st.warning("Please login to access the application.")
        return
    # Login to Hugging Face
    # NOTE(review): the HF token is read from an env var named after the
    # dataset ("al_ghazali_rag_retrieval_evaluation") rather than a
    # conventional HF_TOKEN — confirm this is the intended secret name.
    huggingface_token = os.environ.get("al_ghazali_rag_retrieval_evaluation")
    if huggingface_token:
        login(token=huggingface_token)
    else:
        st.error("Hugging Face API token not found in environment variables.")
    # Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
    client = OpenAI()
    # Load database
    database_file = '[all_embedded] The Alchemy of Happiness (GhazzΔlΔ«, Claud Field) (Z-Library).parquet'
    try:
        df = load_data(database_file)
        st.success("Database loaded successfully!")
        user_type = st.radio(
            "Select your user type:",
            ["Layman", "Enthusiast", "Ustaz (Expert)"],
            horizontal=True
        )
        query = st.text_area("Enter your query:")
        # Clear search state if query changes
        if query != st.session_state.current_query:
            clear_search_state()
            st.session_state.current_query = query
        if st.button("Search") and query:
            clear_search_state()  # Clear previous search results
            # Perform searches with both models
            start_time = timer()
            # MPNet search (n=1: only the single top passage is evaluated)
            res_mpnet = search_query(embedding_model, query, df, "all-mpnet-base-v2", n=1)
            st.session_state.top_results_mpnet = res_mpnet.index.tolist()
            # OpenAI search
            res_openai = search_query(client, query, df, "openai", n=1)
            st.session_state.top_results_openai = res_openai.index.tolist()
            end_time = timer()
            st.write(f"Time taken to compute scores: {end_time - start_time:.5f} seconds")
            st.session_state.search_performed = True
        # Display results and collect reactions
        if st.session_state.search_performed and not st.session_state.results_saved:
            st.subheader("Query Results")
            st.write(f"Query: {query}")
            # Display MPNet results
            st.markdown("### Results from MPNet Model")
            for idx in st.session_state.top_results_mpnet:
                # NOTE(review): idx comes from res.index (labels) but is used
                # positionally via .iloc — only safe if the parquet loads with
                # a default RangeIndex. Confirm.
                # NOTE(review): column name "ext" looks like it may be a
                # truncation of "text" — verify against the parquet schema.
                text = df.iloc[int(idx)]["ext"]
                st.write(f"**Text:** {text}")
                key = f"reaction_mpnet_{idx}"
                if key not in st.session_state.reactions:
                    st.session_state.reactions[key] = "π€·"  # default: neutral
                reaction = st.radio(
                    label=f"Rate this MPNet result (Result {idx}):",
                    options=["π", "π€·", "π"],
                    index=["π", "π€·", "π"].index(st.session_state.reactions[key]),
                    key=key,
                    horizontal=True,
                    on_change=update_reaction,
                    args=("mpnet", idx)
                )
            # Display OpenAI results
            st.markdown("### Results from OpenAI Model")
            for idx in st.session_state.top_results_openai:
                # Same caveats as the MPNet loop above (iloc vs. index labels,
                # "ext" column name).
                text = df.iloc[int(idx)]["ext"]
                st.write(f"**Text:** {text}")
                key = f"reaction_openai_{idx}"
                if key not in st.session_state.reactions:
                    st.session_state.reactions[key] = "π€·"
                reaction = st.radio(
                    label=f"Rate this OpenAI result (Result {idx}):",
                    options=["π", "π€·", "π"],
                    index=["π", "π€·", "π"].index(st.session_state.reactions[key]),
                    key=key,
                    horizontal=True,
                    on_change=update_reaction,
                    args=("openai", idx)
                )
            # Save reactions button
            if st.button("Save Reactions"):
                # Collect MPNet results
                results_mpnet = []
                for idx in st.session_state.top_results_mpnet:
                    key = f"reaction_mpnet_{idx}"
                    results_mpnet.append({
                        "text": df.iloc[int(idx)]["ext"],
                        "reaction": st.session_state.reactions[key]
                    })
                # Collect OpenAI results
                results_openai = []
                for idx in st.session_state.top_results_openai:
                    key = f"reaction_openai_{idx}"
                    results_openai.append({
                        "text": df.iloc[int(idx)]["ext"],
                        "reaction": st.session_state.reactions[key]
                    })
                save_reactions_to_dataset(
                    user_type,
                    st.session_state.username,
                    query,
                    results_mpnet,
                    results_openai
                )
                st.success("Reactions saved successfully!")
                # Reset so the same results cannot be double-submitted.
                clear_search_state()
    except Exception as e:
        # Broad catch is the UI boundary: surface any failure to the user
        # instead of a Streamlit traceback.
        st.error(f"Failed to load database: {str(e)}")
# Standard script entry point: run the app only when executed directly.
if __name__ == "__main__":
    main()