Spaces:
Running
Running
File size: 9,286 Bytes
7b7cab6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
import os
import uuid
import shutil
import tempfile
import zipfile
from faiss import IndexFlatL2
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
class VectorStoreManager:
    """Thin wrapper around a LangChain FAISS vector store.

    Supports creating an empty store, adding and deleting documents,
    saving the store to disk (zipped), loading a store from a path or an
    uploaded file-like object, and merging another store into this one.

    Attributes:
        vectorstore: The wrapped FAISS store, or ``None`` until one is
            created (via ``embeddings`` in the constructor) or loaded.
    """

    _NOT_INITIALIZED_MSG = (
        "Vector store is not initialized. Please create or load a vector store first."
    )

    def __init__(self, embeddings=None):
        """
        Initializes the VectorStoreManager with a FAISS vector store.

        Args:
            embeddings (Embeddings, optional): Embeddings model used for the
                vector store. When omitted, no store is created and
                ``self.vectorstore`` stays ``None``.
        """
        self.vectorstore = None
        # Explicit None check: do not depend on the truthiness of the
        # embeddings object.
        if embeddings is not None:
            self.vectorstore = self.create_vectorstore(embeddings)

    def _require_store(self):
        """Raise ValueError if no vector store has been created or loaded."""
        if self.vectorstore is None:
            raise ValueError(self._NOT_INITIALIZED_MSG)

    def create_vectorstore(self, embeddings):
        """
        Creates and initializes an empty FAISS vector store.

        Args:
            embeddings (Embeddings): Embeddings model used for the vector store.

        Returns:
            FAISS: Initialized vector store backed by a flat L2 index.
        """
        # The index dimension is discovered by embedding a throwaway query.
        dimensions = len(embeddings.embed_query("dummy"))
        vectorstore = FAISS(
            embedding_function=embeddings,
            index=IndexFlatL2(dimensions),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
            normalize_L2=False,
        )
        print("Created a new FAISS vector store.")
        return vectorstore

    def add_documents(self, documents):
        """
        Adds new documents to the FAISS vector store, each with a fresh UUID.

        Args:
            documents (Iterable): Document objects to be added to the vector store.

        Returns:
            list: UUID strings for the added documents, in input order.
                Empty when no documents were supplied.

        Raises:
            ValueError: If the vector store is not initialized.
        """
        self._require_store()
        # Materialize once so any iterable (not only sequences) is accepted.
        documents = list(documents)
        if not documents:
            # Nothing to embed; skip the backend call for an empty batch.
            print("No documents provided to add.")
            return []
        uuids = [str(uuid.uuid4()) for _ in documents]
        self.vectorstore.add_documents(documents=documents, ids=uuids)
        print(f"Added {len(documents)} documents to the vector store with IDs: {uuids}")
        return uuids

    def delete_documents(self, ids):
        """
        Deletes documents from the FAISS vector store using their unique IDs.

        Args:
            ids (list): UUIDs of the documents to be deleted.

        Returns:
            The result reported by the store's ``delete`` call (truthy on
            success), or ``False`` when ``ids`` is empty.

        Raises:
            ValueError: If the vector store is not initialized.
        """
        self._require_store()
        if not ids:
            print("No document IDs provided for deletion.")
            return False
        success = self.vectorstore.delete(ids=ids)
        if success:
            print(f"Successfully deleted documents with IDs: {ids}")
        else:
            print(f"Failed to delete documents with IDs: {ids}")
        return success

    def save(self, filename="faiss_index"):
        """
        Saves the current FAISS vector store locally. If the saved store is a
        directory, it is additionally compressed into ``<filename>.zip``.

        Args:
            filename (str): The filename or directory name where the vector
                store will be saved.

        Returns:
            dict: ``file_path``, ``media_type``, ``serve_filename`` and
                ``original`` describing the saved artifact.

        Raises:
            ValueError: If the vector store is not initialized.
            FileNotFoundError: If nothing exists at ``filename`` after saving.
        """
        self._require_store()
        self.vectorstore.save_local(filename)
        print(f"Vector store saved to {filename}")
        if not os.path.exists(filename):
            raise FileNotFoundError("Saved vectorstore not found.")

        if os.path.isdir(filename):
            # Archive the directory contents next to it as <filename>.zip.
            zip_filename = filename + ".zip"
            shutil.make_archive(filename, 'zip', filename)
            return {
                "file_path": zip_filename,
                "media_type": "application/zip",
                "serve_filename": os.path.basename(zip_filename),
                "original": filename,
            }
        return {
            "file_path": filename,
            "media_type": "application/octet-stream",
            "serve_filename": os.path.basename(filename),
            "original": filename,
        }

    @staticmethod
    def _resolve_store_dir(extract_dir):
        """Return the directory inside ``extract_dir`` that holds the store.

        If the archive unpacked to exactly one sub-directory, that
        sub-directory is used; otherwise the extraction root itself.
        """
        entries = os.listdir(extract_dir)
        if len(entries) == 1:
            candidate = os.path.join(extract_dir, entries[0])
            if os.path.isdir(candidate):
                return candidate
        return extract_dir

    @staticmethod
    def _load_store(file_input, embeddings):
        """Load a FAISS store from a path or a file-like object.

        A file-like input is spooled to a temporary file that is always
        removed afterwards; a string input is used as a path and never deleted.

        Returns:
            tuple: ``(store, from_zip)`` where ``from_zip`` tells whether the
                input was a ZIP archive.
        """
        is_path = isinstance(file_input, str)
        tmp_filename = file_input if is_path else None
        try:
            if not is_path:
                with tempfile.NamedTemporaryFile(delete=False) as tmp:
                    # Record the name first so cleanup runs even if read() fails.
                    tmp_filename = tmp.name
                    tmp.write(file_input.read())

            # SECURITY NOTE: allow_dangerous_deserialization=True lets the
            # loader unpickle data from the input. Only load stores that come
            # from a trusted source.
            if zipfile.is_zipfile(tmp_filename):
                with tempfile.TemporaryDirectory() as extract_dir:
                    with zipfile.ZipFile(tmp_filename, 'r') as zip_ref:
                        zip_ref.extractall(extract_dir)
                    # Load while the extraction directory still exists.
                    store = FAISS.load_local(
                        VectorStoreManager._resolve_store_dir(extract_dir),
                        embeddings,
                        allow_dangerous_deserialization=True,
                    )
                return store, True

            store = FAISS.load_local(
                tmp_filename, embeddings, allow_dangerous_deserialization=True
            )
            return store, False
        finally:
            # Only remove the temp file if we created it here.
            if not is_path and tmp_filename and os.path.exists(tmp_filename):
                os.remove(tmp_filename)

    @staticmethod
    def load(file_input, embeddings):
        """
        Loads a FAISS vector store from an uploaded file or a filename.

        If file_input is a file-like object, it is saved to a temporary file
        (removed afterwards). If it is a string (filename), it is used
        directly. ZIP archives are extracted before loading.

        Args:
            file_input (Union[file-like object, str]): An object with a
                ``.read()`` method or a filename.
            embeddings (Embeddings): Embeddings model used for loading.

        Returns:
            tuple: ``(manager, message)`` with a new VectorStoreManager
                wrapping the loaded store and a status message.

        Raises:
            RuntimeError: If the store cannot be read or loaded; the
                original error is chained as ``__cause__``.
        """
        try:
            new_vectorstore, from_zip = VectorStoreManager._load_store(
                file_input, embeddings
            )
        except Exception as e:
            # The previous code raised HTTPException, which is not imported in
            # this module and therefore surfaced as a NameError.
            raise RuntimeError(f"Error loading vectorstore: {e}") from e

        if from_zip:
            message = "Vector store loaded successfully from ZIP."
        else:
            message = "Vector store loaded successfully."
        instance = VectorStoreManager()
        instance.vectorstore = new_vectorstore
        print(message)
        return instance, message

    def merge(self, file_input, embeddings):
        """
        Merges an uploaded vector store file into the current FAISS vector store.

        Args:
            file_input (Union[file-like object, str]): An object with a
                ``.read()`` method or a filename (str).
            embeddings (Embeddings): Embeddings model used for loading the
                vector store.

        Returns:
            dict: A dictionary containing a message indicating successful merging.

        Raises:
            Exception: If this manager has no vector store, or the source
                store cannot be loaded or merged; the original error is
                chained as ``__cause__``.
        """
        try:
            # Fail fast: do not read or deserialize the upload without a target.
            self._require_store()
            source_store, _ = self._load_store(file_input, embeddings)
            self.vectorstore.merge_from(source_store)
            print("Successfully merged the source vector store into the current vector store.")
        except Exception as e:
            raise Exception(f"Error merging vectorstore: {e}") from e
        return {"message": "Vector stores merged successfully"}
|