import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import time
import logging


class HTMLProcessor:
    def __init__(self, timeout: int = 5):
        self.session = requests.Session()
        self.timeout = timeout
    def fetch_html(self, url: str) -> str:
        """Fetch the HTML content of a single URL."""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive'
        }
        try:
            logging.info(f"Fetching URL: {url}")
            response = self.session.get(
                url,
                timeout=self.timeout,
                headers=headers,
                verify=False  # certificate verification is intentionally disabled
            )
            response.raise_for_status()
            # Check the response content type
            content_type = response.headers.get('content-type', '')
            if 'text/html' not in content_type.lower():
                logging.warning(f"Non-HTML response: {content_type}")
            # Use the detected encoding
            response.encoding = response.apparent_encoding
            html = response.text
            logging.info(f"Fetched HTML, length: {len(html)}")
            return html
        except requests.Timeout:
            logging.error(f"Timed out fetching URL: {url}")
        except requests.RequestException as e:
            logging.error(f"Failed to fetch URL {url}: {str(e)}")
        except Exception as e:
            logging.error(f"Unexpected error for {url}: {str(e)}")
        return ""
    def fetch_multiple_html(self, urls: List[str], max_urls: int = 10) -> List[Dict]:
        """
        Fetch the HTML content of multiple URLs in parallel.

        Args:
            urls: list of URLs
            max_urls: maximum number of URLs to fetch

        Returns:
            List[Dict]: list of successfully fetched HTML results
        """
        results = []
        urls = urls[:max_urls]  # only process the first max_urls URLs
        with ThreadPoolExecutor(max_workers=max_urls) as executor:
            # Submit all tasks
            future_to_url = {
                executor.submit(self.fetch_html, url): url
                for url in urls
            }
            # Handle tasks as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    html = future.result()
                    if html:  # keep only successful fetches
                        results.append({
                            'url': url,
                            'html': html,
                            'metadata': self.extract_metadata(html)
                        })
                except Exception as e:
                    logging.error(f"Failed to process URL {url}: {e}")
        return results
    def extract_main_content(self, html: str) -> str:
        """Extract the main content from HTML."""
        if not html:
            logging.warning("Input HTML is empty")
            return ""
        try:
            soup = BeautifulSoup(html, 'html.parser')
            # Remove scripts, styles and other non-content elements
            for script in soup(["script", "style", "iframe", "nav", "footer", "header"]):
                script.decompose()
            # Record the original length
            original_length = len(html)
            # Try to locate the main content container
            main_content = None
            possible_content_ids = ['content', 'main', 'article', 'post']
            possible_content_classes = ['content', 'article', 'post', 'main-content']
            # Look up by id
            for content_id in possible_content_ids:
                main_content = soup.find(id=content_id)
                if main_content:
                    break
            # Look up by class
            if not main_content:
                for content_class in possible_content_classes:
                    main_content = soup.find(class_=content_class)
                    if main_content:
                        break
            # Fall back to the whole document if no container was found
            text = main_content.get_text() if main_content else soup.get_text()
            # Clean up the text (split on double spaces so words are not broken apart)
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            # Record the processed length
            processed_length = len(text)
            # Log the before/after lengths
            logging.info(f"HTML length before: {original_length}, after: {processed_length}")
            # A very short result likely means the extraction failed
            if processed_length < 100 and original_length > 1000:
                logging.warning(f"Extracted content is unusually short: {processed_length} characters")
                return ""
            return text
        except Exception as e:
            logging.error(f"Error extracting main content: {str(e)}")
            return ""
    def extract_metadata(self, html: str) -> dict:
        """Extract metadata from HTML."""
        try:
            soup = BeautifulSoup(html, 'html.parser')
            metadata = {
                'title': '',
                'description': '',
                'keywords': ''
            }
            # Safer title extraction
            title = ''
            if soup.title and soup.title.string:
                title = soup.title.string.strip()
            else:
                # Fall back to the first h1 tag
                h1 = soup.find('h1')
                if h1:
                    title = h1.get_text().strip()
            # Use a default if there is still no title
            metadata['title'] = title if title else "Unknown title"
            # Extract the meta description
            meta_desc = soup.find('meta', attrs={'name': ['description', 'Description']})
            if meta_desc:
                metadata['description'] = meta_desc.get('content', '').strip()
            # Extract the meta keywords
            meta_keywords = soup.find('meta', attrs={'name': ['keywords', 'Keywords']})
            if meta_keywords:
                metadata['keywords'] = meta_keywords.get('content', '').strip()
            # Make sure every field has a value
            metadata = {k: v if v else 'Unknown' for k, v in metadata.items()}
            return metadata
        except Exception as e:
            logging.error(f"Error extracting metadata: {str(e)}")
            return {
                'title': 'Unknown title',
                'description': 'Unknown description',
                'keywords': 'Unknown keywords'
            }
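

# A minimal usage sketch, not part of the original module: the URLs below are
# placeholders and the logging setup is an assumption. It fetches a few pages in
# parallel, then prints each page's metadata title and a preview of its main content.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    processor = HTMLProcessor(timeout=5)
    example_urls = [
        "https://example.com",  # placeholder URL
        "https://example.org",  # placeholder URL
    ]

    pages = processor.fetch_multiple_html(example_urls, max_urls=5)
    for page in pages:
        content = processor.extract_main_content(page['html'])
        print(page['url'])
        print(page['metadata']['title'])
        print(content[:200])  # first 200 characters of the extracted text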