import os
import gc
import cv2
import time
import tempfile
import mimetypes
import traceback
import numpy as np
import gradio as gr
# --- Logging Helper ---
def log_and_print(message, current_log=""):
    """Prints a message to the console and appends it to the log string."""
    print(message)  # Print to console
    return current_log + message + "\n"  # Append to log string with newline
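# Usage sketch: thread the accumulated log string through successive calls.
#   log = log_and_print("Step 1 done.", "")
#   log = log_and_print("Step 2 done.", log)  # log now holds both lines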
# --- Helper Function: Crop Image by Percentage ---
def crop_image_by_percent(image, crop_top_percent=0.0, crop_bottom_percent=0.0):
    """
    Crops the top and/or bottom portion of an image based on percentage.

    Args:
        image: The input image (NumPy array).
        crop_top_percent: Percentage of height to crop from the top (0-100).
        crop_bottom_percent: Percentage of height to crop from the bottom (0-100).

    Returns:
        The cropped image (NumPy array), or the original image if cropping is not needed
        or the percentages are invalid. Returns None if the input image is invalid.
    """
    if image is None or image.size == 0:
        # print("Warning: Invalid input image to crop_image_by_percent.")
        return None  # Return None for invalid input
    if crop_top_percent < 0 or crop_top_percent > 100 or \
       crop_bottom_percent < 0 or crop_bottom_percent > 100:
        print(f"Warning: Invalid crop percentages ({crop_top_percent}%, {crop_bottom_percent}%). Must be between 0 and 100. Skipping crop.")
        return image
    if crop_top_percent == 0 and crop_bottom_percent == 0:
        return image  # No cropping needed
    if crop_top_percent + crop_bottom_percent >= 100:
        print(f"Warning: Total crop percentage ({crop_top_percent + crop_bottom_percent}%) is 100% or more. Skipping crop.")
        return image
    try:
        h, w = image.shape[:2]
        pixels_to_crop_top = int(h * crop_top_percent / 100.0)
        pixels_to_crop_bottom = int(h * crop_bottom_percent / 100.0)
        start_row = pixels_to_crop_top
        end_row = h - pixels_to_crop_bottom
        # Ensure indices are valid after calculation
        if start_row >= end_row or start_row < 0 or end_row > h:
            print(f"Warning: Invalid calculated crop rows (start={start_row}, end={end_row} for height={h}). Skipping crop.")
            return image
        cropped_image = image[start_row:end_row, :]
        # print(f"Debug: Cropped by percentage from {image.shape} to {cropped_image.shape}")
        return cropped_image
    except Exception as e:
        print(f"Unexpected error during percentage cropping: {e}. Returning original image.")
        traceback.print_exc()
        return image
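# Usage sketch (hypothetical file name and values): trim a fixed 10% banner
# from the top of a frame before stitching.
#   frame = cv2.imread("frame.png")
#   trimmed = crop_image_by_percent(frame, crop_top_percent=10.0, crop_bottom_percent=0.0)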
# --- Helper Function: Crop Black Borders ---
def crop_black_borders(image, enable_cropping=True, strict_no_black_edges=False):
    """
    Crops black borders from an image.

    Args:
        image: The input image (NumPy array).
        enable_cropping: If False, returns the original image.
        strict_no_black_edges: If True, iteratively removes any remaining single black
            pixel lines from the edges after the initial crop.

    Returns:
        The cropped image (NumPy array), or the original image if cropping is disabled.
        Returns None if the input is invalid or strict cropping removes everything.
    """
    if not enable_cropping:
        return image
    if image is None or image.size == 0:
        return None
    try:
        # Check image channels before converting color
        gray = None
        mask_coords_found = False
        coords = None
        # Attempt grayscale conversion first (common case)
        if len(image.shape) == 3 and image.shape[2] == 3:
            try:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                coords = cv2.findNonZero(gray)
                if coords is not None:
                    mask_coords_found = True
            except cv2.error as e_gray:
                # print(f"Note: cvtColor to GRAY failed ({e_gray}), trying mask method.")
                gray = None  # Reset gray if conversion failed
        elif len(image.shape) == 2:
            gray = image  # Already grayscale
            coords = cv2.findNonZero(gray)
            if coords is not None:
                mask_coords_found = True
        # Fallback or alternative: use a mask if grayscale failed or the shape is unusual
        if not mask_coords_found:
            try:
                # Create a mask where any channel/value is > 0
                mask = np.any(image > 0, axis=-1) if len(image.shape) == 3 else (image > 0)
                coords = cv2.findNonZero(mask.astype(np.uint8))
                if coords is not None:
                    mask_coords_found = True
            except Exception as e_crop_fallback:
                # print(f"Could not create mask for cropping fallback: {e_crop_fallback}. Returning original.")
                return image  # Cannot proceed if the mask fails too
        if not mask_coords_found or coords is None:
            # print("Debug: No non-black pixels found via any method, returning original.")
            return image  # Return original if all black or coords failed
        x, y, w, h = cv2.boundingRect(coords)
        if w <= 0 or h <= 0:
            # print(f"Debug: Invalid bounding rect ({w}x{h}), returning original.")
            return image
        # Initial crop based on the bounding rectangle
        cropped_image = image[y:y+h, x:x+w]
        # --- START: Strict Edge Cropping Logic ---
        if strict_no_black_edges and cropped_image is not None and cropped_image.size > 0:
            # Iteratively remove black edges until none remain or the image is empty
            initial_shape = cropped_image.shape
            iterations = 0
            MAX_ITERATIONS = max(initial_shape)  # Safety break
            while iterations < MAX_ITERATIONS:
                iterations += 1
                # Re-check size in the loop
                if cropped_image is None or cropped_image.size == 0:
                    # print("Debug: Strict cropping resulted in empty image.")
                    return None  # Image got cropped away entirely
                # Convert the current crop to grayscale for edge checks
                if len(cropped_image.shape) == 3:
                    if cropped_image.shape[2] == 1:  # Handle the case where it somehow becomes grayscale with 3 dims
                        gray_cropped = cropped_image[:, :, 0]
                    else:
                        try:
                            gray_cropped = cv2.cvtColor(cropped_image, cv2.COLOR_BGR2GRAY)
                        except cv2.error:
                            # print("Warning: Failed to convert to gray during strict crop, stopping strict loop.")
                            break  # Stop if conversion fails
                elif len(cropped_image.shape) == 2:
                    gray_cropped = cropped_image  # Already grayscale
                else:
                    # print("Warning: Unexpected image dimensions during strict crop, stopping strict loop.")
                    break  # Stop if the shape is weird
                # Check the current edges
                h_cr, w_cr = gray_cropped.shape[:2]
                if h_cr <= 1 or w_cr <= 1: break  # Cannot crop further if only 1 pixel wide/high
                top_row = gray_cropped[0, :]
                bottom_row = gray_cropped[-1, :]
                left_col = gray_cropped[:, 0]
                right_col = gray_cropped[:, -1]
                top_has_black = np.any(top_row == 0)
                bottom_has_black = np.any(bottom_row == 0)
                left_has_black = np.any(left_col == 0)
                right_has_black = np.any(right_col == 0)
                # If no edges have black pixels, we are done
                if not (top_has_black or bottom_has_black or left_has_black or right_has_black):
                    # print(f"Debug: Strict cropping finished after {iterations-1} adjustments.")
                    break  # Exit the while loop
                # Adjust cropping based on which edge(s) have black pixels
                y_start_new, y_end_new = 0, h_cr
                x_start_new, x_end_new = 0, w_cr
                if top_has_black: y_start_new += 1
                if bottom_has_black: y_end_new -= 1
                if left_has_black: x_start_new += 1
                if right_has_black: x_end_new -= 1
                # Check that the new bounds are valid before slicing
                if y_start_new < y_end_new and x_start_new < x_end_new:
                    cropped_image = cropped_image[y_start_new:y_end_new, x_start_new:x_end_new]
                else:
                    # print("Debug: Strict cropping bounds became invalid, stopping.")
                    cropped_image = None  # Signal that cropping failed
                    break  # Exit loop
            if iterations >= MAX_ITERATIONS:
                print("Warning: Strict cropping reached max iterations, potential issue.")
            if cropped_image is not None and initial_shape != cropped_image.shape:
                print(f"Info: Strict cropping adjusted size from {initial_shape} to {cropped_image.shape}")
        # --- END: Strict Edge Cropping Logic ---
        return cropped_image  # Return the potentially strictly cropped image
    except cv2.error as e:
        print(f"OpenCV Error during black border cropping: {e}. Returning uncropped image.")
        return image
    except Exception as e:
        print(f"Unexpected error during black border cropping: {e}. Returning uncropped image.")
        traceback.print_exc()
        return image
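# Usage sketch (hypothetical variable): tighten a warped result that has black
# canvas borders, insisting that no black edge rows/columns survive.
#   tight = crop_black_borders(stitched, enable_cropping=True, strict_no_black_edges=True)
#   if tight is None:
#       tight = stitched  # strict cropping removed everything; keep the original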
# --- Helper Function: Multi-Band Blending (Conceptual - Needs careful implementation) ---
def multi_band_blending(img1, img2, mask, num_levels=5):
    # img1, img2: the two images to blend (float32, full canvas size)
    # mask: the blending mask (float32, 0-to-1 transition, full canvas size, representing the weight for img1)
    # num_levels: number of pyramid levels
    log_message = ""  # Add local logging if needed
    # Ensure the inputs are float32 (the caller should ensure this, but double-check)
    if img1.dtype != np.float32: img1 = img1.astype(np.float32)
    if img2.dtype != np.float32: img2 = img2.astype(np.float32)
    if mask.dtype != np.float32:
        log_message = log_and_print(f"Warning: Mask input to multi_band_blending was {mask.dtype}, converting to float32.\n", log_message)
        if mask.max() > 1:  # Assume uint8 [0, 255] if max > 1
            mask = mask.astype(np.float32) / 255.0
        else:  # Already in [0, 1] but maybe not float32
            mask = mask.astype(np.float32)
    # Ensure the mask has the same number of channels as the images
    if len(mask.shape) == 2 and len(img1.shape) == 3:
        mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
    elif len(mask.shape) == 3 and mask.shape[2] == 1 and len(img1.shape) == 3:
        mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
    elif mask.shape != img1.shape and mask.shape[:2] == img1.shape[:2]:
        # Remaining channel mismatches not covered above (e.g. a 3-channel mask with a
        # 1-channel image). Simplest is to force the mask to match img1.
        log_message = log_and_print(f"Warning: Mask shape {mask.shape} mismatch with image shape {img1.shape}. Attempting replication.\n", log_message)
        mask = cv2.cvtColor(cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)  # Force to 3 channels based on img1
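    # Pyramid-blend recipe (a sketch of the standard Burt-Adelson scheme this
    # function follows): build Laplacian pyramids L1, L2 of the images and a
    # Gaussian pyramid Gm of the mask, blend each level as
    #   L_i = L1_i * Gm_i + L2_i * (1 - Gm_i),
    # then collapse the blended pyramid from the coarsest level upward.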
    # 1. Build Gaussian pyramids for img1 and img2
    gp1 = [img1]
    gp2 = [img2]
    # Temporary lists for the pyrDown results, to avoid modifying the main lists during iteration
    gp1_next = []
    gp2_next = []
    actual_levels = 0
    for i in range(num_levels):
        # Downsample from the most recently built level (it lives in the temporary list once built)
        prev1 = gp1_next[-1] if gp1_next else gp1[-1]
        prev2 = gp2_next[-1] if gp2_next else gp2[-1]
        prev_h, prev_w = prev1.shape[:2]
        if prev_h < 2 or prev_w < 2:
            log_message = log_and_print(f"Warning: Stopping image pyramid build at level {i} due to small size ({prev_h}x{prev_w}).\n", log_message)
            break  # Stop building pyramids for the images
        try:
            down1 = cv2.pyrDown(prev1)
            down2 = cv2.pyrDown(prev2)
            gp1_next.append(down1)
            gp2_next.append(down2)
            actual_levels += 1  # Count of successfully built levels
        except cv2.error as e_pyrdown:
            log_message = log_and_print(f"Error during pyrDown at level {i+1}: {e_pyrdown}. Stopping pyramid build.\n", log_message)
            break  # Stop if pyrDown fails
    # Update the main lists after the loop
    gp1.extend(gp1_next); del gp1_next
    gp2.extend(gp2_next); del gp2_next
    gc.collect()
    # Adjust num_levels to the actual number built
    num_levels = actual_levels
    # If the pyramid build failed completely or the input was too small
    if num_levels == 0:
        log_message = log_and_print("Error: Cannot build any pyramid levels. Using simple weighted average.\n", log_message)
        blended_img = img1 * mask + img2 * (1.0 - mask)
        blended_img = np.clip(blended_img, 0, 255).astype(np.uint8)
        # print(log_message)  # Optional: print warnings
        if 'gp1' in locals(): del gp1
        if 'gp2' in locals(): del gp2
        gc.collect()
        return blended_img  # Fallback
    # 2. Build Laplacian pyramids for img1 and img2.
    # The smallest Gaussian level acts as the base of the Laplacian pyramid.
    lp1 = [gp1[num_levels]]
    lp2 = [gp2[num_levels]]
    for i in range(num_levels, 0, -1):
        # The target size is the size of the *next larger* Gaussian level
        target_size = (gp1[i-1].shape[1], gp1[i-1].shape[0])
        # log_message = log_and_print(f"Using resize instead of pyrUp for Laplacian level {i}\n", log_message)  # Optional log
        ge1 = cv2.resize(gp1[i], target_size, interpolation=cv2.INTER_LINEAR)
        ge2 = cv2.resize(gp2[i], target_size, interpolation=cv2.INTER_LINEAR)
        # Ensure the dimensions match EXACTLY before subtraction;
        # an upsampled result can be 1 pixel off from the actual gp[i-1] size.
        h_target, w_target = gp1[i-1].shape[:2]
        h_ge, w_ge = ge1.shape[:2]
        # Crop or pad ge1/ge2 to match the gp1[i-1]/gp2[i-1] dimensions
        if ge1.shape[:2] != (h_target, w_target):
            # print(f"Level {i} pyrUp/resize shape mismatch: ge1={ge1.shape}, target={gp1[i-1].shape}. Adjusting ge1.")
            ge1_adj = np.zeros_like(gp1[i-1], dtype=ge1.dtype)
            copy_h = min(h_target, h_ge)
            copy_w = min(w_target, w_ge)
            ge1_adj[:copy_h, :copy_w] = ge1[:copy_h, :copy_w]
            ge1 = ge1_adj
            del ge1_adj
        if ge2.shape[:2] != (h_target, w_target):
            # print(f"Level {i} pyrUp/resize shape mismatch: ge2={ge2.shape}, target={gp2[i-1].shape}. Adjusting ge2.")
            ge2_adj = np.zeros_like(gp2[i-1], dtype=ge2.dtype)
            copy_h = min(h_target, ge2.shape[0])  # Use ge2's own height here
            copy_w = min(w_target, ge2.shape[1])  # Use ge2's own width here
            ge2_adj[:copy_h, :copy_w] = ge2[:copy_h, :copy_w]
            ge2 = ge2_adj
            del ge2_adj
        # Laplacian: higher-resolution Gaussian minus the expanded lower-resolution Gaussian
        laplacian1 = cv2.subtract(gp1[i-1], ge1)
        laplacian2 = cv2.subtract(gp2[i-1], ge2)
        lp1.append(laplacian1)
        lp2.append(laplacian2)
        del ge1, ge2, laplacian1, laplacian2
        gc.collect()
    # del gp1, gp2
    # gc.collect()
    # lp1/lp2 are now [SmallestGaussian, LapN, LapN-1, ..., Lap1] (N = num_levels)
    lp1.reverse()  # Reverse to [Lap1, ..., LapN, SmallestGaussian]
    lp2.reverse()
    # 3. Build a Gaussian pyramid for the mask
    gm = [mask]
    gm_next = []
    actual_mask_levels = 0
    for i in range(num_levels):  # Build the mask pyramid only up to the actual image levels
        # Downsample from the most recently built mask level
        prev_mask = gm_next[-1] if gm_next else gm[-1]
        prev_h, prev_w = prev_mask.shape[:2]
        if prev_h < 2 or prev_w < 2:
            log_message = log_and_print(f"Warning: Stopping mask pyramid build at level {i}.\n", log_message)
            # num_levels should already be adjusted, but ensure the mask levels don't exceed it
            break
        try:
            down_mask = cv2.pyrDown(prev_mask)
            gm_next.append(down_mask)
            actual_mask_levels += 1
        except cv2.error as e_pyrdown_mask:
            log_message = log_and_print(f"Error during mask pyrDown at level {i+1}: {e_pyrdown_mask}. Stopping mask pyramid build.\n", log_message)
            break
    gm.extend(gm_next); del gm_next
    gc.collect()
    # Ensure the mask pyramid has the same number of levels as the Laplacian pyramid (+ base)
    if len(gm) != num_levels + 1:
        log_message = log_and_print(f"Error: Mask pyramid levels ({len(gm)}) do not match the expected count ({num_levels + 1}). Using simple average.\n", log_message)
        # Fallback if the mask pyramid construction failed unexpectedly
        blended_img = img1 * mask + img2 * (1.0 - mask)
        blended_img = np.clip(blended_img, 0, 255).astype(np.uint8)
        if 'lp1' in locals(): del lp1
        if 'lp2' in locals(): del lp2
        if 'gm' in locals(): del gm
        gc.collect()
        return blended_img
    # 4. Blend the Laplacian levels
    ls = []  # Blended Laplacian pyramid
    for i in range(num_levels):  # Blend Lap1 to LapN
        lap1 = lp1[i]
        lap2 = lp2[i]
        mask_level = gm[i]  # Use the corresponding mask level (gm[0] for lp1[0]=Lap1, etc.)
        # Ensure the mask shape matches the Laplacian shape at this level
        if mask_level.shape[:2] != lap1.shape[:2]:
            # print(f"Level {i} mask/lap shape mismatch: mask={mask_level.shape}, lap={lap1.shape}. Resizing mask.")
            mask_level = cv2.resize(mask_level, (lap1.shape[1], lap1.shape[0]), interpolation=cv2.INTER_LINEAR)
            # Ensure the channels match after the resize
            if len(mask_level.shape) == 2 and len(lap1.shape) == 3:
                mask_level = cv2.cvtColor(mask_level, cv2.COLOR_GRAY2BGR)
            elif len(mask_level.shape) == 3 and mask_level.shape[2] == 1 and len(lap1.shape) == 3:
                mask_level = cv2.cvtColor(mask_level, cv2.COLOR_GRAY2BGR)
        # Clip the mask in case resize interpolation goes slightly out of [0, 1]
        mask_level = np.clip(mask_level, 0.0, 1.0)
        # Blend: L = L1*Gm + L2*(1-Gm)
        blended_lap = lap1 * mask_level + lap2 * (1.0 - mask_level)
        ls.append(blended_lap)
        del lap1, lap2, mask_level, blended_lap
        gc.collect()
    # Blend the smallest Gaussian level (the base of the pyramid)
    base1 = lp1[num_levels]  # Smallest Gaussian, stored at the end of the reversed lp1
    base2 = lp2[num_levels]
    mask_base = gm[num_levels]  # Use the smallest mask (corresponding to the smallest Gaussian level)
    if mask_base.shape[:2] != base1.shape[:2]:
        # print(f"Base level mask/base shape mismatch: mask={mask_base.shape}, base={base1.shape}. Resizing mask.")
        mask_base = cv2.resize(mask_base, (base1.shape[1], base1.shape[0]), interpolation=cv2.INTER_LINEAR)
        if len(mask_base.shape) == 2 and len(base1.shape) == 3:
            mask_base = cv2.cvtColor(mask_base, cv2.COLOR_GRAY2BGR)
        elif len(mask_base.shape) == 3 and mask_base.shape[2] == 1 and len(base1.shape) == 3:
            mask_base = cv2.cvtColor(mask_base, cv2.COLOR_GRAY2BGR)
    mask_base = np.clip(mask_base, 0.0, 1.0)
    # Blend the base Gaussian level: B = B1*Gm_N + B2*(1-Gm_N)
    blended_base = base1 * mask_base + base2 * (1.0 - mask_base)
    ls.append(blended_base)  # ls is now [BlendedLap1, ..., BlendedLapN, BlendedBase]
    # del lp1, lp2, gm, base1, base2, mask_base, blended_base
    del base1, base2, mask_base, blended_base
    gc.collect()
    # 5. Reconstruct the final image from the blended Laplacian pyramid,
    # starting with the smallest blended base
    blended_img = ls[num_levels]
    for i in range(num_levels - 1, -1, -1):  # Iterate from N-1 down to 0
        # The target size is the size of the *current* blended Laplacian level (ls[i])
        target_size = (ls[i].shape[1], ls[i].shape[0])
        # log_message = log_and_print(f"Using resize instead of pyrUp for reconstruction level {i}\n", log_message)  # Optional log
        expanded_prev = cv2.resize(blended_img, target_size, interpolation=cv2.INTER_LINEAR)
        # Delete the previous level's blended_img (important for memory)
        del blended_img
        gc.collect()
        # Ensure the dimensions match EXACTLY before adding
        h_target_rec, w_target_rec = ls[i].shape[:2]
        h_exp, w_exp = expanded_prev.shape[:2]
        if expanded_prev.shape[:2] != (h_target_rec, w_target_rec):
            # print(f"Reconstruction level {i} shape mismatch: expanded={expanded_prev.shape}, target={ls[i].shape}. Adjusting expanded.")
            expanded_adj = np.zeros_like(ls[i], dtype=expanded_prev.dtype)
            copy_h_rec = min(h_target_rec, h_exp)
            copy_w_rec = min(w_target_rec, w_exp)
            expanded_adj[:copy_h_rec, :copy_w_rec] = expanded_prev[:copy_h_rec, :copy_w_rec]
            expanded_prev = expanded_adj
            del expanded_adj
        # Add the blended Laplacian for the current level
        current_laplacian = ls[i]  # Get the reference before the add
        blended_img = cv2.add(expanded_prev, current_laplacian)
        del expanded_prev, current_laplacian
        ls[i] = None  # Explicitly break the reference in the list too; may help GC
        gc.collect()
    # Clip the final result and convert back to uint8
    blended_img = np.clip(blended_img, 0, 255)
    blended_img = blended_img.astype(np.uint8)
    # Optional: print warnings collected during the process
    # if log_message: print("MultiBand Blend Logs:\n" + log_message)
    # Clean up the intermediate pyramids (important for memory)
    del gp1, gp2, lp1, lp2, gm, ls
    if 'laplacian1' in locals(): del laplacian1
    if 'laplacian2' in locals(): del laplacian2
    if 'ge1' in locals(): del ge1
    if 'ge2' in locals(): del ge2
    if 'mask_level' in locals(): del mask_level
    if 'base1' in locals(): del base1
    if 'base2' in locals(): del base2
    if 'mask_base' in locals(): del mask_base
    if 'blended_lap' in locals(): del blended_lap
    if 'blended_base' in locals(): del blended_base
    if 'expanded_prev' in locals(): del expanded_prev
    gc.collect()
    return blended_img
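# Usage sketch (hypothetical imgA/imgB of equal canvas size): blend two float32
# images with a mask that is 1.0 where imgA should dominate, 0.0 where imgB should.
#   blend_mask = np.ones(imgA.shape[:2], dtype=np.float32)  # weight for imgA
#   blend_mask[:, imgA.shape[1] // 2:] = 0.0                # right half from imgB
#   result = multi_band_blending(imgA.astype(np.float32), imgB.astype(np.float32), blend_mask, num_levels=4)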
# --- Stitching Function: Focus on the pairwise images ---
def stitch_pairwise_images(img_composite, img_new,
                           transform_model_str="Homography",
                           blend_method="multi-band",
                           enable_gain_compensation=True,
                           orb_nfeatures=2000,
                           match_ratio_thresh=0.75,
                           ransac_reproj_thresh=5.0,
                           max_distance_coeff=0.5,
                           max_blending_width=10000,
                           max_blending_height=10000,
                           blend_smooth_ksize=15,
                           num_blend_levels=4
                           ):
    """
    Stitches a new image (img_new) onto an existing composite image (img_composite)
    using an explicit, step-by-step pipeline (e.g., ORB features).
    Allows choosing the geometric transformation model.
    Returns the new composite image and the accumulated log string.
    """
    log_message = log_and_print("--- Starting pairwise stitch between composite and new image ---\n", "")
    start_time_pairwise = time.time()
    # --- Input Validation ---
    if img_composite is None or img_new is None:
        log_message = log_and_print("Error: One or both input images are None for the pairwise stitching step.\n", log_message)
        return None, log_message
    if img_composite.size == 0 or img_new.size == 0:
        log_message = log_and_print("Error: One or both input images are empty for the pairwise stitching step.\n", log_message)
        return None, log_message
    h1, w1 = img_composite.shape[:2]
    h2, w2 = img_new.shape[:2]
    log_message = log_and_print(f"Pairwise Stitch: Img1({w1}x{h1}), Img2({w2}x{h2})\n", log_message)
    log_message = log_and_print(f"Params: Transform={transform_model_str}, ORB Feats={orb_nfeatures}, Ratio Thresh={match_ratio_thresh}\n", log_message)
    log_message = log_and_print(f"Params Cont'd: RANSAC Thresh={ransac_reproj_thresh}, Max Distance Coeff={max_distance_coeff}\n", log_message)
    log_message = log_and_print(f"Blending: Method={blend_method}, GainComp={enable_gain_compensation}, SmoothKSize={blend_smooth_ksize}, MB Levels={num_blend_levels}\n", log_message)
    final_output_img = None  # Initialize the result variable
    # Initialize the other variables to None for better cleanup management
    img1_u8, img2_u8 = None, None
    kp1, des1, kp2, des2 = None, None, None, None
    all_matches, good_matches = None, None
    src_pts, dst_pts = None, None
    H_matrix_3x3_for_canvas = None  # Will hold the 3x3 matrix for canvas calculation (affine or homography)
    final_warp_M = None  # Will hold the actual 2x3 or 3x3 matrix for warping
    mask_trans = None  # Inlier mask from the estimation function (homography or affine)
    pts1, dst_pts1_transformed = None, None
    pts2, all_pts = None, None
    output_img = None
    warped_img1_u8 = None
    mask_warped, mask_img2, overlap_mask = None, None, None
    gain_applied_warped_img1_u8 = None
    output_img_before_mb_float, blend_mask_float = None, None
    img1_for_blend, img2_for_blend = None, None
    is_affine = False  # Flag to determine the warp function
    try:
        # --- Feature Detection and Matching ---
        img1_u8 = img_composite.clip(0, 255).astype(np.uint8) if img_composite.dtype != np.uint8 else img_composite
        img2_u8 = img_new.clip(0, 255).astype(np.uint8) if img_new.dtype != np.uint8 else img_new
        orb = cv2.ORB_create(nfeatures=orb_nfeatures)
        kp1, des1 = orb.detectAndCompute(img1_u8, None)  # keypoints and descriptors
        kp2, des2 = orb.detectAndCompute(img2_u8, None)
        if des1 is None or des2 is None or len(kp1) < 2 or len(kp2) < 2:
            log_message = log_and_print("Error: Not enough keypoints or descriptors found.\n", log_message)
            if 'kp1' in locals(): del kp1
            if 'des1' in locals(): del des1
            if 'kp2' in locals(): del kp2
            if 'des2' in locals(): del des2
            del img1_u8, img2_u8
            gc.collect()
            return None, log_message
        log_message = log_and_print(f"Found {len(kp1)} keypoints in Img1, {len(kp2)} in Img2.\n", log_message)
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
        # ORB descriptors should already be uint8, but make sure for knnMatch
        if des1.dtype != np.uint8: des1 = des1.astype(np.uint8)
        if des2.dtype != np.uint8: des2 = des2.astype(np.uint8)
        all_matches = bf.knnMatch(des1, des2, k=2)
        del des1, des2; des1, des2 = None, None  # Explicit delete
        gc.collect()
        good_matches = []
        if all_matches is not None:
            MAX_DISTANCE = max_distance_coeff * np.sqrt(w1**2 + h1**2)
            # Filter out potential empty match pairs
            valid_matches = [pair for pair in all_matches if isinstance(pair, (list, tuple)) and len(pair) == 2]
            for m, n in valid_matches:
                if m.distance < match_ratio_thresh * n.distance:
                    src_pt = np.array(kp1[m.queryIdx].pt)
                    dst_pt = np.array(kp2[m.trainIdx].pt)
                    distance = np.linalg.norm(dst_pt - src_pt)
                    if distance < MAX_DISTANCE:
                        good_matches.append(m)
            del valid_matches
        del all_matches; all_matches = None
        gc.collect()
        log_message = log_and_print(f"Found {len(good_matches)} good matches after ratio test.\n", log_message)
        MIN_MATCH_COUNT = 10  # Keep a minimum threshold
        # --- Transformation Estimation (Homography or Affine) ---
        if len(good_matches) >= MIN_MATCH_COUNT:
            src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
            dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
            del kp1, kp2, good_matches; kp1, kp2, good_matches = None, None, None  # Explicit delete
            gc.collect()
            estimation_failed = False
            # Try Affine if selected
            if transform_model_str == "Affine_Partial" or transform_model_str == "Affine_Full":
                is_affine = True  # Assume success initially
                affine_matrix_2x3 = None
                mask_a = None
                try:
                    if transform_model_str == "Affine_Partial":
                        log_message = log_and_print(f"Attempting Affine Partial Estimation (RANSAC Thresh={ransac_reproj_thresh})...\n", log_message)
                        affine_matrix_2x3, mask_a = cv2.estimateAffinePartial2D(src_pts, dst_pts, method=cv2.RANSAC, ransacReprojThreshold=ransac_reproj_thresh)
                    else:  # Affine_Full
                        log_message = log_and_print(f"Attempting Affine Full Estimation (RANSAC Thresh={ransac_reproj_thresh})...\n", log_message)
                        affine_matrix_2x3, mask_a = cv2.estimateAffine2D(src_pts, dst_pts, method=cv2.RANSAC, ransacReprojThreshold=ransac_reproj_thresh)
                    if affine_matrix_2x3 is None:
                        raise ValueError(f"{transform_model_str} estimation returned None")
                    # Convert the 2x3 affine to 3x3 for canvas calculation consistency
                    H_matrix_3x3_for_canvas = np.vstack([affine_matrix_2x3, [0, 0, 1]]).astype(np.float64)
                    final_warp_M = affine_matrix_2x3.astype(np.float64)  # Keep 2x3 for warpAffine
                    mask_trans = mask_a  # Store the inlier mask
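                    # Lift sketch: the 2x3 affine [[a, b, tx], [c, d, ty]] gains the
                    # row [0, 0, 1] to become a 3x3 matrix, so it can be composed with
                    # the 3x3 translation built later and fed to cv2.perspectiveTransform
                    # for corner mapping.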
                except Exception as e_affine:
                    log_message = log_and_print(f"Error during {transform_model_str} estimation: {e_affine}. Falling back to Homography.\n", log_message)
                    is_affine = False  # Reset the flag; we proceed to the Homography block below
                    estimation_failed = True  # Mark that the chosen affine failed
                    # Clean up affine-specific vars if they exist
                    if 'affine_matrix_2x3' in locals(): del affine_matrix_2x3
                    if 'mask_a' in locals(): del mask_a
                    H_matrix_3x3_for_canvas = None
                    final_warp_M = None
                    mask_trans = None
                    # NOTE: We choose to fall back instead of returning None immediately.
                    # If you prefer to fail hard when the selected affine fails, uncomment the next line:
                    # return None, log_message
            # Try Homography if selected OR if Affine failed and we are falling back
            if not is_affine or estimation_failed:  # If Homography was chosen or Affine failed
                if estimation_failed:  # Log if we are falling back
                    log_message = log_and_print("Falling back to Homography estimation...\n", log_message)
                else:  # Log if Homography was the original choice
                    log_message = log_and_print("Attempting Homography Estimation...\n", log_message)
                is_affine = False  # Ensure the flag is False for the Homography path
                H_matrix_homog = None
                mask_h = None
                try:
                    log_message = log_and_print(f"Estimating Homography (RANSAC Thresh={ransac_reproj_thresh})...\n", log_message)
                    H_matrix_homog, mask_h = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, ransacReprojThreshold=ransac_reproj_thresh)
                    if H_matrix_homog is None:
                        raise ValueError("Homography estimation returned None")
                    H_matrix_3x3_for_canvas = H_matrix_homog.astype(np.float64)  # Use this for the canvas calc
                    final_warp_M = H_matrix_homog.astype(np.float64)  # Use 3x3 for warpPerspective
                    mask_trans = mask_h  # Store the inlier mask
                except Exception as e_homog:
                    log_message = log_and_print(f"Error during Homography estimation: {e_homog}\n", log_message)
                    # Clean up if Homography itself fails
                    if 'H_matrix_homog' in locals(): del H_matrix_homog
                    if 'mask_h' in locals(): del mask_h
                    del src_pts, dst_pts
                    gc.collect()
                    return None, log_message  # Fail if Homography (chosen or fallback) fails
            # --- Log inliers from the successful estimation ---
            model_name = "Affine" if is_affine else "Homography"
            if mask_trans is not None:
                inlier_count = np.sum(mask_trans)
                log_message = log_and_print(f"{model_name} estimated with {inlier_count} inliers.\n", log_message)
                if inlier_count < MIN_MATCH_COUNT:
                    log_message = log_and_print(f"Warning: Inlier count ({inlier_count}) < MIN_MATCH_COUNT for {model_name}. Result might be poor.\n", log_message)
                del mask_trans; mask_trans = None  # Delete the mask now
                gc.collect()
            else:
                log_message = log_and_print(f"Warning: {model_name} mask was None.\n", log_message)
            # --- Cleanup source/destination points ---
            del src_pts, dst_pts; src_pts, dst_pts = None, None
            gc.collect()
            # --- Canvas Calculation and Warping ---
            pts1 = np.float32([[0, 0], [0, h1 - 1], [w1 - 1, h1 - 1], [w1 - 1, 0]]).reshape(-1, 1, 2)
            try:
                # Use the 3x3 matrix (derived from affine or directly from homography) for perspectiveTransform.
                # Ensure it's float64.
                if H_matrix_3x3_for_canvas.dtype != np.float64: H_matrix_3x3_for_canvas = H_matrix_3x3_for_canvas.astype(np.float64)
                dst_pts1_transformed = cv2.perspectiveTransform(pts1, H_matrix_3x3_for_canvas)
                if dst_pts1_transformed is None: raise ValueError("perspectiveTransform returned None")
            except Exception as e_tf:
                model_name_tf = "Affine-derived" if is_affine else "Homography"
                log_message = log_and_print(f"Error during perspectiveTransform (using {model_name_tf} 3x3 matrix): {e_tf}\n", log_message)
                # Clean up before returning
                del pts1
                if 'H_matrix_3x3_for_canvas' in locals(): del H_matrix_3x3_for_canvas
                if 'final_warp_M' in locals(): del final_warp_M  # Was holding the warp matrix
                gc.collect()
                return None, log_message
            del pts1; pts1 = None
            pts2 = np.float32([[0, 0], [0, h2 - 1], [w2 - 1, h2 - 1], [w2 - 1, 0]]).reshape(-1, 1, 2)
            # Ensure dst_pts1_transformed is float32 for the concatenation
            all_pts = np.concatenate((pts2, dst_pts1_transformed.astype(np.float32)), axis=0)
            del pts2, dst_pts1_transformed; pts2, dst_pts1_transformed = None, None
            padding = 2
            x_min, y_min = np.int32(all_pts.min(axis=0).ravel() - padding)
            x_max, y_max = np.int32(all_pts.max(axis=0).ravel() + padding)
            del all_pts; all_pts = None
            gc.collect()
            translation_dist = [-x_min, -y_min]
            H_translation = np.array([[1, 0, translation_dist[0]], [0, 1, translation_dist[1]], [0, 0, 1]], dtype=np.float64)
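            # The translation shifts everything so the combined bounding box of both
            # images starts at (0, 0); composing it with the estimated transform
            # (H_translation @ H) warps img1 directly into canvas coordinates.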
            output_width = x_max - x_min
            output_height = y_max - y_min
            if output_width <= 0 or output_height <= 0 or output_width > max_blending_width or output_height > max_blending_height:
                log_message = log_and_print(f"Error: Invalid output dimensions ({output_width}x{output_height}). Max allowed ({max_blending_width}x{max_blending_height})\n", log_message)
                # Clean up before returning
                if 'H_matrix_3x3_for_canvas' in locals(): del H_matrix_3x3_for_canvas
                if 'final_warp_M' in locals(): del final_warp_M
                if 'H_translation' in locals(): del H_translation
                gc.collect()
                return None, log_message
            log_message = log_and_print(f"Calculated canvas size: {output_width}x{output_height}\n", log_message)
            # --- Memory Check for Blending ---
            canvas_pixels = output_width * output_height
            # Threshold based on available memory: 225 million pixels
            # (15000*15000 = 225M; 30000*15000 would be 450M)
            pixel_threshold = 225_000_000
            effective_blend_method = blend_method
            if blend_method == "multi-band" and canvas_pixels > pixel_threshold:
                log_message = log_and_print(f"Warning: Canvas size ({output_width}x{output_height}, {canvas_pixels/1e6:.1f}M pixels) exceeds threshold ({pixel_threshold/1e6:.1f}M pixels) for multi-band blending.\n", log_message)
                log_message = log_and_print("Switching to 'Linear' blending for this step to conserve memory.\n", log_message)
                effective_blend_method = "linear"
            # Create the output canvas
            output_img = np.zeros((output_height, output_width, 3), dtype=np.uint8)
            # --- Calculate the final transformation matrix for warping ---
            # This incorporates the translation onto the canvas
            final_warp_matrix_translated = None
            if is_affine:
                # We need the 2x3 matrix: (H_translation @ H_affine_3x3)[:2, :]
                final_warp_matrix_translated = (H_translation @ H_matrix_3x3_for_canvas)[:2, :]
            else:
                # We need the 3x3 matrix: H_translation @ H_homography_3x3
                final_warp_matrix_translated = H_translation @ H_matrix_3x3_for_canvas  # H_matrix_3x3 holds the homography here
            # --- Warp img1 onto the canvas ---
            try:
                if is_affine:
                    log_message = log_and_print("Warping image 1 using warpAffine...\n", log_message)
                    warped_img1_u8 = cv2.warpAffine(img1_u8, final_warp_matrix_translated, (output_width, output_height), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0))
                else:
                    log_message = log_and_print("Warping image 1 using warpPerspective...\n", log_message)
                    warped_img1_u8 = cv2.warpPerspective(img1_u8, final_warp_matrix_translated, (output_width, output_height), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0))
            except cv2.error as e_warp:
                warp_type = 'Affine' if is_affine else 'Perspective'
                log_message = log_and_print(f"Error during warping ({warp_type}): {e_warp}\n", log_message)
                # Clean up before returning
                if 'H_matrix_3x3_for_canvas' in locals(): del H_matrix_3x3_for_canvas
                # final_warp_M was the matrix before translation
                if 'final_warp_matrix_translated' in locals(): del final_warp_matrix_translated
                if 'H_translation' in locals(): del H_translation
                if 'img1_u8' in locals(): del img1_u8
                if 'output_img' in locals(): del output_img
                gc.collect()
                return None, log_message
            # --- Clean up matrices and the source image ---
            del H_matrix_3x3_for_canvas, H_translation, final_warp_matrix_translated, img1_u8
            # Note: final_warp_M (the untranslated matrix) is no longer needed
            if 'final_warp_M' in locals(): del final_warp_M
            gc.collect()
            # Place img2 onto the canvas
            y_start, x_start = translation_dist[1], translation_dist[0]
            y_end, x_end = y_start + h2, x_start + w2
            # Define slicing for the img2 read and the canvas write, handling out-of-bounds placement
            img2_y_start, img2_x_start = 0, 0
            img2_y_end, img2_x_end = h2, w2
            canvas_y_start, canvas_x_start = y_start, x_start
            canvas_y_end, canvas_x_end = y_end, x_end
            # Clip coordinates
            if canvas_y_start < 0: img2_y_start = -canvas_y_start; canvas_y_start = 0
            if canvas_x_start < 0: img2_x_start = -canvas_x_start; canvas_x_start = 0
            if canvas_y_end > output_height: img2_y_end = h2 - (canvas_y_end - output_height); canvas_y_end = output_height
            if canvas_x_end > output_width: img2_x_end = w2 - (canvas_x_end - output_width); canvas_x_end = output_width
            # Check that the calculated slices are valid
            slice_h_canvas = canvas_y_end - canvas_y_start
            slice_w_canvas = canvas_x_end - canvas_x_start
            slice_h_img2 = img2_y_end - img2_y_start
            slice_w_img2 = img2_x_end - img2_x_start
            mask_img2 = np.zeros(output_img.shape[:2], dtype=np.uint8)  # Mask for img2 placement
            img2_part = None
            if slice_h_canvas > 0 and slice_w_canvas > 0 and slice_h_canvas == slice_h_img2 and slice_w_canvas == slice_w_img2:
                img2_part = img2_u8[img2_y_start:img2_y_end, img2_x_start:img2_x_end]
                output_img[canvas_y_start:canvas_y_end, canvas_x_start:canvas_x_end] = img2_part
                mask_img2[canvas_y_start:canvas_y_end, canvas_x_start:canvas_x_end] = 255
                # Don't delete img2_part yet; it may be needed for blend restoration
            else:
                log_message = log_and_print("Warning: Could not place img2 correctly onto the canvas.\n", log_message)
            del img2_u8; img2_u8 = None  # The input img2 is no longer needed
            gc.collect()
            # --- Create Masks for Blending ---
            # Mask for the warped image 1 (non-black pixels)
            gray_warped = None
            if warped_img1_u8 is not None:
                gray_warped = cv2.cvtColor(warped_img1_u8, cv2.COLOR_BGR2GRAY)
            # Ensure mask_warped is uint8 with values 0 or 255
            if gray_warped is not None:
                if len(gray_warped.shape) == 3: gray_warped = gray_warped[:, :, 0]  # Take one channel if needed
                mask_warped = ((gray_warped > 0).astype(np.uint8)) * 255
                del gray_warped; gray_warped = None
                gc.collect()
            else:
                mask_warped = np.zeros(output_img.shape[:2], dtype=np.uint8)  # Empty mask if the warp failed
            # Find the overlapping region mask (uint8, 0 or 255)
            overlap_mask = cv2.bitwise_and(mask_warped, mask_img2)
            has_overlap = np.sum(overlap_mask > 0) > 0  # Check if any pixel > 0
            log_message = log_and_print(f"Overlap detected: {has_overlap}\n", log_message)
            # --- Gain Compensation ---
            gain = 1.0
            gain_applied_warped_img1_u8 = warped_img1_u8  # Initialize with the original warped image
            if enable_gain_compensation and has_overlap and warped_img1_u8 is not None:  # Need the warped image for gain comp
                log_message = log_and_print("Gain Compensation Enabled. Calculating gain...\n", log_message)
                try:
                    # --- Gain Calculation ---
                    gray_warped_for_gain = cv2.cvtColor(warped_img1_u8, cv2.COLOR_BGR2GRAY)
                    img2_gray = np.zeros_like(gray_warped_for_gain)
                    if slice_h_canvas > 0 and slice_w_canvas > 0:
                        if 0 <= canvas_y_start < canvas_y_end <= output_height and \
                           0 <= canvas_x_start < canvas_x_end <= output_width:
                            # Ensure the output_img part is valid before cvtColor
                            img_to_convert = output_img[canvas_y_start:canvas_y_end, canvas_x_start:canvas_x_end]
                            if img_to_convert.size > 0:
                                img2_part_gray = cv2.cvtColor(img_to_convert, cv2.COLOR_BGR2GRAY)
                                img2_gray[canvas_y_start:canvas_y_end, canvas_x_start:canvas_x_end] = img2_part_gray
                                del img2_part_gray
                            else:
                                log_message = log_and_print("Warning: Empty slice for gain calculation img2_gray.\n", log_message)
                        else:
                            log_message = log_and_print("Warning: Invalid slice indices for gain calculation img2_gray.\n", log_message)
                    overlap_mask_gain = overlap_mask  # Use the already computed overlap mask
                    # Ensure the masks are single channel before bitwise_and
                    if len(overlap_mask_gain.shape) == 3: overlap_mask_gain = overlap_mask_gain[:, :, 0]
                    if len(gray_warped_for_gain.shape) == 3: gray_warped_for_gain = gray_warped_for_gain[:, :, 0]
                    if len(img2_gray.shape) == 3: img2_gray = img2_gray[:, :, 0]
                    gray_warped_roi = cv2.bitwise_and(gray_warped_for_gain, gray_warped_for_gain, mask=overlap_mask_gain)
                    img2_roi = cv2.bitwise_and(img2_gray, img2_gray, mask=overlap_mask_gain)
                    del gray_warped_for_gain, img2_gray
                    overlap_pixel_count = np.sum(overlap_mask_gain > 0)
                    if overlap_pixel_count > 0:
                        # Ensure the ROIs are valid before summing
                        mean1 = np.sum(gray_warped_roi[overlap_mask_gain > 0]) / overlap_pixel_count if gray_warped_roi is not None else 0
                        mean2 = np.sum(img2_roi[overlap_mask_gain > 0]) / overlap_pixel_count if img2_roi is not None else 0
                        if mean1 > 1e-5 and mean2 > 1e-5:
                            gain = mean2 / mean1
                            log_message = log_and_print(f"Calculated Gain: {gain:.2f}\n", log_message)
                            gain = np.clip(gain, 0.5, 2.0)  # Clamp the gain
                            log_message = log_and_print(f"Clamped Gain: {gain:.2f}\n", log_message)
                        else:
                            gain = 1.0
                            log_message = log_and_print("Gain compensation skipped (means close to zero or invalid ROI).\n", log_message)
                    else:
                        gain = 1.0
                        log_message = log_and_print("Gain compensation skipped (no overlap pixels).\n", log_message)
                    del gray_warped_roi, img2_roi
                    gc.collect()
                    # --- End Gain Calculation ---
                    # Apply the gain ONLY if calculated and different from 1.0
                    if abs(gain - 1.0) > 1e-5:  # Check the float difference
                        gain_applied_float = warped_img1_u8.astype(np.float32) * gain
                        # *** Create a new array for the gain-applied result ***
                        temp_gain_applied = gain_applied_float.clip(0, 255).astype(np.uint8)
                        # If gain_applied_warped_img1_u8 wasn't the original, delete it before reassigning
                        if gain_applied_warped_img1_u8 is not warped_img1_u8:
                            del gain_applied_warped_img1_u8
                        gain_applied_warped_img1_u8 = temp_gain_applied  # Assign the new gain-applied image
                        del gain_applied_float, temp_gain_applied
                        gc.collect()
                        log_message = log_and_print("Gain applied to warped image.\n", log_message)
                    else:
                        log_message = log_and_print("Gain is ~1.0, no gain applied.\n", log_message)
                except Exception as e_gain_calc:
                    gain = 1.0
                    log_message = log_and_print(f"Warning: Error during gain calculation ({e_gain_calc}). Setting gain=1.0.\n", log_message)
                    # Ensure gain_applied remains the original warped image on error
                    if gain_applied_warped_img1_u8 is not warped_img1_u8:
                        del gain_applied_warped_img1_u8  # Delete the potentially modified one
                        gc.collect()
                    gain_applied_warped_img1_u8 = warped_img1_u8  # Reset to the original
                    # Clean up potential partial variables
                    if 'gray_warped_for_gain' in locals(): del gray_warped_for_gain
                    if 'img2_gray' in locals(): del img2_gray
                    if 'gray_warped_roi' in locals(): del gray_warped_roi
                    if 'img2_roi' in locals(): del img2_roi
                    if 'gain_applied_float' in locals(): del gain_applied_float
                    gc.collect()
            elif warped_img1_u8 is None:
                log_message = log_and_print("Skipping Gain Compensation as warped image is None.\n", log_message)
            # gain_applied_warped_img1_u8 now holds the image to be used for blending
            # (either the original warped image or the gain-compensated version)
            # --- Blending Choice ---
            # Blend using the potentially gain-compensated image: gain_applied_warped_img1_u8
            if effective_blend_method == "multi-band" and has_overlap and gain_applied_warped_img1_u8 is not None:
                log_message = log_and_print(f"Applying Multi-band blending (Levels={num_blend_levels})...\n", log_message)
                try:
                    # --- Generate the blend mask using a distance transform ---
                    log_message = log_and_print("Generating multi-band mask using distance transform...\n", log_message)
                    # Ensure the masks are single-channel uint8 for distanceTransform
                    mask_warped_gray_mb = cv2.cvtColor(mask_warped, cv2.COLOR_BGR2GRAY) if len(mask_warped.shape) == 3 else mask_warped.copy()
                    mask_img2_gray_mb = cv2.cvtColor(mask_img2, cv2.COLOR_BGR2GRAY) if len(mask_img2.shape) == 3 else mask_img2.copy()
                    overlap_mask_gray_mb = cv2.cvtColor(overlap_mask, cv2.COLOR_BGR2GRAY) if len(overlap_mask.shape) == 3 else overlap_mask.copy()
                    if mask_warped_gray_mb.dtype != np.uint8: mask_warped_gray_mb = (mask_warped_gray_mb > 0).astype(np.uint8) * 255
                    if mask_img2_gray_mb.dtype != np.uint8: mask_img2_gray_mb = (mask_img2_gray_mb > 0).astype(np.uint8) * 255
                    if overlap_mask_gray_mb.dtype != np.uint8: overlap_mask_gray_mb = (overlap_mask_gray_mb > 0).astype(np.uint8) * 255
                    # Calculate distance transforms:
                    # distance to the nearest zero pixel (i.e., distance from the background)
                    dist1 = cv2.distanceTransform(mask_warped_gray_mb, cv2.DIST_L2, 5)
                    dist2 = cv2.distanceTransform(mask_img2_gray_mb, cv2.DIST_L2, 5)
                    # Create the float32 weight mask
                    weight1_norm = np.zeros(output_img.shape[:2], dtype=np.float32)
                    # Identify the non-overlapping regions (using single-channel masks)
                    non_overlap_mask1 = cv2.bitwise_and(mask_warped_gray_mb, cv2.bitwise_not(overlap_mask_gray_mb))
                    non_overlap_mask2 = cv2.bitwise_and(mask_img2_gray_mb, cv2.bitwise_not(overlap_mask_gray_mb))
                    # Assign weights: 1.0 where only img1 exists, 0.0 where only img2 exists
                    weight1_norm[non_overlap_mask1 > 0] = 1.0
                    weight1_norm[non_overlap_mask2 > 0] = 0.0  # Implicitly 0 already, but good to be explicit
                    # Calculate weights in the overlap region based on relative distance:
                    # weight for img1 = dist1 / (dist1 + dist2)
                    overlap_indices = np.where(overlap_mask_gray_mb > 0)
                    num_overlap_pixels = len(overlap_indices[0])
                    if num_overlap_pixels > 0:
                        d1_overlap = dist1[overlap_indices]
                        d2_overlap = dist2[overlap_indices]
                        total_dist = d1_overlap + d2_overlap
                        # Avoid division by zero where total_dist is very small (deep inside both masks).
                        # If total_dist is near zero we could weight by whichever original mask was stronger,
                        # but dist1 / (total_dist + epsilon) is simpler and generally works.
                        weights_overlap = d1_overlap / (total_dist + 1e-7)  # Epsilon for stability
                        weight1_norm[overlap_indices] = np.clip(weights_overlap, 0.0, 1.0)
                        log_message = log_and_print(f"Calculated distance transform weights for {num_overlap_pixels} overlap pixels.\n", log_message)
                    else:
                        log_message = log_and_print("Warning: No overlap pixels found for distance transform weight calculation.\n", log_message)
                    # Create boolean masks for the later restoration steps
                    mask_warped_binary = (mask_warped_gray_mb > 0)
                    mask_img2_binary = (mask_img2_gray_mb > 0)
                    overlap_mask_binary = (overlap_mask_gray_mb > 0)
                    # Clean up the intermediate arrays from the distance transform step
                    del mask_warped_gray_mb, mask_img2_gray_mb, overlap_mask_gray_mb, dist1, dist2
                    del non_overlap_mask1, non_overlap_mask2
                    del overlap_indices
                    if 'd1_overlap' in locals(): del d1_overlap
                    if 'd2_overlap' in locals(): del d2_overlap
                    if 'total_dist' in locals(): del total_dist
                    if 'weights_overlap' in locals(): del weights_overlap
                    gc.collect()
                    # --- Apply smoothing based on blend_smooth_ksize ---
                    blend_mask_float = weight1_norm  # Start with the precise distance-based mask
                    if blend_smooth_ksize > 0 and blend_smooth_ksize % 2 == 1:
                        log_message = log_and_print(f"Smoothing multi-band blend mask with GaussianBlur ksize=({blend_smooth_ksize},{blend_smooth_ksize})...\n", log_message)
                        try:
                            # Strict non-overlap areas (boolean arrays), from the masks calculated above
                            strict_non_overlap_mask1 = np.logical_and(mask_warped_binary, np.logical_not(overlap_mask_binary))
                            strict_non_overlap_mask2 = np.logical_and(mask_img2_binary, np.logical_not(overlap_mask_binary))
                            # Blur the original distance-based mask
                            weight1_norm_blurred = cv2.GaussianBlur(weight1_norm, (blend_smooth_ksize, blend_smooth_ksize), 0)
                            # Clip the blurred mask to [0, 1]
                            blend_mask_float_blurred = np.clip(weight1_norm_blurred, 0.0, 1.0)
                            # Assign the blurred values first
                            blend_mask_float = blend_mask_float_blurred
                            # Force 1.0 where only img1 should be
                            blend_mask_float[strict_non_overlap_mask1] = 1.0
                            # Force 0.0 where only img2 should be
                            blend_mask_float[strict_non_overlap_mask2] = 0.0
                            log_message = log_and_print("Multi-band mask smoothed and edges restored.\n", log_message)
                        except cv2.error as e_blur:
                            log_message = log_and_print(f"Warning: GaussianBlur failed for multi-band mask ({e_blur}). Using original distance-based mask.\n", log_message)
                            blend_mask_float = weight1_norm  # Fall back to the non-blurred mask
                        except Exception as e_blur_other:
                            log_message = log_and_print(f"Warning: Error during multi-band mask blur/restore ({e_blur_other}). Using original distance-based mask.\n", log_message)
                            blend_mask_float = weight1_norm  # Fallback
                        finally:
                            # Clean up the intermediate variables created in this block
                            if 'strict_non_overlap_mask1' in locals(): del strict_non_overlap_mask1
                            if 'strict_non_overlap_mask2' in locals(): del strict_non_overlap_mask2
                            if 'weight1_norm_blurred' in locals(): del weight1_norm_blurred
                            if 'blend_mask_float_blurred' in locals(): del blend_mask_float_blurred
                            gc.collect()
                    else:
                        log_message = log_and_print("Skipping multi-band mask smoothing (ksize not a positive odd integer).\n", log_message)
                        # blend_mask_float is already weight1_norm (the precise one)
                    # --- End smoothing ---
                    # --- Prepare for blending ---
                    img1_for_blend = gain_applied_warped_img1_u8.astype(np.float32)
                    # Store the state of output_img BEFORE multi-band blending
                    output_img_before_mb_float = output_img.astype(np.float32)
                    img2_for_blend = output_img_before_mb_float  # Use the float version
                    # --- Call multi-band blending ---
                    blended_result_uint8 = multi_band_blending(
                        img1_for_blend,
                        img2_for_blend,
                        blend_mask_float,  # The prepared mask
                        num_levels=num_blend_levels
                    )
                    # --- Restore the non-overlap regions ---
                    log_message = log_and_print("Restoring non-overlap regions after multi-band blending...\n", log_message)
                    # Re-identify the strict non-overlap boolean masks (using those calculated earlier)
                    strict_non_overlap_mask1 = np.logical_and(mask_warped_binary, np.logical_not(overlap_mask_binary))
                    strict_non_overlap_mask2 = np.logical_and(mask_img2_binary, np.logical_not(overlap_mask_binary))
                    # Convert the blended result to float for modification
                    output_img_float = blended_result_uint8.astype(np.float32)
                    # Copy the original pixels back into the non-overlap regions.
                    # For img1's non-overlap region, use the (potentially gain-compensated) warped img1
                    output_img_float[strict_non_overlap_mask1] = img1_for_blend[strict_non_overlap_mask1]
                    # For img2's non-overlap region, use the pixels from *before* blending
                    output_img_float[strict_non_overlap_mask2] = output_img_before_mb_float[strict_non_overlap_mask2]
                    # Convert back to uint8 for the final result of this step
                    output_img = np.clip(output_img_float, 0, 255).astype(np.uint8)
                    log_message = log_and_print("Non-overlap regions restored.\n", log_message)
                    # Optional final cleanup of the absolute exterior (post-blending mask)
                    combined_mask_binary = np.logical_or(mask_warped_binary, mask_img2_binary)
                    output_img[~combined_mask_binary] = 0  # Apply the sharp combined mask
                    log_message = log_and_print("Applied final exterior mask.\n", log_message)
                    # Cleanup
                    del img1_for_blend, img2_for_blend, output_img_before_mb_float, blend_mask_float
                    del blended_result_uint8, output_img_float
                    del mask_warped_binary, mask_img2_binary, overlap_mask_binary
                    del strict_non_overlap_mask1, strict_non_overlap_mask2
                    if 'combined_mask_binary' in locals(): del combined_mask_binary
                    if 'weight1_norm' in locals(): del weight1_norm
                    gc.collect()
                    log_message = log_and_print("Multi-band blending with restoration successful.\n", log_message)
                except Exception as e_blend:
                    log_message = log_and_print(f"Error during multi-band blending/restoration: {e_blend}. Falling back to simple overlay.\n{traceback.format_exc()}\n", log_message)
                    # The fallback overlays the gain-applied warped img1 onto the original output_img.
                    # Ensure mask_warped is usable by copyTo (needs the same channel count or a single channel)
                    mask_for_copy = mask_warped
                    if len(mask_warped.shape) == 2 and len(output_img.shape) == 3:
                        mask_for_copy = cv2.cvtColor(mask_warped, cv2.COLOR_GRAY2BGR)
                    elif len(mask_warped.shape) == 3 and len(output_img.shape) == 3 and mask_warped.shape[2] != output_img.shape[2]:
                        mask_for_copy = cv2.cvtColor(cv2.cvtColor(mask_warped, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)  # Force 3 channels
                    output_img = cv2.copyTo(gain_applied_warped_img1_u8, mask_for_copy, output_img)
                    if 'mask_for_copy' in locals() and mask_for_copy is not mask_warped: del mask_for_copy
                    # Ensure cleanup if the error happened mid-process
                    if 'img1_for_blend' in locals(): del img1_for_blend
                    if 'img2_for_blend' in locals(): del img2_for_blend
                    if 'output_img_before_mb_float' in locals(): del output_img_before_mb_float
                    if 'blend_mask_float' in locals(): del blend_mask_float
                    if 'blended_result_uint8' in locals(): del blended_result_uint8
                    if 'mask_warped_binary' in locals(): del mask_warped_binary  # Clean up the boolean masks too
                    if 'mask_img2_binary' in locals(): del mask_img2_binary
                    if 'overlap_mask_binary' in locals(): del overlap_mask_binary
                    gc.collect()
            # --- Linear Blending ---
            elif effective_blend_method == "linear" and has_overlap and gain_applied_warped_img1_u8 is not None:
                log_message = log_and_print("Applying Linear blending...\n", log_message)
                # Ensure overlap_mask is single channel for findContours
                overlap_mask_lin = cv2.cvtColor(overlap_mask, cv2.COLOR_BGR2GRAY) if len(overlap_mask.shape) == 3 else overlap_mask
                contours, _ = cv2.findContours(overlap_mask_lin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                if not contours:
                    log_message = log_and_print("Warning: No contours in overlap. Using simple overlay.\n", log_message)
                    mask_for_copy = mask_warped  # Prepare the mask for copyTo
                    if len(mask_warped.shape) == 2 and len(output_img.shape) == 3:
                        mask_for_copy = cv2.cvtColor(mask_warped, cv2.COLOR_GRAY2BGR)
                    elif len(mask_warped.shape) == 3 and len(output_img.shape) == 3 and mask_warped.shape[2] != output_img.shape[2]:
                        mask_for_copy = cv2.cvtColor(cv2.cvtColor(mask_warped, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)
                    output_img = cv2.copyTo(gain_applied_warped_img1_u8, mask_for_copy, output_img)
                    if mask_for_copy is not mask_warped: del mask_for_copy
                else:
                    main_contour = max(contours, key=cv2.contourArea)
                    x_overlap, y_overlap, w_overlap, h_overlap = cv2.boundingRect(main_contour)
                    # Clip the bounding box to the canvas dimensions
                    x_overlap = max(0, x_overlap); y_overlap = max(0, y_overlap)
                    w_overlap = min(w_overlap, output_width - x_overlap); h_overlap = min(h_overlap, output_height - y_overlap)
                    if w_overlap <= 0 or h_overlap <= 0:
                        log_message = log_and_print("Warning: Invalid overlap bounding box after clipping. Using simple overlay.\n", log_message)
                        mask_for_copy = mask_warped  # Prepare the mask for copyTo
                        if len(mask_warped.shape) == 2 and len(output_img.shape) == 3:
                            mask_for_copy = cv2.cvtColor(mask_warped, cv2.COLOR_GRAY2BGR)
                        elif len(mask_warped.shape) == 3 and len(output_img.shape) == 3 and mask_warped.shape[2] != output_img.shape[2]:
                            mask_for_copy = cv2.cvtColor(cv2.cvtColor(mask_warped, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)
                        output_img = cv2.copyTo(gain_applied_warped_img1_u8, mask_for_copy, output_img)
                        if mask_for_copy is not mask_warped: del mask_for_copy
                    else:
                        # Create the weight maps (float32)
                        weight1 = np.zeros(output_img.shape[:2], dtype=np.float32)
                        weight2 = np.zeros(output_img.shape[:2], dtype=np.float32)
                        blend_axis = 0 if w_overlap >= h_overlap else 1
                        overlap_region_mask = overlap_mask_lin[y_overlap : y_overlap + h_overlap, x_overlap : x_overlap + w_overlap]
                        # Generate a gradient for the overlap box
                        gradient = None
                        if blend_axis == 0:  # Horizontal blend
                            gradient = np.tile(np.linspace(1.0, 0.0, w_overlap, dtype=np.float32), (h_overlap, 1))
                        else:  # Vertical blend
                            gradient = np.tile(np.linspace(1.0, 0.0, h_overlap, dtype=np.float32).reshape(-1, 1), (1, w_overlap))
                        weight1_region = gradient
                        weight2_region = 1.0 - gradient
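                        # E.g. with w_overlap=4 and a horizontal blend, each row of
                        # `gradient` is [1.0, 0.667, 0.333, 0.0]: img1 dominates the
                        # left edge of the overlap box and img2 the right edge.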
# Apply weights only where the overlap mask is valid within the bounding box | |
valid_overlap = overlap_region_mask > 0 | |
weight1[y_overlap : y_overlap + h_overlap, x_overlap : x_overlap + w_overlap][valid_overlap] = weight1_region[valid_overlap] | |
weight2[y_overlap : y_overlap + h_overlap, x_overlap : x_overlap + w_overlap][valid_overlap] = weight2_region[valid_overlap] | |
del weight1_region, weight2_region, gradient, valid_overlap, overlap_region_mask | |
gc.collect() | |
# Assign weights for non-overlapping regions (ensure masks are single channel) | |
mask_warped_lin = cv2.cvtColor(mask_warped, cv2.COLOR_BGR2GRAY) if len(mask_warped.shape) == 3 else mask_warped | |
mask_img2_lin = cv2.cvtColor(mask_img2, cv2.COLOR_BGR2GRAY) if len(mask_img2.shape) == 3 else mask_img2 | |
non_overlap_mask1 = cv2.bitwise_and(mask_warped_lin, cv2.bitwise_not(overlap_mask_lin)) | |
weight1[non_overlap_mask1 > 0] = 1.0 | |
non_overlap_mask2 = cv2.bitwise_and(mask_img2_lin, cv2.bitwise_not(overlap_mask_lin)) | |
weight2[non_overlap_mask2 > 0] = 1.0 # Weight for image 2 is 1.0 in its non-overlap area | |
# Normalize weights before potential smoothing | |
total_weight = weight1 + weight2 + 1e-6 # Add epsilon | |
weight1_norm = weight1 / total_weight | |
weight2_norm = weight2 / total_weight | |
del weight1, weight2, total_weight | |
gc.collect() | |
# --- Apply Smoothing based on blend_smooth_ksize --- | |
if blend_smooth_ksize > 0 and blend_smooth_ksize % 2 == 1: | |
log_message = log_and_print(f"Smoothing linear blend weights with GaussianBlur ksize=({blend_smooth_ksize},{blend_smooth_ksize})...\n", log_message) | |
try: | |
# Identify the actual blending area (where both weights contribute meaningfully and overlap exists) | |
overlap_area_mask_bool = (weight1_norm > 1e-6) & (weight2_norm > 1e-6) & (overlap_mask_lin > 0) | |
smoothed_w1 = cv2.GaussianBlur(weight1_norm, (blend_smooth_ksize, blend_smooth_ksize), 0) | |
smoothed_w2 = cv2.GaussianBlur(weight2_norm, (blend_smooth_ksize, blend_smooth_ksize), 0) | |
# Renormalize smoothed weights ONLY in the overlap area | |
total_smoothed_weight = smoothed_w1 + smoothed_w2 + 1e-6 | |
# Work on copies so the original weights remain available if smoothing fails
temp_w1 = weight1_norm.copy()
temp_w2 = weight2_norm.copy()
temp_w1[overlap_area_mask_bool] = (smoothed_w1 / total_smoothed_weight)[overlap_area_mask_bool] | |
temp_w2[overlap_area_mask_bool] = (smoothed_w2 / total_smoothed_weight)[overlap_area_mask_bool] | |
# Restore strict 1.0 / 0.0 weights in non-overlap areas | |
temp_w1[ non_overlap_mask1 > 0 ] = 1.0 | |
temp_w1[ non_overlap_mask2 > 0 ] = 0.0 | |
temp_w2[ non_overlap_mask1 > 0 ] = 0.0 | |
temp_w2[ non_overlap_mask2 > 0 ] = 1.0 | |
# Assign back to the working variables | |
weight1_norm = temp_w1 | |
weight2_norm = temp_w2 | |
del smoothed_w1, smoothed_w2, total_smoothed_weight, overlap_area_mask_bool, temp_w1, temp_w2 | |
gc.collect() | |
log_message = log_and_print("Linear weights smoothed and renormalized.\n", log_message) | |
except cv2.error as e_blur: | |
log_message = log_and_print(f"Warning: GaussianBlur failed for linear weights ({e_blur}). Using original weights.\n", log_message) | |
except Exception as e_blur_other: | |
log_message = log_and_print(f"Warning: Error during linear weight smoothing ({e_blur_other}). Using original weights.\n", log_message) | |
finally: | |
# Ensure cleanup of temp vars in this block | |
if 'smoothed_w1' in locals(): del smoothed_w1 | |
if 'smoothed_w2' in locals(): del smoothed_w2 | |
if 'total_smoothed_weight' in locals(): del total_smoothed_weight | |
if 'overlap_area_mask_bool' in locals(): del overlap_area_mask_bool | |
if 'temp_w1' in locals(): del temp_w1 | |
if 'temp_w2' in locals(): del temp_w2 | |
gc.collect() | |
else: | |
log_message = log_and_print("Skipping linear weight smoothing (ksize not positive odd integer).\n", log_message) | |
# --- End Smoothing --- | |
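# Why smooth the weights: a hard 1-D weight step like [1, 1, 0, 0] blurred
# with a small Gaussian becomes roughly [1.0, 0.85, 0.15, 0.0], spreading
# the transition over several pixels so small registration errors along
# the seam are far less visible.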
# Blend using potentially smoothed and renormalized weights | |
# Identify regions: where img1 only, img2 only, and blend region | |
non_overlap_mask1_bool = (non_overlap_mask1 > 0) | |
non_overlap_mask2_bool = (non_overlap_mask2 > 0) | |
blend_mask_bool = np.logical_not(np.logical_or(non_overlap_mask1_bool, non_overlap_mask2_bool)) & (overlap_mask_lin > 0) | |
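# Canvas partition: pixels covered only by img1, pixels covered only by
# img2, and the overlap band. The final AND with overlap_mask_lin keeps
# empty canvas pixels (covered by neither image) out of the blend set.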
# Copy non-overlapping part of image 1 directly where its weight is 1 | |
output_img[non_overlap_mask1_bool] = gain_applied_warped_img1_u8[non_overlap_mask1_bool] | |
# Non-overlapping part of image 2 is already in output_img from the initial placement | |
# Blend the overlapping/transition areas | |
blend_indices = np.where(blend_mask_bool) | |
num_blend_pixels = len(blend_indices[0]) | |
if num_blend_pixels > 0: | |
log_message = log_and_print(f"Blending {num_blend_pixels} pixels linearly...\n", log_message) | |
try: | |
# Ensure images are float32 for blending calculation | |
img1_blend_float = gain_applied_warped_img1_u8[blend_indices].astype(np.float32) | |
img2_blend_float = output_img[blend_indices].astype(np.float32) # Pixels already placed from img2 | |
# Get weights for the blend region and broadcast for element-wise multiplication | |
w1_blend_1d = weight1_norm[blend_indices] | |
w2_blend_1d = weight2_norm[blend_indices] | |
# Add new axis for broadcasting: (N,) -> (N, 1) to multiply with (N, 3) pixel data | |
w1_blend_broadcast = w1_blend_1d[:, np.newaxis] | |
w2_blend_broadcast = w2_blend_1d[:, np.newaxis] | |
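# Shape note: blend_indices selects N pixels, so the image slices are
# (N, 3) while the weight slices are (N,); the [:, np.newaxis] reshape to
# (N, 1) lets NumPy broadcast each per-pixel weight across the 3 channels.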
# Perform the weighted sum | |
blended_float = w1_blend_broadcast * img1_blend_float + w2_blend_broadcast * img2_blend_float | |
blended_uint8 = blended_float.clip(0, 255).astype(np.uint8) | |
# Place the blended result back into the output image | |
output_img[blend_indices] = blended_uint8 | |
del img1_blend_float, img2_blend_float, w1_blend_1d, w2_blend_1d | |
del w1_blend_broadcast, w2_blend_broadcast, blended_float, blended_uint8 | |
gc.collect() | |
log_message = log_and_print("Linear blending successful.\n", log_message) | |
except MemoryError: | |
log_message = log_and_print("Warning: MemoryError during float blending. Using simple overlay for blend region.\n", log_message) | |
# Fallback: copy img1 over img2 in the blend region | |
blend_mask_uint8 = blend_mask_bool.astype(np.uint8) * 255 | |
mask_for_copy = cv2.cvtColor(blend_mask_uint8, cv2.COLOR_GRAY2BGR) if len(output_img.shape) == 3 else blend_mask_uint8 | |
if np.any(mask_for_copy): | |
output_img = cv2.copyTo(gain_applied_warped_img1_u8, mask_for_copy, output_img) | |
del blend_mask_uint8, mask_for_copy | |
gc.collect() | |
except Exception as e_blend_lin: | |
log_message = log_and_print(f"Warning: Error during float blending ({e_blend_lin}). Using simple overlay for blend region.\n", log_message) | |
blend_mask_uint8 = blend_mask_bool.astype(np.uint8) * 255 | |
mask_for_copy = cv2.cvtColor(blend_mask_uint8, cv2.COLOR_GRAY2BGR) if len(output_img.shape) == 3 else blend_mask_uint8 | |
if np.any(mask_for_copy): | |
output_img = cv2.copyTo(gain_applied_warped_img1_u8, mask_for_copy, output_img) | |
del blend_mask_uint8, mask_for_copy | |
gc.collect() | |
else: | |
log_message = log_and_print("Note: Linear blend mask was empty, skipping float blend step.\n", log_message) | |
# Clean up linear blending specific variables | |
del weight1_norm, weight2_norm, blend_mask_bool | |
del non_overlap_mask1, non_overlap_mask2, non_overlap_mask1_bool, non_overlap_mask2_bool | |
del mask_warped_lin, mask_img2_lin, overlap_mask_lin | |
if 'blend_indices' in locals(): del blend_indices | |
gc.collect() | |
# Clean up contour variables regardless of path taken inside linear blend | |
if 'contours' in locals(): del contours | |
if 'main_contour' in locals(): del main_contour | |
gc.collect() | |
# Simple overlay if no blending applied or specified OR if warped image was None | |
elif not has_overlap or effective_blend_method not in ["linear", "multi-band"] or gain_applied_warped_img1_u8 is None: | |
if gain_applied_warped_img1_u8 is None: | |
log_message = log_and_print("Warped image was None. Performing simple overlay (only showing img2).\n", log_message) | |
# In this case, output_img already contains img2 where it should be, and black elsewhere. | |
# No copyTo needed, as there's nothing to copy from. | |
elif not has_overlap: | |
log_message = log_and_print("No overlap. Performing simple overlay.\n", log_message) | |
else: | |
log_message = log_and_print(f"Blending method '{effective_blend_method}' or overlap condition not met. Performing simple overlay.\n", log_message) | |
if gain_applied_warped_img1_u8 is not None: # Only copy if we have something to copy | |
# Overlay gain_applied warped img1 onto output_img where mask_warped is non-zero | |
mask_for_copy = mask_warped # Prepare mask for copyTo | |
if len(mask_warped.shape) == 2 and len(output_img.shape) == 3: mask_for_copy = cv2.cvtColor(mask_warped, cv2.COLOR_GRAY2BGR) | |
elif len(mask_warped.shape) == 3 and len(output_img.shape) == 3 and mask_warped.shape[2] != output_img.shape[2]: mask_for_copy = cv2.cvtColor(cv2.cvtColor(mask_warped, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR) | |
output_img = cv2.copyTo(gain_applied_warped_img1_u8, mask_for_copy, output_img) | |
if mask_for_copy is not mask_warped: del mask_for_copy | |
# --- Final Result Assignment --- | |
final_output_img = output_img # Assign the final blended/overlaid image | |
end_time_pairwise = time.time() | |
log_message = log_and_print(f"Pairwise stitching finished. Time: {end_time_pairwise - start_time_pairwise:.2f}s\n", log_message) | |
else: # Not enough good matches | |
log_message = log_and_print(f"Error: Not enough good matches ({len(good_matches)} < {MIN_MATCH_COUNT}).\n", log_message) | |
# Minimal cleanup needed here, mostly handled in finally block | |
if 'kp1' in locals(): del kp1 | |
if 'kp2' in locals(): del kp2 | |
if 'good_matches' in locals(): del good_matches | |
except Exception as e: | |
log_message = log_and_print(f"Error during pairwise stitching: {e}\n{traceback.format_exc()}\n", log_message) | |
final_output_img = None # Ensure None is returned on error | |
finally: | |
# --- Comprehensive Cleanup --- | |
# Delete variables in roughly reverse order of creation / dependency | |
# Blend-specific intermediates | |
if 'img1_for_blend' in locals(): del img1_for_blend | |
if 'img2_for_blend' in locals(): del img2_for_blend | |
if 'output_img_before_mb_float' in locals(): del output_img_before_mb_float | |
if 'blend_mask_float' in locals(): del blend_mask_float | |
if 'weight1_norm' in locals(): del weight1_norm # From mask gen (MB or Linear) | |
if 'weight2_norm' in locals(): del weight2_norm # From Linear mask gen | |
# ... other linear/MB intermediate vars ... | |
# Gain/Warp intermediates | |
if 'gain_applied_warped_img1_u8' in locals() and gain_applied_warped_img1_u8 is not None: | |
# Only delete if it's a separate copy from warped_img1_u8 | |
if 'warped_img1_u8' in locals() and warped_img1_u8 is not None and gain_applied_warped_img1_u8 is not warped_img1_u8: | |
del gain_applied_warped_img1_u8 | |
# else it points to warped_img1_u8 or warped_img1_u8 is None/deleted already | |
if 'warped_img1_u8' in locals() and warped_img1_u8 is not None: del warped_img1_u8 | |
if 'mask_warped' in locals(): del mask_warped | |
if 'mask_img2' in locals(): del mask_img2 | |
if 'overlap_mask' in locals(): del overlap_mask | |
if 'img2_part' in locals(): del img2_part # From placing img2 | |
if 'output_img' in locals() and output_img is not None and output_img is not final_output_img: | |
# Delete intermediate output_img if it wasn't the final result (e.g., error occurred) | |
del output_img | |
# Transformation matrices and points | |
if 'H_matrix_3x3_for_canvas' in locals(): del H_matrix_3x3_for_canvas | |
if 'final_warp_M' in locals(): del final_warp_M | |
if 'mask_trans' in locals(): del mask_trans | |
if 'src_pts' in locals(): del src_pts | |
if 'dst_pts' in locals(): del dst_pts | |
# Feature matching intermediates | |
if 'kp1' in locals(): del kp1 | |
if 'kp2' in locals(): del kp2 | |
if 'des1' in locals(): del des1 | |
if 'des2' in locals(): del des2 | |
if 'good_matches' in locals(): del good_matches | |
if 'all_matches' in locals(): del all_matches | |
# Initial uint8 images | |
if 'img1_u8' in locals(): del img1_u8 | |
if 'img2_u8' in locals(): del img2_u8 | |
gc.collect() | |
return final_output_img, log_message | |
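# --- Usage sketch (illustrative only) ---
# A minimal, hypothetical example of calling stitch_pairwise_images directly.
# The file names and parameter values below are assumptions, not part of the
# app's own flow; the function returns a BGR uint8 composite and a log string.
#
#   img_a = cv2.imread("left.jpg")    # BGR uint8
#   img_b = cv2.imread("right.jpg")   # BGR uint8
#   pano, pano_log = stitch_pairwise_images(
#       img_a, img_b,
#       transform_model_str="Homography",
#       blend_method="linear",
#       blend_smooth_ksize=15)
#   if pano is not None:
#       cv2.imwrite("pano.jpg", pano)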
# --- Function for N-Image Stitching (Primarily for Image List Input) --- | |
def stitch_multiple_images(images, # List of NumPy images (BGR, potentially pre-cropped) | |
stitcher_mode_str="SCANS", | |
registration_resol=0.6, | |
seam_estimation_resol=0.1, | |
compositing_resol=-1.0, # Use -1.0 for default/auto | |
wave_correction=False, | |
exposure_comp_type_str="GAIN_BLOCKS", | |
enable_cropping=True, # This is for POST-stitch cropping | |
strict_no_black_edges=False, | |
# Pairwise/Fallback specific params | |
transform_model_str="Homography", | |
blend_method="multi-band", | |
enable_gain_compensation=True, | |
orb_nfeatures=2000, | |
match_ratio_thresh=0.75, | |
ransac_reproj_thresh=5.0, | |
max_distance_coeff=0.5, | |
max_blending_width=10000, | |
max_blending_height=10000, | |
blend_smooth_ksize=15, | |
num_blend_levels=4 | |
): | |
""" | |
Stitches a list of images (NumPy arrays). Tries cv2.Stitcher first (unless | |
stitcher_mode_str is 'DIRECT_PAIRWISE'), otherwise falls back to manual | |
pairwise stitching using the specified transform_model_str. | |
Returns ONE stitched image and log. | |
Input images should be in BGR format (already potentially cropped by caller). | |
Output is RGB. The 'enable_cropping' param here refers to final black border cropping. | |
""" | |
log = log_and_print(f"--- Starting Stitching Process for {len(images)} Provided Images ---\n", "") | |
total_start_time = time.time() | |
stitched_img_rgb = None # Initialize result | |
if len(images) < 2: | |
log = log_and_print("Error: Need at least two images to stitch.\n", log) | |
return None, log | |
# Check if any input image is None or empty after potential pre-cropping | |
valid_images = [] | |
for i, img in enumerate(images): | |
if img is None or img.size == 0: | |
log = log_and_print(f"Warning: Input image at index {i} is invalid (None or empty). Skipping it.\n", log) | |
else: | |
valid_images.append(img) | |
if len(valid_images) < 2: | |
log = log_and_print(f"Error: Not enough valid images ({len(valid_images)}) left after checking. Cannot stitch.\n", log) | |
del images, valid_images # Clean up | |
gc.collect() | |
return None, log | |
images = valid_images # Use the filtered list | |
log = log_and_print(f"Proceeding with {len(images)} valid images.\n", log) | |
log = log_and_print(f"Selected Stitcher Mode: {stitcher_mode_str}\n", log) | |
# Log the pairwise transform model choice, relevant if fallback or DIRECT_PAIRWISE | |
if stitcher_mode_str == "DIRECT_PAIRWISE": | |
log = log_and_print(f"Using Pairwise Transform Model: {transform_model_str}\n", log) | |
log = log_and_print(f"Pairwise Params: RANSAC Thresh={ransac_reproj_thresh}, Max Dist Coeff={max_distance_coeff}\n", log) | |
else: | |
log = log_and_print(f"Pairwise Transform Model (for fallback): {transform_model_str}\n", log) | |
log = log_and_print(f"Fallback Pairwise Params: RANSAC Thresh={ransac_reproj_thresh}, Max Dist Coeff={max_distance_coeff}\n", log) | |
log = log_and_print(f"Post-Crop: Enable={enable_cropping}, Strict Edges={strict_no_black_edges}\n", log) # Log new param | |
skip_cv2_stitcher = (stitcher_mode_str == "DIRECT_PAIRWISE") | |
stitched_img_bgr = None | |
stitcher_success = False | |
# 1. Try using cv2.Stitcher (unless skipped) | |
if not skip_cv2_stitcher: | |
log = log_and_print("\nAttempting stitching with built-in cv2.Stitcher...\n", log) | |
# Map string parameters to OpenCV constants for cv2.Stitcher modes | |
stitcher_mode_map = {"PANORAMA": cv2.Stitcher_PANORAMA, "SCANS": cv2.Stitcher_SCANS} | |
# Default to SCANS if invalid string for cv2.Stitcher mode itself | |
cv2_stitcher_mode_enum = stitcher_mode_map.get(stitcher_mode_str, cv2.Stitcher_SCANS) | |
log = log_and_print(f"Using OpenCV Stitcher Mode Enum: {cv2_stitcher_mode_enum} (from string: {stitcher_mode_str})\n", log) | |
exposure_comp_map = { | |
"NO": cv2.detail.ExposureCompensator_NO, | |
"GAIN": cv2.detail.ExposureCompensator_GAIN, | |
"GAIN_BLOCKS": cv2.detail.ExposureCompensator_GAIN_BLOCKS | |
} | |
exposure_comp_type = exposure_comp_map.get(exposure_comp_type_str, cv2.detail.ExposureCompensator_GAIN_BLOCKS) | |
log = log_and_print(f"Using Exposure Compensation: {exposure_comp_type_str}\n", log) | |
log = log_and_print(f"Wave Correction Enabled: {wave_correction}\n", log) | |
stitcher = None # Initialize stitcher object variable | |
try: | |
stitcher = cv2.Stitcher.create(cv2_stitcher_mode_enum) | |
if stitcher is None: | |
raise RuntimeError("cv2.Stitcher.create returned None.") | |
log = log_and_print(f"Setting Stitcher resolutions: Reg={registration_resol:.2f}, Seam={seam_estimation_resol:.2f}, Comp={compositing_resol:.2f}\n", log) | |
try: | |
if hasattr(stitcher, 'setRegistrationResol'): | |
stitcher.setRegistrationResol(float(registration_resol)) | |
if hasattr(stitcher, 'setSeamEstimationResol'): | |
stitcher.setSeamEstimationResol(float(seam_estimation_resol)) | |
if hasattr(stitcher, 'setCompositingResol'): | |
stitcher.setCompositingResol(float(compositing_resol)) | |
except Exception as e_res: | |
log = log_and_print(f"Warning: Could not set stitcher resolutions: {e_res}\n", log) | |
try: | |
if hasattr(stitcher, 'setWaveCorrection'): | |
stitcher.setWaveCorrection(wave_correction) | |
except Exception as e_wave: | |
log = log_and_print(f"Warning: Could not set wave correction: {e_wave}\n", log) | |
try: | |
if hasattr(stitcher, 'setExposureCompensator'): | |
compensator = cv2.detail.ExposureCompensator_createDefault(exposure_comp_type) | |
stitcher.setExposureCompensator(compensator) | |
del compensator # Release compensator object reference | |
except Exception as e_exp: | |
log = log_and_print(f"Warning: Could not set exposure compensator: {e_exp}\n", log) | |
# Ensure all images are uint8 before passing to stitcher | |
images_uint8 = [] | |
for img in images: | |
if img.dtype != np.uint8: | |
images_uint8.append(img.clip(0, 255).astype(np.uint8)) | |
else: | |
images_uint8.append(img) | |
status = cv2.Stitcher_ERR_NEED_MORE_IMGS # Initialize status to a known failure code | |
stitched_img_raw = None | |
try: | |
log = log_and_print("Executing stitcher.stitch()...\n", log) | |
status, stitched_img_raw = stitcher.stitch(images_uint8) # Input 'images' should be BGR uint8 | |
log = log_and_print(f"stitcher.stitch() returned status: {status}\n", log) # Log the status code | |
except cv2.error as e_stitch: | |
log = log_and_print(f"OpenCV Error occurred DURING stitcher.stitch() call: {e_stitch}\n", log) | |
log = log_and_print(f"Traceback:\n{traceback.format_exc()}\n", log) | |
log = log_and_print("Falling back to manual pairwise stitching method due to stitch() error.\n", log) | |
status = -99 # Set status to a custom failure code to ensure fallback | |
stitched_img_raw = None | |
except Exception as e_stitch_other: | |
log = log_and_print(f"Unexpected Error occurred DURING stitcher.stitch() call: {e_stitch_other}\n", log) | |
log = log_and_print(f"Traceback:\n{traceback.format_exc()}\n", log) | |
log = log_and_print("Falling back to manual pairwise stitching method due to unexpected stitch() error.\n", log) | |
status = -100 # Set status to a custom failure code | |
stitched_img_raw = None | |
finally: | |
del images_uint8 | |
gc.collect() | |
if status == cv2.Stitcher_OK: | |
log = log_and_print("cv2.Stitcher successful!\n", log) | |
if stitched_img_raw is not None and stitched_img_raw.size > 0: | |
log = log_and_print(f"Stitcher output dimensions (raw): {stitched_img_raw.shape}\n", log) | |
# Apply FINAL black border cropping if enabled | |
cropped_result = crop_black_borders(stitched_img_raw, enable_cropping, strict_no_black_edges) | |
if cropped_result is not None and cropped_result.size > 0 : | |
stitched_img_bgr = cropped_result | |
log = log_and_print(f"Final dimensions after POST-stitch cropping: {stitched_img_bgr.shape}\n", log) | |
else: | |
stitched_img_bgr = stitched_img_raw | |
log = log_and_print("POST-stitch cropping failed or disabled, using raw stitcher output.\n", log) | |
stitcher_success = True | |
del stitched_img_raw | |
if 'cropped_result' in locals() and cropped_result is not stitched_img_bgr: | |
del cropped_result | |
gc.collect() | |
else: | |
log = log_and_print("Error: cv2.Stitcher returned status OK but the image is empty.\n", log) | |
else: | |
error_codes = { getattr(cv2, k): k for k in dir(cv2) if k.startswith('Stitcher_ERR_') } | |
error_codes[-99] = "ERR_STITCH_CV_ERROR" | |
error_codes[-100] = "ERR_STITCH_EXCEPTION" | |
# Check if fallback message was already logged by exceptions during stitch() | |
if "Falling back to manual pairwise stitching method due to" not in log.splitlines()[-5:]: | |
log = log_and_print(f"cv2.Stitcher failed with status code: {status} ({error_codes.get(status, f'Unknown Error {status}')})\n", log) | |
log = log_and_print("Falling back to manual pairwise stitching method...\n", log) | |
except AttributeError as e_attr: | |
log = log_and_print(f"AttributeError during Stitcher setup ({e_attr}). Falling back.\n{traceback.format_exc()}\n", log) | |
except RuntimeError as e_runtime: | |
log = log_and_print(f"RuntimeError during Stitcher setup ({e_runtime}). Falling back.\n{traceback.format_exc()}\n", log) | |
except cv2.error as e: | |
log = log_and_print(f"OpenCV Error during Stitcher operation: {e}. Falling back.\n", log) | |
if "OutOfMemoryError" in str(e) or "Insufficient memory" in str(e): | |
log = log_and_print(">>> Specific OutOfMemoryError detected. Reduce resolutions or use more RAM.\n", log) | |
log = log_and_print(f"{traceback.format_exc()}\n", log) | |
except Exception as e: | |
log = log_and_print(f"Unexpected error during Stitcher: {e}. Falling back.\n{traceback.format_exc()}\n", log) | |
finally: | |
if stitcher is not None:
    del stitcher  # Drop the reference so the stitcher's resources can be freed
gc.collect() | |
# 2. Fallback or Direct Pairwise Stitching | |
# Trigger if cv2.Stitcher was skipped OR if it failed | |
if skip_cv2_stitcher or not stitcher_success: | |
# Add clearer logging based on the reason | |
if skip_cv2_stitcher: | |
log = log_and_print(f"\n--- Starting Sequential Pairwise Stitching (Direct Mode, Transform: {transform_model_str}) ---\n", log) | |
else: | |
log = log_and_print(f"\n--- Starting Sequential Pairwise Stitching (Fallback, Transform: {transform_model_str}) ---\n", log) | |
if len(images) >= 2: | |
# Start with the first valid image. Ensure it's uint8. | |
if images[0].dtype != np.uint8: | |
current_stitched_image = images[0].clip(0, 255).astype(np.uint8) | |
else: | |
current_stitched_image = images[0].copy() # Copy to avoid modifying original list item | |
sequential_stitch_success = True | |
for i in range(1, len(images)): | |
log = log_and_print(f"\nSequentially stitching image {i+1} of {len(images)} using pairwise method...\n", log) | |
# Ensure next image is uint8 | |
if images[i].dtype != np.uint8: | |
next_image = images[i].clip(0, 255).astype(np.uint8) | |
else: | |
next_image = images[i] # Can use directly if already uint8 | |
result, pairwise_log = stitch_pairwise_images( | |
current_stitched_image, # BGR uint8 | |
next_image, # BGR uint8 | |
transform_model_str=transform_model_str, | |
blend_method=blend_method, | |
enable_gain_compensation=enable_gain_compensation, | |
orb_nfeatures=orb_nfeatures, | |
match_ratio_thresh=match_ratio_thresh, | |
ransac_reproj_thresh=ransac_reproj_thresh, | |
max_distance_coeff=max_distance_coeff, | |
max_blending_width=max_blending_width, | |
max_blending_height=max_blending_height, | |
blend_smooth_ksize=blend_smooth_ksize, | |
num_blend_levels=num_blend_levels | |
) | |
log += pairwise_log | |
if result is None: | |
log = log_and_print(f"Error: Failed to stitch image {i+1} onto previous composite in the pairwise step. Aborting sequential process.\n", log) # Corrected index in log | |
sequential_stitch_success = False | |
if 'current_stitched_image' in locals() and current_stitched_image is not None: | |
del current_stitched_image # Clean up intermediate result | |
gc.collect() | |
break | |
# Release the previous intermediate image before assigning the new one | |
if 'current_stitched_image' in locals() and current_stitched_image is not None: | |
del current_stitched_image | |
gc.collect() | |
current_stitched_image = result # Result is BGR uint8 | |
log = log_and_print(f"Intermediate stitched shape: {current_stitched_image.shape}\n", log) | |
# Ensure next_image is cleaned up if it was a conversion | |
if next_image is not images[i]: | |
del next_image | |
gc.collect() | |
if sequential_stitch_success and current_stitched_image is not None: | |
log = log_and_print("\nSequential pairwise stitching complete. Applying final cropping...\n", log) | |
# Apply FINAL black border cropping if enabled | |
cropped_fallback = crop_black_borders(current_stitched_image, enable_cropping, strict_no_black_edges) | |
if cropped_fallback is not None and cropped_fallback.size > 0: | |
stitched_img_bgr = cropped_fallback | |
log = log_and_print(f"Final dimensions after POST-stitch cropping: {stitched_img_bgr.shape}\n", log) | |
else: | |
stitched_img_bgr = current_stitched_image # Use uncropped if cropping failed | |
log = log_and_print("POST-stitch cropping failed or disabled, using uncropped manual result.\n", log) | |
# Clean up the last intermediate/uncropped result if cropping was successful and created a new object | |
if cropped_fallback is not current_stitched_image and current_stitched_image is not None: | |
del current_stitched_image | |
if 'cropped_fallback' in locals() and cropped_fallback is not stitched_img_bgr: | |
del cropped_fallback | |
gc.collect() | |
else: | |
log = log_and_print("Sequential pairwise stitching process could not produce a final result.\n", log) | |
# Ensure cleanup if loop broke early or current_stitched_image was None/deleted | |
if 'current_stitched_image' in locals() and current_stitched_image is not None: | |
del current_stitched_image | |
gc.collect() | |
else: # Handle len(images) < 2 case (shouldn't happen due to initial check, but safety) | |
log = log_and_print("Error: Not enough images for pairwise stitching (internal check).\n", log) | |
# Clean up the input image list now that it's processed | |
del images | |
if 'valid_images' in locals(): del valid_images # Should be same as images now | |
gc.collect() | |
# 3. Final Result Check and Return | |
total_end_time = time.time() | |
log = log_and_print(f"\nTotal processing time: {total_end_time - total_start_time:.2f} seconds.\n", log) | |
if stitched_img_bgr is not None and stitched_img_bgr.size > 0: | |
log = log_and_print("Stitching process finished for image list.", log) | |
try: | |
stitched_img_rgb = cv2.cvtColor(stitched_img_bgr, cv2.COLOR_BGR2RGB) # Convert BGR to RGB for Gradio | |
del stitched_img_bgr # Release BGR version | |
gc.collect() | |
return stitched_img_rgb, log | |
except cv2.error as e_cvt: | |
log = log_and_print(f"\nError converting final image to RGB: {e_cvt}. Returning None.\n", log) | |
if 'stitched_img_bgr' in locals(): del stitched_img_bgr | |
gc.collect() | |
return None, log | |
else: | |
log = log_and_print("Error: Stitching failed. No final image generated.", log) | |
if 'stitched_img_bgr' in locals() and stitched_img_bgr is not None: | |
del stitched_img_bgr | |
gc.collect() | |
return None, log | |
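# --- Usage sketch (illustrative only) ---
# A hypothetical direct call with a list of BGR frames; the paths are
# assumptions. Note the return image is RGB (for Gradio), so convert back
# to BGR before writing with OpenCV:
#
#   frames = [cv2.imread(p) for p in ("a.jpg", "b.jpg", "c.jpg")]
#   pano_rgb, pano_log = stitch_multiple_images(frames, stitcher_mode_str="SCANS")
#   if pano_rgb is not None:
#       cv2.imwrite("pano.png", cv2.cvtColor(pano_rgb, cv2.COLOR_RGB2BGR))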
# --- Video Frame Stitching --- | |
def stitch_video_frames(video_path, | |
crop_top_percent=0.0, | |
crop_bottom_percent=0.0, | |
enable_cropping=True, # This is for POST-stitch cropping | |
strict_no_black_edges=False, | |
# Pairwise specific params | |
transform_model_str="Homography", | |
blend_method="multi-band", | |
enable_gain_compensation=True, | |
orb_nfeatures=2000, | |
match_ratio_thresh=0.75, | |
ransac_reproj_thresh=5.0, | |
max_distance_coeff=0.5, | |
max_blending_width=10000, | |
max_blending_height=10000, | |
blend_smooth_ksize=15, | |
num_blend_levels=4, | |
# Video specific params | |
sample_interval_ms=3000, | |
max_composite_width=10000, | |
max_composite_height=10000, | |
progress=None): | |
""" | |
Reads a video, samples frames incrementally, applies percentage crop, | |
and stitches them sequentially using the specified transform_model_str. | |
Includes size checks to limit composite image growth. | |
Returns a list of stitched images (RGB format) and a log. | |
""" | |
log = log_and_print(f"--- Starting Incremental Video Stitching for: {os.path.basename(video_path)} ---\n", "") | |
log = log_and_print(f"Params: Interval={sample_interval_ms}ms, Transform={transform_model_str}, ORB={orb_nfeatures}, Ratio={match_ratio_thresh}\n", log) | |
log = log_and_print(f"Params Cont'd: RANSAC Thresh={ransac_reproj_thresh}, Max Dist Coeff={max_distance_coeff}\n", log) | |
log = log_and_print(f"Composite Limits: MaxW={max_composite_width}, MaxH={max_composite_height}\n", log) | |
log = log_and_print(f"Pre-Crop: Top={crop_top_percent}%, Bottom={crop_bottom_percent}%\n", log) | |
log = log_and_print(f"Post-Crop Black Borders: {enable_cropping}, Strict Edges: {strict_no_black_edges}\n", log) | |
log = log_and_print(f"Blending: Method={blend_method}, GainComp={enable_gain_compensation}, SmoothKSize={blend_smooth_ksize}, MB Levels={num_blend_levels}\n", log) | |
total_start_time = time.time() | |
stitched_results_rgb = [] # Store final RGB images | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
log = log_and_print(f"Error: Could not open video file: {video_path}\n", log) | |
return [], log | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
frame_count_total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
if fps <= 0 or np.isnan(fps): # Handle invalid FPS reads | |
fps = 30 # Default FPS | |
log = log_and_print("Warning: Could not read valid FPS, defaulting to 30.\n", log) | |
if frame_count_total <= 0: # Handle invalid frame count reads | |
log = log_and_print("Warning: Could not read valid total frame count.\n", log) | |
total_sampled_estimate = 0 # Cannot estimate progress accurately | |
else: | |
# Estimate how many frames will be sampled (for progress reporting),
# using the same stride formula as frame_interval below.
frames_per_sample = max(1, int(round(fps * (sample_interval_ms / 1000.0))))
total_sampled_estimate = frame_count_total / frames_per_sample
frame_interval = max(1, int(round(fps * (sample_interval_ms / 1000.0)))) | |
log = log_and_print(f"Video Info: ~{fps:.2f} FPS, {frame_count_total} Frames, Sampling every {frame_interval} frames.\n", log) | |
frame_num = 0 | |
processed_sampled_count = 0 # Counter for progress bar | |
anchor_frame = None # The starting frame of the current sequence (BGR, cropped) | |
current_composite = None # The stitched result being built (BGR, uint8) | |
last_saved_composite = None # Keep track of the last saved image to avoid duplicates | |
while True: | |
frame_bgr_raw = None # Initialize here for cleanup later | |
try: | |
if cap is None or not cap.isOpened(): | |
log = log_and_print("Error: Video capture became invalid during processing.\n", log) | |
break | |
ret, frame_bgr_raw = cap.read() | |
if not ret: | |
log = log_and_print("\nEnd of video stream reached.\n", log) | |
break # End of video | |
# --- Sampling Logic --- | |
if frame_num % frame_interval == 0: | |
if frame_bgr_raw is not None and frame_bgr_raw.size > 0: | |
processed_sampled_count += 1 | |
frame_bgr = None # Initialize BGR frame variable | |
# --- Frame Format Check --- | |
if frame_bgr_raw.ndim == 2: | |
frame_bgr = cv2.cvtColor(frame_bgr_raw, cv2.COLOR_GRAY2BGR) | |
elif frame_bgr_raw.ndim == 3 and frame_bgr_raw.shape[2] == 4: | |
frame_bgr = cv2.cvtColor(frame_bgr_raw, cv2.COLOR_BGRA2BGR) | |
elif frame_bgr_raw.ndim == 3 and frame_bgr_raw.shape[2] == 3: | |
frame_bgr = frame_bgr_raw # Already BGR | |
else: | |
log = log_and_print(f"Warning: Skipping frame {frame_num} due to unexpected shape {frame_bgr_raw.shape}\n", log) | |
if frame_bgr_raw is not None: del frame_bgr_raw # Clean up original frame | |
gc.collect() | |
frame_num += 1 | |
continue # Skip to next frame read | |
# Release the raw frame once converted/checked (if a copy was made) | |
if frame_bgr is not frame_bgr_raw: | |
del frame_bgr_raw | |
frame_bgr_raw = None # Mark as deleted | |
gc.collect() | |
cropped_frame_bgr = crop_image_by_percent(frame_bgr, crop_top_percent, crop_bottom_percent) | |
del frame_bgr # Release the uncropped BGR version | |
gc.collect() | |
# Check if cropping failed or resulted in an empty image | |
if cropped_frame_bgr is None or cropped_frame_bgr.size == 0: | |
log = log_and_print(f"Warning: Skipping frame {frame_num} because percentage cropping failed or resulted in empty image.\n", log) | |
if cropped_frame_bgr is not None: del cropped_frame_bgr # Should be None, but safety check | |
gc.collect() | |
frame_num += 1 | |
continue # Skip to next frame read | |
# Now use 'cropped_frame_bgr' as the current frame for stitching | |
current_frame_for_stitch = cropped_frame_bgr # BGR, uint8, potentially cropped | |
if progress is not None and total_sampled_estimate > 0: | |
# Ensure progress doesn't exceed 1.0 | |
progress_fraction = min(1.0, processed_sampled_count / total_sampled_estimate) | |
progress(progress_fraction, desc=f"Processing Sample {processed_sampled_count}/{int(total_sampled_estimate)}") | |
elif progress is not None: | |
# Fallback progress if estimate is bad | |
progress(processed_sampled_count / (processed_sampled_count + 10), desc=f"Processing Sample {processed_sampled_count}") | |
log = log_and_print(f"\n--- Processing sampled frame index {frame_num} (Count: {processed_sampled_count}) ---\n", log) | |
log = log_and_print(f"Frame shape after potential pre-crop: {current_frame_for_stitch.shape}\n", log) | |
# --- Stitching Logic --- | |
if anchor_frame is None: | |
# Start a new sequence | |
anchor_frame = current_frame_for_stitch.copy() # Make a copy | |
current_composite = anchor_frame # Start composite is the anchor itself | |
log = log_and_print(f"Frame {frame_num}: Set as new anchor (Shape: {anchor_frame.shape}).\n", log) | |
# No need to stitch yet, just set the anchor | |
else: | |
# Try stitching the current composite with the new frame | |
log = log_and_print(f"Attempting stitch: Composite({current_composite.shape}) + Frame({current_frame_for_stitch.shape})\n", log) | |
stitch_result, stitch_log = stitch_pairwise_images( | |
current_composite, # Previous result or anchor (uint8) | |
current_frame_for_stitch, # New frame to add (uint8) | |
transform_model_str=transform_model_str, | |
blend_method=blend_method, | |
enable_gain_compensation=enable_gain_compensation, | |
orb_nfeatures=orb_nfeatures, | |
match_ratio_thresh=match_ratio_thresh, | |
ransac_reproj_thresh=ransac_reproj_thresh, | |
max_distance_coeff=max_distance_coeff, | |
max_blending_width=max_blending_width, | |
max_blending_height=max_blending_height, | |
blend_smooth_ksize=blend_smooth_ksize, | |
num_blend_levels=num_blend_levels | |
) | |
log += stitch_log | |
if stitch_result is not None and stitch_result.size > 0: | |
# --- Stitching SUCCEEDED --- | |
log = log_and_print(f"Success: Stitched frame {frame_num}. New composite shape: {stitch_result.shape}\n", log) | |
# Release old composite before assigning new one | |
del current_composite | |
gc.collect() | |
current_composite = stitch_result # Update the composite (stitch_result is BGR uint8) | |
# anchor_frame remains the same for this sequence | |
# --- Check Size Limit --- | |
h_curr, w_curr = current_composite.shape[:2] | |
size_limit_exceeded = False | |
# Check only if limit > 0 | |
if max_composite_width > 0 and w_curr > max_composite_width: | |
log = log_and_print(f"ACTION: Composite width ({w_curr}) exceeded limit ({max_composite_width}).\n", log) | |
size_limit_exceeded = True | |
if max_composite_height > 0 and h_curr > max_composite_height: | |
log = log_and_print(f"ACTION: Composite height ({h_curr}) exceeded limit ({max_composite_height}).\n", log) | |
size_limit_exceeded = True | |
if size_limit_exceeded: | |
log = log_and_print("Saving current composite and starting new sequence with NEXT frame.\n", log) | |
# Apply FINAL black border cropping if enabled | |
post_cropped_composite = crop_black_borders(current_composite, enable_cropping, strict_no_black_edges) | |
if post_cropped_composite is not None and post_cropped_composite.size > 0: | |
# Avoid saving the exact same image twice in a row | |
is_duplicate = False | |
if last_saved_composite is not None: | |
try: | |
# Simple check: compare shapes first, then content if shapes match | |
if last_saved_composite.shape == post_cropped_composite.shape: | |
if np.array_equal(last_saved_composite, post_cropped_composite): | |
is_duplicate = True | |
except Exception as e_comp: | |
log = log_and_print(f"Warning: Error comparing images for duplication check: {e_comp}\n", log) | |
if not is_duplicate: | |
try: | |
stitched_results_rgb.append(cv2.cvtColor(post_cropped_composite, cv2.COLOR_BGR2RGB)) | |
# Update last_saved_composite only if append is successful | |
if last_saved_composite is not None: del last_saved_composite | |
last_saved_composite = post_cropped_composite.copy() # Store the saved one (BGR) | |
log = log_and_print(f"Saved composite image {len(stitched_results_rgb)} (Post-Cropped Shape: {post_cropped_composite.shape}).\n", log) | |
except cv2.error as e_cvt: | |
log = log_and_print(f"Warning: Failed to convert size-limited composite to RGB: {e_cvt}\n", log) | |
except Exception as e_save: | |
log = log_and_print(f"Warning: Failed to save size-limited composite: {e_save}\n", log) | |
else: | |
log = log_and_print("Skipping save: Result identical to previously saved image.\n", log) | |
# Clean up the post-cropped version if it wasn't stored in last_saved_composite | |
if last_saved_composite is not post_cropped_composite: | |
del post_cropped_composite | |
gc.collect() | |
else: | |
log = log_and_print("Warning: Post-stitch cropping failed for the size-limited composite, skipping save.\n", log) | |
if post_cropped_composite is not None: del post_cropped_composite # Delete if it existed but was empty | |
# Reset for the next frame to become the anchor | |
del current_composite | |
if anchor_frame is not None: del anchor_frame # Delete old anchor too | |
if last_saved_composite is not None: del last_saved_composite # Reset duplicate check too | |
current_composite = None | |
anchor_frame = None | |
last_saved_composite = None | |
gc.collect() | |
# --- End Size Check --- | |
else: | |
# --- Stitching FAILED --- | |
log = log_and_print(f"Failed: Could not stitch frame {frame_num} onto current composite.\n", log) | |
# Save the *previous* valid composite (if it exists and is not just the anchor) | |
save_previous = False | |
if current_composite is not None and anchor_frame is not None: | |
# Check if composite is actually different from the anchor | |
try: | |
if current_composite.shape != anchor_frame.shape or not np.array_equal(current_composite, anchor_frame): | |
save_previous = True | |
except Exception as e_comp: | |
log = log_and_print(f"Warning: Error comparing composite to anchor: {e_comp}\n", log) | |
save_previous = True # Assume different if compare fails | |
if save_previous: | |
log = log_and_print("ACTION: Saving the previously stitched result before resetting.\n", log) | |
# Apply FINAL black border cropping if enabled | |
post_cropped_composite = crop_black_borders(current_composite, enable_cropping, strict_no_black_edges) | |
if post_cropped_composite is not None and post_cropped_composite.size > 0: | |
is_duplicate = False | |
if last_saved_composite is not None: | |
try: | |
if last_saved_composite.shape == post_cropped_composite.shape: | |
if np.array_equal(last_saved_composite, post_cropped_composite): | |
is_duplicate = True | |
except Exception as e_comp: | |
log = log_and_print(f"Warning: Error comparing images for duplication check: {e_comp}\n", log) | |
if not is_duplicate: | |
try: | |
stitched_results_rgb.append(cv2.cvtColor(post_cropped_composite, cv2.COLOR_BGR2RGB)) | |
if last_saved_composite is not None: del last_saved_composite | |
last_saved_composite = post_cropped_composite.copy() # Store BGR | |
log = log_and_print(f"Saved composite image {len(stitched_results_rgb)} (Post-Cropped Shape: {post_cropped_composite.shape}).\n", log) | |
except cv2.error as e_cvt: | |
log = log_and_print(f"Warning: Failed to convert previous composite to RGB: {e_cvt}\n", log) | |
except Exception as e_save: | |
log = log_and_print(f"Warning: Failed to save previous composite: {e_save}\n", log) | |
else: | |
log = log_and_print("Skipping save: Result identical to previously saved image.\n", log) | |
if last_saved_composite is not post_cropped_composite: | |
del post_cropped_composite | |
gc.collect() | |
else: | |
log = log_and_print("Warning: Post-stitch cropping failed for the previously stitched result, skipping save.\n", log) | |
if post_cropped_composite is not None: del post_cropped_composite | |
else: | |
log = log_and_print("No previous composite to save (stitching failed on first attempt for this anchor or composite was just the anchor).\n", log) | |
# The frame that *failed* to stitch becomes the new anchor | |
log = log_and_print(f"ACTION: Setting frame {frame_num} (shape: {current_frame_for_stitch.shape}) as the new anchor.\n", log) | |
if current_composite is not None: del current_composite # Delete the old composite | |
if anchor_frame is not None: del anchor_frame # Delete the old anchor | |
if last_saved_composite is not None: del last_saved_composite # Reset duplicate check | |
gc.collect() | |
anchor_frame = current_frame_for_stitch.copy() # Use the frame that failed (already cropped) | |
current_composite = anchor_frame # Reset composite to this new anchor | |
last_saved_composite = None | |
gc.collect() | |
# Note: anchor_frame always stores its own copy, so current_frame_for_stitch
# can be released here; the identity check below is purely defensive.
# --- Clean up current frame AFTER processing ---
if 'current_frame_for_stitch' in locals() and current_frame_for_stitch is not anchor_frame: | |
del current_frame_for_stitch | |
gc.collect() | |
else: # Handle cases where frame_bgr_raw is None or empty after read | |
if frame_bgr_raw is not None: | |
del frame_bgr_raw | |
frame_bgr_raw = None | |
gc.collect() | |
else: # Frame not sampled | |
# Still need to release the raw frame if it was read | |
if frame_bgr_raw is not None: | |
del frame_bgr_raw | |
frame_bgr_raw = None | |
# Don't gc.collect() on every skipped frame, too slow | |
frame_num += 1 | |
# Loop continues | |
except Exception as loop_error: | |
log = log_and_print(f"Unexpected error in main video loop at frame {frame_num}: {loop_error}\n{traceback.format_exc()}\n", log) | |
# Try to continue to next frame if possible, or break if capture seems broken | |
if cap is None or not cap.isOpened(): | |
log = log_and_print("Video capture likely broken, stopping loop.\n", log) | |
break | |
else: | |
frame_num += 1 # Ensure frame counter increments | |
# Clean up potentially lingering frame data from the failed iteration | |
if 'frame_bgr_raw' in locals() and frame_bgr_raw is not None: del frame_bgr_raw | |
if 'frame_bgr' in locals() and frame_bgr is not None: del frame_bgr | |
if 'cropped_frame_bgr' in locals() and cropped_frame_bgr is not None: del cropped_frame_bgr | |
if 'current_frame_for_stitch' in locals() and current_frame_for_stitch is not None and current_frame_for_stitch is not anchor_frame: del current_frame_for_stitch | |
gc.collect() | |
# --- After the loop: Check if there's a final composite to save --- | |
if current_composite is not None and anchor_frame is not None: | |
# Only save if it contains more than just the last anchor frame OR if it's the *only* result | |
save_final = False | |
if len(stitched_results_rgb) == 0: # If no images saved yet, save this one | |
save_final = True | |
else: | |
try: | |
if current_composite.shape != anchor_frame.shape or not np.array_equal(current_composite, anchor_frame): | |
save_final = True | |
except Exception as e_comp: | |
log = log_and_print(f"Warning: Error comparing final composite to anchor: {e_comp}\n", log) | |
save_final = True # Save if comparison fails | |
if save_final: | |
log = log_and_print("\nEnd of frames reached. Checking final composite...\n", log) | |
post_cropped_final = crop_black_borders(current_composite, enable_cropping, strict_no_black_edges) | |
if post_cropped_final is not None and post_cropped_final.size > 0: | |
is_duplicate = False | |
if last_saved_composite is not None: | |
try: | |
if last_saved_composite.shape == post_cropped_final.shape: | |
if np.array_equal(last_saved_composite, post_cropped_final): | |
is_duplicate = True | |
except Exception as e_comp: | |
log = log_and_print(f"Warning: Error comparing final image for duplication check: {e_comp}\n", log) | |
if not is_duplicate: | |
try: | |
stitched_results_rgb.append(cv2.cvtColor(post_cropped_final, cv2.COLOR_BGR2RGB)) | |
log = log_and_print(f"Saved final composite image {len(stitched_results_rgb)} (Post-Cropped Shape: {post_cropped_final.shape}).\n", log) | |
# No need to update last_saved_composite here, loop is finished | |
except cv2.error as e_cvt: | |
log = log_and_print(f"Warning: Failed to convert final composite to RGB: {e_cvt}\n", log) | |
except Exception as e_save: | |
log = log_and_print(f"Warning: Failed to save final composite: {e_save}\n", log) | |
else: | |
log = log_and_print("Skipping save of final composite: Result identical to previously saved image.\n", log) | |
# Clean up final cropped image if it existed | |
del post_cropped_final | |
gc.collect() | |
else: | |
log = log_and_print("Warning: Post-stitch cropping failed for the final composite, skipping save.\n", log) | |
if post_cropped_final is not None: del post_cropped_final # Delete if empty | |
else: | |
log = log_and_print("\nEnd of frames reached. Final composite was identical to its anchor frame and not the only result, not saving.\n", log) | |
# --- Final Cleanup --- | |
if cap is not None and cap.isOpened(): | |
cap.release() | |
if 'cap' in locals(): del cap | |
if 'anchor_frame' in locals() and anchor_frame is not None: del anchor_frame | |
if 'current_composite' in locals() and current_composite is not None: del current_composite | |
if 'last_saved_composite' in locals() and last_saved_composite is not None: del last_saved_composite | |
gc.collect() | |
total_end_time = time.time() | |
log = log_and_print(f"\nVideo stitching process finished. Found {len(stitched_results_rgb)} stitched image(s).", log) | |
log = log_and_print(f"\nTotal processing time: {total_end_time - total_start_time:.2f} seconds.\n", log) | |
# Filter out potential None entries just before returning | |
final_results = [img for img in stitched_results_rgb if img is not None and img.size > 0] | |
if len(final_results) != len(stitched_results_rgb): | |
log = log_and_print(f"Warning: Filtered out {len(stitched_results_rgb) - len(final_results)} None or empty results before final return.\n", log) | |
# Clean up the original list with potential Nones | |
del stitched_results_rgb | |
gc.collect() | |
return final_results, log | |
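# --- Usage sketch (illustrative only) ---
# A hypothetical direct call; the file name and interval are assumptions.
# The function returns a list of RGB composites, one per stitched sequence:
#
#   panos, vid_log = stitch_video_frames("walkthrough.mp4", sample_interval_ms=2000)
#   for i, pano_rgb in enumerate(panos):
#       cv2.imwrite(f"pano_{i}.png", cv2.cvtColor(pano_rgb, cv2.COLOR_RGB2BGR))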
# --- Gradio Interface Function --- | |
def run_stitching_interface(input_files, | |
crop_top_percent, | |
crop_bottom_percent, | |
stitcher_mode_str, # For cv2.Stitcher | |
registration_resol, | |
seam_estimation_resol, | |
compositing_resol, | |
wave_correction, | |
exposure_comp_type_str, # For cv2.Stitcher | |
enable_cropping, # Post-stitch black border crop | |
strict_no_black_edges_input, | |
# Detailed Stitcher Settings | |
transform_model_str, | |
blend_method_str, | |
enable_gain_compensation, | |
orb_nfeatures, | |
match_ratio_thresh, | |
ransac_reproj_thresh_input, | |
max_distance_coeff_input, | |
max_blending_width, | |
max_blending_height, | |
blend_smooth_ksize_input, | |
num_blend_levels_input, | |
# Video specific settings | |
sample_interval_ms, | |
max_composite_width_video, | |
max_composite_height_video, | |
progress=gr.Progress(track_tqdm=True) | |
): | |
""" | |
Wrapper function called by the Gradio interface. | |
Handles input (images or video), applies pre-cropping, | |
calls the appropriate stitching logic (passing transform_model_str), | |
and returns results. | |
""" | |
if input_files is None or len(input_files) == 0: | |
return [], "Please upload images or a video file." | |
# Convert Gradio inputs to the correct types; the literals below are fallbacks
# used only when a UI component passes None (a ksize of -1 disables smoothing).
blend_smooth_ksize = int(blend_smooth_ksize_input) if blend_smooth_ksize_input is not None else -1 | |
num_blend_levels = int(num_blend_levels_input) if num_blend_levels_input is not None else 4 | |
ransac_reproj_thresh = float(ransac_reproj_thresh_input) if ransac_reproj_thresh_input is not None else 3.0 | |
max_distance_coeff = float(max_distance_coeff_input) if max_distance_coeff_input is not None else 0.5 | |
log = f"Received {len(input_files)} file(s).\n" | |
log = log_and_print(f"Pre-Crop Settings: Top={crop_top_percent}%, Bottom={crop_bottom_percent}%\n", log) | |
log = log_and_print(f"Post-Crop Black Borders: Enabled={enable_cropping}, Strict Edges={strict_no_black_edges_input}\n", log) | |
# Log detailed settings including new ones | |
log = log_and_print(f"Detailed Settings: Transform={transform_model_str}, Blend={blend_method_str}, GainComp={enable_gain_compensation}, ORB={orb_nfeatures}, Ratio={match_ratio_thresh}\n", log) | |
log = log_and_print(f"Detailed Settings Cont'd: RANSAC Thresh={ransac_reproj_thresh}, MaxDistCoeff={max_distance_coeff}, MaxBlendW={max_blending_width}, MaxBlendH={max_blending_height}, SmoothKSize={blend_smooth_ksize}, MBLevels={num_blend_levels}\n", log) | |
progress(0, desc="Processing Input...") | |
# Determine input type: List of images or a single video | |
is_video_input = False | |
video_path = None | |
image_paths = [] | |
# Check file types using mimetypes | |
try: | |
# Handle potential TempfileWrappers or string paths | |
input_filepaths = [] | |
for f in input_files: | |
if hasattr(f, 'name'): # Gradio File object | |
input_filepaths.append(f.name) | |
elif isinstance(f, str): # String path (e.g., from examples) | |
input_filepaths.append(f) | |
else: | |
log = log_and_print(f"Warning: Unexpected input file type: {type(f)}. Skipping.\n", log) | |
if len(input_filepaths) == 1: | |
filepath = input_filepaths[0] | |
mime_type, _ = mimetypes.guess_type(filepath) | |
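# Example: mimetypes.guess_type("clip.mp4") -> ("video/mp4", None) and
# "photo.jpg" -> ("image/jpeg", None); unknown extensions yield (None, None),
# which is why the imdecode fallback below exists.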
if mime_type and mime_type.startswith('video'): | |
is_video_input = True | |
video_path = filepath | |
log = log_and_print(f"Detected video input: {os.path.basename(video_path)}\n", log) | |
elif mime_type and mime_type.startswith('image'): | |
log = log_and_print("Detected single image input. Need at least two images for list stitching.\n", log) | |
image_paths = [filepath] # Keep it for error message later | |
else: | |
# Fallback check: try reading as image | |
img_test = None | |
try: | |
# Use np.fromfile for paths that might have unicode characters | |
n = np.fromfile(filepath, np.uint8) | |
if n.size > 0: | |
img_test = cv2.imdecode(n, cv2.IMREAD_COLOR) | |
else: | |
raise ValueError("File is empty") | |
if img_test is not None and img_test.size > 0: | |
log = log_and_print(f"Warning: Unknown file type for single file: {os.path.basename(filepath)}. Assuming image based on successful read. Need >= 2 images.\n", log) | |
image_paths = [filepath] | |
del img_test | |
else: | |
raise ValueError("Cannot read as image or image is empty") | |
except Exception as e_read_check: | |
log = log_and_print(f"Error: Could not determine file type or read single file: {os.path.basename(filepath)}. Error: {e_read_check}. Please provide video or image files.\n", log) | |
if img_test is not None: del img_test | |
return [], log | |
else: # Multiple files uploaded | |
image_paths = [] | |
non_image_skipped = False | |
for filepath in input_filepaths: | |
mime_type, _ = mimetypes.guess_type(filepath) | |
is_image = False | |
if mime_type and mime_type.startswith('image'): | |
is_image = True | |
else: | |
# Fallback check: Try reading as image | |
img_test = None | |
try: | |
n = np.fromfile(filepath, np.uint8) | |
if n.size > 0: | |
img_test = cv2.imdecode(n, cv2.IMREAD_COLOR) | |
else: | |
raise ValueError("File is empty") | |
if img_test is not None and img_test.size > 0: | |
is_image = True | |
log = log_and_print(f"Warning: Non-image or unknown file type detected: {os.path.basename(filepath)}. Assuming image based on read success.\n", log) | |
del img_test | |
else: | |
non_image_skipped = True | |
log = log_and_print(f"Warning: Skipping non-image file (or empty/failed read): {os.path.basename(filepath)}\n", log) | |
except Exception as e_read_check: | |
non_image_skipped = True | |
log = log_and_print(f"Warning: Skipping non-image file (read failed: {e_read_check}): {os.path.basename(filepath)}\n", log) | |
if img_test is not None: del img_test | |
if is_image: | |
image_paths.append(filepath) | |
if not image_paths: # No valid images found | |
if non_image_skipped: | |
log = log_and_print("Error: No valid image files found in the input list after filtering.\n", log) | |
else: # Should only happen if initial list was empty, but covered by check at start | |
log = log_and_print("Error: No image files provided in the input list.\n", log) | |
return [], log | |
elif non_image_skipped: | |
log = log_and_print(f"Proceeding with {len(image_paths)} assumed image files (some non-images were skipped).\n", log) | |
else: | |
log = log_and_print(f"Detected {len(image_paths)} image inputs.\n", log) | |
except Exception as e: | |
log = log_and_print(f"Error detecting input file types: {e}\n{traceback.format_exc()}\n", log) | |
return [], log | |
# --- Process Based on Input Type --- | |
final_stitched_images_rgb = [] # List to hold RGB results for gallery | |
stitch_log = "" | |
if is_video_input: | |
# --- VIDEO PROCESSING --- | |
log = log_and_print("Starting incremental video frame stitching...\n", log) | |
progress(0.1, desc="Sampling & Stitching Video...") | |
# Ensure blend method string is lowercase for internal checks | |
blend_method_lower = blend_method_str.lower() if blend_method_str else "multi-band" | |
final_stitched_images_rgb, stitch_log = stitch_video_frames( | |
video_path, | |
crop_top_percent=crop_top_percent, | |
crop_bottom_percent=crop_bottom_percent, | |
enable_cropping=enable_cropping, # Post-stitch crop | |
strict_no_black_edges=strict_no_black_edges_input, | |
transform_model_str=transform_model_str, | |
blend_method=blend_method_lower, # linear or multi-band | |
enable_gain_compensation=enable_gain_compensation, | |
orb_nfeatures=orb_nfeatures, | |
match_ratio_thresh=match_ratio_thresh, | |
ransac_reproj_thresh=ransac_reproj_thresh, | |
max_distance_coeff=max_distance_coeff, | |
max_blending_width=max_blending_width, | |
max_blending_height=max_blending_height, | |
sample_interval_ms=sample_interval_ms, | |
max_composite_width=max_composite_width_video, | |
max_composite_height=max_composite_height_video, | |
blend_smooth_ksize=blend_smooth_ksize, | |
num_blend_levels=num_blend_levels, | |
progress=progress | |
) | |
elif len(image_paths) >= 2: | |
# --- IMAGE LIST PROCESSING --- | |
log = log_and_print("Reading and preparing images for list stitching...\n", log) | |
images_bgr_cropped = [] # Store potentially cropped BGR images | |
read_success = True | |
for i, img_path in enumerate(image_paths): | |
progress(i / len(image_paths) * 0.1, desc=f"Reading image {i+1}/{len(image_paths)}") # Small progress for reading | |
img = None | |
try: | |
n = np.fromfile(img_path, np.uint8) | |
if n.size > 0: | |
img = cv2.imdecode(n, cv2.IMREAD_UNCHANGED) | |
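# np.fromfile + cv2.imdecode also handles paths with non-ASCII characters
# that cv2.imread can fail on; IMREAD_UNCHANGED preserves any alpha channel
# for the BGRA -> BGR conversion below.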
else: | |
log = log_and_print(f"Error: File is empty: {os.path.basename(img_path)}. Skipping.\n", log) | |
continue | |
if img is None: | |
raise ValueError("imdecode returned None") | |
except Exception as e_read: | |
log = log_and_print(f"Error reading image: {os.path.basename(img_path)}. Error: {e_read}. Skipping.\n", log) | |
if img is not None: del img | |
continue # Skip to the next image | |
# Convert to BGR | |
img_bgr = None | |
try: | |
if img.ndim == 2: | |
img_bgr = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) | |
elif img.ndim == 3 and img.shape[2] == 4: | |
img_bgr = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) | |
elif img.ndim == 3 and img.shape[2] == 3: | |
img_bgr = img # Already BGR, no copy needed yet | |
else: | |
log = log_and_print(f"Error: Invalid image shape {img.shape} for {os.path.basename(img_path)}. Skipping.\n", log) | |
del img | |
if 'img_bgr' in locals() and img_bgr is not None: del img_bgr | |
gc.collect() | |
continue # Skip to the next image | |
except cv2.error as e_cvt_color: | |
log = log_and_print(f"Error converting image color for {os.path.basename(img_path)}: {e_cvt_color}. Skipping.\n", log) | |
del img | |
if 'img_bgr' in locals() and img_bgr is not None: del img_bgr | |
gc.collect() | |
continue | |
# Release original read image if conversion happened | |
if img_bgr is not img: | |
del img | |
gc.collect() | |
# Apply Percentage Cropping | |
img_bgr_cropped_single = crop_image_by_percent(img_bgr, crop_top_percent, crop_bottom_percent) | |
# Release uncropped BGR version (unless it was the result of cropping) | |
if img_bgr_cropped_single is not img_bgr: | |
del img_bgr | |
gc.collect() | |
if img_bgr_cropped_single is None or img_bgr_cropped_single.size == 0: | |
log = log_and_print(f"Warning: Skipping image {os.path.basename(img_path)} because percentage cropping failed or resulted in empty image.\n", log) | |
if img_bgr_cropped_single is not None: del img_bgr_cropped_single | |
gc.collect() | |
continue # Skip to next image | |
images_bgr_cropped.append(img_bgr_cropped_single) | |
# log = log_and_print(f"Read and pre-cropped: {os.path.basename(img_path)} -> Shape: {img_bgr_cropped_single.shape}\n", log) # Can be verbose | |
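        # The byte-level read above (np.fromfile -> cv2.imdecode) is used instead
        # of cv2.imread because imread cannot open paths containing non-ASCII
        # characters on some platforms (notably Windows). Equivalent one-liner
        # for a single file:
        #   img = cv2.imdecode(np.fromfile(path, np.uint8), cv2.IMREAD_UNCHANGED)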
        if len(images_bgr_cropped) < 2:
            # Pass "" as the base string here: this message is combined with `log`
            # later via `final_log = log + stitch_log`, so seeding it with `log`
            # would duplicate the entire log in the output.
            stitch_log = log_and_print(f"Need at least two valid images after reading and pre-cropping ({len(images_bgr_cropped)} found) for list stitching.\n", "")
            read_success = False  # Indicate failure to proceed
        else:
            log = log_and_print(f"Proceeding with {len(images_bgr_cropped)} valid, pre-cropped images. Starting list stitching...\n", log)
            progress(0.1, desc="Stitching Image List...")
            # Ensure blend method string is lowercase for internal checks
            blend_method_lower = blend_method_str.lower() if blend_method_str else "multi-band"
            stitched_single_rgb, stitch_log_img = stitch_multiple_images(
                images_bgr_cropped,  # Pass the list of pre-cropped images
                stitcher_mode_str=stitcher_mode_str,
                registration_resol=registration_resol,
                seam_estimation_resol=seam_estimation_resol,
                compositing_resol=compositing_resol,
                wave_correction=wave_correction,
                exposure_comp_type_str=exposure_comp_type_str,
                enable_cropping=enable_cropping,  # Post-stitch crop
                strict_no_black_edges=strict_no_black_edges_input,
                transform_model_str=transform_model_str,
                blend_method=blend_method_lower,
                enable_gain_compensation=enable_gain_compensation,
                orb_nfeatures=orb_nfeatures,
                match_ratio_thresh=match_ratio_thresh,
                ransac_reproj_thresh=ransac_reproj_thresh,
                max_distance_coeff=max_distance_coeff,
                max_blending_width=max_blending_width,
                max_blending_height=max_blending_height,
                blend_smooth_ksize=blend_smooth_ksize,
                num_blend_levels=num_blend_levels
            )
            stitch_log += stitch_log_img  # Append the log from the stitching function
            if stitched_single_rgb is not None:
                final_stitched_images_rgb = [stitched_single_rgb]  # Result is a list containing the single image
        # Clean up loaded images for list mode after the stitching attempt
        if 'images_bgr_cropped' in locals():
            for img_del in images_bgr_cropped:
                if img_del is not None: del img_del
            del images_bgr_cropped
            gc.collect()
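        # Note: the explicit `del` + gc.collect() calls in this branch are
        # deliberate: decoded frames and stitched composites are large NumPy
        # arrays, and releasing them eagerly keeps peak memory lower during
        # long sessions.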
    elif len(image_paths) == 1:
        # This case should have been handled by the input type detection,
        # but add a message here just in case.
        log = log_and_print("Error: Only one image file provided or detected. Need at least two for image list stitching.\n", log)
        stitch_log = ""  # No stitching attempted
    else:
        # No valid input files were found or passed the initial checks.
        log = log_and_print("Error: Input must be a single video file or at least two image files. No valid input found.\n", log)
        stitch_log = ""
    final_log = log + stitch_log
    if not final_stitched_images_rgb:
        # Avoid duplicating error messages if the log already indicates failure
        if "Error:" not in final_log[-200:]:  # Check the last 200 characters for an error marker
            final_log = log_and_print("\nNo stitched images were generated.", final_log)
    # --- Saving Results to Temporary Files ---
    output_file_paths = []  # List of (path, caption) tuples for the Gallery
    temp_dir = None
    if final_stitched_images_rgb:
        try:
            # Try to create a subdirectory within the default Gradio temp space if possible
            gradio_temp_base = tempfile.gettempdir()
            gradio_subdir = os.path.join(gradio_temp_base, 'gradio')  # Default Gradio temp subdir name
            # Use it only if it exists and is writable; otherwise fall back to the system temp dir
            target_temp_dir_base = gradio_subdir if os.path.exists(gradio_subdir) and os.access(gradio_subdir, os.W_OK) else gradio_temp_base
            if not os.path.exists(target_temp_dir_base):
                try:
                    os.makedirs(target_temp_dir_base)
                except OSError as e_mkdir:
                    final_log = log_and_print(f"Warning: Could not create temp directory '{target_temp_dir_base}', using default. Error: {e_mkdir}\n", final_log)
                    target_temp_dir_base = tempfile.gettempdir()  # Fall back to the system default temp dir
            temp_dir = tempfile.mkdtemp(prefix="stitch_run_", dir=target_temp_dir_base)
            final_log = log_and_print(f"\nInfo: Saving output images to temporary directory: {temp_dir}\n", final_log)
            for i, img_rgb in enumerate(final_stitched_images_rgb):
                if img_rgb is None or img_rgb.size == 0:
                    final_log = log_and_print(f"Warning: Skipping image index {i} because it is None or empty.\n", final_log)
                    continue
                filename = f"stitched_image_{i+1:03d}.png"
                # Use os.path.join for cross-platform compatibility
                full_path = os.path.join(temp_dir, filename)
                img_bgr = None  # Initialize for the finally block
                try:
                    img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
                    # Use the imencode -> write pattern for robust handling of paths/special characters
                    is_success, buf = cv2.imencode('.png', img_bgr)
                    if is_success:
                        with open(full_path, 'wb') as f:
                            f.write(buf)
                        # The Gallery accepts (filepath, caption) tuples
                        output_file_paths.append((full_path, filename))
                        # final_log = log_and_print(f"Successfully saved: {filename}\n", final_log)  # Can be verbose
                    else:
                        final_log = log_and_print(f"Warning: Failed to encode image for saving: {filename}\n", final_log)
                except cv2.error as e_cvt_write:
                    final_log = log_and_print(f"Error converting or encoding image {filename}: {e_cvt_write}\n", final_log)
                except IOError as e_io:
                    final_log = log_and_print(f"Error writing image file {filename} to {full_path}: {e_io}\n", final_log)
                except Exception as e_write:
                    final_log = log_and_print(f"Unexpected error writing image {filename} to {full_path}: {e_write}\n", final_log)
                finally:
                    if img_bgr is not None: del img_bgr
                    gc.collect()
        except Exception as e_tempdir:
            final_log = log_and_print(f"Error creating temporary directory or saving output: {e_tempdir}\n", final_log)
            output_file_paths = []  # Fall back to an empty list
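    # cv2.imwrite shares cv2.imread's limitation with non-ASCII paths, so the
    # save loop above encodes to an in-memory PNG buffer (cv2.imencode) and
    # writes the bytes with plain open()/write(). An equivalent two-liner:
    #   ok, buf = cv2.imencode('.png', img_bgr)
    #   buf.tofile(full_path)  # only if ok is True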
    # --- Final Cleanup of the RGB images list ---
    if 'final_stitched_images_rgb' in locals():
        for img_del in final_stitched_images_rgb:
            if img_del is not None: del img_del
        del final_stitched_images_rgb
        gc.collect()
    progress(1.0, desc="Finished!")
    final_log = log_and_print("\nCleanup complete.", final_log)
    # Return the list of file paths for the Gallery, plus the log
    return output_file_paths, final_log
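# --- Illustrative helper (not wired into the UI) ---
# A minimal sketch of the per-image preprocessing the interface performs before
# stitching: robust decode, percentage pre-crop, then optional black-border
# crop. The function name, path argument, and default percentages below are
# example values, not part of the app's pipeline.
def preprocess_single_image_example(path, top_pct=5.0, bottom_pct=5.0):
    """Decode `path` robustly, pre-crop by percentage, and crop black borders."""
    data = np.fromfile(path, np.uint8)  # byte-level read tolerates non-ASCII paths
    img = cv2.imdecode(data, cv2.IMREAD_COLOR) if data.size > 0 else None
    if img is None:
        return None
    cropped = crop_image_by_percent(img, top_pct, bottom_pct)
    return crop_black_borders(cropped, enable_cropping=True)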
# --- Define Gradio Interface ---
with gr.Blocks() as demo:
    gr.Markdown("# OpenCV Image and Video Stitcher")
    gr.Markdown(
        "Upload multiple images (for list/panorama stitching) OR a single video file (for sequential frame stitching). "
        "Video frames are sampled incrementally based on the interval. "
        "Use Pre-Cropping to remove unwanted areas *before* stitching. Adjust other parameters and click 'Stitch'."
    )
    with gr.Row():
        with gr.Column(scale=1):
            stitch_button = gr.Button("Stitch", variant="primary")
            input_files = gr.File(
                label="Upload Images or a Video",
                # Common image and video types
                file_types=["image", ".mp4", ".avi", ".mov", ".mkv", ".wmv", ".webm"],
                file_count="multiple",
                elem_id="input_files"
            )
            # --- Parameters Grouping ---
            with gr.Accordion("Preprocessing Settings", open=True):
                crop_top_percent = gr.Slider(0.0, 49.0, step=0.5, value=0.0, label="Crop Top %",
                                             info="Percentage of height to remove from the TOP of each image/frame BEFORE stitching.")
                crop_bottom_percent = gr.Slider(0.0, 49.0, step=0.5, value=0.0, label="Crop Bottom %",
                                                info="Percentage of height to remove from the BOTTOM of each image/frame BEFORE stitching.")
            with gr.Accordion("OpenCV Stitcher Settings (Image List Mode Only)", open=True):
                stitcher_mode = gr.Radio(["SCANS", "PANORAMA", "DIRECT_PAIRWISE"], label="Stitcher Mode (Image List)", value="SCANS",
                                         info=(
                                             "Method for image list stitching. 'SCANS'/'PANORAMA': use OpenCV's built-in Stitcher (optimized for translation/rotation). "
                                             "'SCANS': optimized for images related primarily by translation (e.g. scanned documents or linear camera motion), potentially using simpler geometric models internally. "
                                             "'PANORAMA': designed for images captured by rotating the camera around a central point; uses full perspective transformations (homography) to handle the geometric distortions typical of panoramic shots. "
                                             "'DIRECT_PAIRWISE': skip the OpenCV Stitcher and use sequential pairwise feature matching directly (same as video mode or the fallback)."
                                         )
                                         )
                registration_resol = gr.Slider(0.1, 1.0, step=0.05, value=0.6, label="Registration Resolution",
                                               info="Scale factor for the image resolution used during feature detection and matching. Lower values (e.g., 0.6) are faster but may miss features in high-res images. 1.0 uses full resolution.")
                seam_estimation_resol = gr.Slider(0.05, 1.0, step=0.05, value=0.1, label="Seam Estimation Resolution",
                                                  info="Scale factor for the image resolution used during seam finding (finding the optimal cut line). Lower values (e.g., 0.1) are much faster.")
                compositing_resol = gr.Slider(-1.0, 1.0, step=0.1, value=-1.0, label="Compositing Resolution",
                                              info="Scale factor for the image resolution used during the final blending stage. -1.0 uses the original source image resolution. Lower values reduce memory usage but might slightly blur the output.")
                wave_correction = gr.Checkbox(value=False, label="Enable Wave Correction",
                                              info="Attempts to correct perspective distortions (waviness) common in panoramas. Can increase processing time.")
                exposure_comp_type = gr.Dropdown(["NO", "GAIN", "GAIN_BLOCKS"], value="GAIN_BLOCKS", label="Exposure Compensation",
                                                 info="Method used by the built-in stitcher to adjust brightness/contrast differences between images. 'GAIN_BLOCKS' is generally preferred for varying lighting.")
            # --- Detailed Stitcher Settings (Used for Video, DIRECT_PAIRWISE, and Fallback) ---
            with gr.Accordion("Pairwise Stitching Settings (Video / Direct / Fallback)", open=True):
                transform_model = gr.Radio(["Homography", "Affine_Partial", "Affine_Full"], label="Pairwise Transform Model", value="Homography",
                                           info="Geometric model for pairwise alignment. 'Homography' handles perspective. 'Affine' (Partial/Full) handles translation, rotation, scale, and shear (better for scans, with less distortion risk). If stitching fails with one model, try another.")
                blend_method = gr.Radio(["Linear", "Multi-Band"], label="Blending Method", value="Multi-Band",
                                        info="Algorithm for smoothing seams in overlapping regions when using the detailed stitcher (for video or the image list fallback). 'Multi-Band' is often better but slower.")
                enable_gain_compensation = gr.Checkbox(value=True, label="Enable Gain Compensation",
                                                       info="Adjusts the overall brightness difference *before* blending when using the detailed stitcher. Recommended.")
                orb_nfeatures = gr.Slider(500, 10000, step=100, value=2000, label="ORB Features",
                                          info="Maximum ORB keypoints detected per image/frame. Used by the detailed stitcher (for video or the image list fallback).")
                match_ratio_thresh = gr.Slider(0.5, 0.95, step=0.01, value=0.75, label="Match Ratio Threshold",
                                               info="Lowe's ratio test threshold for filtering feature matches (lower = stricter). Used by the detailed stitcher (for video or the image list fallback).")
                ransac_reproj_thresh = gr.Slider(1.0, 10.0, step=0.1, value=5.0, label="RANSAC Reproj Threshold",
                                                 info="Maximum reprojection error (pixels) allowed for a match to be considered an inlier by RANSAC during transformation estimation. Lower values are stricter.")
                max_distance_coeff = gr.Slider(0.1, 2.0, step=0.05, value=0.5, label="Max Distance Coeff",
                                               info="Multiplier for the image diagonal used to filter initial matches. Limits the pixel distance between matched keypoints (0.5 means half the diagonal).")
                max_blending_width = gr.Number(value=10000, label="Max Blending Width", precision=0,
                                               info="Limits the canvas width during the detailed pairwise blending step to prevent excessive memory usage. Relevant for the detailed stitcher.")
                max_blending_height = gr.Number(value=10000, label="Max Blending Height", precision=0,
                                                info="Limits the canvas height during the detailed pairwise blending step to prevent excessive memory usage. Relevant for the detailed stitcher.")
                blend_smooth_ksize = gr.Number(value=15, label="Blend Smooth Kernel Size", precision=0,
                                               info="Size of the Gaussian kernel used to smooth the blend mask/weights. Must be a POSITIVE ODD integer to enable smoothing (e.g., 5, 15, 21). Set to -1 or an even number to disable smoothing.")
                num_blend_levels = gr.Slider(2, 7, step=1, value=4, label="Multi-Band Blend Levels",
                                             info="Number of pyramid levels for Multi-Band blending. Fewer levels are faster but might give less smooth transitions.")
            with gr.Accordion("Video Stitcher Settings", open=False):
                sample_interval_ms = gr.Number(value=3000, label="Sample Interval (ms)", precision=0,
                                               info="Time interval (in milliseconds) between sampled frames for video stitching. Smaller values sample more frames, increasing processing time but potentially improving tracking.")
                max_composite_width_video = gr.Number(value=10000, label="Max Composite Width (Video)", precision=0,
                                                      info="Limits the width of the stitched output during video processing. If exceeded, the current result is saved and stitching restarts with the next frame. 0 = no limit.")
                max_composite_height_video = gr.Number(value=10000, label="Max Composite Height (Video)", precision=0,
                                                       info="Limits the height of the stitched output during video processing. If exceeded, the current result is saved and stitching restarts with the next frame. 0 = no limit.")
with gr.Accordion("Postprocessing Settings", open=False): | |
enable_cropping = gr.Checkbox(value=True, label="Crop Black Borders (Post-Stitch)", | |
info="Automatically remove black border areas from the final stitched image(s) AFTER stitching.") | |
strict_no_black_edges_checkbox = gr.Checkbox(value=False, label="Strict No Black Edges (Post-Crop)", | |
info="If 'Crop Black Borders' is enabled, this forces removal of *any* remaining black pixels directly on the image edges after the main crop. Might slightly shrink the image further.") | |
with gr.Column(scale=1): | |
output_gallery = gr.Gallery( | |
label="Stitched Results", elem_id="output_gallery", object_fit="contain", type="filepath", rows=2, preview=True, height="auto", format="png", container=True) | |
output_log = gr.Textbox( | |
label="Status / Log", lines=20, interactive=False, show_copy_button=True) | |
# Define the list of inputs for the button click event | |
inputs=[ | |
input_files, | |
# Preprocessing | |
crop_top_percent, | |
crop_bottom_percent, | |
# OpenCV Stitcher (Image List) | |
stitcher_mode, # the selected string ("SCANS", "PANORAMA", or "DIRECT_PAIRWISE") | |
registration_resol, | |
seam_estimation_resol, | |
compositing_resol, | |
wave_correction, | |
exposure_comp_type, | |
# Postprocessing | |
enable_cropping, | |
strict_no_black_edges_checkbox, | |
# Detailed Stitcher Settings | |
transform_model, | |
blend_method, | |
enable_gain_compensation, | |
orb_nfeatures, | |
match_ratio_thresh, | |
ransac_reproj_thresh, | |
max_distance_coeff, | |
max_blending_width, | |
max_blending_height, | |
blend_smooth_ksize, | |
num_blend_levels, | |
# Video specific settings | |
sample_interval_ms, | |
max_composite_width_video, | |
max_composite_height_video | |
] | |
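    # NOTE: the order of this list must match the positional parameters of
    # run_stitching_interface exactly; Gradio passes each component's value
    # positionally on click.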
    # Define examples (updated to include the transform_model parameter)
    examples = [
        [
            ["examples/Wetter-Panorama/Wetter-Panorama1[NIuO6hrFTrg].mp4"],
            0, 20,
            "DIRECT_PAIRWISE", 0.6, 0.1, -1, False, "GAIN_BLOCKS",
            True, False,
            "Homography", "Multi-Band", True, 5000, 0.5, 5.0, 0.5, 10000, 10000, 15, 4,
            2500, 10000, 10000,
        ],
        [
            ["examples/Wetter-Panorama/Wetter-Panorama2[NIuO6hrFTrg].mp4"],
            0, 20,
            "DIRECT_PAIRWISE", 0.6, 0.1, -1, False, "GAIN_BLOCKS",
            True, False,
            "Homography", "Multi-Band", True, 5000, 0.5, 5.0, 0.5, 10000, 10000, 15, 4,
            2500, 10000, 10000,
        ],
        [
            ["examples/NieRAutomata/nier2B_01.jpg", "examples/NieRAutomata/nier2B_02.jpg", "examples/NieRAutomata/nier2B_03.jpg", "examples/NieRAutomata/nier2B_04.jpg", "examples/NieRAutomata/nier2B_05.jpg",
             "examples/NieRAutomata/nier2B_06.jpg", "examples/NieRAutomata/nier2B_07.jpg", "examples/NieRAutomata/nier2B_08.jpg", "examples/NieRAutomata/nier2B_09.jpg", "examples/NieRAutomata/nier2B_10.jpg"],
            0, 0,
            "PANORAMA", 0.6, 0.1, -1, False, "GAIN_BLOCKS",
            True, False,
            "Homography", "Multi-Band", True, 5000, 0.5, 5.0, 0.5, 10000, 10000, 15, 4,
            5000, 10000, 10000,
        ],
        [
            ["examples/cat/cat_left.jpg", "examples/cat/cat_right.jpg"],
            0, 0,
            "SCANS", 0.6, 0.1, -1, False, "GAIN_BLOCKS",
            True, False,
            "Affine_Partial", "Linear", True, 5000, 0.5, 5.0, 0.5, 10000, 10000, 15, 4,
            5000, 10000, 10000,
        ],
        [
            ["examples/ギルドの受付嬢ですが/Girumasu_1.jpg", "examples/ギルドの受付嬢ですが/Girumasu_2.jpg", "examples/ギルドの受付嬢ですが/Girumasu_3.jpg"],
            0, 0,
            "PANORAMA", 0.6, 0.1, -1, False, "GAIN_BLOCKS",
            True, False,
            "Affine_Partial", "Linear", True, 5000, 0.65, 5.0, 0.5, 10000, 10000, 15, 4,
            5000, 10000, 10000,
        ],
        [
            ["examples/photographs1/img1.jpg", "examples/photographs1/img2.jpg", "examples/photographs1/img3.jpg", "examples/photographs1/img4.jpg"],
            0, 0,
            "PANORAMA", 0.6, 0.1, -1, True, "GAIN_BLOCKS",
            True, False,
            "Homography", "Linear", True, 5000, 0.5, 5.0, 0.5, 10000, 10000, 15, 4,
            5000, 10000, 10000,
        ]
    ]
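    # Each example row supplies one value per component in `inputs`, in the same
    # order; the leading nested list is the multi-file value for input_files.
    # The media paths are assumed to ship with the app under examples/.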
    gr.Examples(examples, inputs=inputs, label="Example Configurations")
    # Connect the button click to the stitching function
    stitch_button.click(
        fn=run_stitching_interface,
        inputs=inputs,
        outputs=[output_gallery, output_log]
    )
# --- Main Execution Block ---
if __name__ == "__main__":
    print("Starting Gradio interface with selectable transformation model...")
    # Enable the queue for handling multiple requests and progress updates
    demo.queue()
    # Launch the interface
    demo.launch(inbrowser=True)