A minimal version of LISA (Lithium Ion Solid-state Assistant) in a Jupyter notebook, for easier testing and experimentation.

In [2]:
# import some packages
import os

from dotenv import load_dotenv
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain
from langchain.llms import HuggingFaceTextGenInference
from langchain.chains.conversation.memory import (
 ConversationBufferWindowMemory,
)

In [3]:
# Set API keys: load variables from a local dotenv file into the process environment.
load_dotenv("API.env") # put all the API tokens here, such as OpenAI, Hugging Face...
# os.getenv returns None when the variable is not set.
# NOTE(review): this name is not referenced again in the visible cells -- presumably
# the libraries read the token from the environment themselves; confirm.
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")

In [None]:
# Set the inference endpoint; this hosted one is used so the notebook is easy to reproduce.
inference_api_url = 'https://api-inference.huggingface.co/models/HuggingFaceH4/zephyr-7b-beta'
# A stronger LLM is recommended, e.g. Mixtral 8x7B.

llm = HuggingFaceTextGenInference(
 verbose=True, # Provides detailed logs of operation
 max_new_tokens=1024, # Maximum number of tokens that can be generated
 top_p=0.95, # Nucleus-sampling threshold controlling randomness of generation
 typical_p=0.95, # Typical-decoding mass (see the text-generation-inference docs)
 temperature=0.1, # Low temperature favours the most probable words
 inference_server_url=inference_api_url, # URL of the inference server
 timeout=120, # Timeout (presumably seconds -- confirm) for the connection to the URL
 )

# Alternatively, you can load the model locally, e.g.:
# (NOTE(review): this recipe needs AutoModelForCausalLM, AutoTokenizer, pipeline and
#  HuggingFacePipeline, none of which are imported in this notebook.)
# model_path = "where/you/store/local/models/zephyr-7b-beta" # change this to your model path
# model = AutoModelForCausalLM.from_pretrained(model_path, device_map="auto")
# tokenizer = AutoTokenizer.from_pretrained(model_path)
# pipe = pipeline(
# "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=1024, model_kwargs={"temperature":0.1}
# )
# llm = HuggingFacePipeline(pipeline=pipe)

In [5]:
# Function for reading and chunking text
def load_pdf_as_docs(pdf_path, loader_module=None):
    """Load and parse one PDF file, or every PDF file in a directory.

    Args:
        pdf_path: Path (str or os.PathLike) to a single ``.pdf`` file or to a
            directory containing PDF files. The extension check is
            case-insensitive, so ``paper.PDF`` is accepted as well.
        loader_module: Loader class called as ``loader_module(path).load()``;
            ``load()`` must return a list of documents. Defaults to
            ``PyPDFLoader``.

    Returns:
        A flat list with the documents of all loaded files. Files found in a
        directory are processed in sorted order so the result is reproducible.

    Raises:
        OSError: Propagated from ``os.listdir`` when ``pdf_path`` is neither a
            PDF file nor a readable directory.
    """
    pdf_path = os.fspath(pdf_path)

    # A directory whose name happens to end in ".pdf" is still a directory.
    is_single_file = pdf_path.lower().endswith(".pdf") and not os.path.isdir(pdf_path)
    if is_single_file:
        pdf_files = [pdf_path]
    else:
        # os.listdir returns entries in arbitrary order; sort for determinism.
        pdf_files = sorted(
            os.path.join(pdf_path, name)
            for name in os.listdir(pdf_path)
            if name.lower().endswith(".pdf")
        )

    if loader_module is None:  # fall back to the default PDF loader
        loader_module = PyPDFLoader

    docs = []
    for pdf_file in pdf_files:
        docs.extend(loader_module(pdf_file).load())

    return docs

def get_doc_chunks(docs, splitter=None):
    """Split documents into chunks.

    Args:
        docs: Documents to split.
        splitter: Object exposing ``split_documents(docs)``. When omitted, a
            ``RecursiveCharacterTextSplitter`` (paragraph/line separators,
            chunk size 256, overlap 128) is used.

    Returns:
        Whatever ``splitter.split_documents(docs)`` returns.
    """
    if splitter is not None:
        return splitter.split_documents(docs)

    default_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n"], chunk_size=256, chunk_overlap=128
    )
    return default_splitter.split_documents(docs)

In [6]:
# Specify the directory containing your PDFs
directory = "data/documents" # change to your pdf directory

# Find and parse all PDFs in the directory (one list of documents across all files).
# pdf_docs is later handed to the parent-document retriever.
pdf_docs = load_pdf_as_docs(directory, PyPDFLoader)

# Split with get_doc_chunks' default splitter; these chunks feed the BM25 retriever.
document_chunks = get_doc_chunks(pdf_docs)

In [None]:
# Set the embedding model used by the vector store
embeddings = HuggingFaceEmbeddings(model_name='BAAI/bge-base-en-v1.5') # choose the one you like

# Set the vector store, e.g. FAISS.
# The store is seeded with a single placeholder text, so it starts with one entry.
texts = ["LISA - Lithium Ion Solid-state Assistant"]
vectorstore = FAISS.from_texts(texts, embeddings) # workaround: FAISS cannot be initialized with 'FAISS(embedding_function=embeddings)'; waiting for a LangChain fix
# You may also use Chroma (NOTE(review): Chroma is not imported in this notebook)
# vectorstore = Chroma(embedding_function=embeddings)

In [8]:
# Create retrievers
# Some advanced RAG, with parent document retriever, hybrid-search and rerank

# 1. ParentDocumentRetriever. Note: this will take a long time (~several minutes)

from langchain.storage import InMemoryStore
from langchain.retrievers import ParentDocumentRetriever
# For persistent local storage instead of memory, see: https://stackoverflow.com/questions/77385587/persist-parentdocumentretriever-of-langchain
# In-memory docstore handed to the retriever below; its contents are lost when the kernel restarts.
store = InMemoryStore()

# Two splitters with the same separators: larger "parent" chunks (512 chars) and smaller "child" chunks (256 chars).
parent_splitter = RecursiveCharacterTextSplitter(separators=["\n\n", "\n"], chunk_size=512, chunk_overlap=128)
child_splitter = RecursiveCharacterTextSplitter(separators=["\n\n", "\n"], chunk_size=256, chunk_overlap=64)

# NOTE: the misspelled name "parent_doc_retriver" is referenced by later cells; keep it in sync if renamed.
parent_doc_retriver = ParentDocumentRetriever(
 vectorstore=vectorstore,
 docstore=store,
 child_splitter=child_splitter,
 parent_splitter=parent_splitter,
)
# Indexes the full documents (not the pre-split document_chunks); this is the slow step.
parent_doc_retriver.add_documents(pdf_docs)

In [9]:
# 2. Hybrid search
from langchain.retrievers import BM25Retriever

# Keyword (BM25) retriever built from the pre-split chunks; combined with the dense retriever in the ensemble cell.
bm25_retriever = BM25Retriever.from_documents(document_chunks, k=5) # 1/2 of dense retriever, experimental value

In [10]:
# 3. Rerank
"""
Ref:
https://medium.aiplanet.com/advanced-rag-cohere-re-ranker-99acc941601c
https://github.com/langchain-ai/langchain/issues/13076
good to read:
https://teemukanstren.com/2023/12/25/llmrag-based-question-answering/
"""
from __future__ import annotations
from typing import Dict, Optional, Sequence
from langchain.schema import Document
from langchain.pydantic_v1 import Extra, root_validator

from langchain.callbacks.manager import Callbacks
from langchain.retrievers.document_compressors.base import BaseDocumentCompressor

from sentence_transformers import CrossEncoder

model_name = "BAAI/bge-reranker-large"


class BgeRerank(BaseDocumentCompressor):
    """Document compressor that reranks documents with a BAAI/bge cross-encoder."""

    # Model name to use for reranking.
    model_name: str = model_name
    # Number of documents to return.
    top_n: int = 10
    # CrossEncoder instance to use for reranking. The default is constructed
    # when this class body is executed, i.e. the model is loaded at
    # class-definition time.
    model: CrossEncoder = CrossEncoder(model_name)

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    def bge_rerank(self, query, docs):
        """Score ``docs`` against ``query`` and return the best ``top_n``.

        Args:
            query: The query string.
            docs: Sequence of document texts.

        Returns:
            A list of ``(index, score)`` pairs sorted by descending score and
            truncated to ``self.top_n``; ``index`` refers to the position in
            ``docs``.
        """
        model_inputs = [[query, doc] for doc in docs]
        scores = self.model.predict(model_inputs)
        results = sorted(enumerate(scores), key=lambda x: x[1], reverse=True)
        return results[: self.top_n]

    def compress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks: Optional[Callbacks] = None,
    ) -> Sequence[Document]:
        """
        Compress documents using BAAI/bge-reranker models.

        Args:
            documents: A sequence of documents to compress.
            query: The query to use for compressing the documents.
            callbacks: Callbacks to run during the compression process
                (accepted for interface compatibility; not used here).

        Returns:
            At most ``top_n`` documents ordered by descending relevance. Each
            returned document is a copy of the input whose metadata carries an
            additional ``relevance_score`` (plain ``float``).
        """
        if len(documents) == 0:  # avoid calling the model with no inputs
            return []

        doc_list = list(documents)
        doc_texts = [d.page_content for d in doc_list]

        reranked = []
        for index, score in self.bge_rerank(query, doc_texts):
            source = doc_list[index]
            # Build a copy instead of writing into the retrieved document: the
            # inputs are the very objects held by the upstream retrievers, and
            # mutating them would leave stale scores in those stores.
            reranked.append(
                Document(
                    page_content=source.page_content,
                    metadata={**source.metadata, "relevance_score": float(score)},
                )
            )
        return reranked
 
 
from langchain.retrievers import ContextualCompressionRetriever

In [11]:
# Stack all the retrievers together
from langchain.retrievers import EnsembleRetriever
# Ensemble the BM25 and parent-document retrievers with equal weights
ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, parent_doc_retriver], weights=[0.5, 0.5])

# Rerank: the ensemble output is passed through the BgeRerank compressor defined earlier
compressor = BgeRerank()
rerank_retriever = ContextualCompressionRetriever(
 base_compressor=compressor, base_retriever=ensemble_retriever
)

In [12]:
## Now begin to build Q&A system
class RAGChain:
    """Factory for conversational retrieval chains with windowed chat memory."""

    def __init__(
        self, memory_key="chat_history", output_key="answer", return_messages=True
    ):
        """Remember the memory settings applied to every chain built by ``create``."""
        self.memory_key, self.output_key, self.return_messages = (
            memory_key,
            output_key,
            return_messages,
        )

    def create(self, retriver, llm):
        """Build a ``ConversationalRetrievalChain`` for the given retriever and LLM.

        Args:
            retriver: Retriever supplying the context documents.
            llm: Language model that answers the question.

        Returns:
            The chain, configured to return source documents and to answer the
            original (not rephrased) question.
        """
        memory_settings = {
            "memory_key": self.memory_key,
            "return_messages": self.return_messages,
            "output_key": self.output_key,
        }
        window_memory = ConversationBufferWindowMemory(**memory_settings)

        # https://github.com/langchain-ai/langchain/issues/4608
        return ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=retriver,
            memory=window_memory,
            return_source_documents=True,
            rephrase_question=False,  # disable rephrase, for test purpose
            get_chat_history=lambda chat_history: chat_history,  # pass history through unchanged
        )
 
 
# Build the notebook-wide Q&A chain on top of the reranking retriever and the LLM configured above.
# lisa_qa_conversation is read as a global by bot_lisa in the Gradio cell.
rag_chain = RAGChain()
lisa_qa_conversation = rag_chain.create(rerank_retriever, llm)

In [None]:
# Ask a sample question (with an empty chat history) and print the answer
question = "Please name two common solid electrolytes."
result = lisa_qa_conversation({"question":question, "chat_history": []})
# The chain is created with return_source_documents=True, so the sources are in result["source_documents"].
print(result["answer"])

In [None]:
# The rests are for Gradio GUI

import gradio as gr
import time
from pathlib import Path

# Gradio utils
def add_text(history, text):
    """Append the user's message to the chat history and clear the input box.

    Yields once: the extended history (new turn has no answer yet) and an
    empty string for the textbox.
    """
    pending_turn = (text, None)
    extended_history = history + [pending_turn]
    yield extended_history, ""


def bot_lisa(history):
    """Answer the latest question with the global LISA chain and stream the reply.

    Args:
        history: Chat history as a list of ``[user_message, bot_message]``
            pairs; the last pair holds the pending question. It is updated in
            place.

    Yields:
        ``(history, citation)`` tuples, one per streamed character. When the
        answer is empty after stripping, a single tuple with an empty reply is
        yielded so the UI is still updated.
    """
    result = lisa_qa_conversation(
        {
            "question": history[-1][0],  # or "query" if RetrievalQA
            "chat_history": history[:-1],
        }
    )
    print(f"Answer: {result['answer']}")
    print(f"Source document: {result['source_documents']}")  # for debug

    # Citation post-processing is not implemented yet; a placeholder is emitted.
    citation = "citation place holder"
    answer_text = result["answer"].strip()

    history[-1][1] = ""  # Fake stream, TODO: implement streaming
    if not answer_text:
        # Without this, the generator would finish without yielding anything.
        yield history, citation
        return

    for character in answer_text:
        time.sleep(0.002)
        history[-1][1] += character
        yield history, citation


def bot(history, qa_conversation):
    """Answer the latest question with a per-session chain and stream the reply.

    Args:
        history: Chat history as a list of ``[user_message, bot_message]``
            pairs; the last pair holds the pending question. It is updated in
            place.
        qa_conversation: Callable chain created for the uploaded document, or
            ``None`` when no document has been loaded yet.

    Yields:
        The history after each streamed character. When no chain is available
        a warning is shown and the unchanged history is yielded once.
    """
    if qa_conversation is None:
        gr.Warning("Please upload a document first.")
        # Stop here: calling None below would raise a TypeError.
        yield history
        return

    result = qa_conversation(
        {
            "question": history[-1][0],  # or "query" if RetrievalQA
            "chat_history": history[:-1],
        }
    )
    print(f"Source document: {result['source_documents']}")  # for debug

    answer_text = result["answer"].strip()
    history[-1][1] = ""  # Fake stream, TODO: implement streaming
    if not answer_text:
        # Make sure the UI is updated even when the model returned nothing.
        yield history
        return

    for character in answer_text:
        time.sleep(0.002)
        history[-1][1] += character
        yield history


# Ref: https://huggingface.co/spaces/fffiloni/langchain-chat-with-pdf
def document_changes(doc_path):  # , repo_id):
    """Build a Q&A chain for the first uploaded document.

    Args:
        doc_path: List of uploaded file paths (only the first is used), or
            ``None``/empty when nothing has been uploaded.

    Returns:
        A ``(status_message, qa_conversation)`` tuple for the status textbox
        and the session state. ``qa_conversation`` is ``None`` when no chain
        could be built (nothing uploaded, or unsupported file type).
    """
    if not doc_path:  # None or an empty upload list
        gr.Warning("Please choose a document first and wait until uploaded.")
        return "Please choose a document and wait until uploaded.", None  # for langchain_status, qa_conversation

    print("now reading document")
    print(f"file is located at {doc_path[0]}")

    file_extension = Path(doc_path[0]).suffix
    if file_extension != ".pdf":
        # Only PDF parsing is implemented; report the problem instead of
        # crashing (XML loading, e.g. load_xml_as_docs, is not available yet).
        if file_extension == ".xml":
            message = "XML files are not supported yet. Please upload a PDF file."
        else:
            message = f"Unsupported file type '{file_extension}'. Please upload a PDF file."
        gr.Warning(message)
        return message, None

    pdf_docs = load_pdf_as_docs(doc_path[0])
    document_chunks = get_doc_chunks(pdf_docs)

    print("now creating vectordatabase")

    # Seed FAISS with a placeholder text (it cannot be created empty here).
    texts = ["LISA - Lithium Ion Solid-state Assistant"]
    vectorstore = FAISS.from_texts(texts, embeddings)

    store = InMemoryStore()

    parent_splitter = RecursiveCharacterTextSplitter(separators=["\n\n", "\n"], chunk_size=512, chunk_overlap=256)
    child_splitter = RecursiveCharacterTextSplitter(separators=["\n\n", "\n"], chunk_size=256, chunk_overlap=128)

    parent_doc_retriver = ParentDocumentRetriever(
        vectorstore=vectorstore,
        docstore=store,
        child_splitter=child_splitter,
        parent_splitter=parent_splitter,
    )
    parent_doc_retriver.add_documents(pdf_docs)

    bm25_retriever = BM25Retriever.from_documents(document_chunks, k=5)  # 1/2 of dense retriever, experimental value

    # Ensemble all above
    ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, parent_doc_retriver], weights=[0.5, 0.5])

    compressor = BgeRerank()
    rerank_retriever = ContextualCompressionRetriever(
        base_compressor=compressor, base_retriever=ensemble_retriever
    )

    rag_chain = RAGChain()
    qa_conversation = rag_chain.create(rerank_retriever, llm)

    print("now getting llm model")

    file_name = Path(doc_path[0]).name  # First file
    return f"Ready for {file_name}", qa_conversation


# Main gradio UI
def main():
    """Build the two-tab Gradio UI and launch it with a public share link.

    Tab "LISA" chats with the notebook-wide chain (via ``bot_lisa``); tab
    "Document-based Q&A" builds a chain per uploaded document (via
    ``document_changes``) and chats with it (via ``bot``).
    """
    # Gradio interface
    with gr.Blocks() as demo:
        ######################################################################
        # LISA chat tab

        # Title info
        gr.Markdown("## LISA")
        gr.Markdown("Q&A system with RAG.")

        with gr.Tab("LISA"):
            # Chatbot
            chatbot = gr.Chatbot(
                [],
                elem_id="chatbot",
                label="Document Assistant (chat-history context is not supported at the moment, fixing...)",
                bubble_full_width=False,
                show_copy_button=True,
                likeable=True,
            )  # .style(height=750)
            with gr.Row():
                with gr.Column(scale=80):
                    user_txt = gr.Textbox(
                        label="Question",
                        placeholder="Type question and press Enter",
                    )  # .style(container=False)
                with gr.Column(scale=10):
                    submit_btn = gr.Button("Submit", variant="primary")
                with gr.Column(scale=10):
                    clear_btn = gr.Button("Clear", variant="stop")
            # Reference (citations)
            with gr.Accordion("Advanced - Document references", open=False):
                doc_citation = gr.Markdown()
            # alternative: https://www.gradio.app/guides/creating-a-chatbot-fast
            gr.Examples(
                examples=[
                    "Please name two common solid electrolytes.",
                    "Please name two common oxide solid electrolytes.",
                    "Please tell me what is solid-state battery.",
                    "How to synthesize gc-LPSC?",
                    "Please tell me the purpose of Kadi4Mat.",
                    "Who is working on Kadi4Mat?",
                    "Can you recommend a paper to get a deeper understanding of Kadi4Mat?",
                    # "How to synthesize gc-LPSC, e.g., glass-ceramic Li5.5PS4.5Cl1.5?",
                ],
                inputs=user_txt,
                outputs=chatbot,
                fn=add_text,
                # cache_examples=True,
            )

            # Manage functions: first record the question, then stream the answer.
            user_txt.submit(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(
                bot_lisa, chatbot, [chatbot, doc_citation]
            )

            submit_btn.click(
                add_text,
                [chatbot, user_txt],
                [chatbot, user_txt],
                # concurrency_limit=8,
                queue=False,
            ).then(bot_lisa, chatbot, [chatbot, doc_citation])

            # Returning None resets the chatbot.
            clear_btn.click(lambda: None, None, chatbot, queue=False)

        ######################################################################

        ######################################################################
        # Document-based QA

        with gr.Tab("Document-based Q&A"):
            # Per-session chain produced by document_changes (None until a file is loaded).
            qa_conversation = gr.State()

            with gr.Row():
                with gr.Column(scale=3, variant="load_file_panel"):
                    with gr.Row():
                        gr.HTML(
                            "Upload a pdf/xml file, click the Load file button and when everything is ready, you can start asking questions about the document."
                        )
                    with gr.Row():
                        uploaded_doc = gr.File(
                            label="Upload pdf/xml file (single)",
                            file_count="multiple",  # For better looking, but only support 1 file
                            file_types=[".pdf", ".xml"],
                            type="filepath",
                            height=100,
                        )

                    with gr.Row():
                        langchain_status = gr.Textbox(
                            label="Status", placeholder="", interactive=False
                        )
                        load_document = gr.Button("Load file")

                with gr.Column(scale=7, variant="chat_panel"):
                    # Separate name so the first tab's chatbot is not shadowed.
                    docqa_chatbot = gr.Chatbot(
                        [],
                        elem_id="chatbot",
                        label="Document Assistant (chat-history context is not supported at the moment, fixing...)",
                        show_copy_button=True,
                        likeable=True,
                    )  # .style(height=350)
                    docqa_question = gr.Textbox(
                        label="Question",
                        placeholder="Type question and press Enter/click Submit",
                    )
                    with gr.Row():
                        with gr.Column(scale=50):
                            docqa_submit_btn = gr.Button("Submit", variant="primary")
                        with gr.Column(scale=50):
                            docqa_clear_btn = gr.Button("Clear", variant="stop")

                    gr.Examples(
                        examples=[
                            "Summarize the paper",
                            "Summarize the paper in 3 bullet points",
                            "What are the contributions of this paper",
                            "Explain the practical implications of this paper",
                            "Methods used in this paper",
                            "What data has been used in this paper",
                            "Results of the paper",
                            "Conclusions from the paper",
                            "Limitations of this paper",
                            "Future works suggested in this paper",
                        ],
                        inputs=docqa_question,
                        outputs=docqa_chatbot,
                        fn=add_text,
                        # cache_examples=True,
                    )

            load_document.click(
                document_changes,
                inputs=[uploaded_doc],  # , repo_id],
                outputs=[langchain_status, qa_conversation],  # , docqa_db, docqa_retriever],
                queue=False,
            )

            docqa_question.submit(
                add_text, [docqa_chatbot, docqa_question], [docqa_chatbot, docqa_question]
            ).then(bot, [docqa_chatbot, qa_conversation], docqa_chatbot)
            docqa_submit_btn.click(
                add_text, [docqa_chatbot, docqa_question], [docqa_chatbot, docqa_question]
            ).then(bot, [docqa_chatbot, qa_conversation], docqa_chatbot)

            # The Clear button of this tab was never wired up; reset the chat like the first tab does.
            docqa_clear_btn.click(lambda: None, None, docqa_chatbot, queue=False)

        gr.Markdown("*Notes: The model may produce incorrect statements. Users should treat these outputs as suggestions or starting points, not as definitive or accurate facts.")

        ######################################################################

    demo.queue().launch(share=True)
 
 
# Build and launch the Gradio app; main() launches with share=True.
main()