import streamlit as st
from pypdf import PdfReader
import os
from pathlib import Path
from dotenv import load_dotenv
import pickle
import timeit
from PIL import Image
import zipfile
import datetime
import shutil
import logging
from collections import defaultdict
import pandas as pd

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.document_loaders import (
    PyPDFLoader,
    DirectoryLoader,
    PyPDFDirectoryLoader,
    UnstructuredHTMLLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain, RetrievalQA
from langchain.prompts.prompt import PromptTemplate
from langchain.vectorstores import Chroma
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.agents import Tool, load_tools
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain.agents.agent_toolkits import create_conversational_retrieval_agent
from langchain.utilities import SerpAPIWrapper


load_dotenv()  # Load environment variables (e.g. OPENAI_API_KEY) from a .env file.

# Timestamp for the current session.
current_timestamp = datetime.datetime.now()
timestamp_string = current_timestamp.strftime("%Y-%m-%d %H:%M:%S")


def build_llm():
    '''
    Load the OpenAI chat model (temperature 0 for more deterministic answers).
    '''
    llm = ChatOpenAI(temperature=0)
    return llm


def build_embedding_model():
    '''
    Load the sentence-transformers model used for text embedding.
    '''
    embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2',
                                       model_kwargs={'device': 'cpu'})
    return embeddings
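

# Hedged usage sketch (not part of the original app): exercises the embedding
# model directly. The helper name and sample sentence are illustrative.
def _demo_embedding_model():
    embeddings = build_embedding_model()
    vector = embeddings.embed_query("How do I apply for federal retirement benefits?")
    print(f"Embedding dimension: {len(vector)}")  # all-MiniLM-L6-v2 yields 384-dim vectors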


def unzip_opm():
    '''
    Unzip the OPM retirement backup archive next to the zip file and
    return the path of the extraction folder.
    '''
    zip_file_path = r'OPM_Files/OPM_Retirement_backup-20230902T130906Z-001.zip'

    # Extract into a folder named after the archive, alongside the zip file.
    extract_path = os.path.dirname(zip_file_path)
    extract_folder = os.path.splitext(os.path.basename(zip_file_path))[0]
    extract_folder_path = os.path.join(extract_path, extract_folder)

    if not os.path.exists(extract_folder_path):
        os.makedirs(extract_folder_path)

    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_folder_path)

    print(f'Unzipped {zip_file_path} to {extract_folder_path}')
    return extract_folder_path


def count_files_by_type(folder_path):
    '''
    Count files by file type in the specified folder.
    '''
    file_count_by_type = defaultdict(int)

    for root, _, files in os.walk(folder_path):
        for file in files:
            _, extension = os.path.splitext(file)
            file_count_by_type[extension] += 1

    return file_count_by_type


def generate_file_count_table(file_count_by_type):
    '''
    Generate a table of file counts per file type.
    '''
    data = {"File Type": [], "Number of Files": []}
    for extension, count in file_count_by_type.items():
        data["File Type"].append(extension)
        data["Number of Files"].append(count)

    df = pd.DataFrame(data)
    df = df.sort_values(by="Number of Files", ascending=False)
    return df
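

# Hedged usage sketch (not in the original code): summarize an extracted folder.
# The folder path is an illustrative assumption.
def _demo_file_count_report(folder_path="OPM_Files"):
    counts = count_files_by_type(folder_path)
    print(generate_file_count_table(counts))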


def move_files_to_folders(folder_path):
    '''
    Copy files into per-type folders inside folder_path: PDF docs into PDFs/,
    HTML docs into HTMLs/. Other file types are skipped. (Files are copied,
    not moved, so the originals stay in place.)
    '''
    for root, _, files in os.walk(folder_path):
        for file in files:
            _, extension = os.path.splitext(file)
            source_path = os.path.join(root, file)

            # Destination folders live inside folder_path so that the
            # directory loaders below can find them.
            if extension == '.pdf':
                dest_folder = os.path.join(folder_path, "PDFs")
            elif extension == '.html':
                dest_folder = os.path.join(folder_path, "HTMLs")
            else:
                continue

            os.makedirs(dest_folder, exist_ok=True)
            dest_path = os.path.join(dest_folder, file)
            shutil.copy(source_path, dest_path)


def load_vectorstore(persist_directory, embeddings):
    '''
    Try first to load the Chroma database from disk. If it does not exist,
    do the following:
    1) Load the PDFs
    2) Create text chunks
    3) Index them and store them in a Chroma DB
    4) Perform the same for HTML files
    5) Persist the final Chroma DB to disk
    '''
    if os.path.exists(persist_directory):
        print("Using existing vector store for these documents.")
        vectorstore = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
        print("Chroma DB loaded from the disk")
        return vectorstore
    else:
        folder_path = unzip_opm()
        print("Vector store is not available. Creating a new one.")
        file_count_by_type = count_files_by_type(folder_path)
        file_count_table = generate_file_count_table(file_count_by_type)
        print("File Count Table:")
        print(file_count_table)

        move_files_to_folders(folder_path)
        print("PDF and HTML files copied to separate folders.")

        pdf_folder_path = f"{folder_path}/PDFs"
        html_folder_path = f"{folder_path}/HTMLs"
        pdf_dir_loader = PyPDFDirectoryLoader(pdf_folder_path)
        pdf_pages = pdf_dir_loader.load()
        print("PDF files are loaded from the folder.")

        HTML_docs_path_list = [os.path.join(html_folder_path, f) for f in os.listdir(html_folder_path)
                               if os.path.isfile(os.path.join(html_folder_path, f))]

        html_loaders = []
        for html_file in HTML_docs_path_list:
            loader = UnstructuredHTMLLoader(html_file)
            html_loaders.append(loader)

        html_pages = []
        docs_cannot_load = []
        for loader in html_loaders:
            try:
                html_pages.extend(loader.load())
            except Exception:
                print("Cannot load the file:", loader.file_path)
                docs_cannot_load.append(loader)
        print("HTML files are loaded from the folder.")

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            is_separator_regex=False,
        )

        pdf_texts = text_splitter.transform_documents(pdf_pages)
        html_texts = text_splitter.transform_documents(html_pages)

        all_texts = pdf_texts + html_texts
        print("PDF and HTML docs are split into chunks, forming one final list of all chunks.")

        vectorstore = Chroma.from_documents(all_texts,
                                            embeddings,
                                            persist_directory=persist_directory)
        vectorstore.persist()
        print("Chroma DB created and loaded")
        return vectorstore
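

# Hedged usage sketch (not in the original code): query the vector store
# directly. "chroma_db" and the sample question are illustrative assumptions.
def _demo_vectorstore_search(persist_directory="chroma_db"):
    embeddings = build_embedding_model()
    vectorstore = load_vectorstore(persist_directory, embeddings)
    for doc in vectorstore.similarity_search("What is the FERS annuity supplement?", k=3):
        print(doc.metadata.get("source"), "->", doc.page_content[:100])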


def load_text_chunks(text_chunks_pkl_dir):
    '''
    Load the pickle file that holds all the documents from disk;
    if it does not exist, create a new one.
    Text documents are required to build the BM25 retriever, but loading every
    document in each session is time consuming, so all the docs are stored in
    a pickle file that is loaded from disk instead.
    '''
    try:
        print("Text chunks are loading from the disk")
        with open(text_chunks_pkl_dir, 'rb') as file:
            cached_text_chunks = pickle.load(file)

        print("Text chunks are loaded from the disk")
        return cached_text_chunks
    except (FileNotFoundError, pickle.UnpicklingError, EOFError):
        print("Creating text chunks from the docs and caching them.")
        folder_path = unzip_opm()
        pdf_folder_path = f"{folder_path}/PDFs"
        html_folder_path = f"{folder_path}/HTMLs"
        pdf_dir_loader = PyPDFDirectoryLoader(pdf_folder_path)
        pdf_pages = pdf_dir_loader.load()
        HTML_docs_path_list = [os.path.join(html_folder_path, f) for f in os.listdir(html_folder_path)
                               if os.path.isfile(os.path.join(html_folder_path, f))]

        html_loaders = []
        for html_file in HTML_docs_path_list:
            loader = UnstructuredHTMLLoader(html_file)
            html_loaders.append(loader)

        html_pages = []
        for loader in html_loaders:
            try:
                html_pages.extend(loader.load())
            except Exception:
                print("Cannot load the file:", loader.file_path)
        all_texts = pdf_pages + html_pages

        with open('text_chunks.pkl', 'wb') as file:
            pickle.dump(all_texts, file)
        print("Text chunks are created and cached")
        return all_texts  # the original code never returned the freshly built chunks


def load_ensemble_retriver(embeddings, chroma_vectorstore):
    """Build the retriever used by the app. Despite the name, the current
    implementation wraps the Chroma retriever in a MultiQueryRetriever rather
    than combining BM25 and Chroma in an EnsembleRetriever.
    (`embeddings` is currently unused but kept for call-site compatibility.)
    """
    chroma_retriever = chroma_vectorstore.as_retriever(search_kwargs={"k": 10})

    # Log the alternative queries that MultiQueryRetriever generates.
    logging.basicConfig()
    logging.getLogger('langchain.retrievers.multi_query').setLevel(logging.INFO)
    retriever_from_llm = MultiQueryRetriever.from_llm(retriever=chroma_retriever,
                                                      llm=ChatOpenAI(temperature=0))
    return retriever_from_llm
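

# The BM25Retriever/EnsembleRetriever imports and the function name above
# suggest a BM25 + Chroma ensemble was intended. A hedged sketch of that
# combination, assuming text_chunks comes from load_text_chunks('text_chunks.pkl')
# and equal weighting; the original code does not build this.
def build_bm25_ensemble_retriever(chroma_vectorstore, text_chunks):
    bm25_retriever = BM25Retriever.from_documents(text_chunks)
    bm25_retriever.k = 10  # match the Chroma retriever's top-k
    chroma_retriever = chroma_vectorstore.as_retriever(search_kwargs={"k": 10})
    return EnsembleRetriever(
        retrievers=[bm25_retriever, chroma_retriever],
        weights=[0.5, 0.5],  # assumed equal weighting
    )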


def load_conversational_retrievel_chain(retriever, llm):
    '''Load a RetrievalQA chain with conversation memory over the OPM knowledge
    base. Note: despite the agent/SerpAPI imports above, the current
    implementation is a plain retrieval QA chain, not a tool-using agent with
    internet search.
    '''
    template = """You are a helpful assistant. You do not respond as 'User' or pretend to be 'User'. You only respond once as 'Assistant'.
    Use the following pieces of context to answer the question at the end. If you don't know the answer,\
    just say that you don't know, don't try to make up an answer.

    {context}

    {history}
    Question: {question}
    Helpful Answer:"""

    prompt = PromptTemplate(input_variables=["history", "context", "question"], template=template)
    memory = ConversationBufferMemory(input_key="question", memory_key="history")

    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        # Pass the custom prompt so it is actually used; the original code
        # built it but never handed it to the chain.
        chain_type_kwargs={"prompt": prompt, "memory": memory},
    )
    return qa
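

# Hedged end-to-end wiring sketch (the Streamlit UI presumably lives elsewhere):
# shows how the functions above compose into a working QA pipeline. The persist
# directory name and the sample question are illustrative assumptions.
if __name__ == "__main__":
    llm = build_llm()
    embeddings = build_embedding_model()
    vectorstore = load_vectorstore("chroma_db", embeddings)
    retriever = load_ensemble_retriver(embeddings, vectorstore)
    qa = load_conversational_retrievel_chain(retriever, llm)
    result = qa({"query": "How is a FERS annuity calculated?"})
    print(result["result"])
    for doc in result["source_documents"]:
        print("Source:", doc.metadata.get("source"))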