# Code based on cutechicken/whisper-webui-translate import io from contextlib import redirect_stderr from tempfile import mkdtemp from typing import List import yt_dlp from yt_dlp import YoutubeDL from yt_dlp.postprocessor import PostProcessor class FilenameCollectorPP(PostProcessor): def __init__(self): super(FilenameCollectorPP, self).__init__(None) self.filenames = [] def run(self, information): self.filenames.append(information["filepath"]) return [], information def download_url( url: str, maxDuration: int = None, destinationDirectory: str = None, playlistItems: str = "1", cookies_from_browser: str = None, ) -> tuple[bool, str, List[str]]: try: video_paths = _perform_download( url, maxDuration=maxDuration, outputTemplate=None, destinationDirectory=destinationDirectory, playlistItems=playlistItems, cookies_from_browser=cookies_from_browser, ) return True, None, video_paths except yt_dlp.utils.DownloadError as e: # In case of an OS error, try again with a different output template if e.msg and e.msg.find("[Errno 36] File name too long") >= 0: try: video_paths = _perform_download( url, maxDuration=maxDuration, outputTemplate="%(title).10s %(id)s.%(ext)s", cookies_from_browser=cookies_from_browser, ) return True, None, video_paths except Exception as retry_error: return False, str(retry_error), [] return False, str(e), [] except ExceededMaximumDuration as e: return ( False, f"Video exceeds maximum duration: {e.videoDuration}s > {e.maxDuration}s", [], ) except Exception as e: return False, str(e), [] def _perform_download( url: str, maxDuration: int = None, outputTemplate: str = None, destinationDirectory: str = None, playlistItems: str = "1", onlyAudio: bool = False, cookies_from_browser: str = None, ): # Create a temporary directory to store the downloaded files if destinationDirectory is None: destinationDirectory = mkdtemp() ydl_opts = { "format": "bestaudio/best" if onlyAudio else "worstvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/worst", "paths": {"home": destinationDirectory}, "ignoreerrors": True, } if playlistItems: ydl_opts["playlist_items"] = playlistItems if cookies_from_browser: ydl_opts["cookies_from_browser"] = cookies_from_browser # Add output template if specified if outputTemplate: ydl_opts["outtmpl"] = outputTemplate errStrIO = EventStringIO(on_write=lambda text: print(f"\033[91m{text}\033[0m")) filename_collector = FilenameCollectorPP() with redirect_stderr(errStrIO): for _ in (True,): with YoutubeDL(ydl_opts) as ydl: if maxDuration and maxDuration > 0: info = ydl.extract_info(url, download=False) if not info: break entries = "entries" in info and info["entries"] or [info] total_duration = 0 # Compute total duration for entry in entries: if entry: total_duration += float(entry["duration"]) if total_duration >= maxDuration: raise ExceededMaximumDuration( videoDuration=total_duration, maxDuration=maxDuration, message="Video is too long", ) ydl.add_post_processor(filename_collector) ydl.download([url]) errMsg = errStrIO.getvalue() errMsg = ( [text for text in errMsg.split("\n") if text.startswith("ERROR")] if errMsg else "" ) if len(filename_collector.filenames) <= 0: raise Exception( f"Cannot download {url}, " + "\n".join(errMsg) if errMsg else "" ) result = [] for filename in filename_collector.filenames: result.append(filename) print("Downloaded " + filename) return result class ExceededMaximumDuration(Exception): def __init__(self, videoDuration, maxDuration, message): self.videoDuration = videoDuration self.maxDuration = maxDuration super().__init__(message) class EventStringIO(io.StringIO): def __init__(self, on_write=None, *args, **kwargs): super().__init__(*args, **kwargs) self.on_write = on_write def write(self, text): super().write(text) if self.on_write: self.on_write(text)