"""Real-time face detection over a WebSocket.

Clients send base64-encoded image frames as text messages on ``/ws``. For each
frame the server runs a YOLO face detector and replies with a JSON object
``{"people": ...}`` describing the horizontal position and estimated distance
of each detected face.
"""

import asyncio
import base64
from collections import deque
from typing import Optional

import cv2
import numpy as np
import torch
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from ultralytics import YOLO

# Load YOLO model with optimized settings
device = "cuda" if torch.cuda.is_available() else "cpu"
model = YOLO("yolov11s-face.pt").to(device)
# Only request FP16 inference when running on CUDA.
# NOTE(review): Ultralytics may already ignore ``half`` on CPU -- confirm.
USE_HALF = device == "cuda"

# Constants for distance estimation
KNOWN_DISTANCE = 50  # cm
KNOWN_FACE_WIDTH = 14  # cm
REF_IMAGE_FACE_WIDTH = 120  # Reference face width in pixels at the known distance
FOCAL_LENGTH = (REF_IMAGE_FACE_WIDTH * KNOWN_DISTANCE) / KNOWN_FACE_WIDTH
SCALING_FACTOR = 2.0  # Adjust based on testing

# Detection settings
CONFIDENCE_THRESHOLD = 0.5  # Boxes at or below this confidence are ignored
DOWNSCALE = 2  # Frames are shrunk by this factor before inference

# FastAPI initialization
app = FastAPI()

# Tracking of previously reported detections. Only the most recent entry is
# consulted when deciding whether something changed.
# NOTE(review): this history is module-global, so it is shared by every
# connected client -- confirm that is intended.
MAX_HISTORY = 10
detected_people_history = deque(maxlen=MAX_HISTORY)
DISTANCE_THRESHOLD = 30  # cm


def _decode_frame(frame_data: str) -> Optional[np.ndarray]:
    """Decode a base64 image string into a BGR array, or None if unusable."""
    try:
        frame_bytes = base64.b64decode(frame_data)
    except ValueError:  # binascii.Error (bad padding) is a ValueError subclass
        return None
    if not frame_bytes:
        return None
    frame = cv2.imdecode(np.frombuffer(frame_bytes, np.uint8), cv2.IMREAD_COLOR)
    if frame is None:
        return None
    # A frame smaller than the downscale factor would resize to zero pixels.
    if frame.shape[0] < DOWNSCALE or frame.shape[1] < DOWNSCALE:
        return None
    return frame


def _classify_position(center_x: int, frame_width: int) -> str:
    """Return "Left", "Center" or "Right" by which third of the frame holds center_x."""
    if center_x < frame_width // 3:
        return "Left"
    if center_x > 2 * frame_width // 3:
        return "Right"
    return "Center"


def _estimate_distance(face_width_pixels: int) -> float:
    """Estimate the distance in cm from the face width in pixels; -1 if the width is not positive."""
    if face_width_pixels <= 0:
        return -1
    return ((FOCAL_LENGTH * KNOWN_FACE_WIDTH) / face_width_pixels) * SCALING_FACTOR


def _detect_people(frame: np.ndarray) -> dict:
    """Run the detector on a frame and return ``{"personN": {...}}`` for confident faces."""
    h, w = frame.shape[:2]
    # Resize frame for faster inference
    resized_frame = cv2.resize(frame, (w // DOWNSCALE, h // DOWNSCALE))
    results = model(resized_frame, imgsz=320, half=USE_HALF, verbose=False)

    people = {}
    person_id = 1
    for result in results:
        for box in result.boxes.data.tolist():
            x1, _, x2, _, conf = box[:5]
            if conf <= CONFIDENCE_THRESHOLD:
                continue
            # Scale the box back up to original-frame pixel coordinates.
            x1, x2 = int(x1 * DOWNSCALE), int(x2 * DOWNSCALE)
            # The box is in original-frame coordinates, so the thirds must be
            # taken from the original width, not the downscaled one.
            position = _classify_position((x1 + x2) // 2, w)
            people[f"person{person_id}"] = {
                "distance_cm": round(_estimate_distance(x2 - x1), 2),
                "position": position,
            }
            person_id += 1
    return people


def _has_changed(previous: Optional[dict], current: dict) -> bool:
    """Return True when ``current`` differs meaningfully from the last reported state."""
    if previous is None or len(previous) != len(current):
        return True
    for key, person in current.items():
        before = previous.get(key)
        if (
            not before
            or before["position"] != person["position"]
            # Compared on the rounded values that are actually reported.
            or abs(before["distance_cm"] - person["distance_cm"]) > DISTANCE_THRESHOLD
        ):
            return True
    return False


@app.on_event("startup")
async def startup_event():
    """Announce the WebSocket address when the application starts."""
    print("\n🚀 WebSocket API is running on ws://0.0.0.0:7860/ws\n")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket for real-time face detection.

    Each incoming text message is treated as one base64-encoded image. The
    reply is ``{"people": {...}}`` when the detections differ from the last
    reported state, and ``{"people": []}`` otherwise. A frame that cannot be
    decoded is skipped and also answered with ``{"people": []}``, so one bad
    frame does not end the session.
    """
    await websocket.accept()
    print("✅ Client connected")

    try:
        while True:
            frame_data = await websocket.receive_text()
            frame = _decode_frame(frame_data)

            if frame is None:
                print("⚠️ Skipping frame that could not be decoded")
                payload = {"people": []}
            else:
                new_people_data = _detect_people(frame)
                previous = (
                    detected_people_history[-1] if detected_people_history else None
                )
                if _has_changed(previous, new_people_data):
                    detected_people_history.append(new_people_data)
                    payload = {"people": new_people_data}
                else:
                    payload = {"people": []}

            await websocket.send_json(payload)
            # Brief pause between frames to throttle the loop.
            await asyncio.sleep(0.05)

    except WebSocketDisconnect:
        print("❌ Client disconnected")
    except Exception as e:
        # Top-level boundary for this connection: report and let the handler end.
        print(f"⚠️ Error: {e}")


if __name__ == "__main__":
    print("\n🔥 Starting WebSocket server...")
    uvicorn.run(app, host="0.0.0.0", port=7860)