Spaces:
Paused
Paused
import os | |
import itertools | |
import numpy as np | |
import torch | |
from PIL import Image, ImageOps | |
import cv2 | |
import psutil | |
import subprocess | |
import re | |
import time | |
import folder_paths | |
from comfy.utils import common_upscale, ProgressBar | |
import nodes | |
from comfy.k_diffusion.utils import FolderOfImages | |
from .logger import logger | |
from .utils import BIGMAX, DIMMAX, calculate_file_hash, get_sorted_dir_files_from_directory,\ | |
lazy_get_audio, hash_path, validate_path, strip_path, try_download_video, \ | |
is_url, imageOrLatent, ffmpeg_path, ENCODE_ARGS, floatOrInt | |
video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov'] | |
VHSLoadFormats = { | |
'None': {}, | |
'AnimateDiff': {'target_rate': 8, 'dim': (8,0,512,512)}, | |
'Mochi': {'target_rate': 24, 'dim': (16,0,848,480), 'frames':(6,1)}, | |
'LTXV': {'target_rate': 24, 'dim': (32,0,768,512), 'frames':(8,1)}, | |
'Hunyuan': {'target_rate': 24, 'dim': (16,0,848,480), 'frames':(4,1)}, | |
'Cosmos': {'target_rate': 24, 'dim': (16,0,1280,704), 'frames':(8,1)}, | |
'Wan': {'target_rate': 16, 'dim': (8,0,832,480), 'frames':(4,1)}, | |
} | |
""" | |
External plugins may add additional formats to nodes.VHSLoadFormats | |
In addition to shorthand options, direct widget names will map a given dict to options. | |
Adding a third arguement to a frames tuple can enable strict checks on number | |
of loaded frames, i.e (8,1,True) | |
""" | |
if not hasattr(nodes, 'VHSLoadFormats'): | |
nodes.VHSLoadFormats = {} | |
def get_load_formats(): | |
#TODO: check if {**extra_config.VHSLoafFormats, **VHSLoadFormats} has minimum version | |
formats = {} | |
formats.update(nodes.VHSLoadFormats) | |
formats.update(VHSLoadFormats) | |
return (list(formats.keys()), | |
{'default': 'AnimateDiff', 'formats': formats}) | |
def get_format(format): | |
if format in VHSLoadFormats: | |
return VHSLoadFormats[format] | |
return nodes.VHSLoadFormats.get(format, {}) | |
def is_gif(filename) -> bool: | |
file_parts = filename.split('.') | |
return len(file_parts) > 1 and file_parts[-1] == "gif" | |
def target_size(width, height, custom_width, custom_height, downscale_ratio=8) -> tuple[int, int]: | |
if downscale_ratio is None: | |
downscale_ratio = 8 | |
if custom_width == 0 and custom_height == 0: | |
pass | |
elif custom_height == 0: | |
height *= custom_width/width | |
width = custom_width | |
elif custom_width == 0: | |
width *= custom_height/height | |
height = custom_height | |
else: | |
width = custom_width | |
height = custom_height | |
width = int(width/downscale_ratio + 0.5) * downscale_ratio | |
height = int(height/downscale_ratio + 0.5) * downscale_ratio | |
return (width, height) | |
def cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames, | |
select_every_nth, meta_batch=None, unique_id=None): | |
video_cap = cv2.VideoCapture(video) | |
if not video_cap.isOpened() or not video_cap.grab(): | |
raise ValueError(f"{video} could not be loaded with cv.") | |
# extract video metadata | |
fps = video_cap.get(cv2.CAP_PROP_FPS) | |
width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
total_frames = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
duration = total_frames / fps | |
width = 0 | |
if width <=0 or height <=0: | |
_, frame = video_cap.retrieve() | |
height, width, _ = frame.shape | |
# set video_cap to look at start_index frame | |
total_frame_count = 0 | |
total_frames_evaluated = -1 | |
frames_added = 0 | |
base_frame_time = 1 / fps | |
prev_frame = None | |
if force_rate == 0: | |
target_frame_time = base_frame_time | |
else: | |
target_frame_time = 1/force_rate | |
if total_frames > 0: | |
if force_rate != 0: | |
yieldable_frames = int(total_frames / fps * force_rate) | |
else: | |
yieldable_frames = total_frames | |
if select_every_nth: | |
yieldable_frames //= select_every_nth | |
if frame_load_cap != 0: | |
yieldable_frames = min(frame_load_cap, yieldable_frames) | |
else: | |
yieldable_frames = 0 | |
yield (width, height, fps, duration, total_frames, target_frame_time, yieldable_frames) | |
pbar = ProgressBar(yieldable_frames) | |
time_offset=target_frame_time | |
while video_cap.isOpened(): | |
if time_offset < target_frame_time: | |
is_returned = video_cap.grab() | |
# if didn't return frame, video has ended | |
if not is_returned: | |
break | |
time_offset += base_frame_time | |
if time_offset < target_frame_time: | |
continue | |
time_offset -= target_frame_time | |
# if not at start_index, skip doing anything with frame | |
total_frame_count += 1 | |
if total_frame_count <= skip_first_frames: | |
continue | |
else: | |
total_frames_evaluated += 1 | |
# if should not be selected, skip doing anything with frame | |
if total_frames_evaluated%select_every_nth != 0: | |
continue | |
# opencv loads images in BGR format (yuck), so need to convert to RGB for ComfyUI use | |
# follow up: can videos ever have an alpha channel? | |
# To my testing: No. opencv has no support for alpha | |
unused, frame = video_cap.retrieve() | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
# convert frame to comfyui's expected format | |
# TODO: frame contains no exif information. Check if opencv2 has already applied | |
frame = np.array(frame, dtype=np.float32) | |
torch.from_numpy(frame).div_(255) | |
if prev_frame is not None: | |
inp = yield prev_frame | |
if inp is not None: | |
#ensure the finally block is called | |
return | |
prev_frame = frame | |
frames_added += 1 | |
if pbar is not None: | |
pbar.update_absolute(frames_added, yieldable_frames) | |
# if cap exists and we've reached it, stop processing frames | |
if frame_load_cap > 0 and frames_added >= frame_load_cap: | |
break | |
if meta_batch is not None: | |
meta_batch.inputs.pop(unique_id) | |
meta_batch.has_closed_inputs = True | |
if prev_frame is not None: | |
yield prev_frame | |
def ffmpeg_frame_generator(video, force_rate, frame_load_cap, start_time, | |
custom_width, custom_height, downscale_ratio=8, | |
meta_batch=None, unique_id=None): | |
args_dummy = [ffmpeg_path, "-i", video, '-c', 'copy', '-frames:v', '1', "-f", "null", "-"] | |
size_base = None | |
fps_base = None | |
try: | |
dummy_res = subprocess.run(args_dummy, stdout=subprocess.DEVNULL, | |
stderr=subprocess.PIPE, check=True) | |
except subprocess.CalledProcessError as e: | |
raise Exception("An error occurred in the ffmpeg subprocess:\n" \ | |
+ e.stderr.decode(*ENCODE_ARGS)) | |
lines = dummy_res.stderr.decode(*ENCODE_ARGS) | |
for line in lines.split('\n'): | |
match = re.search("^ *Stream .* Video.*, ([1-9]|\\d{2,})x(\\d+)", line) | |
if match is not None: | |
size_base = [int(match.group(1)), int(match.group(2))] | |
fps_match = re.search(", ([\\d\\.]+) fps", line) | |
if fps_match: | |
fps_base = float(fps_match.group(1)) | |
else: | |
fps_base = 1 | |
alpha = re.search("(yuva|rgba)", line) is not None | |
break | |
else: | |
raise Exception("Failed to parse video/image information. FFMPEG output:\n" + lines) | |
durs_match = re.search("Duration: (\\d+:\\d+:\\d+\\.\\d+),", lines) | |
if durs_match: | |
durs = durs_match.group(1).split(':') | |
duration = int(durs[0])*360 + int(durs[1])*60 + float(durs[2]) | |
else: | |
duration = 0 | |
if start_time > 0: | |
if start_time > 4: | |
post_seek = ['-ss', '4'] | |
pre_seek = ['-ss', str(start_time - 4)] | |
else: | |
post_seek = ['-ss', str(start_time)] | |
pre_seek = [] | |
else: | |
pre_seek = [] | |
post_seek = [] | |
args_all_frames = [ffmpeg_path, "-v", "error", "-an"] + pre_seek + \ | |
["-i", video, "-pix_fmt", "rgba64le"] + post_seek | |
vfilters = [] | |
if force_rate != 0: | |
vfilters.append("fps=fps="+str(force_rate)) | |
if custom_width != 0 or custom_height != 0: | |
size = target_size(size_base[0], size_base[1], custom_width, | |
custom_height, downscale_ratio=downscale_ratio) | |
ar = float(size[0])/float(size[1]) | |
if abs(size_base[0]*ar-size_base[1]) >= 1: | |
#Aspect ratio is changed. Crop to new aspect ratio before scale | |
vfilters.append(f"crop=if(gt({ar}\\,a)\\,iw\\,ih*{ar}):if(gt({ar}\\,a)\\,iw/{ar}\\,ih)") | |
size_arg = ':'.join(map(str,size)) | |
vfilters.append(f"scale={size_arg}") | |
else: | |
size = size_base | |
if len(vfilters) > 0: | |
args_all_frames += ["-vf", ",".join(vfilters)] | |
yieldable_frames = (force_rate or fps_base)*duration | |
if frame_load_cap > 0: | |
args_all_frames += ["-frames:v", str(frame_load_cap)] | |
yieldable_frames = min(yieldable_frames, frame_load_cap) | |
yield (size_base[0], size_base[1], fps_base, duration, fps_base * duration, | |
1/(force_rate or fps_base), yieldable_frames, size[0], size[1], alpha) | |
args_all_frames += ["-f", "rawvideo", "-"] | |
pbar = ProgressBar(yieldable_frames) | |
try: | |
with subprocess.Popen(args_all_frames, stdout=subprocess.PIPE) as proc: | |
#Manually buffer enough bytes for an image | |
bpi = size[0] * size[1] * 8 | |
current_bytes = bytearray(bpi) | |
current_offset=0 | |
prev_frame = None | |
while True: | |
bytes_read = proc.stdout.read(bpi - current_offset) | |
if bytes_read is None:#sleep to wait for more data | |
time.sleep(.1) | |
continue | |
if len(bytes_read) == 0:#EOF | |
break | |
current_bytes[current_offset:len(bytes_read)] = bytes_read | |
current_offset+=len(bytes_read) | |
if current_offset == bpi: | |
if prev_frame is not None: | |
yield prev_frame | |
pbar.update(1) | |
prev_frame = np.frombuffer(current_bytes, dtype=np.dtype(np.uint16).newbyteorder("<")).reshape(size[1], size[0], 4) / (2**16-1) | |
if not alpha: | |
prev_frame = prev_frame[:, :, :-1] | |
current_offset = 0 | |
except BrokenPipeError as e: | |
raise Exception("An error occured in the ffmpeg subprocess:\n" \ | |
+ proc.stderr.read().decode(*ENCODE_ARGS)) | |
if meta_batch is not None: | |
meta_batch.inputs.pop(unique_id) | |
meta_batch.has_closed_inputs = True | |
if prev_frame is not None: | |
yield prev_frame | |
#Python 3.12 adds an itertools.batched, but it's easily replicated for legacy support | |
def batched(it, n): | |
while batch := tuple(itertools.islice(it, n)): | |
yield batch | |
def batched_vae_encode(images, vae, frames_per_batch): | |
for batch in batched(images, frames_per_batch): | |
image_batch = torch.from_numpy(np.array(batch)) | |
yield from vae.encode(image_batch).numpy() | |
def resized_cv_frame_gen(custom_width, custom_height, downscale_ratio, **kwargs): | |
gen = cv_frame_generator(**kwargs) | |
info = next(gen) | |
width, height = info[0], info[1] | |
frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1 | |
if kwargs.get('meta_batch', None) is not None: | |
frames_per_batch = min(frames_per_batch, kwargs['meta_batch'].frames_per_batch) | |
if custom_width != 0 or custom_height != 0 or downscale_ratio is not None: | |
new_size = target_size(width, height, custom_width, custom_height, downscale_ratio) | |
yield (*info, new_size[0], new_size[1], False) | |
if new_size[0] != width or new_size[1] != height: | |
def rescale(frame): | |
s = torch.from_numpy(np.fromiter(frame, np.dtype((np.float32, (height, width, 3))))) | |
s = s.movedim(-1,1) | |
s = common_upscale(s, new_size[0], new_size[1], "lanczos", "center") | |
return s.movedim(1,-1).numpy() | |
yield from itertools.chain.from_iterable(map(rescale, batched(gen, frames_per_batch))) | |
return | |
else: | |
yield (*info, info[0], info[1], False) | |
yield from gen | |
def load_video(meta_batch=None, unique_id=None, memory_limit_mb=None, vae=None, | |
generator=resized_cv_frame_gen, format='None', **kwargs): | |
if 'force_size' in kwargs: | |
kwargs.pop('force_size') | |
logger.warn("force_size has been removed. Did you reload the webpage after updating?") | |
format = get_format(format) | |
kwargs['video'] = strip_path(kwargs['video']) | |
if vae is not None: | |
downscale_ratio = getattr(vae, "downscale_ratio", 8) | |
else: | |
downscale_ratio = format.get('dim', (1,))[0] | |
if meta_batch is None or unique_id not in meta_batch.inputs: | |
gen = generator(meta_batch=meta_batch, unique_id=unique_id, downscale_ratio=downscale_ratio, **kwargs) | |
(width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = next(gen) | |
if meta_batch is not None: | |
meta_batch.inputs[unique_id] = (gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) | |
if yieldable_frames: | |
meta_batch.total_frames = min(meta_batch.total_frames, yieldable_frames) | |
else: | |
(gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = meta_batch.inputs[unique_id] | |
memory_limit = None | |
if memory_limit_mb is not None: | |
memory_limit *= 2 ** 20 | |
else: | |
#TODO: verify if garbage collection should be performed here. | |
#leaves ~128 MB unreserved for safety | |
try: | |
memory_limit = (psutil.virtual_memory().available + psutil.swap_memory().free) - 2 ** 27 | |
except: | |
logger.warn("Failed to calculate available memory. Memory load limit has been disabled") | |
memory_limit = BIGMAX | |
if vae is not None: | |
#space required to load as f32, exist as latent with wiggle room, decode to f32 | |
max_loadable_frames = int(memory_limit//(width*height*3*(4+4+1/10))) | |
else: | |
#TODO: use better estimate for when vae is not None | |
#Consider completely ignoring for load_latent case? | |
max_loadable_frames = int(memory_limit//(width*height*3*(.1))) | |
if meta_batch is not None: | |
if 'frames' in format: | |
if meta_batch.frames_per_batch % format['frames'][0] != format['frames'][1]: | |
error = (meta_batch.frames_per_batch - format['frames'][1]) % format['frames'][0] | |
suggested = meta_batch.frames_per_batch - error | |
if error > format['frames'][0] / 2: | |
suggested += format['frames'][0] | |
raise RuntimeError(f"The chosen frames per batch is incompatible with the selected format. Try {suggested}") | |
if meta_batch.frames_per_batch > max_loadable_frames: | |
raise RuntimeError(f"Meta Batch set to {meta_batch.frames_per_batch} frames but only {max_loadable_frames} can fit in memory") | |
gen = itertools.islice(gen, meta_batch.frames_per_batch) | |
else: | |
original_gen = gen | |
gen = itertools.islice(gen, max_loadable_frames) | |
frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1 | |
if vae is not None: | |
gen = batched_vae_encode(gen, vae, frames_per_batch) | |
vw,vh = new_width//downscale_ratio, new_height//downscale_ratio | |
channels = getattr(vae, 'latent_channels', 4) | |
images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (channels,vh,vw))))) | |
else: | |
#Some minor wizardry to eliminate a copy and reduce max memory by a factor of ~2 | |
images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (new_height, new_width, 4 if alpha else 3))))) | |
if meta_batch is None and memory_limit is not None: | |
try: | |
next(original_gen) | |
raise RuntimeError(f"Memory limit hit after loading {len(images)} frames. Stopping execution.") | |
except StopIteration: | |
pass | |
if len(images) == 0: | |
raise RuntimeError("No frames generated") | |
if 'frames' in format and len(images) % format['frames'][0] != format['frames'][1]: | |
err_msg = f"The number of frames loaded {len(images)}, does not match the requirements of the currently selected format." | |
if len(format['frames']) > 2 and format['frames'][2]: | |
raise RuntimeError(err_msg) | |
div, mod = format['frames'][:2] | |
frames = (len(images) - mod) // div * div + mod | |
images = images[:frames] | |
#Commenting out log message since it's displayed in UI. consider further | |
#logger.warn(err_msg + f" Output has been truncated to {len(images)} frames.") | |
if 'start_time' in kwargs: | |
start_time = kwargs['start_time'] | |
else: | |
start_time = kwargs['skip_first_frames'] * target_frame_time | |
target_frame_time *= kwargs.get('select_every_nth', 1) | |
#Setup lambda for lazy audio capture | |
audio = lazy_get_audio(kwargs['video'], start_time, kwargs['frame_load_cap']*target_frame_time) | |
#Adjust target_frame_time for select_every_nth | |
video_info = { | |
"source_fps": fps, | |
"source_frame_count": total_frames, | |
"source_duration": duration, | |
"source_width": width, | |
"source_height": height, | |
"loaded_fps": 1/target_frame_time, | |
"loaded_frame_count": len(images), | |
"loaded_duration": len(images) * target_frame_time, | |
"loaded_width": new_width, | |
"loaded_height": new_height, | |
} | |
if vae is None: | |
return (images, len(images), audio, video_info) | |
else: | |
return ({"samples": images}, len(images), audio, video_info) | |
class LoadVideoUpload: | |
def INPUT_TYPES(s): | |
input_dir = folder_paths.get_input_directory() | |
files = [] | |
for f in os.listdir(input_dir): | |
if os.path.isfile(os.path.join(input_dir, f)): | |
file_parts = f.split('.') | |
if len(file_parts) > 1 and (file_parts[-1].lower() in video_extensions): | |
files.append(f) | |
return {"required": { | |
"video": (sorted(files),), | |
"force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), | |
"custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), | |
"skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), | |
"select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}), | |
}, | |
"optional": { | |
"meta_batch": ("VHS_BatchManager",), | |
"vae": ("VAE",), | |
"format": get_load_formats(), | |
}, | |
"hidden": { | |
"force_size": "STRING", | |
"unique_id": "UNIQUE_ID" | |
}, | |
} | |
CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" | |
RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO") | |
RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info") | |
FUNCTION = "load_video" | |
def load_video(self, **kwargs): | |
kwargs['video'] = folder_paths.get_annotated_filepath(strip_path(kwargs['video'])) | |
return load_video(**kwargs) | |
def IS_CHANGED(s, video, **kwargs): | |
image_path = folder_paths.get_annotated_filepath(video) | |
return calculate_file_hash(image_path) | |
def VALIDATE_INPUTS(s, video): | |
if not folder_paths.exists_annotated_filepath(video): | |
return "Invalid video file: {}".format(video) | |
return True | |
class LoadVideoPath: | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}), | |
"force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), | |
"custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), | |
"skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}), | |
"select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}), | |
}, | |
"optional": { | |
"meta_batch": ("VHS_BatchManager",), | |
"vae": ("VAE",), | |
"format": get_load_formats(), | |
}, | |
"hidden": { | |
"force_size": "STRING", | |
"unique_id": "UNIQUE_ID" | |
}, | |
} | |
CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" | |
RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO") | |
RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info") | |
FUNCTION = "load_video" | |
def load_video(self, **kwargs): | |
if kwargs['video'] is None or validate_path(kwargs['video']) != True: | |
raise Exception("video is not a valid path: " + kwargs['video']) | |
if is_url(kwargs['video']): | |
kwargs['video'] = try_download_video(kwargs['video']) or kwargs['video'] | |
return load_video(**kwargs) | |
def IS_CHANGED(s, video, **kwargs): | |
return hash_path(video) | |
def VALIDATE_INPUTS(s, video): | |
return validate_path(video, allow_none=True) | |
class LoadVideoFFmpegUpload: | |
def INPUT_TYPES(s): | |
input_dir = folder_paths.get_input_directory() | |
files = [] | |
for f in os.listdir(input_dir): | |
if os.path.isfile(os.path.join(input_dir, f)): | |
file_parts = f.split('.') | |
if len(file_parts) > 1 and (file_parts[-1].lower() in video_extensions): | |
files.append(f) | |
return {"required": { | |
"video": (sorted(files),), | |
"force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), | |
"custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), | |
"start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}), | |
}, | |
"optional": { | |
"meta_batch": ("VHS_BatchManager",), | |
"vae": ("VAE",), | |
"format": get_load_formats(), | |
}, | |
"hidden": { | |
"force_size": "STRING", | |
"unique_id": "UNIQUE_ID" | |
}, | |
} | |
CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" | |
RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO") | |
RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info") | |
FUNCTION = "load_video" | |
def load_video(self, **kwargs): | |
kwargs['video'] = folder_paths.get_annotated_filepath(strip_path(kwargs['video'])) | |
image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator) | |
if image.size(3) == 4: | |
return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info) | |
return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info) | |
def IS_CHANGED(s, video, **kwargs): | |
image_path = folder_paths.get_annotated_filepath(video) | |
return calculate_file_hash(image_path) | |
def VALIDATE_INPUTS(s, video): | |
if not folder_paths.exists_annotated_filepath(video): | |
return "Invalid video file: {}".format(video) | |
return True | |
class LoadVideoFFmpegPath: | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}), | |
"force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}), | |
"custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}), | |
"frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}), | |
"start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}), | |
}, | |
"optional": { | |
"meta_batch": ("VHS_BatchManager",), | |
"vae": ("VAE",), | |
"format": get_load_formats(), | |
}, | |
"hidden": { | |
"force_size": "STRING", | |
"unique_id": "UNIQUE_ID" | |
}, | |
} | |
CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" | |
RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO") | |
RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info") | |
FUNCTION = "load_video" | |
def load_video(self, **kwargs): | |
if kwargs['video'] is None or validate_path(kwargs['video']) != True: | |
raise Exception("video is not a valid path: " + kwargs['video']) | |
if is_url(kwargs['video']): | |
kwargs['video'] = try_download_video(kwargs['video']) or kwargs['video'] | |
image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator) | |
if isinstance(image, dict): | |
return (image, None, audio, video_info) | |
if image.size(3) == 4: | |
return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info) | |
return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info) | |
def IS_CHANGED(s, video, **kwargs): | |
return hash_path(video) | |
def VALIDATE_INPUTS(s, video): | |
return validate_path(video, allow_none=True) | |
class LoadImagePath: | |
def INPUT_TYPES(s): | |
return { | |
"required": { | |
"image": ("STRING", {"placeholder": "X://insert/path/here.png", "vhs_path_extensions": list(FolderOfImages.IMG_EXTENSIONS)}), | |
"custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}), | |
"custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}), | |
}, | |
"optional": { | |
"vae": ("VAE",), | |
}, | |
"hidden": { | |
"force_size": "STRING", | |
}, | |
} | |
CATEGORY = "Video Helper Suite 🎥🅥🅗🅢" | |
RETURN_TYPES = (imageOrLatent, "MASK") | |
RETURN_NAMES = ("IMAGE", "mask") | |
FUNCTION = "load_image" | |
def load_image(self, **kwargs): | |
if kwargs['image'] is None or validate_path(kwargs['image']) != True: | |
raise Exception("image is not a valid path: " + kwargs['image']) | |
kwargs.update({'video': kwargs['image'], 'force_rate': 0, 'frame_load_cap': 0, | |
'start_time': 0}) | |
kwargs.pop('image') | |
image, _, _, _ = load_video(**kwargs, generator=ffmpeg_frame_generator) | |
if isinstance(image, dict): | |
return (image, None) | |
if image.size(3) == 4: | |
return (image[:,:,:,:3], 1-image[:,:,:,3]) | |
return (image, torch.zeros(image.size(0), 64, 64, device="cpu")) | |
def IS_CHANGED(s, image, **kwargs): | |
return hash_path(image) | |
def VALIDATE_INPUTS(s, image): | |
return validate_path(image, allow_none=True) | |