import os
import shutil
import logging
import streamlit as st
import fitz  # PyMuPDF
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import SentenceTransformerEmbeddings, HuggingFaceEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain_community.document_loaders import TextLoader
from langchain.chains import RetrievalQA
# --- Configuration ---
st.set_page_config(page_title="📚 RAG PDF Chatbot", layout="wide")
st.title("📚 RAG-based PDF Chatbot")
persist_directory = "db"
device = "cpu"
# --- Logging ---
logging.basicConfig(level=logging.INFO)
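# Overview of the app flow (documentation comment): a PDF is uploaded in the sidebar,
# read_pdf() extracts its text with PyMuPDF, and process_answer() chunks that text,
# embeds it with all-MiniLM-L6-v2, stores it in Chroma, and answers questions through
# a RetrievalQA chain backed by LaMini-T5. Summary-style questions skip retrieval and
# prompt the model directly on a truncated slice of the document.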
# --- Load LLM ---
@st.cache_resource
def load_model():
    """Load the LaMini-T5 model and wrap it as a LangChain LLM."""
    checkpoint = "MBZUAI/LaMini-T5-738M"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    pipe = pipeline('text2text-generation', model=model, tokenizer=tokenizer, max_length=512)
    return HuggingFacePipeline(pipeline=pipe)
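# Note: @st.cache_resource keeps a single tokenizer/model/pipeline instance alive across
# Streamlit reruns, so the LaMini-T5 checkpoint is downloaded and loaded only once per process.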
# --- Extract PDF Text ---
def read_pdf(file):
    try:
        doc = fitz.open(stream=file.read(), filetype="pdf")
        text = ""
        for page in doc:
            text += page.get_text()
        return text.strip()
    except Exception as e:
        logging.error(f"Failed to extract text: {e}")
        return ""
# --- Split Text into Chunks ---
def split_text_into_chunks(text):
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    return splitter.create_documents([text])
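# 500-character chunks with a 50-character overlap keep each piece small enough for the
# all-MiniLM-L6-v2 embedder while preserving some context across chunk boundaries.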
# --- Create Vector DB ---
def create_vectorstore(documents):
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    db = Chroma.from_documents(documents, embeddings, persist_directory=persist_directory)
    db.persist()
    return db
# --- Setup QA Chain ---
def setup_qa(db):
    retriever = db.as_retriever()
    llm = load_model()
    return RetrievalQA.from_chain_type(llm=llm, retriever=retriever, return_source_documents=True)
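# A minimal sketch (hypothetical helper, not wired into the UI below) showing how the
# three helpers above could be composed into a persistent-DB answer path; the current
# UI instead rebuilds everything inside process_answer() for each question.
def answer_with_persistent_db(question, full_text):
    docs = split_text_into_chunks(full_text)   # chunk the raw PDF text
    db = create_vectorstore(docs)              # embed and persist to `persist_directory`
    qa = setup_qa(db)                          # retrieval QA chain over the persisted store
    result = qa({"query": question})           # RetrievalQA expects the "query" input key
    return result["result"]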
# --- Process Answer ---
def process_answer(question, full_text):
    """Build a retrieval chain over the extracted text and answer the question."""
    # Write the extracted text to a temporary file and load it back as a document
    with open("temp_text.txt", "w", encoding="utf-8") as f:
        f.write(full_text)
    loader = TextLoader("temp_text.txt", encoding="utf-8")
    docs = loader.load()

    # Chunk the document
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
    splits = text_splitter.split_documents(docs)

    # Embeddings
    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

    # Remove any stale vector store, then rebuild it for the current document
    if os.path.exists("chroma_db"):
        shutil.rmtree("chroma_db")
    db = Chroma.from_documents(splits, embeddings, persist_directory="chroma_db")
    retriever = db.as_retriever()

    # Model pipeline
    pipe = pipeline("text2text-generation", model="MBZUAI/LaMini-T5-738M", max_length=512)
    llm = HuggingFacePipeline(pipeline=pipe)

    # Retrieval QA chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        return_source_documents=False
    )

    # Summary-style questions bypass retrieval and prompt the model directly
    if "summarize" in question.lower() or "summary" in question.lower() or "tl;dr" in question.lower():
        # Trim to 3,000 characters so the prompt fits the model's input limit
        prompt = f"Summarize the following document:\n\n{full_text[:3000]}"
        summary = llm(prompt)
        return summary
    else:
        answer = qa_chain.run(question)
        return answer
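# Illustrative calls (hypothetical, mirroring how the UI below invokes this function;
# `full_text` would hold text returned by read_pdf):
#   process_answer("Summarize this document", full_text)
#   process_answer("What are the main points?", full_text)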
# --- UI Layout ---
with st.sidebar:
    st.header("📄 Upload PDF")
    uploaded_file = st.file_uploader("Choose a PDF", type=["pdf"])
# --- Main Interface ---
if uploaded_file:
    st.success(f"You uploaded: {uploaded_file.name}")
    full_text = read_pdf(uploaded_file)

    if full_text:
        st.subheader("📑 PDF Preview")
        with st.expander("View Extracted Text"):
            st.write(full_text[:3000] + ("..." if len(full_text) > 3000 else ""))

        st.subheader("💬 Ask a Question")
        user_question = st.text_input("Type your question about the PDF content")

        if user_question:
            with st.spinner("Thinking..."):
                answer = process_answer(user_question, full_text)
                st.markdown("### 🤖 Answer")
                st.write(answer)

        with st.sidebar:
            st.markdown("---")
            st.markdown("**💡 Suggestions:**")
            st.caption("Try: \"Summarize this document\" or \"What is the key idea?\"")
            with st.expander("💡 Suggestions", expanded=True):
                st.markdown("""
                - "Summarize this document"
                - "Give a quick summary"
                - "What are the main points?"
                - "Explain this document in short"
                """)
    else:
        st.error("⚠️ No text could be extracted from the PDF. Try another file.")
else:
    st.info("Upload a PDF to begin.")