# NOTE(review): the three lines below look like web file-viewer residue
# (file size, commit ids, line-number gutter) rather than source; kept as comments.
# File size: 5,461 Bytes
# 28ec96b e67b064 a2682b3 28ec96b c8d57fb e67b064 c8d57fb a2682b3 e21244d a2682b3 c8d57fb c708265 e67b064 c8d57fb c708265 a86dbdc e21244d c708265 de7395d e67b064 ba99a45 c8d57fb e67b064 c708265 a86dbdc c708265 e67b064 e21244d e67b064 0424ce2 e67b064 0424ce2 e67b064 9cb6544 e67b064 c708265 e67b064 0424ce2 c8d57fb 0424ce2 c708265 a2682b3 e67b064 c8d57fb e67b064 a86dbdc e67b064 a86dbdc a988661 e67b064 0424ce2 c8d57fb a86dbdc a4880e3 a86dbdc c8d57fb 0424ce2 e67b064 c8d57fb e67b064
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
import datetime
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from models.LexRank import degree_centrality_scores
import logging
from datetime import datetime as dt
# Module-level logger shared by every QueryProcessor instance.
logger = logging.getLogger(__name__)


class QueryProcessor:
    """Answer a free-text query with matching articles and a summary.

    The processor embeds the query, extracts entities from it, asks the
    database service for matching articles and condenses the hits into a
    summary built from their highest-centrality sentences.
    """

    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        """Store the collaborators used by the pipeline.

        Args:
            embedding_model: Object exposing ``encode(text_or_sentences)``.
            summarization_model: Object exposing ``summarize(text)``.
            nlp_model: Object exposing ``extract_entities(text)`` and
                ``tokenize_sentences(text)``.
            db_service: Object exposing an awaitable ``semantic_search(...)``.
        """
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service
        logger.info("QueryProcessor initialized")

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the full pipeline for one query.

        Args:
            query: Free-text user query.
            topic: Optional topic filter forwarded to the search.
            start_date: Optional lower date bound, ``YYYY-MM-DD``.
            end_date: Optional upper date bound, ``YYYY-MM-DD``.

        Returns:
            On success a dict with ``summary``, ``key_sentences``,
            ``articles`` and ``entities``. When the search yields nothing,
            ``{"message": "No articles found", "articles": []}``. On any
            failure (including a malformed date) ``{"error": <message>}``;
            this method never raises.
        """
        try:
            # Date handling
            start_dt = self._parse_date(start_date) if start_date else None
            end_dt = self._parse_date(end_date) if end_date else None

            # Query processing
            query_embedding = self.embedding_model.encode(query).tolist()
            entities = self.nlp_model.extract_entities(query)
            logger.debug("Extracted entities: %s", entities)

            # Database search
            articles = await self._execute_semantic_search(
                query_embedding,
                start_dt,
                end_dt,
                topic,
                entities
            )
            if not articles:
                return {"message": "No articles found", "articles": []}

            # Summary generation
            logger.debug("Starting summary generation")
            summary_data = self._generate_summary(articles)
            return {
                "summary": summary_data["summary"],
                "key_sentences": summary_data["key_sentences"],
                "articles": articles,
                "entities": entities
            }
        except Exception as e:
            # Top-level boundary: report the failure to the caller as data.
            logger.error("Processing failed: %s", e, exc_info=True)
            return {"error": str(e)}

    def _parse_date(self, date_str: str) -> dt:
        """Parse a ``YYYY-MM-DD`` string into a datetime.

        Raises:
            ValueError: If ``date_str`` does not match ``%Y-%m-%d``.
        """
        try:
            return dt.strptime(date_str, "%Y-%m-%d")
        except ValueError as e:
            logger.error("Invalid date format: %s", date_str)
            # Chain the original error so the strptime detail is not lost.
            raise ValueError(
                f"Invalid date format. Expected YYYY-MM-DD, got {date_str}"
            ) from e

    def _extract_entities_safely(self, text: str) -> List[Tuple[str, str]]:
        """Extract entities, tolerating list input and extraction errors.

        A list argument is joined with spaces before extraction. Any
        exception from the NLP model is logged and an empty list returned.
        """
        try:
            if isinstance(text, list):
                logger.warning("Received list input for entity extraction, joining to string")
                text = " ".join(text)
            return self.nlp_model.extract_entities(text)
        except Exception as e:
            logger.error("Entity extraction failed: %s", e)
            return []

    async def _execute_semantic_search(
        self,
        query_embedding: List[float],
        start_date: Optional[dt],
        end_date: Optional[dt],
        topic: Optional[str],
        entities: List[Tuple[str, str]]
    ) -> List[Dict[str, Any]]:
        """Delegate to ``db_service.semantic_search``; log and re-raise errors."""
        try:
            return await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_date,
                end_date=end_date,
                topic=topic,
                entities=entities
            )
        except Exception as e:
            logger.error("Semantic search failed: %s", e)
            raise

    def _collect_sentences(self, articles: List[Dict[str, Any]]) -> List[str]:
        """Tokenize the non-empty ``content`` of every article into sentences."""
        sentences: List[str] = []
        for article in articles:
            # .get(): one article without content must not abort the summary.
            content = article.get("content")
            if content:
                sentences.extend(self.nlp_model.tokenize_sentences(content))
        return sentences

    @staticmethod
    def _cosine_similarity_matrix(embeddings: np.ndarray) -> np.ndarray:
        """Return pairwise cosine similarities between the rows of ``embeddings``.

        Rows with zero norm get similarity 0 to everything (instead of the
        NaN a plain 0/0 division would produce).
        """
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        safe_norms = np.where(norms == 0, 1.0, norms)
        return np.dot(embeddings, embeddings.T) / (safe_norms * safe_norms.T)

    def _generate_summary(
        self,
        articles: List[Dict[str, Any]],
        top_k: int = 10
    ) -> Dict[str, Any]:
        """Summarize the articles from their highest-centrality sentences.

        Sentences from all articles are embedded, scored with
        ``degree_centrality_scores`` over their cosine-similarity matrix,
        and the ``top_k`` best are joined and passed to the summarization
        model.

        Args:
            articles: Article dicts; the text is read from ``"content"``.
            top_k: Maximum number of key sentences to keep (default 10).

        Returns:
            A dict with ``summary`` and ``key_sentences``. Placeholder
            values are returned when there is no content or when any step
            fails; this method never raises.
        """
        try:
            sentences = self._collect_sentences(articles)
            if not sentences:
                logger.warning("No sentences available for summarization")
                return {
                    "summary": "No content available for summarization",
                    "key_sentences": []
                }

            logger.debug("Embedding %d sentences for summarization", len(sentences))
            # asarray: the matrix math below needs an ndarray.
            embeddings = np.asarray(self.embedding_model.encode(sentences))
            similarity_matrix = self._cosine_similarity_matrix(embeddings)
            centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)

            # Highest score first.
            top_indices = np.argsort(-np.asarray(centrality_scores))[:top_k]
            key_sentences = [sentences[idx].strip() for idx in top_indices]
            combined_text = ' '.join(key_sentences)
            logger.debug("First summary done with: %d sentences", len(key_sentences))
            return {
                "summary": self.summarization_model.summarize(combined_text),
                "key_sentences": key_sentences
            }
        except Exception as e:
            logger.error("Summary generation failed: %s", e, exc_info=True)
            return {
                "summary": "Summary generation failed",
                "key_sentences": []
            }