# agents/text_analysis_agent.py
import logging
import os
from typing import Dict, List, Optional, Tuple, Union, Any
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
class TextAnalysisAgent:
    """Analyze text files for topical relevance and extract key content.

    Coordinates an embedding/analysis model (``text_model_manager``) for
    relevance scoring and content analysis, plus an optional summarization
    model (``summary_model_manager``), producing a structured report for a
    set of documents.
    """

    def __init__(self, text_model_manager, summary_model_manager=None,
                 token_manager=None, cache_manager=None, metrics_calculator=None):
        """Initialize the TextAnalysisAgent with required model managers and utilities.

        Args:
            text_model_manager: Required; must provide ``compute_similarity``
                and ``analyze_document``.
            summary_model_manager: Optional summarizer exposing
                ``generate_summary``; when absent, a raw excerpt is used as
                the summary.
            token_manager: Optional token-accounting helper (stored; not used
                directly in this class).
            cache_manager: Optional cache helper (stored; not used directly
                in this class).
            metrics_calculator: Optional metrics sink exposing
                ``log_tokens_saved``.
        """
        self.logger = logging.getLogger(__name__)
        self.text_model_manager = text_model_manager
        self.summary_model_manager = summary_model_manager
        self.token_manager = token_manager
        self.cache_manager = cache_manager
        self.metrics_calculator = metrics_calculator
        # Minimum similarity score for a document to be considered relevant.
        self.relevance_threshold = 0.5
        # Cutoffs used by _get_confidence_level to bucket confidence scores.
        self.confidence_high_threshold = 0.7
        self.confidence_low_threshold = 0.3
        # Name reported to the model managers for per-agent accounting/logging.
        self.agent_name = "text_analysis_agent"

    def read_text_file(self, file_path: str) -> Tuple[str, bool]:
        """
        Read a text file and return its content.
        Returns a tuple of (content, success).

        Tries UTF-8 first, then falls back to latin-1 (which accepts any
        byte sequence). On failure, the first element is an error message
        starting with "Error" and success is False.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read(), True
        except UnicodeDecodeError:
            # Not valid UTF-8; retry with latin-1 so we never fail on decoding alone.
            try:
                with open(file_path, 'r', encoding='latin-1') as file:
                    return file.read(), True
            except Exception as e:
                self.logger.error("Failed to read file with latin-1 encoding: %s", e)
                return f"Error reading file: {str(e)}", False
        except Exception as e:
            self.logger.error("Failed to read file: %s", e)
            return f"Error reading file: {str(e)}", False

    def analyze_topic_relevance(self, topic: str, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Determine which documents are relevant to the user's topic.
        Adds relevance scores and updates the documents list in-place.
        Returns the updated documents list, sorted by descending relevance.
        """
        self.logger.info("Analyzing relevance of %d documents to topic: %s",
                         len(documents), topic)
        doc_texts = [doc.get("content", "") for doc in documents]
        # Only score documents whose content is a non-empty string and not a
        # read-error placeholder from read_text_file.
        valid_texts: List[str] = []
        valid_indices: List[int] = []
        for i, text in enumerate(doc_texts):
            if isinstance(text, str) and text and not text.startswith("Error"):
                valid_texts.append(text)
                valid_indices.append(i)
        if valid_texts:
            similarities = self.text_model_manager.compute_similarity(
                topic, valid_texts, agent_name=self.agent_name)
            # similarities[k] corresponds to valid_indices[k] in `documents`.
            for sim, orig_idx in zip(similarities, valid_indices):
                documents[orig_idx]["relevance_score"] = float(sim)
                documents[orig_idx]["is_relevant"] = sim >= self.relevance_threshold
            if self.metrics_calculator:
                # Approximate tokens saved by using embeddings instead of an LLM call.
                self.metrics_calculator.log_tokens_saved(10)
        # Mark every unscored (empty/errored) document as not relevant.
        # Use a set for O(1) membership tests instead of scanning the list.
        scored = set(valid_indices)
        for i, doc in enumerate(documents):
            if i not in scored:
                doc["relevance_score"] = 0.0
                doc["is_relevant"] = False
        documents.sort(key=lambda d: d.get("relevance_score", 0), reverse=True)
        return documents

    def extract_document_content(self, document: Dict[str, Any], topic: str) -> Dict[str, Any]:
        """
        Extract key information from a document based on the topic.
        Updates the document dict in-place (adds "analysis" and "confidence")
        and returns it.
        """
        content = document.get("content", "")
        # Skip unreadable or irrelevant documents up front.
        if not content or content.startswith("Error") or not document.get("is_relevant", False):
            document["analysis"] = {"error": "Document not relevant or contains errors"}
            document["confidence"] = 0.0
            return document
        analysis = self.text_model_manager.analyze_document(
            content, query=topic, agent_name=self.agent_name)
        # Summarize only non-trivial documents (> 50 words) when a summarizer exists.
        if self.summary_model_manager and len(content.split()) > 50:
            prompt = f"Summarize the following document in relation to the topic '{topic}':\n\n{content}"
            summary_result = self.summary_model_manager.generate_summary(
                prompt,
                prefix="summarize: ",
                agent_name=self.agent_name,
                params={"max_length": 150, "min_length": 30}
            )
            analysis["summary"] = summary_result.get("summary", "Summary generation failed")
            analysis["compression_ratio"] = summary_result.get("compression_ratio", 0)
        else:
            # For very short documents, use a truncated excerpt of the content.
            if len(content) > 200:
                analysis["summary"] = content[:200] + "..."
            else:
                analysis["summary"] = content
            analysis["compression_ratio"] = 1.0
        relevance_score = document.get("relevance_score", 0)
        # Simple length-based heuristic: documents under ~1000 chars get
        # proportionally lower quality, capped at 1.0.
        content_quality = min(1.0, len(content) / 1000)
        # Confidence weights relevance (0.7) over raw content quality (0.3).
        confidence = 0.7 * relevance_score + 0.3 * content_quality
        document["analysis"] = analysis
        document["confidence"] = confidence
        return document

    def process_documents_batch(self, topic: str, documents: List[Dict[str, Any]],
                                batch_size: int = 5, max_workers: int = 3) -> List[Dict[str, Any]]:
        """
        Score all documents for relevance, then analyze the relevant ones in
        parallel batches. Documents are updated in-place; returns the list.

        Args:
            topic: Topic used for relevance scoring and extraction.
            documents: Document dicts (each with at least a "content" key).
            batch_size: Number of documents submitted per wave, bounding
                in-flight work and memory use.
            max_workers: Thread-pool size for parallel extraction.
        """
        documents = self.analyze_topic_relevance(topic, documents)
        relevant_docs = [doc for doc in documents if doc.get("is_relevant", False)]
        if not relevant_docs:
            self.logger.warning("No relevant documents found for topic: %s", topic)
            return documents
        # Reuse a single pool across batches instead of rebuilding one per batch.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for i in range(0, len(relevant_docs), batch_size):
                batch = relevant_docs[i:i + batch_size]
                futures = {executor.submit(self.extract_document_content, doc, topic): doc
                           for doc in batch}
                for future in as_completed(futures):
                    try:
                        future.result()  # Document is updated in-place
                    except Exception as e:
                        # Record the failure on the document rather than aborting the batch.
                        doc = futures[future]
                        doc["analysis"] = {"error": f"Processing error: {str(e)}"}
                        doc["confidence"] = 0.0
                        self.logger.error("Error processing document: %s", e)
        return documents

    def generate_text_analysis_report(self, topic: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate a comprehensive report of the text analysis findings.

        Returns a dict with document counts, the mean confidence over
        successfully analyzed documents (0.0 when there are none), its
        descriptive level, and per-document summaries.
        """
        # Only relevant documents that actually carry an analysis count as analyzed.
        analyzed_docs = [doc for doc in documents if doc.get("is_relevant", False) and "analysis" in doc]
        if analyzed_docs:
            avg_confidence = sum(doc.get("confidence", 0) for doc in analyzed_docs) / len(analyzed_docs)
        else:
            avg_confidence = 0.0
        report = {
            "topic": topic,
            "total_documents": len(documents),
            "relevant_documents": len([d for d in documents if d.get("is_relevant", False)]),
            "successfully_analyzed": len(analyzed_docs),
            "overall_confidence": avg_confidence,
            "confidence_level": self._get_confidence_level(avg_confidence),
            "document_analyses": []
        }
        for doc in analyzed_docs:
            report["document_analyses"].append({
                "filename": doc.get("filename", "Unknown"),
                "relevance_score": doc.get("relevance_score", 0),
                "confidence": doc.get("confidence", 0),
                "summary": doc.get("analysis", {}).get("summary", "No summary available")
            })
        return report

    def _get_confidence_level(self, confidence_score: float) -> str:
        """Convert numerical confidence to a descriptive level (high/medium/low)."""
        if confidence_score >= self.confidence_high_threshold:
            return "high"
        elif confidence_score >= self.confidence_low_threshold:
            return "medium"
        else:
            return "low"

    def process_text_files(self, topic: str, file_paths: List[str]) -> Dict[str, Any]:
        """
        Main method to process a list of text files for a given topic.

        Reads every file (recording read failures per document), scores and
        analyzes them, and returns the analysis report augmented with
        "processing_time" (seconds) and "processed_documents".
        """
        start_time = time.time()
        self.logger.info("Processing %d text files for topic: %s", len(file_paths), topic)
        documents = []
        for file_path in file_paths:
            content, success = self.read_text_file(file_path)
            documents.append({
                "filename": os.path.basename(file_path),
                "filepath": file_path,
                "content": content,
                "success": success
            })
        processed_docs = self.process_documents_batch(topic, documents)
        report = self.generate_text_analysis_report(topic, processed_docs)
        processing_time = time.time() - start_time
        report["processing_time"] = processing_time
        report["processed_documents"] = processed_docs
        self.logger.info("Completed text analysis in %.2f seconds. Found %d relevant documents.",
                         processing_time, report['relevant_documents'])
        return report