from ultralytics import YOLO
import cv2
import time
import spaces  # needed when the @spaces.GPU decorator below is enabled

model = YOLO("model/yolo11n_6-2-25.pt")

SUBSAMPLE = 2  # frame-subsampling factor (not currently applied in the loop below)


def draw_boxes(frame, results):
    """Draw a magenta bounding box and a class label for every detection."""
    for r in results:
        boxes = r.boxes
        for box in boxes:
            x1, y1, x2, y2 = box.xyxy[0]
            x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 255), 3)
            cls = r.names[box.cls[0].item()]
            # label details; cv2.putText expects a tuple for org, not a list
            org = (x1, y1)
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1
            color = (255, 0, 0)
            thickness = 2
            cv2.putText(frame, cls, org, font, font_scale, color, thickness)
    return frame


# @spaces.GPU  # uncomment when running on a ZeroGPU Space
def video_detection(cap):
    """Run detection on every frame of `cap` and stream the results.

    Annotated frames are written to ~16-frame .mp4 segments on disk.
    Yields (rgb_frame, segment_name): segment_name is None except when a
    segment file has just been closed; the final yield is (None, last_name).
    """
    video_codec = cv2.VideoWriter_fourcc(*"mp4v")  # type: ignore
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    # frames are downscaled by half, so the writer gets the halved dimensions;
    # resizing to (width, height) exactly avoids a one-pixel mismatch on
    # odd-sized inputs, which would silently corrupt the output file
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) // 2
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) // 2

    iterating, frame = cap.read()

    n_frames = 0
    n_chunks = 0

    name = f"output_{n_chunks}.mp4"
    segment_file = cv2.VideoWriter(name, video_codec, fps, (width, height))  # type: ignore

    while iterating:
        frame = cv2.resize(frame, (width, height))

        start = time.time()
        # Ultralytics accepts BGR numpy frames directly. A plain call runs
        # eagerly on this single frame; stream=True would return a lazy
        # generator and defer inference past the timer below.
        results = model(frame)
        end = time.time()
        print("time taken for inference", end - start)

        frame = draw_boxes(frame, results)
        segment_file.write(frame)  # VideoWriter expects BGR frames

        # convert only the streamed preview to RGB for display
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        if n_frames == 16:
            # close the current segment and start the next one
            n_chunks += 1
            segment_file.release()
            n_frames = 0
            yield rgb_frame, name
            name = f"output_{n_chunks}.mp4"
            segment_file = cv2.VideoWriter(name, video_codec, fps, (width, height))  # type: ignore
        else:
            yield rgb_frame, None

        iterating, frame = cap.read()
        n_frames += 1

    cap.release()
    segment_file.release()
    cv2.destroyAllWindows()
    yield None, name
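# --- Usage sketch (an assumption, not part of the original script) ----------
# A minimal, hypothetical driver showing how the video_detection generator is
# consumed: annotated RGB frames stream out continuously, and a segment
# filename appears each time a ~16-frame .mp4 chunk is closed. The input path
# "input.mp4" is illustrative only.
if __name__ == "__main__":
    cap = cv2.VideoCapture("input.mp4")  # hypothetical input path
    if not cap.isOpened():
        raise SystemExit("could not open input.mp4")
    for rgb_frame, segment_name in video_detection(cap):
        if segment_name is not None:
            print("closed segment:", segment_name)
        if rgb_frame is None:
            break  # final yield; all frames processed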