Spaces:
Sleeping
Sleeping
File size: 4,659 Bytes
8f28ceb 473e3b4 8f28ceb 473e3b4 8f28ceb aeafe2c 8f28ceb aeafe2c 8f28ceb aeafe2c 8f28ceb aeafe2c 8f28ceb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
from PyPDF2 import PdfReader
import numpy as np
import torch
class RAGChatbot:
def __init__(self,
model_name="facebook/opt-350m",
embedding_model="all-MiniLM-L6-v2"):
# Initialize tokenizer and model
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
# Initialize embedding model
self.embedding_model = SentenceTransformer(embedding_model)
# Initialize document storage
self.documents = []
self.embeddings = []
def extract_text_from_pdf(self, pdf_path):
reader = PdfReader(pdf_path)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
return text
def load_documents(self, file_paths):
self.documents = []
self.embeddings = []
for file_path in file_paths:
if file_path.endswith('.pdf'):
text = self.extract_text_from_pdf(file_path)
else:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
# Split text into chunks
chunks = [text[i:i+500] for i in range(0, len(text), 500)]
self.documents.extend(chunks)
# Generate embeddings
self.embeddings = self.embedding_model.encode(self.documents)
return f"Loaded {len(self.documents)} text chunks from {len(file_paths)} files"
def retrieve_relevant_context(self, query, top_k=3):
if not self.documents:
return "No documents loaded"
# Generate query embedding
query_embedding = self.embedding_model.encode([query])[0]
# Calculate cosine similarities
similarities = np.dot(self.embeddings, query_embedding) / (
np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(query_embedding)
)
# Get top k most similar documents
top_indices = similarities.argsort()[-top_k:][::-1]
return " ".join([self.documents[i] for i in top_indices])
def generate_response(self, query, context):
# Construct prompt with context
full_prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
# Generate response
inputs = self.tokenizer(full_prompt, return_tensors="pt").to(self.model.device)
outputs = self.model.generate(**inputs, max_new_tokens=150)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
return response.split("Answer:")[-1].strip()
def chat(self, query, history):
try:
# Retrieve relevant context
context = self.retrieve_relevant_context(query)
# Generate response
response = self.generate_response(query, context)
# Append to history and return as list of tuples
updated_history = history + [[query, response]]
return updated_history
except Exception as e:
return history + [[query, f"An error occurred: {str(e)}"]]
# Create Gradio interface
def create_interface():
rag_chatbot = RAGChatbot()
with gr.Blocks() as demo:
gr.Markdown("# RAG Chatbot with Hugging Face Models")
with gr.Row():
file_input = gr.File(label="Upload Documents", file_count="multiple", type="filepath")
load_btn = gr.Button("Load Documents")
status_output = gr.Textbox(label="Load Status")
chatbot = gr.Chatbot()
msg = gr.Textbox(label="Enter your query")
submit_btn = gr.Button("Send")
clear_btn = gr.Button("Clear Chat")
# Event handlers
load_btn.click(
rag_chatbot.load_documents,
inputs=[file_input],
outputs=[status_output]
)
submit_btn.click(
rag_chatbot.chat,
inputs=[msg, chatbot],
outputs=[chatbot, msg]
)
msg.submit(
rag_chatbot.chat,
inputs=[msg, chatbot],
outputs=[chatbot, msg]
)
clear_btn.click(lambda: None, None, [chatbot, msg])
return demo
# Launch the app
if __name__ == "__main__":
demo = create_interface()
demo.launch() |