import os
import time
from typing import Tuple, Set

import cv2
from ultralytics import YOLO

def detection(path: str) -> Tuple[Set[str], str]:
    """

    Detects and tracks objects in a video using YOLOv8 model, saving an annotated output video.

    

    Args:

        path (str): Path to the input video file. Supports common video formats (mp4, avi, etc.)

        

    Returns:

        Tuple[Set[str], str]: 

            - Set of unique detected object labels (e.g., {'Gun', 'Knife'})

            - Path to the output annotated video with detection boxes and tracking IDs

            

    Raises:

        FileNotFoundError: If input video doesn't exist

        ValueError: If video cannot be opened/processed

    """

    # Validate input file exists
    if not os.path.exists(path):
        raise FileNotFoundError(f"Video file not found: {path}")

    # Initialize YOLOv8 model with pretrained weights
    # Model is trained to detect: ['Fire', 'Gun', 'License_Plate', 'Smoke', 'knife']
    model = YOLO(os.path.join(os.path.dirname(__file__), "yolo", "best.pt"))
    class_names = model.names  # Get class label mappings
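    # For this model the mapping is index -> label, e.g. {0: 'Fire', 1: 'Gun', ...}
    # (order inferred from the class list above; verify against your own weights)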

    # Set up output paths:
    # 1. Temporary output during processing
    # 2. Final output with detected objects in filename
    input_video_name = os.path.basename(path)
    base_name = os.path.splitext(input_video_name)[0]
    temp_output_name = f"{base_name}_output_temp.mp4"
    output_dir = "results"
    os.makedirs(output_dir, exist_ok=True)  # Create output dir if needed
    if not os.path.exists(output_dir):
        raise ValueError(f"Failed to create output directory: {output_dir}")
    temp_output_path = os.path.join(output_dir, temp_output_name)

    # Video processing setup:
    # - Open input video stream
    # - Initialize output writer with MP4 codec
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise ValueError(f"Failed to open video file: {path}")

    # Process all frames at 640x640 resolution for consistency
    frame_width, frame_height = 640, 640
    out = cv2.VideoWriter(
        temp_output_path, 
        cv2.VideoWriter_fourcc(*'mp4v'),  # MP4 codec
        30.0,  # Output FPS
        (frame_width, frame_height)
    )
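    # Note: the writer is fixed at 30 FPS regardless of the source frame rate, so
    # playback speed can drift from the original clip. A possible alternative (not
    # used here) is to reuse the input rate:
    #   source_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    #   out = cv2.VideoWriter(temp_output_path, cv2.VideoWriter_fourcc(*'mp4v'),
    #                         source_fps, (frame_width, frame_height))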

    # Main processing loop:
    # 1. Read each frame
    # 2. Run object detection + tracking
    # 3. Annotate frame with boxes and IDs
    # 4. Collect detected classes
    crimes = []  # Track all detected objects
    start = time.time()
    print(f"[INFO] Processing started at {start:.2f} seconds")

    while True:
        ret, frame = cap.read()
        if not ret:  # End of video
            break

        # Resize and run detection + tracking
        frame = cv2.resize(frame, (frame_width, frame_height))
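        # model.track() returns a list of Results objects (one per input image);
        # with a single frame only results[0] is used below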
        results = model.track(
            source=frame,
            conf=0.7,  # Minimum confidence threshold
            persist=True  # Enable tracking across frames
        )

        # Annotate frame with boxes and tracking IDs
        annotated_frame = results[0].plot()
        
        # Record detected classes
        for box in results[0].boxes:
            cls = int(box.cls)
            crimes.append(class_names[cls])
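            # Tracking IDs (when assigned by the tracker) are also available here,
            # e.g. track_id = int(box.id) if box.id is not None else None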

        out.write(annotated_frame)

    # Clean up video resources
    cap.release()
    out.release()

    end = time.time()
    print("[INFO] Processing finished")
    print(f"[INFO] Total execution time: {end - start:.2f} seconds")

    # Generate final output filename containing detected object labels
    # Format: {original_name}_{detected_objects}_output.mp4
    unique_crimes = set(crimes)
    crimes_str = "_".join(sorted(unique_crimes)).replace(" ", "_")[:50]  # truncate if needed
    final_output_name = f"{base_name}_{crimes_str}_output.mp4"
    final_output_path = os.path.join(output_dir, final_output_name)

    # Rename the temporary file to its final name; os.replace overwrites an
    # existing file of the same name, which os.rename would not do on Windows
    os.replace(temp_output_path, final_output_path)

    print(f"[INFO] Detected crimes: {unique_crimes}")
    print(f"[INFO] Annotated video saved at: {final_output_path}")

    return unique_crimes, final_output_path
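
# Example programmatic usage (hypothetical path, shown for illustration only):
#     labels, out_path = detection("samples/demo.mp4")
#     if {"Gun", "knife"} & labels:
#         print(f"Weapon detected; review {out_path}")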


# Entry point
if __name__ == "__main__":
    path0 = input("Enter the local path to the video file to detect objects: ")
    path = path0.strip('"')  # Remove surrounding quotes if copied from Windows Explorer
    print(f"[INFO] Loading video: {path}")
    detection(path)