# Patent Chat — Streamlit app (source: DrishtiSharma, "Update test.py", commit 7f17ee4, 6.43 kB)
import sys
import os
import re
import shutil
import time
import streamlit as st
import nltk
# Ensure the NLTK 'punkt' tokenizer models are available in a project-local
# data directory (NLTKTextSplitter below depends on them).
nltk_data_path = os.path.join(os.getcwd(), "nltk_data")
os.makedirs(nltk_data_path, exist_ok=True)
nltk.data.path.append(nltk_data_path)
try:
    # Skip the network round-trip when the resource is already installed —
    # the original downloaded unconditionally on every app start.
    nltk.data.find("tokenizers/punkt")
except LookupError:
    try:
        print("Ensuring NLTK 'punkt' resource is downloaded...")
        nltk.download("punkt", download_dir=nltk_data_path)
    except Exception as e:
        # Best-effort: report and continue; loading will fail later if truly absent.
        print(f"Error downloading NLTK 'punkt': {e}")
# Make sibling modules (e.g. patent_downloader) importable regardless of CWD.
sys.path.append(os.path.abspath("."))
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.llms import OpenAI
from langchain.document_loaders import UnstructuredPDFLoader
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import NLTKTextSplitter
from patent_downloader import PatentDownloader
# Directory where the Chroma vector index is persisted between runs.
PERSISTED_DIRECTORY = "."

# Fetch API key securely from the environment; the app cannot run without it.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    st.error("Critical Error: OpenAI API key not found in the environment variables. Please configure it.")
    st.stop()
def check_poppler_installed():
    """Raise EnvironmentError unless Poppler's ``pdfinfo`` binary is on PATH.

    Called once at import time so the app fails fast before any PDF work.
    """
    if shutil.which("pdfinfo") is None:
        raise EnvironmentError(
            "Poppler is not installed or not in PATH. Install 'poppler-utils' for PDF processing."
        )


check_poppler_installed()
def load_docs(document_path):
    """Parse the PDF at *document_path* and split it into ~1000-char chunks.

    Uses UnstructuredPDFLoader in fast, OCR-free mode; on any failure the
    Streamlit app is stopped with an error message.
    """
    try:
        pdf_loader = UnstructuredPDFLoader(
            document_path,
            mode="elements",
            strategy="fast",
            ocr_languages=None,  # Explicitly disable OCR
        )
        splitter = NLTKTextSplitter(chunk_size=1000)
        return splitter.split_documents(pdf_loader.load())
    except Exception as e:
        st.error(f"Failed to load and process PDF: {e}")
        st.stop()
def already_indexed(vectordb, file_name):
    """Return True if *file_name* is already a 'source' in the vector store."""
    records = vectordb.get(include=["metadatas"])
    known_sources = {meta["source"] for meta in records["metadatas"]}
    return file_name in known_sources
def load_chain(file_name=None):
    """Build a ConversationalRetrievalChain over the patent PDF *file_name*.

    Reuses the persisted Chroma index when this file is the currently loaded
    patent or was indexed before; otherwise drops the collection and
    re-indexes the document from scratch.
    """
    current_patent = st.session_state.get("LOADED_PATENT")
    vectordb = Chroma(
        persist_directory=PERSISTED_DIRECTORY,
        embedding_function=HuggingFaceEmbeddings(),
    )
    if file_name == current_patent or already_indexed(vectordb, file_name):
        st.write("βœ… Already indexed.")
    else:
        # Stale index: rebuild the collection from the new document's chunks.
        vectordb.delete_collection()
        docs = load_docs(file_name)
        st.write("πŸ” Number of Documents: ", len(docs))
        vectordb = Chroma.from_documents(
            docs, HuggingFaceEmbeddings(), persist_directory=PERSISTED_DIRECTORY
        )
        vectordb.persist()
        st.session_state["LOADED_PATENT"] = file_name

    # Memory keyed to the chain's question/answer fields so history threads through.
    memory = ConversationBufferMemory(
        memory_key="chat_history",
        return_messages=True,
        input_key="question",
        output_key="answer",
    )
    return ConversationalRetrievalChain.from_llm(
        OpenAI(temperature=0, openai_api_key=OPENAI_API_KEY),
        vectordb.as_retriever(search_kwargs={"k": 3}),
        return_source_documents=False,
        memory=memory,
    )
def extract_patent_number(url):
    """Extract a patent number (e.g. 'US9876543') from a Google Patents URL.

    Returns None when the URL does not contain a '/patent/<CC><digits>' segment.
    Note: any trailing kind code (e.g. 'B2') is not part of the match.
    """
    found = re.search(r"/patent/([A-Z]{2}\d+)", url)
    if found is None:
        return None
    return found.group(1)
def download_pdf(patent_number):
    """Download the PDF for *patent_number* and return its local file path.

    Delegates to PatentDownloader; stops the Streamlit app on any failure.
    """
    try:
        downloader = PatentDownloader(verbose=True)
        paths = downloader.download(patents=patent_number)
        # The downloader returns a list; the first entry is our file.
        return paths[0]
    except Exception as e:
        st.error(f"Failed to download patent PDF: {e}")
        st.stop()
if __name__ == "__main__":
    st.set_page_config(
        page_title="Patent Chat: Google Patents Chat Demo",
        page_icon="πŸ“–",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    st.header("πŸ“– Patent Chat: Google Patents Chat Demo")

    # Ask for a Google Patents URL; nothing below runs until one is supplied.
    patent_link = st.text_input("Enter Google Patent Link:", key="PATENT_LINK")
    if not patent_link:
        st.warning("Please enter a Google patent link to proceed.")
        st.stop()

    patent_number = extract_patent_number(patent_link)
    if not patent_number:
        st.error("Invalid patent link format. Please provide a valid Google patent link.")
        st.stop()
    st.write(f"Patent number: **{patent_number}**")

    # Fetch the patent PDF unless a local copy already exists.
    pdf_path = f"{patent_number}.pdf"
    if not os.path.isfile(pdf_path):
        st.write("πŸ“₯ Downloading patent file...")
        pdf_path = download_pdf(patent_number)
        st.write(f"βœ… File downloaded: {pdf_path}")
    else:
        st.write("βœ… File already downloaded.")

    # Index the document and build the retrieval chain.
    st.write("πŸ”„ Loading document into the system...")
    chain = load_chain(pdf_path)
    st.success("πŸš€ Document successfully loaded! You can now start asking questions.")

    # Seed the chat history with a greeting on the first run of the session.
    if "messages" not in st.session_state:
        st.session_state["messages"] = [
            {"role": "assistant", "content": "Hello! How can I assist you with this patent?"}
        ]

    # Replay the conversation so far.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    # Handle a new user question.
    if user_input := st.chat_input("What is your question?"):
        st.session_state.messages.append({"role": "user", "content": user_input})
        with st.chat_message("user"):
            st.markdown(user_input)

        with st.chat_message("assistant"):
            placeholder = st.empty()
            full_response = ""
            with st.spinner("Generating response..."):
                try:
                    result = chain({"question": user_input})
                    # Reveal the answer word by word to mimic typing.
                    for word in result["answer"].split():
                        full_response += word + " "
                        time.sleep(0.05)
                        placeholder.markdown(full_response + "β–Œ")
                except Exception as e:
                    full_response = f"An error occurred: {e}"
                finally:
                    placeholder.markdown(full_response)
                    st.session_state.messages.append({"role": "assistant", "content": full_response})