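"""Real-time face detection over a WebSocket.

Clients send base64-encoded image frames; the server runs a YOLO face model
on each frame and replies with every detected face's estimated distance and
horizontal position.
"""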
import asyncio
import base64
from collections import deque

import cv2
import numpy as np
import torch
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from ultralytics import YOLO
# --- Model -----------------------------------------------------------------
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
model = YOLO("yolov11s-face.pt").to(device)

# --- Distance estimation ---------------------------------------------------
# Calibration: a face KNOWN_FACE_WIDTH cm wide, seen from KNOWN_DISTANCE cm,
# spans REF_IMAGE_FACE_WIDTH pixels in the reference image.
KNOWN_DISTANCE = 50  # cm
KNOWN_FACE_WIDTH = 14  # cm
REF_IMAGE_FACE_WIDTH = 120  # pixels at the known distance
FOCAL_LENGTH = (REF_IMAGE_FACE_WIDTH * KNOWN_DISTANCE) / KNOWN_FACE_WIDTH
SCALING_FACTOR = 2.0  # empirical multiplier on the estimate; adjust based on testing

# --- Web application -------------------------------------------------------
app = FastAPI()

# --- Detection history -----------------------------------------------------
# Bounded log of the most recent detection snapshots; the newest entry is
# what each incoming frame gets compared against.
MAX_HISTORY = 10
detected_people_history = deque(maxlen=MAX_HISTORY)
# A distance difference larger than this (in cm) counts as a change.
DISTANCE_THRESHOLD = 30  # cm
@app.on_event("startup")
async def startup_event():
    """Print the WebSocket URL when the application starts.

    Registered as a FastAPI startup handler; without the registration the
    function would never be called.
    """
    print("\nπ WebSocket API is running on ws://0.0.0.0:7860/ws\n")
def _decode_frame(frame_data):
    """Turn a base64 text payload into a color image, or None when it is unusable."""
    try:
        frame_bytes = base64.b64decode(frame_data)
    except ValueError:
        # Covers binascii.Error (malformed base64) and non-ASCII input.
        return None
    if not frame_bytes:
        # imdecode rejects an empty buffer, so bail out before calling it.
        return None
    frame = cv2.imdecode(np.frombuffer(frame_bytes, np.uint8), cv2.IMREAD_COLOR)
    # imdecode signals failure with None; a frame under 2 px cannot be halved.
    if frame is None or min(frame.shape[:2]) < 2:
        return None
    return frame


def _horizontal_position(center_x, frame_width):
    """Classify a horizontal pixel coordinate as "Left", "Center" or "Right" by thirds."""
    if center_x < frame_width // 3:
        return "Left"
    if center_x > 2 * frame_width // 3:
        return "Right"
    return "Center"


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket for real-time face detection.

    Protocol, one reply per received text message:
      * The client sends a base64-encoded image.
      * If the detections differ from the latest history entry, the reply is
        ``{"people": {"person1": {"distance_cm": ..., "position": ...}, ...}}``
        and the snapshot is appended to ``detected_people_history``.
      * Otherwise, or when the frame cannot be decoded, the reply is
        ``{"people": []}``.

    A change is a different number of faces, a face whose position label
    changed, or a face whose distance moved by more than
    ``DISTANCE_THRESHOLD`` cm. ``personN`` keys follow detection order within
    a frame, so they are not stable identities across frames.

    The loop ends when the client disconnects or when an unexpected error is
    raised; both are reported on stdout.
    """
    await websocket.accept()
    print("β Client connected")
    try:
        while True:
            frame_data = await websocket.receive_text()
            frame = _decode_frame(frame_data)
            if frame is None:
                # Answer with the "no update" payload instead of crashing, so
                # one bad frame neither kills the connection nor leaves a
                # client that waits for a reply per frame hanging.
                await websocket.send_json({"people": []})
                await asyncio.sleep(0.05)
                continue

            # Resize frame for faster inference
            h, w, _ = frame.shape
            resized_frame = cv2.resize(frame, (w // 2, h // 2))

            # Run YOLO model
            results = model(resized_frame, imgsz=320, half=True, verbose=False)

            new_people_data = {}
            change_detected = False
            person_id = 1
            # Latest snapshot to diff against; None until something was recorded.
            previous = detected_people_history[-1] if detected_people_history else None

            for result in results:
                for box in result.boxes.data.tolist():
                    x1, _, x2, _, conf, _ = box[:6]
                    if conf <= 0.5:
                        continue

                    # Boxes come back in half-size coordinates; scale them up
                    # to the full frame.
                    x1, x2 = int(x1 * 2), int(x2 * 2)
                    center_x = (x1 + x2) // 2
                    face_width_pixels = x2 - x1

                    # center_x is in full-frame pixels, so it must be compared
                    # against the full-frame width, not the resized one.
                    position = _horizontal_position(center_x, w)

                    # -1 marks a degenerate (zero-width) box.
                    estimated_distance = (
                        (FOCAL_LENGTH * KNOWN_FACE_WIDTH) / face_width_pixels
                    ) * SCALING_FACTOR if face_width_pixels > 0 else -1

                    key = f"person{person_id}"
                    new_people_data[key] = {
                        "distance_cm": round(estimated_distance, 2),
                        "position": position,
                    }

                    if previous is not None:
                        prev_data = previous.get(key)
                        if (
                            not prev_data
                            or prev_data["position"] != position
                            or abs(prev_data["distance_cm"] - estimated_distance) > DISTANCE_THRESHOLD
                        ):
                            change_detected = True

                    person_id += 1

            # The first frame ever, or a different head count, is always a change.
            if previous is None or len(new_people_data) != len(previous):
                change_detected = True

            if change_detected:
                detected_people_history.append(new_people_data)
                await websocket.send_json({"people": new_people_data})
            else:
                await websocket.send_json({"people": []})

            # Brief pause between frames to throttle the loop.
            await asyncio.sleep(0.05)
    except WebSocketDisconnect:
        print("β Client disconnected")
    except Exception as e:
        print(f"β οΈ Error: {e}")
if __name__ == "__main__":
    # Script entry point: announce startup, then serve the app with uvicorn
    # on all interfaces at port 7860.
    print("\nπ₯ Starting WebSocket server...")
    uvicorn.run(app, port=7860, host="0.0.0.0")