# agents/text_analysis_agent.py
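"""Agent that reads text files, scores their relevance to a topic via the
text model manager's similarity API, summarizes the relevant documents, and
assembles an aggregate analysis report."""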
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Tuple

class TextAnalysisAgent:
    def __init__(self, text_model_manager, summary_model_manager=None,
                 token_manager=None, cache_manager=None, metrics_calculator=None):
        """Initialize the TextAnalysisAgent with required model managers and utilities."""
        self.logger = logging.getLogger(__name__)
        self.text_model_manager = text_model_manager
        self.summary_model_manager = summary_model_manager
        self.token_manager = token_manager
        self.cache_manager = cache_manager
        self.metrics_calculator = metrics_calculator
        
        # Default relevance threshold
        self.relevance_threshold = 0.5
        
        # Default confidence values
        self.confidence_high_threshold = 0.7
        self.confidence_low_threshold = 0.3
        
        # Agent name for logging
        self.agent_name = "text_analysis_agent"
        
    def read_text_file(self, file_path: str) -> Tuple[str, bool]:
        """

        Read a text file and return its content.

        Returns a tuple of (content, success).

        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
            return content, True
        except UnicodeDecodeError:
            # Try with different encoding
            try:
                with open(file_path, 'r', encoding='latin-1') as file:
                    content = file.read()
                return content, True
            except Exception as e:
                self.logger.error(f"Failed to read file with latin-1 encoding: {e}")
                return f"Error reading file: {str(e)}", False
        except Exception as e:
            self.logger.error(f"Failed to read file: {e}")
            return f"Error reading file: {str(e)}", False
    
    def analyze_topic_relevance(self, topic: str, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """

        Determine which documents are relevant to the user's topic.

        Adds relevance scores and updates the documents list in-place.

        Returns the updated documents list.

        """
        self.logger.info(f"Analyzing relevance of {len(documents)} documents to topic: {topic}")
        
        # Extract document texts
        doc_texts = [doc.get("content", "") for doc in documents]
        
        # Keep only readable, non-error texts for similarity scoring
        valid_texts = []
        valid_indices = []
        for i, text in enumerate(doc_texts):
            if isinstance(text, str) and text and not text.startswith("Error"):
                valid_texts.append(text)
                valid_indices.append(i)
        
        # Compute relevance scores for the valid documents
        if valid_texts:
            similarities = self.text_model_manager.compute_similarity(
                topic, valid_texts, agent_name=self.agent_name)
            
            # Attach relevance scores and flags to the originating documents
            for sim, orig_idx in zip(similarities, valid_indices):
                documents[orig_idx]["relevance_score"] = float(sim)
                documents[orig_idx]["is_relevant"] = bool(sim >= self.relevance_threshold)
                
                # Log for metrics
                if self.metrics_calculator:
                    self.metrics_calculator.log_tokens_saved(10)  # Approximate tokens saved by using embeddings
        
        # Default the remaining (unreadable or error) documents to not relevant
        valid_index_set = set(valid_indices)
        for i, doc in enumerate(documents):
            if i not in valid_index_set:
                doc["relevance_score"] = 0.0
                doc["is_relevant"] = False
        
        # Sort documents by relevance
        documents.sort(key=lambda x: x.get("relevance_score", 0), reverse=True)
        
        return documents
    
    def extract_document_content(self, document: Dict[str, Any], topic: str) -> Dict[str, Any]:
        """

        Extract key information from a document based on the topic.

        Updates the document dict in-place and returns it.

        """
        content = document.get("content", "")
        if not content or content.startswith("Error") or not document.get("is_relevant", False):
            document["analysis"] = {"error": "Document not relevant or contains errors"}
            document["confidence"] = 0.0
            return document
        
        # Analyze document content
        analysis = self.text_model_manager.analyze_document(
            content, query=topic, agent_name=self.agent_name)
        
        # Generate summary if summary model is available
        if self.summary_model_manager and len(content.split()) > 50:
            # Create a prompt that focuses on the topic
            prompt = f"Summarize the following document in relation to the topic '{topic}':\n\n{content}"
            
            # Generate summary
            summary_result = self.summary_model_manager.generate_summary(
                prompt, 
                prefix="summarize: ", 
                agent_name=self.agent_name,
                params={"max_length": 150, "min_length": 30}
            )
            
            analysis["summary"] = summary_result.get("summary", "Summary generation failed")
            analysis["compression_ratio"] = summary_result.get("compression_ratio", 0)
        else:
            # No summary model available, or the document is too short to
            # summarize: fall back to a (truncated) excerpt of the content
            analysis["summary"] = (content[:200] + "...") if len(content) > 200 else content
            analysis["compression_ratio"] = 1.0
        
        # Calculate confidence based on relevance score and content analysis
        relevance_score = document.get("relevance_score", 0)
        content_quality = min(1.0, len(content) / 1000)  # Simple heuristic based on length
        
        # Weighted confidence calculation
        confidence = 0.7 * relevance_score + 0.3 * content_quality
        
        # Update document
        document["analysis"] = analysis
        document["confidence"] = confidence
        
        return document
    
    def process_documents_batch(self, topic: str, documents: List[Dict[str, Any]],
                                batch_size: int = 5, max_workers: int = 3) -> List[Dict[str, Any]]:
        """Score all documents for topic relevance, then analyze the relevant ones in parallel batches.

        Returns the full, updated documents list.
        """
        # First determine relevance for all documents
        documents = self.analyze_topic_relevance(topic, documents)
        
        # Then analyze relevant documents in parallel
        relevant_docs = [doc for doc in documents if doc.get("is_relevant", False)]
        
        if not relevant_docs:
            self.logger.warning(f"No relevant documents found for topic: {topic}")
            return documents
        
        # Process in batches to avoid memory issues
        for i in range(0, len(relevant_docs), batch_size):
            batch = relevant_docs[i:i+batch_size]
            
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {executor.submit(self.extract_document_content, doc, topic): doc for doc in batch}
                
                for future in as_completed(futures):
                    try:
                        future.result()  # Document is updated in-place
                    except Exception as e:
                        doc = futures[future]
                        doc["analysis"] = {"error": f"Processing error: {str(e)}"}
                        doc["confidence"] = 0.0
                        self.logger.error(f"Error processing document: {e}")
        
        return documents
    
    def generate_text_analysis_report(self, topic: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """

        Generate a comprehensive report of the text analysis findings.

        Returns a report dict.

        """
        # Filter for relevant documents with analysis
        analyzed_docs = [doc for doc in documents if doc.get("is_relevant", False) and "analysis" in doc]
        
        # Calculate overall confidence
        if analyzed_docs:
            avg_confidence = sum(doc.get("confidence", 0) for doc in analyzed_docs) / len(analyzed_docs)
        else:
            avg_confidence = 0.0
        
        # Prepare report content
        report = {
            "topic": topic,
            "total_documents": len(documents),
            "relevant_documents": len([d for d in documents if d.get("is_relevant", False)]),
            "successfully_analyzed": len(analyzed_docs),
            "overall_confidence": avg_confidence,
            "confidence_level": self._get_confidence_level(avg_confidence),
            "document_analyses": []
        }
        
        # Add individual document analyses
        for doc in analyzed_docs:
            doc_report = {
                "filename": doc.get("filename", "Unknown"),
                "relevance_score": doc.get("relevance_score", 0),
                "confidence": doc.get("confidence", 0),
                "summary": doc.get("analysis", {}).get("summary", "No summary available")
            }
            report["document_analyses"].append(doc_report)
        
        return report
    
    def _get_confidence_level(self, confidence_score: float) -> str:
        """Convert numerical confidence to descriptive level."""
        if confidence_score >= self.confidence_high_threshold:
            return "high"
        elif confidence_score >= self.confidence_low_threshold:
            return "medium"
        else:
            return "low"
    
    def process_text_files(self, topic: str, file_paths: List[str]) -> Dict[str, Any]:
        """

        Main method to process a list of text files for a given topic.

        Returns a comprehensive analysis report.

        """
        start_time = time.time()
        self.logger.info(f"Processing {len(file_paths)} text files for topic: {topic}")
        
        # Read all files
        documents = []
        for file_path in file_paths:
            content, success = self.read_text_file(file_path)
            doc = {
                "filename": os.path.basename(file_path),
                "filepath": file_path,
                "content": content,
                "success": success
            }
            documents.append(doc)
        
        # Process documents
        processed_docs = self.process_documents_batch(topic, documents)
        
        # Generate report
        report = self.generate_text_analysis_report(topic, processed_docs)
        
        # Add processing metadata
        processing_time = time.time() - start_time
        report["processing_time"] = processing_time
        report["processed_documents"] = processed_docs
        
        self.logger.info(f"Completed text analysis in {processing_time:.2f} seconds. " +
                        f"Found {report['relevant_documents']} relevant documents.")
        
        return report
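

if __name__ == "__main__":
    # Minimal, self-contained usage sketch. The project's real text and
    # summary model managers live elsewhere; the stub below only mirrors the
    # interface this agent actually calls (compute_similarity and
    # analyze_document) and is illustrative, not the project's actual API.
    import sys

    class _StubTextModelManager:
        def compute_similarity(self, query, texts, agent_name=None):
            # Crude lexical overlap standing in for embedding similarity
            q = set(query.lower().split())
            return [len(q & set(t.lower().split())) / max(len(q), 1) for t in texts]

        def analyze_document(self, content, query=None, agent_name=None):
            return {"word_count": len(content.split())}

    logging.basicConfig(level=logging.INFO)

    # Run against whatever text files are passed on the command line
    agent = TextAnalysisAgent(_StubTextModelManager())
    report = agent.process_text_files("renewable energy", sys.argv[1:])
    print(f"{report['relevant_documents']}/{report['total_documents']} documents relevant "
          f"(overall confidence: {report['confidence_level']})")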