import os
import json
import asyncio
import random
import re
import signal
from urllib.parse import urlparse, urljoin

from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
import nest_asyncio

nest_asyncio.apply()

# Constants
OUTPUT_FILE_BASE = "python_code_results.json"
MAX_CONCURRENT = 5
REQUEST_DELAY = (0.5, 1.5)


# Shared crawler state
class CrawlerState:
    def __init__(self):
        self.visited = set()
        self.queue = asyncio.Queue()
        self.sem = asyncio.Semaphore(MAX_CONCURRENT)
        self.results = []
        self.active_tasks = 0
        self.lock = asyncio.Lock()
        self.interrupted = False  # Interruption flag

    def handle_interrupt(self):
        self.interrupted = True  # Set the flag
        print("\n🚨 Crawl interrupted, saving the pages collected so far...")


# Generate a unique output filename
def get_unique_filename():
    counter = 1
    while True:
        filename = f"{os.path.splitext(OUTPUT_FILE_BASE)[0]}_{counter}.json"
        if not os.path.exists(filename):
            return filename
        counter += 1  # fixed: was `counter += 1.`, which produced float suffixes like _2.0


# Check whether a link is internal to the base site
def is_internal_link(base_url, href):
    base_parts = urlparse(base_url)
    href_parts = urlparse(href)
    if not href_parts.netloc:
        return True
    return href_parts.netloc == base_parts.netloc


# Fetch a page
async def fetch_page(url, crawler):
    # Concurrency is already limited by the semaphore held in process_url.
    # Re-acquiring it here (as the original code did) could deadlock once
    # all permits are taken, so this function no longer takes the semaphore.
    try:
        await asyncio.sleep(random.uniform(*REQUEST_DELAY))
        run_cfg = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            page_timeout=30000  # 30-second timeout (in milliseconds)
        )
        result = await crawler.arun(url, config=run_cfg)
        return result.cleaned_html if result.success else None
    except (asyncio.TimeoutError, ConnectionError) as e:
        print(f"Request error for {url}: {str(e)}")
        return None


# Extract internal links, optionally filtered by regex patterns
def extract_links(html, base_url, allowed_patterns=None):
    soup = BeautifulSoup(html, 'html.parser')
    links = set()
    for tag in soup.find_all(['a', 'link'], href=True):
        href = tag['href']
        full_url = urljoin(base_url, href)
        if is_internal_link(base_url, full_url):
            if allowed_patterns:
                for pattern in allowed_patterns:
                    if re.match(pattern, full_url):
                        links.add(full_url)
                        break  # Add only if it matches any pattern
            else:
                links.add(full_url)
    return links


# Detect Python code (rough heuristic: match common Python keywords as whole
# words, so prose containing e.g. "important" is not counted as "import")
def detect_python_code(text):
    keywords = {'def', 'class', 'import', 'from', 'try', 'except', 'with', 'async'}
    pattern = re.compile(r'\b(?:' + '|'.join(keywords) + r')\b')
    return any(pattern.search(line) for line in text.split('\n'))


# Process a single URL
async def process_url(url, crawler, state, allowed_patterns=None):
    async with state.sem:
        async with state.lock:
            if url in state.visited or state.interrupted:  # Check interrupt flag
                return
            state.visited.add(url)
            state.active_tasks += 1
        try:
            print(f"Crawling: {url}")
            html = await fetch_page(url, crawler)
            if not html:
                return
            code_blocks = extract_code_blocks(html)
            if code_blocks:
                async with state.lock:
                    state.results.append({
                        'url': url,
                        'code_blocks': code_blocks
                    })
                    print(f"✅ Found {len(code_blocks)} code block(s)")
            new_links = extract_links(html, url, allowed_patterns)
            for link in new_links:
                async with state.lock:
                    if link not in state.visited and not state.interrupted:  # Check interrupt flag
                        await state.queue.put(link)
        except Exception as e:
            print(f"Error while processing {url}: {e}")
        finally:
            async with state.lock:
                state.active_tasks -= 1


# Extract code blocks from a page
def extract_code_blocks(html):
    soup = BeautifulSoup(html, 'html.parser')
    blocks = []
    for pre in soup.find_all('pre'):
        code = pre.get_text().strip()
        if detect_python_code(code):
            blocks.append({
                'code': code,
                'context': get_code_context(pre)
            })
    for code_tag in soup.find_all('code'):
        code = code_tag.get_text().strip()
        if code and detect_python_code(code):
            blocks.append({
                'code': code,
                'context': get_code_context(code_tag)
            })
    return blocks


# Get the surrounding context of a code block
def get_code_context(element):
    context = {}
    for h in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
        header = element.find_previous(h)
        if header:
            context['header'] = header.get_text().strip()
            break
    parent = element.find_parent(['div', 'section', 'article'])
    if parent and 'id' in parent.attrs:
        context['container_id'] = parent['id']
    return context


# Worker coroutine
async def worker(crawler, state, allowed_patterns=None):
    while True:
        try:
            url = await asyncio.wait_for(state.queue.get(), timeout=5)
            await process_url(url, crawler, state, allowed_patterns)
            state.queue.task_done()
        except asyncio.CancelledError:
            break
        except asyncio.TimeoutError:
            if state.active_tasks == 0 and state.queue.empty():
                break


# Crawl a website starting from base_url
async def crawl_website(base_url, allowed_patterns=None):
    state = CrawlerState()
    # Register a Ctrl+C signal handler
    signal.signal(signal.SIGINT, lambda signum, frame: state.handle_interrupt())
    await state.queue.put(base_url)
    # BrowserConfig takes no timeout here; the per-page timeout is set in CrawlerRunConfig
    browser_cfg = BrowserConfig(headless=True)
    try:
        async with AsyncWebCrawler(config=browser_cfg) as crawler:
            workers = [
                asyncio.create_task(worker(crawler, state, allowed_patterns))
                for _ in range(MAX_CONCURRENT)
            ]
            await state.queue.join()
            for task in workers:
                task.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
    except asyncio.CancelledError:
        print("Crawl task was cancelled")
    except Exception as e:
        print(f"Error during crawl: {e}")
    finally:
        output_file = get_unique_filename()
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(state.results, f, indent=2, ensure_ascii=False)
        print(f"\n✅ Crawl finished! Pages visited: {len(state.visited)}")
        print(f"📁 Results saved to: {os.path.abspath(output_file)}")


# Main entry point
if __name__ == "__main__":
    target_url = input("Enter the target website URL: ")
    # Handle wildcard input such as https://example.com/docs/*
    if "*" in target_url:
        base_url = target_url.split("*")[0]
        # Escape the literal parts first, then turn the escaped '*' back into a
        # regex wildcard (escaping after the replacement would neutralise '.*')
        allowed_patterns = [re.escape(target_url).replace(r'\*', '.*')]
        asyncio.run(crawl_website(base_url, allowed_patterns))
    else:
        asyncio.run(crawl_website(target_url))
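
# Usage sketch (the URL below is a hypothetical example):
#
#   $ python crawler.py
#   Enter the target website URL: https://docs.example.com/tutorial/*
#
# Everything before the "*" is used as the crawl entry point, and only internal
# links matching the wildcard pattern are queued. Pages containing Python code
# are written to python_code_results_1.json (or _2, _3, ... if files from
# earlier runs already exist).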