# agents/text_analysis_agent.py
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Tuple


class TextAnalysisAgent:
    def __init__(self, text_model_manager, summary_model_manager=None,
                 token_manager=None, cache_manager=None, metrics_calculator=None):
        """Initialize the TextAnalysisAgent with required model managers and utilities."""
        self.logger = logging.getLogger(__name__)
        self.text_model_manager = text_model_manager
        self.summary_model_manager = summary_model_manager
        self.token_manager = token_manager
        self.cache_manager = cache_manager
        self.metrics_calculator = metrics_calculator

        # Default relevance threshold
        self.relevance_threshold = 0.5

        # Default confidence thresholds
        self.confidence_high_threshold = 0.7
        self.confidence_low_threshold = 0.3

        # Agent name for logging
        self.agent_name = "text_analysis_agent"

    def read_text_file(self, file_path: str) -> Tuple[str, bool]:
        """
        Read a text file and return its content.

        Returns a tuple of (content, success).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read(), True
        except UnicodeDecodeError:
            # Fall back to latin-1, which accepts any byte sequence
            try:
                with open(file_path, 'r', encoding='latin-1') as file:
                    return file.read(), True
            except Exception as e:
                self.logger.error(f"Failed to read file with latin-1 encoding: {e}")
                return f"Error reading file: {str(e)}", False
        except Exception as e:
            self.logger.error(f"Failed to read file: {e}")
            return f"Error reading file: {str(e)}", False

    def analyze_topic_relevance(self, topic: str,
                                documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Determine which documents are relevant to the user's topic.

        Adds relevance scores and updates the documents list in-place.
        Returns the updated documents list, sorted by relevance.
        """
        self.logger.info(f"Analyzing relevance of {len(documents)} documents to topic: {topic}")

        # Extract document texts
        doc_texts = [doc.get("content", "") for doc in documents]

        # Keep only non-empty texts that are not read errors
        valid_docs = []
        valid_indices = []
        for i, text in enumerate(doc_texts):
            if isinstance(text, str) and text and not text.startswith("Error"):
                valid_docs.append(text)
                valid_indices.append(i)

        # Compute relevance scores for valid documents
        if valid_docs:
            similarities = self.text_model_manager.compute_similarity(
                topic, valid_docs, agent_name=self.agent_name)

            # Update documents with relevance scores
            for sim, orig_idx in zip(similarities, valid_indices):
                documents[orig_idx]["relevance_score"] = float(sim)
                documents[orig_idx]["is_relevant"] = sim >= self.relevance_threshold

                # Log for metrics: approximate tokens saved by using embeddings
                if self.metrics_calculator:
                    self.metrics_calculator.log_tokens_saved(10)

        # Set relevance to 0 for invalid documents
        valid_index_set = set(valid_indices)
        for i, doc in enumerate(documents):
            if i not in valid_index_set:
                doc["relevance_score"] = 0.0
                doc["is_relevant"] = False

        # Sort documents by relevance, most relevant first
        documents.sort(key=lambda x: x.get("relevance_score", 0), reverse=True)

        return documents

    def extract_document_content(self, document: Dict[str, Any], topic: str) -> Dict[str, Any]:
        """
        Extract key information from a document based on the topic.

        Updates the document dict in-place and returns it.
        """
        content = document.get("content", "")
        if not content or content.startswith("Error") or not document.get("is_relevant", False):
            document["analysis"] = {"error": "Document not relevant or contains errors"}
            document["confidence"] = 0.0
            return document

        # Analyze document content
        analysis = self.text_model_manager.analyze_document(
            content, query=topic, agent_name=self.agent_name)

        # Generate a summary if a summary model is available and the
        # document is long enough to be worth summarizing
        if self.summary_model_manager and len(content.split()) > 50:
            # Create a prompt that focuses on the topic
            prompt = (f"Summarize the following document in relation to the topic "
                      f"'{topic}':\n\n{content}")

            # Generate summary
            summary_result = self.summary_model_manager.generate_summary(
                prompt,
                prefix="summarize: ",
                agent_name=self.agent_name,
                params={"max_length": 150, "min_length": 30}
            )

            analysis["summary"] = summary_result.get("summary", "Summary generation failed")
            analysis["compression_ratio"] = summary_result.get("compression_ratio", 0)
        else:
            # For very short documents, just use the (truncated) content
            analysis["summary"] = (content[:200] + "...") if len(content) > 200 else content
            analysis["compression_ratio"] = 1.0

        # Calculate confidence based on relevance score and content quality
        relevance_score = document.get("relevance_score", 0)
        content_quality = min(1.0, len(content) / 1000)  # Simple heuristic based on length

        # Weighted confidence calculation
        confidence = 0.7 * relevance_score + 0.3 * content_quality

        # Update document
        document["analysis"] = analysis
        document["confidence"] = confidence

        return document

    def process_documents_batch(self, topic: str, documents: List[Dict[str, Any]],
                                batch_size: int = 5,
                                max_workers: int = 3) -> List[Dict[str, Any]]:
        """
        Process a batch of documents in parallel.

        Returns the processed documents.
        """
        # First determine relevance for all documents
        documents = self.analyze_topic_relevance(topic, documents)

        # Then analyze relevant documents in parallel
        relevant_docs = [doc for doc in documents if doc.get("is_relevant", False)]

        if not relevant_docs:
            self.logger.warning(f"No relevant documents found for topic: {topic}")
            return documents

        # Process in batches to avoid memory issues
        for i in range(0, len(relevant_docs), batch_size):
            batch = relevant_docs[i:i + batch_size]

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {executor.submit(self.extract_document_content, doc, topic): doc
                           for doc in batch}

                for future in as_completed(futures):
                    try:
                        future.result()  # Document is updated in-place
                    except Exception as e:
                        doc = futures[future]
                        doc["analysis"] = {"error": f"Processing error: {str(e)}"}
                        doc["confidence"] = 0.0
                        self.logger.error(f"Error processing document: {e}")

        return documents

    def generate_text_analysis_report(self, topic: str,
                                      documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate a comprehensive report of the text analysis findings.

        Returns a report dict.
        """
        # Filter for relevant documents with analysis
        analyzed_docs = [doc for doc in documents
                         if doc.get("is_relevant", False) and "analysis" in doc]

        # Calculate overall confidence
        if analyzed_docs:
            avg_confidence = (sum(doc.get("confidence", 0) for doc in analyzed_docs)
                              / len(analyzed_docs))
        else:
            avg_confidence = 0.0

        # Prepare report content
        report = {
            "topic": topic,
            "total_documents": len(documents),
            "relevant_documents": len([d for d in documents if d.get("is_relevant", False)]),
            "successfully_analyzed": len(analyzed_docs),
            "overall_confidence": avg_confidence,
            "confidence_level": self._get_confidence_level(avg_confidence),
            "document_analyses": []
        }

        # Add individual document analyses
        for doc in analyzed_docs:
            report["document_analyses"].append({
                "filename": doc.get("filename", "Unknown"),
                "relevance_score": doc.get("relevance_score", 0),
                "confidence": doc.get("confidence", 0),
                "summary": doc.get("analysis", {}).get("summary", "No summary available")
            })

        return report

    def _get_confidence_level(self, confidence_score: float) -> str:
        """Convert a numerical confidence score to a descriptive level."""
        if confidence_score >= self.confidence_high_threshold:
            return "high"
        elif confidence_score >= self.confidence_low_threshold:
            return "medium"
        else:
            return "low"

    def process_text_files(self, topic: str, file_paths: List[str]) -> Dict[str, Any]:
        """
        Main method to process a list of text files for a given topic.

        Returns a comprehensive analysis report.
        """
        start_time = time.time()
        self.logger.info(f"Processing {len(file_paths)} text files for topic: {topic}")

        # Read all files
        documents = []
        for file_path in file_paths:
            content, success = self.read_text_file(file_path)
            documents.append({
                "filename": os.path.basename(file_path),
                "filepath": file_path,
                "content": content,
                "success": success
            })

        # Process documents
        processed_docs = self.process_documents_batch(topic, documents)

        # Generate report
        report = self.generate_text_analysis_report(topic, processed_docs)

        # Add processing metadata
        processing_time = time.time() - start_time
        report["processing_time"] = processing_time
        report["processed_documents"] = processed_docs

        self.logger.info(f"Completed text analysis in {processing_time:.2f} seconds. "
                         f"Found {report['relevant_documents']} relevant documents.")

        return report