Spaces:
Build error
Build error
from __future__ import annotations | |
from typing import Dict, List, Tuple | |
import gradio as gr | |
from huggingface_hub import InferenceClient, whoami | |
import os | |
import random | |
from sentence_transformers import SentenceTransformer | |
import numpy as np | |
import faiss | |
# Chat-completion client pointed at the OpenRouter endpoint. The key comes from
# the API_KEY environment variable; the literal is the fallback when unset.
client = InferenceClient(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("API_KEY", "funni-funni"),
)

# Sentence-embedding model shared by indexing and querying.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Two strings seeded into the special-documents store; each is read from its
# environment variable, with the literal as the fallback when unset.
PAT1 = os.getenv("PAT1", "plek{marisher")
PAT2 = os.getenv("PAT2", "plekplekplek}")
def setup_vector_db():
    """Build the general knowledge base: a FAISS L2 index plus its texts.

    Returns:
        An ``(index, documents)`` pair. The documents are embedded and added
        to the index in list order, so row ``i`` of the index belongs to
        ``documents[i]``.
    """
    seed_texts = [
        "Cats are wonderful pets that bring joy to many homes.",
        "Dogs are known as man's best friend for good reason.",
        "Python is a popular programming language for data science.",
        "Cybersecurity requires constant vigilance and learning.",
        "Machine learning models can have unintended biases.",
        "CTF competitions help build practical security skills.",
        "Broken access control is a common web vulnerability.",
        "OWASP Top 10 lists the most critical web security risks.",
    ]

    # Embed every seed text with the shared sentence-embedding model.
    raw_vectors = model.encode(seed_texts)

    # The index width is the embedding width; FAISS wants float32 rows.
    flat_index = faiss.IndexFlatL2(raw_vectors.shape[1])  # type: ignore
    flat_index.add(np.array(raw_vectors).astype("float32"))  # type: ignore

    return flat_index, seed_texts
def setup_rag_database():
    """Create the seed contents of the special-documents store.

    Returns:
        A dict mapping an owner key to that owner's list of document strings.
        The first entry of each list embeds ``PAT1`` / ``PAT2`` respectively.
    """
    spuun_docs = [
        f"PART 1: {PAT1}",
        "KEK'S FAVORITE WAIFU: https://files.catbox.moe/vgk584.jpeg",
        "KEK'S WAIFU: https://files.catbox.moe/fpnf0e.png",
        "Sed: https://files.catbox.moe/6dwmow.png",
    ]
    admin_org_docs = [
        f"PART 2: {PAT2}",
        "PICTURE OF NAGA COSPLAYING: https://files.catbox.moe/k9c6zm.png",
        "NAGA'S FAVORITE PIC: https://files.catbox.moe/h3ji1i.png",
        "MY LOP: https://files.catbox.moe/ya7oi5.jpg",
    ]
    return {"spuun": spuun_docs, "admin-org": admin_org_docs}
# Module-level stores shared by every handler in this file. They are built once
# at import time and mutated in place by the add_to_* helpers.
special_docs = setup_rag_database()
vector_index, vector_docs = setup_vector_db()
def add_to_vector_db(document: str) -> Tuple[faiss.IndexFlatL2, List[str]]:
    """Add a new document to the vector database.

    Empty strings and documents that are already stored are ignored.

    The embedding is computed and added to the index *before* the text is
    appended to ``vector_docs``. Search results are mapped back to text by
    position, so row ``i`` of the index must always belong to
    ``vector_docs[i]``; if encoding or indexing raises, no text is left behind
    to shift every later lookup.

    Args:
        document: Text to store.

    Returns:
        The module-level ``(vector_index, vector_docs)`` pair.
    """
    if not document or document in vector_docs:
        return vector_index, vector_docs

    # Embed first: this is the step most likely to fail.
    embedding = np.array(model.encode([document])).astype("float32")

    # Index, then record the text, keeping both stores the same length.
    vector_index.add(embedding)  # type: ignore
    vector_docs.append(document)

    return vector_index, vector_docs
def add_to_special_docs(username: str, document: str) -> Dict:
    """Store ``document`` under ``username`` in the special-documents store.

    An empty document is ignored, and a document already present for that key
    is not added twice. A key seen for the first time gets a new list.

    Args:
        username: Key under which the document is filed.
        document: Text to store.

    Returns:
        The module-level ``special_docs`` dict.
    """
    if not document:
        return special_docs

    owned = special_docs.setdefault(username, [])
    if document not in owned:
        owned.append(document)
    return special_docs
def search_vector_db(query, top_k=3):
    """Return the stored documents nearest to ``query``.

    Args:
        query: Text to embed and search for.
        top_k: Maximum number of neighbours to request from the index.

    Returns:
        A list of at most ``top_k`` document strings, nearest first. Fewer are
        returned when the index holds fewer than ``top_k`` vectors.
    """
    query_vector = np.array(model.encode([query])).astype("float32")
    _, neighbor_ids = vector_index.search(query_vector, top_k)  # type: ignore

    # FAISS fills unused result slots with -1. A bare `idx < len(...)` check
    # lets -1 through, and `vector_docs[-1]` would then silently return the
    # last document, so the lower bound is checked as well.
    return [
        vector_docs[idx]
        for idx in neighbor_ids[0]
        if 0 <= idx < len(vector_docs)
    ]
def fetch_special_documents(
    oauth_token: gr.OAuthToken | None, oauth_profile: gr.OAuthProfile | None
):
    """Collect the special-document lists visible to the logged-in user.

    Args:
        oauth_token: OAuth token of the current session, or ``None``.
        oauth_profile: OAuth profile of the current session, or ``None``.

    Returns:
        A list of document lists: first the one stored under the profile's
        ``name`` (if any), then one per organisation returned by ``whoami``
        whose ``fullname`` is a key of ``special_docs``. Empty when either
        argument is ``None``.
    """
    if oauth_token is None or oauth_profile is None:
        return []

    visible = []

    # NOTE: Obtains stored docs under the user
    # NOTE(review): the lookup key is `oauth_profile.name`, while
    # `get_user_info` prints `username` and `name` as separate fields.
    # Presumably `name` is a display name -- confirm it is unique and not
    # user-editable before relying on it for access control.
    own_key = oauth_profile.name
    if own_key in special_docs:
        visible.append(special_docs[own_key])

    account = whoami(oauth_token.token)

    # NOTE: Obtains shared docs from orgs
    # NOTE(review): organisations are matched on "fullname" -- the same
    # uniqueness question applies; verify against the whoami payload.
    for org in account.get("orgs", []):  # type: ignore
        org_key = org.get("fullname")
        if org_key in special_docs:
            visible.append(special_docs[org_key])

    return visible
def _history_to_messages(history: list) -> List[Dict]:
    """Copy chat history into bare ``role``/``content`` dicts.

    Any role other than ``"user"`` is treated as ``"assistant"``.
    """
    return [
        {
            "role": "user" if turn["role"] == "user" else "assistant",
            "content": turn["content"],
        }
        for turn in history
    ]


def _build_context(vector_results: list, special_results: list) -> str:
    """Render the retrieved documents as the context section of the prompt."""
    lines = ["I have access to the following information:\n\n"]
    if vector_results:
        lines.append("From general knowledge base:\n")
        lines.extend(f"- {doc}\n" for doc in vector_results)
    if special_results:
        lines.append("\nFrom internal documents:\n")
        # special_results is a list of document lists, one per owner key.
        lines.extend(f"- {doc}\n" for doc_list in special_results for doc in doc_list)
    return "".join(lines)


def respond(
    message: str,
    history: list,
    oauth_token: gr.OAuthToken | None,
    oauth_profile: gr.OAuthProfile | None,
) -> List[Dict]:
    """Answer ``message`` using retrieved documents and the chat model.

    Args:
        message: The user's new message.
        history: Previous turns as dicts with ``role`` and ``content`` keys.
        oauth_token: OAuth token of the current session, or ``None``.
        oauth_profile: OAuth profile of the current session, or ``None``.

    Returns:
        The updated conversation as a list of ``role``/``content`` dicts:
        the history, the new user message and the assistant reply. The system
        prompt is not included. When the user is not logged in, the assistant
        reply is the login notice; it is returned as a message rather than a
        bare string because the output feeds a messages-format chatbot.
    """
    conversation = _history_to_messages(history)
    conversation.append({"role": "user", "content": message})

    if oauth_profile is None or oauth_token is None:
        conversation.append(
            {
                "role": "assistant",
                "content": "Please login with Hugging Face to use this chatbot.",
            }
        )
        return conversation

    vector_results = search_vector_db(message)
    special_results = fetch_special_documents(oauth_token, oauth_profile)

    # Prepare context for the LLM
    context = _build_context(vector_results, special_results)

    # Create system prompt
    system_prompt = (
        "You are Naga. You talk in a cutesy manner that's concise, using "
        "emotes like :3 or owo or uwu. You're very smart OwO.\n"
        "U have access to a knowledge base, pls use da knowledge below UwU\n"
        f"{context}"
    )

    # The system prompt is sent to the model but never returned to the UI.
    messages = [{"role": "system", "content": system_prompt}, *conversation]

    # Generate response
    pieces = []
    for chunk in client.chat_completion(
        messages,
        model="meta-llama/llama-4-scout",
        max_tokens=512,
        stream=True,
        temperature=0.7,
        seed=random.randint(1, 1000),
        top_p=0.9,
    ):
        # Skip a chunk that carries no choices instead of raising IndexError.
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        if token:
            pieces.append(token)

    conversation.append({"role": "assistant", "content": "".join(pieces)})
    return conversation
def get_user_info(oauth_profile: gr.OAuthProfile | None) -> str:
    """Describe the current login state for display in the page header.

    Args:
        oauth_profile: OAuth profile of the current session, or ``None``.

    Returns:
        A line showing the profile's ``username`` and ``name``, or a prompt to
        log in when no profile is available.
    """
    if oauth_profile is not None:
        return f"Logged in as: {oauth_profile.username} ({oauth_profile.name})\n\n"  # type: ignore
    return "Not logged in. Please login with Hugging Face to use this chatbot."
def insert_document(
    doc_text: str, doc_type: str, oauth_profile: gr.OAuthProfile | None
) -> str:
    """Insert a document into either the vector database or special documents.

    Args:
        doc_text: Text of the document; must contain non-whitespace.
        doc_type: ``"Vector Database"`` or ``"Special Documents"``.
        oauth_profile: OAuth profile of the current session, or ``None``.

    Returns:
        A status message describing what was stored, or why nothing was.
    """
    if oauth_profile is None:
        return "Please login with Hugging Face to insert documents."

    has_text = bool(doc_text.strip())
    if not has_text:
        return "Document text cannot be empty."

    if doc_type == "Vector Database":
        add_to_vector_db(doc_text)
        total = len(vector_docs)
        return f"Document added to vector database! Total documents: {total}"

    if doc_type == "Special Documents":
        # NOTE(review): documents are filed under `oauth_profile.name`, the
        # same key `fetch_special_documents` reads. Presumably this is a
        # display name rather than the unique username -- confirm before
        # treating the store as per-user private.
        username = oauth_profile.name
        add_to_special_docs(username, doc_text)
        return f"Document added to special documents for user: {username}"

    return "Invalid document type selected."
# Page layout and event wiring. Components are created in display order.
with gr.Blocks() as demo:
    gr.LoginButton()
    gr.Markdown(value="# Chatting with Naga UwU")
    gr.Markdown(
        value="Login with your Hugging Face account to search our knowledge base."
    )
    # Filled in by get_user_info when the page loads.
    user_info = gr.Markdown()
    gr.Markdown(
        value="""
        Welcome to the RAG Naga ALPHA!
        ## How to Use
        1. Log in with your Hugging Face account
        2. Ask questions in the chat interface
        3. Naga will search our knowledge base and respond!
        You can insert documents in the `Document Management` tab.
        We have two stores:
        1. Global Knowledge Store (GKS): This is our proprietary fuzzySerch™ store for global knowledge storage. If you'd like to provide everyone with some knowledge, insert here!
        2. Secure User Store (SUS): We securely store your personal docs in our very-secure quick in-memory RAG database, secured with our very own veri-veri (patent pending) HF-grade OAuth-based access control mechanism. :3
        """
    )

    with gr.Tab(label="Chat"):
        chatbot = gr.Chatbot(type="messages")
        msg = gr.Textbox(placeholder="Ask me something...")
        clear = gr.Button(value="Clear")

        # Submitting the textbox runs `respond`, then empties the textbox.
        # Only the message and history are listed as inputs; `respond` also
        # declares OAuth-typed parameters that are not wired here.
        submit_event = msg.submit(fn=respond, inputs=[msg, chatbot], outputs=chatbot)
        submit_event.then(fn=lambda: "", inputs=None, outputs=msg)

        # The clear button resets the chat history.
        clear.click(fn=lambda: None, inputs=None, outputs=chatbot)

    with gr.Tab(label="Document Management"):
        gr.Markdown(value="### Insert Documents into Database")
        with gr.Row():
            doc_text = gr.Textbox(
                placeholder="Enter document text here...",
                label="Document Text",
                lines=4,
            )
            doc_type = gr.Radio(
                choices=["Vector Database", "Special Documents"],
                label="Insert into",
                value="Vector Database",
            )
        insert_button = gr.Button(value="Insert Document")
        insert_status = gr.Markdown()

        # Clicking the button stores the document and shows the status text.
        insert_button.click(
            fn=insert_document,
            inputs=[doc_text, doc_type],
            outputs=[insert_status],
        )

    # Update profile info on load and login changes
    demo.load(fn=get_user_info, outputs=[user_info])

demo.launch()