from langchain.vectorstores import Chroma
from langchain.schema import Document
from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.chroma import ChromaTranslator
from langchain.text_splitter import RecursiveCharacterTextSplitter
from llm.llmFactory import LLMFactory
from datetime import datetime
from uuid import UUID
import baseInfra.dropbox_handler as dbh
from baseInfra.dbInterface import DbInterface
import logging
import asyncio

logger = logging.getLogger("root")


class myChromaTranslator(ChromaTranslator):
    """ChromaTranslator restricted to the operators Chroma actually accepts."""

    allowed_operators = ["$and", "$or"]
    """Subset of allowed logical operators."""
    allowed_comparators = [
        "$eq", "$ne", "$gt", "$gte", "$lt", "$lte",
        "$contains", "$not_contains", "$in", "$nin",
    ]
    """Subset of allowed comparators."""


class ChromaIntf:
    def __init__(self):
        self.db_interface = DbInterface()

        model_name = "BAAI/bge-large-en-v1.5"
        encode_kwargs = {"normalize_embeddings": True}  # normalize so similarity is cosine
        self.embedding = HuggingFaceBgeEmbeddings(
            model_name=model_name,
            model_kwargs={"device": "cpu"},
            encode_kwargs=encode_kwargs,
        )

        self.persist_db_directory = "db"
        self.persist_docs_directory = "persistence-docs"
        self.logger_file = "persistence.log"

        # Restore previously backed-up state; on a brand-new setup the remote
        # folders will not exist yet, which is fine.
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(dbh.restoreFolder(self.persist_db_directory))
            loop.run_until_complete(dbh.restoreFolder(self.persist_docs_directory))
        except Exception:
            logger.warning("Restore failed; probably a brand-new setup with no backup yet")

        # Seed document so the collection is never empty.
        docs = [
            Document(
                page_content="this is test doc",
                metadata={
                    "timestamp": 1696743148.474055,
                    "ID": "2000-01-01 15:57:11::664165-test",
                    "source": "test",
                },
            ),
        ]
        self.vectorstore = Chroma.from_documents(
            documents=docs,
            embedding=self.embedding,
            persist_directory=self.persist_db_directory,
        )
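        # Illustrative only: with the whitelist in myChromaTranslator above, a
        # structured query produced by the self-query chain translates to a
        # Chroma `where` filter shaped roughly like this (values made up):
        #   {"$and": [{"Year": {"$eq": 2024}}, {"source": {"$eq": "notes"}}]}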
        # Metadata fields stored with every document:
        #   timestamp --> time when the entry was added
        #   source    --> notes/references/web/youtube/book/conversation; defaults to conversation
        #   title     --> title of the document; "conversation" when source is conversation, default blank
        #   author    --> defaults to blank
        #   Year/Month/Day/Hour/Minute --> integer components of the timestamp,
        #       e.g. "Year": 2024, "Month": 1, "Day": 3, "Hour": 11, "Minute": 29
        self.metadata_field_info = [
            AttributeInfo(
                name="timestamp",
                description="Python datetime.timestamp of the document in isoformat; should not be used for queries",
                type="str",
            ),
            AttributeInfo(
                name="Year",
                description="Year from the date when the entry was added, in YYYY format",
                type="int",
            ),
            AttributeInfo(
                name="Month",
                description="Month from the date when the entry was added, from 1-12",
                type="int",
            ),
            AttributeInfo(
                name="Day",
                description="Day of month from the date-time stamp when the entry was added, from 1-31",
                type="int",
            ),
            AttributeInfo(
                name="Hour",
                description="Hour from the timestamp when the entry was added",
                type="int",
            ),
            AttributeInfo(
                name="Minute",
                description="Minute from the timestamp when the entry was added",
                type="int",
            ),
            AttributeInfo(
                name="source",
                description="Type of entry",
                type="string or list[string]",
            ),
            AttributeInfo(
                name="title",
                description="Title or subject of the entry",
                type="string",
            ),
            AttributeInfo(
                name="author",
                description="Author of the entry",
                type="string",
            ),
        ]
        self.document_content_description = "Information to store for retrieval by an LLM-based chatbot"

        lf = LLMFactory()
        # self.llm = lf.get_llm("executor2")
        self.llm = lf.get_llm("executor3")
        # myChromaTranslator (above) can be swapped in here to restrict the
        # operators the query constructor is allowed to emit.
        self.retriever = SelfQueryRetriever.from_llm(
            self.llm,
            self.vectorstore,
            self.document_content_description,
            self.metadata_field_info,
            structured_query_translator=ChromaTranslator(),
            verbose=True,
        )

    async def getRelevantDocs(self, query: str, kwargs: dict):
        """Fetch documents relevant to `query`; also posts the result to firebase via the db cache."""
        logger.debug("retriever search_kwargs: %s", self.retriever.search_kwargs)
        logger.debug("retriever search_type: %s", self.retriever.search_type)
        try:
            for key in kwargs.keys():
                if key == "search_type":
                    self.retriever.search_type = kwargs[key]
                else:
                    self.retriever.search_kwargs[key] = kwargs[key]
        except Exception:
            logger.exception("Setting search args failed")

        retVal = []
        try:
            retVal = self.retriever.get_relevant_documents(query)
        except Exception:
            logger.exception("Exception occurred while retrieving documents")
        logger.debug("Retrieved %d documents", len(retVal))

        value = []
        excludeMeta = True
        try:
            # Normal path: results are Document objects.
            for item in retVal:
                if excludeMeta:
                    v = item.page_content + " \n"
                else:
                    v = "Info:" + item.page_content + " "
                    for key in item.metadata.keys():
                        if key != "ID":
                            v += key + ":" + str(item.metadata[key]) + " "
                value.append(v)
            self.db_interface.add_to_cache(input=query, value=value)
        except Exception:
            # Fallback path: results arrived as plain dicts.
            for item in retVal:
                if excludeMeta:
                    v = item["page_content"] + " \n"
                else:
                    v = "Info:" + item["page_content"] + " "
                    for key in item["metadata"].keys():
                        if key != "ID":
                            v += key + ":" + str(item["metadata"][key]) + " "
                value.append(v)
            self.db_interface.add_to_cache(input=query, value=value)
        return retVal

    async def addText(self, inStr: str, metadata):
        # Expected metadata fields (all optional):
        #   timestamp --> time when added
        #   source    --> notes/references/web/youtube/book/conversation, default conversation
        #   title     --> title of the document; "conversation" when source is conversation, default blank
        #   author    --> defaults to blank
        # TODO: Preprocess inStr to remove any HTML, markdown tags, etc.
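        # Illustrative example of an incoming payload (hypothetical values);
        # `metadata` arrives as a pydantic model from the API layer and, once
        # converted via .dict(), looks roughly like:
        #   {"timestamp": "2024-01-03T11:29:00", "source": "notes",
        #    "title": "vector stores", "author": "me"}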
        metadata = metadata.dict()
        # Normalize the timestamp to a datetime object; it is converted back to
        # an isoformat string once the derived fields have been extracted.
        if "timestamp" not in metadata.keys():
            metadata["timestamp"] = datetime.now()
        else:
            metadata["timestamp"] = datetime.fromisoformat(metadata["timestamp"])
        if "source" not in metadata.keys():
            metadata["source"] = "conversation"
        if "title" not in metadata.keys():
            metadata["title"] = ""
            if metadata["source"] == "conversation":
                metadata["title"] = "conversation"
        if "author" not in metadata.keys():
            metadata["author"] = ""

        # TODO: If a URL is present in the input, or when splitting needs to be
        # done, we'll need to change how the ID (and possibly the filename used
        # to store the information) is formulated.
        metadata["ID"] = metadata["timestamp"].strftime("%Y-%m-%d %H-%M-%S") + "-" + metadata["title"]
        metadata["Year"] = metadata["timestamp"].year
        metadata["Month"] = metadata["timestamp"].month
        metadata["Day"] = metadata["timestamp"].day
        metadata["Hour"] = metadata["timestamp"].hour
        metadata["Minute"] = metadata["timestamp"].minute
        metadata["timestamp"] = metadata["timestamp"].isoformat()
        logger.debug("Metadata is: %s", metadata)

        # Persist the raw text alongside the vector store so it can be backed up.
        with open("./docs/" + metadata["ID"] + ".txt", "w") as fd:
            fd.write(inStr)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=50,
            length_function=len,
            is_separator_regex=False,
        )
        docs = text_splitter.create_documents([inStr], [metadata])
        # Give every chunk after the first a unique ID by suffixing its part number.
        partNumber = 0
        for doc in docs:
            if partNumber > 0:
                doc.metadata["ID"] += f"__{partNumber}"
            partNumber += 1
        try:
            ids = [doc.metadata["ID"] for doc in docs]
            logger.debug("ids are: %s", ids)
            return await self.vectorstore.aadd_documents(docs, ids=ids)
        except Exception:
            logger.exception("Exception in adding documents")
            return await self.vectorstore.aadd_documents(docs, ids=[metadata["ID"]])

    async def listDocs(self):
        collection = self.vectorstore._client.get_collection(
            self.vectorstore._LANGCHAIN_DEFAULT_COLLECTION_NAME,
            embedding_function=self.embedding,
        )
        return collection.get()
        # return self.vectorstore._client._get(collection_id=self._uuid(collectionInfo.id))

    async def persist(self):
        self.vectorstore.persist()
        await dbh.backupFile(self.logger_file)
        await dbh.backupFolder(self.persist_db_directory)
        return await dbh.backupFolder(self.persist_docs_directory)

    def _uuid(self, uuid_str: str) -> UUID:
        try:
            return UUID(uuid_str)
        except ValueError:
            raise ValueError(f"Could not parse {uuid_str} as a UUID")
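

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): assumes LLMFactory, DbInterface
    # and the Dropbox handler are configured and a local "docs" directory
    # exists. _Payload is a hypothetical stand-in for the pydantic model that
    # addText() normally receives from the API layer.
    class _Payload:
        def __init__(self, **kw):
            self._kw = kw

        def dict(self):
            return dict(self._kw)

    intf = ChromaIntf()  # builds the store and restores any backed-up state

    async def _demo():
        await intf.addText(
            "Chroma stores embeddings locally on disk.",
            _Payload(source="notes", title="vector stores"),
        )
        docs = await intf.getRelevantDocs(
            "what do I know about Chroma?",
            {"search_type": "mmr", "k": 4},
        )
        print(docs)
        await intf.persist()

    asyncio.run(_demo())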