import os
import time
from typing import Set, Tuple

import cv2
import numpy as np
from ultralytics import YOLO


def detection(path: str) -> Tuple[Set[str], str]:
    """
    Detect and track objects in a video with a YOLOv8 model, saving an
    annotated output video.

    Args:
        path: Path to the input video file. Supports common video
            formats (mp4, avi, etc.).

    Returns:
        Tuple[Set[str], str]:
            - Set of unique detected object labels (e.g., {'Gun', 'knife'}).
            - Path to the output annotated video with detection boxes and
              tracking IDs.

    Raises:
        FileNotFoundError: If the input video doesn't exist.
        ValueError: If the video cannot be opened/processed.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Video file not found: {path}")

    # Pretrained weights shipped alongside this module.
    # Model is trained to detect: ['Fire', 'Gun', 'License_Plate', 'Smoke', 'knife']
    model = YOLO(os.path.join(os.path.dirname(__file__), "yolo", "best.pt"))
    class_names = model.names  # class-index -> label mapping

    # Output layout: write to a temp name first, then rename once the
    # detected labels are known so they can be embedded in the filename.
    base_name = os.path.splitext(os.path.basename(path))[0]
    output_dir = "results"
    os.makedirs(output_dir, exist_ok=True)  # raises OSError itself on failure
    temp_output_path = os.path.join(output_dir, f"{base_name}_output_temp.mp4")

    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise ValueError(f"Failed to open video file: {path}")

    # All frames are resized to 640x640 for consistency with the model input.
    frame_width, frame_height = 640, 640

    # Preserve the source frame rate so the annotated video plays back at
    # the original speed; fall back to 30 FPS when the container reports 0.
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

    out = cv2.VideoWriter(
        temp_output_path,
        cv2.VideoWriter_fourcc(*'mp4v'),  # MP4 codec
        fps,
        (frame_width, frame_height),
    )

    crimes: Set[str] = set()  # unique labels seen across all frames
    start = time.time()
    print(f"[INFO] Processing started (t={start:.2f}s epoch)")

    # Main loop: read -> resize -> detect/track -> annotate -> record labels.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:  # end of video
                break

            frame = cv2.resize(frame, (frame_width, frame_height))
            results = model.track(
                source=frame,
                conf=0.7,      # minimum confidence threshold
                persist=True,  # keep track IDs across frames
            )

            # Frame annotated with boxes and tracking IDs.
            annotated_frame = results[0].plot()

            for box in results[0].boxes:
                crimes.add(class_names[int(box.cls)])

            out.write(annotated_frame)
    finally:
        # Always release codec handles, even if detection raises mid-video;
        # otherwise the temp file is left open/corrupt.
        cap.release()
        out.release()

    end = time.time()
    print(f"[INFO] Processing finished (t={end:.2f}s epoch)")
    print(f"[INFO] Total execution time: {end - start:.2f} seconds")

    # Final filename embeds the detected labels:
    # {original_name}_{detected_objects}_output.mp4 (truncated to stay sane).
    crimes_str = "_".join(sorted(crimes)).replace(" ", "_")[:50]
    final_output_name = f"{base_name}_{crimes_str}_output.mp4"
    final_output_path = os.path.join(output_dir, final_output_name)
    os.rename(temp_output_path, final_output_path)

    print(f"[INFO] Detected crimes: {crimes}")
    print(f"[INFO] Annotated video saved at: {final_output_path}")
    return crimes, final_output_path


if __name__ == "__main__":
    raw_path = input("Enter the local path to the video file to detect objects: ")
    video_path = raw_path.strip('"')  # remove extra quotes if copied from Windows
    print(f"[INFO] Loading video: {video_path}")
    detection(video_path)