# Toursim-Test / src/core/html_processor.py
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import time
import logging
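
# Optional: fetch_html below passes verify=False to requests, which makes urllib3 emit an
# InsecureRequestWarning on every request. Silencing it globally is an assumption (it
# presumes the warning noise is unwanted), not required for the module to work.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
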
class HTMLProcessor:
    """Fetches web pages and extracts their main textual content and metadata."""

    def __init__(self, timeout: int = 5):
        self.session = requests.Session()
        self.timeout = timeout
    def fetch_html(self, url: str) -> str:
        """Fetch the HTML content of a single URL."""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
try:
logging.info(f"开始获取URL: {url}")
response = self.session.get(
url,
timeout=self.timeout,
headers=headers,
                verify=False  # SSL certificate verification is deliberately skipped
)
response.raise_for_status()
            # Check the response content type
            content_type = response.headers.get('content-type', '')
            if 'text/html' not in content_type.lower():
                logging.warning(f"Non-HTML response: {content_type}")
            # Use the detected encoding before reading the body
            response.encoding = response.apparent_encoding
            html = response.text
            logging.info(f"Fetched HTML successfully, length: {len(html)}")
return html
        except requests.Timeout:
            logging.error(f"Timed out fetching URL: {url}")
        except requests.RequestException as e:
            logging.error(f"Failed to fetch URL {url}: {str(e)}")
        except Exception as e:
            logging.error(f"Unexpected error for {url}: {str(e)}")
return ""
def fetch_multiple_html(self, urls: List[str], max_urls: int = 10) -> List[Dict]:
"""
并行获取多个URL的HTML内容
Args:
urls: URL列表
max_urls: 最大获取数量
Returns:
List[Dict]: 包含成功获取的HTML内容列表
"""
results = []
        urls = urls[:max_urls]  # only process the first max_urls URLs
with ThreadPoolExecutor(max_workers=max_urls) as executor:
            # Submit one fetch task per URL
future_to_url = {
executor.submit(self.fetch_html, url): url
for url in urls
}
            # Collect results as the tasks complete
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
html = future.result()
                    if html:  # only keep results that were fetched successfully
results.append({
'url': url,
'html': html,
'metadata': self.extract_metadata(html)
})
                except Exception as e:
                    logging.error(f"Failed to process URL {url}: {e}")
return results
    def extract_main_content(self, html: str) -> str:
        """Extract the main textual content from an HTML document."""
if not html:
logging.warning("输入的HTML为空")
return ""
try:
soup = BeautifulSoup(html, 'html.parser')
            # Remove scripts, styles and other non-content elements
            for script in soup(["script", "style", "iframe", "nav", "footer", "header"]):
                script.decompose()
            # Record the original length
            original_length = len(html)
            # Try to locate the main content container
            main_content = None
            possible_content_ids = ['content', 'main', 'article', 'post']
            possible_content_classes = ['content', 'article', 'post', 'main-content']
            # Look up by id
            for content_id in possible_content_ids:
                main_content = soup.find(id=content_id)
                if main_content:
                    break
            # Look up by class
            if not main_content:
                for content_class in possible_content_classes:
                    main_content = soup.find(class_=content_class)
                    if main_content:
                        break
            # Fall back to the whole document if no specific container is found
            text = main_content.get_text() if main_content else soup.get_text()
            # Clean up the text: strip each line, split on double spaces, drop empty chunks
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            # Record the processed length and log the before/after sizes
            processed_length = len(text)
            logging.info(f"HTML length before processing: {original_length}, after: {processed_length}")
            # If the processed text is very short, extraction probably failed
            if processed_length < 100 and original_length > 1000:
                logging.warning(f"Extracted content is unusually short: {processed_length} characters")
return ""
return text
        except Exception as e:
            logging.error(f"Error while extracting main content: {str(e)}")
return ""
    def extract_metadata(self, html: str) -> dict:
        """Extract metadata (title, description, keywords) from an HTML document."""
try:
soup = BeautifulSoup(html, 'html.parser')
metadata = {
'title': '',
'description': '',
'keywords': ''
}
            # Safer title extraction
            title = ''
            if soup.title and soup.title.string:
                title = soup.title.string.strip()
            else:
                # Fall back to the first h1 tag for the title
                h1 = soup.find('h1')
                if h1:
                    title = h1.get_text().strip()
            # If there is still no title, use the default "未知标题" ("unknown title")
            metadata['title'] = title if title else "未知标题"
            # Extract the meta description
meta_desc = soup.find('meta', attrs={'name': ['description', 'Description']})
if meta_desc:
metadata['description'] = meta_desc.get('content', '').strip()
            # Extract the meta keywords
meta_keywords = soup.find('meta', attrs={'name': ['keywords', 'Keywords']})
if meta_keywords:
metadata['keywords'] = meta_keywords.get('content', '').strip()
            # Fill any empty field with the default "未知" ("unknown")
            metadata = {k: v if v else '未知' for k, v in metadata.items()}
return metadata
        except Exception as e:
            logging.error(f"Error while extracting metadata: {str(e)}")
            # Return the Chinese "unknown" placeholders on failure
return {
'title': '未知标题',
'description': '未知描述',
'keywords': '未知关键词'
}
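

# Minimal usage sketch. The URL below is a placeholder and the logging setup is an
# assumption; adjust both to however the surrounding application wires this class in.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    processor = HTMLProcessor(timeout=5)
    pages = processor.fetch_multiple_html(["https://example.com"], max_urls=3)
    for page in pages:
        print(page['url'], '->', page['metadata']['title'])
        print(processor.extract_main_content(page['html'])[:200])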