"""YOLOv5 object detection demo with a Gradio UI.

Two modes share one model:

* "Image Upload" runs a single 640px inference on an uploaded image.
* "Real-time Detection" reads frames from the server's webcam, runs a
  384px inference on a background thread and streams annotated frames.
"""
import atexit
import os
import threading
import time
from collections import deque
from pathlib import Path
from queue import Empty, Full, Queue

import cv2
import gradio as gr
import numpy as np
import torch

# Create cache directory for models
os.makedirs("models", exist_ok=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

model_path = Path("models/yolov5x.pt")
if model_path.exists():
    print(f"Loading model from cache: {model_path}")
else:
    print("Downloading YOLOv5x model and caching...")

# The "custom" hub entry point loads a YOLOv5 checkpoint from `path`; when the
# file is missing and its name matches an official release asset, the YOLOv5
# loader downloads it to that path first, so one call covers both cases.
# NOTE: a models/yolov5x.pt written by an older version of this script holds a
# bare state_dict and is not loadable here; delete it to trigger a re-download.
model = torch.hub.load(
    "ultralytics/yolov5", "custom", path=str(model_path)
).to(device)

# Model configuration
model.conf = 0.5      # Discard detections with confidence below 0.5
model.iou = 0.45      # IoU threshold used by non-maximum suppression
model.classes = None  # Detect all classes
model.max_det = 20    # Limit detections per image for speed

if device.type == "cuda":
    model.half()  # Half precision for CUDA
else:
    # os.cpu_count() may return None; torch needs a positive int.
    torch.set_num_threads(os.cpu_count() or 1)

model.eval()

# Precompute one fixed colour per class for bounding boxes
np.random.seed(42)
colors = np.random.uniform(0, 255, size=(len(model.names), 3))

# Performance tracking
total_inference_time = 0
inference_count = 0
# Rolling window of the last 30 FPS samples. A deque with maxlen evicts the
# oldest sample automatically; a bounded Queue would block the producer
# forever once it filled up because nothing ever consumes from it.
fps_queue = deque(maxlen=30)

# Threading variables
processing_lock = threading.Lock()  # Serialises access to the model
stop_event = threading.Event()
frame_queue = Queue(maxsize=2)  # Small queue to avoid lag
result_queue = Queue(maxsize=2)


def _offer(queue, item):
    """Put *item* on *queue* without blocking, dropping the oldest if full."""
    try:
        queue.put_nowait(item)
        return
    except Full:
        pass
    try:
        queue.get_nowait()  # Make room by discarding the stalest entry
    except Empty:
        pass
    try:
        queue.put_nowait(item)
    except Full:
        pass  # Another producer won the race; dropping a frame is fine


def _draw_detections(image, detections, *, box_thickness, font_scale,
                     font_thickness, pad, text_offset):
    """Draw boxes and "<class> <confidence>" labels onto *image* in place.

    *detections* is iterated as rows of ``x1, y1, x2, y2, conf, cls``.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    for *xyxy, conf, cls in detections:
        x1, y1, x2, y2 = map(int, xyxy)
        class_id = int(cls)
        color = colors[class_id].tolist()

        cv2.rectangle(image, (x1, y1), (x2, y2), color, box_thickness,
                      lineType=cv2.LINE_AA)

        label = f"{model.names[class_id]} {conf:.2f}"
        (w, h), _ = cv2.getTextSize(label, font, font_scale, font_thickness)
        # Keep the label inside the image when the box touches the top edge.
        label_bottom = max(y1, h + pad)
        cv2.rectangle(image, (x1, label_bottom - h - pad),
                      (x1 + w + pad, label_bottom), color, -1)
        cv2.putText(image, label,
                    (x1 + text_offset, label_bottom - text_offset), font,
                    font_scale, (255, 255, 255), font_thickness,
                    lineType=cv2.LINE_AA)


def detect_objects(image):
    """Run detection on a single image and return an annotated copy.

    Args:
        image: Image array as delivered by the Gradio input component,
            or None.

    Returns:
        A copy of *image* with boxes, labels and an FPS panel drawn on it,
        or None when *image* is None.
    """
    global total_inference_time, inference_count

    if image is None:
        return None

    start_time = time.perf_counter()
    with torch.no_grad():
        results = model(image, size=640)
    # Clamp so the FPS divisions below can never divide by zero.
    inference_time = max(time.perf_counter() - start_time, 1e-6)

    total_inference_time += inference_time
    inference_count += 1
    avg_inference_time = total_inference_time / inference_count

    output_image = image.copy()
    detections = results.pred[0].cpu().numpy()
    _draw_detections(output_image, detections, box_thickness=3,
                     font_scale=0.9, font_thickness=2, pad=10, text_offset=5)

    fps = 1 / inference_time

    # Translucent panel behind the FPS read-out
    overlay = output_image.copy()
    cv2.rectangle(overlay, (10, 10), (300, 80), (0, 0, 0), -1)
    output_image = cv2.addWeighted(overlay, 0.6, output_image, 0.4, 0)
    cv2.putText(output_image, f"FPS: {fps:.2f}", (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2,
                lineType=cv2.LINE_AA)
    cv2.putText(output_image, f"Avg FPS: {1 / avg_inference_time:.2f}",
                (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2,
                lineType=cv2.LINE_AA)

    return output_image


def process_frame_thread():
    """Background worker: annotate frames from frame_queue into result_queue.

    Runs until stop_event is set. Each queue item is a dict with an 'image'
    array and a capture 'timestamp'.
    """
    while not stop_event.is_set():
        try:
            # The timeout lets the loop notice stop_event without spinning.
            frame = frame_queue.get(timeout=0.05)
        except Empty:
            continue

        # An image upload holds the lock while it uses the model. Try to take
        # it without blocking; if it is busy, pass the frame through
        # unprocessed rather than stalling the live feed.
        if not processing_lock.acquire(blocking=False):
            _offer(result_queue, frame)
            continue
        try:
            with torch.no_grad():
                # Smaller input size for real-time processing
                results = model(frame['image'], size=384)
            detections = results.pred[0].cpu().numpy()
        finally:
            processing_lock.release()

        # FPS is derived from capture-to-result latency.
        inference_time = time.time() - frame.get('timestamp', time.time())
        current_fps = 1 / inference_time if inference_time > 0 else 30

        fps_queue.append(current_fps)
        avg_fps = sum(fps_queue) / len(fps_queue)

        output = frame['image'].copy()
        _draw_detections(output, detections, box_thickness=2, font_scale=0.6,
                         font_thickness=1, pad=5, text_offset=3)

        # FPS counter
        cv2.rectangle(output, (10, 10), (210, 80), (0, 0, 0), -1)
        cv2.putText(output, f"FPS: {current_fps:.1f}", (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2,
                    lineType=cv2.LINE_AA)
        cv2.putText(output, f"Avg FPS: {avg_fps:.1f}", (20, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2,
                    lineType=cv2.LINE_AA)

        _offer(result_queue, {'image': output, 'fps': current_fps})


def webcam_feed():
    """Yield frames from webcam 0, annotated when a result is available.

    Starts the "frame_processor" worker thread if it is not already running.
    Stops when the camera cannot be opened or a read fails.
    """
    if not any(t.name == "frame_processor" for t in threading.enumerate()):
        stop_event.clear()
        processor = threading.Thread(target=process_frame_thread,
                                     name="frame_processor", daemon=True)
        processor.start()

    cap = cv2.VideoCapture(0)
    try:
        if not cap.isOpened():
            print("Could not open webcam (device 0)")
            return

        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        while True:
            success, frame = cap.read()
            if not success:
                break

            # OpenCV captures BGR; the YOLOv5 hub model (for numpy input)
            # and the Gradio image component both work in RGB.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Hand the frame to the worker; skip it if the worker is behind.
            try:
                frame_queue.put_nowait(
                    {'image': frame, 'timestamp': time.time()})
            except Full:
                pass

            # Show the newest processed frame, else the raw one.
            try:
                shown = result_queue.get_nowait()['image']
            except Empty:
                shown = frame
            yield shown

            # Control frame rate to not overwhelm the system
            time.sleep(0.01)
    finally:
        cap.release()


def process_uploaded_image(image):
    """Annotate an uploaded image while holding the model lock.

    Holding processing_lock keeps the real-time worker off the model for the
    duration of the call.
    """
    with processing_lock:
        return detect_objects(image)


def cleanup():
    """Signal the background worker to stop."""
    stop_event.set()
    print("Cleaning up threads...")


# Setup Gradio interface
os.makedirs("examples", exist_ok=True)
# Only offer examples that exist: with cache_examples=True Gradio runs the
# function on every example at startup and would fail on a missing file.
example_images = [
    name
    for name in ("spring_street_after.jpg", "pexels-hikaique-109919.jpg")
    if Path(name).exists()
]

with gr.Blocks(
    title="YOLOv5 Object Detection - Real-time & Image Upload"
) as demo:
    gr.Markdown("""
    # YOLOv5 Object Detection
    ## Real-time webcam detection and image upload processing
    """)

    with gr.Tabs():
        with gr.TabItem("Real-time Detection"):
            gr.Markdown("""
            ### Real-time Object Detection
            Using your webcam for continuous object detection at 30+ FPS.
            """)
            webcam_output = gr.Image(label="Real-time Detection",
                                     type="numpy")

        with gr.TabItem("Image Upload"):
            gr.Markdown("""
            ### Image Upload Detection
            Upload an image to detect objects.
            """)
            with gr.Row():
                with gr.Column(scale=1):
                    input_image = gr.Image(label="Input Image", type="numpy")
                    submit_button = gr.Button("Submit", variant="primary")
                    clear_button = gr.Button("Clear")
                with gr.Column(scale=1):
                    output_image = gr.Image(label="Detected Objects",
                                            type="numpy")

            if example_images:
                gr.Examples(
                    examples=example_images,
                    inputs=input_image,
                    outputs=output_image,
                    fn=process_uploaded_image,
                    cache_examples=True,
                )

    # Set up event handlers
    submit_button.click(fn=process_uploaded_image, inputs=input_image,
                        outputs=output_image)
    clear_button.click(lambda: (None, None), None,
                       [input_image, output_image])

    # Stream the webcam generator into the image component on page load.
    # Each yielded frame replaces the displayed image.
    demo.load(webcam_feed, inputs=None, outputs=webcam_output)

# Stop the worker at interpreter exit. Registering with atexit leaves
# demo.close() intact instead of overwriting it.
atexit.register(cleanup)

# Generator (streaming) handlers need the queue enabled.
demo.queue()
demo.launch()