import os import itertools import numpy as np import torch from PIL import Image, ImageOps import cv2 import psutil import subprocess import re import time import folder_paths from comfy.utils import common_upscale, ProgressBar import nodes from comfy.k_diffusion.utils import FolderOfImages from .logger import logger from .utils import BIGMAX, DIMMAX, calculate_file_hash, get_sorted_dir_files_from_directory,\ lazy_get_audio, hash_path, validate_path, strip_path, try_download_video, \ is_url, imageOrLatent, ffmpeg_path, ENCODE_ARGS, floatOrInt video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov'] VHSLoadFormats = { 'None': {}, 'AnimateDiff': {'target_rate': 8, 'dim': (8,0,512,512)}, 'Mochi': {'target_rate': 24, 'dim': (16,0,848,480), 'frames':(6,1)}, 'LTXV': {'target_rate': 24, 'dim': (32,0,768,512), 'frames':(8,1)}, 'Hunyuan': {'target_rate': 24, 'dim': (16,0,848,480), 'frames':(4,1)}, 'Cosmos': {'target_rate': 24, 'dim': (16,0,1280,704), 'frames':(8,1)}, 'Wan': {'target_rate': 16, 'dim': (8,0,832,480), 'frames':(4,1)}, } """ External plugins may add additional formats to nodes.VHSLoadFormats In addition to shorthand options, direct widget names will map a given dict to options. Adding a third arguement to a frames tuple can enable strict checks on number of loaded frames, i.e (8,1,True) """ if not hasattr(nodes, 'VHSLoadFormats'): nodes.VHSLoadFormats = {} def get_load_formats(): #TODO: check if {**extra_config.VHSLoafFormats, **VHSLoadFormats} has minimum version formats = {} formats.update(nodes.VHSLoadFormats) formats.update(VHSLoadFormats) return (list(formats.keys()), {'default': 'AnimateDiff', 'formats': formats}) def get_format(format): if format in VHSLoadFormats: return VHSLoadFormats[format] return nodes.VHSLoadFormats.get(format, {}) def is_gif(filename) -> bool: file_parts = filename.split('.') return len(file_parts) > 1 and file_parts[-1] == "gif" def target_size(width, height, custom_width, custom_height, downscale_ratio=8) -> tuple[int, int]: if downscale_ratio is None: downscale_ratio = 8 if custom_width == 0 and custom_height == 0: pass elif custom_height == 0: height *= custom_width/width width = custom_width elif custom_width == 0: width *= custom_height/height height = custom_height else: width = custom_width height = custom_height width = int(width/downscale_ratio + 0.5) * downscale_ratio height = int(height/downscale_ratio + 0.5) * downscale_ratio return (width, height) def cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames, select_every_nth, meta_batch=None, unique_id=None): video_cap = cv2.VideoCapture(video) if not video_cap.isOpened() or not video_cap.grab(): raise ValueError(f"{video} could not be loaded with cv.") # extract video metadata fps = video_cap.get(cv2.CAP_PROP_FPS) width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps width = 0 if width <=0 or height <=0: _, frame = video_cap.retrieve() height, width, _ = frame.shape # set video_cap to look at start_index frame total_frame_count = 0 total_frames_evaluated = -1 frames_added = 0 base_frame_time = 1 / fps prev_frame = None if force_rate == 0: target_frame_time = base_frame_time else: target_frame_time = 1/force_rate if total_frames > 0: if force_rate != 0: yieldable_frames = int(total_frames / fps * force_rate) else: yieldable_frames = total_frames if select_every_nth: yieldable_frames //= select_every_nth if frame_load_cap != 0: yieldable_frames = min(frame_load_cap, yieldable_frames) else: yieldable_frames = 0 yield (width, height, fps, duration, total_frames, target_frame_time, yieldable_frames) pbar = ProgressBar(yieldable_frames) time_offset=target_frame_time while video_cap.isOpened(): if time_offset < target_frame_time: is_returned = video_cap.grab() # if didn't return frame, video has ended if not is_returned: break time_offset += base_frame_time if time_offset < target_frame_time: continue time_offset -= target_frame_time # if not at start_index, skip doing anything with frame total_frame_count += 1 if total_frame_count <= skip_first_frames: continue else: total_frames_evaluated += 1 # if should not be selected, skip doing anything with frame if total_frames_evaluated%select_every_nth != 0: continue # opencv loads images in BGR format (yuck), so need to convert to RGB for ComfyUI use # follow up: can videos ever have an alpha channel? # To my testing: No. opencv has no support for alpha unused, frame = video_cap.retrieve() frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # convert frame to comfyui's expected format # TODO: frame contains no exif information. Check if opencv2 has already applied frame = np.array(frame, dtype=np.float32) torch.from_numpy(frame).div_(255) if prev_frame is not None: inp = yield prev_frame if inp is not None: #ensure the finally block is called return prev_frame = frame frames_added += 1 if pbar is not None: pbar.update_absolute(frames_added, yieldable_frames) # if cap exists and we've reached it, stop processing frames if frame_load_cap > 0 and frames_added >= frame_load_cap: break if meta_batch is not None: meta_batch.inputs.pop(unique_id) meta_batch.has_closed_inputs = True if prev_frame is not None: yield prev_frame def ffmpeg_frame_generator(video, force_rate, frame_load_cap, start_time, custom_width, custom_height, downscale_ratio=8, meta_batch=None, unique_id=None): args_dummy = [ffmpeg_path, "-i", video, '-c', 'copy', '-frames:v', '1', "-f", "null", "-"] size_base = None fps_base = None try: dummy_res = subprocess.run(args_dummy, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, check=True) except subprocess.CalledProcessError as e: raise Exception("An error occurred in the ffmpeg subprocess:\n" \ + e.stderr.decode(*ENCODE_ARGS)) lines = dummy_res.stderr.decode(*ENCODE_ARGS) for line in lines.split('\n'): match = re.search("^ *Stream .* Video.*, ([1-9]|\\d{2,})x(\\d+)", line) if match is not None: size_base = [int(match.group(1)), int(match.group(2))] fps_match = re.search(", ([\\d\\.]+) fps", line) if fps_match: fps_base = float(fps_match.group(1)) else: fps_base = 1 alpha = re.search("(yuva|rgba)", line) is not None break else: raise Exception("Failed to parse video/image information. FFMPEG output:\n" + lines) durs_match = re.search("Duration: (\\d+:\\d+:\\d+\\.\\d+),", lines) if durs_match: durs = durs_match.group(1).split(':') duration = int(durs[0])*360 + int(durs[1])*60 + float(durs[2]) else: duration = 0 if start_time > 0: if start_time > 4: post_seek = ['-ss', '4'] pre_seek = ['-ss', str(start_time - 4)] else: post_seek = ['-ss', str(start_time)] pre_seek = [] else: pre_seek = [] post_seek = [] args_all_frames = [ffmpeg_path, "-v", "error", "-an"] + pre_seek + \ ["-i", video, "-pix_fmt", "rgba64le"] + post_seek vfilters = [] if force_rate != 0: vfilters.append("fps=fps="+str(force_rate)) if custom_width != 0 or custom_height != 0: size = target_size(size_base[0], size_base[1], custom_width, custom_height, downscale_ratio=downscale_ratio) ar = float(size[0])/float(size[1]) if abs(size_base[0]*ar-size_base[1]) >= 1: #Aspect ratio is changed. Crop to new aspect ratio before scale vfilters.append(f"crop=if(gt({ar}\\,a)\\,iw\\,ih*{ar}):if(gt({ar}\\,a)\\,iw/{ar}\\,ih)") size_arg = ':'.join(map(str,size)) vfilters.append(f"scale={size_arg}") else: size = size_base if len(vfilters) > 0: args_all_frames += ["-vf", ",".join(vfilters)] yieldable_frames = (force_rate or fps_base)*duration if frame_load_cap > 0: args_all_frames += ["-frames:v", str(frame_load_cap)] yieldable_frames = min(yieldable_frames, frame_load_cap) yield (size_base[0], size_base[1], fps_base, duration, fps_base * duration, 1/(force_rate or fps_base), yieldable_frames, size[0], size[1], alpha) args_all_frames += ["-f", "rawvideo", "-"] pbar = ProgressBar(yieldable_frames) try: with subprocess.Popen(args_all_frames, stdout=subprocess.PIPE) as proc: #Manually buffer enough bytes for an image bpi = size[0] * size[1] * 8 current_bytes = bytearray(bpi) current_offset=0 prev_frame = None while True: bytes_read = proc.stdout.read(bpi - current_offset) if bytes_read is None:#sleep to wait for more data time.sleep(.1) continue if len(bytes_read) == 0:#EOF break current_bytes[current_offset:len(bytes_read)] = bytes_read current_offset+=len(bytes_read) if current_offset == bpi: if prev_frame is not None: yield prev_frame pbar.update(1) prev_frame = np.frombuffer(current_bytes, dtype=np.dtype(np.uint16).newbyteorder("<")).reshape(size[1], size[0], 4) / (2**16-1) if not alpha: prev_frame = prev_frame[:, :, :-1] current_offset = 0 except BrokenPipeError as e: raise Exception("An error occured in the ffmpeg subprocess:\n" \ + proc.stderr.read().decode(*ENCODE_ARGS)) if meta_batch is not None: meta_batch.inputs.pop(unique_id) meta_batch.has_closed_inputs = True if prev_frame is not None: yield prev_frame #Python 3.12 adds an itertools.batched, but it's easily replicated for legacy support def batched(it, n): while batch := tuple(itertools.islice(it, n)): yield batch def batched_vae_encode(images, vae, frames_per_batch): for batch in batched(images, frames_per_batch): image_batch = torch.from_numpy(np.array(batch)) yield from vae.encode(image_batch).numpy() def resized_cv_frame_gen(custom_width, custom_height, downscale_ratio, **kwargs): gen = cv_frame_generator(**kwargs) info = next(gen) width, height = info[0], info[1] frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1 if kwargs.get('meta_batch', None) is not None: frames_per_batch = min(frames_per_batch, kwargs['meta_batch'].frames_per_batch) if custom_width != 0 or custom_height != 0 or downscale_ratio is not None: new_size = target_size(width, height, custom_width, custom_height, downscale_ratio) yield (*info, new_size[0], new_size[1], False) if new_size[0] != width or new_size[1] != height: def rescale(frame): s = torch.from_numpy(np.fromiter(frame, np.dtype((np.float32, (height, width, 3))))) s = s.movedim(-1,1) s = common_upscale(s, new_size[0], new_size[1], "lanczos", "center") return s.movedim(1,-1).numpy() yield from itertools.chain.from_iterable(map(rescale, batched(gen, frames_per_batch))) return else: yield (*info, info[0], info[1], False) yield from gen def load_video(meta_batch=None, unique_id=None, memory_limit_mb=None, vae=None, generator=resized_cv_frame_gen, format='None', **kwargs): if 'force_size' in kwargs: kwargs.pop('force_size') logger.warn("force_size has been removed. Did you reload the webpage after updating?") format = get_format(format) kwargs['video'] = strip_path(kwargs['video']) if vae is not None: downscale_ratio = getattr(vae, "downscale_ratio", 8) else: downscale_ratio = format.get('dim', (1,))[0] if meta_batch is None or unique_id not in meta_batch.inputs: gen = generator(meta_batch=meta_batch, unique_id=unique_id, downscale_ratio=downscale_ratio, **kwargs) (width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = next(gen) if meta_batch is not None: meta_batch.inputs[unique_id] = (gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) if yieldable_frames: meta_batch.total_frames = min(meta_batch.total_frames, yieldable_frames) else: (gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = meta_batch.inputs[unique_id] memory_limit = None if memory_limit_mb is not None: memory_limit *= 2 ** 20 else: #TODO: verify if garbage collection should be performed here. #leaves ~128 MB unreserved for safety try: memory_limit = (psutil.virtual_memory().available + psutil.swap_memory().free) - 2 ** 27 except: logger.warn("Failed to calculate available memory. Memory load limit has been disabled") memory_limit = BIGMAX if vae is not None: #space required to load as f32, exist as latent with wiggle room, decode to f32 max_loadable_frames = int(memory_limit//(width*height*3*(4+4+1/10))) else: #TODO: use better estimate for when vae is not None #Consider completely ignoring for load_latent case? max_loadable_frames = int(memory_limit//(width*height*3*(.1))) if meta_batch is not None: if 'frames' in format: if meta_batch.frames_per_batch % format['frames'][0] != format['frames'][1]: error = (meta_batch.frames_per_batch - format['frames'][1]) % format['frames'][0] suggested = meta_batch.frames_per_batch - error if error > format['frames'][0] / 2: suggested += format['frames'][0] raise RuntimeError(f"The chosen frames per batch is incompatible with the selected format. Try {suggested}") if meta_batch.frames_per_batch > max_loadable_frames: raise RuntimeError(f"Meta Batch set to {meta_batch.frames_per_batch} frames but only {max_loadable_frames} can fit in memory") gen = itertools.islice(gen, meta_batch.frames_per_batch) else: original_gen = gen gen = itertools.islice(gen, max_loadable_frames) frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1 if vae is not None: gen = batched_vae_encode(gen, vae, frames_per_batch) vw,vh = new_width//downscale_ratio, new_height//downscale_ratio channels = getattr(vae, 'latent_channels', 4) images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (channels,vh,vw))))) else: #Some minor wizardry to eliminate a copy and reduce max memory by a factor of ~2 images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (new_height, new_width, 4 if alpha else 3))))) if meta_batch is None and memory_limit is not None: try: next(original_gen) raise RuntimeError(f"Memory limit hit after loading {len(images)} frames. Stopping execution.") except StopIteration: pass if len(images) == 0: raise RuntimeError("No frames generated") if 'frames' in format and len(images) % format['frames'][0] != format['frames'][1]: err_msg = f"The number of frames loaded {len(images)}, does not match the requirements of the currently selected format." if len(format['frames']) > 2 and format['frames'][2]: raise RuntimeError(err_msg) div, mod = format['frames'][:2] frames = (len(images) - mod) // div * div + mod images = images[:frames] #Commenting out log message since it's displayed in UI. consider further #logger.warn(err_msg + f" Output has been truncated to {len(images)} frames.") if 'start_time' in kwargs: start_time = kwargs['start_time'] else: start_time = kwargs['skip_first_frames'] * target_frame_time target_frame_time *= kwargs.get('select_every_nth', 1) #Setup lambda for lazy audio capture audio = lazy_get_audio(kwargs['video'], start_time, kwargs['frame_load_cap']*target_frame_time) #Adjust target_frame_time for select_every_nth video_info = { "source_fps": fps, "source_frame_count": total_frames, "source_duration": duration, "source_width": width, "source_height": height, "loaded_fps": 1/target_frame_time, "loaded_frame_count": len(images), "loaded_duration": len(images) * target_frame_time, "loaded_width": new_width, "loaded_height": new_height, } if vae is None: return (images, len(images), audio, video_info) else: return ({"samples": images}, len(images), audio, video_info) class LoadVideoUpload: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [] for f in os.listdir(input_dir): if os.path.isfile(os.path.join(input_dir, f)): file_parts = f.split('.') if len(file_parts) > 1 and (file_parts[-1].lower() in video_extensions): files.append(f) return {"required": { "video": (sorted(files),), "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), "skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}), }, "optional": { "meta_batch": ("VHS_BatchManager",), "vae": ("VAE",), "format": get_load_formats(), }, "hidden": { "force_size": "STRING", "unique_id": "UNIQUE_ID" }, } CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO") RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info") FUNCTION = "load_video" def load_video(self, **kwargs): kwargs['video'] = folder_paths.get_annotated_filepath(strip_path(kwargs['video'])) return load_video(**kwargs) @classmethod def IS_CHANGED(s, video, **kwargs): image_path = folder_paths.get_annotated_filepath(video) return calculate_file_hash(image_path) @classmethod def VALIDATE_INPUTS(s, video): if not folder_paths.exists_annotated_filepath(video): return "Invalid video file: {}".format(video) return True class LoadVideoPath: @classmethod def INPUT_TYPES(s): return { "required": { "video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}), "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), "skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}), }, "optional": { "meta_batch": ("VHS_BatchManager",), "vae": ("VAE",), "format": get_load_formats(), }, "hidden": { "force_size": "STRING", "unique_id": "UNIQUE_ID" }, } CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO") RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info") FUNCTION = "load_video" def load_video(self, **kwargs): if kwargs['video'] is None or validate_path(kwargs['video']) != True: raise Exception("video is not a valid path: " + kwargs['video']) if is_url(kwargs['video']): kwargs['video'] = try_download_video(kwargs['video']) or kwargs['video'] return load_video(**kwargs) @classmethod def IS_CHANGED(s, video, **kwargs): return hash_path(video) @classmethod def VALIDATE_INPUTS(s, video): return validate_path(video, allow_none=True) class LoadVideoFFmpegUpload: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [] for f in os.listdir(input_dir): if os.path.isfile(os.path.join(input_dir, f)): file_parts = f.split('.') if len(file_parts) > 1 and (file_parts[-1].lower() in video_extensions): files.append(f) return {"required": { "video": (sorted(files),), "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), "start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}), }, "optional": { "meta_batch": ("VHS_BatchManager",), "vae": ("VAE",), "format": get_load_formats(), }, "hidden": { "force_size": "STRING", "unique_id": "UNIQUE_ID" }, } CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO") RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info") FUNCTION = "load_video" def load_video(self, **kwargs): kwargs['video'] = folder_paths.get_annotated_filepath(strip_path(kwargs['video'])) image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator) if image.size(3) == 4: return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info) return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info) @classmethod def IS_CHANGED(s, video, **kwargs): image_path = folder_paths.get_annotated_filepath(video) return calculate_file_hash(image_path) @classmethod def VALIDATE_INPUTS(s, video): if not folder_paths.exists_annotated_filepath(video): return "Invalid video file: {}".format(video) return True class LoadVideoFFmpegPath: @classmethod def INPUT_TYPES(s): return { "required": { "video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}), "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), "start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}), }, "optional": { "meta_batch": ("VHS_BatchManager",), "vae": ("VAE",), "format": get_load_formats(), }, "hidden": { "force_size": "STRING", "unique_id": "UNIQUE_ID" }, } CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO") RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info") FUNCTION = "load_video" def load_video(self, **kwargs): if kwargs['video'] is None or validate_path(kwargs['video']) != True: raise Exception("video is not a valid path: " + kwargs['video']) if is_url(kwargs['video']): kwargs['video'] = try_download_video(kwargs['video']) or kwargs['video'] image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator) if isinstance(image, dict): return (image, None, audio, video_info) if image.size(3) == 4: return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info) return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info) @classmethod def IS_CHANGED(s, video, **kwargs): return hash_path(video) @classmethod def VALIDATE_INPUTS(s, video): return validate_path(video, allow_none=True) class LoadImagePath: @classmethod def INPUT_TYPES(s): return { "required": { "image": ("STRING", {"placeholder": "X://insert/path/here.png", "vhs_path_extensions": list(FolderOfImages.IMG_EXTENSIONS)}), "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}), "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}), }, "optional": { "vae": ("VAE",), }, "hidden": { "force_size": "STRING", }, } CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" RETURN_TYPES = (imageOrLatent, "MASK") RETURN_NAMES = ("IMAGE", "mask") FUNCTION = "load_image" def load_image(self, **kwargs): if kwargs['image'] is None or validate_path(kwargs['image']) != True: raise Exception("image is not a valid path: " + kwargs['image']) kwargs.update({'video': kwargs['image'], 'force_rate': 0, 'frame_load_cap': 0, 'start_time': 0}) kwargs.pop('image') image, _, _, _ = load_video(**kwargs, generator=ffmpeg_frame_generator) if isinstance(image, dict): return (image, None) if image.size(3) == 4: return (image[:,:,:,:3], 1-image[:,:,:,3]) return (image, torch.zeros(image.size(0), 64, 64, device="cpu")) @classmethod def IS_CHANGED(s, image, **kwargs): return hash_path(image) @classmethod def VALIDATE_INPUTS(s, image): return validate_path(image, allow_none=True)