File size: 19,955 Bytes
315962b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
import gradio as gr
import yt_dlp
import os
import re
import json
from pathlib import Path
import tempfile
import shutil
from urllib.parse import urlparse, parse_qs
import threading
from concurrent.futures import ThreadPoolExecutor
import time

SUPPORTED_PLATFORMS = {
    "抖音": r'(https?://)?(v\.douyin\.com|www\.douyin\.com)',
    "快手": r'(https?://)?(v\.kuaishou\.com|www\.kuaishou\.com)',
    "哔哩哔哩": r'(https?://)?(www\.bilibili\.com|b23\.tv)',
    "YouTube": r'(https?://)?(www\.youtube\.com|youtu\.be)',
    "小红书": r'(https?://)?(www\.xiaohongshu\.com|xhslink\.com)',
    "微博": r'(https?://)?(weibo\.com|t\.cn)',
    "西瓜视频": r'(https?://)?(www\.ixigua\.com)',
    "腾讯视频": r'(https?://)?(v\.qq\.com)'
}

def get_platform_from_url(url):
    """
    自动识别URL所属平台
    """
    if not url:
        return None
    
    for platform, pattern in SUPPORTED_PLATFORMS.items():
        if re.search(pattern, url):
            return platform
    return None

def get_platform_config(url, format_id=None):
    """
    根据URL返回对的配置
    """
    platform = get_platform_from_url(url)
    if not platform:
        return None
        
    # 基础配置
    base_config = {
        'format': format_id if format_id else 'best',
        'merge_output_format': 'mp4',
        # 网络相关设置
        'socket_timeout': 10,  # 减少超时时间
        'retries': 2,  # 减少重试次数
        'fragment_retries': 2,
        'retry_sleep': 2,  # 减少重试等待时间
        'concurrent_fragment_downloads': 8,
    }
    
    configs = {
        "抖音": {
            **base_config,
            'format': format_id if format_id else 'best',
        },
        "快手": {
            **base_config,
            'format': format_id if format_id else 'best',
        },
        "哔哩哔哩": {
            **base_config,
            'format': format_id if format_id else 'bestvideo+bestaudio/best',
            # B站特定设置
            'concurrent_fragment_downloads': 16,
            'file_access_retries': 2,
            'extractor_retries': 2,
            'fragment_retries': 2,
            'retry_sleep': 2,
        },
        "YouTube": {
            **base_config,
            'format': format_id if format_id else 'bestvideo+bestaudio/best',
        },
        "小红书": {
            **base_config,
            'format': format_id if format_id else 'best',
        },
        "微博": {
            **base_config,
            'format': format_id if format_id else 'best',
        },
        "西瓜视频": {
            **base_config,
            'format': format_id if format_id else 'best',
        },
        "腾讯视频": {
            **base_config,
            'format': format_id if format_id else 'best',
        }
    }
    
    return configs.get(platform)

def validate_url(url):
    """
    验证URL是否符合支持的平台格式
    """
    if not url:
        return False, "请输入视频链接"
    
    platform = get_platform_from_url(url)
    if not platform:
        return False, "不支持的平台或链接格式不正确"
    
    return True, f"识别为{platform}平台"

def format_filesize(bytes):
    """
    格式化文件大小显示
    """
    if not bytes:
        return "未知大小"
    
    for unit in ['B', 'KB', 'MB', 'GB']:
        if bytes < 1024:
            return f"{bytes:.1f} {unit}"
        bytes /= 1024
    return f"{bytes:.1f} TB"

def parse_video_info(url):
    """
    解析视频信息
    """
    try:
        # 验证URL
        is_valid, message = validate_url(url)
        if not is_valid:
            return {"status": "error", "message": message}
        
        # 获取平台特定配置
        ydl_opts = get_platform_config(url)
        if not ydl_opts:
            return {"status": "error", "message": "不支持的平台"}
            
        ydl_opts.update({
            'quiet': True,
            'no_warnings': True,
        })
        
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
            if not info:
                return {"status": "error", "message": "无法获取视频信息"}
            
            # 获取可用的格式
            formats = []
            seen_resolutions = set()  # 用于去重
            if 'formats' in info:
                # 过滤和排序格式
                video_formats = []
                for f in info['formats']:
                    # 过滤音频格式和没有视频编码的格式
                    if f.get('vcodec') == 'none' or not f.get('vcodec'):
                        continue
                    
                    # 获取分辨率
                    width = f.get('width', 0)
                    height = f.get('height', 0)
                    resolution = f.get('resolution', 'unknown')
                    if width and height:
                        resolution = f"{width}x{height}"
                    
                    # 获取格式说明
                    format_note = f.get('format_note', '')
                    if not format_note and resolution != 'unknown':
                        if height:
                            format_note = f"{height}p"
                    
                    # 创建唯一标识用于去重
                    resolution_key = f"{height}_{width}" if height and width else resolution
                    
                    # 如果这个分辨率已经存在,跳过
                    if resolution_key in seen_resolutions:
                        continue
                    seen_resolutions.add(resolution_key)
                    
                    # 创建格式信息
                    format_info = {
                        'format_id': f.get('format_id', ''),
                        'ext': f.get('ext', ''),
                        'resolution': resolution,
                        'format_note': format_note,
                        'quality': height or 0  # 用于排序
                    }
                    video_formats.append(format_info)
                
                # 按质量排序
                video_formats.sort(key=lambda x: x['quality'], reverse=True)
                formats = video_formats
            
            # 获取��览图
            thumbnail = info.get('thumbnail', '')
            if not thumbnail and 'thumbnails' in info:
                thumbnails = info['thumbnails']
                if thumbnails:
                    thumbnail = thumbnails[-1]['url']
            
            platform = get_platform_from_url(url)
            return {
                "status": "success",
                "message": "解析成功",
                "platform": platform,
                "title": info.get('title', '未知标题'),
                "duration": info.get('duration', 0),
                "formats": formats,
                "thumbnail": thumbnail,
                "description": info.get('description', ''),
                "webpage_url": info.get('webpage_url', url),
            }
            
    except Exception as e:
        return {"status": "error", "message": f"解析失败: {str(e)}"}

class DownloadProgress:
    def __init__(self):
        self.progress = 0
        self.status = "准备下载"
        self.lock = threading.Lock()
    
    def update(self, d):
        with self.lock:
            if d.get('status') == 'downloading':
                total = d.get('total_bytes')
                downloaded = d.get('downloaded_bytes')
                if total and downloaded:
                    self.progress = (downloaded / total) * 100
                self.status = f"下载中: {d.get('_percent_str', '0%')} of {d.get('_total_bytes_str', 'unknown')}"
            elif d.get('status') == 'finished':
                self.progress = 100
                self.status = "下载完成,正在处理..."

def get_downloads_dir():
    """
    获取用户的下载目录
    """
    # 获取用户主目录
    home = str(Path.home())
    # 获取下载目录
    downloads_dir = os.path.join(home, "Downloads")
    # 如果下载目录不存在,则创建
    if not os.path.exists(downloads_dir):
        downloads_dir = home
    return downloads_dir

def clean_filename(title, platform):
    """
    清理并格式化文件名
    """
    # 移除非法字符
    illegal_chars = r'[<>:"/\\|?*\n\r\t]'
    clean_title = re.sub(illegal_chars, '', title)
    
    # 移除多余的空格和特殊符号
    clean_title = re.sub(r'\s+', ' ', clean_title).strip()
    clean_title = re.sub(r'[,.,。!!@#$%^&*()()+=\[\]{};:]+', '', clean_title)
    
    # 移除表情符号
    clean_title = re.sub(r'[\U0001F300-\U0001F9FF]', '', clean_title)
    
    # 添加平台标识
    platform_suffix = {
        "抖音": "抖音",
        "快手": "快手",
        "哔哩哔哩": "B站",
        "YouTube": "YT",
        "小红书": "XHS",
        "微博": "微博",
        "西瓜视频": "西瓜",
        "腾讯视频": "腾讯"
    }
    
    # 限制标题长度(考虑到平台标识的长度)
    max_length = 50
    if len(clean_title) > max_length:
        clean_title = clean_title[:max_length-3] + '...'
    
    # 添加时间戳和平台标识
    timestamp = time.strftime("%Y%m%d", time.localtime())
    suffix = platform_suffix.get(platform, "视频")
    
    # 最终文件名格式:标题_时间_平台.mp4
    final_name = f"{clean_title}_{timestamp}_{suffix}"
    
    return final_name

def download_single_video(url, format_id, progress_tracker):
    """
    下载单个视频
    """
    try:
        # 创建临时目录
        temp_dir = tempfile.mkdtemp()
        
        # 获取平台信息
        platform = get_platform_from_url(url)
        if not platform:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return {"status": "error", "message": "不支持的平台"}
        
        # 获取视频信息
        with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
            info = ydl.extract_info(url, download=False)
            # 清理并格式化文件名
            clean_title = clean_filename(info.get('title', 'video'), platform)
        
        ydl_opts = get_platform_config(url, format_id)
        if not ydl_opts:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return {"status": "error", "message": "不支持的平台"}
        
        # 更新下载配置
        ydl_opts.update({
            'quiet': False,
            'no_warnings': False,
            'extract_flat': False,
            'paths': {'home': temp_dir},
            'progress_hooks': [progress_tracker.update],
            'outtmpl': clean_title + '.%(ext)s',  # 不使用绝对路径
            'ignoreerrors': True,  # 忽略部分错误继续下载
            'noprogress': False,  # 显示进度
            'continuedl': True,  # 支持断点续传
            'retries': float('inf'),  # 无限重试
            'fragment_retries': float('inf'),  # 片段无限重试
            'skip_unavailable_fragments': True,  # 跳过不可用片段
            'no_abort_on_error': True,  # 发生错误时不中止
        })
        
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            try:
                info = ydl.extract_info(url, download=True)
                if 'requested_downloads' in info:
                    file_path = info['requested_downloads'][0]['filepath']
                else:
                    file_path = os.path.join(temp_dir, f"{clean_title}.mp4")
                
                if os.path.exists(file_path):
                    # 检查文件大小
                    file_size = os.path.getsize(file_path)
                    if file_size == 0:
                        shutil.rmtree(temp_dir, ignore_errors=True)
                        return {"status": "error", "message": "下载的文件大小为0,可能下载失败"}
                    
                    # 创建一个新的临时文件
                    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
                    temp_file.close()
                    shutil.copy2(file_path, temp_file.name)
                    
                    # 清理原始临时目录
                    shutil.rmtree(temp_dir, ignore_errors=True)
                    
                    return {
                        "status": "success",
                        "file_path": temp_file.name,
                        "title": clean_title,
                        "ext": "mp4"
                    }
                else:
                    shutil.rmtree(temp_dir, ignore_errors=True)
                    return {"status": "error", "message": "下载文件不存在"}
            except Exception as e:
                error_msg = str(e)
                # 如果是超时错误且进度不为0,继续下载
                if ("timed out" in error_msg or "timeout" in error_msg) and progress_tracker.progress > 0:
                    return {
                        "status": "success",
                        "file_path": file_path if 'file_path' in locals() else None,
                        "title": clean_title,
                        "ext": "mp4"
                    }
                shutil.rmtree(temp_dir, ignore_errors=True)
                return {"status": "error", "message": f"下载过程中出错: {error_msg}"}
                
    except Exception as e:
        if 'temp_dir' in locals():
            shutil.rmtree(temp_dir, ignore_errors=True)
        return {"status": "error", "message": str(e)}

def download_video(urls, format_id=None):
    """
    下载视频并返回文件
    """
    if isinstance(urls, str):
        urls = [url.strip() for url in urls.split('\n') if url.strip()]
    
    if not urls:
        return "请输入至少一个视频链接", None, 0, "未开始下载"
    
    progress_tracker = DownloadProgress()
    result = download_single_video(urls[0], format_id, progress_tracker)
    
    if result["status"] == "success":
        try:
            # 返回文件路径供Gradio处理下载
            return "下载成功,正在传输...", result["file_path"], 100, "下载完成"
        except Exception as e:
            return f"文件处理失败: {str(e)}", None, 0, "下载失败"
    else:
        return f"下载失败: {result.get('message', '未知错误')}", None, 0, "下载失败"

# 创建Gradio界面
with gr.Blocks(title="视频下载工具", theme=gr.themes.Soft()) as demo:
    # 存储视频信息的状态变量
    video_info_state = gr.State({})
    
    with gr.Column(elem_id="header"):
        gr.Markdown("""
        # 🎥 视频下载工具
        
        一键下载各大平台视频,支持以下平台:
        """)
        
        with gr.Row():
            for platform in SUPPORTED_PLATFORMS.keys():
                gr.Markdown(f"<span class='platform-badge'>{platform}</span>", elem_classes="platform")
    
    with gr.Row():
        with gr.Column(scale=2):
            # 输入部分
            url_input = gr.Textbox(
                label="视频链接",
                placeholder="请输入视频链接,支持批量下载(每行一个链接)...",
                lines=3,
                info="支持多个平台的视频链接,自动识别平台类型"
            )
            parse_btn = gr.Button("解析视频", variant="secondary", size="lg")
            
            # 视频信息显示(使用Accordion组件)
            with gr.Accordion("视频详细信息", open=False, visible=False) as video_info_accordion:
                video_info = gr.JSON(show_label=False)
            
            format_choice = gr.Dropdown(
                label="选择清晰度",
                choices=[],
                interactive=True,
                visible=False
            )
            
            download_btn = gr.Button("开始下载", variant="primary", size="lg", interactive=False)
            
        with gr.Column(scale=3):
            # 预览和输出部分
            with gr.Row():
                preview_image = gr.Image(label="视频预览", visible=False)
            with gr.Row():
                progress = gr.Slider(
                    minimum=0,
                    maximum=100,
                    value=0,
                    label="下载进度",
                    interactive=False
                )
                status = gr.Textbox(
                    label="状态信息",
                    value="等待开始下载...",
                    interactive=False
                )
            # 使用File组件来处理下载
            output_file = gr.File(label="下载文件")
    
    # 添加自定义CSS
    gr.Markdown("""
    <style>
    #header {
        text-align: center;
        margin-bottom: 2rem;
    }
    .platform-badge {
        display: inline-block;
        padding: 0.5rem 1rem;
        margin: 0.5rem;
        border-radius: 2rem;
        background-color: #2196F3;
        color: white;
        font-weight: bold;
    }
    .gradio-container {
        max-width: 1200px !important;
    }
    .contain {
        margin: 0 auto;
        padding: 2rem;
    }
    .download-link {
        display: inline-block;
        padding: 0.8rem 1.5rem;
        background-color: #4CAF50;
        color: white;
        text-decoration: none;
        border-radius: 0.5rem;
        margin-top: 1rem;
        font-weight: bold;
        transition: background-color 0.3s;
    }
    .download-link:hover {
        background-color: #45a049;
    }
    </style>
    """)
    
    def update_video_info(url):
        """更新视频信息"""
        # 只解析第一个链接
        first_url = url.split('\n')[0].strip()
        info = parse_video_info(first_url)
        
        if info["status"] == "success":
            # 准备清晰度选项
            format_choices = []
            for fmt in info["formats"]:
                # 构建格式标签
                label_parts = []
                if fmt['format_note']:
                    label_parts.append(fmt['format_note'])
                if fmt['resolution'] != 'unknown':
                    label_parts.append(fmt['resolution'])
                
                label = " - ".join(filter(None, label_parts))
                if not label:
                    label = f"格式 {fmt['format_id']}"
                
                format_choices.append((label, fmt['format_id']))
            
            return [
                gr.update(visible=True, value=info),  # video_info
                gr.update(visible=True, choices=format_choices, value=format_choices[0][1] if format_choices else None),  # format_choice
                gr.update(interactive=True),  # download_btn
                gr.update(visible=True, value=info["thumbnail"]),  # preview_image
                f"解析成功: {info['title']} ({info['platform']})",  # status
                gr.update(visible=True)  # video_info_accordion
            ]
        else:
            return [
                gr.update(visible=False),  # video_info
                gr.update(visible=False),  # format_choice
                gr.update(interactive=False),  # download_btn
                gr.update(visible=False),  # preview_image
                info["message"],  # status
                gr.update(visible=False)  # video_info_accordion
            ]
    
    # 绑定解析按钮事件
    parse_btn.click(
        fn=update_video_info,
        inputs=[url_input],
        outputs=[video_info, format_choice, download_btn, preview_image, status, video_info_accordion]
    )
    
    # 绑定下载按钮事件
    download_btn.click(
        fn=download_video,
        inputs=[url_input, format_choice],
        outputs=[status, output_file, progress, status]
    )

# 启动应用
if __name__ == "__main__":
    demo.launch()