# ------- this is the first version (kept commented out for reference) -------

# import os
# import logging
# import math
# import streamlit as st
# import fitz  # PyMuPDF
# from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
# from langchain_community.document_loaders import PDFMinerLoader
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# from langchain_community.embeddings import SentenceTransformerEmbeddings
# from langchain_community.vectorstores import Chroma
# from langchain_community.llms import HuggingFacePipeline
# from langchain.chains import RetrievalQA
#
# # Set up logging
# logging.basicConfig(level=logging.INFO)
#
# # Define global variables
# device = 'cpu'
# persist_directory = "db"
# uploaded_files_dir = "uploaded_files"
#
# # Streamlit app configuration
# st.set_page_config(page_title="Audit Assistant", layout="wide")
# st.title("Audit Assistant")
#
# # Load the model
# checkpoint = "MBZUAI/LaMini-T5-738M"
# tokenizer = AutoTokenizer.from_pretrained(checkpoint)
# base_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
#
# # Helper Functions
# def extract_text_from_pdf(file_path):
#     """Extract text from a PDF using PyMuPDF (fitz)."""
#     try:
#         doc = fitz.open(file_path)
#         text = ""
#         for page_num in range(doc.page_count):
#             page = doc.load_page(page_num)
#             text += page.get_text("text")
#         return text
#     except Exception as e:
#         logging.error(f"Error reading PDF {file_path}: {e}")
#         return None
#
# def data_ingestion():
#     """Load PDFs and create embeddings, with error handling and batched inserts."""
#     try:
#         logging.info("Starting data ingestion")
#         if not os.path.exists(uploaded_files_dir):
#             os.makedirs(uploaded_files_dir)
#
#         documents = []
#         for filename in os.listdir(uploaded_files_dir):
#             if filename.endswith(".pdf"):
#                 file_path = os.path.join(uploaded_files_dir, filename)
#                 logging.info(f"Processing file: {file_path}")
#                 try:
#                     loader = PDFMinerLoader(file_path)
#                     loaded_docs = loader.load()
#                     if not loaded_docs:
#                         logging.warning(f"Skipping file with missing or invalid metadata: {file_path}")
#                         continue
#                     for doc in loaded_docs:
#                         if hasattr(doc, 'page_content') and len(doc.page_content.strip()) > 0:
#                             documents.append(doc)
#                         else:
#                             logging.warning(f"Skipping invalid document structure in {file_path}")
#                 except ValueError as e:
#                     logging.error(f"Skipping {file_path}: {str(e)}")
#                     continue
#
#         if not documents:
#             logging.error("No valid documents found to process.")
#             return
#         logging.info(f"Total valid documents: {len(documents)}")
#
#         # Proceed with splitting and embedding documents
#         text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
#         texts = text_splitter.split_documents(documents)
#         logging.info(f"Total text chunks created: {len(texts)}")
#         if not texts:
#             logging.error("No valid text chunks to create embeddings.")
#             return
#
#         embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
#
#         # Embed the chunks in batches to stay under Chroma's maximum batch size
#         MAX_BATCH_SIZE = 5461
#         total_batches = math.ceil(len(texts) / MAX_BATCH_SIZE)
#         logging.info(f"Processing {len(texts)} text chunks in {total_batches} batches...")
#         db = None
#         for i in range(total_batches):
#             batch_start = i * MAX_BATCH_SIZE
#             batch_end = min((i + 1) * MAX_BATCH_SIZE, len(texts))
#             text_batch = texts[batch_start:batch_end]
#             logging.info(f"Processing batch {i + 1}/{total_batches}, size: {len(text_batch)}")
#             if db is None:
#                 db = Chroma.from_documents(text_batch, embeddings, persist_directory=persist_directory)
#             else:
#                 db.add_documents(text_batch)
#         db.persist()
#         logging.info("Data ingestion completed successfully")
#     except Exception as e:
#         logging.error(f"Error during data ingestion: {str(e)}")
#         raise
#
# def llm_pipeline():
#     """Set up the language model pipeline."""
#     logging.info("Setting up LLM pipeline")
#     pipe = pipeline(
#         'text2text-generation',
#         model=base_model,
#         tokenizer=tokenizer,
#         max_length=256,
#         do_sample=True,
#         temperature=0.3,
#         top_p=0.95,
#         device=device
#     )
#     local_llm = HuggingFacePipeline(pipeline=pipe)
#     logging.info("LLM pipeline setup complete")
#     return local_llm
#
# def qa_llm():
#     """Set up the question-answering chain."""
#     logging.info("Setting up QA model")
#     llm = llm_pipeline()
#     embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
#     db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
#     retriever = db.as_retriever()  # Set up the retriever for the vector store
#     qa = RetrievalQA.from_chain_type(
#         llm=llm,
#         chain_type="stuff",
#         retriever=retriever,
#         return_source_documents=True
#     )
#     logging.info("QA model setup complete")
#     return qa
#
# def process_answer(user_question):
#     """Generate an answer to the user's question."""
#     try:
#         logging.info("Processing user question")
#         qa = qa_llm()
#         tailored_prompt = f"""
#         You are an expert chatbot designed to assist Chartered Accountants (CAs) in the field of audits.
#         Your goal is to provide accurate and comprehensive answers to any questions related to audit policies,
#         procedures, and accounting standards based on the provided PDF documents.
#         Please respond effectively and refer to the relevant standards and policies whenever applicable.
#
#         User question: {user_question}
#         """
#         generated_text = qa({"query": tailored_prompt})
#         answer = generated_text['result']
#         if "not provide" in answer or "no information" in answer:
#             return "The document does not provide sufficient information to answer your question."
#         logging.info("Answer generated successfully")
#         return answer
#     except Exception as e:
#         logging.error(f"Error during answer generation: {str(e)}")
#         return "Error processing the question."
#
# # Streamlit UI Setup
# st.sidebar.header("File Upload")
# uploaded_files = st.sidebar.file_uploader("Upload your PDF files", type=["pdf"], accept_multiple_files=True)
#
# if uploaded_files:
#     # Save uploaded files
#     if not os.path.exists(uploaded_files_dir):
#         os.makedirs(uploaded_files_dir)
#     for uploaded_file in uploaded_files:
#         file_path = os.path.join(uploaded_files_dir, uploaded_file.name)
#         with open(file_path, "wb") as f:
#             f.write(uploaded_file.getbuffer())
#     st.sidebar.success(f"Uploaded {len(uploaded_files)} file(s) successfully!")
#
#     # Run data ingestion when files are uploaded
#     data_ingestion()
#
#     # Display UI for Q&A
#     st.header("Ask a Question")
#     user_question = st.text_input("Enter your question here:")
#     if user_question:
#         answer = process_answer(user_question)
#         st.write(answer)
# else:
#     st.sidebar.info("Upload PDF files to get started!")
# ------- this is the second version -------

# import os
# import logging
# import math
# import streamlit as st
# import fitz  # PyMuPDF
# from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
# # from langchain_community.document_loaders import PDFMinerLoader
# from langchain_community.document_loaders import PyMuPDFLoader
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# from langchain_community.embeddings import SentenceTransformerEmbeddings
# from langchain_community.vectorstores import Chroma
# from langchain_community.llms import HuggingFacePipeline
# from langchain.chains import RetrievalQA
#
# device = 'cpu'
# persist_directory = "db"
# uploaded_files_dir = "uploaded_files"
# logging.basicConfig(level=logging.INFO)
#
# # Main page setup
# st.set_page_config(page_title="RAG Chatbot", layout="wide")
# st.title("📚 RAG-based PDF Assistant")
#
# # Load the model
# checkpoint = "MBZUAI/LaMini-T5-738M"
# tokenizer = AutoTokenizer.from_pretrained(checkpoint)
# base_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
#
# # -------------------------------
#
# def extract_outline_from_pdf(path):
#     """Return a short preview (first 500 characters) of each page."""
#     try:
#         doc = fitz.open(path)
#         outline_text = ""
#         for page_num in range(len(doc)):
#             page = doc[page_num]
#             outline_text += f"### Page {page_num + 1}:\n{page.get_text('text')[:500]}\n---\n"
#         return outline_text if outline_text else "No preview available."
#     except Exception as e:
#         return f"Could not preview PDF: {e}"
#
# def data_ingestion():
#     """Load PDFs, validate content, and generate embeddings."""
#     try:
#         logging.info("Starting data ingestion")
#         if not os.path.exists(uploaded_files_dir):
#             os.makedirs(uploaded_files_dir)
#
#         documents = []
#         for filename in os.listdir(uploaded_files_dir):
#             if filename.endswith(".pdf"):
#                 file_path = os.path.join(uploaded_files_dir, filename)
#                 logging.info(f"Processing file: {file_path}")
#                 try:
#                     loader = PyMuPDFLoader(file_path)
#                     loaded_docs = loader.load()
#                     # Check if any content exists in loaded_docs
#                     if not loaded_docs or len(loaded_docs[0].page_content.strip()) == 0:
#                         logging.warning(f"No readable text found in {file_path}. Might be a scanned image or unsupported format.")
#                         continue
#                     for doc in loaded_docs:
#                         if hasattr(doc, 'page_content') and len(doc.page_content.strip()) > 0:
#                             documents.append(doc)
#                         else:
#                             logging.warning(f"Skipping invalid document structure in {file_path}")
#                 except Exception as e:
#                     logging.error(f"Skipping {file_path}: {str(e)}")
#                     continue
#
#         if not documents:
#             logging.error("No valid documents found to process.")
#             return
#         logging.info(f"Total valid documents: {len(documents)}")
#
#         # Proceed with splitting and embedding documents
#         text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
#         texts = text_splitter.split_documents(documents)
#         logging.info(f"Total text chunks created: {len(texts)}")
#         if not texts:
#             logging.error("No valid text chunks to create embeddings.")
#             return
#
#         embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
#
#         # Embed the chunks in batches to stay under Chroma's maximum batch size
#         MAX_BATCH_SIZE = 5461
#         total_batches = math.ceil(len(texts) / MAX_BATCH_SIZE)
#         logging.info(f"Processing {len(texts)} text chunks in {total_batches} batches...")
#         db = None
#         for i in range(total_batches):
#             batch_start = i * MAX_BATCH_SIZE
#             batch_end = min((i + 1) * MAX_BATCH_SIZE, len(texts))
#             text_batch = texts[batch_start:batch_end]
#             logging.info(f"Processing batch {i + 1}/{total_batches}, size: {len(text_batch)}")
#             if db is None:
#                 db = Chroma.from_documents(text_batch, embeddings, persist_directory=persist_directory)
#             else:
#                 db.add_documents(text_batch)
#         db.persist()
#         logging.info("Data ingestion completed successfully")
#     except Exception as e:
#         logging.error(f"Error during data ingestion: {str(e)}")
#         raise
#
# def llm_pipeline():
#     pipe = pipeline(
#         'text2text-generation',
#         model=base_model,
#         tokenizer=tokenizer,
#         max_length=256,
#         do_sample=True,
#         temperature=0.3,
#         top_p=0.95,
#         device=device
#     )
#     return HuggingFacePipeline(pipeline=pipe)
#
# def qa_llm():
#     llm = llm_pipeline()
#     embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
#     db = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
#     retriever = db.as_retriever()
#     return RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever, return_source_documents=True)
#
# def process_answer(user_question):
#     """Generate an answer to the user's question using a general RAG-based prompt."""
#     try:
#         logging.info("Processing user question")
#         qa = qa_llm()  # Set up the retrieval-based QA chain
#         # Generalized, flexible prompt for any kind of PDF (resume, legal doc, etc.)
#         tailored_prompt = f"""
#         You are an intelligent and helpful AI assistant that provides answers strictly based on the provided document contents.
#         If the question cannot be answered using the documents, say: 'The document does not contain this information.'
#         Otherwise, respond clearly and concisely with relevant and factual details from the PDF.
#
#         Question: {user_question}
#         """
#         generated_text = qa({"query": tailored_prompt})
#         answer = generated_text['result']
#         # Add a safeguard for hallucinated answers
#         if "not provide" in answer.lower() or "no information" in answer.lower() or len(answer.strip()) < 10:
#             return "The document does not contain this information."
#         logging.info("Answer generated successfully")
#         return answer
#     except Exception as e:
#         logging.error(f"Error during answer generation: {str(e)}")
#         return "Sorry, something went wrong while processing your question."
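# Illustrative example of the hallucination safeguard above (hypothetical
# model outputs): an answer like "No information found." matches the
# "no information" check, and a bare "N/A" trips len(answer.strip()) < 10,
# so both are replaced with "The document does not contain this information."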
#
# # ---------------- STREAMLIT UI ----------------
#
# # Sidebar Upload
# st.sidebar.header("📤 Upload PDF Files")
# uploaded_files = st.sidebar.file_uploader("Select one or more PDF files", type="pdf", accept_multiple_files=True)
#
# if uploaded_files:
#     if not os.path.exists(uploaded_files_dir):
#         os.makedirs(uploaded_files_dir)
#     for file in uploaded_files:
#         path = os.path.join(uploaded_files_dir, file.name)
#         with open(path, "wb") as f:
#             f.write(file.getbuffer())
#     st.sidebar.success(f"{len(uploaded_files)} file(s) uploaded.")
#
#     # Display previews
#     st.subheader("📄 Uploaded PDF Previews")
#     for file in uploaded_files:
#         with st.expander(file.name):
#             st.text(extract_outline_from_pdf(os.path.join(uploaded_files_dir, file.name)))
#
#     # Trigger ingestion
#     with st.spinner("🔄 Ingesting uploaded documents..."):
#         data_ingestion()
#
#     # Ask a question
#     st.header("❓ Ask a Question from Your Documents")
#     user_input = st.text_input("Enter your question:")
#     if user_input:
#         with st.spinner("💬 Generating response..."):
#             response = process_answer(user_input)
#             st.success(response)
# else:
#     st.sidebar.info("Upload PDFs to begin your QA journey.")


import os
import streamlit as st
import fitz  # PyMuPDF
import logging
import tempfile
import shutil
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import TextLoader

# --- Configuration ---
st.set_page_config(page_title="📚 RAG PDF Chatbot", layout="wide")
st.title("📚 RAG-based PDF Chatbot")
device = "cpu"  # kept for reference; the pipeline below defaults to CPU

# --- Logging ---
logging.basicConfig(level=logging.INFO)

# --- Load LLM ---
@st.cache_resource
def load_model():
    """Load the seq2seq model once and cache it across Streamlit reruns."""
    checkpoint = "MBZUAI/LaMini-T5-738M"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    pipe = pipeline('text2text-generation', model=model, tokenizer=tokenizer, max_length=512)
    return HuggingFacePipeline(pipeline=pipe)

# --- Extract PDF Text ---
def read_pdf(file):
    """Extract plain text from an uploaded PDF using PyMuPDF."""
    try:
        doc = fitz.open(stream=file.read(), filetype="pdf")
        text = ""
        for page in doc:
            text += page.get_text()
        return text.strip()
    except Exception as e:
        logging.error(f"Failed to extract text: {e}")
        return ""
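# A minimal sketch of exercising read_pdf outside Streamlit, e.g. for quick
# local testing. "sample.pdf" is a hypothetical path; read_pdf only needs an
# object whose .read() returns the PDF bytes:
#
#   with open("sample.pdf", "rb") as fh:
#       print(read_pdf(fh)[:300])  # preview the first 300 extracted characters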
# --- Process Answer ---
def process_answer(question, full_text):
    """Answer a question about the extracted PDF text via retrieval QA."""
    # Save the full_text to a temporary file so TextLoader can ingest it
    with open("temp_text.txt", "w", encoding="utf-8") as f:
        f.write(full_text)

    loader = TextLoader("temp_text.txt", encoding="utf-8")
    docs = loader.load()

    # Chunk the documents
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
    splits = text_splitter.split_documents(docs)

    # Load embeddings
    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

    # Create a fresh temporary directory for ChromaDB (rebuilt on every question)
    chroma_dir = os.path.join(tempfile.gettempdir(), "chroma_db")
    if os.path.exists(chroma_dir):
        shutil.rmtree(chroma_dir)
    db = Chroma.from_documents(splits, embeddings, persist_directory=chroma_dir)
    retriever = db.as_retriever()

    # Set up the (cached) model
    llm = load_model()

    # RAG-style retrieval QA
    qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=retriever)

    # Smart prompting: route summary requests straight to the LLM,
    # everything else through the retrieval chain
    if "summarize" in question.lower() or "summary" in question.lower() or "tl;dr" in question.lower():
        prompt = f"Summarize the following document:\n\n{full_text[:3000]}"
        return llm.invoke(prompt)
    return qa_chain.invoke({"query": question})["result"]

# --- UI Layout ---
with st.sidebar:
    st.header("📄 Upload PDF")
    uploaded_file = st.file_uploader("Choose a PDF", type=["pdf"])

# --- Main Interface ---
if uploaded_file:
    st.success(f"You uploaded: {uploaded_file.name}")
    full_text = read_pdf(uploaded_file)

    if full_text:
        st.subheader("📝 PDF Preview")
        with st.expander("View Extracted Text"):
            st.write(full_text[:3000] + ("..." if len(full_text) > 3000 else ""))

        st.subheader("💬 Ask a Question")
        user_question = st.text_input("Type your question about the PDF content")
        if user_question:
            with st.spinner("Thinking..."):
                answer = process_answer(user_question, full_text)
                st.markdown("### 🤖 Answer")
                st.write(answer)

        with st.sidebar:
            st.markdown("---")
            st.markdown("**💡 Suggestions:**")
            st.caption('Try: "Summarize this document" or "What is the key idea?"')
            with st.expander("💡 Suggestions", expanded=True):
                st.markdown("""
                - "Summarize this document"
                - "Give a quick summary"
                - "What are the main points?"
                - "Explain this document in short"
                """)
    else:
        st.error("⚠️ No text could be extracted from the PDF. Try another file.")
else:
    st.info("Upload a PDF to begin.")
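# To run the app locally (assuming this file is saved as app.py; the filename
# is an assumption, not fixed by the code):
#
#   streamlit run app.py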