# ai-detector/video_detection.py
from ultralytics import YOLO
import cv2
import time
import spaces  # only needed if the @spaces.GPU decorator below is re-enabled

# Custom-trained YOLO11-nano detection model.
model = YOLO("model/yolo11n_6-2-25.pt")

# Process every SUBSAMPLE-th frame (currently unused by the active code path).
SUBSAMPLE = 2
def draw_boxes(frame, results):
    """Draw bounding boxes and class labels from YOLO results onto the frame in place."""
    for r in results:
        for box in r.boxes:
            # Box corners come back as a 4-element tensor; cast to ints for OpenCV.
            x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 255), 3)

            # Class name, drawn at the top-left corner of the box.
            cls = r.names[box.cls[0].item()]
            cv2.putText(frame, cls, (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
    return frame
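
# Usage sketch for draw_boxes (assumption: annotating a single still image
# rather than a video frame; "test.jpg" is a hypothetical path):
#
#     img = cv2.imread("test.jpg")            # BGR numpy array
#     img = draw_boxes(img, model(img))       # model(...) returns an iterable of Results
#     cv2.imwrite("test_annotated.jpg", img)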
#@spaces.GPU
def video_detection(cap):
    """Run detection on a cv2.VideoCapture, yielding (frame, segment_path) pairs.

    Annotated frames are yielded one at a time for live preview; every 16
    frames the current MP4 segment is finalized and its path is yielded
    alongside the frame (otherwise the path is None).
    """
    video_codec = cv2.VideoWriter_fourcc(*"mp4v")  # type: ignore
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    # Write the output at half resolution to keep inference and encoding fast.
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) // 2
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) // 2

    iterating, frame = cap.read()
    n_frames = 0
    n_chunks = 0
    name = f"output_{n_chunks}.mp4"
    segment_file = cv2.VideoWriter(name, video_codec, fps, (width, height))  # type: ignore
    while iterating:
        # Halve the frame size to match the writer's (width, height).
        frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)

        # Ultralytics accepts BGR numpy arrays directly, so the frame can go
        # straight to the model. stream=True returns a lazy generator, so the
        # actual inference happens while draw_boxes consumes the results.
        start = time.time()
        results = model(frame, stream=True)
        # Alternatively, each result could be rendered with r.plot(), which
        # returns an annotated BGR array, instead of draw_boxes.
        frame = draw_boxes(frame, results)
        print("time taken for inference", time.time() - start)

        # VideoWriter expects BGR; keep an RGB copy for the yielded preview.
        segment_file.write(frame)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        if n_frames == 16:
            # Segment boundary: finalize the current file, hand its path to
            # the caller, and start a new segment.
            n_chunks += 1
            segment_file.release()
            n_frames = 0
            yield rgb_frame, name
            name = f"output_{n_chunks}.mp4"
            segment_file = cv2.VideoWriter(name, video_codec, fps, (width, height))  # type: ignore
        else:
            yield rgb_frame, None

        iterating, frame = cap.read()
        n_frames += 1
    # Flush the final (possibly partial) segment before signalling completion.
    cap.release()
    segment_file.release()
    cv2.destroyAllWindows()
    yield None, name
"""
#@spaces.GPU
def video_detection(cap):
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
out = cv2.VideoWriter('output_video.mp4', cv2.VideoWriter_fourcc(*'h264'), fps, (frame_width, frame_height))
count = 0
while cap.isOpened():
success, frame = cap.read()
if not success:
break
#results = model(frame, stream=True, device='cuda', verbose=False)
results = model(frame, stream=True)
frame = draw_boxes(frame, results)
out.write(frame)
#if not count % 10:
yield frame, None
# print(count)
count += 1
cap.release()
out.release()
cv2.destroyAllWindows()
yield None, 'output_video.mp4'
"""