import cv2 import numpy as np import os from ultralytics import YOLO import time from typing import Tuple, Set, List def detection(path: str) -> Tuple[Set[str], str]: """ Detects and tracks objects in a video using YOLOv8 model, saving an annotated output video. Args: path (str): Path to the input video file. Supports common video formats (mp4, avi, etc.) Returns: Tuple[Set[str], str]: - Set of unique detected object labels (e.g., {'Gun', 'Knife'}) - Path to the output annotated video with detection boxes and tracking IDs Raises: FileNotFoundError: If input video doesn't exist ValueError: If video cannot be opened/processed or output dir cannot be created """ # Validate input file exists if not os.path.exists(path): raise FileNotFoundError(f"Video file not found: {path}") # --- Model Loading --- # Construct path relative to this script file model_path = os.path.join(os.path.dirname(__file__), "best.pt") if not os.path.exists(model_path): raise FileNotFoundError(f"YOLO model file not found at: {model_path}") try: model = YOLO(model_path) class_names = model.names # Get class label mappings print(f"[INFO] YOLO model loaded from {model_path}. Class names: {class_names}") except Exception as e: raise ValueError(f"Failed to load YOLO model: {e}") # --- Output Path Setup --- input_video_name = os.path.basename(path) base_name = os.path.splitext(input_video_name)[0] # Sanitize basename to prevent issues with weird characters in filenames safe_base_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in base_name) # Define output directory relative to this script # In HF Spaces, this will be inside the container's file system output_dir = os.path.join(os.path.dirname(__file__), "results") temp_output_name = f"{safe_base_name}_output_temp.mp4" try: os.makedirs(output_dir, exist_ok=True) # Create output dir if needed if not os.path.isdir(output_dir): raise ValueError(f"Path exists but is not a directory: {output_dir}") except OSError as e: raise ValueError(f"Failed to create or access output directory '{output_dir}': {e}") temp_output_path = os.path.join(output_dir, temp_output_name) print(f"[INFO] Temporary output will be saved to: {temp_output_path}") # --- Video Processing Setup --- cap = cv2.VideoCapture(path) if not cap.isOpened(): raise ValueError(f"Failed to open video file: {path}") # Get video properties for output writer # Use source FPS if available and reasonable, otherwise default to 30 source_fps = cap.get(cv2.CAP_PROP_FPS) output_fps = source_fps if 10 <= source_fps <= 60 else 30.0 # Process at a fixed resolution for consistency or use source resolution # Using fixed 640x640 as potentially used during training/fine-tuning frame_width, frame_height = 640, 640 # OR use source resolution (might require adjusting YOLO parameters if model expects specific size) # frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) try: out = cv2.VideoWriter( temp_output_path, cv2.VideoWriter_fourcc(*'mp4v'), # Use MP4 codec output_fps, (frame_width, frame_height) ) if not out.isOpened(): # Attempt alternative codec if mp4v fails (less common) print("[WARNING] mp4v codec failed, trying avc1...") out = cv2.VideoWriter( temp_output_path, cv2.VideoWriter_fourcc(*'avc1'), output_fps, (frame_width, frame_height) ) if not out.isOpened(): raise ValueError("Failed to initialize VideoWriter with mp4v or avc1 codec.") except Exception as e: cap.release() # Release capture device before raising raise ValueError(f"Failed to create VideoWriter: {e}") # --- Main Processing Loop --- detected_classes: List[str] = [] # Track detected object class names start = time.time() frame_count = 0 print(f"[INFO] Video processing started...") while True: ret, frame = cap.read() if not ret: # End of video or read error break frame_count += 1 # Resize frame BEFORE passing to model resized_frame = cv2.resize(frame, (frame_width, frame_height)) try: # Run YOLOv8 detection and tracking on the resized frame results = model.track( source=resized_frame, # Use resized frame conf=0.7, # Confidence threshold persist=True, # Maintain track IDs across frames verbose=False # Suppress Ultralytics console output per frame ) # Check if results are valid and contain boxes if results and results[0] and results[0].boxes: # Annotate the RESIZED frame with bounding boxes and track IDs annotated_frame = results[0].plot() # plot() draws on the source image # Record detected class names for this frame for box in results[0].boxes: if box.cls is not None: # Check if class ID is present cls_id = int(box.cls[0]) # Get class index if 0 <= cls_id < len(class_names): detected_classes.append(class_names[cls_id]) else: print(f"[WARNING] Detected unknown class ID: {cls_id}") else: # If no detections, use the original resized frame for the output video annotated_frame = resized_frame # Write the (potentially annotated) frame to the output video out.write(annotated_frame) except Exception as e: print(f"[ERROR] Error processing frame {frame_count}: {e}") # Write the unannotated frame to keep video timing consistent out.write(resized_frame) # --- Clean Up --- end = time.time() print(f"[INFO] Video processing finished. Processed {frame_count} frames.") print(f"[INFO] Total processing time: {end - start:.2f} seconds") cap.release() out.release() cv2.destroyAllWindows() # Close any OpenCV windows if they were opened # --- Final Output Renaming --- unique_detected_labels = set(detected_classes) # Create a short string from labels for the filename labels_str = "_".join(sorted(list(unique_detected_labels))).replace(" ", "_") # Limit length to avoid overly long filenames max_label_len = 50 if len(labels_str) > max_label_len: labels_str = labels_str[:max_label_len] + "_etc" if not labels_str: # Handle case where nothing was detected labels_str = "no_detections" final_output_name = f"{safe_base_name}_{labels_str}_output.mp4" final_output_path = os.path.join(output_dir, final_output_name) # Ensure final path doesn't already exist (rename might fail otherwise) if os.path.exists(final_output_path): os.remove(final_output_path) try: # Rename the temporary file to the final name os.rename(temp_output_path, final_output_path) print(f"[INFO] Detected object labels: {unique_detected_labels}") print(f"[INFO] Annotated video saved successfully at: {final_output_path}") except OSError as e: print(f"[ERROR] Failed to rename {temp_output_path} to {final_output_path}: {e}") # Fallback: return the temp path if rename fails but file exists if os.path.exists(temp_output_path): print(f"[WARNING] Returning path to temporary file: {temp_output_path}") return unique_detected_labels, temp_output_path else: raise ValueError(f"Output video generation failed. No output file found.") return unique_detected_labels, final_output_path # # Example usage (commented out for library use) # if __name__ == "__main__": # test_video = input("Enter the local path to the video file: ").strip('"') # if os.path.exists(test_video): # try: # print(f"[INFO] Processing video: {test_video}") # labels, out_path = detection(test_video) # print(f"\nDetection Complete.") # print(f"Detected unique labels: {labels}") # print(f"Output video saved to: {out_path}") # except (FileNotFoundError, ValueError, Exception) as e: # print(f"\nAn error occurred: {e}") # else: # print(f"Error: Input video file not found - {test_video}")