|
import datetime |
|
from typing import List, Dict, Any, Optional, Tuple |
|
import numpy as np |
|
from models.LexRank import degree_centrality_scores |
|
import logging |
|
from datetime import datetime as dt |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class QueryProcessor: |
|
def __init__(self, embedding_model, summarization_model, nlp_model, db_service): |
|
self.embedding_model = embedding_model |
|
self.summarization_model = summarization_model |
|
self.nlp_model = nlp_model |
|
self.db_service = db_service |
|
logger.info("QueryProcessor initialized") |
|
|
|
async def process( |
|
self, |
|
query: str, |
|
topic: Optional[str] = None, |
|
start_date: Optional[str] = None, |
|
end_date: Optional[str] = None |
|
) -> Dict[str, Any]: |
|
try: |
|
|
|
start_dt = self._parse_date(start_date) if start_date else None |
|
end_dt = self._parse_date(end_date) if end_date else None |
|
|
|
|
|
query_embedding = self.embedding_model.encode(query).tolist() |
|
entities = self.nlp_model.extract_entities(query) |
|
print(f"Extracted entities: {entities}") |
|
|
|
|
|
articles = await self._execute_semantic_search( |
|
query_embedding, |
|
start_dt, |
|
end_dt, |
|
topic, |
|
entities |
|
) |
|
|
|
if not articles: |
|
return {"message": "No articles found", "articles": []} |
|
|
|
|
|
print("Starting summary generation") |
|
summary_data = self._generate_summary(articles) |
|
return { |
|
"summary": summary_data["summary"], |
|
"key_sentences": summary_data["key_sentences"], |
|
"articles": articles, |
|
"entities": entities |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Processing failed: {str(e)}", exc_info=True) |
|
return {"error": str(e)} |
|
|
|
def _parse_date(self, date_str: str) -> dt: |
|
"""Safe date parsing with validation""" |
|
try: |
|
return dt.strptime(date_str, "%Y-%m-%d") |
|
except ValueError as e: |
|
logger.error(f"Invalid date format: {date_str}") |
|
raise ValueError(f"Invalid date format. Expected YYYY-MM-DD, got {date_str}") |
|
|
|
def _extract_entities_safely(self, text: str) -> List[Tuple[str, str]]: |
|
"""Robust entity extraction handling both strings and lists""" |
|
try: |
|
if isinstance(text, list): |
|
logger.warning("Received list input for entity extraction, joining to string") |
|
text = " ".join(text) |
|
return self.nlp_model.extract_entities(text) |
|
except Exception as e: |
|
logger.error(f"Entity extraction failed: {str(e)}") |
|
return [] |
|
|
|
async def _execute_semantic_search( |
|
self, |
|
query_embedding: List[float], |
|
start_date: Optional[dt], |
|
end_date: Optional[dt], |
|
topic: Optional[str], |
|
entities: List[Tuple[str, str]] |
|
) -> List[Dict[str, Any]]: |
|
"""Execute search with proper error handling""" |
|
try: |
|
return await self.db_service.semantic_search( |
|
query_embedding=query_embedding, |
|
start_date=start_date, |
|
end_date=end_date, |
|
topic=topic, |
|
entities=entities |
|
) |
|
except Exception as e: |
|
logger.error(f"Semantic search failed: {str(e)}") |
|
raise |
|
|
|
def _generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]: |
|
"""Generate summary from articles with fallback handling""" |
|
try: |
|
contents = [article["content"] for article in articles] |
|
sentences = [] |
|
|
|
for content in contents: |
|
if content: |
|
sentences.extend(self.nlp_model.tokenize_sentences(content)) |
|
|
|
if not sentences: |
|
logger.warning("No sentences available for summarization") |
|
return { |
|
"summary": "No content available for summarization", |
|
"key_sentences": [] |
|
} |
|
|
|
print("Starting first summary generation") |
|
embeddings = self.embedding_model.encode(sentences) |
|
print("Embeddings generated first summary") |
|
similarity_matrix = np.dot(embeddings, embeddings.T) / (np.linalg.norm(embeddings, axis=1, keepdims=True) * np.linalg.norm(embeddings, axis=1, keepdims=True).T) |
|
centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None) |
|
|
|
top_indices = np.argsort(-centrality_scores)[:10] |
|
key_sentences = [sentences[idx].strip() for idx in top_indices] |
|
combined_text = ' '.join(key_sentences) |
|
|
|
print(f"First summary done with: {len(key_sentences)} sentences") |
|
print(combined_text) |
|
|
|
return { |
|
"summary": self.summarization_model.summarize(combined_text), |
|
"key_sentences": key_sentences |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Summary generation failed: {str(e)}") |
|
return { |
|
"summary": "Summary generation failed", |
|
"key_sentences": [] |
|
} |