|
import cv2
|
|
import numpy as np
|
|
import os
|
|
from ultralytics import YOLO
|
|
import time
|
|
from typing import Tuple, Set
|
|
|
|
def detection(path: str) -> Tuple[Set[str], str]:
    """
    Detect and track objects in a video with a YOLO model and save an annotated copy.

    Each frame is resized to 640x640, passed to ``model.track`` (confidence
    threshold 0.7, tracker state persisted across frames), and the plotted
    result is written to an MP4 file in the ``results`` directory, which is
    created relative to the current working directory if needed. The model
    weights are loaded from ``yolo/best.pt`` next to this source file.

    The output is first written to ``<name>_output_temp.mp4`` and, once
    processing is complete, moved to ``<name>_<labels>_output.mp4`` where
    ``<labels>`` is the sorted, underscore-joined set of detected labels
    (spaces replaced by underscores, truncated to 50 characters).

    Args:
        path (str): Path to the input video file.

    Returns:
        Tuple[Set[str], str]:
            - Set of unique detected object labels (e.g., {'Gun', 'Knife'})
            - Path to the annotated output video

    Raises:
        FileNotFoundError: If the input video doesn't exist.
        ValueError: If the input video cannot be opened, or the output
            video writer cannot be created.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Video file not found: {path}")

    model = YOLO(os.path.join(os.path.dirname(__file__), "yolo", "best.pt"))
    class_names = model.names

    base_name = os.path.splitext(os.path.basename(path))[0]
    output_dir = "results"
    # makedirs raises OSError itself when the directory cannot be created,
    # so no follow-up existence check is required.
    os.makedirs(output_dir, exist_ok=True)
    temp_output_path = os.path.join(output_dir, f"{base_name}_output_temp.mp4")

    frame_width, frame_height = 640, 640
    # Collect labels straight into a set: only uniqueness matters, and a list
    # of every detection in every frame grows without bound on long videos.
    unique_crimes: Set[str] = set()

    cap = cv2.VideoCapture(path)
    out = None
    try:
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {path}")

        # Keep the source frame rate so playback speed is preserved; fall back
        # to 30 fps when the container reports none (0 or NaN).
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        out = cv2.VideoWriter(
            temp_output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            fps,
            (frame_width, frame_height),
        )
        if not out.isOpened():
            # Without this check a missing codec would silently write nothing
            # and only surface later as a failed rename.
            raise ValueError(f"Failed to open video writer: {temp_output_path}")

        start = time.time()
        print(f"[INFO] Processing started at {start:.2f} seconds")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame = cv2.resize(frame, (frame_width, frame_height))
            results = model.track(
                source=frame,
                conf=0.7,
                persist=True,
            )

            annotated_frame = results[0].plot()

            for box in results[0].boxes:
                unique_crimes.add(class_names[int(box.cls)])

            out.write(annotated_frame)

        end = time.time()
        print(f"[INFO] Processing finished at {end:.2f} seconds")
        print(f"[INFO] Total execution time: {end - start:.2f} seconds")
    finally:
        # Always free the capture/writer handles, even if inference fails
        # midway; the writer must also be closed before the file is moved.
        cap.release()
        if out is not None:
            out.release()

    crimes_str = "_".join(sorted(unique_crimes)).replace(" ", "_")[:50]
    final_output_name = f"{base_name}_{crimes_str}_output.mp4"
    final_output_path = os.path.join(output_dir, final_output_name)

    # os.replace overwrites an existing result on every platform, whereas
    # os.rename fails on Windows when the destination already exists.
    os.replace(temp_output_path, final_output_path)

    print(f"[INFO] Detected crimes: {unique_crimes}")
    print(f"[INFO] Annotated video saved at: {final_output_path}")

    return unique_crimes, final_output_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|