import os
import json
import asyncio
import random
import re
from urllib.parse import urlparse, urljoin

from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
import nest_asyncio
import signal

nest_asyncio.apply()

# Constants
OUTPUT_FILE_BASE = "python_code_results.json"
MAX_CONCURRENT = 5
REQUEST_DELAY = (0.5, 1.5)

# Crawler state shared by all workers
class CrawlerState:
    def __init__(self):
        self.visited = set()
        self.queue = asyncio.Queue()
        self.sem = asyncio.Semaphore(MAX_CONCURRENT)
        self.results = []
        self.active_tasks = 0
        self.lock = asyncio.Lock()
        self.interrupted = False  # Interruption flag

    def handle_interrupt(self):
        self.interrupted = True  # Set the flag
        print("\n🚨 Crawl interrupted, saving the pages collected so far...")

# Build a unique output filename
def get_unique_filename():
    counter = 1
    while True:
        filename = f"{os.path.splitext(OUTPUT_FILE_BASE)[0]}_{counter}.json"
        if not os.path.exists(filename):
            return filename
        counter += 1

# Check whether a link stays on the same site
def is_internal_link(base_url, href):
    base_parts = urlparse(base_url)
    href_parts = urlparse(href)
    if not href_parts.netloc:
        return True
    return href_parts.netloc == base_parts.netloc

# Fetch a single page
async def fetch_page(url, crawler, sem):
    async with sem:
        try:
            await asyncio.sleep(random.uniform(*REQUEST_DELAY))
            run_cfg = CrawlerRunConfig(
                cache_mode=CacheMode.BYPASS,
                page_timeout=30000  # 30-second timeout (in milliseconds)
            )
            result = await crawler.arun(url, config=run_cfg)
            return result.cleaned_html if result.success else None
        except (asyncio.TimeoutError, ConnectionError) as e:
            print(f"Request failed for {url}: {str(e)}")
            return None

# Extract internal links, optionally filtered by regex patterns
def extract_links(html, base_url, allowed_patterns=None):
    soup = BeautifulSoup(html, 'html.parser')
    links = set()
    for tag in soup.find_all(['a', 'link'], href=True):
        href = tag['href']
        full_url = urljoin(base_url, href)
        if is_internal_link(base_url, full_url):
            if allowed_patterns:
                for pattern in allowed_patterns:
                    if re.match(pattern, full_url):
                        links.add(full_url)
                        break  # Stop after the first matching pattern
            else:
                links.add(full_url)
    return links

# Heuristic check for Python code
def detect_python_code(text):
    keywords = {'def', 'class', 'import', 'from', 'try', 'except', 'with', 'async'}
    return any(kw in line for line in text.split('\n') for kw in keywords)

# Process a single URL: fetch it, collect code blocks, enqueue new links
async def process_url(url, crawler, state, allowed_patterns=None):
    async with state.sem:
        async with state.lock:
            if url in state.visited or state.interrupted:  # Check the interrupt flag
                return
            state.visited.add(url)
            state.active_tasks += 1
        try:
            print(f"Crawling: {url}")
            html = await fetch_page(url, crawler, state.sem)
            if not html:
                return
            code_blocks = extract_code_blocks(html)
            if code_blocks:
                async with state.lock:
                    state.results.append({
                        'url': url,
                        'code_blocks': code_blocks
                    })
                print(f"✅ Found {len(code_blocks)} code block(s)")
            new_links = extract_links(html, url, allowed_patterns)
            for link in new_links:
                async with state.lock:
                    if link not in state.visited and not state.interrupted:  # Check the interrupt flag
                        await state.queue.put(link)
        except Exception as e:
            print(f"Error while processing {url}: {e}")
        finally:
            async with state.lock:
                state.active_tasks -= 1

# Extract code blocks from HTML
def extract_code_blocks(html):
    soup = BeautifulSoup(html, 'html.parser')
    blocks = []
    for pre in soup.find_all('pre'):
        code = pre.get_text().strip()
        if detect_python_code(code):
            blocks.append({
                'code': code,
                'context': get_code_context(pre)
            })
    for code_tag in soup.find_all('code'):
        code = code_tag.get_text().strip()
        if code and detect_python_code(code):
            blocks.append({
                'code': code,
                'context': get_code_context(code_tag)
            })
    return blocks

# Collect context for a code block: nearest preceding header and container id
def get_code_context(element):
    context = {}
    for h in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
        header = element.find_previous(h)
        if header:
            context['header'] = header.get_text().strip()
            break
    parent = element.find_parent(['div', 'section', 'article'])
    if parent and 'id' in parent.attrs:
        context['container_id'] = parent['id']
    return context

# Worker coroutine: drain the queue until no work is left
async def worker(crawler, state, allowed_patterns=None):
    while True:
        try:
            url = await asyncio.wait_for(state.queue.get(), timeout=5)
            await process_url(url, crawler, state, allowed_patterns)  # Pass the patterns through so filtering actually applies
            state.queue.task_done()
        except (asyncio.TimeoutError, asyncio.CancelledError):
            if state.active_tasks == 0 and state.queue.empty():
                break

# Crawl a website starting from base_url
async def crawl_website(base_url, allowed_patterns=None):
    state = CrawlerState()
    signal.signal(signal.SIGINT, lambda signum, frame: state.handle_interrupt())  # Register the Ctrl+C handler
    await state.queue.put(base_url)
    # The timeout parameter was removed here; page_timeout is set per run instead
    browser_cfg = BrowserConfig(headless=True)
    try:
        async with AsyncWebCrawler(config=browser_cfg) as crawler:
            workers = [
                asyncio.create_task(worker(crawler, state, allowed_patterns))
                for _ in range(MAX_CONCURRENT)
            ]
            await state.queue.join()
            for task in workers:
                task.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
    except asyncio.CancelledError:
        print("Crawl task was cancelled")
    except Exception as e:
        print(f"Error during crawl: {e}")
    finally:
        output_file = get_unique_filename()
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(state.results, f, indent=2, ensure_ascii=False)
        print(f"\n✅ Crawl finished! Pages visited: {len(state.visited)}")
        print(f"📁 Results saved to: {os.path.abspath(output_file)}")

# Main entry point
if __name__ == "__main__":
    target_url = input("Enter the target website URL: ")
    # Handle wildcard patterns such as https://example.com/docs/*
    if "*" in target_url:
        base_url = target_url.split("*")[0]
        # Escape first, then restore the wildcard so "*" becomes the regex ".*"
        allowed_patterns = [re.escape(target_url).replace(r"\*", ".*")]
        asyncio.run(crawl_website(base_url, allowed_patterns))
    else:
        asyncio.run(crawl_website(target_url))
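
For reference, the wildcard handling in the main block turns a URL pattern into a regular expression that extract_links uses to filter internal links. A minimal sketch of that conversion, using hypothetical URLs that are not part of the original script:

import re

# Hypothetical wildcard input; everything before the "*" becomes the crawl start URL.
target_url = "https://example.com/docs/*"
base_url = target_url.split("*")[0]                    # "https://example.com/docs/"
pattern = re.escape(target_url).replace(r"\*", ".*")   # escape first, then restore the wildcard

print(re.match(pattern, "https://example.com/docs/api/index.html") is not None)  # True
print(re.match(pattern, "https://example.com/blog/post.html") is not None)       # False

Escaping before substituting the wildcard keeps the literal dots in the URL from being treated as regex metacharacters.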
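Because nest_asyncio.apply() is called at import time, the crawler can also be started from an environment that already runs an event loop, such as a Jupyter notebook. A minimal sketch, assuming the script above has already been executed and using a placeholder URL:

import asyncio

# Placeholder start URL; no wildcard, so every internal link on the site is followed.
asyncio.run(crawl_website("https://example.com/docs/"))
# Results are written to python_code_results_<n>.json in the current working directory.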