File size: 5,096 Bytes
28ec96b
e67b064
a2682b3
28ec96b
c8d57fb
e67b064
c8d57fb
 
a2682b3
 
 
 
 
 
 
e21244d
 
a2682b3
 
 
 
 
 
 
c8d57fb
c708265
e67b064
 
c8d57fb
c708265
 
 
e21244d
c708265
de7395d
e67b064
 
 
 
0424ce2
c8d57fb
 
 
e67b064
 
c708265
 
e67b064
e21244d
 
e67b064
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0424ce2
 
 
 
 
 
 
 
 
 
 
e67b064
 
 
 
 
 
0424ce2
e67b064
 
 
0424ce2
e67b064
 
 
 
 
0424ce2
e67b064
 
 
 
 
c708265
 
e67b064
0424ce2
c8d57fb
0424ce2
 
 
c708265
a2682b3
e67b064
c8d57fb
e67b064
 
 
 
 
 
 
 
 
 
 
0424ce2
c8d57fb
 
0424ce2
e67b064
c8d57fb
 
 
e67b064
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import datetime
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from models.LexRank import degree_centrality_scores
import logging
from datetime import datetime as dt

# Module-level logger named after this module; QueryProcessor logs through it.
logger = logging.getLogger(__name__)

class QueryProcessor:
    """Answer a free-text query with matching articles and a summary of them.

    The processor only orchestrates four collaborators supplied by the caller:

    * ``embedding_model`` -- must provide ``encode``.
    * ``summarization_model`` -- must provide ``summarize``.
    * ``nlp_model`` -- must provide ``extract_entities`` and
      ``tokenize_sentences``.
    * ``db_service`` -- must provide an awaitable ``semantic_search``.
    """

    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        """Store the collaborators and log that the processor is ready."""
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service
        logger.info("QueryProcessor initialized")

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the full pipeline: parse dates, search, then summarize.

        Args:
            query: Free-text user query; it is embedded and scanned for entities.
            topic: Optional topic filter forwarded to the database search.
            start_date: Optional lower date bound in ``YYYY-MM-DD`` format.
            end_date: Optional upper date bound in ``YYYY-MM-DD`` format.

        Returns:
            One of three dictionaries:

            * ``{"message": "No articles found", "articles": []}`` when the
              search returns nothing;
            * ``{"summary", "key_sentences", "articles", "entities"}`` on
              success;
            * ``{"error": <message>}`` when any step raises (including an
              invalid date string). This method itself never raises.
        """
        try:
            # Date handling (empty strings are treated like "no bound").
            start_dt = self._parse_date(start_date) if start_date else None
            end_dt = self._parse_date(end_date) if end_date else None

            # Query processing
            query_embedding = self.embedding_model.encode(query).tolist()
            # Entity extraction is best-effort: a failure yields no entity
            # filters instead of aborting the whole request.
            entities = self._extract_entities_safely(query)

            # Database search. Pass the raw entities: the search helper is the
            # single place that reduces them to their text. Reducing them here
            # as well made the helper index into the strings and search for
            # the first character of every entity.
            articles = await self._execute_semantic_search(
                query_embedding,
                start_dt,
                end_dt,
                topic,
                entities
            )

            if not articles:
                return {"message": "No articles found", "articles": []}

            # Summary generation
            summary_data = self._generate_summary(articles)
            return {
                "summary": summary_data["summary"],
                "key_sentences": summary_data["key_sentences"],
                "articles": articles,
                "entities": entities
            }

        except Exception as e:
            # Deliberately broad: this is the pipeline's outer boundary and
            # callers receive failures as an "error" entry, not an exception.
            logger.error("Processing failed: %s", e, exc_info=True)
            return {"error": str(e)}

    def _parse_date(self, date_str: str) -> dt:
        """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

        Raises:
            ValueError: If ``date_str`` does not match ``%Y-%m-%d``. The
                original parsing error is attached as ``__cause__``.
        """
        try:
            return dt.strptime(date_str, "%Y-%m-%d")
        except ValueError as e:
            logger.error("Invalid date format: %s", date_str)
            raise ValueError(
                f"Invalid date format. Expected YYYY-MM-DD, got {date_str}"
            ) from e

    def _extract_entities_safely(self, text: str) -> List[Tuple[str, str]]:
        """Extract entities from ``text`` without ever raising.

        A list input is joined with spaces before extraction. Any exception
        from the NLP model is logged and an empty list is returned.
        """
        try:
            if isinstance(text, list):
                logger.warning("Received list input for entity extraction, joining to string")
                text = " ".join(text)
            return self.nlp_model.extract_entities(text)
        except Exception as e:
            logger.error("Entity extraction failed: %s", e)
            return []

    async def _execute_semantic_search(
        self,
        query_embedding: List[float],
        start_date: Optional[dt],
        end_date: Optional[dt],
        topic: Optional[str],
        entities: List[Tuple[str, str]]
    ) -> List[Dict[str, Any]]:
        """Call ``db_service.semantic_search`` with the entity texts.

        Args:
            query_embedding: Embedding of the query as a plain list.
            start_date: Optional lower date bound.
            end_date: Optional upper date bound.
            topic: Optional topic filter.
            entities: Extracted entities. For sequence items only the first
                element is forwarded (presumably ``(text, label)`` pairs --
                confirm against the NLP model); plain strings are forwarded
                unchanged.

        Returns:
            Whatever ``db_service.semantic_search`` returns.

        Raises:
            Exception: Any search failure is logged and re-raised unchanged.
        """
        try:
            # Accept already-flattened strings as well as tuples so that a
            # caller passing texts does not get them cut to one character.
            entity_texts = [
                ent if isinstance(ent, str) else ent[0] for ent in entities
            ]
            return await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_date,
                end_date=end_date,
                topic=topic,
                entities=entity_texts
            )
        except Exception as e:
            logger.error("Semantic search failed: %s", e)
            raise

    def _generate_summary(
        self,
        articles: List[Dict[str, Any]],
        max_sentences: int = 10
    ) -> Dict[str, Any]:
        """Summarize the highest-centrality sentences of ``articles``.

        Sentences from every article's ``"content"`` are embedded, scored with
        ``degree_centrality_scores`` on their inner-product similarity matrix,
        and the top ``max_sentences`` are joined and passed to the
        summarization model.

        Args:
            articles: Article dictionaries; entries with a missing or empty
                ``"content"`` are skipped.
            max_sentences: Upper bound on the number of key sentences used.

        Returns:
            ``{"summary": str, "key_sentences": list}``. Placeholder text with
            an empty sentence list is returned when there is nothing to
            summarize or when any step fails; this method never raises.
        """
        try:
            sentences = []
            for article in articles:
                # .get(): one article without content must not fail the lot.
                content = article.get("content")
                if content:
                    sentences.extend(self.nlp_model.tokenize_sentences(content))

            if not sentences:
                logger.warning("No sentences available for summarization")
                return {
                    "summary": "No content available for summarization",
                    "key_sentences": []
                }

            embeddings = self.embedding_model.encode(sentences)
            similarity_matrix = np.inner(embeddings, embeddings)
            centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)

            # Negate so argsort's ascending order ranks the highest score
            # first; asarray guards against a plain-list return value.
            top_indices = np.argsort(-np.asarray(centrality_scores))[:max_sentences]
            key_sentences = [sentences[idx].strip() for idx in top_indices]
            combined_text = ' '.join(key_sentences)

            return {
                "summary": self.summarization_model.summarize(combined_text),
                "key_sentences": key_sentences
            }

        except Exception as e:
            # Best-effort: the caller still returns the articles without a
            # summary. Keep the traceback so the failure can be diagnosed.
            logger.error("Summary generation failed: %s", e, exc_info=True)
            return {
                "summary": "Summary generation failed",
                "key_sentences": []
            }