crawler-ui / prompts.txt
import os
import json
import asyncio
import random
import re
import signal
from urllib.parse import urlparse, urljoin

from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
import nest_asyncio

nest_asyncio.apply()

# Constants
OUTPUT_FILE_BASE = "python_code_results.json"
MAX_CONCURRENT = 5
REQUEST_DELAY = (0.5, 1.5)  # random per-request delay range, in seconds


# Shared crawler state
class CrawlerState:
    def __init__(self):
        self.visited = set()
        self.queue = asyncio.Queue()
        self.sem = asyncio.Semaphore(MAX_CONCURRENT)
        self.results = []
        self.active_tasks = 0
        self.lock = asyncio.Lock()
        self.interrupted = False  # interruption flag

    def handle_interrupt(self):
        self.interrupted = True
        print("\n🚨 Crawl interrupted, saving the content collected so far...")


# Build a unique output filename
def get_unique_filename():
    counter = 1
    while True:
        filename = f"{os.path.splitext(OUTPUT_FILE_BASE)[0]}_{counter}.json"
        if not os.path.exists(filename):
            return filename
        counter += 1


# Check whether a link belongs to the same site
def is_internal_link(base_url, href):
    base_parts = urlparse(base_url)
    href_parts = urlparse(href)
    if not href_parts.netloc:
        return True
    return href_parts.netloc == base_parts.netloc


# Fetch a single page
async def fetch_page(url, crawler, sem):
    async with sem:
        try:
            await asyncio.sleep(random.uniform(*REQUEST_DELAY))
            run_cfg = CrawlerRunConfig(
                cache_mode=CacheMode.BYPASS,
                page_timeout=30000  # 30-second timeout (in milliseconds)
            )
            result = await crawler.arun(url, config=run_cfg)
            return result.cleaned_html if result.success else None
        except (asyncio.TimeoutError, ConnectionError) as e:
            print(f"Request error {url}: {str(e)}")
            return None


# Extract internal links, optionally filtered by regex patterns
def extract_links(html, base_url, allowed_patterns=None):
    soup = BeautifulSoup(html, 'html.parser')
    links = set()
    for tag in soup.find_all(['a', 'link'], href=True):
        href = tag['href']
        full_url = urljoin(base_url, href)
        if is_internal_link(base_url, full_url):
            if allowed_patterns:
                for pattern in allowed_patterns:
                    if re.match(pattern, full_url):
                        links.add(full_url)
                        break  # only add the link if it matches some pattern
            else:
                links.add(full_url)
    return links


# Heuristically detect Python code by looking for common keywords
def detect_python_code(text):
    keywords = {'def', 'class', 'import', 'from', 'try', 'except', 'with', 'async'}
    return any(kw in line for line in text.split('\n') for kw in keywords)


# Process a single URL: fetch it, extract code blocks, enqueue new links
async def process_url(url, crawler, state, allowed_patterns=None):
    # Note: fetch_page acquires state.sem, so it is not re-acquired here
    # (asyncio.Semaphore is not reentrant and double-acquiring can deadlock).
    async with state.lock:
        if url in state.visited or state.interrupted:  # check interrupt flag
            return
        state.visited.add(url)
        state.active_tasks += 1
    try:
        print(f"Crawling: {url}")
        html = await fetch_page(url, crawler, state.sem)
        if not html:
            return
        code_blocks = extract_code_blocks(html)
        if code_blocks:
            async with state.lock:
                state.results.append({
                    'url': url,
                    'code_blocks': code_blocks
                })
            print(f"✅ Found {len(code_blocks)} code block(s)")
        new_links = extract_links(html, url, allowed_patterns)
        for link in new_links:
            async with state.lock:
                if link not in state.visited and not state.interrupted:  # check interrupt flag
                    await state.queue.put(link)
    except Exception as e:
        print(f"Error while processing {url}: {e}")
    finally:
        async with state.lock:
            state.active_tasks -= 1


# Extract candidate Python code blocks from a page
def extract_code_blocks(html):
    soup = BeautifulSoup(html, 'html.parser')
    blocks = []
    for pre in soup.find_all('pre'):
        code = pre.get_text().strip()
        if detect_python_code(code):
            blocks.append({
                'code': code,
                'context': get_code_context(pre)
            })
    for code_tag in soup.find_all('code'):
        if code_tag.find_parent('pre'):
            continue  # already captured via its enclosing <pre>
        code = code_tag.get_text().strip()
        if code and detect_python_code(code):
            blocks.append({
                'code': code,
                'context': get_code_context(code_tag)
            })
    return blocks
# Collect context (nearest heading, container id) for a code block
def get_code_context(element):
    context = {}
    for h in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
        header = element.find_previous(h)
        if header:
            context['header'] = header.get_text().strip()
            break
    parent = element.find_parent(['div', 'section', 'article'])
    if parent and 'id' in parent.attrs:
        context['container_id'] = parent['id']
    return context


# Worker task: pull URLs from the queue until the crawl is finished
async def worker(crawler, state, allowed_patterns=None):
    while True:
        try:
            url = await asyncio.wait_for(state.queue.get(), timeout=5)
        except asyncio.TimeoutError:
            # No new work for a while; stop once nothing is queued or in flight.
            if state.active_tasks == 0 and state.queue.empty():
                break
            continue
        except asyncio.CancelledError:
            break
        try:
            await process_url(url, crawler, state, allowed_patterns)
        finally:
            state.queue.task_done()


# Crawl a website starting from base_url
async def crawl_website(base_url, allowed_patterns=None):
    state = CrawlerState()
    # Register a Ctrl+C handler that only sets the interruption flag
    signal.signal(signal.SIGINT, lambda signum, frame: state.handle_interrupt())
    await state.queue.put(base_url)
    browser_cfg = BrowserConfig(headless=True)
    try:
        async with AsyncWebCrawler(config=browser_cfg) as crawler:
            workers = [
                asyncio.create_task(worker(crawler, state, allowed_patterns))
                for _ in range(MAX_CONCURRENT)
            ]
            await state.queue.join()
            for task in workers:
                task.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
    except asyncio.CancelledError:
        print("Crawl task cancelled")
    except Exception as e:
        print(f"Exception during crawl: {e}")
    finally:
        output_file = get_unique_filename()
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(state.results, f, indent=2, ensure_ascii=False)
        print(f"\n✅ Crawl finished! Pages visited: {len(state.visited)}")
        print(f"📁 Results saved to: {os.path.abspath(output_file)}")


# Entry point
if __name__ == "__main__":
    target_url = input("Enter the target website URL: ")
    # Handle wildcard input such as https://example.com/docs/*
    if "*" in target_url:
        base_url = target_url.split("*")[0]
        # Escape the URL first, then turn the escaped wildcard into a regex wildcard
        allowed_patterns = [re.escape(target_url).replace(r"\*", ".*")]
        asyncio.run(crawl_website(base_url, allowed_patterns))
    else:
        asyncio.run(crawl_website(target_url))
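
# For reference, a minimal standalone sketch of how the wildcard input is expected
# to behave. The target URL and the two test links below are made-up examples; the
# snippet only uses the standard library and mirrors the conversion done in the
# __main__ block above.

import re

# Hypothetical wildcard input, as it would be typed at the prompt.
target_url = "https://docs.example.com/tutorial/*"

# Seed URL for the crawl: everything before the wildcard.
base_url = target_url.split("*")[0]  # "https://docs.example.com/tutorial/"

# Escape the URL, then turn the escaped wildcard into a regex wildcard.
pattern = re.escape(target_url).replace(r"\*", ".*")

# Links under the wildcard prefix would be kept by extract_links...
print(bool(re.match(pattern, "https://docs.example.com/tutorial/classes.html")))  # True
# ...while other internal links would be filtered out.
print(bool(re.match(pattern, "https://docs.example.com/reference/index.html")))   # False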