Spaces:
Running
Running
import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional

from src.api.llm_api import LLMInterface
from src.core.html_processor import HTMLProcessor
class DocumentProcessor:
    """Fetch, clean and summarize the web pages behind a list of search results.

    Each search result is a dict with a ``'url'`` key and an optional
    ``'title'`` key. Successfully processed documents are cached per URL on
    the instance, so a URL seen again is not fetched or summarized twice.
    """

    # Upper bound on concurrently processed documents.
    MAX_WORKERS = 5
    # Maximum number of documents returned by ``process_documents``.
    MAX_RESULTS = 5
    # Cleaned content shorter than this many characters is discarded.
    MIN_CONTENT_LENGTH = 100

    # Anything that is not a word character, whitespace, a CJK ideograph or
    # one of the listed punctuation marks is removed.
    _SPECIAL_CHARS_RE = re.compile(r'[^\w\s\u4e00-\u9fff。,!?、]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, llm: "LLMInterface"):
        """Store the LLM client and create the HTML processor and URL cache.

        Args:
            llm: Object providing ``summarize_document(content=, title=, url=)``.
        """
        self.html_processor = HTMLProcessor()
        self.llm = llm
        # URL -> processed document, filled by ``process_documents``.
        self.cache: Dict[str, Dict] = {}

    def _clean_text(self, text: str) -> str:
        """Return *text* with special characters removed and whitespace collapsed.

        Special characters are stripped first and whitespace is collapsed
        afterwards; doing it the other way round leaves double spaces
        wherever a removed symbol sat between two spaces.
        """
        text = self._SPECIAL_CHARS_RE.sub('', text)
        text = self._WHITESPACE_RE.sub(' ', text)
        return text.strip()

    def process_documents(self, search_results: List[Dict]) -> List[Dict]:
        """Process search results in parallel and return the summarized documents.

        Args:
            search_results: Dicts with a ``'url'`` key and optional ``'title'``.

        Returns:
            At most ``MAX_RESULTS`` documents, each a dict with ``'passage'``,
            ``'title'`` and ``'url'`` keys, in the same relative order as
            *search_results*. Results that have no URL or that fail to
            process are left out.
        """
        if not search_results:
            return []

        # One slot per input position so the output follows the input order
        # regardless of which worker finishes first.
        slots: List[Optional[Dict]] = [None] * len(search_results)

        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            pending = {}
            for index, result in enumerate(search_results):
                url = result.get('url')
                if not url:
                    # A malformed entry must not abort the whole batch.
                    logging.warning("跳过缺少URL的搜索结果")
                    continue

                cached = self.cache.get(url)
                if cached is not None:
                    slots[index] = cached
                    continue

                future = executor.submit(self._process_single_doc, result)
                pending[future] = index

            # The cache is only written here, in the calling thread.
            for future in as_completed(pending):
                try:
                    doc = future.result()
                except Exception as e:
                    logging.error(f"处理文档失败: {str(e)}")
                    continue
                if doc:
                    self.cache[doc['url']] = doc
                    slots[pending[future]] = doc

        processed_docs = [doc for doc in slots if doc is not None]
        return processed_docs[:self.MAX_RESULTS]  # limit the number returned

    def _process_single_doc(self, result: Dict) -> Optional[Dict]:
        """Fetch, clean and summarize a single search result.

        Args:
            result: Dict with a ``'url'`` key and optional ``'title'``.

        Returns:
            A dict with ``'passage'``, ``'title'`` and ``'url'``, or ``None``
            when the URL is missing, the page cannot be fetched, the cleaned
            content is shorter than ``MIN_CONTENT_LENGTH``, no summary is
            produced, or any step raises.
        """
        url = result.get('url')
        if not url:
            return None
        title = result.get('title', '')

        try:
            # Fetch the raw HTML.
            html = self.html_processor.fetch_html(url)
            if not html:
                return None

            # Extract the main content and normalize it.
            content = self.html_processor.extract_main_content(html)
            content = self._clean_text(content or '')
            if len(content) < self.MIN_CONTENT_LENGTH:  # content too short
                return None

            # Generate a summary for this specific document.
            summary = self.llm.summarize_document(
                content=content,
                title=title,
                url=url,
            )
            if summary:
                return {
                    'passage': summary,
                    'title': title,
                    'url': url,
                }
        except Exception as e:
            logging.error(f"处理文档失败 ({url}): {str(e)}")
        return None