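"""Transcript processing and hybrid search over video metadata.

Cleans transcript text, embeds it with a SentenceTransformer model, indexes
documents in a minsearch text index and in Elasticsearch, and serves text,
embedding, and hybrid (RRF-fused) search over the result.
"""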
from minsearch import Index
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
import os
import json
import logging
import re
import sys

from config import Config
from vector_store import get_vector_store

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stdout)
logger = logging.getLogger(__name__)

def clean_text(text):
    """Replace characters outside word characters, whitespace, and basic
    punctuation (. , ! ?) with spaces, then collapse runs of whitespace.

    e.g. clean_text("Price: $5.99 (on sale)") -> "Price 5.99 on sale"
    """
    if not isinstance(text, str):
        logger.warning(f"Non-string input to clean_text: {type(text)}")
        return ""
    cleaned = re.sub(r'[^\w\s.,!?]', ' ', text)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned

class DataProcessor:
    def __init__(self, text_fields=["content", "title", "description"], 
                 keyword_fields=["video_id", "author", "upload_date"],
                 embedding_model=None):
        self.text_fields = text_fields
        self.keyword_fields = keyword_fields
        self.all_fields = text_fields + keyword_fields
        self.text_index = Index(text_fields=text_fields, keyword_fields=keyword_fields)
        
        # Use appropriate model path based on environment
        model_path = Config.get_model_path() if embedding_model is None else embedding_model
        self.embedding_model = SentenceTransformer(model_path)
        
        self.documents = []
        self.embeddings = []
        self.index_built = False
        self.current_index_name = None

        # Initialize vector store (FAISS-backed) for local embedding storage
        VectorStore = get_vector_store(Config)
        self.vector_store = VectorStore(self.embedding_model.get_sentence_embedding_dimension())
        logger.info("Initialized FAISS vector store")

        # Elasticsearch client used by build_index and the search methods.
        # The connection URL here is an assumption; point ELASTICSEARCH_URL at
        # your own cluster if it is not running locally.
        self.es = Elasticsearch(os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200"))
        logger.info("Initialized Elasticsearch client")
        
    def process_transcript(self, video_id, transcript_data):
        logger.info(f"Processing transcript for video {video_id}")
        
        if not transcript_data:
            logger.error(f"Transcript data is None for video {video_id}")
            return None
        
        if 'metadata' not in transcript_data or 'transcript' not in transcript_data:
            logger.error(f"Invalid transcript data structure for video {video_id}")
            logger.debug(f"Transcript data keys: {transcript_data.keys()}")
            return None

        metadata = transcript_data['metadata']
        transcript = transcript_data['transcript']

        logger.info(f"Number of transcript segments: {len(transcript)}")

        full_transcript = " ".join([segment.get('text', '') for segment in transcript])
        logger.debug(f"Full transcript length before cleaning: {len(full_transcript)}")
        logger.debug(f"Full transcript sample before cleaning: '{full_transcript[:500]}...'")

        cleaned_transcript = clean_text(full_transcript)
        logger.debug(f"Cleaned transcript length: {len(cleaned_transcript)}")
        logger.debug(f"Cleaned transcript sample: '{cleaned_transcript[:500]}...'")

        if not cleaned_transcript:
            logger.warning(f"Empty cleaned transcript for video {video_id}")
            return None

        doc = {
            "video_id": video_id,
            "content": cleaned_transcript,
            "title": clean_text(metadata.get('title', '')),
            "description": clean_text(metadata.get('description', 'Not Available')),
            "author": metadata.get('author', ''),
            "upload_date": metadata.get('upload_date', ''),
            "segment_id": f"{video_id}_full",
            "view_count": metadata.get('view_count', 0),
            "like_count": metadata.get('like_count', 0),
            "comment_count": metadata.get('comment_count', 0),
            "video_duration": metadata.get('duration', '')
        }
        
        logger.debug(f"Document created for video {video_id}")
        for field in self.all_fields:
            logger.debug(f"Document {field} length: {len(str(doc.get(field, '')))}")
            logger.debug(f"Document {field} sample: '{str(doc.get(field, ''))[:100]}...'")

        self.documents.append(doc)
        embedding = self.embedding_model.encode(cleaned_transcript + " " + metadata.get('title', ''))
        self.embeddings.append(embedding)

        logger.info(f"Processed transcript for video {video_id}")
        
        # Return a dictionary with the processed content and other relevant information
        return {
            'content': cleaned_transcript,
            'metadata': metadata,
            'index_name': f"video_{video_id}_{self.embedding_model.get_sentence_embedding_dimension()}"
        }

    def build_index(self, index_name):
        if not self.documents:
            logger.error("No documents to index")
            return None

        logger.info(f"Building index with {len(self.documents)} documents")
        
        # Fields to include in the fit function
        index_fields = self.text_fields + self.keyword_fields
        
        # Create a list of dictionaries with only the fields we want to index
        docs_to_index = []
        for doc in self.documents:
            indexed_doc = {field: doc.get(field, '') for field in index_fields}
            if all(indexed_doc.values()):  # Check if all required fields have values
                docs_to_index.append(indexed_doc)
            else:
                missing_fields = [field for field, value in indexed_doc.items() if not value]
                logger.warning(f"Document with video_id {doc.get('video_id', 'unknown')} is missing values for fields: {missing_fields}")

        if not docs_to_index:
            logger.error("No valid documents to index")
            return None

        logger.info(f"Number of valid documents to index: {len(docs_to_index)}")

        # Log the structure of the first document to be indexed
        logger.debug("Structure of the first document to be indexed:")
        logger.debug(json.dumps(docs_to_index[0], indent=2))

        try:
            logger.info("Fitting text index")
            self.text_index.fit(docs_to_index)
            self.index_built = True
            logger.info("Text index built successfully")
        except Exception as e:
            logger.error(f"Error building text index: {str(e)}")
            raise

        try:
            if not self.es.indices.exists(index=index_name):
                self.es.indices.create(index=index_name, body={
                    "mappings": {
                        "properties": {
                            "embedding": {"type": "dense_vector", "dims": len(self.embeddings[0]), "index": True, "similarity": "cosine"},
                            "content": {"type": "text"},
                            "title": {"type": "text"},
                            "description": {"type": "text"},
                            "video_id": {"type": "keyword"},
                            "author": {"type": "keyword"},
                            "upload_date": {"type": "date"},
                            "segment_id": {"type": "keyword"},
                            "view_count": {"type": "integer"},
                            "like_count": {"type": "integer"},
                            "comment_count": {"type": "integer"},
                            "video_duration": {"type": "text"}
                        }
                    }
                })
                logger.info(f"Created Elasticsearch index: {index_name}")

            for doc, embedding in zip(self.documents, self.embeddings):
                doc_with_embedding = doc.copy()
                doc_with_embedding['embedding'] = embedding.tolist()
                self.es.index(index=index_name, body=doc_with_embedding, id=doc['segment_id'])

            # Refresh so the new documents are visible to searches immediately
            self.es.indices.refresh(index=index_name)

            logger.info(f"Successfully indexed {len(self.documents)} documents in Elasticsearch")
            self.current_index_name = index_name
            return index_name
        except Exception as e:
            logger.error(f"Error building Elasticsearch index: {str(e)}")
            raise
    
    def compute_rrf(self, rank, k=60):
        """Reciprocal Rank Fusion score for a document at the given 1-based rank.

        k=60 is the damping constant commonly used for RRF; larger values of k
        shrink the gap between adjacent ranks.
        """
        return 1 / (k + rank)
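
    # Worked example: a document ranked 1st by the kNN query and 3rd by the
    # keyword query scores 1/(60+1) + 1/(60+3) ≈ 0.0323 under the default k=60.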

    def hybrid_search(self, query, index_name, num_results=5):
        """Run a kNN query and a keyword query separately, then fuse the two
        rankings with Reciprocal Rank Fusion and return the top documents."""
        if not index_name:
            logger.error("No index name provided for hybrid search.")
            raise ValueError("No index name provided for hybrid search.")
        
        vector = self.embedding_model.encode(query)
        
        knn_query = {
            "field": "embedding",
            "query_vector": vector.tolist(),
            "k": 10,
            "num_candidates": 100
        }

        keyword_query = {
            "multi_match": {
                "query": query,
                "fields": self.text_fields
            }
        }

        try:
            knn_results = self.es.search(
                index=index_name, 
                body={
                    "knn": knn_query, 
                    "size": 10
                }
            )['hits']['hits']
            
            keyword_results = self.es.search(
                index=index_name, 
                body={
                    "query": keyword_query, 
                    "size": 10
                }
            )['hits']['hits']
            
            rrf_scores = {}
            for rank, hit in enumerate(knn_results):
                rrf_scores[hit['_id']] = self.compute_rrf(rank + 1)

            for rank, hit in enumerate(keyword_results):
                doc_id = hit['_id']
                rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + self.compute_rrf(rank + 1)

            reranked_docs = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
            
            final_results = []
            for doc_id, score in reranked_docs[:num_results]:
                doc = self.es.get(index=index_name, id=doc_id)
                final_results.append(doc['_source'])
            
            return final_results
        except Exception as e:
            logger.error(f"Error in hybrid search: {str(e)}")
            raise

    def search(self, query, filter_dict=None, boost_dict=None, num_results=10, method='hybrid', index_name=None):
        if not index_name:
            logger.error("No index name provided for search.")
            raise ValueError("No index name provided for search.")
        
        if not self.es.indices.exists(index=index_name):
            logger.error(f"Index {index_name} does not exist.")
            raise ValueError(f"Index {index_name} does not exist.")
        
        logger.info(f"Performing {method} search for query: {query} in index: {index_name}")
        
        try:
            if method == 'text':
                return self.text_search(query, filter_dict, boost_dict, num_results, index_name)
            elif method == 'embedding':
                return self.embedding_search(query, num_results, index_name)
            else:  # hybrid search
                return self.hybrid_search(query, index_name, num_results)
        except Exception as e:
            logger.error(f"Error in search method {method}: {str(e)}")
            raise

    def text_search(self, query, filter_dict=None, boost_dict=None, num_results=10, index_name=None):
        # filter_dict and boost_dict are accepted for signature compatibility
        # with search() but are not currently applied to the query.
        if not index_name:
            logger.error("No index name provided for text search.")
            raise ValueError("No index name provided for text search.")
        
        try:
            search_body = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": self.text_fields
                    }
                },
                "size": num_results
            }
            response = self.es.search(index=index_name, body=search_body)
            return [hit['_source'] for hit in response['hits']['hits']]
        except Exception as e:
            logger.error(f"Error in text search: {str(e)}")
            raise

    def embedding_search(self, query, num_results=10, index_name=None):
        if not index_name:
            logger.error("No index name provided for embedding search.")
            raise ValueError("No index name provided for embedding search.")
        
        try:
            query_vector = self.embedding_model.encode(query).tolist()
            script_query = {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {
                        "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                        "params": {"query_vector": query_vector}
                    }
                }
            }
            response = self.es.search(
                index=index_name,
                body={
                    "size": num_results,
                    "query": script_query,
                    "_source": {"excludes": ["embedding"]}
                }
            )
            return [hit['_source'] for hit in response['hits']['hits']]
        except Exception as e:
            logger.error(f"Error in embedding search: {str(e)}")
            raise
    
    def set_embedding_model(self, model_name):
        # Note: switching models can change the embedding dimension, which
        # invalidates any index or vector store built with the previous model.
        self.embedding_model = SentenceTransformer(model_name)
        logger.info(f"Embedding model set to: {model_name}")