import os
import sys
import json
import subprocess
import numpy as np
import re
import datetime
from typing import List
import torch
from PIL import Image, ExifTags
from PIL.PngImagePlugin import PngInfo
from pathlib import Path
from string import Template
import itertools
import functools

import folder_paths
from .logger import logger
from .image_latent_nodes import *
from .load_video_nodes import LoadVideoUpload, LoadVideoPath, LoadVideoFFmpegUpload, LoadVideoFFmpegPath, LoadImagePath
from .load_images_nodes import LoadImagesFromDirectoryUpload, LoadImagesFromDirectoryPath
from .batched_nodes import VAEEncodeBatched, VAEDecodeBatched
from .utils import ffmpeg_path, get_audio, hash_path, validate_path, requeue_workflow, \
        gifski_path, calculate_file_hash, strip_path, try_download_video, is_url, \
        imageOrLatent, BIGMAX, merge_filter_args, ENCODE_ARGS, floatOrInt, cached
from comfy.utils import ProgressBar

if 'VHS_video_formats' not in folder_paths.folder_names_and_paths:
    folder_paths.folder_names_and_paths["VHS_video_formats"] = ((), {".json"})
if len(folder_paths.folder_names_and_paths['VHS_video_formats'][1]) == 0:
    folder_paths.folder_names_and_paths["VHS_video_formats"][1].add(".json")
audio_extensions = ['mp3', 'mp4', 'wav', 'ogg']
def gen_format_widgets(video_format):
    for k in video_format:
        if k.endswith("_pass"):
            for i in range(len(video_format[k])):
                if isinstance(video_format[k][i], list):
                    item = [video_format[k][i]]
                    yield item
                    video_format[k][i] = item[0]
        else:
            if isinstance(video_format[k], list):
                item = [video_format[k]]
                yield item
                video_format[k] = item[0]

base_formats_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "video_formats")

def get_video_formats():
    format_files = {}
    for format_name in folder_paths.get_filename_list("VHS_video_formats"):
        format_files[format_name] = folder_paths.get_full_path("VHS_video_formats", format_name)
    for item in os.scandir(base_formats_dir):
        if not item.is_file() or not item.name.endswith('.json'):
            continue
        format_files[item.name[:-5]] = item.path

    formats = []
    format_widgets = {}
    for format_name, path in format_files.items():
        with open(path, 'r') as stream:
            video_format = json.load(stream)
        if "gifski_pass" in video_format and gifski_path is None:
            #Skip format
            continue
        widgets = [w[0] for w in gen_format_widgets(video_format)]
        formats.append("video/" + format_name)
        if (len(widgets) > 0):
            format_widgets["video/" + format_name] = widgets
    return formats, format_widgets
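
# Illustrative sketch (not part of the original module, never called): how the
# (formats, format_widgets) pair returned above is typically consumed. Which
# "video/<name>" entries exist depends on the JSON files in video_formats/ and
# on any user-registered VHS_video_formats folders, so the output varies.
def _example_list_video_formats():
    formats, format_widgets = get_video_formats()
    for fmt in formats:
        # Formats with configurable options map to a list of widget specs
        print(fmt, format_widgets.get(fmt, "no extra widgets"))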

def apply_format_widgets(format_name, kwargs):
    if os.path.exists(os.path.join(base_formats_dir, format_name + ".json")):
        video_format_path = os.path.join(base_formats_dir, format_name + ".json")
    else:
        video_format_path = folder_paths.get_full_path("VHS_video_formats", format_name)
    with open(video_format_path, 'r') as stream:
        video_format = json.load(stream)
    for w in gen_format_widgets(video_format):
        if w[0][0] not in kwargs:
            if len(w[0]) > 2 and 'default' in w[0][2]:
                default = w[0][2]['default']
            else:
                if type(w[0][1]) is list:
                    default = w[0][1][0]
                else:
                    #NOTE: This doesn't respect max/min, but should be good enough as a fallback to a fallback to a fallback
                    default = {"BOOLEAN": False, "INT": 0, "FLOAT": 0, "STRING": ""}[w[0][1]]
            kwargs[w[0][0]] = default
            logger.warn(f"Missing input for {w[0][0]} has been set to {default}")
        if len(w[0]) > 3:
            w[0] = Template(w[0][3]).substitute(val=kwargs[w[0][0]])
        else:
            w[0] = str(kwargs[w[0][0]])
    return video_format
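
# Illustrative sketch (not called anywhere): resolving a format definition with
# user-supplied widget values. The format name "h264-mp4" and the "crf" key are
# assumptions for the example; real keys come from the chosen format's JSON
# widget definitions, and missing keys fall back to defaults with a warning.
def _example_apply_format_widgets():
    kwargs = {"crf": 20}
    video_format = apply_format_widgets("h264-mp4", kwargs)
    return video_format["extension"], video_format["main_pass"]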

def tensor_to_int(tensor, bits):
    #TODO: investigate benefit of rounding by adding 0.5 before clip/cast
    tensor = tensor.cpu().numpy() * (2**bits-1)
    return np.clip(tensor, 0, (2**bits-1))
def tensor_to_shorts(tensor):
    return tensor_to_int(tensor, 16).astype(np.uint16)
def tensor_to_bytes(tensor):
    return tensor_to_int(tensor, 8).astype(np.uint8)
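
# Illustrative sketch (not called anywhere): converting a float image tensor in
# [0, 1] (HWC layout, as produced by ComfyUI IMAGE outputs) into the raw 8-bit
# bytes that get piped to ffmpeg below. The random tensor stands in for a frame.
def _example_tensor_to_frame_bytes():
    frame = torch.rand(64, 64, 3)          # HWC float image in [0, 1]
    raw = tensor_to_bytes(frame)           # uint8 array, values in [0, 255]
    return raw.tobytes()                   # what gets written to ffmpeg's stdin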

def ffmpeg_process(args, video_format, video_metadata, file_path, env):
    res = None
    frame_data = yield
    total_frames_output = 0
    if video_format.get('save_metadata', 'False') != 'False':
        os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
        metadata = json.dumps(video_metadata)
        metadata_path = os.path.join(folder_paths.get_temp_directory(), "metadata.txt")
        #metadata from file should escape = ; # \ and newline
        metadata = metadata.replace("\\","\\\\")
        metadata = metadata.replace(";","\\;")
        metadata = metadata.replace("#","\\#")
        metadata = metadata.replace("=","\\=")
        metadata = metadata.replace("\n","\\\n")
        metadata = "comment=" + metadata
        with open(metadata_path, "w") as f:
            f.write(";FFMETADATA1\n")
            f.write(metadata)
        m_args = args[:1] + ["-i", metadata_path] + args[1:] + ["-metadata", "creation_time=now"]
        with subprocess.Popen(m_args + [file_path], stderr=subprocess.PIPE,
                              stdin=subprocess.PIPE, env=env) as proc:
            try:
                while frame_data is not None:
                    proc.stdin.write(frame_data)
                    #TODO: skip flush for increased speed
                    frame_data = yield
                    total_frames_output+=1
                proc.stdin.flush()
                proc.stdin.close()
                res = proc.stderr.read()
            except BrokenPipeError as e:
                err = proc.stderr.read()
                #Check if output file exists. If it does, the re-execution
                #will also fail. This obscures the cause of the error
                #and seems to never occur concurrent to the metadata issue
                if os.path.exists(file_path):
                    raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                            + err.decode(*ENCODE_ARGS))
                #Res was not set
                print(err.decode(*ENCODE_ARGS), end="", file=sys.stderr)
                logger.warn("An error occurred when saving with metadata")
    if res != b'':
        with subprocess.Popen(args + [file_path], stderr=subprocess.PIPE,
                              stdin=subprocess.PIPE, env=env) as proc:
            try:
                while frame_data is not None:
                    proc.stdin.write(frame_data)
                    frame_data = yield
                    total_frames_output+=1
                proc.stdin.flush()
                proc.stdin.close()
                res = proc.stderr.read()
            except BrokenPipeError as e:
                res = proc.stderr.read()
                raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                        + res.decode(*ENCODE_ARGS))
    yield total_frames_output
    if len(res) > 0:
        print(res.decode(*ENCODE_ARGS), end="", file=sys.stderr)
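
# Illustrative sketch (not called anywhere) of the generator protocol used by
# ffmpeg_process and gifski_process: send(None) to advance to the first yield,
# send raw frame bytes one frame at a time, then send(None) again to close
# ffmpeg's stdin and receive the total frame count. The args, video_format and
# file_path arguments are assumed to come from a resolved format as in
# VideoCombine.combine_video below; this is not the only valid call pattern.
def _example_drive_ffmpeg_process(args, video_format, file_path, frames=()):
    proc = ffmpeg_process(args, video_format, {}, file_path, os.environ.copy())
    proc.send(None)                          # prime the generator to its first yield
    for frame_bytes in frames:               # raw rgb24/rgb48/rgba/rgba64 bytes
        proc.send(frame_bytes)
    total = proc.send(None)                  # signal end of input, get frame count
    return total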

def gifski_process(args, video_format, file_path, env):
    frame_data = yield
    with subprocess.Popen(args + video_format['main_pass'] + ['-f', 'yuv4mpegpipe', '-'],
                          stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, env=env) as procff:
        with subprocess.Popen([gifski_path] + video_format['gifski_pass']
                              + ['-q', '-o', file_path, '-'], stderr=subprocess.PIPE,
                              stdin=procff.stdout, stdout=subprocess.PIPE,
                              env=env) as procgs:
            try:
                while frame_data is not None:
                    procff.stdin.write(frame_data)
                    frame_data = yield
                procff.stdin.flush()
                procff.stdin.close()
                resff = procff.stderr.read()
                resgs = procgs.stderr.read()
                outgs = procgs.stdout.read()
            except BrokenPipeError as e:
                procff.stdin.close()
                resff = procff.stderr.read()
                resgs = procgs.stderr.read()
                raise Exception("An error occurred while creating gifski output\n" \
                        + "Make sure you are using gifski --version >=1.32.0\nffmpeg: " \
                        + resff.decode(*ENCODE_ARGS) + '\ngifski: ' + resgs.decode(*ENCODE_ARGS))
    if len(resff) > 0:
        print(resff.decode(*ENCODE_ARGS), end="", file=sys.stderr)
    if len(resgs) > 0:
        print(resgs.decode(*ENCODE_ARGS), end="", file=sys.stderr)
    #should always be empty as the quiet flag is passed
    if len(outgs) > 0:
        print(outgs.decode(*ENCODE_ARGS))

def to_pingpong(inp):
    if not hasattr(inp, "__getitem__"):
        inp = list(inp)
    yield from inp
    for i in range(len(inp)-2, 0, -1):
        yield inp[i]
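
# Illustrative sketch (not called anywhere): to_pingpong yields the sequence
# forward and then back, skipping both endpoints of the reverse leg so a looped
# playback bounces smoothly, e.g. [0, 1, 2, 3] -> 0, 1, 2, 3, 2, 1.
def _example_pingpong_order():
    return list(to_pingpong([0, 1, 2, 3]))   # [0, 1, 2, 3, 2, 1]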

class VideoCombine:
    @classmethod
    def INPUT_TYPES(s):
        ffmpeg_formats, format_widgets = get_video_formats()
        return {
            "required": {
                "images": (imageOrLatent,),
                "frame_rate": (
                    floatOrInt,
                    {"default": 8, "min": 1, "step": 1},
                ),
                "loop_count": ("INT", {"default": 0, "min": 0, "max": 100, "step": 1}),
                "filename_prefix": ("STRING", {"default": "AnimateDiff"}),
                "format": (["image/gif", "image/webp"] + ffmpeg_formats, {'formats': format_widgets}),
                "pingpong": ("BOOLEAN", {"default": False}),
                "save_output": ("BOOLEAN", {"default": True}),
            },
            "optional": {
                "audio": ("AUDIO",),
                "meta_batch": ("VHS_BatchManager",),
                "vae": ("VAE",),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID"
            },
        }

    RETURN_TYPES = ("VHS_FILENAMES",)
    RETURN_NAMES = ("Filenames",)
    OUTPUT_NODE = True
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "combine_video"

    def combine_video(
        self,
        frame_rate: int,
        loop_count: int,
        images=None,
        latents=None,
        filename_prefix="AnimateDiff",
        format="image/gif",
        pingpong=False,
        save_output=True,
        prompt=None,
        extra_pnginfo=None,
        audio=None,
        unique_id=None,
        manual_format_widgets=None,
        meta_batch=None,
        vae=None,
        **kwargs
    ):
        if latents is not None:
            images = latents
        if images is None:
            return ((save_output, []),)
        if vae is not None:
            if isinstance(images, dict):
                images = images['samples']
            else:
                vae = None
        if isinstance(images, torch.Tensor) and images.size(0) == 0:
            return ((save_output, []),)
        num_frames = len(images)
        pbar = ProgressBar(num_frames)
        if vae is not None:
            downscale_ratio = getattr(vae, "downscale_ratio", 8)
            width = images.size(-1)*downscale_ratio
            height = images.size(-2)*downscale_ratio
            frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1
            #Python 3.12 adds an itertools.batched, but it's easily replicated for legacy support
            def batched(it, n):
                while batch := tuple(itertools.islice(it, n)):
                    yield batch
            def batched_encode(images, vae, frames_per_batch):
                for batch in batched(iter(images), frames_per_batch):
                    image_batch = torch.from_numpy(np.array(batch))
                    yield from vae.decode(image_batch)
            images = batched_encode(images, vae, frames_per_batch)
            first_image = next(images)
            #repush first_image
            images = itertools.chain([first_image], images)
            #A single image has 3 dimensions. Discard higher dimensions
            while len(first_image.shape) > 3:
                first_image = first_image[0]
        else:
            first_image = images[0]
            images = iter(images)

        # get output information
        output_dir = (
            folder_paths.get_output_directory()
            if save_output
            else folder_paths.get_temp_directory()
        )
        (
            full_output_folder,
            filename,
            _,
            subfolder,
            _,
        ) = folder_paths.get_save_image_path(filename_prefix, output_dir)
        output_files = []

        metadata = PngInfo()
        video_metadata = {}
        if prompt is not None:
            metadata.add_text("prompt", json.dumps(prompt))
            video_metadata["prompt"] = json.dumps(prompt)
        if extra_pnginfo is not None:
            for x in extra_pnginfo:
                metadata.add_text(x, json.dumps(extra_pnginfo[x]))
                video_metadata[x] = extra_pnginfo[x]
            extra_options = extra_pnginfo.get('workflow', {}).get('extra', {})
        else:
            extra_options = {}
        metadata.add_text("CreationTime", datetime.datetime.now().isoformat(" ")[:19])

        if meta_batch is not None and unique_id in meta_batch.outputs:
            (counter, output_process) = meta_batch.outputs[unique_id]
        else:
            # comfy counter workaround
            max_counter = 0

            # Loop through the existing files
            matcher = re.compile(f"{re.escape(filename)}_(\\d+)\\D*\\..+", re.IGNORECASE)
            for existing_file in os.listdir(full_output_folder):
                # Check if the file matches the expected format
                match = matcher.fullmatch(existing_file)
                if match:
                    # Extract the numeric portion of the filename
                    file_counter = int(match.group(1))
                    # Update the maximum counter value if necessary
                    if file_counter > max_counter:
                        max_counter = file_counter

            # Increment the counter by 1 to get the next available value
            counter = max_counter + 1
            output_process = None

        # save first frame as png to keep metadata
        first_image_file = f"{filename}_{counter:05}.png"
        file_path = os.path.join(full_output_folder, first_image_file)
        if extra_options.get('VHS_MetadataImage', True) != False:
            Image.fromarray(tensor_to_bytes(first_image)).save(
                file_path,
                pnginfo=metadata,
                compress_level=4,
            )
        output_files.append(file_path)

        format_type, format_ext = format.split("/")
        if format_type == "image":
            if meta_batch is not None:
                raise Exception("Pillow('image/') formats are not compatible with batched output")
            image_kwargs = {}
            if format_ext == "gif":
                image_kwargs['disposal'] = 2
            if format_ext == "webp":
                #Save timestamp information
                exif = Image.Exif()
                exif[ExifTags.IFD.Exif] = {36867: datetime.datetime.now().isoformat(" ")[:19]}
                image_kwargs['exif'] = exif
                image_kwargs['lossless'] = True
            file = f"{filename}_{counter:05}.{format_ext}"
            file_path = os.path.join(full_output_folder, file)
            if pingpong:
                images = to_pingpong(images)
            def frames_gen(images):
                for i in images:
                    pbar.update(1)
                    yield Image.fromarray(tensor_to_bytes(i))
            frames = frames_gen(images)
            # Use pillow directly to save an animated image
            next(frames).save(
                file_path,
                format=format_ext.upper(),
                save_all=True,
                append_images=frames,
                duration=round(1000 / frame_rate),
                loop=loop_count,
                compress_level=4,
                **image_kwargs
            )
            output_files.append(file_path)
        else:
            # Use ffmpeg to save a video
            if ffmpeg_path is None:
                raise ProcessLookupError(f"ffmpeg is required for video outputs and could not be found.\nIn order to use video outputs, you must either:\n- Install imageio-ffmpeg with pip,\n- Place a ffmpeg executable in {os.path.abspath('')}, or\n- Install ffmpeg and add it to the system path.")

            #Acquire additional format_widget values
            if manual_format_widgets is None:
                if prompt is not None:
                    kwargs, passed_kwargs = prompt[unique_id]['inputs'], kwargs
                    kwargs.update(passed_kwargs)
                else:
                    manual_format_widgets = {}
            if kwargs is None:
                missing = {}
                for k in kwargs.keys():
                    if k in manual_format_widgets:
                        kwargs[k] = manual_format_widgets[k]
                    else:
                        missing[k] = kwargs[k]
                if len(missing) > 0:
                    logger.warn("Extra format values were not provided, the following defaults will be used: " + str(kwargs) + "\nThis is likely due to usage of ComfyUI-to-python. These values can be manually set by supplying a manual_format_widgets argument")

            video_format = apply_format_widgets(format_ext, kwargs)
            has_alpha = first_image.shape[-1] == 4
            dim_alignment = video_format.get("dim_alignment", 2)
            if (first_image.shape[1] % dim_alignment) or (first_image.shape[0] % dim_alignment):
                #output frames must be padded
                to_pad = (-first_image.shape[1] % dim_alignment,
                          -first_image.shape[0] % dim_alignment)
                padding = (to_pad[0]//2, to_pad[0] - to_pad[0]//2,
                           to_pad[1]//2, to_pad[1] - to_pad[1]//2)
                padfunc = torch.nn.ReplicationPad2d(padding)
                def pad(image):
                    image = image.permute((2,0,1))#HWC to CHW
                    padded = padfunc(image.to(dtype=torch.float32))
                    return padded.permute((1,2,0))
                images = map(pad, images)
                new_dims = (-first_image.shape[1] % dim_alignment + first_image.shape[1],
                            -first_image.shape[0] % dim_alignment + first_image.shape[0])
                dimensions = f"{new_dims[0]}x{new_dims[1]}"
                logger.warn("Output images were not of valid resolution and have had padding applied")
            else:
                dimensions = f"{first_image.shape[1]}x{first_image.shape[0]}"
            if loop_count > 0:
                loop_args = ["-vf", "loop=loop=" + str(loop_count) + ":size=" + str(num_frames)]
            else:
                loop_args = []
            if pingpong:
                if meta_batch is not None:
                    logger.error("pingpong is incompatible with batched output")
                images = to_pingpong(images)
            if video_format.get('input_color_depth', '8bit') == '16bit':
                images = map(tensor_to_shorts, images)
                if has_alpha:
                    i_pix_fmt = 'rgba64'
                else:
                    i_pix_fmt = 'rgb48'
            else:
                images = map(tensor_to_bytes, images)
                if has_alpha:
                    i_pix_fmt = 'rgba'
                else:
                    i_pix_fmt = 'rgb24'
            file = f"{filename}_{counter:05}.{video_format['extension']}"
            file_path = os.path.join(full_output_folder, file)
            bitrate_arg = []
            bitrate = video_format.get('bitrate')
            if bitrate is not None:
                bitrate_arg = ["-b:v", str(bitrate) + "M" if video_format.get('megabit') == 'True' else str(bitrate) + "K"]
            args = [ffmpeg_path, "-v", "error", "-f", "rawvideo", "-pix_fmt", i_pix_fmt,
                    "-s", dimensions, "-r", str(frame_rate), "-i", "-"] \
                    + loop_args

            images = map(lambda x: x.tobytes(), images)
            env = os.environ.copy()
            if "environment" in video_format:
                env.update(video_format["environment"])

            if "pre_pass" in video_format:
                if meta_batch is not None:
                    #Performing a prepass requires keeping access to all frames.
                    #Potential solutions include keeping just output frames in
                    #memory or using 3 passes with intermediate file, but
                    #very long gifs probably shouldn't be encouraged
                    raise Exception("Formats which require a pre_pass are incompatible with Batch Manager.")
                images = [b''.join(images)]
                os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
                pre_pass_args = args[:13] + video_format['pre_pass']
                merge_filter_args(pre_pass_args)
                try:
                    subprocess.run(pre_pass_args, input=images[0], env=env,
                                   capture_output=True, check=True)
                except subprocess.CalledProcessError as e:
                    raise Exception("An error occurred in the ffmpeg prepass:\n" \
                            + e.stderr.decode(*ENCODE_ARGS))
            if "inputs_main_pass" in video_format:
                args = args[:13] + video_format['inputs_main_pass'] + args[13:]

            if output_process is None:
                if 'gifski_pass' in video_format:
                    output_process = gifski_process(args, video_format, file_path, env)
                else:
                    args += video_format['main_pass'] + bitrate_arg
                    merge_filter_args(args)
                    output_process = ffmpeg_process(args, video_format, video_metadata, file_path, env)
                #Proceed to first yield
                output_process.send(None)
                if meta_batch is not None:
                    meta_batch.outputs[unique_id] = (counter, output_process)

            for image in images:
                pbar.update(1)
                output_process.send(image)
            if meta_batch is not None:
                requeue_workflow((meta_batch.unique_id, not meta_batch.has_closed_inputs))
            if meta_batch is None or meta_batch.has_closed_inputs:
                #Close pipe and wait for termination.
                try:
                    total_frames_output = output_process.send(None)
                    output_process.send(None)
                except StopIteration:
                    pass
                if meta_batch is not None:
                    meta_batch.outputs.pop(unique_id)
                    if len(meta_batch.outputs) == 0:
                        meta_batch.reset()
            else:
                #batch is unfinished
                #TODO: Check if empty output breaks other custom nodes
                return {"ui": {"unfinished_batch": [True]}, "result": ((save_output, []),)}

            output_files.append(file_path)

            a_waveform = None
            if audio is not None:
                try:
                    #safely check if audio produced by VHS_LoadVideo actually exists
                    a_waveform = audio['waveform']
                except:
                    pass
            if a_waveform is not None:
                # Create audio file if input was provided
                output_file_with_audio = f"{filename}_{counter:05}-audio.{video_format['extension']}"
                output_file_with_audio_path = os.path.join(full_output_folder, output_file_with_audio)
                if "audio_pass" not in video_format:
                    logger.warn("Selected video format does not have explicit audio support")
                    video_format["audio_pass"] = ["-c:a", "libopus"]

                # FFmpeg command with audio re-encoding
                #TODO: expose audio quality options if format widgets makes it in
                #Reconsider forcing apad/shortest
                channels = audio['waveform'].size(1)
                min_audio_dur = total_frames_output / frame_rate + 1
                if video_format.get('trim_to_audio', 'False') != 'False':
                    apad = []
                else:
                    apad = ["-af", "apad=whole_dur=" + str(min_audio_dur)]
                mux_args = [ffmpeg_path, "-v", "error", "-n", "-i", file_path,
                            "-ar", str(audio['sample_rate']), "-ac", str(channels),
                            "-f", "f32le", "-i", "-", "-c:v", "copy"] \
                            + video_format["audio_pass"] \
                            + apad + ["-shortest", output_file_with_audio_path]

                audio_data = audio['waveform'].squeeze(0).transpose(0,1) \
                        .numpy().tobytes()
                merge_filter_args(mux_args, '-af')
                try:
                    res = subprocess.run(mux_args, input=audio_data,
                                         env=env, capture_output=True, check=True)
                except subprocess.CalledProcessError as e:
                    raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                            + e.stderr.decode(*ENCODE_ARGS))
                if res.stderr:
                    print(res.stderr.decode(*ENCODE_ARGS), end="", file=sys.stderr)
                output_files.append(output_file_with_audio_path)
                #Return this file with audio to the webui.
                #It will be muted unless opened or saved with right click
                file = output_file_with_audio

        if extra_options.get('VHS_KeepIntermediate', True) == False:
            for intermediate in output_files[1:-1]:
                if os.path.exists(intermediate):
                    os.remove(intermediate)
        preview = {
            "filename": file,
            "subfolder": subfolder,
            "type": "output" if save_output else "temp",
            "format": format,
            "frame_rate": frame_rate,
            "workflow": first_image_file,
            "fullpath": output_files[-1],
        }
        if num_frames == 1 and 'png' in format and '%03d' in file:
            preview['format'] = 'image/png'
            preview['filename'] = file.replace('%03d', '001')
        return {"ui": {"gifs": [preview]}, "result": ((save_output, output_files),)}

class LoadAudio:
    @classmethod
    def INPUT_TYPES(s):
        #Hide ffmpeg formats if ffmpeg isn't available
        return {
            "required": {
                "audio_file": ("STRING", {"default": "input/", "vhs_path_extensions": ['wav','mp3','ogg','m4a','flac']}),
            },
            "optional": {"seek_seconds": ("FLOAT", {"default": 0, "min": 0})}
        }

    RETURN_TYPES = ("AUDIO",)
    RETURN_NAMES = ("audio",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"
    FUNCTION = "load_audio"
    def load_audio(self, audio_file, seek_seconds):
        audio_file = strip_path(audio_file)
        if audio_file is None or validate_path(audio_file) != True:
            raise Exception("audio_file is not a valid path: " + audio_file)
        if is_url(audio_file):
            audio_file = try_download_video(audio_file) or audio_file
        #Eagerly fetch the audio since the user must be using it if the
        #node executes, unlike Load Video
        return (get_audio(audio_file, start_time=seek_seconds),)

    @classmethod
    def IS_CHANGED(s, audio_file, seek_seconds):
        return hash_path(audio_file)

    @classmethod
    def VALIDATE_INPUTS(s, audio_file, **kwargs):
        return validate_path(audio_file, allow_none=True)

class LoadAudioUpload:
    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = []
        for f in os.listdir(input_dir):
            if os.path.isfile(os.path.join(input_dir, f)):
                file_parts = f.split('.')
                if len(file_parts) > 1 and (file_parts[-1] in audio_extensions):
                    files.append(f)
        return {
            "required": {
                "audio": (sorted(files),),
                "start_time": ("FLOAT", {"default": 0, "min": 0, "max": 10000000, "step": 0.01}),
                "duration": ("FLOAT", {"default": 0, "min": 0, "max": 10000000, "step": 0.01}),
            },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"
    RETURN_TYPES = ("AUDIO",)
    RETURN_NAMES = ("audio",)
    FUNCTION = "load_audio"
    def load_audio(self, start_time, duration, **kwargs):
        audio_file = folder_paths.get_annotated_filepath(strip_path(kwargs['audio']))
        if audio_file is None or validate_path(audio_file) != True:
            raise Exception("audio_file is not a valid path: " + audio_file)
        return (get_audio(audio_file, start_time, duration),)

    @classmethod
    def IS_CHANGED(s, audio, start_time, duration):
        audio_file = folder_paths.get_annotated_filepath(strip_path(audio))
        return hash_path(audio_file)

    @classmethod
    def VALIDATE_INPUTS(s, audio, **kwargs):
        audio_file = folder_paths.get_annotated_filepath(strip_path(audio))
        return validate_path(audio_file, allow_none=True)

class AudioToVHSAudio:
    """Legacy method for external nodes that utilized VHS_AUDIO,
    VHS_AUDIO is deprecated as a format and should no longer be used"""
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"audio": ("AUDIO",)}}

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"
    RETURN_TYPES = ("VHS_AUDIO",)
    RETURN_NAMES = ("vhs_audio",)
    FUNCTION = "convert_audio"
    def convert_audio(self, audio):
        ar = str(audio['sample_rate'])
        ac = str(audio['waveform'].size(1))
        mux_args = [ffmpeg_path, "-f", "f32le", "-ar", ar, "-ac", ac,
                    "-i", "-", "-f", "wav", "-"]

        audio_data = audio['waveform'].squeeze(0).transpose(0,1) \
                .numpy().tobytes()
        try:
            res = subprocess.run(mux_args, input=audio_data,
                                 capture_output=True, check=True)
        except subprocess.CalledProcessError as e:
            raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                    + e.stderr.decode(*ENCODE_ARGS))
        if res.stderr:
            print(res.stderr.decode(*ENCODE_ARGS), end="", file=sys.stderr)
        return (lambda: res.stdout,)

class VHSAudioToAudio:
    """Legacy method for external nodes that utilized VHS_AUDIO,
    VHS_AUDIO is deprecated as a format and should no longer be used"""
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"vhs_audio": ("VHS_AUDIO",)}}

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢/audio"
    RETURN_TYPES = ("AUDIO",)
    RETURN_NAMES = ("audio",)
    FUNCTION = "convert_audio"
    def convert_audio(self, vhs_audio):
        if not vhs_audio or not vhs_audio():
            raise Exception("audio input is not valid")
        args = [ffmpeg_path, "-i", '-']
        try:
            res = subprocess.run(args + ["-f", "f32le", "-"], input=vhs_audio(),
                                 capture_output=True, check=True)
            audio = torch.frombuffer(bytearray(res.stdout), dtype=torch.float32)
        except subprocess.CalledProcessError as e:
            raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                    + e.stderr.decode(*ENCODE_ARGS))
        match = re.search(', (\\d+) Hz, (\\w+), ', res.stderr.decode(*ENCODE_ARGS))
        if match:
            ar = int(match.group(1))
            #NOTE: Just throwing an error for other channel types right now
            #Will deal with issues if they come
            ac = {"mono": 1, "stereo": 2}[match.group(2)]
        else:
            ar = 44100
            ac = 2
        audio = audio.reshape((-1,ac)).transpose(0,1).unsqueeze(0)
        return ({'waveform': audio, 'sample_rate': ar},)
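
# Illustrative sketch (not called anywhere): round-tripping between the current
# AUDIO dict format and the deprecated callable VHS_AUDIO format using the two
# legacy shim nodes above. The silent one-second stereo clip is an assumption,
# and running this requires ffmpeg to be available.
def _example_audio_roundtrip():
    audio = {"waveform": torch.zeros(1, 2, 44100), "sample_rate": 44100}
    vhs_audio = AudioToVHSAudio().convert_audio(audio)[0]     # callable returning wav bytes
    restored = VHSAudioToAudio().convert_audio(vhs_audio)[0]
    return restored["sample_rate"], restored["waveform"].shape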

class PruneOutputs:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "filenames": ("VHS_FILENAMES",),
                "options": (["Intermediate", "Intermediate and Utility"],)
            }
        }

    RETURN_TYPES = ()
    OUTPUT_NODE = True
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "prune_outputs"
    def prune_outputs(self, filenames, options):
        if len(filenames[1]) == 0:
            return ()
        assert(len(filenames[1]) <= 3 and len(filenames[1]) >= 2)
        delete_list = []
        if options in ["Intermediate", "Intermediate and Utility", "All"]:
            delete_list += filenames[1][1:-1]
        if options in ["Intermediate and Utility", "All"]:
            delete_list.append(filenames[1][0])
        if options in ["All"]:
            delete_list.append(filenames[1][-1])

        output_dirs = [folder_paths.get_output_directory(),
                       folder_paths.get_temp_directory()]
        for file in delete_list:
            #Check that path is actually an output directory
            if (os.path.commonpath([output_dirs[0], file]) != output_dirs[0]) \
                    and (os.path.commonpath([output_dirs[1], file]) != output_dirs[1]):
                raise Exception("Tried to prune output from invalid directory: " + file)
            if os.path.exists(file):
                os.remove(file)
        return ()

class BatchManager:
    def __init__(self, frames_per_batch=-1):
        self.frames_per_batch = frames_per_batch
        self.inputs = {}
        self.outputs = {}
        self.unique_id = None
        self.has_closed_inputs = False
        self.total_frames = float('inf')
    def reset(self):
        self.close_inputs()
        for key in self.outputs:
            if getattr(self.outputs[key][-1], "gi_suspended", False):
                try:
                    self.outputs[key][-1].send(None)
                except StopIteration:
                    pass
        self.__init__(self.frames_per_batch)
    def has_open_inputs(self):
        return len(self.inputs) > 0
    def close_inputs(self):
        for key in self.inputs:
            if getattr(self.inputs[key][-1], "gi_suspended", False):
                try:
                    self.inputs[key][-1].send(1)
                except StopIteration:
                    pass
        self.inputs = {}

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "frames_per_batch": ("INT", {"default": 16, "min": 1, "max": BIGMAX, "step": 1})
            },
            "hidden": {
                "prompt": "PROMPT",
                "unique_id": "UNIQUE_ID"
            },
        }

    RETURN_TYPES = ("VHS_BatchManager",)
    RETURN_NAMES = ("meta_batch",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "update_batch"
    def update_batch(self, frames_per_batch, prompt=None, unique_id=None):
        if unique_id is not None and prompt is not None:
            requeue = prompt[unique_id]['inputs'].get('requeue', 0)
        else:
            requeue = 0
        if requeue == 0:
            self.reset()
            self.frames_per_batch = frames_per_batch
            self.unique_id = unique_id
        else:
            num_batches = (self.total_frames+self.frames_per_batch-1)//frames_per_batch
            print(f'Meta-Batch {requeue}/{num_batches}')
        #onExecuted seems to not be called unless some message is sent
        return (self,)
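
# Illustrative sketch (not called anywhere): the meta batch object is shared by
# reference between loader nodes (which register frame generators in .inputs)
# and VideoCombine (which registers its encoder generator in .outputs), so
# frames can be streamed through the workflow frames_per_batch at a time across
# requeues. Which nodes register themselves depends on the actual workflow.
def _example_batch_manager():
    meta_batch = BatchManager(frames_per_batch=16)
    # A loader would populate meta_batch.inputs, and VideoCombine would receive
    # this same object through its optional "meta_batch" input.
    return meta_batch.has_open_inputs()    # False until a loader registers itself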

class VideoInfo:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video_info": ("VHS_VIDEOINFO",),
            }
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = ("FLOAT","INT", "FLOAT", "INT", "INT", "FLOAT","INT", "FLOAT", "INT", "INT")
    RETURN_NAMES = (
        "source_fps🟨",
        "source_frame_count🟨",
        "source_duration🟨",
        "source_width🟨",
        "source_height🟨",
        "loaded_fps🟦",
        "loaded_frame_count🟦",
        "loaded_duration🟦",
        "loaded_width🟦",
        "loaded_height🟦",
    )
    FUNCTION = "get_video_info"
    def get_video_info(self, video_info):
        keys = ["fps", "frame_count", "duration", "width", "height"]
        source_info = []
        loaded_info = []
        for key in keys:
            source_info.append(video_info[f"source_{key}"])
            loaded_info.append(video_info[f"loaded_{key}"])
        return (*source_info, *loaded_info)

class VideoInfoSource:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video_info": ("VHS_VIDEOINFO",),
            }
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = ("FLOAT","INT", "FLOAT", "INT", "INT",)
    RETURN_NAMES = (
        "fps🟨",
        "frame_count🟨",
        "duration🟨",
        "width🟨",
        "height🟨",
    )
    FUNCTION = "get_video_info"
    def get_video_info(self, video_info):
        keys = ["fps", "frame_count", "duration", "width", "height"]
        source_info = []
        for key in keys:
            source_info.append(video_info[f"source_{key}"])
        return (*source_info,)

class VideoInfoLoaded:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "video_info": ("VHS_VIDEOINFO",),
            }
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = ("FLOAT","INT", "FLOAT", "INT", "INT",)
    RETURN_NAMES = (
        "fps🟦",
        "frame_count🟦",
        "duration🟦",
        "width🟦",
        "height🟦",
    )
    FUNCTION = "get_video_info"
    def get_video_info(self, video_info):
        keys = ["fps", "frame_count", "duration", "width", "height"]
        loaded_info = []
        for key in keys:
            loaded_info.append(video_info[f"loaded_{key}"])
        return (*loaded_info,)

class SelectFilename:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"filenames": ("VHS_FILENAMES",), "index": ("INT", {"default": -1, "step": 1, "min": -1})}}

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("Filename",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "select_filename"
    def select_filename(self, filenames, index):
        return (filenames[1][index],)

class Unbatch:
    class Any(str):
        def __ne__(self, other):
            return False

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"batched": ("*",)}}

    RETURN_TYPES = (Any('*'),)
    INPUT_IS_LIST = True
    RETURN_NAMES = ("unbatched",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "unbatch"
    def unbatch(self, batched):
        if isinstance(batched[0], torch.Tensor):
            return (torch.cat(batched),)
        if isinstance(batched[0], dict):
            out = batched[0].copy()
            out['samples'] = torch.cat([x['samples'] for x in batched])
            out.pop('batch_index', None)
            return (out,)
        return (functools.reduce(lambda x,y: x+y, batched),)

    @classmethod
    def VALIDATE_INPUTS(cls, input_types):
        return True
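
# Illustrative sketch (not called anywhere): because INPUT_IS_LIST is set,
# ComfyUI hands Unbatch a Python list of per-segment values; tensors are
# concatenated along the batch dimension, latent dicts are merged on 'samples'.
def _example_unbatch_images():
    segments = [torch.rand(4, 64, 64, 3), torch.rand(2, 64, 64, 3)]
    merged = Unbatch().unbatch(segments)[0]
    return merged.shape                    # torch.Size([6, 64, 64, 3])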

class SelectLatest:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"filename_prefix": ("STRING", {'default': 'output/AnimateDiff', 'vhs_path_extensions': []}),
                             "filename_postfix": ("STRING", {"placeholder": ".webm"})}}

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("Filename",)
    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    FUNCTION = "select_latest"
    EXPERIMENTAL = True
    def select_latest(self, filename_prefix, filename_postfix):
        assert False, "Not Reachable"

NODE_CLASS_MAPPINGS = {
    "VHS_VideoCombine": VideoCombine,
    "VHS_LoadVideo": LoadVideoUpload,
    "VHS_LoadVideoPath": LoadVideoPath,
    "VHS_LoadVideoFFmpeg": LoadVideoFFmpegUpload,
    "VHS_LoadVideoFFmpegPath": LoadVideoFFmpegPath,
    "VHS_LoadImagePath": LoadImagePath,
    "VHS_LoadImages": LoadImagesFromDirectoryUpload,
    "VHS_LoadImagesPath": LoadImagesFromDirectoryPath,
    "VHS_LoadAudio": LoadAudio,
    "VHS_LoadAudioUpload": LoadAudioUpload,
    "VHS_AudioToVHSAudio": AudioToVHSAudio,
    "VHS_VHSAudioToAudio": VHSAudioToAudio,
    "VHS_PruneOutputs": PruneOutputs,
    "VHS_BatchManager": BatchManager,
    "VHS_VideoInfo": VideoInfo,
    "VHS_VideoInfoSource": VideoInfoSource,
    "VHS_VideoInfoLoaded": VideoInfoLoaded,
    "VHS_SelectFilename": SelectFilename,
    # Batched Nodes
    "VHS_VAEEncodeBatched": VAEEncodeBatched,
    "VHS_VAEDecodeBatched": VAEDecodeBatched,
    # Latent and Image nodes
    "VHS_SplitLatents": SplitLatents,
    "VHS_SplitImages": SplitImages,
    "VHS_SplitMasks": SplitMasks,
    "VHS_MergeLatents": MergeLatents,
    "VHS_MergeImages": MergeImages,
    "VHS_MergeMasks": MergeMasks,
    "VHS_GetLatentCount": GetLatentCount,
    "VHS_GetImageCount": GetImageCount,
    "VHS_GetMaskCount": GetMaskCount,
    "VHS_DuplicateLatents": RepeatLatents,
    "VHS_DuplicateImages": RepeatImages,
    "VHS_DuplicateMasks": RepeatMasks,
    "VHS_SelectEveryNthLatent": SelectEveryNthLatent,
    "VHS_SelectEveryNthImage": SelectEveryNthImage,
    "VHS_SelectEveryNthMask": SelectEveryNthMask,
    "VHS_SelectLatents": SelectLatents,
    "VHS_SelectImages": SelectImages,
    "VHS_SelectMasks": SelectMasks,
    "VHS_Unbatch": Unbatch,
    "VHS_SelectLatest": SelectLatest,
}
NODE_DISPLAY_NAME_MAPPINGS = {
    "VHS_VideoCombine": "Video Combine 🎥🅥🅗🅢",
    "VHS_LoadVideo": "Load Video (Upload) 🎥🅥🅗🅢",
    "VHS_LoadVideoPath": "Load Video (Path) 🎥🅥🅗🅢",
    "VHS_LoadVideoFFmpeg": "Load Video FFmpeg (Upload) 🎥🅥🅗🅢",
    "VHS_LoadVideoFFmpegPath": "Load Video FFmpeg (Path) 🎥🅥🅗🅢",
    "VHS_LoadImagePath": "Load Image (Path) 🎥🅥🅗🅢",
    "VHS_LoadImages": "Load Images (Upload) 🎥🅥🅗🅢",
    "VHS_LoadImagesPath": "Load Images (Path) 🎥🅥🅗🅢",
    "VHS_LoadAudio": "Load Audio (Path)🎥🅥🅗🅢",
    "VHS_LoadAudioUpload": "Load Audio (Upload)🎥🅥🅗🅢",
    "VHS_AudioToVHSAudio": "Audio to legacy VHS_AUDIO🎥🅥🅗🅢",
    "VHS_VHSAudioToAudio": "Legacy VHS_AUDIO to Audio🎥🅥🅗🅢",
    "VHS_PruneOutputs": "Prune Outputs 🎥🅥🅗🅢",
    "VHS_BatchManager": "Meta Batch Manager 🎥🅥🅗🅢",
    "VHS_VideoInfo": "Video Info 🎥🅥🅗🅢",
    "VHS_VideoInfoSource": "Video Info (Source) 🎥🅥🅗🅢",
    "VHS_VideoInfoLoaded": "Video Info (Loaded) 🎥🅥🅗🅢",
    "VHS_SelectFilename": "Select Filename 🎥🅥🅗🅢",
    # Batched Nodes
    "VHS_VAEEncodeBatched": "VAE Encode Batched 🎥🅥🅗🅢",
    "VHS_VAEDecodeBatched": "VAE Decode Batched 🎥🅥🅗🅢",
    # Latent and Image nodes
    "VHS_SplitLatents": "Split Latents 🎥🅥🅗🅢",
    "VHS_SplitImages": "Split Images 🎥🅥🅗🅢",
    "VHS_SplitMasks": "Split Masks 🎥🅥🅗🅢",
    "VHS_MergeLatents": "Merge Latents 🎥🅥🅗🅢",
    "VHS_MergeImages": "Merge Images 🎥🅥🅗🅢",
    "VHS_MergeMasks": "Merge Masks 🎥🅥🅗🅢",
    "VHS_GetLatentCount": "Get Latent Count 🎥🅥🅗🅢",
    "VHS_GetImageCount": "Get Image Count 🎥🅥🅗🅢",
    "VHS_GetMaskCount": "Get Mask Count 🎥🅥🅗🅢",
    "VHS_DuplicateLatents": "Repeat Latents 🎥🅥🅗🅢",
    "VHS_DuplicateImages": "Repeat Images 🎥🅥🅗🅢",
    "VHS_DuplicateMasks": "Repeat Masks 🎥🅥🅗🅢",
    "VHS_SelectEveryNthLatent": "Select Every Nth Latent 🎥🅥🅗🅢",
    "VHS_SelectEveryNthImage": "Select Every Nth Image 🎥🅥🅗🅢",
    "VHS_SelectEveryNthMask": "Select Every Nth Mask 🎥🅥🅗🅢",
    "VHS_SelectLatents": "Select Latents 🎥🅥🅗🅢",
    "VHS_SelectImages": "Select Images 🎥🅥🅗🅢",
    "VHS_SelectMasks": "Select Masks 🎥🅥🅗🅢",
    "VHS_Unbatch": "Unbatch 🎥🅥🅗🅢",
    "VHS_SelectLatest": "Select Latest 🎥🅥🅗🅢",
}