import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional

from src.api.llm_api import LLMInterface
from src.core.html_processor import HTMLProcessor

# Compiled once at import time instead of on every _clean_text call.
_WHITESPACE_RE = re.compile(r'\s+')
# Everything that is NOT a word character, whitespace, a character in
# U+4E00-U+9FFF, or one of the listed punctuation marks.
_DISALLOWED_CHARS_RE = re.compile(r'[^\w\s\u4e00-\u9fff。,!?、]')


class DocumentProcessor:
    """Fetch search-result pages, clean their text and summarise them.

    Each search result is a dict with a ``'url'`` key and an optional
    ``'title'`` key. Successfully processed documents are cached per
    instance, keyed by URL, so repeated calls do not re-fetch or
    re-summarise the same page.
    """

    # Maximum number of worker threads used by process_documents.
    MAX_WORKERS = 5
    # Maximum number of documents returned by process_documents.
    MAX_RESULTS = 5
    # Cleaned pages shorter than this many characters are discarded.
    MIN_CONTENT_LENGTH = 100

    def __init__(self, llm: LLMInterface):
        self.html_processor = HTMLProcessor()
        self.llm = llm
        # Processed documents keyed by URL; only written from the thread
        # that calls process_documents, never from the workers.
        self.cache = {}

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace runs and strip disallowed characters.

        Args:
            text: Raw text extracted from a page.

        Returns:
            The text with every whitespace run replaced by a single space,
            characters matched by ``_DISALLOWED_CHARS_RE`` removed, and
            leading/trailing whitespace stripped.
        """
        text = _WHITESPACE_RE.sub(' ', text)
        text = _DISALLOWED_CHARS_RE.sub('', text)
        return text.strip()

    def process_documents(self, search_results: List[Dict]) -> List[Dict]:
        """Process search results in parallel and return the summaries.

        Every uncached result is submitted for processing (and cached on
        success) even though at most ``MAX_RESULTS`` documents are returned.

        Args:
            search_results: Search-result dicts; entries without a usable
                ``'url'`` are logged and skipped.

        Returns:
            Up to ``MAX_RESULTS`` dicts with ``'passage'``, ``'title'`` and
            ``'url'`` keys, in the same relative order as ``search_results``.
        """
        if not search_results:
            return []

        # One slot per input entry so the output keeps the input order
        # regardless of which worker finishes first.
        ordered: List[Optional[Dict]] = [None] * len(search_results)

        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            pending = {}  # future -> index into `ordered`
            for index, result in enumerate(search_results):
                url = result.get('url')
                if not url:
                    # A malformed entry must not abort the whole batch.
                    logging.error("处理文档失败: 搜索结果缺少 url")
                    continue

                cached = self.cache.get(url)
                if cached is not None:
                    ordered[index] = cached
                    continue

                future = executor.submit(self._process_single_doc, result)
                pending[future] = index

            for future in as_completed(pending):
                try:
                    doc = future.result()
                except Exception as e:
                    logging.error(f"处理文档失败: {str(e)}")
                    continue
                if doc:
                    self.cache[doc['url']] = doc
                    ordered[pending[future]] = doc

        processed_docs = [doc for doc in ordered if doc is not None]
        return processed_docs[:self.MAX_RESULTS]

    def _process_single_doc(self, result: Dict) -> Optional[Dict]:
        """Fetch, clean and summarise a single search result.

        Args:
            result: Search-result dict with ``'url'`` and optional
                ``'title'``.

        Returns:
            A dict with ``'passage'`` (the summary), ``'title'`` and
            ``'url'``, or ``None`` when the URL is missing, the page cannot
            be fetched, the cleaned content is shorter than
            ``MIN_CONTENT_LENGTH``, no summary is produced, or any step
            raises.
        """
        # Resolve the URL up front so the error handler below can never
        # fail with a second KeyError while formatting its log message.
        url = result.get('url')
        if not url:
            return None
        title = result.get('title', '')

        try:
            html = self.html_processor.fetch_html(url)
            if not html:
                return None

            content = self.html_processor.extract_main_content(html)
            if not content:
                return None
            content = self._clean_text(content)
            if len(content) < self.MIN_CONTENT_LENGTH:
                return None

            summary = self.llm.summarize_document(
                content=content,
                title=title,
                url=url
            )
        except Exception as e:
            logging.error(f"处理文档失败 ({url}): {str(e)}")
            return None

        if not summary:
            return None
        return {
            'passage': summary,
            'title': title,
            'url': url
        }