rag-youtube-assistant / app /data_processor.py
ganesh3's picture
Update app/data_processor.py
f62b0ff verified
from minsearch import Index
from sentence_transformers import SentenceTransformer
import numpy as np
import os
import json
import logging
import re
from config import Config
from vector_store import get_vector_store
import sys
# Configure root logging once at import time: INFO and above go to stdout,
# each record prefixed with a timestamp, the logger name and the level.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def clean_text(text):
    """Normalise free text for indexing.

    Every character that is not a word character, whitespace or one of
    ``. , ! ?`` is replaced by a space; runs of whitespace are then collapsed
    to a single space and the result is stripped.

    Non-string input is logged as a warning and yields an empty string.
    """
    if isinstance(text, str):
        without_symbols = re.sub(r'[^\w\s.,!?]', ' ', text)
        return re.sub(r'\s+', ' ', without_symbols).strip()

    logger.warning(f"Non-string input to clean_text: {type(text)}")
    return ""
class DataProcessor:
    """Turn video transcripts into searchable documents and query them.

    ``process_transcript`` accumulates one document and one embedding per
    video in memory. ``build_index`` fits the in-memory minsearch text index
    and writes the documents plus embeddings to an Elasticsearch index. The
    ``search`` family of methods queries that Elasticsearch index.

    NOTE(review): ``__init__`` creates ``self.vector_store`` but never assigns
    ``self.es``, while ``build_index`` and every search method read it. Unless
    a client is assigned to ``self.es`` by the caller, those methods raise
    ``AttributeError`` (see ``_es_client``). They presumably still need to be
    ported to the vector store - confirm against ``vector_store``.
    """

    # Immutable defaults; copied into fresh lists per instance so that
    # instances never share (and cannot accidentally mutate) one list.
    DEFAULT_TEXT_FIELDS = ("content", "title", "description")
    DEFAULT_KEYWORD_FIELDS = ("video_id", "author", "upload_date")

    def __init__(self, text_fields=None, keyword_fields=None,
                 embedding_model=None):
        """Create the text index, load the embedding model and vector store.

        Args:
            text_fields: Names of full-text fields. Defaults to
                ``["content", "title", "description"]``.
            keyword_fields: Names of exact-match fields. Defaults to
                ``["video_id", "author", "upload_date"]``.
            embedding_model: Model name or path for ``SentenceTransformer``.
                When ``None``, ``Config.get_model_path()`` is used.
        """
        if text_fields is None:
            text_fields = list(self.DEFAULT_TEXT_FIELDS)
        if keyword_fields is None:
            keyword_fields = list(self.DEFAULT_KEYWORD_FIELDS)

        self.text_fields = text_fields
        self.keyword_fields = keyword_fields
        self.all_fields = text_fields + keyword_fields
        self.text_index = Index(text_fields=text_fields, keyword_fields=keyword_fields)

        # Use appropriate model path based on environment
        model_path = Config.get_model_path() if embedding_model is None else embedding_model
        self.embedding_model = SentenceTransformer(model_path)

        self.documents = []
        self.embeddings = []
        self.index_built = False
        self.current_index_name = None

        # Initialize vector store, sized to the model's embedding dimension.
        # NOTE(review): the concrete store type is chosen by
        # get_vector_store(Config); the log line below assumes FAISS.
        VectorStore = get_vector_store(Config)
        self.vector_store = VectorStore(self.embedding_model.get_sentence_embedding_dimension())
        logger.info("Initialized FAISS vector store")

    def _es_client(self):
        """Return the client stored on ``self.es``.

        Raises:
            AttributeError: If ``self.es`` is missing or ``None``. The
                exception type matches what a bare ``self.es.<attr>`` access
                would raise, but carries an actionable message.
        """
        client = getattr(self, "es", None)
        if client is None:
            raise AttributeError(
                "DataProcessor.es is not set; assign an Elasticsearch client "
                "before building or searching an Elasticsearch index"
            )
        return client

    def process_transcript(self, video_id, transcript_data):
        """Build one document and one embedding from a video transcript.

        Args:
            video_id: Identifier of the video.
            transcript_data: Mapping with a ``'metadata'`` mapping and a
                ``'transcript'`` list of segments, each read via
                ``segment.get('text')``.

        Returns:
            ``None`` if the data is missing, malformed, or the cleaned
            transcript is empty. Otherwise a dict with ``'content'`` (the
            cleaned transcript), ``'metadata'`` and ``'index_name'``.
        """
        logger.info(f"Processing transcript for video {video_id}")
        if not transcript_data:
            logger.error(f"Transcript data is None for video {video_id}")
            return None
        if 'metadata' not in transcript_data or 'transcript' not in transcript_data:
            logger.error(f"Invalid transcript data structure for video {video_id}")
            logger.debug(f"Transcript data keys: {transcript_data.keys()}")
            return None

        # Treat explicit None values like missing ones instead of crashing.
        metadata = transcript_data['metadata'] or {}
        transcript = transcript_data['transcript'] or []
        logger.info(f"Number of transcript segments: {len(transcript)}")

        # A segment whose text is None contributes nothing to the transcript.
        full_transcript = " ".join(segment.get('text') or '' for segment in transcript)
        logger.debug(f"Full transcript length before cleaning: {len(full_transcript)}")
        logger.debug(f"Full transcript sample before cleaning: '{full_transcript[:500]}...'")

        cleaned_transcript = clean_text(full_transcript)
        logger.debug(f"Cleaned transcript length: {len(cleaned_transcript)}")
        logger.debug(f"Cleaned transcript sample: '{cleaned_transcript[:500]}...'")
        if not cleaned_transcript:
            logger.warning(f"Empty cleaned transcript for video {video_id}")
            return None

        doc = {
            "video_id": video_id,
            "content": cleaned_transcript,
            "title": clean_text(metadata.get('title', '')),
            "description": clean_text(metadata.get('description', 'Not Available')),
            "author": metadata.get('author', ''),
            "upload_date": metadata.get('upload_date', ''),
            "segment_id": f"{video_id}_full",
            "view_count": metadata.get('view_count', 0),
            "like_count": metadata.get('like_count', 0),
            "comment_count": metadata.get('comment_count', 0),
            "video_duration": metadata.get('duration', ''),
        }
        logger.debug(f"Document created for video {video_id}")
        for field in self.all_fields:
            logger.debug(f"Document {field} length: {len(str(doc.get(field, '')))}")
            logger.debug(f"Document {field} sample: '{str(doc.get(field, ''))[:100]}...'")

        # The embedding input uses the raw (uncleaned) title; a missing or
        # non-string title is treated as empty rather than raising TypeError.
        raw_title = metadata.get('title', '')
        if not isinstance(raw_title, str):
            raw_title = ''

        # Encode before storing anything, so a failing encode cannot leave
        # self.documents and self.embeddings with different lengths.
        embedding = self.embedding_model.encode(cleaned_transcript + " " + raw_title)
        self.documents.append(doc)
        self.embeddings.append(embedding)
        logger.info(f"Processed transcript for video {video_id}")

        # Return a dictionary with the processed content and other relevant information
        return {
            'content': cleaned_transcript,
            'metadata': metadata,
            'index_name': f"video_{video_id}_{self.embedding_model.get_sentence_embedding_dimension()}",
        }

    def build_index(self, index_name):
        """Fit the text index and write all documents to Elasticsearch.

        Only documents with a truthy value for every text and keyword field
        go into the minsearch text index; all accumulated documents are
        written to the Elasticsearch index together with their embeddings.

        Args:
            index_name: Name of the Elasticsearch index to create/populate.

        Returns:
            ``index_name`` on success, or ``None`` when there is nothing
            (valid) to index.

        Raises:
            Exception: Any error from fitting the text index or from
                Elasticsearch is logged and re-raised.
        """
        if not self.documents:
            logger.error("No documents to index")
            return None
        logger.info(f"Building index with {len(self.documents)} documents")

        # Fields to include in the fit function
        index_fields = self.text_fields + self.keyword_fields

        # Create a list of dictionaries with only the fields we want to index
        docs_to_index = []
        for doc in self.documents:
            indexed_doc = {field: doc.get(field, '') for field in index_fields}
            if all(indexed_doc.values()):  # Check if all required fields have values
                docs_to_index.append(indexed_doc)
            else:
                missing_fields = [field for field, value in indexed_doc.items() if not value]
                logger.warning(f"Document with video_id {doc.get('video_id', 'unknown')} is missing values for fields: {missing_fields}")

        if not docs_to_index:
            logger.error("No valid documents to index")
            return None
        logger.info(f"Number of valid documents to index: {len(docs_to_index)}")

        # Log the structure of the first document to be indexed. Serialising
        # a whole transcript is wasted work unless DEBUG is actually enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Structure of the first document to be indexed:")
            logger.debug(json.dumps(docs_to_index[0], indent=2))

        try:
            logger.info("Fitting text index")
            self.text_index.fit(docs_to_index)
            self.index_built = True
            logger.info("Text index built successfully")
        except Exception as e:
            logger.error(f"Error building text index: {str(e)}")
            raise

        try:
            es = self._es_client()
            if not es.indices.exists(index=index_name):
                es.indices.create(index=index_name, body={
                    "mappings": {
                        "properties": {
                            "embedding": {"type": "dense_vector", "dims": len(self.embeddings[0]), "index": True, "similarity": "cosine"},
                            "content": {"type": "text"},
                            "title": {"type": "text"},
                            "description": {"type": "text"},
                            "video_id": {"type": "keyword"},
                            "author": {"type": "keyword"},
                            "upload_date": {"type": "date"},
                            "segment_id": {"type": "keyword"},
                            "view_count": {"type": "integer"},
                            "like_count": {"type": "integer"},
                            "comment_count": {"type": "integer"},
                            "video_duration": {"type": "text"}
                        }
                    }
                })
                logger.info(f"Created Elasticsearch index: {index_name}")

            # documents[i] and embeddings[i] belong to the same video.
            for doc, embedding in zip(self.documents, self.embeddings):
                doc_with_embedding = doc.copy()
                doc_with_embedding['embedding'] = embedding.tolist()
                es.index(index=index_name, body=doc_with_embedding, id=doc['segment_id'])

            logger.info(f"Successfully indexed {len(self.documents)} documents in Elasticsearch")
            self.current_index_name = index_name
            return index_name
        except Exception as e:
            logger.error(f"Error building Elasticsearch index: {str(e)}")
            raise

    def compute_rrf(self, rank, k=60):
        """Return the reciprocal-rank-fusion score ``1 / (k + rank)``."""
        return 1 / (k + rank)

    def hybrid_search(self, query, index_name, num_results=5):
        """Combine kNN and keyword search results by reciprocal rank fusion.

        Runs one kNN query on the ``embedding`` field and one ``multi_match``
        query over ``self.text_fields``, sums the per-list RRF scores of each
        document id and returns the best ``num_results`` documents.

        Args:
            query: Query text.
            index_name: Elasticsearch index to search.
            num_results: Maximum number of documents to return.

        Returns:
            List of ``_source`` dicts, best fused score first.

        Raises:
            ValueError: If ``index_name`` is empty.
            Exception: Errors during the search are logged and re-raised.
        """
        if not index_name:
            logger.error("No index name provided for hybrid search.")
            raise ValueError("No index name provided for hybrid search.")

        # Each sub-search fetches at least 10 hits (the historical size) and
        # grows with num_results so larger requests can actually be filled.
        pool_size = max(10, num_results)

        vector = self.embedding_model.encode(query)
        knn_query = {
            "field": "embedding",
            "query_vector": vector.tolist(),
            "k": pool_size,
            "num_candidates": max(100, pool_size)
        }
        keyword_query = {
            "multi_match": {
                "query": query,
                "fields": self.text_fields
            }
        }

        try:
            es = self._es_client()
            knn_results = es.search(
                index=index_name,
                body={
                    "knn": knn_query,
                    "size": pool_size
                }
            )['hits']['hits']
            keyword_results = es.search(
                index=index_name,
                body={
                    "query": keyword_query,
                    "size": pool_size
                }
            )['hits']['hits']

            rrf_scores = {}
            sources = {}
            # Ranks are 1-based; a document found by both searches gets the
            # sum of its two scores. The hits already carry _source, so it is
            # kept here instead of re-fetching every result one by one.
            for hits in (knn_results, keyword_results):
                for rank, hit in enumerate(hits, start=1):
                    doc_id = hit['_id']
                    rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + self.compute_rrf(rank)
                    sources.setdefault(doc_id, hit['_source'])

            reranked_docs = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
            return [sources[doc_id] for doc_id, _score in reranked_docs[:num_results]]
        except Exception as e:
            logger.error(f"Error in hybrid search: {str(e)}")
            raise

    def search(self, query, filter_dict=None, boost_dict=None, num_results=10, method='hybrid', index_name=None):
        """Dispatch a query to text, embedding or hybrid search.

        Args:
            query: Query text.
            filter_dict: Forwarded to ``text_search`` only.
            boost_dict: Forwarded to ``text_search`` only.
            num_results: Maximum number of documents to return.
            method: ``'text'``, ``'embedding'``, or anything else for hybrid.
            index_name: Elasticsearch index to search (required).

        Returns:
            List of document ``_source`` dicts.

        Raises:
            ValueError: If ``index_name`` is empty or the index is missing.
            Exception: Errors from the chosen search are logged and re-raised.
        """
        if not index_name:
            logger.error("No index name provided for search.")
            raise ValueError("No index name provided for search.")

        es = self._es_client()
        if not es.indices.exists(index=index_name):
            logger.error(f"Index {index_name} does not exist.")
            raise ValueError(f"Index {index_name} does not exist.")

        logger.info(f"Performing {method} search for query: {query} in index: {index_name}")
        try:
            if method == 'text':
                return self.text_search(query, filter_dict, boost_dict, num_results, index_name)
            elif method == 'embedding':
                return self.embedding_search(query, num_results, index_name)
            else:  # hybrid search
                return self.hybrid_search(query, index_name, num_results)
        except Exception as e:
            logger.error(f"Error in search method {method}: {str(e)}")
            raise

    def text_search(self, query, filter_dict=None, boost_dict=None, num_results=10, index_name=None):
        """Run a ``multi_match`` query over ``self.text_fields``.

        NOTE(review): ``filter_dict`` and ``boost_dict`` are accepted for
        interface compatibility but are not applied to the query.

        Args:
            query: Query text.
            filter_dict: Currently unused.
            boost_dict: Currently unused.
            num_results: Maximum number of documents to return.
            index_name: Elasticsearch index to search (required).

        Returns:
            List of document ``_source`` dicts.

        Raises:
            ValueError: If ``index_name`` is empty.
            Exception: Errors during the search are logged and re-raised.
        """
        if not index_name:
            logger.error("No index name provided for text search.")
            raise ValueError("No index name provided for text search.")
        try:
            search_body = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": self.text_fields
                    }
                },
                "size": num_results
            }
            response = self._es_client().search(index=index_name, body=search_body)
            return [hit['_source'] for hit in response['hits']['hits']]
        except Exception as e:
            logger.error(f"Error in text search: {str(e)}")
            raise

    def embedding_search(self, query, num_results=10, index_name=None):
        """Rank all documents by cosine similarity to the query embedding.

        Uses a ``script_score`` query over ``match_all``; the score is
        ``cosineSimilarity + 1.0``. The ``embedding`` field is excluded from
        the returned sources.

        Args:
            query: Query text.
            num_results: Maximum number of documents to return.
            index_name: Elasticsearch index to search (required).

        Returns:
            List of document ``_source`` dicts without ``embedding``.

        Raises:
            ValueError: If ``index_name`` is empty.
            Exception: Errors during the search are logged and re-raised.
        """
        if not index_name:
            logger.error("No index name provided for embedding search.")
            raise ValueError("No index name provided for embedding search.")
        try:
            query_vector = self.embedding_model.encode(query).tolist()
            script_query = {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {
                        "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                        "params": {"query_vector": query_vector}
                    }
                }
            }
            response = self._es_client().search(
                index=index_name,
                body={
                    "size": num_results,
                    "query": script_query,
                    "_source": {"excludes": ["embedding"]}
                }
            )
            return [hit['_source'] for hit in response['hits']['hits']]
        except Exception as e:
            logger.error(f"Error in embedding search: {str(e)}")
            raise

    def set_embedding_model(self, model_name):
        """Replace the embedding model with ``SentenceTransformer(model_name)``.

        NOTE(review): embeddings already stored in ``self.embeddings`` and the
        vector store created in ``__init__`` are not updated here.
        """
        self.embedding_model = SentenceTransformer(model_name)
        logger.info(f"Embedding model set to: {model_name}")