from minsearch import Index
from sentence_transformers import SentenceTransformer
# Assumed dependency: the indexing/search methods below use an Elasticsearch
# client (self.es), so the official elasticsearch-py client is imported here.
from elasticsearch import Elasticsearch
import json
import logging
import re
import sys
from config import Config
from vector_store import get_vector_store

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stdout)
logger = logging.getLogger(__name__)
def clean_text(text):
    if not isinstance(text, str):
        logger.warning(f"Non-string input to clean_text: {type(text)}")
        return ""
    cleaned = re.sub(r'[^\w\s.,!?]', ' ', text)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned
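# Example (hypothetical input):
#   clean_text("Café crème — 100% (approx.)") -> "Café crème 100 approx."
# Accented letters survive because \w is Unicode-aware in Python 3; symbols
# such as the dash, "%", and parentheses are replaced with spaces, then the
# runs of whitespace are collapsed and trimmed.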
class DataProcessor:
    def __init__(self, text_fields=["content", "title", "description"],
                 keyword_fields=["video_id", "author", "upload_date"],
                 embedding_model=None):
        self.text_fields = text_fields
        self.keyword_fields = keyword_fields
        self.all_fields = text_fields + keyword_fields
        self.text_index = Index(text_fields=text_fields, keyword_fields=keyword_fields)
        # Use appropriate model path based on environment
        model_path = Config.get_model_path() if embedding_model is None else embedding_model
        self.embedding_model = SentenceTransformer(model_path)
        self.documents = []
        self.embeddings = []
        self.index_built = False
        self.current_index_name = None
        # Initialize vector store
        VectorStore = get_vector_store(Config)
        self.vector_store = VectorStore(self.embedding_model.get_sentence_embedding_dimension())
        logger.info("Initialized FAISS vector store")
        # The indexing and search methods below talk to Elasticsearch via
        # self.es, which was never initialized in the original code. A minimal
        # sketch, assuming Config exposes an ELASTICSEARCH_URL setting
        # (hypothetical name):
        self.es = Elasticsearch(Config.ELASTICSEARCH_URL)
        logger.info("Connected Elasticsearch client")
    def process_transcript(self, video_id, transcript_data):
        """Clean and store one video transcript.

        Expects transcript_data to be a dict with a 'metadata' dict and a
        'transcript' list of segments, each segment carrying a 'text' key.
        """
        logger.info(f"Processing transcript for video {video_id}")
        if not transcript_data:
            logger.error(f"Transcript data is None for video {video_id}")
            return None
        if 'metadata' not in transcript_data or 'transcript' not in transcript_data:
            logger.error(f"Invalid transcript data structure for video {video_id}")
            logger.debug(f"Transcript data keys: {transcript_data.keys()}")
            return None
        metadata = transcript_data['metadata']
        transcript = transcript_data['transcript']
        logger.info(f"Number of transcript segments: {len(transcript)}")
        full_transcript = " ".join([segment.get('text', '') for segment in transcript])
        logger.debug(f"Full transcript length before cleaning: {len(full_transcript)}")
        logger.debug(f"Full transcript sample before cleaning: '{full_transcript[:500]}...'")
        cleaned_transcript = clean_text(full_transcript)
        logger.debug(f"Cleaned transcript length: {len(cleaned_transcript)}")
        logger.debug(f"Cleaned transcript sample: '{cleaned_transcript[:500]}...'")
        if not cleaned_transcript:
            logger.warning(f"Empty cleaned transcript for video {video_id}")
            return None
        doc = {
            "video_id": video_id,
            "content": cleaned_transcript,
            "title": clean_text(metadata.get('title', '')),
            "description": clean_text(metadata.get('description', 'Not Available')),
            "author": metadata.get('author', ''),
            "upload_date": metadata.get('upload_date', ''),
            "segment_id": f"{video_id}_full",
            "view_count": metadata.get('view_count', 0),
            "like_count": metadata.get('like_count', 0),
            "comment_count": metadata.get('comment_count', 0),
            "video_duration": metadata.get('duration', '')
        }
        logger.debug(f"Document created for video {video_id}")
        for field in self.all_fields:
            logger.debug(f"Document {field} length: {len(str(doc.get(field, '')))}")
            logger.debug(f"Document {field} sample: '{str(doc.get(field, ''))[:100]}...'")
        self.documents.append(doc)
        embedding = self.embedding_model.encode(cleaned_transcript + " " + metadata.get('title', ''))
        self.embeddings.append(embedding)
        logger.info(f"Processed transcript for video {video_id}")
        # Return a dictionary with the processed content and other relevant information
        return {
            'content': cleaned_transcript,
            'metadata': metadata,
            'index_name': f"video_{video_id}_{self.embedding_model.get_sentence_embedding_dimension()}"
        }
    def build_index(self, index_name):
        """Fit the in-memory minsearch text index, then (re)create and populate the Elasticsearch index."""
        if not self.documents:
            logger.error("No documents to index")
            return None
        logger.info(f"Building index with {len(self.documents)} documents")
        # Fields to include in the fit function
        index_fields = self.text_fields + self.keyword_fields
        # Create a list of dictionaries with only the fields we want to index
        docs_to_index = []
        for doc in self.documents:
            indexed_doc = {field: doc.get(field, '') for field in index_fields}
            if all(indexed_doc.values()):  # Check if all required fields have values
                docs_to_index.append(indexed_doc)
            else:
                missing_fields = [field for field, value in indexed_doc.items() if not value]
                logger.warning(f"Document with video_id {doc.get('video_id', 'unknown')} is missing values for fields: {missing_fields}")
        if not docs_to_index:
            logger.error("No valid documents to index")
            return None
        logger.info(f"Number of valid documents to index: {len(docs_to_index)}")
        # Log the structure of the first document to be indexed
        logger.debug("Structure of the first document to be indexed:")
        logger.debug(json.dumps(docs_to_index[0], indent=2))
        try:
            logger.info("Fitting text index")
            self.text_index.fit(docs_to_index)
            self.index_built = True
            logger.info("Text index built successfully")
        except Exception as e:
            logger.error(f"Error building text index: {str(e)}")
            raise
        try:
            if not self.es.indices.exists(index=index_name):
                self.es.indices.create(index=index_name, body={
                    "mappings": {
                        "properties": {
                            "embedding": {"type": "dense_vector", "dims": len(self.embeddings[0]), "index": True, "similarity": "cosine"},
                            "content": {"type": "text"},
                            "title": {"type": "text"},
                            "description": {"type": "text"},
                            "video_id": {"type": "keyword"},
                            "author": {"type": "keyword"},
                            "upload_date": {"type": "date"},
                            "segment_id": {"type": "keyword"},
                            "view_count": {"type": "integer"},
                            "like_count": {"type": "integer"},
                            "comment_count": {"type": "integer"},
                            "video_duration": {"type": "text"}
                        }
                    }
                })
                logger.info(f"Created Elasticsearch index: {index_name}")
            for doc, embedding in zip(self.documents, self.embeddings):
                doc_with_embedding = doc.copy()
                doc_with_embedding['embedding'] = embedding.tolist()
                self.es.index(index=index_name, body=doc_with_embedding, id=doc['segment_id'])
            logger.info(f"Successfully indexed {len(self.documents)} documents in Elasticsearch")
            self.current_index_name = index_name
            return index_name
        except Exception as e:
            logger.error(f"Error building Elasticsearch index: {str(e)}")
            raise
    def compute_rrf(self, rank, k=60):
        """Reciprocal Rank Fusion score for a document at the given 1-based rank."""
        return 1 / (k + rank)
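    # Worked example of the RRF fusion used by hybrid_search below, with the
    # default k=60: a document ranked 1st in the kNN list scores 1/61 ≈ 0.0164;
    # if the same document is also ranked 3rd in the keyword list it adds
    # 1/63 ≈ 0.0159, for a combined score of ≈ 0.0323. Documents that appear
    # high in both result lists therefore dominate the fused ranking.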
    def hybrid_search(self, query, index_name, num_results=5):
        if not index_name:
            logger.error("No index name provided for hybrid search.")
            raise ValueError("No index name provided for hybrid search.")
        vector = self.embedding_model.encode(query)
        knn_query = {
            "field": "embedding",
            "query_vector": vector.tolist(),
            "k": 10,
            "num_candidates": 100
        }
        keyword_query = {
            "multi_match": {
                "query": query,
                "fields": self.text_fields
            }
        }
        try:
            knn_results = self.es.search(
                index=index_name,
                body={
                    "knn": knn_query,
                    "size": 10
                }
            )['hits']['hits']
            keyword_results = self.es.search(
                index=index_name,
                body={
                    "query": keyword_query,
                    "size": 10
                }
            )['hits']['hits']
            rrf_scores = {}
            for rank, hit in enumerate(knn_results):
                doc_id = hit['_id']
                rrf_scores[doc_id] = self.compute_rrf(rank + 1)
            for rank, hit in enumerate(keyword_results):
                doc_id = hit['_id']
                if doc_id in rrf_scores:
                    rrf_scores[doc_id] += self.compute_rrf(rank + 1)
                else:
                    rrf_scores[doc_id] = self.compute_rrf(rank + 1)
            reranked_docs = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
            final_results = []
            for doc_id, score in reranked_docs[:num_results]:
                doc = self.es.get(index=index_name, id=doc_id)
                final_results.append(doc['_source'])
            return final_results
        except Exception as e:
            logger.error(f"Error in hybrid search: {str(e)}")
            raise
    def search(self, query, filter_dict=None, boost_dict=None, num_results=10, method='hybrid', index_name=None):
        """Dispatch to text, embedding, or hybrid search against the named index."""
        # Avoid mutable default arguments; coalesce to fresh dicts per call.
        filter_dict = filter_dict or {}
        boost_dict = boost_dict or {}
        if not index_name:
            logger.error("No index name provided for search.")
            raise ValueError("No index name provided for search.")
        if not self.es.indices.exists(index=index_name):
            logger.error(f"Index {index_name} does not exist.")
            raise ValueError(f"Index {index_name} does not exist.")
        logger.info(f"Performing {method} search for query: {query} in index: {index_name}")
        try:
            if method == 'text':
                return self.text_search(query, filter_dict, boost_dict, num_results, index_name)
            elif method == 'embedding':
                return self.embedding_search(query, num_results, index_name)
            else:  # hybrid search
                return self.hybrid_search(query, index_name, num_results)
        except Exception as e:
            logger.error(f"Error in search method {method}: {str(e)}")
            raise
    def text_search(self, query, filter_dict=None, boost_dict=None, num_results=10, index_name=None):
        # Note: filter_dict and boost_dict are accepted for interface
        # compatibility but are not yet applied to the Elasticsearch query.
        if not index_name:
            logger.error("No index name provided for text search.")
            raise ValueError("No index name provided for text search.")
        try:
            search_body = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": self.text_fields
                    }
                },
                "size": num_results
            }
            response = self.es.search(index=index_name, body=search_body)
            return [hit['_source'] for hit in response['hits']['hits']]
        except Exception as e:
            logger.error(f"Error in text search: {str(e)}")
            raise
    def embedding_search(self, query, num_results=10, index_name=None):
        if not index_name:
            logger.error("No index name provided for embedding search.")
            raise ValueError("No index name provided for embedding search.")
        try:
            query_vector = self.embedding_model.encode(query).tolist()
            script_query = {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {
                        "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                        "params": {"query_vector": query_vector}
                    }
                }
            }
            response = self.es.search(
                index=index_name,
                body={
                    "size": num_results,
                    "query": script_query,
                    "_source": {"excludes": ["embedding"]}
                }
            )
            return [hit['_source'] for hit in response['hits']['hits']]
        except Exception as e:
            logger.error(f"Error in embedding search: {str(e)}")
            raise
    def set_embedding_model(self, model_name):
        # Caution: embeddings already stored in self.embeddings were produced
        # by the previous model; rebuild the index after switching models.
        self.embedding_model = SentenceTransformer(model_name)
        logger.info(f"Embedding model set to: {model_name}")