File size: 8,872 Bytes
8de568b
6b881e8
 
 
 
 
8de568b
6b881e8
8de568b
6b881e8
8de568b
61bcdf4
6b881e8
8de568b
61bcdf4
6b881e8
8de568b
6b881e8
 
8de568b
 
 
 
6b881e8
8de568b
 
6b881e8
 
 
8de568b
 
b2f3c2c
8de568b
 
 
 
 
 
 
 
6b881e8
8de568b
 
6b881e8
 
8de568b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6b881e8
8de568b
 
6b881e8
8de568b
6b881e8
 
 
 
8de568b
 
 
 
 
 
 
6b881e8
8de568b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6b881e8
8de568b
 
6b881e8
 
 
8de568b
6b881e8
 
8de568b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61bcdf4
6b881e8
8de568b
 
 
 
 
6b881e8
8de568b
6b881e8
8de568b
 
6b881e8
 
8de568b
 
6b881e8
8de568b
 
 
 
 
 
 
 
 
 
 
 
6b881e8
 
8de568b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6b881e8
 
8de568b
6b881e8
 
8de568b
61bcdf4
8de568b
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216

import cv2
import numpy as np
import os
from ultralytics import YOLO
import time
from typing import Tuple, Set, List

def detection(path: str) -> Tuple[Set[str], str]:
    """
    Detects and tracks objects in a video using YOLOv8 model, saving an annotated output video.

    Args:
        path (str): Path to the input video file. Supports common video formats (mp4, avi, etc.)

    Returns:
        Tuple[Set[str], str]:
            - Set of unique detected object labels (e.g., {'Gun', 'Knife'})
            - Path to the output annotated video with detection boxes and tracking IDs

    Raises:
        FileNotFoundError: If input video doesn't exist
        ValueError: If video cannot be opened/processed or output dir cannot be created
    """

    # Validate input file exists
    if not os.path.exists(path):
        raise FileNotFoundError(f"Video file not found: {path}")

    # --- Model Loading ---
    # Construct path relative to this script file
    model_path = os.path.join(os.path.dirname(__file__), "best.pt")
    if not os.path.exists(model_path):
         raise FileNotFoundError(f"YOLO model file not found at: {model_path}")
    try:
        model = YOLO(model_path)
        class_names = model.names  # Get class label mappings
        print(f"[INFO] YOLO model loaded from {model_path}. Class names: {class_names}")
    except Exception as e:
        raise ValueError(f"Failed to load YOLO model: {e}")


    # --- Output Path Setup ---
    input_video_name = os.path.basename(path)
    base_name = os.path.splitext(input_video_name)[0]
    # Sanitize basename to prevent issues with weird characters in filenames
    safe_base_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in base_name)

    # Define output directory relative to this script
    # In HF Spaces, this will be inside the container's file system
    output_dir = os.path.join(os.path.dirname(__file__), "results")
    temp_output_name = f"{safe_base_name}_output_temp.mp4"

    try:
        os.makedirs(output_dir, exist_ok=True)  # Create output dir if needed
        if not os.path.isdir(output_dir):
             raise ValueError(f"Path exists but is not a directory: {output_dir}")
    except OSError as e:
        raise ValueError(f"Failed to create or access output directory '{output_dir}': {e}")

    temp_output_path = os.path.join(output_dir, temp_output_name)
    print(f"[INFO] Temporary output will be saved to: {temp_output_path}")


    # --- Video Processing Setup ---
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise ValueError(f"Failed to open video file: {path}")

    # Get video properties for output writer
    # Use source FPS if available and reasonable, otherwise default to 30
    source_fps = cap.get(cv2.CAP_PROP_FPS)
    output_fps = source_fps if 10 <= source_fps <= 60 else 30.0

    # Process at a fixed resolution for consistency or use source resolution
    # Using fixed 640x640 as potentially used during training/fine-tuning
    frame_width, frame_height = 640, 640
    # OR use source resolution (might require adjusting YOLO parameters if model expects specific size)
    # frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    # frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    try:
        out = cv2.VideoWriter(
            temp_output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),  # Use MP4 codec
            output_fps,
            (frame_width, frame_height)
        )
        if not out.isOpened():
            # Attempt alternative codec if mp4v fails (less common)
            print("[WARNING] mp4v codec failed, trying avc1...")
            out = cv2.VideoWriter(
                temp_output_path,
                cv2.VideoWriter_fourcc(*'avc1'),
                output_fps,
                (frame_width, frame_height)
            )
            if not out.isOpened():
                 raise ValueError("Failed to initialize VideoWriter with mp4v or avc1 codec.")

    except Exception as e:
        cap.release() # Release capture device before raising
        raise ValueError(f"Failed to create VideoWriter: {e}")


    # --- Main Processing Loop ---
    detected_classes: List[str] = []  # Track detected object class names
    start = time.time()
    frame_count = 0
    print(f"[INFO] Video processing started...")

    while True:
        ret, frame = cap.read()
        if not ret:  # End of video or read error
            break

        frame_count += 1
        # Resize frame BEFORE passing to model
        resized_frame = cv2.resize(frame, (frame_width, frame_height))

        try:
            # Run YOLOv8 detection and tracking on the resized frame
            results = model.track(
                source=resized_frame, # Use resized frame
                conf=0.7,              # Confidence threshold
                persist=True,          # Maintain track IDs across frames
                verbose=False          # Suppress Ultralytics console output per frame
            )

            # Check if results are valid and contain boxes
            if results and results[0] and results[0].boxes:
                # Annotate the RESIZED frame with bounding boxes and track IDs
                annotated_frame = results[0].plot() # plot() draws on the source image

                # Record detected class names for this frame
                for box in results[0].boxes:
                    if box.cls is not None: # Check if class ID is present
                        cls_id = int(box.cls[0]) # Get class index
                        if 0 <= cls_id < len(class_names):
                            detected_classes.append(class_names[cls_id])
                        else:
                            print(f"[WARNING] Detected unknown class ID: {cls_id}")
            else:
                 # If no detections, use the original resized frame for the output video
                 annotated_frame = resized_frame

            # Write the (potentially annotated) frame to the output video
            out.write(annotated_frame)

        except Exception as e:
             print(f"[ERROR] Error processing frame {frame_count}: {e}")
             # Write the unannotated frame to keep video timing consistent
             out.write(resized_frame)


    # --- Clean Up ---
    end = time.time()
    print(f"[INFO] Video processing finished. Processed {frame_count} frames.")
    print(f"[INFO] Total processing time: {end - start:.2f} seconds")
    cap.release()
    out.release()
    cv2.destroyAllWindows() # Close any OpenCV windows if they were opened


    # --- Final Output Renaming ---
    unique_detected_labels = set(detected_classes)
    # Create a short string from labels for the filename
    labels_str = "_".join(sorted(list(unique_detected_labels))).replace(" ", "_")
    # Limit length to avoid overly long filenames
    max_label_len = 50
    if len(labels_str) > max_label_len:
        labels_str = labels_str[:max_label_len] + "_etc"
    if not labels_str: # Handle case where nothing was detected
        labels_str = "no_detections"

    final_output_name = f"{safe_base_name}_{labels_str}_output.mp4"
    final_output_path = os.path.join(output_dir, final_output_name)

    # Ensure final path doesn't already exist (rename might fail otherwise)
    if os.path.exists(final_output_path):
        os.remove(final_output_path)

    try:
        # Rename the temporary file to the final name
        os.rename(temp_output_path, final_output_path)
        print(f"[INFO] Detected object labels: {unique_detected_labels}")
        print(f"[INFO] Annotated video saved successfully at: {final_output_path}")
    except OSError as e:
        print(f"[ERROR] Failed to rename {temp_output_path} to {final_output_path}: {e}")
        # Fallback: return the temp path if rename fails but file exists
        if os.path.exists(temp_output_path):
             print(f"[WARNING] Returning path to temporary file: {temp_output_path}")
             return unique_detected_labels, temp_output_path
        else:
             raise ValueError(f"Output video generation failed. No output file found.")


    return unique_detected_labels, final_output_path


# # Example usage (commented out for library use)
# if __name__ == "__main__":
#     test_video = input("Enter the local path to the video file: ").strip('"')
#     if os.path.exists(test_video):
#         try:
#             print(f"[INFO] Processing video: {test_video}")
#             labels, out_path = detection(test_video)
#             print(f"\nDetection Complete.")
#             print(f"Detected unique labels: {labels}")
#             print(f"Output video saved to: {out_path}")
#         except (FileNotFoundError, ValueError, Exception) as e:
#             print(f"\nAn error occurred: {e}")
#     else:
#         print(f"Error: Input video file not found - {test_video}")