Spaces:
Running
Running
import torch | |
import numpy as np | |
import gradio as gr | |
import cv2 | |
import time | |
import os | |
from pathlib import Path | |
from PIL import Image | |
from threading import Thread | |
from queue import Queue | |
# Create cache directory for models | |
os.makedirs("models", exist_ok=True) | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
print(f"Using device: {device}") | |
# Load YOLOv5 Nano model | |
model_path = Path("models/yolov5n.pt") | |
if model_path.exists(): | |
print(f"Loading model from cache: {model_path}") | |
model = torch.hub.load("ultralytics/yolov5", "custom", path=str(model_path), source="local").to(device) | |
else: | |
print("Downloading YOLOv5n model and caching...") | |
model = torch.hub.load("ultralytics/yolov5", "yolov5n", pretrained=True).to(device) | |
torch.save(model.state_dict(), model_path) | |
# Optimize model for speed | |
model.conf = 0.25 # Slightly lower confidence threshold | |
model.iou = 0.45 # Better IoU threshold | |
model.classes = None | |
model.max_det = 100 # Limit maximum detections | |
if device.type == "cuda": | |
model.half() # Use FP16 precision | |
else: | |
torch.set_num_threads(os.cpu_count()) | |
model.eval() | |
# Pre-generate colors for bounding boxes | |
np.random.seed(42) | |
colors = np.random.randint(0, 255, size=(len(model.names), 3), dtype=np.uint8) | |
# Async video processing | |
def process_frame(model, frame_queue, result_queue): | |
while True: | |
if frame_queue.empty(): | |
time.sleep(0.001) | |
continue | |
frame_data = frame_queue.get() | |
if frame_data is None: # Signal to stop | |
result_queue.put(None) | |
break | |
frame, frame_index = frame_data | |
img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
# Use a smaller inference size for speed | |
results = model(img, size=384) # Reduced from 640 to 384 | |
detections = results.xyxy[0].cpu().numpy() | |
result_queue.put((frame, detections, frame_index)) | |
def process_video(video_path): | |
# Check if video_path is None or empty | |
if video_path is None or video_path == "": | |
return None | |
# Handle the case when Gradio passes a tuple (file, None) | |
if isinstance(video_path, tuple) and len(video_path) >= 1: | |
video_path = video_path[0] | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
return "Error: Could not open video file." | |
frame_width = int(cap.get(3)) | |
frame_height = int(cap.get(4)) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
# Used h264 codec for better performance | |
fourcc = cv2.VideoWriter_fourcc(*'avc1') | |
output_path = "output_video.mp4" | |
out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height)) | |
# Created queues for async processing | |
frame_queue = Queue(maxsize=10) | |
result_queue = Queue() | |
# Start processing thread | |
processing_thread = Thread(target=process_frame, args=(model, frame_queue, result_queue)) | |
processing_thread.daemon = True | |
processing_thread.start() | |
total_frames = 0 | |
start_time = time.time() | |
processing_started = False | |
frames_buffer = {} | |
next_frame_to_write = 0 | |
try: | |
while cap.isOpened(): | |
ret, frame = cap.read() | |
if not ret: | |
break | |
if not processing_started: | |
processing_started = True | |
start_time = time.time() | |
frame_queue.put((frame, total_frames)) | |
total_frames += 1 | |
# Process results if available | |
while not result_queue.empty(): | |
result = result_queue.get() | |
if result is None: | |
break | |
processed_frame, detections, frame_idx = result | |
frames_buffer[frame_idx] = (processed_frame, detections) | |
# Write frames in order | |
while next_frame_to_write in frames_buffer: | |
buffer_frame, buffer_detections = frames_buffer.pop(next_frame_to_write) | |
# Draw bounding boxes | |
for *xyxy, conf, cls in buffer_detections: | |
if conf < 0.35: # Additional filtering | |
continue | |
x1, y1, x2, y2 = map(int, xyxy) | |
class_id = int(cls) | |
color = colors[class_id].tolist() | |
cv2.rectangle(buffer_frame, (x1, y1), (x2, y2), color, 2, lineType=cv2.LINE_AA) | |
label = f"{model.names[class_id]} {conf:.2f}" | |
# Black text with white outline for better visibility | |
cv2.putText(buffer_frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, | |
0.7, (0, 0, 0), 2, cv2.LINE_AA) | |
# Calculate elapsed time and FPS | |
elapsed = time.time() - start_time | |
current_fps = next_frame_to_write / elapsed if elapsed > 0 else 0 | |
# Add FPS counter with black text | |
cv2.putText(buffer_frame, f"FPS: {current_fps:.2f}", (20, 40), | |
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA) | |
out.write(buffer_frame) | |
next_frame_to_write += 1 | |
# Signal thread to finish and process remaining frames | |
frame_queue.put(None) | |
# Process remaining buffered frames | |
while True: | |
if result_queue.empty(): | |
time.sleep(0.01) | |
continue | |
result = result_queue.get() | |
if result is None: | |
break | |
processed_frame, detections, frame_idx = result | |
frames_buffer[frame_idx] = (processed_frame, detections) | |
# Write remaining frames in order | |
while next_frame_to_write in frames_buffer: | |
buffer_frame, buffer_detections = frames_buffer.pop(next_frame_to_write) | |
# Draw bounding boxes | |
for *xyxy, conf, cls in buffer_detections: | |
if conf < 0.35: | |
continue | |
x1, y1, x2, y2 = map(int, xyxy) | |
class_id = int(cls) | |
color = colors[class_id].tolist() | |
cv2.rectangle(buffer_frame, (x1, y1), (x2, y2), color, 2, lineType=cv2.LINE_AA) | |
label = f"{model.names[class_id]} {conf:.2f}" | |
cv2.putText(buffer_frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, | |
0.7, (0, 0, 0), 2, cv2.LINE_AA) | |
# Add FPS counter | |
elapsed = time.time() - start_time | |
current_fps = next_frame_to_write / elapsed if elapsed > 0 else 0 | |
cv2.putText(buffer_frame, f"FPS: {current_fps:.2f}", (20, 40), | |
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA) | |
out.write(buffer_frame) | |
next_frame_to_write += 1 | |
finally: | |
cap.release() | |
out.release() | |
return output_path | |
def process_image(image): | |
if image is None: | |
return None | |
img = np.array(image) | |
# Process with smaller size for speed | |
results = model(img, size=512) | |
detections = results.pred[0].cpu().numpy() | |
for *xyxy, conf, cls in detections: | |
x1, y1, x2, y2 = map(int, xyxy) | |
class_id = int(cls) | |
color = colors[class_id].tolist() | |
cv2.rectangle(img, (x1, y1), (x2, y2), color, 2, lineType=cv2.LINE_AA) | |
label = f"{model.names[class_id]} {conf:.2f}" | |
# Black text | |
cv2.putText(img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2, cv2.LINE_AA) | |
return Image.fromarray(img) | |
css = """ | |
#title { | |
text-align: center; | |
color: #2C3E50; | |
font-size: 2.5rem; | |
margin: 1.5rem 0; | |
text-shadow: 1px 1px 2px rgba(0,0,0,0.1); | |
} | |
.gradio-container { | |
background-color: #F5F7FA; | |
} | |
.tab-item { | |
background-color: white; | |
border-radius: 10px; | |
padding: 20px; | |
box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
margin: 10px; | |
} | |
.button-row { | |
display: flex; | |
justify-content: space-around; | |
margin: 1rem 0; | |
} | |
#video-process-btn, #submit-btn { | |
background-color: #3498DB; | |
border: none; | |
} | |
#clear-btn { | |
background-color: #E74C3C; | |
border: none; | |
} | |
.output-container { | |
margin-top: 1.5rem; | |
border: 2px dashed #3498DB; | |
border-radius: 10px; | |
padding: 10px; | |
} | |
.footer { | |
text-align: center; | |
margin-top: 2rem; | |
font-size: 0.9rem; | |
color: #7F8C8D; | |
} | |
""" | |
with gr.Blocks(css=css, title="Video & Image Object Detection by YOLOv5") as demo: | |
gr.Markdown("""# YOLOv5 Object Detection""", elem_id="title") | |
with gr.Tabs(): | |
with gr.TabItem("Video Detection", elem_classes="tab-item"): | |
with gr.Row(): | |
video_input = gr.Video( | |
label="Upload Video", | |
interactive=True, | |
elem_id="video-input" | |
) | |
with gr.Row(elem_classes="button-row"): | |
process_button = gr.Button( | |
"Process Video", | |
variant="primary", | |
elem_id="video-process-btn" | |
) | |
with gr.Row(elem_classes="output-container"): | |
video_output = gr.Video( | |
label="Processed Video", | |
elem_id="video-output" | |
) | |
process_button.click( | |
fn=process_video, | |
inputs=video_input, | |
outputs=video_output | |
) | |
with gr.TabItem("Image Detection", elem_classes="tab-item"): | |
with gr.Row(): | |
image_input = gr.Image( | |
type="pil", | |
label="Upload Image", | |
interactive=True | |
) | |
with gr.Row(elem_classes="button-row"): | |
clear_button = gr.Button( | |
"Clear", | |
variant="secondary", | |
elem_id="clear-btn" | |
) | |
submit_button = gr.Button( | |
"Detect Objects", | |
variant="primary", | |
elem_id="submit-btn" | |
) | |
with gr.Row(elem_classes="output-container"): | |
image_output = gr.Image( | |
label="Detected Objects", | |
elem_id="image-output" | |
) | |
clear_button.click( | |
fn=lambda: None, | |
inputs=None, | |
outputs=image_output | |
) | |
submit_button.click( | |
fn=process_image, | |
inputs=image_input, | |
outputs=image_output | |
) | |
gr.Markdown(""" | |
### Powered by YOLOv5. | |
This application enables seamless object detection using the YOLOv5 model, allowing users to analyze images and videos with high accuracy and efficiency. | |
""", elem_classes="footer") | |
if __name__ == "__main__": | |
demo.launch() |