# Hugging Face Spaces app.
# (The "Spaces: / Runtime error" banner text captured with this file came
# from the Spaces web UI, not from the source code itself.)
# Standard library
import time
import uuid

# Third-party
import cv2
import numpy as np
import spaces
from PIL import Image
from ultralytics import YOLO

# Custom-trained YOLO11-nano checkpoint — presumably trained 6-2-25; path is
# relative to the app's working directory (TODO confirm).
model = YOLO("model/yolo11n_6-2-25.pt")

# Frame subsampling factor for the input stream (currently only referenced
# from commented-out code below).
SUBSAMPLE = 2
def draw_boxes(frame, results):
    """Annotate ``frame`` in place with detection boxes and class labels.

    Args:
        frame: image array to draw on (modified in place).
        results: iterable of Ultralytics result objects exposing ``.boxes``
            and ``.names``.

    Returns:
        The same frame, annotated.
    """
    box_color = (255, 0, 255)
    label_font = cv2.FONT_HERSHEY_SIMPLEX
    label_color = (255, 0, 0)
    for result in results:
        for box in result.boxes:
            left, top, right, bottom = (int(c) for c in box.xyxy[0])
            cv2.rectangle(frame, (left, top), (right, bottom), box_color, 3)
            class_name = result.names[box.cls[0].item()]
            # Label anchored at the box's top-left corner.
            cv2.putText(frame, class_name, [left, top], label_font, 1,
                        label_color, 2)
    return frame
#@spaces.GPU
def video_detection(cap):
    """Stream YOLO detections from a video capture as annotated frames.

    Generator protocol:
      * yields ``(rgb_frame, None)`` for every processed frame;
      * yields ``(rgb_frame, chunk_path)`` whenever a 16-frame MP4 segment
        is finalized, so the caller can stream completed chunks;
      * yields a final ``(None, chunk_path)`` to flush the last (possibly
        partial) segment.

    Frames are downscaled to half resolution before inference and writing.

    Args:
        cap: an opened ``cv2.VideoCapture``.  Released before the generator
            finishes.

    Yields:
        tuple: ``(frame_or_None, segment_filename_or_None)``.
    """
    video_codec = cv2.VideoWriter_fourcc(*"mp4v")  # type: ignore
    # Some capture sources report 0 fps; fall back so VideoWriter stays valid.
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
    # Output dimensions match the 0.5x resize applied to each frame below.
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) // 2
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) // 2

    iterating, frame = cap.read()
    n_frames = 0
    n_chunks = 0
    name = f"output_{n_chunks}.mp4"
    segment_file = cv2.VideoWriter(name, video_codec, fps, (width, height))  # type: ignore

    while iterating:
        frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
        # Inference and the yielded preview use RGB.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        start = time.time()
        results = model(frame, stream=True)
        print("time taken for inference", time.time() - start)

        frame = draw_boxes(frame, results)
        # BUGFIX: VideoWriter expects BGR — write a channel-swapped copy,
        # keep the RGB frame for the yield.
        segment_file.write(frame[:, :, ::-1].copy())
        n_frames += 1

        # Count before comparing so every chunk holds exactly 16 frames
        # (previously the first chunk received 17).
        if n_frames == 16:
            n_chunks += 1
            segment_file.release()
            n_frames = 0
            yield frame, name
            name = f"output_{n_chunks}.mp4"
            segment_file = cv2.VideoWriter(name, video_codec, fps, (width, height))  # type: ignore
        else:
            yield frame, None

        iterating, frame = cap.read()

    cap.release()
    segment_file.release()
    cv2.destroyAllWindows()
    # Flush the final (possibly partial) segment.
    yield None, name
""" | |
#@spaces.GPU | |
def video_detection(cap): | |
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
out = cv2.VideoWriter('output_video.mp4', cv2.VideoWriter_fourcc(*'h264'), fps, (frame_width, frame_height)) | |
count = 0 | |
while cap.isOpened(): | |
success, frame = cap.read() | |
if not success: | |
break | |
#results = model(frame, stream=True, device='cuda', verbose=False) | |
results = model(frame, stream=True) | |
frame = draw_boxes(frame, results) | |
out.write(frame) | |
#if not count % 10: | |
yield frame, None | |
# print(count) | |
count += 1 | |
cap.release() | |
out.release() | |
cv2.destroyAllWindows() | |
yield None, 'output_video.mp4' | |
""" | |