“fred-dev”
startup
4f44506
import os
import itertools
import numpy as np
import torch
from PIL import Image, ImageOps
import cv2
import psutil
import subprocess
import re
import time
import folder_paths
from comfy.utils import common_upscale, ProgressBar
import nodes
from comfy.k_diffusion.utils import FolderOfImages
from .logger import logger
from .utils import BIGMAX, DIMMAX, calculate_file_hash, get_sorted_dir_files_from_directory,\
lazy_get_audio, hash_path, validate_path, strip_path, try_download_video, \
is_url, imageOrLatent, ffmpeg_path, ENCODE_ARGS, floatOrInt
# File extensions (lower-case, no leading dot) that the upload nodes list as
# selectable videos and that the path widgets advertise.
video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov']

# Built-in load presets, keyed by the name shown in the "format" widget.
#   'dim'    -- its first element is used by load_video as the downscale ratio
#               when no VAE is connected.
#   'frames' -- (divisor, remainder[, strict]): the loaded frame count must
#               satisfy count % divisor == remainder.
#   'target_rate' is not read anywhere in this file.
#   NOTE(review): presumably consumed by the front end -- confirm.
VHSLoadFormats = {
    'None': {},
    'AnimateDiff': {
        'target_rate': 8,
        'dim': (8, 0, 512, 512),
    },
    'Mochi': {
        'target_rate': 24,
        'dim': (16, 0, 848, 480),
        'frames': (6, 1),
    },
    'LTXV': {
        'target_rate': 24,
        'dim': (32, 0, 768, 512),
        'frames': (8, 1),
    },
    'Hunyuan': {
        'target_rate': 24,
        'dim': (16, 0, 848, 480),
        'frames': (4, 1),
    },
    'Cosmos': {
        'target_rate': 24,
        'dim': (16, 0, 1280, 704),
        'frames': (8, 1),
    },
    'Wan': {
        'target_rate': 16,
        'dim': (8, 0, 832, 480),
        'frames': (4, 1),
    },
}
"""
External plugins may add additional formats to nodes.VHSLoadFormats
In addition to shorthand options, direct widget names will map a given dict to options.
Adding a third argument to a frames tuple can enable strict checks on the number
of loaded frames, e.g. (8,1,True)
"""
# Guarantee that the shared registry for externally supplied formats exists on
# the `nodes` module; an already present registry is left untouched.
try:
    nodes.VHSLoadFormats
except AttributeError:
    nodes.VHSLoadFormats = {}
def get_load_formats():
    """Build the (choices, options) pair used for the ``format`` input.

    Formats registered on ``nodes.VHSLoadFormats`` are merged with the built-in
    ``VHSLoadFormats``; when a name exists in both, the built-in entry wins.

    Returns:
        A tuple of the list of format names and a dict holding the default
        format name and the merged name -> options mapping.
    """
    # Later entries override earlier ones, so unpacking the built-ins last gives
    # them the same precedence as two successive update() calls.
    merged = {**nodes.VHSLoadFormats, **VHSLoadFormats}
    options = {'default': 'AnimateDiff', 'formats': merged}
    return (list(merged), options)
def get_format(format):
    """Return the option dict for a format name.

    Built-in formats take precedence over those registered on
    ``nodes.VHSLoadFormats``; an unknown name yields an empty dict.
    """
    try:
        return VHSLoadFormats[format]
    except KeyError:
        # Not a built-in: fall back to the externally registered formats.
        return nodes.VHSLoadFormats.get(format, {})
def is_gif(filename) -> bool:
    """Return True when *filename* has a ``gif`` extension.

    The comparison ignores case so that ``clip.GIF`` is recognised, matching
    how the upload nodes lower-case extensions before checking them. A name
    without any dot is never treated as a gif.
    """
    file_parts = filename.split('.')
    return len(file_parts) > 1 and file_parts[-1].lower() == "gif"
def target_size(width, height, custom_width, custom_height, downscale_ratio=8) -> tuple[int, int]:
    """Compute the output size for a frame.

    A custom dimension of 0 means "not requested". When only one custom
    dimension is given, the other is scaled to keep the source aspect ratio.
    The result is then rounded to the nearest multiple of *downscale_ratio*
    (``None`` is treated as 8).

    Returns:
        ``(width, height)`` after rounding.
    """
    ratio = 8 if downscale_ratio is None else downscale_ratio
    wants_width = custom_width != 0
    wants_height = custom_height != 0

    if wants_width and wants_height:
        width, height = custom_width, custom_height
    elif wants_width:
        # Only width requested: scale height by the same factor.
        height = height * (custom_width / width)
        width = custom_width
    elif wants_height:
        # Only height requested: scale width by the same factor.
        width = width * (custom_height / height)
        height = custom_height

    def snap(value):
        # Round half up to the nearest multiple of the ratio.
        return int(value / ratio + 0.5) * ratio

    return (snap(width), snap(height))
def cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames,
                       select_every_nth, meta_batch=None, unique_id=None):
    """Decode a video with OpenCV and yield its frames one at a time.

    The first value yielded is a metadata tuple
    ``(width, height, fps, duration, total_frames, target_frame_time,
    yieldable_frames)``. Every later value is one frame as a float32 array of
    shape (height, width, 3) in RGB order, divided by 255.

    Args:
        video: Path handed to ``cv2.VideoCapture``.
        force_rate: Output frame rate; 0 keeps the source rate.
        frame_load_cap: Maximum number of frames to yield; 0 means no cap.
        skip_first_frames: Number of (rate-adjusted) frames dropped up front.
        select_every_nth: Keep only every n-th frame after skipping.
        meta_batch: Optional batch manager; when the video is exhausted its
            entry for *unique_id* is removed and ``has_closed_inputs`` is set.
        unique_id: Key of this input inside ``meta_batch.inputs``.

    Raises:
        ValueError: If the video cannot be opened or reports no frame rate.

    Sending a non-None value into the generator makes it stop early. The
    capture handle is released when the generator finishes, is closed or is
    garbage collected.
    """
    video_cap = cv2.VideoCapture(video)
    try:
        if not video_cap.isOpened() or not video_cap.grab():
            raise ValueError(f"{video} could not be loaded with cv.")

        # extract video metadata
        fps = video_cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Without a frame rate neither duration nor frame timing can be derived.
            raise ValueError(f"{video} reports an invalid frame rate ({fps}).")
        total_frames = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps

        # The size is always measured on the first decoded frame; the original
        # code read CAP_PROP_FRAME_WIDTH/HEIGHT but then zeroed the width, which
        # unconditionally forced this path.
        # NOTE(review): presumably the container metadata can disagree with the
        # decoded frame -- confirm.
        _, frame = video_cap.retrieve()
        height, width, _ = frame.shape

        total_frame_count = 0
        total_frames_evaluated = -1
        frames_added = 0
        base_frame_time = 1 / fps
        prev_frame = None

        if force_rate == 0:
            target_frame_time = base_frame_time
        else:
            target_frame_time = 1 / force_rate

        if total_frames > 0:
            if force_rate != 0:
                yieldable_frames = int(total_frames / fps * force_rate)
            else:
                yieldable_frames = total_frames
            if select_every_nth:
                yieldable_frames //= select_every_nth
            if frame_load_cap != 0:
                yieldable_frames = min(frame_load_cap, yieldable_frames)
        else:
            yieldable_frames = 0
        yield (width, height, fps, duration, total_frames, target_frame_time, yieldable_frames)

        pbar = ProgressBar(yieldable_frames)
        # Start "one frame due" so the frame grabbed while opening is used first.
        time_offset = target_frame_time
        while video_cap.isOpened():
            if time_offset < target_frame_time:
                is_returned = video_cap.grab()
                # if didn't return frame, video has ended
                if not is_returned:
                    break
                time_offset += base_frame_time
            if time_offset < target_frame_time:
                continue
            time_offset -= target_frame_time
            # if not at start_index, skip doing anything with frame
            total_frame_count += 1
            if total_frame_count <= skip_first_frames:
                continue
            total_frames_evaluated += 1

            # if should not be selected, skip doing anything with frame
            if total_frames_evaluated % select_every_nth != 0:
                continue

            # opencv loads images in BGR format, so convert to RGB for ComfyUI use
            _, frame = video_cap.retrieve()
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # TODO: frame contains no exif information. Check if opencv2 has already applied
            frame = np.array(frame, dtype=np.float32)
            # from_numpy shares memory with `frame`, so this scales it in place.
            torch.from_numpy(frame).div_(255)
            # Frames are emitted one step late so the final frame can be yielded
            # after the batch manager bookkeeping below.
            if prev_frame is not None:
                inp = yield prev_frame
                if inp is not None:
                    # Early stop requested; the finally block releases the capture.
                    return
            prev_frame = frame
            frames_added += 1
            pbar.update_absolute(frames_added, yieldable_frames)
            # if cap exists and we've reached it, stop processing frames
            if frame_load_cap > 0 and frames_added >= frame_load_cap:
                break
        if meta_batch is not None:
            meta_batch.inputs.pop(unique_id)
            meta_batch.has_closed_inputs = True
        if prev_frame is not None:
            yield prev_frame
    finally:
        video_cap.release()
def ffmpeg_frame_generator(video, force_rate, frame_load_cap, start_time,
                           custom_width, custom_height, downscale_ratio=8,
                           meta_batch=None, unique_id=None):
    """Decode a video (or image) through an ffmpeg subprocess, frame by frame.

    The first value yielded is a metadata tuple
    ``(source_width, source_height, fps, duration, source_frame_count,
    target_frame_time, yieldable_frames, out_width, out_height, alpha)``.
    Every later value is one frame as an array of shape
    (out_height, out_width, 4) scaled to 0..1, or 3 channels when the source
    has no alpha.

    Args:
        video: Input path passed to ffmpeg.
        force_rate: Output frame rate; 0 keeps the source rate.
        frame_load_cap: Maximum number of frames to decode; 0 means no cap.
        start_time: Seconds to seek into the input before decoding.
        custom_width, custom_height: Requested size; 0 means "not requested".
        downscale_ratio: Multiple that the output size is rounded to.
        meta_batch: Optional batch manager; when the stream ends its entry for
            *unique_id* is removed and ``has_closed_inputs`` is set.
        unique_id: Key of this input inside ``meta_batch.inputs``.

    Raises:
        Exception: If ffmpeg fails or its stream description cannot be parsed.
    """
    # Probe run: decode nothing, only collect the stream description from stderr.
    args_dummy = [ffmpeg_path, "-i", video, '-c', 'copy', '-frames:v', '1', "-f", "null", "-"]
    size_base = None
    fps_base = None
    try:
        dummy_res = subprocess.run(args_dummy, stdout=subprocess.DEVNULL,
                                   stderr=subprocess.PIPE, check=True)
    except subprocess.CalledProcessError as e:
        raise Exception("An error occurred in the ffmpeg subprocess:\n"
                        + e.stderr.decode(*ENCODE_ARGS)) from e
    lines = dummy_res.stderr.decode(*ENCODE_ARGS)

    for line in lines.split('\n'):
        match = re.search("^ *Stream .* Video.*, ([1-9]|\\d{2,})x(\\d+)", line)
        if match is not None:
            size_base = [int(match.group(1)), int(match.group(2))]
            fps_match = re.search(", ([\\d\\.]+) fps", line)
            if fps_match:
                fps_base = float(fps_match.group(1))
            else:
                fps_base = 1
            alpha = re.search("(yuva|rgba)", line) is not None
            break
    else:
        raise Exception("Failed to parse video/image information. FFMPEG output:\n" + lines)

    durs_match = re.search("Duration: (\\d+:\\d+:\\d+\\.\\d+),", lines)
    if durs_match:
        durs = durs_match.group(1).split(':')
        # hours:minutes:seconds -> seconds (an hour is 3600 seconds)
        duration = int(durs[0]) * 3600 + int(durs[1]) * 60 + float(durs[2])
    else:
        duration = 0

    # Seeks longer than 4 seconds are split into an input seek placed before -i
    # and a 4 second output seek placed after it.
    if start_time > 0:
        if start_time > 4:
            post_seek = ['-ss', '4']
            pre_seek = ['-ss', str(start_time - 4)]
        else:
            post_seek = ['-ss', str(start_time)]
            pre_seek = []
    else:
        pre_seek = []
        post_seek = []
    args_all_frames = [ffmpeg_path, "-v", "error", "-an"] + pre_seek + \
        ["-i", video, "-pix_fmt", "rgba64le"] + post_seek

    vfilters = []
    if force_rate != 0:
        vfilters.append("fps=fps=" + str(force_rate))
    if custom_width != 0 or custom_height != 0:
        size = target_size(size_base[0], size_base[1], custom_width,
                           custom_height, downscale_ratio=downscale_ratio)
        ar = float(size[0]) / float(size[1])
        # The aspect ratio is unchanged when source_height * ar equals source_width.
        if abs(size_base[1] * ar - size_base[0]) >= 1:
            #Aspect ratio is changed. Crop to new aspect ratio before scale
            vfilters.append(f"crop=if(gt({ar}\\,a)\\,iw\\,ih*{ar}):if(gt({ar}\\,a)\\,iw/{ar}\\,ih)")
        size_arg = ':'.join(map(str, size))
        vfilters.append(f"scale={size_arg}")
    else:
        size = size_base
    if len(vfilters) > 0:
        args_all_frames += ["-vf", ",".join(vfilters)]
    yieldable_frames = (force_rate or fps_base) * duration
    if frame_load_cap > 0:
        args_all_frames += ["-frames:v", str(frame_load_cap)]
        yieldable_frames = min(yieldable_frames, frame_load_cap)
    yield (size_base[0], size_base[1], fps_base, duration, fps_base * duration,
           1 / (force_rate or fps_base), yieldable_frames, size[0], size[1], alpha)

    args_all_frames += ["-f", "rawvideo", "-"]
    pbar = ProgressBar(yieldable_frames)
    prev_frame = None
    proc = None
    try:
        with subprocess.Popen(args_all_frames, stdout=subprocess.PIPE) as proc:
            #Manually buffer enough bytes for an image
            # rgba64le: 4 channels of 2 bytes each per pixel
            bpi = size[0] * size[1] * 8
            current_bytes = bytearray(bpi)
            current_offset = 0
            while True:
                bytes_read = proc.stdout.read(bpi - current_offset)
                if bytes_read is None:  #sleep to wait for more data
                    time.sleep(.1)
                    continue
                if len(bytes_read) == 0:  #EOF
                    break
                # Write the chunk right after the data already buffered. The end
                # index must be relative to the offset, otherwise a partial read
                # would insert bytes and grow the buffer.
                chunk_end = current_offset + len(bytes_read)
                current_bytes[current_offset:chunk_end] = bytes_read
                current_offset = chunk_end
                if current_offset == bpi:
                    # Frames are emitted one step late so the final frame can be
                    # yielded after the batch manager bookkeeping below.
                    if prev_frame is not None:
                        yield prev_frame
                        pbar.update(1)
                    # The division creates a new array, so the buffer can be reused.
                    prev_frame = np.frombuffer(current_bytes, dtype=np.dtype(np.uint16).newbyteorder("<")).reshape(size[1], size[0], 4) / (2**16-1)
                    if not alpha:
                        prev_frame = prev_frame[:, :, :-1]
                    current_offset = 0
    except BrokenPipeError as e:
        # stderr is not piped for this process, so there may be nothing to read;
        # ffmpeg's own message then appears on the inherited stderr instead.
        detail = ""
        if proc is not None and proc.stderr is not None and not proc.stderr.closed:
            detail = proc.stderr.read().decode(*ENCODE_ARGS)
        raise Exception("An error occured in the ffmpeg subprocess:\n" + detail) from e
    if meta_batch is not None:
        meta_batch.inputs.pop(unique_id)
        meta_batch.has_closed_inputs = True
    if prev_frame is not None:
        yield prev_frame
#Python 3.12 adds an itertools.batched, but it's easily replicated for legacy support
def batched(it, n):
    """Yield successive tuples of up to *n* items taken from *it*.

    The final tuple may be shorter. Any iterable is accepted: it is turned
    into an iterator first, because slicing a re-iterable object such as a
    list directly would restart from its beginning on every pass and never
    terminate.
    """
    iterator = iter(it)
    while batch := tuple(itertools.islice(iterator, n)):
        yield batch
def batched_vae_encode(images, vae, frames_per_batch):
    """Encode frames with *vae* in groups and yield the latents one by one.

    Frames are pulled from *images* in groups of at most *frames_per_batch*,
    stacked into one tensor, passed to ``vae.encode`` and the result is
    yielded entry by entry along its first axis as numpy data.
    """
    for frame_group in batched(images, frames_per_batch):
        stacked = np.array(frame_group)
        latents = vae.encode(torch.from_numpy(stacked))
        yield from latents.numpy()
def resized_cv_frame_gen(custom_width, custom_height, downscale_ratio, **kwargs):
    """Wrap ``cv_frame_generator`` and resize its frames when required.

    The first value yielded is the metadata tuple of the wrapped generator
    extended by ``(new_width, new_height, False)``. Afterwards frames are
    yielded, rescaled in batches with lanczos filtering and center cropping
    whenever the target size differs from the source size.

    All remaining keyword arguments are forwarded to ``cv_frame_generator``.
    """
    frames = cv_frame_generator(**kwargs)
    info = next(frames)
    src_width, src_height = info[0], info[1]

    # Batch size shrinks as the source resolution grows, but is never below 1.
    frames_per_batch = (1920 * 1080 * 16) // (src_width * src_height) or 1
    manager = kwargs.get('meta_batch', None)
    if manager is not None:
        frames_per_batch = min(frames_per_batch, manager.frames_per_batch)

    no_resize_requested = (custom_width == 0 and custom_height == 0
                           and downscale_ratio is None)
    if no_resize_requested:
        yield (*info, info[0], info[1], False)
        yield from frames
        return

    new_size = target_size(src_width, src_height, custom_width, custom_height, downscale_ratio)
    out_width, out_height = new_size[0], new_size[1]
    yield (*info, out_width, out_height, False)

    if out_width == src_width and out_height == src_height:
        # Target equals source: pass the frames through untouched.
        yield from frames
        return

    def rescale(chunk):
        # Stack the chunk, move channels first for common_upscale, then back.
        frame_dtype = np.dtype((np.float32, (src_height, src_width, 3)))
        tensor = torch.from_numpy(np.fromiter(chunk, frame_dtype))
        tensor = tensor.movedim(-1, 1)
        tensor = common_upscale(tensor, out_width, out_height, "lanczos", "center")
        return tensor.movedim(1, -1).numpy()

    resized_chunks = map(rescale, batched(frames, frames_per_batch))
    yield from itertools.chain.from_iterable(resized_chunks)
def load_video(meta_batch=None, unique_id=None, memory_limit_mb=None, vae=None,
               generator=resized_cv_frame_gen, format='None', **kwargs):
    """Load the frames of a video as an image tensor or as VAE latents.

    Args:
        meta_batch: Optional batch manager. When given, at most
            ``meta_batch.frames_per_batch`` frames are loaded per call and the
            frame generator is kept in ``meta_batch.inputs`` between calls.
        unique_id: Key of this input inside ``meta_batch.inputs``.
        memory_limit_mb: Memory budget in MiB. When None, the budget is derived
            from the available RAM plus free swap.
        vae: Optional VAE; when given, frames are encoded and latents returned.
        generator: Frame generator factory; its first item must be the
            10-element metadata tuple unpacked below.
        format: Name of a load format, resolved with ``get_format``.
        **kwargs: Forwarded to *generator*. ``video`` and ``frame_load_cap``
            are required, as is either ``start_time`` or ``skip_first_frames``.

    Returns:
        ``(images_or_latents, frame_count, audio, video_info)``. The first
        element is a tensor, or ``{"samples": tensor}`` when a VAE is used.

    Raises:
        RuntimeError: If the batch size is incompatible with the format, the
            frames do not fit in the memory budget, no frame was produced, or a
            strict format frame count is not met.
    """
    if 'force_size' in kwargs:
        kwargs.pop('force_size')
        logger.warn("force_size has been removed. Did you reload the webpage after updating?")
    format = get_format(format)
    kwargs['video'] = strip_path(kwargs['video'])
    if vae is not None:
        downscale_ratio = getattr(vae, "downscale_ratio", 8)
    else:
        downscale_ratio = format.get('dim', (1,))[0]
    if meta_batch is None or unique_id not in meta_batch.inputs:
        gen = generator(meta_batch=meta_batch, unique_id=unique_id, downscale_ratio=downscale_ratio, **kwargs)
        (width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = next(gen)

        if meta_batch is not None:
            meta_batch.inputs[unique_id] = (gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha)
            if yieldable_frames:
                meta_batch.total_frames = min(meta_batch.total_frames, yieldable_frames)
    else:
        # Resume the generator stored by an earlier call of the same batch run.
        (gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = meta_batch.inputs[unique_id]

    if memory_limit_mb is not None:
        # Convert MiB to bytes.
        memory_limit = memory_limit_mb * 2 ** 20
    else:
        #TODO: verify if garbage collection should be performed here.
        #leaves ~128 MB unreserved for safety
        try:
            memory_limit = (psutil.virtual_memory().available + psutil.swap_memory().free) - 2 ** 27
        except Exception:
            logger.warn("Failed to calculate available memory. Memory load limit has been disabled")
            memory_limit = BIGMAX
    if vae is not None:
        #space required to load as f32, exist as latent with wiggle room, decode to f32
        max_loadable_frames = int(memory_limit//(width*height*3*(4+4+1/10)))
    else:
        #TODO: use better estimate for when vae is not None
        #Consider completely ignoring for load_latent case?
        max_loadable_frames = int(memory_limit//(width*height*3*(.1)))

    if meta_batch is not None:
        if 'frames' in format:
            if meta_batch.frames_per_batch % format['frames'][0] != format['frames'][1]:
                # Suggest the nearest batch size that satisfies the format.
                error = (meta_batch.frames_per_batch - format['frames'][1]) % format['frames'][0]
                suggested = meta_batch.frames_per_batch - error
                if error > format['frames'][0] / 2:
                    suggested += format['frames'][0]
                raise RuntimeError(f"The chosen frames per batch is incompatible with the selected format. Try {suggested}")
        if meta_batch.frames_per_batch > max_loadable_frames:
            raise RuntimeError(f"Meta Batch set to {meta_batch.frames_per_batch} frames but only {max_loadable_frames} can fit in memory")
        gen = itertools.islice(gen, meta_batch.frames_per_batch)
    else:
        original_gen = gen
        gen = itertools.islice(gen, max_loadable_frames)

    frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1
    if vae is not None:
        gen = batched_vae_encode(gen, vae, frames_per_batch)
        vw, vh = new_width//downscale_ratio, new_height//downscale_ratio
        channels = getattr(vae, 'latent_channels', 4)
        images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (channels, vh, vw)))))
    else:
        #Some minor wizardry to eliminate a copy and reduce max memory by a factor of ~2
        images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (new_height, new_width, 4 if alpha else 3)))))

    if meta_batch is None and memory_limit is not None:
        # Frames left over after the capped slice mean the memory limit cut the load short.
        try:
            next(original_gen)
            raise RuntimeError(f"Memory limit hit after loading {len(images)} frames. Stopping execution.")
        except StopIteration:
            pass
    if len(images) == 0:
        raise RuntimeError("No frames generated")

    if 'frames' in format and len(images) % format['frames'][0] != format['frames'][1]:
        err_msg = f"The number of frames loaded {len(images)}, does not match the requirements of the currently selected format."
        if len(format['frames']) > 2 and format['frames'][2]:
            raise RuntimeError(err_msg)
        # Non-strict format: drop trailing frames until the count fits.
        div, mod = format['frames'][:2]
        frames = (len(images) - mod) // div * div + mod
        images = images[:frames]
        #Commenting out log message since it's displayed in UI. consider further
        #logger.warn(err_msg + f" Output has been truncated to {len(images)} frames.")

    if 'start_time' in kwargs:
        start_time = kwargs['start_time']
    else:
        start_time = kwargs['skip_first_frames'] * target_frame_time
    #Adjust target_frame_time for select_every_nth
    target_frame_time *= kwargs.get('select_every_nth', 1)
    #Setup lambda for lazy audio capture
    audio = lazy_get_audio(kwargs['video'], start_time, kwargs['frame_load_cap']*target_frame_time)
    video_info = {
        "source_fps": fps,
        "source_frame_count": total_frames,
        "source_duration": duration,
        "source_width": width,
        "source_height": height,
        "loaded_fps": 1/target_frame_time,
        "loaded_frame_count": len(images),
        "loaded_duration": len(images) * target_frame_time,
        "loaded_width": new_width,
        "loaded_height": new_height,
    }
    if vae is None:
        return (images, len(images), audio, video_info)
    else:
        return ({"samples": images}, len(images), audio, video_info)
class LoadVideoUpload:
    """Node that loads a video picked from the ComfyUI input directory.

    Decoding goes through the module level ``load_video`` with its default
    frame generator.
    """

    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        # Keep regular files whose last dot-separated part is a known video extension.
        files = [
            name for name in os.listdir(input_dir)
            if os.path.isfile(os.path.join(input_dir, name))
            and '.' in name
            and name.rsplit('.', 1)[-1].lower() in video_extensions
        ]
        required = {
            "video": (sorted(files),),
            "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
            "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
            "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
            "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
            "skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}),
            "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}),
        }
        optional = {
            "meta_batch": ("VHS_BatchManager",),
            "vae": ("VAE",),
            "format": get_load_formats(),
        }
        hidden = {
            "force_size": "STRING",
            "unique_id": "UNIQUE_ID"
        }
        return {"required": required, "optional": optional, "hidden": hidden}

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info")

    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        """Resolve the annotated input name to a path and load the video."""
        cleaned = strip_path(kwargs['video'])
        kwargs['video'] = folder_paths.get_annotated_filepath(cleaned)
        return load_video(**kwargs)

    @classmethod
    def IS_CHANGED(s, video, **kwargs):
        # A hash of the file content tells ComfyUI whether the input changed.
        return calculate_file_hash(folder_paths.get_annotated_filepath(video))

    @classmethod
    def VALIDATE_INPUTS(s, video):
        if folder_paths.exists_annotated_filepath(video):
            return True
        return "Invalid video file: {}".format(video)
class LoadVideoPath:
    """Node that loads a video from a user supplied path or URL.

    Decoding goes through the module level ``load_video`` with its default
    frame generator.
    """

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
                "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
                "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
                "skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}),
                "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}),
            },
            "optional": {
                "meta_batch": ("VHS_BatchManager",),
                "vae": ("VAE",),
                "format": get_load_formats(),
            },
            "hidden": {
                "force_size": "STRING",
                "unique_id": "UNIQUE_ID"
            },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info")

    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        """Validate the path (downloading it first when it is a URL) and load it.

        Raises:
            Exception: If ``video`` is None or fails ``validate_path``.
        """
        video = kwargs['video']
        # validate_path is compared against True because any other return value
        # is treated as a failure.
        if video is None or validate_path(video) != True:
            # Formatted rather than concatenated so that None yields this
            # message instead of a TypeError.
            raise Exception(f"video is not a valid path: {video}")
        if is_url(video):
            # Fall back to the original value when the download gives nothing.
            kwargs['video'] = try_download_video(video) or video
        return load_video(**kwargs)

    @classmethod
    def IS_CHANGED(s, video, **kwargs):
        return hash_path(video)

    @classmethod
    def VALIDATE_INPUTS(s, video):
        return validate_path(video, allow_none=True)
class LoadVideoFFmpegUpload:
    """Node that loads a video from the ComfyUI input directory through ffmpeg.

    Besides the frames it returns a mask derived from the alpha channel when
    the source has one.
    """

    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = []
        for f in os.listdir(input_dir):
            if os.path.isfile(os.path.join(input_dir, f)):
                # Keep files whose last dot-separated part is a known video extension.
                file_parts = f.split('.')
                if len(file_parts) > 1 and (file_parts[-1].lower() in video_extensions):
                    files.append(f)
        return {"required": {
                    "video": (sorted(files),),
                    "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
                    "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                    "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                    "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
                    "start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}),
                    },
                "optional": {
                    "meta_batch": ("VHS_BatchManager",),
                    "vae": ("VAE",),
                    "format": get_load_formats(),
                },
                "hidden": {
                    "force_size": "STRING",
                    "unique_id": "UNIQUE_ID"
                },
                }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info")

    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        """Resolve the annotated input name and load the video with ffmpeg.

        Returns:
            ``(image_or_latent, mask, audio, video_info)``. The mask is None
            when latents are returned, the inverted alpha channel when the
            frames carry one, and an all-zero 64x64 placeholder otherwise.
        """
        kwargs['video'] = folder_paths.get_annotated_filepath(strip_path(kwargs['video']))
        image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator)
        if isinstance(image, dict):
            # With a VAE connected load_video returns a latent dict, which has
            # no channel axis to inspect and therefore no mask.
            return (image, None, audio, video_info)
        if image.size(3) == 4:
            return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info)
        return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info)

    @classmethod
    def IS_CHANGED(s, video, **kwargs):
        image_path = folder_paths.get_annotated_filepath(video)
        return calculate_file_hash(image_path)

    @classmethod
    def VALIDATE_INPUTS(s, video):
        if not folder_paths.exists_annotated_filepath(video):
            return "Invalid video file: {}".format(video)
        return True
class LoadVideoFFmpegPath:
    """Node that loads a video from a user supplied path or URL through ffmpeg.

    Besides the frames it returns a mask derived from the alpha channel when
    the source has one.
    """

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
                "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
                "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
                "start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}),
            },
            "optional": {
                "meta_batch": ("VHS_BatchManager",),
                "vae": ("VAE",),
                "format": get_load_formats(),
            },
            "hidden": {
                "force_size": "STRING",
                "unique_id": "UNIQUE_ID"
            },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info")

    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        """Validate the path (downloading it first when it is a URL) and load it.

        Returns:
            ``(image_or_latent, mask, audio, video_info)``. The mask is None
            when latents are returned, the inverted alpha channel when the
            frames carry one, and an all-zero 64x64 placeholder otherwise.

        Raises:
            Exception: If ``video`` is None or fails ``validate_path``.
        """
        video = kwargs['video']
        # validate_path is compared against True because any other return value
        # is treated as a failure.
        if video is None or validate_path(video) != True:
            # Formatted rather than concatenated so that None yields this
            # message instead of a TypeError.
            raise Exception(f"video is not a valid path: {video}")
        if is_url(video):
            # Fall back to the original value when the download gives nothing.
            kwargs['video'] = try_download_video(video) or video
        image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator)
        if isinstance(image, dict):
            # Latent output (VAE connected): there is no alpha channel to split off.
            return (image, None, audio, video_info)
        if image.size(3) == 4:
            return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info)
        return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info)

    @classmethod
    def IS_CHANGED(s, video, **kwargs):
        return hash_path(video)

    @classmethod
    def VALIDATE_INPUTS(s, video):
        return validate_path(video, allow_none=True)
class LoadImagePath:
    """Node that loads a single image from a user supplied path through ffmpeg.

    The image is loaded with the video pipeline, using a frame rate, frame cap
    and start time of 0.
    """

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "image": ("STRING", {"placeholder": "X://insert/path/here.png", "vhs_path_extensions": list(FolderOfImages.IMG_EXTENSIONS)}),
                "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}),
                "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}),
            },
            "optional": {
                "vae": ("VAE",),
            },
            "hidden": {
                "force_size": "STRING",
            },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"

    RETURN_TYPES = (imageOrLatent, "MASK")
    RETURN_NAMES = ("IMAGE", "mask")

    FUNCTION = "load_image"

    def load_image(self, **kwargs):
        """Validate the path and load the image.

        Returns:
            ``(image_or_latent, mask)``. The mask is None when latents are
            returned, the inverted alpha channel when the image carries one,
            and an all-zero 64x64 placeholder otherwise.

        Raises:
            Exception: If ``image`` is None or fails ``validate_path``.
        """
        image_path = kwargs['image']
        # validate_path is compared against True because any other return value
        # is treated as a failure.
        if image_path is None or validate_path(image_path) != True:
            # Formatted rather than concatenated so that None yields this
            # message instead of a TypeError.
            raise Exception(f"image is not a valid path: {image_path}")
        # Re-express the request in the terms load_video understands.
        kwargs.update({'video': image_path, 'force_rate': 0, 'frame_load_cap': 0,
                       'start_time': 0})
        kwargs.pop('image')
        image, _, _, _ = load_video(**kwargs, generator=ffmpeg_frame_generator)
        if isinstance(image, dict):
            # Latent output (VAE connected): there is no alpha channel to split off.
            return (image, None)
        if image.size(3) == 4:
            return (image[:,:,:,:3], 1-image[:,:,:,3])
        return (image, torch.zeros(image.size(0), 64, 64, device="cpu"))

    @classmethod
    def IS_CHANGED(s, image, **kwargs):
        return hash_path(image)

    @classmethod
    def VALIDATE_INPUTS(s, image):
        return validate_path(image, allow_none=True)