"""Gradio front end for downloading videos from several platforms via yt-dlp."""

import json
import os
import re
import shutil
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import parse_qs, urlparse

import gradio as gr
import yt_dlp

# Platform display name -> regex fragment recognising that platform's hosts.
SUPPORTED_PLATFORMS = {
    "抖音": r'(https?://)?(v\.douyin\.com|www\.douyin\.com)',
    "快手": r'(https?://)?(v\.kuaishou\.com|www\.kuaishou\.com)',
    "哔哩哔哩": r'(https?://)?(www\.bilibili\.com|b23\.tv)',
    "YouTube": r'(https?://)?(www\.youtube\.com|youtu\.be)',
    "小红书": r'(https?://)?(www\.xiaohongshu\.com|xhslink\.com)',
    "微博": r'(https?://)?(weibo\.com|t\.cn)',
    "西瓜视频": r'(https?://)?(www\.ixigua\.com)',
    "腾讯视频": r'(https?://)?(v\.qq\.com)'
}

# The raw patterns are unanchored, so a bare search would accept host
# fragments (e.g. "pinterest.cn" contains "t.cn").  Wrapping each pattern in
# host-boundary lookarounds requires the match to be a complete host name.
_PLATFORM_MATCHERS = {
    name: re.compile(rf'(?<![\w.-])(?:{pattern})(?![\w.-])', re.IGNORECASE)
    for name, pattern in SUPPORTED_PLATFORMS.items()
}

# Seconds to wait between yt-dlp retry attempts.
RETRY_SLEEP_SECONDS = 2

# Upper bound on retries for the actual download; a finite value keeps a dead
# link from blocking the UI request forever.
DOWNLOAD_RETRIES = 10

# yt-dlp options shared by every platform (short timeouts, few retries so
# that metadata parsing fails fast).
_BASE_YDL_OPTIONS = {
    'format': 'best',
    'merge_output_format': 'mp4',
    'socket_timeout': 10,
    'retries': 2,
    'fragment_retries': 2,
    'concurrent_fragment_downloads': 8,
}

# Per-platform additions on top of _BASE_YDL_OPTIONS.
_PLATFORM_YDL_OVERRIDES = {
    "哔哩哔哩": {
        'format': 'bestvideo+bestaudio/best',
        'concurrent_fragment_downloads': 16,
        'file_access_retries': 2,
        'extractor_retries': 2,
    },
    "YouTube": {
        'format': 'bestvideo+bestaudio/best',
    },
}

# Short platform tags appended to generated file names.
_PLATFORM_SUFFIX = {
    "抖音": "抖音",
    "快手": "快手",
    "哔哩哔哩": "B站",
    "YouTube": "YT",
    "小红书": "XHS",
    "微博": "微博",
    "西瓜视频": "西瓜",
    "腾讯视频": "腾讯"
}


def _fixed_retry_sleep(_attempt):
    """Return the constant delay used between yt-dlp retry attempts."""
    return RETRY_SLEEP_SECONDS


def get_platform_from_url(url):
    """Return the name of the supported platform that *url* belongs to.

    Returns None for empty input or when no platform's host pattern matches.
    """
    if not url:
        return None

    for platform, matcher in _PLATFORM_MATCHERS.items():
        if matcher.search(url):
            return platform
    return None


def get_platform_config(url, format_id=None):
    """Build the yt-dlp options dict for the platform that *url* belongs to.

    Args:
        url: Video page URL.
        format_id: Optional yt-dlp format selector; when falsy the platform's
            default selector is used.

    Returns:
        A fresh options dict, or None when the platform is not supported.
    """
    platform = get_platform_from_url(url)
    if not platform:
        return None

    config = {**_BASE_YDL_OPTIONS, **_PLATFORM_YDL_OVERRIDES.get(platform, {})}
    # The YoutubeDL API takes per-category sleep callables; a plain
    # 'retry_sleep' key is not a recognised option.  A new dict is built per
    # call so callers may mutate the returned config freely.
    config['retry_sleep_functions'] = {
        'http': _fixed_retry_sleep,
        'fragment': _fixed_retry_sleep,
    }
    if format_id:
        config['format'] = format_id
    return config


def validate_url(url):
    """Check that *url* is non-empty and belongs to a supported platform.

    Returns:
        A ``(is_valid, message)`` tuple; the message is user-facing.
    """
    if not url:
        return False, "请输入视频链接"

    platform = get_platform_from_url(url)
    if not platform:
        return False, "不支持的平台或链接格式不正确"

    return True, f"识别为{platform}平台"


def format_filesize(bytes):
    """Format a byte count as a human-readable size string.

    The parameter keeps its original name (which shadows the builtin) so
    that keyword callers continue to work.  Falsy input (None or 0) yields
    the "unknown size" label.
    """
    if not bytes:
        return "未知大小"

    size = bytes
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"


def _collect_video_formats(info):
    """Return one entry per distinct resolution, best quality first.

    Audio-only formats and formats without codec information are skipped.
    Each entry carries the raw ``format_id`` plus a ``download_format``
    selector that is safe to hand to yt-dlp.
    """
    seen_resolutions = set()
    video_formats = []

    # yt-dlp lists formats from worst to best; walking the list backwards
    # makes the de-duplication keep the best variant of each resolution.
    for fmt in reversed(info.get('formats') or []):
        vcodec = fmt.get('vcodec')
        if not vcodec or vcodec == 'none':
            continue

        width = fmt.get('width') or 0
        height = fmt.get('height') or 0
        if width and height:
            resolution = f"{width}x{height}"
        else:
            resolution = fmt.get('resolution') or 'unknown'

        format_note = fmt.get('format_note') or ''
        if not format_note and height:
            format_note = f"{height}p"

        resolution_key = (height, width) if height and width else resolution
        if resolution_key in seen_resolutions:
            continue
        seen_resolutions.add(resolution_key)

        format_id = fmt.get('format_id', '')
        if format_id and fmt.get('acodec') == 'none':
            # A video-only stream would download without sound; ask yt-dlp
            # to merge the best audio and fall back to the bare stream.
            download_format = f"{format_id}+bestaudio/{format_id}"
        else:
            download_format = format_id

        video_formats.append({
            'format_id': format_id,
            'download_format': download_format,
            'ext': fmt.get('ext', ''),
            'resolution': resolution,
            'format_note': format_note,
            'quality': height or 0,  # sort key only
        })

    video_formats.sort(key=lambda entry: entry['quality'], reverse=True)
    return video_formats


def parse_video_info(url):
    """Fetch metadata for *url* without downloading the media.

    Returns:
        A dict whose ``status`` is ``"success"`` or ``"error"``.  On success
        it also carries platform, title, duration, formats, thumbnail,
        description and webpage_url; on error a user-facing ``message``.
    """
    try:
        is_valid, message = validate_url(url)
        if not is_valid:
            return {"status": "error", "message": message}

        ydl_opts = get_platform_config(url)
        if not ydl_opts:
            return {"status": "error", "message": "不支持的平台"}

        ydl_opts.update({
            'quiet': True,
            'no_warnings': True,
        })

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info:
            return {"status": "error", "message": "无法获取视频信息"}

        # Prefer the top-level thumbnail, else the last listed one.
        thumbnail = info.get('thumbnail') or ''
        if not thumbnail:
            thumbnails = info.get('thumbnails') or []
            if thumbnails:
                thumbnail = thumbnails[-1].get('url', '')

        return {
            "status": "success",
            "message": "解析成功",
            "platform": get_platform_from_url(url),
            "title": info.get('title', '未知标题'),
            "duration": info.get('duration', 0),
            "formats": _collect_video_formats(info),
            "thumbnail": thumbnail,
            "description": info.get('description', ''),
            "webpage_url": info.get('webpage_url', url),
        }

    except Exception as e:
        # UI boundary: surface any extractor failure as a message instead of
        # crashing the Gradio callback.
        return {"status": "error", "message": f"解析失败: {str(e)}"}


class DownloadProgress:
    """Thread-safe holder for the progress of one download.

    ``update`` is meant to be registered as a yt-dlp progress hook.
    """

    def __init__(self):
        self.progress = 0
        self.status = "准备下载"
        self.lock = threading.Lock()

    def update(self, d):
        """Record the state reported by a yt-dlp progress-hook dict *d*."""
        with self.lock:
            state = d.get('status')
            if state == 'downloading':
                # Many streams only report an estimated total size.
                total = d.get('total_bytes') or d.get('total_bytes_estimate')
                downloaded = d.get('downloaded_bytes')
                if total and downloaded:
                    self.progress = min((downloaded / total) * 100, 100)
                total_label = (
                    d.get('_total_bytes_str')
                    or d.get('_total_bytes_estimate_str', 'unknown')
                )
                self.status = f"下载中: {d.get('_percent_str', '0%')} of {total_label}"
            elif state == 'finished':
                self.progress = 100
                self.status = "下载完成,正在处理..."


def get_downloads_dir():
    """Return the user's ``Downloads`` directory as a string.

    Falls back to the home directory when ``Downloads`` does not exist;
    nothing is created.
    """
    home = Path.home()
    downloads_dir = home / "Downloads"
    return str(downloads_dir) if downloads_dir.exists() else str(home)


def clean_filename(title, platform):
    """Build a filesystem-safe file stem of the form title_date_platform.

    Args:
        title: Raw video title; None or an all-symbol title falls back to
            ``"video"``.
        platform: Platform name used to pick the short suffix tag.

    Returns:
        The file stem without extension.
    """
    # Characters that are illegal in file names on common filesystems.
    clean_title = re.sub(r'[<>:"/\\|?*\n\r\t]', '', title or '')
    # Collapse whitespace runs, then drop punctuation and emoji.
    clean_title = re.sub(r'\s+', ' ', clean_title).strip()
    clean_title = re.sub(r'[,.,。!!@#$%^&*()()+=\[\]{};:]+', '', clean_title)
    clean_title = re.sub(r'[\U0001F300-\U0001F9FF]', '', clean_title)
    # Stripping symbols can leave dangling spaces or nothing at all.
    clean_title = clean_title.strip() or 'video'

    # Keep the stem short; the date and platform tag are appended below.
    max_length = 50
    if len(clean_title) > max_length:
        clean_title = clean_title[:max_length - 3] + '...'

    timestamp = time.strftime("%Y%m%d", time.localtime())
    suffix = _PLATFORM_SUFFIX.get(platform, "视频")
    return f"{clean_title}_{timestamp}_{suffix}"


def _locate_downloaded_file(info, temp_dir, clean_title):
    """Return the path of the file yt-dlp produced, or None if not found."""
    candidates = []
    requested = (info or {}).get('requested_downloads') or []
    if requested and requested[0].get('filepath'):
        candidates.append(requested[0]['filepath'])
    # Fallback used when yt-dlp does not report the final path.
    candidates.append(os.path.join(temp_dir, f"{clean_title}.mp4"))

    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None


def _move_to_delivery_dir(file_path):
    """Move *file_path* into a fresh temp directory and return the new path.

    The file keeps its readable name, so the browser download is not called
    ``tmpXXXX.mp4``, and the scratch directory can then be removed safely.
    """
    delivery_dir = tempfile.mkdtemp()
    try:
        target = os.path.join(delivery_dir, os.path.basename(file_path))
        return shutil.move(file_path, target)
    except Exception:
        shutil.rmtree(delivery_dir, ignore_errors=True)
        raise


def download_single_video(url, format_id, progress_tracker):
    """Download one video into a temporary location.

    Args:
        url: Video page URL on a supported platform.
        format_id: yt-dlp format selector, or a falsy value for the
            platform default.
        progress_tracker: Object whose ``update`` method is registered as a
            yt-dlp progress hook.

    Returns:
        ``{"status": "success", "file_path", "title", "ext"}`` on success or
        ``{"status": "error", "message"}`` on failure.  This function does
        not raise.
    """
    platform = get_platform_from_url(url)
    if not platform:
        return {"status": "error", "message": "不支持的平台"}

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()

        # Probe the metadata first: the title decides the output file name.
        with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
            info = ydl.extract_info(url, download=False)
        if not info:
            return {"status": "error", "message": "无法获取视频信息"}

        clean_title = clean_filename(info.get('title') or 'video', platform)

        ydl_opts = get_platform_config(url, format_id)
        if not ydl_opts:
            return {"status": "error", "message": "不支持的平台"}

        ydl_opts.update({
            'quiet': False,
            'no_warnings': False,
            'extract_flat': False,
            'paths': {'home': temp_dir},
            'progress_hooks': [progress_tracker.update],
            'outtmpl': clean_title + '.%(ext)s',  # relative to paths.home
            # Let failures raise so the real reason reaches the user; with
            # errors ignored extract_info just returns None.
            'ignoreerrors': False,
            'noprogress': False,
            'continuedl': True,  # resume partial files
            'retries': DOWNLOAD_RETRIES,
            'fragment_retries': DOWNLOAD_RETRIES,
            'skip_unavailable_fragments': True,
        })

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)

            file_path = _locate_downloaded_file(info, temp_dir, clean_title)
            if file_path is None:
                return {"status": "error", "message": "下载文件不存在"}

            if os.path.getsize(file_path) == 0:
                return {"status": "error", "message": "下载的文件大小为0,可能下载失败"}

            final_path = _move_to_delivery_dir(file_path)
            extension = os.path.splitext(final_path)[1].lstrip('.') or "mp4"
            return {
                "status": "success",
                "file_path": final_path,
                "title": clean_title,
                "ext": extension,
            }
        except Exception as e:
            # A timeout is a failure like any other: there is no complete
            # file to hand back, so it must not be reported as success.
            return {"status": "error", "message": f"下载过程中出错: {str(e)}"}

    except Exception as e:
        return {"status": "error", "message": str(e)}
    finally:
        # The scratch directory (fragments, partial files) is always removed;
        # a successful result lives in its own delivery directory.
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)


def download_video(urls, format_id=None):
    """Download the first URL in *urls* and report the outcome for the UI.

    Args:
        urls: Either a newline-separated string or a list of URLs.
            NOTE: only the first non-empty URL is downloaded.
        format_id: Optional yt-dlp format selector.

    Returns:
        A ``(message, file_path, progress_percent, status_text)`` tuple;
        ``file_path`` is None on failure.
    """
    if isinstance(urls, str):
        urls = [line.strip() for line in urls.split('\n') if line.strip()]

    if not urls:
        return "请输入至少一个视频链接", None, 0, "未开始下载"

    progress_tracker = DownloadProgress()
    result = download_single_video(urls[0], format_id, progress_tracker)

    if result["status"] != "success" or not result.get("file_path"):
        return f"下载失败: {result.get('message', '未知错误')}", None, 0, "下载失败"

    # Gradio serves the file at this path through the File component.
    return "下载成功,正在传输...", result["file_path"], 100, "下载完成"


# Build the Gradio interface.
with gr.Blocks(title="视频下载工具", theme=gr.themes.Soft()) as demo:
    # State slot for parsed video info (currently not wired to any event).
    video_info_state = gr.State({})

    with gr.Column(elem_id="header"):
        gr.Markdown("# 🎥 视频下载工具\n\n一键下载各大平台视频,支持以下平台:")
        with gr.Row():
            for platform in SUPPORTED_PLATFORMS:
                gr.Markdown(platform, elem_classes="platform")

    with gr.Row():
        with gr.Column(scale=2):
            # Input side.
            url_input = gr.Textbox(
                label="视频链接",
                placeholder="请输入视频链接,支持批量下载(每行一个链接)...",
                lines=3,
                info="支持多个平台的视频链接,自动识别平台类型"
            )
            parse_btn = gr.Button("解析视频", variant="secondary", size="lg")

            # Parsed metadata, hidden until a link has been parsed.
            with gr.Accordion("视频详细信息", open=False, visible=False) as video_info_accordion:
                video_info = gr.JSON(show_label=False)

            format_choice = gr.Dropdown(
                label="选择清晰度",
                choices=[],
                interactive=True,
                visible=False
            )
            download_btn = gr.Button("开始下载", variant="primary", size="lg", interactive=False)

        with gr.Column(scale=3):
            # Preview and output side.
            with gr.Row():
                preview_image = gr.Image(label="视频预览", visible=False)
            with gr.Row():
                progress = gr.Slider(
                    minimum=0,
                    maximum=100,
                    value=0,
                    label="下载进度",
                    interactive=False
                )
                status = gr.Textbox(
                    label="状态信息",
                    value="等待开始下载...",
                    interactive=False
                )
            # The File component hands the finished download to the browser.
            output_file = gr.File(label="下载文件")

    # Placeholder for custom CSS (the markup is empty in this file).
    gr.Markdown(""" """)

    def update_video_info(url):
        """Parse the first non-empty link and refresh the dependent widgets.

        Returns one update per output component, in the order: video_info,
        format_choice, download_btn, preview_image, status,
        video_info_accordion.
        """
        # Same selection rule as download_video: first non-empty line.
        candidates = [line.strip() for line in (url or "").split('\n') if line.strip()]
        first_url = candidates[0] if candidates else ""
        info = parse_video_info(first_url)

        if info["status"] != "success":
            return [
                gr.update(visible=False),      # video_info
                gr.update(visible=False),      # format_choice
                gr.update(interactive=False),  # download_btn
                gr.update(visible=False),      # preview_image
                info["message"],               # status
                gr.update(visible=False)       # video_info_accordion
            ]

        format_choices = []
        for fmt in info["formats"]:
            label_parts = []
            if fmt['format_note']:
                label_parts.append(fmt['format_note'])
            if fmt['resolution'] != 'unknown':
                label_parts.append(fmt['resolution'])
            label = " - ".join(filter(None, label_parts))
            if not label:
                label = f"格式 {fmt['format_id']}"
            # The dropdown value is the selector handed to yt-dlp; for
            # video-only streams it already requests an audio track.
            format_choices.append((label, fmt.get('download_format') or fmt['format_id']))

        default_format = format_choices[0][1] if format_choices else None
        thumbnail = info["thumbnail"] or None
        return [
            gr.update(visible=True, value=info),                       # video_info
            gr.update(visible=True, choices=format_choices,
                      value=default_format),                           # format_choice
            gr.update(interactive=True),                               # download_btn
            gr.update(visible=thumbnail is not None, value=thumbnail), # preview_image
            f"解析成功: {info['title']} ({info['platform']})",           # status
            gr.update(visible=True)                                    # video_info_accordion
        ]

    def run_download(urls, format_id):
        """Adapt download_video's 4-tuple to the three output widgets.

        The detailed message (which includes the failure reason) goes to the
        status box; the redundant short status text is dropped so that it
        cannot overwrite the message.
        """
        message, file_path, percent, _ = download_video(urls, format_id)
        return message, file_path, percent

    # Wire the parse button.
    parse_btn.click(
        fn=update_video_info,
        inputs=[url_input],
        outputs=[video_info, format_choice, download_btn, preview_image, status, video_info_accordion]
    )

    # Wire the download button.
    download_btn.click(
        fn=run_download,
        inputs=[url_input, format_choice],
        outputs=[status, output_file, progress]
    )

# Launch the app when run as a script.
if __name__ == "__main__":
    demo.launch()