# ---- Previous iteration of this app, kept commented out for reference ---- #
# import os
# import logging
# import math
# import streamlit as st
# import fitz  # PyMuPDF
# from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
# from langchain_community.document_loaders import PDFMinerLoader
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# from langchain_community.embeddings import SentenceTransformerEmbeddings
# from langchain_community.vectorstores import Chroma
# from langchain_community.llms import HuggingFacePipeline
# from langchain.chains import RetrievalQA

# # Set up logging
# logging.basicConfig(level=logging.INFO)

# # Define global variables
# device = 'cpu'
# persist_directory = "db"
# uploaded_files_dir = "uploaded_files"

# # Streamlit app configuration
# st.set_page_config(page_title="Audit Assistant", layout="wide")
# st.title("Audit Assistant")

# # Load the model
# checkpoint = "MBZUAI/LaMini-T5-738M"
# tokenizer = AutoTokenizer.from_pretrained(checkpoint)
# base_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)

# # Helper Functions
# def extract_text_from_pdf(file_path):
#     """Extract text from a PDF using PyMuPDF (fitz)."""
#     try:
#         doc = fitz.open(file_path)
#         text = ""
#         for page_num in range(doc.page_count):
#             page = doc.load_page(page_num)
#             text += page.get_text("text")
#         return text
#     except Exception as e:
#         logging.error(f"Error reading PDF {file_path}: {e}")
#         return None

# def data_ingestion():
#     """Load PDFs and create embeddings, with improved error handling and efficiency."""
#     try:
#         logging.info("Starting data ingestion")
#         if not os.path.exists(uploaded_files_dir):
#             os.makedirs(uploaded_files_dir)
#         documents = []
#         for filename in os.listdir(uploaded_files_dir):
#             if filename.endswith(".pdf"):
#                 file_path = os.path.join(uploaded_files_dir, filename)
#                 logging.info(f"Processing file: {file_path}")
#                 try:
#                     loader = PDFMinerLoader(file_path)
#                     loaded_docs = loader.load()
#                     if not loaded_docs:
#                         logging.warning(f"Skipping file with missing or invalid metadata: {file_path}")
#                         continue
#                     for doc in loaded_docs:
#                         if hasattr(doc, 'page_content') and len(doc.page_content.strip()) > 0:
#                             documents.append(doc)
#                         else:
#                             logging.warning(f"Skipping invalid document structure in {file_path}")
#                 except ValueError as e:
#                     logging.error(f"Skipping {file_path}: {str(e)}")
#                     continue
#         if not documents:
#             logging.error("No valid documents found to process.")
#             return
#         logging.info(f"Total valid documents: {len(documents)}")

#         # Proceed with splitting and embedding documents
#         text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
#         texts = text_splitter.split_documents(documents)
#         logging.info(f"Total text chunks created: {len(texts)}")
#         if not texts:
#             logging.error("No valid text chunks to create embeddings.")
#             return
#         embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

#         # Embed the chunks in batches
#         MAX_BATCH_SIZE = 5461
#         total_batches = math.ceil(len(texts) / MAX_BATCH_SIZE)
#         logging.info(f"Processing {len(texts)} text chunks in {total_batches} batches...")
#         db = None
#         for i in range(total_batches):
#             batch_start = i * MAX_BATCH_SIZE
#             batch_end = min((i + 1) * MAX_BATCH_SIZE, len(texts))
#             text_batch = texts[batch_start:batch_end]
#             logging.info(f"Processing batch {i + 1}/{total_batches}, size: {len(text_batch)}")
#             if db is None:
#                 db = Chroma.from_documents(text_batch, embeddings, persist_directory=persist_directory)
#             else:
#                 db.add_documents(text_batch)
#         db.persist()
#         logging.info("Data ingestion completed successfully")
#     except Exception as e:
#         logging.error(f"Error during data ingestion: {str(e)}")
#         raise

# def llm_pipeline():
#     """Set up the language model pipeline."""
#     logging.info("Setting up LLM pipeline")
#     pipe = pipeline(
#         'text2text-generation',
#         model=base_model,
#         tokenizer=tokenizer,
#         max_length=256,
#         do_sample=True,
#         temperature=0.3,
#         top_p=0.95,
#         device=device
#     )
#     local_llm = HuggingFacePipeline(pipeline=pipe)
#     logging.info("LLM pipeline setup complete")
#     return local_llm

# def qa_llm():
#     """Set up the question-answering chain."""
#     logging.info("Setting up QA model")
#     llm = llm_pipeline()
#     embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
#     db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
#     retriever = db.as_retriever()  # Set up the retriever for the vector store
#     qa = RetrievalQA.from_chain_type(
#         llm=llm,
#         chain_type="stuff",
#         retriever=retriever,
#         return_source_documents=True
#     )
#     logging.info("QA model setup complete")
#     return qa

# def process_answer(user_question):
#     """Generate an answer to the user's question."""
#     try:
#         logging.info("Processing user question")
#         qa = qa_llm()
#         tailored_prompt = f"""
#         You are an expert chatbot designed to assist Chartered Accountants (CAs) in the field of audits.
#         Your goal is to provide accurate and comprehensive answers to any questions related to audit policies,
#         procedures, and accounting standards based on the provided PDF documents.
#         Please respond effectively and refer to the relevant standards and policies whenever applicable.
#         User question: {user_question}
#         """
#         generated_text = qa({"query": tailored_prompt})
#         answer = generated_text['result']
#         if "not provide" in answer or "no information" in answer:
#             return "The document does not provide sufficient information to answer your question."
#         logging.info("Answer generated successfully")
#         return answer
#     except Exception as e:
#         logging.error(f"Error during answer generation: {str(e)}")
#         return "Error processing the question."

# # Streamlit UI Setup
# st.sidebar.header("File Upload")
# uploaded_files = st.sidebar.file_uploader("Upload your PDF files", type=["pdf"], accept_multiple_files=True)
# if uploaded_files:
#     # Save uploaded files
#     if not os.path.exists(uploaded_files_dir):
#         os.makedirs(uploaded_files_dir)
#     for uploaded_file in uploaded_files:
#         file_path = os.path.join(uploaded_files_dir, uploaded_file.name)
#         with open(file_path, "wb") as f:
#             f.write(uploaded_file.getbuffer())
#     st.sidebar.success(f"Uploaded {len(uploaded_files)} file(s) successfully!")

#     # Run data ingestion when files are uploaded
#     data_ingestion()

#     # Display UI for Q&A
#     st.header("Ask a Question")
#     user_question = st.text_input("Enter your question here:")
#     if user_question:
#         answer = process_answer(user_question)
#         st.write(answer)
# else:
#     st.sidebar.info("Upload PDF files to get started!")
# -------

import os
import logging
import streamlit as st
import fitz  # PyMuPDF
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from langchain_community.document_loaders import PDFMinerLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.llms import HuggingFacePipeline
from langchain.chains import RetrievalQA

# Configuration
device = 'cpu'
persist_directory = "db"
uploaded_files_dir = "uploaded_files"

# Set up logging
logging.basicConfig(level=logging.INFO)

# Streamlit page setup
st.set_page_config(page_title="RAG Chatbot", layout="wide")
st.title("RAG-based PDF Assistant")

# Load the LLM
checkpoint = "MBZUAI/LaMini-T5-738M"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
base_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
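
# Note: Streamlit re-runs this script on every interaction, so the checkpoint
# above is re-instantiated each time. A minimal sketch of resource caching
# (assumes a Streamlit release that provides st.cache_resource, i.e. >= 1.18):
#
#     @st.cache_resource
#     def load_model(name=checkpoint):
#         """Load the tokenizer and model once per process, reused across reruns."""
#         return AutoTokenizer.from_pretrained(name), AutoModelForSeq2SeqLM.from_pretrained(name)
#
#     tokenizer, base_model = load_model()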

# ---------------- HELPER FUNCTIONS ---------------- #

def extract_outline_from_pdf(path):
    """Return a short preview (first 500 characters of each page) of a PDF."""
    try:
        doc = fitz.open(path)
        outline_text = ""
        for page_num in range(len(doc)):
            page = doc[page_num]
            outline_text += f"### Page {page_num + 1}:\n{page.get_text('text')[:500]}\n---\n"
        doc.close()
        return outline_text if outline_text else "No preview available."
    except Exception as e:
        return f"Could not preview PDF: {e}"

def data_ingestion():
    """Load every PDF in uploaded_files_dir, split it into chunks, and persist embeddings to Chroma."""
    try:
        logging.info("Starting data ingestion")
        if not os.path.exists(uploaded_files_dir):
            os.makedirs(uploaded_files_dir)

        documents = []
        for filename in os.listdir(uploaded_files_dir):
            if filename.endswith(".pdf"):
                path = os.path.join(uploaded_files_dir, filename)
                logging.info(f"Loading: {filename}")
                try:
                    loader = PDFMinerLoader(path)
                    loaded_docs = loader.load()
                    for doc in loaded_docs:
                        if hasattr(doc, 'page_content'):
                            documents.append(doc)
                except Exception as e:
                    logging.warning(f"Skipping {filename}: {str(e)}")

        if not documents:
            st.error("No valid documents found. Check the PDF content.")
            return

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
        texts = text_splitter.split_documents(documents)
        embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

        # Chroma rejects inserts above its maximum batch size (5461 in the
        # builds this app targets), so embed the chunks in slices.
        db = None
        MAX_BATCH_SIZE = 5461
        for i in range(0, len(texts), MAX_BATCH_SIZE):
            batch = texts[i:i + MAX_BATCH_SIZE]
            if db is None:
                db = Chroma.from_documents(batch, embeddings, persist_directory=persist_directory)
            else:
                db.add_documents(batch)
        if db is not None:  # guard: texts can be empty, leaving db unset
            db.persist()
        logging.info("Data ingestion completed.")
    except Exception as e:
        logging.error(f"Ingestion error: {e}")
        st.error(f"Ingestion failed: {e}")

def llm_pipeline():
    """Wrap the local seq2seq model in a LangChain-compatible pipeline."""
    pipe = pipeline(
        'text2text-generation',
        model=base_model,
        tokenizer=tokenizer,
        max_length=256,
        do_sample=True,
        temperature=0.3,
        top_p=0.95,
        device=device
    )
    return HuggingFacePipeline(pipeline=pipe)
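
# Note: device='cpu' assumes a transformers release that accepts string device
# names in pipeline(); older releases expect an integer index instead
# (device=-1 for CPU, 0 for the first GPU).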

def qa_llm():
    """Build a RetrievalQA chain over the persisted Chroma store."""
    llm = llm_pipeline()
    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
    db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
    retriever = db.as_retriever()
    return RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever, return_source_documents=True)
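
# Note: process_answer() below calls the chain as qa({"query": ...}), the
# legacy Chain.__call__ interface. Newer LangChain releases deprecate it in
# favour of invoke; a minimal sketch of the equivalent call:
#
#     output = qa.invoke({"query": prompt})
#     answer, sources = output["result"], output["source_documents"]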

def process_answer(user_question):
    """Run the user's question through the RetrievalQA chain and return the answer."""
    try:
        qa = qa_llm()
        prompt = f"""
        You are a helpful and accurate RAG-based chatbot. Your role is to analyze the content of the
        uploaded PDF documents and provide informative, detailed answers to the user's questions.
        Use the uploaded knowledge to answer precisely.
        Question: {user_question}
        """
        output = qa({"query": prompt})
        return output['result']
    except Exception as e:
        logging.error(f"QA failed: {e}")
        return "Could not generate a valid answer."

# ---------------- STREAMLIT UI ---------------- #

# Sidebar upload
st.sidebar.header("Upload PDF Files")
uploaded_files = st.sidebar.file_uploader("Select one or more PDF files", type="pdf", accept_multiple_files=True)

if uploaded_files:
    if not os.path.exists(uploaded_files_dir):
        os.makedirs(uploaded_files_dir)
    for file in uploaded_files:
        path = os.path.join(uploaded_files_dir, file.name)
        with open(path, "wb") as f:
            f.write(file.getbuffer())
    st.sidebar.success(f"{len(uploaded_files)} file(s) uploaded.")

    # Display previews
    st.subheader("Uploaded PDF Previews")
    for file in uploaded_files:
        with st.expander(file.name):
            st.text(extract_outline_from_pdf(os.path.join(uploaded_files_dir, file.name)))

    # Trigger ingestion
    with st.spinner("Ingesting uploaded documents..."):
        data_ingestion()

    # Ask a question
    st.header("Ask a Question from Your Documents")
    user_input = st.text_input("Enter your question:")
    if user_input:
        with st.spinner("Generating response..."):
            response = process_answer(user_input)
        st.success(response)
else:
    st.sidebar.info("Upload PDFs to begin your QA journey.")
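
# To run locally (assuming this file is saved as app.py and the imports above
# are installed):
#
#     streamlit run app.py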