import cv2 import numpy as np import os from ultralytics import YOLO import time from typing import Tuple, Set def detect_objects_in_video(path: str) -> Tuple[Set[str], str]: """ Detects and tracks objects in a video using a YOLOv8 model, saving an annotated output video. Args: path (str): Path to the input video file. Returns: Tuple[Set[str], str]: - Set of unique detected object labels (e.g., {'Gun', 'Knife'}) - Path to the output annotated video with detection boxes and tracking IDs """ if not os.path.exists(path): raise FileNotFoundError(f"Video file not found: {path}") # Load YOLOv8 model (adjust path if necessary) model = YOLO("yolo/best.pt") # Make sure this path is correct class_names = model.names # Output setup input_video_name = os.path.basename(path) base_name = os.path.splitext(input_video_name)[0] temp_output_name = f"{base_name}_output_temp.mp4" output_dir = "results" os.makedirs(output_dir, exist_ok=True) temp_output_path = os.path.join(output_dir, temp_output_name) # Video I/O setup cap = cv2.VideoCapture(path) if not cap.isOpened(): raise ValueError(f"Failed to open video file: {path}") frame_width, frame_height = 640, 640 out = cv2.VideoWriter( temp_output_path, cv2.VideoWriter_fourcc(*'mp4v'), 30.0, (frame_width, frame_height) ) detected_labels = set() start = time.time() print(f"[INFO] Processing started at {start:.2f} seconds") while True: ret, frame = cap.read() if not ret: break frame = cv2.resize(frame, (frame_width, frame_height)) # Run detection and tracking results = model.track( source=frame, conf=0.7, persist=True ) if results and hasattr(results[0], "plot"): annotated_frame = results[0].plot() out.write(annotated_frame) # Extract class labels if hasattr(results[0], "boxes"): for box in results[0].boxes: cls = int(box.cls) detected_labels.add(class_names[cls]) else: out.write(frame) end = time.time() cap.release() out.release() # Create final output filename crimes_str = "_".join(sorted(detected_labels)).replace(" ", "_")[:50] final_output_name = f"{base_name}_{crimes_str}_output.mp4" final_output_path = os.path.join(output_dir, final_output_name) os.rename(temp_output_path, final_output_path) print(f"[INFO] Processing finished at {end:.2f} seconds") print(f"[INFO] Total execution time: {end - start:.2f} seconds") print(f"[INFO] Detected crimes: {detected_labels}") print(f"[INFO] Annotated video saved at: {final_output_path}") return detected_labels, final_output_path # Example usage (uncomment to use as standalone script) # if __name__ == "__main__": # video_path = input("Enter the path to the video file: ").strip('"') # print(f"[INFO] Loading video: {video_path}") # detect_objects_in_video(video_path)