# Spaces: Running
import os
import shutil
import tempfile
import uuid
import zipfile
from typing import Any, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union

from faiss import IndexFlatL2
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
class VectorStoreManager:
    """Manage a single LangChain FAISS vector store.

    The wrapped store lives in ``self.vectorstore`` (``None`` until one is
    created or loaded). The class offers helpers to create the store, add and
    delete documents, save it to disk (zipped when the saved form is a
    directory), load it back, and merge another saved store into it.
    """

    # Shared error text for every operation that needs an existing store.
    _NOT_INITIALIZED = (
        "Vector store is not initialized. "
        "Please create or load a vector store first."
    )

    def __init__(self, embeddings: Optional[Any] = None) -> None:
        """
        Initializes the VectorStoreManager with a FAISS vector store.

        Args:
            embeddings (Embeddings, optional): Embeddings model used for the
                vector store. When omitted, no store is created and
                ``self.vectorstore`` stays ``None``.
        """
        self.vectorstore = None
        # Explicit None check: the decision must not depend on whether the
        # embeddings object happens to be truthy.
        if embeddings is not None:
            self.vectorstore = self.create_vectorstore(embeddings)

    def _require_store(self) -> Any:
        """Return the current vector store or raise if none has been set.

        Raises:
            ValueError: If ``self.vectorstore`` is ``None``.
        """
        if self.vectorstore is None:
            raise ValueError(self._NOT_INITIALIZED)
        return self.vectorstore

    def create_vectorstore(self, embeddings: Any) -> Any:
        """
        Creates and initializes an empty FAISS vector store.

        Args:
            embeddings (Embeddings): Embeddings model used for the vector store.

        Returns:
            FAISS: Initialized (empty) vector store.
        """
        # The index needs the vector size up front; embed a throwaway query
        # and measure the result.
        dimensions = len(embeddings.embed_query("dummy"))
        vectorstore = FAISS(
            embedding_function=embeddings,
            index=IndexFlatL2(dimensions),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
            normalize_L2=False,
        )
        print("Created a new FAISS vector store.")
        return vectorstore

    def add_documents(self, documents: Iterable[Any]) -> List[str]:
        """
        Adds new documents to the FAISS vector store, each with a fresh UUID.

        Args:
            documents (Iterable[Document]): Documents to add.

        Returns:
            list: UUID strings, one per added document, in input order.
                An empty list when no documents were supplied.

        Raises:
            ValueError: If the vector store is not initialized.
        """
        store = self._require_store()
        documents = list(documents)
        if not documents:
            # Mirror delete_documents: nothing to do, so do not hand an
            # empty batch to the underlying store.
            print("No documents provided for addition.")
            return []
        uuids = [str(uuid.uuid4()) for _ in documents]
        store.add_documents(documents=documents, ids=uuids)
        print(f"Added {len(documents)} documents to the vector store with IDs: {uuids}")
        return uuids

    def delete_documents(self, ids: List[str]) -> bool:
        """
        Deletes documents from the FAISS vector store using their unique IDs.

        Args:
            ids (list): UUIDs of the documents to delete.

        Returns:
            bool: True if the store reported success, False if no IDs were
                given or the store reported failure.

        Raises:
            ValueError: If the vector store is not initialized.
                NOTE(review): the underlying ``FAISS.delete`` appears to raise
                ``ValueError`` for unknown IDs rather than return a falsy
                value -- confirm against the installed langchain version.
        """
        store = self._require_store()
        if not ids:
            print("No document IDs provided for deletion.")
            return False
        # Coerce so callers always get a real bool, even if the backend
        # returns None.
        success = bool(store.delete(ids=ids))
        if success:
            print(f"Successfully deleted documents with IDs: {ids}")
        else:
            print(f"Failed to delete documents with IDs: {ids}")
        return success

    def save(self, filename: str = "faiss_index") -> Dict[str, str]:
        """
        Saves the current FAISS vector store locally. If the saved store is a
        directory, it is additionally compressed into ``<filename>.zip``.

        Args:
            filename (str): File or directory name to save the store under.

        Returns:
            dict: ``file_path`` (the ZIP when a directory was saved, otherwise
                ``filename``), ``media_type``, ``serve_filename`` (basename of
                ``file_path``) and ``original`` (``filename`` as given).

        Raises:
            ValueError: If the vector store is not initialized.
            FileNotFoundError: If nothing exists at ``filename`` after saving.
        """
        store = self._require_store()
        store.save_local(filename)
        print(f"Vector store saved to {filename}")
        if not os.path.exists(filename):
            raise FileNotFoundError("Saved vectorstore not found.")

        if not os.path.isdir(filename):
            return {
                "file_path": filename,
                "media_type": "application/octet-stream",
                "serve_filename": os.path.basename(filename),
                "original": filename,
            }

        # Strip trailing separators so "dir/" archives to "dir.zip" next to
        # the directory instead of "dir/.zip" inside the tree being archived.
        separators = os.sep + (os.altsep or "")
        archive_base = filename.rstrip(separators) or filename
        zip_filename = archive_base + ".zip"
        shutil.make_archive(archive_base, "zip", root_dir=filename)
        return {
            "file_path": zip_filename,
            "media_type": "application/zip",
            "serve_filename": os.path.basename(zip_filename),
            "original": filename,
        }

    @staticmethod
    def _resolve_store_dir(extract_dir: str) -> str:
        """Return the directory under *extract_dir* to load the store from.

        If the archive unpacked to exactly one sub-directory, that
        sub-directory is used; otherwise the extraction root itself.
        """
        entries = os.listdir(extract_dir)
        if len(entries) == 1:
            only_entry = os.path.join(extract_dir, entries[0])
            if os.path.isdir(only_entry):
                return only_entry
        return extract_dir

    @staticmethod
    def _read_store(
        file_input: Union[str, os.PathLike, BinaryIO], embeddings: Any
    ) -> Tuple[Any, bool]:
        """Load a FAISS store from a path or a file-like object.

        Args:
            file_input: A path (``str`` / ``os.PathLike``) used as-is, or an
                object with ``.read()`` whose bytes are spooled to a
                temporary file first.
            embeddings (Embeddings): Embeddings model passed to
                ``FAISS.load_local``.

        Returns:
            tuple: ``(store, from_zip)`` where ``from_zip`` tells whether the
                input was a ZIP archive.
        """
        temp_path = None
        try:
            if isinstance(file_input, (str, os.PathLike)):
                source_path = os.fspath(file_input)
            else:
                with tempfile.NamedTemporaryFile(delete=False) as tmp:
                    # Record the name before writing so the finally-clause
                    # removes the file even when read()/write() fails.
                    temp_path = tmp.name
                    tmp.write(file_input.read())
                source_path = temp_path

            # SECURITY: allow_dangerous_deserialization=True makes FAISS
            # unpickle the stored docstore; a crafted file can execute
            # arbitrary code. Only pass files from trusted sources.
            if zipfile.is_zipfile(source_path):
                with tempfile.TemporaryDirectory() as extract_dir:
                    with zipfile.ZipFile(source_path, "r") as archive:
                        archive.extractall(extract_dir)
                    # Load while the extraction directory still exists.
                    store = FAISS.load_local(
                        VectorStoreManager._resolve_store_dir(extract_dir),
                        embeddings,
                        allow_dangerous_deserialization=True,
                    )
                return store, True

            store = FAISS.load_local(
                source_path, embeddings, allow_dangerous_deserialization=True
            )
            return store, False
        finally:
            # Only remove what was created here; never a caller's file.
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)

    @classmethod
    def load(
        cls, file_input: Union[str, os.PathLike, BinaryIO], embeddings: Any
    ) -> Tuple["VectorStoreManager", str]:
        """
        Loads a FAISS vector store from an uploaded file or a filename.

        If file_input is a file-like object, it is saved to a temporary file
        (removed afterwards). If it is a path, it is used directly. ZIP
        archives are extracted before loading.

        Args:
            file_input: Path or object with a ``.read()`` method.
            embeddings (Embeddings): Embeddings model used for loading.

        Returns:
            tuple: ``(manager, message)`` -- a new manager wrapping the loaded
                store and a human-readable status message.

        Raises:
            RuntimeError: If the store cannot be read or loaded; the original
                error is chained as ``__cause__``.
        """
        try:
            new_vectorstore, from_zip = cls._read_store(file_input, embeddings)
        except Exception as e:
            raise RuntimeError(f"Error loading vectorstore: {str(e)}") from e

        if from_zip:
            message = "Vector store loaded successfully from ZIP."
        else:
            message = "Vector store loaded successfully."
        instance = cls()
        instance.vectorstore = new_vectorstore
        print(message)
        return instance, message

    def merge(
        self, file_input: Union[str, os.PathLike, BinaryIO], embeddings: Any
    ) -> Dict[str, str]:
        """
        Merges a saved vector store into the current FAISS vector store.

        Args:
            file_input (Union[file-like object, str]): An object with a
                .read() method or a filename.
            embeddings (Embeddings): Embeddings model used for loading the
                source vector store.

        Returns:
            dict: A dictionary containing a message indicating successful merging.

        Raises:
            ValueError: If the current vector store is not initialized
                (checked before any file is touched).
            RuntimeError: If loading or merging the source store fails; the
                original error is chained as ``__cause__``.
        """
        target = self._require_store()
        try:
            source_store, _ = self._read_store(file_input, embeddings)
            target.merge_from(source_store)
        except Exception as e:
            raise RuntimeError(f"Error merging vectorstore: {str(e)}") from e
        print("Successfully merged the source vector store into the current vector store.")
        return {"message": "Vector stores merged successfully"}