# Source: ai_agents_sustainable / agents / text_analysis_agent.py
# (repository page artifacts, commented out: "Chamin09's picture",
#  "initial commit", commit "7de43ca verified")
# agents/text_analysis_agent.py
import logging
import os
from typing import Dict, List, Optional, Tuple, Union, Any
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
class TextAnalysisAgent:
    """Agent that scores text documents for topic relevance, extracts key
    content, and assembles a consolidated analysis report.

    Heavy lifting is delegated to injected collaborators:
      - text_model_manager: must provide ``compute_similarity()`` and
        ``analyze_document()``.
      - summary_model_manager: optional; must provide ``generate_summary()``.
      - token_manager / cache_manager / metrics_calculator: optional
        utilities; only ``metrics_calculator`` is used directly here.
    """

    def __init__(self, text_model_manager, summary_model_manager=None,
                 token_manager=None, cache_manager=None, metrics_calculator=None):
        """Initialize the TextAnalysisAgent with required model managers and utilities."""
        self.logger = logging.getLogger(__name__)
        self.text_model_manager = text_model_manager
        self.summary_model_manager = summary_model_manager
        self.token_manager = token_manager
        self.cache_manager = cache_manager
        self.metrics_calculator = metrics_calculator
        # Minimum similarity score for a document to count as relevant.
        self.relevance_threshold = 0.5
        # Cut-offs used by _get_confidence_level to label numeric confidence.
        self.confidence_high_threshold = 0.7
        self.confidence_low_threshold = 0.3
        # Identifier passed to model managers for logging/metrics attribution.
        self.agent_name = "text_analysis_agent"

    def read_text_file(self, file_path: str) -> Tuple[str, bool]:
        """
        Read a text file and return its content.

        Tries UTF-8 first, then falls back to latin-1 (which accepts any
        byte sequence, so it cannot raise UnicodeDecodeError).
        Returns a tuple of (content, success); on failure the content holds
        an error message instead of raising.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read(), True
        except UnicodeDecodeError:
            # Not valid UTF-8 -- retry with a permissive single-byte codec.
            try:
                with open(file_path, 'r', encoding='latin-1') as file:
                    return file.read(), True
            except Exception as e:
                self.logger.error(f"Failed to read file with latin-1 encoding: {e}")
                return f"Error reading file: {str(e)}", False
        except Exception as e:
            self.logger.error(f"Failed to read file: {e}")
            return f"Error reading file: {str(e)}", False

    def analyze_topic_relevance(self, topic: str, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Determine which documents are relevant to the user's topic.
        Adds relevance scores and updates the documents list in-place.
        Returns the updated documents list, sorted by descending relevance.
        """
        self.logger.info(f"Analyzing relevance of {len(documents)} documents to topic: {topic}")
        doc_texts = [doc.get("content", "") for doc in documents]

        # Keep only documents whose content loaded successfully; the "Error"
        # prefix is the sentinel written by read_text_file on failure.
        valid_docs = []
        valid_indices = []
        for i, text in enumerate(doc_texts):
            if isinstance(text, str) and text and not text.startswith("Error"):
                valid_docs.append(text)
                valid_indices.append(i)

        if valid_docs:
            similarities = self.text_model_manager.compute_similarity(
                topic, valid_docs, agent_name=self.agent_name)
            for sim, orig_idx in zip(similarities, valid_indices):
                # Cast to builtin float/bool: similarity backends may return
                # NumPy scalars, which are not JSON-serializable and whose
                # comparisons yield numpy.bool_ rather than bool.
                documents[orig_idx]["relevance_score"] = float(sim)
                documents[orig_idx]["is_relevant"] = bool(sim >= self.relevance_threshold)
            if self.metrics_calculator:
                self.metrics_calculator.log_tokens_saved(10)  # Approximate tokens saved by using embeddings

        # Mark unreadable/empty documents as irrelevant. Use a set for O(1)
        # membership instead of scanning the valid_indices list per document.
        valid_index_set = set(valid_indices)
        for i, doc in enumerate(documents):
            if i not in valid_index_set:
                doc["relevance_score"] = 0.0
                doc["is_relevant"] = False

        documents.sort(key=lambda x: x.get("relevance_score", 0), reverse=True)
        return documents

    def extract_document_content(self, document: Dict[str, Any], topic: str) -> Dict[str, Any]:
        """
        Extract key information from a document based on the topic.
        Updates the document dict in-place (sets "analysis" and "confidence")
        and returns it.
        """
        content = document.get("content", "")
        # Skip documents that failed to load or were judged irrelevant.
        if not content or content.startswith("Error") or not document.get("is_relevant", False):
            document["analysis"] = {"error": "Document not relevant or contains errors"}
            document["confidence"] = 0.0
            return document

        analysis = self.text_model_manager.analyze_document(
            content, query=topic, agent_name=self.agent_name)

        # Abstractive summary only for documents long enough (> 50 words)
        # to benefit; shorter ones are truncated verbatim below.
        if self.summary_model_manager and len(content.split()) > 50:
            prompt = f"Summarize the following document in relation to the topic '{topic}':\n\n{content}"
            summary_result = self.summary_model_manager.generate_summary(
                prompt,
                prefix="summarize: ",
                agent_name=self.agent_name,
                params={"max_length": 150, "min_length": 30}
            )
            analysis["summary"] = summary_result.get("summary", "Summary generation failed")
            analysis["compression_ratio"] = summary_result.get("compression_ratio", 0)
        else:
            analysis["summary"] = content[:200] + "..." if len(content) > 200 else content
            analysis["compression_ratio"] = 1.0

        relevance_score = document.get("relevance_score", 0)
        # Simple length heuristic: ~1000 chars counts as full content quality.
        content_quality = min(1.0, len(content) / 1000)
        # Relevance dominates the confidence score (70/30 weighting).
        confidence = 0.7 * relevance_score + 0.3 * content_quality

        document["analysis"] = analysis
        document["confidence"] = confidence
        return document

    def process_documents_batch(self, topic: str, documents: List[Dict[str, Any]],
                                batch_size: int = 5, max_workers: int = 3) -> List[Dict[str, Any]]:
        """
        Score all documents for relevance, then analyze the relevant ones
        in parallel. Returns the processed documents.
        """
        documents = self.analyze_topic_relevance(topic, documents)

        relevant_docs = [doc for doc in documents if doc.get("is_relevant", False)]
        if not relevant_docs:
            self.logger.warning(f"No relevant documents found for topic: {topic}")
            return documents

        # One shared thread pool for all batches (the original rebuilt it per
        # batch); batching still bounds the number of in-flight documents.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for i in range(0, len(relevant_docs), batch_size):
                batch = relevant_docs[i:i + batch_size]
                futures = {executor.submit(self.extract_document_content, doc, topic): doc
                           for doc in batch}
                for future in as_completed(futures):
                    try:
                        future.result()  # Document is updated in-place
                    except Exception as e:
                        # Record the failure on the document rather than
                        # aborting the whole batch.
                        doc = futures[future]
                        doc["analysis"] = {"error": f"Processing error: {str(e)}"}
                        doc["confidence"] = 0.0
                        self.logger.error(f"Error processing document: {e}")
        return documents

    def generate_text_analysis_report(self, topic: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate a comprehensive report of the text analysis findings.
        Returns a report dict with aggregate counts, overall confidence,
        and per-document summaries.
        """
        # Only relevant documents that actually produced an analysis count.
        analyzed_docs = [doc for doc in documents if doc.get("is_relevant", False) and "analysis" in doc]

        if analyzed_docs:
            avg_confidence = sum(doc.get("confidence", 0) for doc in analyzed_docs) / len(analyzed_docs)
        else:
            avg_confidence = 0.0

        report = {
            "topic": topic,
            "total_documents": len(documents),
            "relevant_documents": len([d for d in documents if d.get("is_relevant", False)]),
            "successfully_analyzed": len(analyzed_docs),
            "overall_confidence": avg_confidence,
            "confidence_level": self._get_confidence_level(avg_confidence),
            "document_analyses": []
        }

        for doc in analyzed_docs:
            doc_report = {
                "filename": doc.get("filename", "Unknown"),
                "relevance_score": doc.get("relevance_score", 0),
                "confidence": doc.get("confidence", 0),
                "summary": doc.get("analysis", {}).get("summary", "No summary available")
            }
            report["document_analyses"].append(doc_report)
        return report

    def _get_confidence_level(self, confidence_score: float) -> str:
        """Convert numerical confidence to descriptive level (high/medium/low)."""
        if confidence_score >= self.confidence_high_threshold:
            return "high"
        elif confidence_score >= self.confidence_low_threshold:
            return "medium"
        else:
            return "low"

    def process_text_files(self, topic: str, file_paths: List[str]) -> Dict[str, Any]:
        """
        Main entry point: read, score, analyze, and report on a list of
        text files for a given topic. Returns the full analysis report,
        augmented with processing time and the processed document dicts.
        """
        start_time = time.time()
        self.logger.info(f"Processing {len(file_paths)} text files for topic: {topic}")

        documents = []
        for file_path in file_paths:
            content, success = self.read_text_file(file_path)
            documents.append({
                "filename": os.path.basename(file_path),
                "filepath": file_path,
                "content": content,
                "success": success
            })

        processed_docs = self.process_documents_batch(topic, documents)
        report = self.generate_text_analysis_report(topic, processed_docs)

        processing_time = time.time() - start_time
        report["processing_time"] = processing_time
        report["processed_documents"] = processed_docs
        self.logger.info(f"Completed text analysis in {processing_time:.2f} seconds. " +
                         f"Found {report['relevant_documents']} relevant documents.")
        return report