import os
import streamlit as st
import fitz  # PyMuPDF
import logging
import math
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain.chains import RetrievalQA
from langchain.schema import Document
from sentence_transformers import SentenceTransformer
from langchain_community.embeddings import HuggingFaceEmbeddings
# --- Configuration ---
st.set_page_config(page_title="RAG PDF Chatbot", layout="wide")
st.title("RAG-based PDF Chatbot")
persist_directory = "db"
device = "cpu"

# --- Logging ---
logging.basicConfig(level=logging.INFO)
# --- Load LLM ---
def load_model():
    checkpoint = "MBZUAI/LaMini-T5-738M"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    pipe = pipeline('text2text-generation', model=model, tokenizer=tokenizer, max_length=512)
    return HuggingFacePipeline(pipeline=pipe)
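
# Note: load_model() instantiates the full LaMini-T5 pipeline on every call. If it is
# wired into the Streamlit flow, a cached wrapper avoids reloading the weights on each
# rerun; a minimal sketch, assuming a Streamlit release that provides st.cache_resource:
@st.cache_resource
def load_model_cached():
    """Cached wrapper around load_model() so the LLM is created only once per process."""
    return load_model()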
# --- Extract PDF Text ---
def read_pdf(file):
    try:
        doc = fitz.open(stream=file.read(), filetype="pdf")
        text = ""
        for page in doc:
            text += page.get_text()
        return text.strip()
    except Exception as e:
        logging.error(f"Failed to extract text: {e}")
        return ""
# --- Split Text into Chunks ---
def split_text_into_chunks(text):
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    return splitter.create_documents([text])
# --- Create Vector DB ---
def create_vectorstore(documents):
    # HuggingFaceEmbeddings loads all-MiniLM-L6-v2 itself, so no separate
    # SentenceTransformer instance is needed here.
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    db = Chroma.from_documents(documents, embeddings, persist_directory=persist_directory)
    db.persist()
    return db

# --- Setup QA Chain ---
def setup_qa(db):
    retriever = db.as_retriever()
    llm = load_model()
    return RetrievalQA.from_chain_type(llm=llm, retriever=retriever, return_source_documents=True)
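
# Note: create_vectorstore() and setup_qa() are not called by the UI below
# (process_answer builds its own index per question). A minimal sketch of how these
# helpers chain together over a persisted index, using only names defined in this file:
#
#     chunks = split_text_into_chunks(read_pdf(uploaded_file))
#     qa = setup_qa(create_vectorstore(chunks))
#     print(qa.invoke({"query": "What is this document about?"})["result"])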
# --- Process Answer ---
def process_answer(question, full_text):
    """Build a temporary RAG index over the extracted text and answer the question."""
    from langchain_community.document_loaders import TextLoader
    import shutil

    # Save the extracted text to a temp file and load it back as a Document
    with open("temp_text.txt", "w", encoding="utf-8") as f:
        f.write(full_text)
    loader = TextLoader("temp_text.txt", encoding="utf-8")
    docs = loader.load()

    # Chunk the document
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
    splits = text_splitter.split_documents(docs)

    # Embeddings
    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

    # Remove any old index, then rebuild it for this question
    if os.path.exists("chroma_db"):
        shutil.rmtree("chroma_db")
    db = Chroma.from_documents(splits, embeddings, persist_directory="chroma_db")
    retriever = db.as_retriever()

    # Model pipeline
    pipe = pipeline("text2text-generation", model="MBZUAI/LaMini-T5-738M", max_length=512)
    llm = HuggingFacePipeline(pipeline=pipe)

    # Retrieval QA chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        return_source_documents=False
    )

    # Summarization requests bypass retrieval and go straight to the LLM. The
    # 3,000-character slice may still be truncated, since LaMini-T5's encoder
    # only sees roughly the first 512 tokens.
    if "summarize" in question.lower() or "summary" in question.lower() or "tl;dr" in question.lower():
        prompt = f"Summarize the following document:\n\n{full_text[:3000]}"
        return llm.invoke(prompt)
    else:
        answer = qa_chain.invoke({"query": question})["result"]
        return answer
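
# Design note: process_answer() re-embeds and re-indexes the whole document for every
# question. One way to avoid that is to build the chain once per uploaded document
# (see the helper-chaining sketch after setup_qa()) and cache it in st.session_state;
# the "qa_chain" key below is illustrative, not part of the original code:
#
#     if "qa_chain" not in st.session_state:
#         chunks = split_text_into_chunks(full_text)
#         st.session_state["qa_chain"] = setup_qa(create_vectorstore(chunks))
#     answer = st.session_state["qa_chain"].invoke({"query": user_question})["result"]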
# --- UI Layout ---
with st.sidebar:
    st.header("Upload PDF")
    uploaded_file = st.file_uploader("Choose a PDF", type=["pdf"])

# --- Main Interface ---
if uploaded_file:
    st.success(f"You uploaded: {uploaded_file.name}")
    full_text = read_pdf(uploaded_file)

    if full_text:
        st.subheader("PDF Preview")
        with st.expander("View Extracted Text"):
            st.write(full_text[:3000] + ("..." if len(full_text) > 3000 else ""))

        st.subheader("Ask a Question")
        user_question = st.text_input("Type your question about the PDF content")

        if user_question:
            with st.spinner("Thinking..."):
                answer = process_answer(user_question, full_text)
            st.markdown("### Answer")
            st.write(answer)

        with st.sidebar:
            st.markdown("---")
            st.markdown("**Suggestions:**")
            st.caption("Try: \"Summarize this document\" or \"What is the key idea?\"")
            with st.expander("Suggestions", expanded=True):
                st.markdown("""
                - "Summarize this document"
                - "Give a quick summary"
                - "What are the main points?"
                - "Explain this document in short"
                """)
    else:
        st.error("No text could be extracted from the PDF. Try another file.")
else:
    st.info("Upload a PDF to begin.")