File size: 9,286 Bytes
7b7cab6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import os
import uuid
import shutil
import tempfile
import zipfile

from faiss import IndexFlatL2

from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore


class VectorStoreManager:
    """Manage the lifecycle of a single LangChain FAISS vector store.

    The manager owns at most one store (``self.vectorstore``) and offers
    helpers to create it, add/delete documents, persist it to disk (as a ZIP
    archive when the saved store is a directory), load it back, and merge
    another persisted store into it.
    """

    _NOT_INITIALIZED = (
        "Vector store is not initialized. Please create or load a vector store first."
    )

    def __init__(self, embeddings=None):
        """
        Initializes the VectorStoreManager, optionally with a fresh FAISS store.

        Args:
            embeddings (Embeddings, optional): Embeddings model used for the
                vector store. When omitted, ``self.vectorstore`` stays ``None``
                until a store is created or loaded.
        """
        self.vectorstore = None
        if embeddings is not None:
            self.vectorstore = self.create_vectorstore(embeddings)

    def _require_store(self):
        """Return the current vector store or raise ``ValueError`` if unset."""
        if not self.vectorstore:
            raise ValueError(self._NOT_INITIALIZED)
        return self.vectorstore

    def create_vectorstore(self, embeddings):
        """
        Creates and initializes an empty FAISS vector store.

        Args:
            embeddings (Embeddings): Embeddings model used for the vector store.

        Returns:
            FAISS: Initialized (empty) vector store.
        """
        # The index dimension is not exposed by the embeddings object, so it is
        # probed by embedding a throwaway query and measuring the vector length.
        dimensions = len(embeddings.embed_query("dummy"))

        vectorstore = FAISS(
            embedding_function=embeddings,
            index=IndexFlatL2(dimensions),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
            normalize_L2=False,
        )

        print("Created a new FAISS vector store.")
        return vectorstore

    def add_documents(self, documents):
        """
        Adds new documents to the FAISS vector store, each with a fresh UUID.

        Args:
            documents (list): List of Document objects to be added.

        Returns:
            list: UUID strings assigned to the added documents, in input
            order. An empty input returns an empty list without touching
            the store.

        Raises:
            ValueError: If no vector store has been created or loaded.
        """
        store = self._require_store()

        # Nothing to embed; skip the call so the index never sees an empty batch.
        if not documents:
            print("No documents provided for addition.")
            return []

        uuids = [str(uuid.uuid4()) for _ in documents]
        store.add_documents(documents=documents, ids=uuids)

        print(f"Added {len(documents)} documents to the vector store with IDs: {uuids}")
        return uuids

    def delete_documents(self, ids):
        """
        Deletes documents from the FAISS vector store using their unique IDs.

        Args:
            ids (list): List of UUIDs corresponding to the documents to delete.

        Returns:
            bool: ``False`` when no IDs were given; otherwise whatever the
            underlying store's ``delete`` call reports.

        Raises:
            ValueError: If no vector store has been created or loaded.
        """
        store = self._require_store()

        if not ids:
            print("No document IDs provided for deletion.")
            return False

        success = store.delete(ids=ids)
        if success:
            print(f"Successfully deleted documents with IDs: {ids}")
        else:
            print(f"Failed to delete documents with IDs: {ids}")
        return success

    def save(self, filename="faiss_index"):
        """
        Saves the current FAISS vector store locally. If the saved store is a
        directory, it is additionally compressed into ``<filename>.zip``.

        Args:
            filename (str): File or directory name where the store is saved.

        Returns:
            dict: ``file_path`` (artifact to serve), ``media_type``,
            ``serve_filename`` (basename of ``file_path``) and ``original``
            (the ``filename`` argument).

        Raises:
            ValueError: If no vector store has been created or loaded.
            FileNotFoundError: If nothing exists at ``filename`` after saving.
        """
        store = self._require_store()

        store.save_local(filename)
        print(f"Vector store saved to {filename}")

        if not os.path.exists(filename):
            raise FileNotFoundError("Saved vectorstore not found.")

        if os.path.isdir(filename):
            # A directory cannot be served as one file, so ship it as a ZIP.
            file_path = filename + ".zip"
            shutil.make_archive(filename, 'zip', filename)
            media_type = "application/zip"
        else:
            file_path = filename
            media_type = "application/octet-stream"

        return {
            "file_path": file_path,
            "media_type": media_type,
            "serve_filename": os.path.basename(file_path),
            "original": filename,
        }

    @staticmethod
    def _resolve_store_dir(extract_dir):
        """Return the directory inside an extracted archive that holds the store.

        If the archive unpacked to exactly one sub-directory, that directory
        is used; otherwise the extraction root itself is used.
        """
        entries = os.listdir(extract_dir)
        if len(entries) == 1:
            candidate = os.path.join(extract_dir, entries[0])
            if os.path.isdir(candidate):
                return candidate
        return extract_dir

    @staticmethod
    def _read_store(file_input, embeddings):
        """Load a FAISS store from a path or a file-like object.

        Args:
            file_input (Union[file-like object, str]): A filename, or an
                object with a ``.read()`` method whose content is spooled to
                a temporary file first.
            embeddings (Embeddings): Embeddings model passed to the loader.

        Returns:
            tuple: ``(store, from_zip)`` where ``from_zip`` tells whether the
            input was a ZIP archive.
        """
        path = file_input
        temp_path = None
        try:
            if not isinstance(file_input, str):
                with tempfile.NamedTemporaryFile(delete=False) as tmp:
                    # Record the name before writing so the finally-clause can
                    # clean up even when reading the upload fails midway.
                    temp_path = tmp.name
                    tmp.write(file_input.read())
                path = temp_path

            # SECURITY NOTE: allow_dangerous_deserialization=True makes the
            # loader unpickle data from the given file. Only feed this method
            # files from trusted sources; a crafted upload can execute code.
            if zipfile.is_zipfile(path):
                with tempfile.TemporaryDirectory() as extract_dir:
                    with zipfile.ZipFile(path, 'r') as archive:
                        archive.extractall(extract_dir)
                    store = FAISS.load_local(
                        VectorStoreManager._resolve_store_dir(extract_dir),
                        embeddings,
                        allow_dangerous_deserialization=True,
                    )
                return store, True

            store = FAISS.load_local(
                path, embeddings, allow_dangerous_deserialization=True
            )
            return store, False
        finally:
            # Only remove the temp file we created; never a caller-owned path.
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)

    @staticmethod
    def load(file_input, embeddings):
        """
        Loads a FAISS vector store from an uploaded file or a filename.

        Args:
            file_input (Union[file-like object, str]): An object with a
                ``.read()`` method (copied to a temporary file) or a filename
                that is used directly. ZIP archives are extracted first.
            embeddings (Embeddings): Embeddings model used for loading.

        Returns:
            tuple: ``(manager, message)`` - a new ``VectorStoreManager``
            wrapping the loaded store, and a human-readable status message.

        Raises:
            RuntimeError: If the store cannot be read or deserialized; the
                original error is attached as ``__cause__``.
        """
        try:
            new_vectorstore, from_zip = VectorStoreManager._read_store(
                file_input, embeddings
            )
        except Exception as e:
            raise RuntimeError(f"Error loading vectorstore: {str(e)}") from e

        if from_zip:
            message = "Vector store loaded successfully from ZIP."
        else:
            message = "Vector store loaded successfully."

        instance = VectorStoreManager()
        instance.vectorstore = new_vectorstore
        print(message)
        return instance, message

    def merge(self, file_input, embeddings):
        """
        Merges a persisted vector store into the current FAISS vector store.

        Args:
            file_input (Union[file-like object, str]): An object with a
                ``.read()`` method or a filename (str). ZIP archives are
                extracted first.
            embeddings (Embeddings): Embeddings model used for loading the
                source vector store.

        Returns:
            dict: A dictionary containing a message indicating successful merging.

        Raises:
            ValueError: If no vector store has been created or loaded. This is
                checked before the input is read.
            RuntimeError: If loading the source store or merging fails; the
                original error is attached as ``__cause__``.
        """
        target = self._require_store()

        try:
            source_store, _ = self._read_store(file_input, embeddings)
            target.merge_from(source_store)
            print("Successfully merged the source vector store into the current vector store.")
        except Exception as e:
            raise RuntimeError(f"Error merging vectorstore: {str(e)}") from e

        return {"message": "Vector stores merged successfully"}