import os
import logging
import math
import streamlit as st
import fitz  # PyMuPDF
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from langchain_community.document_loaders import PDFMinerLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.llms import HuggingFacePipeline
from langchain.chains import RetrievalQA

# Set up logging
logging.basicConfig(level=logging.INFO)

# Define global variables
device = 'cpu'
persist_directory = "db"
uploaded_files_dir = "uploaded_files"

# Streamlit app configuration
st.set_page_config(page_title="RAG-based Chatbot", layout="wide")
st.title("RAG-based Chatbot")

# Load the model
checkpoint = "MBZUAI/LaMini-T5-738M"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
base_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)

# Helper Functions
def extract_text_from_pdf(file_path):
    """Extract full text from a PDF using PyMuPDF (fitz)."""
    try:
        doc = fitz.open(file_path)
        text = ""
        for page_num in range(doc.page_count):
            page = doc.load_page(page_num)
            text += page.get_text("text")
        return text
    except Exception as e:
        logging.error(f"Error reading PDF {file_path}: {e}")
        return None
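
# Illustrative sketch (not wired into the app flow): extract_text_from_pdf() can also be
# used outside Streamlit to check what text is recoverable from every PDF in the upload
# folder. "_dump_all_pdf_text" is a hypothetical helper name, not part of the original app.
def _dump_all_pdf_text(directory=uploaded_files_dir):
    for filename in os.listdir(directory):
        if filename.endswith(".pdf"):
            text = extract_text_from_pdf(os.path.join(directory, filename))
            logging.info(f"{filename}: {len(text or '')} characters extracted")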

def data_ingestion():
    """Load PDFs from the upload directory, split them into chunks, and persist embeddings."""
    try:
        logging.info("Starting data ingestion")
        if not os.path.exists(uploaded_files_dir):
            os.makedirs(uploaded_files_dir)

        documents = []
        for filename in os.listdir(uploaded_files_dir):
            if filename.endswith(".pdf"):
                file_path = os.path.join(uploaded_files_dir, filename)
                logging.info(f"Processing file: {file_path}")
                try:
                    loader = PDFMinerLoader(file_path)
                    loaded_docs = loader.load()
                    if not loaded_docs:
                        logging.warning(f"Skipping file with no extractable content: {file_path}")
                        continue
                    for doc in loaded_docs:
                        if hasattr(doc, 'page_content') and len(doc.page_content.strip()) > 0:
                            documents.append(doc)
                        else:
                            logging.warning(f"Skipping invalid document structure in {file_path}")
                except ValueError as e:
                    logging.error(f"Skipping {file_path}: {str(e)}")
                    continue

        if not documents:
            logging.error("No valid documents found to process.")
            return
        logging.info(f"Total valid documents: {len(documents)}")

        # Split documents into overlapping chunks
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
        texts = text_splitter.split_documents(documents)
        logging.info(f"Total text chunks created: {len(texts)}")
        if not texts:
            logging.error("No valid text chunks to create embeddings.")
            return

        embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

        # Embed the chunks in batches to stay below Chroma's maximum batch size
        MAX_BATCH_SIZE = 5461
        total_batches = math.ceil(len(texts) / MAX_BATCH_SIZE)
        logging.info(f"Processing {len(texts)} text chunks in {total_batches} batches...")
        db = None
        for i in range(total_batches):
            batch_start = i * MAX_BATCH_SIZE
            batch_end = min((i + 1) * MAX_BATCH_SIZE, len(texts))
            text_batch = texts[batch_start:batch_end]
            logging.info(f"Processing batch {i + 1}/{total_batches}, size: {len(text_batch)}")
            if db is None:
                db = Chroma.from_documents(text_batch, embeddings, persist_directory=persist_directory)
            else:
                db.add_documents(text_batch)
        db.persist()
        logging.info("Data ingestion completed successfully")
    except Exception as e:
        logging.error(f"Error during data ingestion: {str(e)}")
        raise
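
# Illustrative sketch (assumes data_ingestion() has already populated the "db" directory).
# Not called by the app; it shows how the persisted Chroma store could be queried directly
# to sanity-check what was ingested. "_preview_chunks" is a hypothetical helper name.
def _preview_chunks(query, k=3):
    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
    store = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
    # similarity_search returns the k chunks whose embeddings are closest to the query
    return [doc.page_content[:200] for doc in store.similarity_search(query, k=k)]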

def llm_pipeline():
    """Set up the language model pipeline."""
    logging.info("Setting up LLM pipeline")
    pipe = pipeline(
        'text2text-generation',
        model=base_model,
        tokenizer=tokenizer,
        max_length=256,
        do_sample=True,
        temperature=0.3,
        top_p=0.95,
        device=device
    )
    local_llm = HuggingFacePipeline(pipeline=pipe)
    logging.info("LLM pipeline setup complete")
    return local_llm
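
# Illustrative sketch (not used by the app): the HuggingFacePipeline wrapper can be invoked
# directly with a plain prompt, which is handy for smoke-testing the model before wiring it
# into a retrieval chain. The prompt text and "_smoke_test_llm" name are examples only.
def _smoke_test_llm():
    llm = llm_pipeline()
    return llm.invoke("Summarize in one sentence: retrieval-augmented generation.")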

def qa_llm():
    """Set up the question-answering chain."""
    logging.info("Setting up QA model")
    llm = llm_pipeline()
    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
    db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
    retriever = db.as_retriever()  # Set up the retriever for the vector store
    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )
    logging.info("QA model setup complete")
    return qa
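
# Illustrative sketch (not part of the app flow): calling the RetrievalQA chain directly
# returns both the generated answer and the retrieved source chunks, because
# return_source_documents=True above. "_ask_with_sources" and the question string passed
# to it are examples only.
def _ask_with_sources(question):
    qa = qa_llm()
    result = qa({"query": question})
    sources = [doc.metadata.get("source", "unknown") for doc in result["source_documents"]]
    return result["result"], sources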

def process_answer(user_question, full_text):
    """Generate an answer to the user's question or summarize the PDF content."""
    try:
        logging.info("Processing user question")
        # Check if the question is asking for a summary
        if "summarize" in user_question.lower() or "summary" in user_question.lower():
            tailored_prompt = f"""
            Please provide a summary of the following content extracted from the PDF:
            {full_text}
            """
        else:
            # Regular Q&A with context from the uploaded PDF
            # Note: full_text is passed verbatim, so very large PDFs may exceed the model's context window.
            tailored_prompt = f"""
            You are an expert chatbot designed to assist with any topic, providing accurate and detailed answers based on the provided PDFs.
            Your goal is to deliver the most relevant information and resources based on the question asked.
            User question: {user_question}
            Content from the uploaded document: {full_text}
            """
        # Pass the tailored prompt to the retrieval-based QA chain
        qa = qa_llm()
        generated_text = qa({"query": tailored_prompt})
        answer = generated_text['result']
        # If the answer contains fallback phrases, return a default message
        if "not provide" in answer or "no information" in answer:
            return "The document does not provide sufficient information to answer your question."
        logging.info("Answer generated successfully")
        return answer
    except Exception as e:
        logging.error(f"Error during answer generation: {str(e)}")
        return "Error processing the question."

# Streamlit UI Setup
st.sidebar.header("File Upload")
uploaded_files = st.sidebar.file_uploader("Upload your PDF files", type=["pdf"], accept_multiple_files=True)

if uploaded_files:
    # Save uploaded files and extract their text
    if not os.path.exists(uploaded_files_dir):
        os.makedirs(uploaded_files_dir)
    for uploaded_file in uploaded_files:
        file_path = os.path.join(uploaded_files_dir, uploaded_file.name)
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
    st.sidebar.success(f"Uploaded {len(uploaded_files)} file(s) successfully!")

    # Show the uploaded files' names and offer each one for download
    st.subheader("Uploaded PDF(s):")
    for uploaded_file in uploaded_files:
        st.write(uploaded_file.name)
        file_path = os.path.join(uploaded_files_dir, uploaded_file.name)
        with open(file_path, "rb") as f:
            file_bytes = f.read()
        st.download_button(
            label="Download PDF",
            data=file_bytes,
            file_name=uploaded_file.name,
            mime="application/pdf",
            key=f"download_{uploaded_file.name}",
        )

    # Extract and display the full text from the uploaded PDF(s)
    st.subheader("Full Text from the PDF:")
    full_text = ""
    for uploaded_file in uploaded_files:
        file_path = os.path.join(uploaded_files_dir, uploaded_file.name)
        extracted_text = extract_text_from_pdf(file_path)
        if extracted_text:
            st.text_area(f"PDF Text ({uploaded_file.name})", extracted_text, height=300)
            full_text += extracted_text + "\n"
        else:
            st.warning(f"Failed to extract text from {uploaded_file.name}.")

    # # Generate summary option
    # if st.button("Generate Summary of Document"):
    #     st.write("Summary: [Provide the generated summary here]")

    # Run data ingestion when files are uploaded
    data_ingestion()

    # Display UI for Q&A
    st.header("Ask a Question")
    user_question = st.text_input("Enter your question here:")
    if user_question:
        answer = process_answer(user_question, full_text)
        st.write(answer)
else:
    st.sidebar.info("Upload PDF files to get started!")