# agents/text_analysis_agent.py

import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Tuple

class TextAnalysisAgent:
    def __init__(self, text_model_manager, summary_model_manager=None,
                 token_manager=None, cache_manager=None, metrics_calculator=None):
        """Initialize the TextAnalysisAgent with required model managers and utilities."""
        self.logger = logging.getLogger(__name__)
        self.text_model_manager = text_model_manager
        self.summary_model_manager = summary_model_manager
        self.token_manager = token_manager
        self.cache_manager = cache_manager
        self.metrics_calculator = metrics_calculator

        # Default relevance threshold
        self.relevance_threshold = 0.5

        # Default confidence thresholds
        self.confidence_high_threshold = 0.7
        self.confidence_low_threshold = 0.3

        # Agent name for logging
        self.agent_name = "text_analysis_agent"

    def read_text_file(self, file_path: str) -> Tuple[str, bool]:
        """
        Read a text file and return its content.

        Returns a tuple of (content, success).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
            return content, True
        except UnicodeDecodeError:
            # Try again with a more permissive encoding
            try:
                with open(file_path, 'r', encoding='latin-1') as file:
                    content = file.read()
                return content, True
            except Exception as e:
                self.logger.error(f"Failed to read file with latin-1 encoding: {e}")
                return f"Error reading file: {str(e)}", False
        except Exception as e:
            self.logger.error(f"Failed to read file: {e}")
            return f"Error reading file: {str(e)}", False

    def analyze_topic_relevance(self, topic: str, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Determine which documents are relevant to the user's topic.

        Adds relevance scores and updates the documents list in-place.
        Returns the updated documents list.
        """
        self.logger.info(f"Analyzing relevance of {len(documents)} documents to topic: {topic}")

        # Extract document texts
        doc_texts = [doc.get("content", "") for doc in documents]

        # Keep only documents whose content is a non-empty, error-free string
        valid_docs = []
        valid_indices = []
        for i, text in enumerate(doc_texts):
            if isinstance(text, str) and text and not text.startswith("Error"):
                valid_docs.append(text)
                valid_indices.append(i)

        # Compute relevance scores for valid documents
        if valid_docs:
            similarities = self.text_model_manager.compute_similarity(
                topic, valid_docs, agent_name=self.agent_name)

            # Update documents with relevance scores
            for sim, orig_idx in zip(similarities, valid_indices):
                documents[orig_idx]["relevance_score"] = float(sim)
                documents[orig_idx]["is_relevant"] = bool(sim >= self.relevance_threshold)

            # Log for metrics
            if self.metrics_calculator:
                self.metrics_calculator.log_tokens_saved(10)  # Approximate tokens saved by using embeddings

        # Set relevance to 0 for invalid documents
        for i, doc in enumerate(documents):
            if i not in valid_indices:
                doc["relevance_score"] = 0.0
                doc["is_relevant"] = False

        # Sort documents by relevance, highest first
        documents.sort(key=lambda x: x.get("relevance_score", 0), reverse=True)
        return documents
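
    # analyze_topic_relevance() above adds, e.g., {"relevance_score": 0.73,
    # "is_relevant": True} to each document and reorders the list so the most
    # relevant documents come first.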

    def extract_document_content(self, document: Dict[str, Any], topic: str) -> Dict[str, Any]:
        """
        Extract key information from a document based on the topic.

        Updates the document dict in-place and returns it.
        """
        content = document.get("content", "")
        if not content or content.startswith("Error") or not document.get("is_relevant", False):
            document["analysis"] = {"error": "Document not relevant or contains errors"}
            document["confidence"] = 0.0
            return document

        # Analyze document content
        analysis = self.text_model_manager.analyze_document(
            content, query=topic, agent_name=self.agent_name)

        # Generate a summary if a summary model is available and the document
        # is long enough to be worth summarizing
        if self.summary_model_manager and len(content.split()) > 50:
            # Create a prompt that focuses on the topic
            prompt = f"Summarize the following document in relation to the topic '{topic}':\n\n{content}"

            # Generate summary
            summary_result = self.summary_model_manager.generate_summary(
                prompt,
                prefix="summarize: ",
                agent_name=self.agent_name,
                params={"max_length": 150, "min_length": 30}
            )
            analysis["summary"] = summary_result.get("summary", "Summary generation failed")
            analysis["compression_ratio"] = summary_result.get("compression_ratio", 0)
        else:
            # For very short documents, just use the content (truncated to 200 chars)
            analysis["summary"] = (content[:200] + "...") if len(content) > 200 else content
            analysis["compression_ratio"] = 1.0

        # Calculate confidence based on relevance score and content analysis
        relevance_score = document.get("relevance_score", 0)
        content_quality = min(1.0, len(content) / 1000)  # Simple heuristic based on length

        # Weighted confidence calculation: relevance dominates, length refines
        confidence = 0.7 * relevance_score + 0.3 * content_quality
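        # Example: a document with relevance 0.8 and ~2,000 characters of
        # content scores 0.7 * 0.8 + 0.3 * 1.0 = 0.86.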

        # Update document
        document["analysis"] = analysis
        document["confidence"] = confidence
        return document

    def process_documents_batch(self, topic: str, documents: List[Dict[str, Any]],
                                batch_size: int = 5, max_workers: int = 3) -> List[Dict[str, Any]]:
        """
        Process a batch of documents in parallel.

        Returns the processed documents.
        """
        # First determine relevance for all documents
        documents = self.analyze_topic_relevance(topic, documents)

        # Then analyze relevant documents in parallel
        relevant_docs = [doc for doc in documents if doc.get("is_relevant", False)]
        if not relevant_docs:
            self.logger.warning(f"No relevant documents found for topic: {topic}")
            return documents

        # Process in batches to avoid memory issues
        for i in range(0, len(relevant_docs), batch_size):
            batch = relevant_docs[i:i + batch_size]
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {executor.submit(self.extract_document_content, doc, topic): doc
                           for doc in batch}
                for future in as_completed(futures):
                    try:
                        future.result()  # Document is updated in-place
                    except Exception as e:
                        doc = futures[future]
                        doc["analysis"] = {"error": f"Processing error: {str(e)}"}
                        doc["confidence"] = 0.0
                        self.logger.error(f"Error processing document: {e}")

        return documents
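
    # Design note: process_documents_batch() above uses threads rather than
    # processes on the assumption that model-manager calls are I/O- or
    # network-bound; if they are CPU-bound in-process, a ProcessPoolExecutor
    # may parallelize better.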

    def generate_text_analysis_report(self, topic: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate a comprehensive report of the text analysis findings.

        Returns a report dict.
        """
        # Filter for relevant documents that were successfully analyzed
        analyzed_docs = [doc for doc in documents if doc.get("is_relevant", False) and "analysis" in doc]

        # Calculate overall confidence
        if analyzed_docs:
            avg_confidence = sum(doc.get("confidence", 0) for doc in analyzed_docs) / len(analyzed_docs)
        else:
            avg_confidence = 0.0

        # Prepare report content
        report = {
            "topic": topic,
            "total_documents": len(documents),
            "relevant_documents": len([d for d in documents if d.get("is_relevant", False)]),
            "successfully_analyzed": len(analyzed_docs),
            "overall_confidence": avg_confidence,
            "confidence_level": self._get_confidence_level(avg_confidence),
            "document_analyses": []
        }

        # Add individual document analyses
        for doc in analyzed_docs:
            doc_report = {
                "filename": doc.get("filename", "Unknown"),
                "relevance_score": doc.get("relevance_score", 0),
                "confidence": doc.get("confidence", 0),
                "summary": doc.get("analysis", {}).get("summary", "No summary available")
            }
            report["document_analyses"].append(doc_report)

        return report

    def _get_confidence_level(self, confidence_score: float) -> str:
        """Convert a numerical confidence score to a descriptive level."""
        if confidence_score >= self.confidence_high_threshold:
            return "high"
        elif confidence_score >= self.confidence_low_threshold:
            return "medium"
        else:
            return "low"

    def process_text_files(self, topic: str, file_paths: List[str]) -> Dict[str, Any]:
        """
        Main method to process a list of text files for a given topic.

        Returns a comprehensive analysis report.
        """
        start_time = time.time()
        self.logger.info(f"Processing {len(file_paths)} text files for topic: {topic}")

        # Read all files
        documents = []
        for file_path in file_paths:
            content, success = self.read_text_file(file_path)
            doc = {
                "filename": os.path.basename(file_path),
                "filepath": file_path,
                "content": content,
                "success": success
            }
            documents.append(doc)

        # Process documents
        processed_docs = self.process_documents_batch(topic, documents)

        # Generate report
        report = self.generate_text_analysis_report(topic, processed_docs)

        # Add processing metadata
        processing_time = time.time() - start_time
        report["processing_time"] = processing_time
        report["processed_documents"] = processed_docs

        self.logger.info(f"Completed text analysis in {processing_time:.2f} seconds. "
                         f"Found {report['relevant_documents']} relevant documents.")
        return report
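

# ---------------------------------------------------------------------------
# Minimal usage sketch. The stub managers below are ASSUMPTIONS inferred from
# how TextAnalysisAgent calls them (compute_similarity, analyze_document,
# generate_summary); they are not the real model managers. Swap in your own
# implementations with the same method signatures.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import tempfile

    class StubTextModelManager:
        """Hypothetical stand-in that scores relevance by naive word overlap."""

        def compute_similarity(self, query, texts, agent_name=None):
            query_words = set(query.lower().split())
            return [
                len(query_words & set(text.lower().split())) / max(len(query_words), 1)
                for text in texts
            ]

        def analyze_document(self, content, query=None, agent_name=None):
            return {"word_count": len(content.split()), "query": query}

    class StubSummaryModelManager:
        """Hypothetical stand-in that truncates instead of summarizing."""

        def generate_summary(self, prompt, prefix="", agent_name=None, params=None):
            max_length = (params or {}).get("max_length", 150)
            summary = " ".join(prompt.split()[:max_length])
            return {
                "summary": summary,
                "compression_ratio": len(summary) / max(len(prompt), 1),
            }

    logging.basicConfig(level=logging.INFO)

    # Write a throwaway document so the sketch is fully self-contained.
    with tempfile.NamedTemporaryFile(
            "w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
        tmp.write("Solar power adoption is growing as panel costs fall. " * 20)
        sample_path = tmp.name

    try:
        agent = TextAnalysisAgent(
            text_model_manager=StubTextModelManager(),
            summary_model_manager=StubSummaryModelManager(),
        )
        report = agent.process_text_files("solar power costs", [sample_path])
        print(f"{report['relevant_documents']} relevant document(s), "
              f"confidence: {report['confidence_level']}")
    finally:
        os.unlink(sample_path)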