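# HTMLProcessor: fetch pages concurrently with requests and extract main
# content and metadata with BeautifulSoup.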
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import logging

class HTMLProcessor:
    def __init__(self, timeout: int = 5):
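        """Keep a shared requests.Session so connections are reused across fetches."""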
        self.session = requests.Session()
        self.timeout = timeout
        
    def fetch_html(self, url: str) -> str:
        """获取单个URL的HTML内容"""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive'
        }
        
        try:
            logging.info(f"开始获取URL: {url}")
            response = self.session.get(
                url, 
                timeout=self.timeout,
                headers=headers,
                verify=False  # NOTE: skips SSL certificate verification; convenient for scraping, risky in production
            )
            response.raise_for_status()
            
            # check the response content type
            content_type = response.headers.get('content-type', '')
            if 'text/html' not in content_type.lower():
                logging.warning(f"Non-HTML response: {content_type}")
                
            # use the detected encoding so response.text decodes correctly
            response.encoding = response.apparent_encoding
            
            html = response.text
            logging.info(f"成功获取HTML,长度: {len(html)}")
            
            return html
            
        except requests.Timeout:
            logging.error(f"Timed out fetching URL: {url}")
        except requests.RequestException as e:
            logging.error(f"Failed to fetch URL {url}: {str(e)}")
        except Exception as e:
            logging.error(f"Unexpected error for {url}: {str(e)}")
        
        return ""
    
    def fetch_multiple_html(self, urls: List[str], max_urls: int = 10) -> List[Dict]:
        """
        并行获取多个URL的HTML内容
        
        Args:
            urls: URL列表
            max_urls: 最大获取数量
            
        Returns:
            List[Dict]: 包含成功获取的HTML内容列表
        """
        results = []
        urls = urls[:max_urls]  # only process the first max_urls URLs
        
        with ThreadPoolExecutor(max_workers=max_urls) as executor:
            # submit all fetch tasks
            future_to_url = {
                executor.submit(self.fetch_html, url): url 
                for url in urls
            }
            
            # collect results as tasks complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    html = future.result()
                    if html:  # keep only successful fetches
                        results.append({
                            'url': url,
                            'html': html,
                            'metadata': self.extract_metadata(html)
                        })
                except Exception as e:
                    logging.error(f"Failed to process URL {url}: {e}")
                    
        return results
        
    def extract_main_content(self, html: str) -> str:
        """提取HTML中的主要内容"""
        if not html:
            logging.warning("输入的HTML为空")
            return ""
        
        try:
            soup = BeautifulSoup(html, 'html.parser')
            
            # remove scripts, styles, and other non-content elements
            for tag in soup(["script", "style", "iframe", "nav", "footer", "header"]):
                tag.decompose()
                
            # record the original length for comparison later
            original_length = len(html)
            
            # try to locate a main content container
            main_content = None
            possible_content_ids = ['content', 'main', 'article', 'post']
            possible_content_classes = ['content', 'article', 'post', 'main-content']
            
            # search by id
            for content_id in possible_content_ids:
                main_content = soup.find(id=content_id)
                if main_content:
                    break
                
            # search by class
            if not main_content:
                for content_class in possible_content_classes:
                    main_content = soup.find(class_=content_class)
                    if main_content:
                        break
            
            # fall back to the full document if no container was found
            text = main_content.get_text() if main_content else soup.get_text()
            
            # clean up whitespace: strip lines, split on double spaces, drop empties
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            
            # log lengths before and after extraction
            processed_length = len(text)
            logging.info(f"HTML length before processing: {original_length}, after: {processed_length}")
            
            # a very short result from a large page usually means extraction failed
            if processed_length < 100 and original_length > 1000:
                logging.warning(f"Extracted content is unusually short: {processed_length} characters")
                return ""
            
            return text
        
        except Exception as e:
            logging.error(f"提取主要内容时出错: {str(e)}")
            return ""
        
    def extract_metadata(self, html: str) -> dict:
        """提取HTML中的元数据"""
        try:
            soup = BeautifulSoup(html, 'html.parser')
            metadata = {
                'title': '',
                'description': '',
                'keywords': ''
            }
            
            # extract the title defensively
            title = ''
            if soup.title and soup.title.string:
                title = soup.title.string.strip()
            else:
                # fall back to the first h1 tag
                h1 = soup.find('h1')
                if h1:
                    title = h1.get_text().strip()
                    
            # use a default if no title was found
            metadata['title'] = title if title else "Unknown title"
                
            # extract the meta description
            meta_desc = soup.find('meta', attrs={'name': ['description', 'Description']})
            if meta_desc:
                metadata['description'] = meta_desc.get('content', '').strip()
                
            # extract the meta keywords
            meta_keywords = soup.find('meta', attrs={'name': ['keywords', 'Keywords']})
            if meta_keywords:
                metadata['keywords'] = meta_keywords.get('content', '').strip()
                
            # ensure every field has a non-empty value
            metadata = {k: v if v else 'Unknown' for k, v in metadata.items()}
                
            return metadata
            
        except Exception as e:
            logging.error(f"提取元数据时出错: {str(e)}")
            return {
                'title': '未知标题',
                'description': '未知描述',
                'keywords': '未知关键词'
            }
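

# A minimal usage sketch (not part of the original class). The URLs below are
# placeholders; it assumes network access and INFO-level logging to stderr.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    processor = HTMLProcessor(timeout=5)

    # fetch a batch of pages in parallel, then pull readable text out of each
    pages = processor.fetch_multiple_html(
        ["https://example.com", "https://example.org"],
        max_urls=5,
    )
    for page in pages:
        text = processor.extract_main_content(page['html'])
        print(page['metadata']['title'], len(text))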