# Hugging Face Space (status: Running)
import os
import threading
import time
from pathlib import Path
from queue import Empty, Full, Queue

import cv2
import gradio as gr
import numpy as np
import torch
# Directory that holds the cached YOLOv5 weights.
os.makedirs("models", exist_ok=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

model_path = Path("models/yolov5x.pt")


def _load_yolov5x(weights_path):
    """Load YOLOv5x through torch.hub from the checkpoint at ``weights_path``.

    The hub's ``custom`` entry point is the one that accepts a ``path``
    argument. NOTE(review): YOLOv5's weight loader is expected to download the
    official ``yolov5x.pt`` release weights to this path when the file is
    missing -- confirm against the YOLOv5 revision the hub resolves to.
    """
    return torch.hub.load("ultralytics/yolov5", "custom", path=str(weights_path))


if model_path.exists():
    print(f"Loading model from cache: {model_path}")
    try:
        model = _load_yolov5x(model_path)
    except Exception as exc:
        # Earlier revisions of this script wrote a bare state_dict to this
        # path, which is not a loadable checkpoint. Move the file aside
        # (rather than deleting it) and fetch fresh weights.
        stale_path = model_path.with_suffix(".pt.bak")
        print(f"Cached weights could not be loaded ({exc}); moving them to {stale_path}")
        model_path.replace(stale_path)
        model = _load_yolov5x(model_path)
else:
    print("Downloading YOLOv5x model and caching...")
    model = _load_yolov5x(model_path)
model = model.to(device)

# Inference settings applied to every call of the model.
model.conf = 0.5  # confidence threshold
model.iou = 0.45  # IoU threshold
model.classes = None  # detect all classes
model.max_det = 20  # cap on detections per image, for speed

if device.type == "cuda":
    model.half()  # half precision on CUDA
else:
    # os.cpu_count() may return None; torch.set_num_threads needs an int.
    torch.set_num_threads(os.cpu_count() or 1)
model.eval()
# --- Coordination between the webcam generator, the worker thread and uploads ---
stop_event = threading.Event()  # set to ask the worker loop to exit
processing_lock = threading.Lock()  # held while an uploaded image is processed
frame_queue = Queue(2)  # frames waiting for detection; kept small to avoid lag
result_queue = Queue(2)  # frames coming back from the worker

# --- Performance bookkeeping ---
inference_count = 0
total_inference_time = 0
fps_queue = Queue(30)  # bounded store of recent FPS samples used for smoothing

# --- One colour per class; the fixed seed keeps them identical across runs ---
np.random.seed(42)
colors = np.random.uniform(low=0, high=255, size=(len(model.names), 3))
def detect_objects(image):
    """Run the detector on a single image and return an annotated copy.

    Updates the module-level ``total_inference_time`` and ``inference_count``
    so the overlay can show a running average next to the per-call FPS.

    Args:
        image: Image array to analyse, or ``None``. It is passed to the model
            unchanged; drawing happens on a copy.

    Returns:
        A copy of ``image`` with bounding boxes, class/confidence labels and
        an FPS overlay drawn on it, or ``None`` when ``image`` is ``None``.
    """
    global total_inference_time, inference_count

    if image is None:
        return None

    output_image = image.copy()
    input_size = 640

    # Time only the model call so the copy above does not skew the FPS figure.
    start_time = time.perf_counter()
    with torch.no_grad():
        results = model(image, size=input_size)
    inference_time = time.perf_counter() - start_time

    total_inference_time += inference_time
    inference_count += 1
    avg_inference_time = total_inference_time / inference_count

    # Guard the divisions the same way the real-time path does.
    fps = 1 / inference_time if inference_time > 0 else 0.0
    avg_fps = 1 / avg_inference_time if avg_inference_time > 0 else 0.0

    detections = results.pred[0].cpu().numpy()
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale, font_thickness = 0.9, 2

    for *xyxy, conf, cls in detections:
        x1, y1, x2, y2 = map(int, xyxy)
        class_id = int(cls)
        color = colors[class_id].tolist()

        # Bounding box
        cv2.rectangle(output_image, (x1, y1), (x2, y2), color, 3, lineType=cv2.LINE_AA)

        # Label with class name and confidence
        label = f"{model.names[class_id]} {conf:.2f}"
        (w, h), _ = cv2.getTextSize(label, font, font_scale, font_thickness)

        # The label normally sits just above the box; when that would fall
        # outside the image, draw it just inside the top edge instead.
        label_top = y1 - h - 10
        if label_top < 0:
            label_top = y1
        label_bottom = label_top + h + 10

        cv2.rectangle(output_image, (x1, label_top), (x1 + w + 10, label_bottom), color, -1)
        cv2.putText(output_image, label, (x1 + 5, label_bottom - 5),
                    font, font_scale, (255, 255, 255), font_thickness, lineType=cv2.LINE_AA)

    # Semi-transparent black panel behind the FPS text.
    overlay = output_image.copy()
    cv2.rectangle(overlay, (10, 10), (300, 80), (0, 0, 0), -1)
    output_image = cv2.addWeighted(overlay, 0.6, output_image, 0.4, 0)

    cv2.putText(output_image, f"FPS: {fps:.2f}", (20, 40),
                font, 1, (0, 255, 0), 2, lineType=cv2.LINE_AA)
    cv2.putText(output_image, f"Avg FPS: {avg_fps:.2f}", (20, 70),
                font, 1, (0, 255, 0), 2, lineType=cv2.LINE_AA)

    return output_image
def _put_latest(target_queue, item):
    """Put ``item`` on a bounded queue without blocking, evicting the oldest entry when full."""
    while True:
        try:
            target_queue.put_nowait(item)
            return
        except Full:
            try:
                target_queue.get_nowait()
            except Empty:
                # Someone else drained the queue in between; just retry the put.
                pass


def process_frame_thread():
    """Worker loop: take frames from ``frame_queue``, detect, publish to ``result_queue``.

    Each queue item is a dict with an ``'image'`` array and, optionally, a
    ``'timestamp'`` taken when the frame was captured. Results are dicts with
    the annotated ``'image'`` and the measured ``'fps'``. While
    ``processing_lock`` is held (an uploaded image is being processed) frames
    are passed through unprocessed. The loop exits once ``stop_event`` is set.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX

    while not stop_event.is_set():
        # Block briefly instead of spinning; the timeout keeps the loop
        # responsive to stop_event.
        try:
            frame = frame_queue.get(timeout=0.05)
        except Empty:
            continue

        # Skip detection while an upload holds the lock; hand the frame back as-is.
        if processing_lock.locked():
            _put_latest(result_queue, frame)
            continue

        image = frame['image']

        with torch.no_grad():  # inference only, no gradients
            input_size = 384  # smaller than the upload path, for real-time speed
            # The model must receive the pixel array, not the queue dict.
            # Frames are queued straight from cv2.VideoCapture (BGR), while
            # YOLOv5's hub wrapper documents RGB for NumPy input.
            results = model(cv2.cvtColor(image, cv2.COLOR_BGR2RGB), size=input_size)

        # FPS is derived from capture-to-result latency.
        inference_time = time.time() - frame.get('timestamp', time.time())
        current_fps = 1 / inference_time if inference_time > 0 else 30

        # Rolling average: evict the oldest sample once the window is full, so
        # the put can never block this thread.
        _put_latest(fps_queue, current_fps)
        samples = list(fps_queue.queue)
        avg_fps = sum(samples) / len(samples)

        # Draw detections on a copy, in the frame's own colour order.
        output = image.copy()
        detections = results.pred[0].cpu().numpy()
        font_scale, font_thickness = 0.6, 1  # smaller than the upload path
        for *xyxy, conf, cls in detections:
            x1, y1, x2, y2 = map(int, xyxy)
            class_id = int(cls)
            color = colors[class_id].tolist()

            cv2.rectangle(output, (x1, y1), (x2, y2), color, 2, lineType=cv2.LINE_AA)
            label = f"{model.names[class_id]} {conf:.2f}"
            (w, h), _ = cv2.getTextSize(label, font, font_scale, font_thickness)
            cv2.rectangle(output, (x1, y1 - h - 5), (x1 + w + 5, y1), color, -1)
            cv2.putText(output, label, (x1 + 3, y1 - 3),
                        font, font_scale, (255, 255, 255), font_thickness, lineType=cv2.LINE_AA)

        # FPS counter
        cv2.rectangle(output, (10, 10), (210, 80), (0, 0, 0), -1)
        cv2.putText(output, f"FPS: {current_fps:.1f}", (20, 40),
                    font, 0.8, (0, 255, 0), 2, lineType=cv2.LINE_AA)
        cv2.putText(output, f"Avg FPS: {avg_fps:.1f}", (20, 70),
                    font, 0.8, (0, 255, 0), 2, lineType=cv2.LINE_AA)

        # Publish without blocking: if nobody is consuming, drop the oldest result.
        _put_latest(result_queue, {'image': output, 'fps': current_fps})
def webcam_feed():
    """Yield webcam frames as RGB arrays, annotated whenever a result is ready.

    Starts the ``frame_processor`` worker thread if none is running, then
    reads from capture device 0 of the machine running this script. Each
    captured frame is offered to ``frame_queue``; the newest entry of
    ``result_queue`` is yielded when available, otherwise the raw frame. The
    capture device is released when the generator finishes or is closed.

    Yields:
        Frames converted from OpenCV's BGR order to RGB.
    """
    # Start the processing thread if not already running.
    if not any(thread.name == "frame_processor" for thread in threading.enumerate()):
        stop_event.clear()
        processor = threading.Thread(target=process_frame_thread, name="frame_processor", daemon=True)
        processor.start()

    cap = cv2.VideoCapture(0)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        # Stop streaming (and release the camera) once shutdown is requested.
        while not stop_event.is_set():
            success, frame = cap.read()
            if not success:
                break

            # Offer the frame for processing; drop it if the worker is behind.
            try:
                frame_queue.put_nowait({'image': frame, 'timestamp': time.time()})
            except Full:
                pass

            # Prefer a processed frame; fall back to the raw one.
            try:
                shown = result_queue.get_nowait()['image']
            except Empty:
                shown = frame

            # OpenCV captures BGR; the frames are displayed as RGB.
            yield cv2.cvtColor(shown, cv2.COLOR_BGR2RGB)

            # Throttle the loop so it does not overwhelm the system.
            time.sleep(0.01)
    finally:
        cap.release()
def process_uploaded_image(image):
    """Run detection on an uploaded image while holding ``processing_lock``.

    The worker loop checks this lock and passes webcam frames through
    unprocessed while it is held.
    """
    with processing_lock:
        annotated = detect_objects(image)
    return annotated
# Setup Gradio interface
example_images = ["spring_street_after.jpg", "pexels-hikaique-109919.jpg"]
os.makedirs("examples", exist_ok=True)

with gr.Blocks(title="YOLOv5 Object Detection - Real-time & Image Upload") as demo:
    # Markdown bodies are kept at column 0 so they are never read as
    # indented code blocks.
    gr.Markdown("""
# YOLOv5 Object Detection
## Real-time webcam detection and image upload processing
""")
    with gr.Tabs():
        with gr.TabItem("Real-time Detection"):
            gr.Markdown("""
### Real-time Object Detection
Using your webcam for continuous object detection at 30+ FPS.
""")
            webcam_output = gr.Image(label="Real-time Detection", type="numpy")

        with gr.TabItem("Image Upload"):
            gr.Markdown("""
### Image Upload Detection
Upload an image to detect objects.
""")
            with gr.Row():
                with gr.Column(scale=1):
                    input_image = gr.Image(label="Input Image", type="numpy")
                    submit_button = gr.Button("Submit", variant="primary")
                    clear_button = gr.Button("Clear")
                with gr.Column(scale=1):
                    output_image = gr.Image(label="Detected Objects", type="numpy")
            gr.Examples(
                examples=example_images,
                inputs=input_image,
                outputs=output_image,
                fn=process_uploaded_image,
                cache_examples=True,
            )

    # Event handlers for the upload tab.
    submit_button.click(fn=process_uploaded_image, inputs=input_image, outputs=output_image)
    clear_button.click(lambda: (None, None), None, [input_image, output_image])

    # Stream the webcam generator into the image component when the page loads.
    # This replaces the previous no-op load handler plus a JavaScript timer that
    # only re-requested the same image URL, and the `webcam_output.update(...)`
    # call, which never ran the generator.
    demo.load(fn=webcam_feed, inputs=None, outputs=webcam_output)


def cleanup():
    """Signal the background worker loop to stop."""
    stop_event.set()
    print("Cleaning up threads...")


# Keep the real Blocks.close and run cleanup() before it, instead of replacing
# close() with a function that never shuts the server down.
_blocks_close = demo.close


def _close_and_cleanup(*args, **kwargs):
    """Stop the worker, then delegate to the original ``Blocks.close``."""
    cleanup()
    return _blocks_close(*args, **kwargs)


demo.close = _close_and_cleanup

# Generator-based (streaming) handlers need the queue enabled.
demo.queue()
demo.launch()