import os
import shutil
import tempfile
import uuid
import zipfile

from faiss import IndexFlatL2
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS


class VectorStoreManager:
    """Manage a single LangChain FAISS vector store.

    Wraps creation, document insertion/deletion, saving to disk (with ZIP
    packaging of directory output), loading, and merging of another store
    into the current one.
    """

    def __init__(self, embeddings=None):
        """
        Initializes the VectorStoreManager with a FAISS vector store.

        Args:
            embeddings (Embeddings, optional): Embeddings model used for the
                vector store. When omitted, ``self.vectorstore`` stays ``None``
                until a store is assigned (e.g. by :meth:`load`).
        """
        self.vectorstore = None
        # Identity check: an embeddings object must not be skipped merely
        # because it happens to evaluate as falsy.
        if embeddings is not None:
            self.vectorstore = self.create_vectorstore(embeddings)

    def _require_vectorstore(self):
        """Return the current vector store or raise ValueError if there is none."""
        if self.vectorstore is None:
            raise ValueError("Vector store is not initialized. Please create or load a vector store first.")
        return self.vectorstore

    def create_vectorstore(self, embeddings):
        """
        Creates and initializes an empty FAISS vector store.

        Args:
            embeddings (Embeddings): Embeddings model used for the vector store.

        Returns:
            FAISS: Initialized vector store.
        """
        # The index dimension is discovered by embedding a throwaway query.
        dimensions = len(embeddings.embed_query("dummy"))

        vectorstore = FAISS(
            embedding_function=embeddings,
            index=IndexFlatL2(dimensions),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
            normalize_L2=False,
        )
        print("Created a new FAISS vector store.")
        return vectorstore

    def add_documents(self, documents):
        """
        Adds new documents to the FAISS vector store, each with a fresh UUID.

        Args:
            documents (list): List of Document objects to be added.

        Returns:
            list: UUID strings, one per added document, in input order.

        Raises:
            ValueError: If no vector store has been created or loaded.
        """
        store = self._require_vectorstore()

        uuids = [str(uuid.uuid4()) for _ in documents]
        store.add_documents(documents=documents, ids=uuids)
        print(f"Added {len(documents)} documents to the vector store with IDs: {uuids}")
        return uuids

    def delete_documents(self, ids):
        """
        Deletes documents from the FAISS vector store using their unique IDs.

        Args:
            ids (list): List of UUIDs corresponding to the documents to delete.

        Returns:
            bool: Result reported by the store's ``delete`` call; ``False``
            when ``ids`` is empty (nothing is attempted in that case).

        Raises:
            ValueError: If no vector store has been created or loaded.
        """
        store = self._require_vectorstore()

        if not ids:
            print("No document IDs provided for deletion.")
            return False

        success = store.delete(ids=ids)
        if success:
            print(f"Successfully deleted documents with IDs: {ids}")
        else:
            print(f"Failed to delete documents with IDs: {ids}")
        return success

    def save(self, filename="faiss_index"):
        """
        Saves the current FAISS vector store locally.

        If the saved store is a directory, it is additionally compressed into
        ``<filename>.zip`` and the archive is what gets reported.

        Args:
            filename (str): File or directory name the store is saved under.

        Returns:
            dict: ``file_path``, ``media_type``, ``serve_filename`` and
            ``original`` describing the artifact to serve.

        Raises:
            ValueError: If no vector store has been created or loaded.
            FileNotFoundError: If nothing exists at ``filename`` after saving.
        """
        store = self._require_vectorstore()

        store.save_local(filename)
        print(f"Vector store saved to {filename}")

        if not os.path.exists(filename):
            raise FileNotFoundError("Saved vectorstore not found.")

        if os.path.isdir(filename):
            # make_archive appends ".zip" to the base name itself.
            zip_filename = filename + ".zip"
            shutil.make_archive(filename, 'zip', filename)
            return {
                "file_path": zip_filename,
                "media_type": "application/zip",
                "serve_filename": os.path.basename(zip_filename),
                "original": filename,
            }

        return {
            "file_path": filename,
            "media_type": "application/octet-stream",
            "serve_filename": os.path.basename(filename),
            "original": filename,
        }

    @staticmethod
    def _load_store(file_input, embeddings):
        """Load a FAISS store from a path or a readable object.

        Shared by :meth:`load` and :meth:`merge`.

        Args:
            file_input: A filesystem path (``str`` or ``os.PathLike``) or an
                object with a ``.read()`` method returning the file's bytes.
            embeddings (Embeddings): Embeddings model passed to
                ``FAISS.load_local``.

        Returns:
            tuple: ``(store, from_zip)`` where ``from_zip`` tells whether the
            input was recognised as a ZIP archive.
        """
        temp_path = None
        try:
            if isinstance(file_input, (str, os.PathLike)):
                source_path = os.fspath(file_input)
            else:
                # Spool the upload to disk; created inside the try so the
                # finally below removes it even if read()/write() fails.
                with tempfile.NamedTemporaryFile(delete=False) as tmp:
                    temp_path = tmp.name
                    tmp.write(file_input.read())
                source_path = temp_path

            # NOTE(security): allow_dangerous_deserialization=True lets
            # load_local deserialize pickled data. Only feed this method
            # files from a trusted source; a crafted upload can execute
            # arbitrary code.
            if not zipfile.is_zipfile(source_path):
                store = FAISS.load_local(
                    source_path, embeddings, allow_dangerous_deserialization=True
                )
                return store, False

            with tempfile.TemporaryDirectory() as extract_dir:
                with zipfile.ZipFile(source_path, 'r') as archive:
                    archive.extractall(extract_dir)

                # An archive holding exactly one directory is treated as
                # "store wrapped in a folder"; otherwise the archive root is
                # taken as the store directory.
                store_dir = extract_dir
                entries = os.listdir(extract_dir)
                if len(entries) == 1:
                    candidate = os.path.join(extract_dir, entries[0])
                    if os.path.isdir(candidate):
                        store_dir = candidate

                # Must happen before the temporary directory is removed.
                store = FAISS.load_local(
                    store_dir, embeddings, allow_dangerous_deserialization=True
                )
            return store, True
        finally:
            # Only remove the temp file if we created it here.
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)

    @staticmethod
    def load(file_input, embeddings):
        """
        Loads a FAISS vector store from an uploaded file or a filename.

        If ``file_input`` is a file-like object, it is first written to a
        temporary file (removed afterwards). If it is a path, it is used
        directly. ZIP archives are extracted before loading.

        Args:
            file_input: Path (``str``/``os.PathLike``) or object with ``.read()``.
            embeddings (Embeddings): Embeddings model used for loading.

        Returns:
            tuple: ``(VectorStoreManager, str)`` - a manager holding the
            loaded store, and a human-readable status message.

        Raises:
            RuntimeError: If the store cannot be loaded; the underlying
                error is attached as ``__cause__``.
        """
        try:
            new_vectorstore, from_zip = VectorStoreManager._load_store(file_input, embeddings)
        except Exception as e:
            raise RuntimeError(f"Error loading vectorstore: {e}") from e

        if from_zip:
            message = "Vector store loaded successfully from ZIP."
        else:
            message = "Vector store loaded successfully."

        instance = VectorStoreManager()
        instance.vectorstore = new_vectorstore
        print(message)
        return instance, message

    def merge(self, file_input, embeddings):
        """
        Merges an uploaded vector store file into the current FAISS vector store.

        Args:
            file_input (Union[file-like object, str]): An object with a
                ``.read()`` method or a filename.
            embeddings (Embeddings): Embeddings model used for loading the
                vector store.

        Returns:
            dict: A dictionary containing a message indicating successful merging.

        Raises:
            ValueError: If no vector store has been created or loaded
                (checked before the input is read).
            RuntimeError: If loading or merging the source store fails; the
                underlying error is attached as ``__cause__``.
        """
        # Fail fast: no point spooling and deserializing the source when
        # there is nothing to merge it into.
        target = self._require_vectorstore()

        try:
            source_store, _ = self._load_store(file_input, embeddings)
            target.merge_from(source_store)
        except Exception as e:
            raise RuntimeError(f"Error merging vectorstore: {e}") from e

        print("Successfully merged the source vector store into the current vector store.")
        return {"message": "Vector stores merged successfully"}