# NOTE: the lines below are residue from the web file viewer this source was
# copied from, kept as comments so the module can be parsed.
# Spaces: Sleeping
# File size: 4,112 Bytes
# dea3c82
# (line-number gutter 1-114 omitted)
import cv2
import numpy as np
import torch
import base64
import asyncio
from collections import deque
from ultralytics import YOLO
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
# Pick the inference device: CUDA when torch reports it is available, else CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load the YOLO weights file "yolov11s-face.pt" once at import time and move the
# model to the selected device; every request handler reuses this instance.
model = YOLO("yolov11s-face.pt").to(device)
# Constants for distance estimation.
# FOCAL_LENGTH is derived from one reference measurement: a face of
# KNOWN_FACE_WIDTH cm that appears REF_IMAGE_FACE_WIDTH pixels wide when it is
# KNOWN_DISTANCE cm from the camera.
KNOWN_DISTANCE = 50 # cm
KNOWN_FACE_WIDTH = 14 # cm
REF_IMAGE_FACE_WIDTH = 120 # Reference face width in pixels at the known distance
FOCAL_LENGTH = (REF_IMAGE_FACE_WIDTH * KNOWN_DISTANCE) / KNOWN_FACE_WIDTH
# Multiplier applied to every estimated distance in websocket_endpoint.
SCALING_FACTOR = 2.0 # Adjust based on testing
# FastAPI initialization
app = FastAPI()
# History of the most recent detection snapshots that were sent to a client.
# The deque drops its oldest entry once MAX_HISTORY snapshots are stored.
# NOTE(review): this is module-level state, so it is shared by every WebSocket
# connection rather than tracked per client -- confirm that is intended.
MAX_HISTORY = 10
detected_people_history = deque(maxlen=MAX_HISTORY)
# A person's distance must differ from the previous snapshot by more than this
# many centimetres to count as a change.
DISTANCE_THRESHOLD = 30 # cm
@app.on_event("startup")
async def startup_event():
    """Log the WebSocket URL when the application's startup event fires."""
    # The emoji was mojibake ("π") in the copied source; restored to the
    # rocket character whose UTF-8 bytes produce that garbling.
    print("\n🚀 WebSocket API is running on ws://0.0.0.0:7860/ws\n")
def _decode_frame(frame_data):
    """Decode a base64 image string into an OpenCV frame, or return None if invalid."""
    try:
        frame_bytes = base64.b64decode(frame_data)
    except ValueError:
        # binascii.Error (bad padding / bad characters) is a ValueError subclass.
        return None
    if not frame_bytes:
        # cv2.imdecode rejects an empty buffer, so bail out before calling it.
        return None
    image_np = np.frombuffer(frame_bytes, np.uint8)
    # imdecode yields None when the bytes are not a decodable image.
    return cv2.imdecode(image_np, cv2.IMREAD_COLOR)


def _classify_position(center_x, frame_width):
    """Return "Left", "Center" or "Right" for a face centre within a frame width."""
    if center_x < frame_width // 3:
        return "Left"
    if center_x > 2 * frame_width // 3:
        return "Right"
    return "Center"


def _estimate_distance_cm(face_width_pixels):
    """Estimate the face distance in cm from its pixel width; -1 if width is not positive."""
    if face_width_pixels <= 0:
        return -1
    return ((FOCAL_LENGTH * KNOWN_FACE_WIDTH) / face_width_pixels) * SCALING_FACTOR


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket for real-time face detection.

    Protocol, per text message received:
      * The message is a base64-encoded image.
      * Faces with confidence above 0.5 are labelled "person1", "person2", ...
        in detection order, each with "distance_cm" and "position".
      * If the set of people changed versus the last stored snapshot (count,
        position, or distance by more than DISTANCE_THRESHOLD), the reply is
        {"people": {<label>: {...}}} and the snapshot is stored.
      * Otherwise the reply is {"people": []}.
      * An undecodable frame gets {"people": [], "error": "invalid frame"} and
        the connection stays open.

    The loop ends when the client disconnects or an unexpected error occurs.
    """
    await websocket.accept()
    print("✅ Client connected")
    try:
        while True:
            frame_data = await websocket.receive_text()
            frame = _decode_frame(frame_data)
            # Frames smaller than 2 px on a side cannot be halved for inference.
            if frame is None or min(frame.shape[:2]) < 2:
                # Always answer, so a client that waits for one reply per frame
                # does not stall on a bad frame.
                await websocket.send_json({"people": [], "error": "invalid frame"})
                continue

            # Resize frame for faster inference
            h, w = frame.shape[:2]
            resized_frame = cv2.resize(frame, (w // 2, h // 2))

            # Run YOLO model; only request FP16 when running on CUDA.
            results = model(
                resized_frame, imgsz=320, half=(device == "cuda"), verbose=False
            )

            new_people_data = {}
            change_detected = False
            # Snapshot to compare against; None until something has been sent.
            previous = detected_people_history[-1] if detected_people_history else None
            person_id = 1

            for result in results:
                for box in result.boxes.data.tolist():
                    x1, _y1, x2, _y2, conf, _cls = box[:6]
                    if conf <= 0.5:
                        continue

                    # Boxes are in half-size coordinates; scale back to the
                    # original frame.
                    x1, x2 = int(x1 * 2), int(x2 * 2)
                    center_x = (x1 + x2) // 2

                    # Compare against the ORIGINAL width: center_x is in
                    # original-frame coordinates, so using the resized width
                    # would skew the Left/Center/Right thirds.
                    position = _classify_position(center_x, w)
                    estimated_distance = _estimate_distance_cm(x2 - x1)

                    label = f"person{person_id}"
                    new_people_data[label] = {
                        "distance_cm": round(estimated_distance, 2),
                        "position": position,
                    }

                    if previous is not None:
                        prev_data = previous.get(label)
                        if (
                            not prev_data
                            or prev_data["position"] != position
                            or abs(prev_data["distance_cm"] - estimated_distance)
                            > DISTANCE_THRESHOLD
                        ):
                            change_detected = True

                    person_id += 1

            # A different head-count (or no history at all) is always a change.
            if previous is None or len(new_people_data) != len(previous):
                change_detected = True

            if change_detected:
                detected_people_history.append(new_people_data)
                await websocket.send_json({"people": new_people_data})
            else:
                # NOTE(review): "no change" is an empty list while real payloads
                # are dicts; kept as-is because clients may depend on it.
                await websocket.send_json({"people": []})

            await asyncio.sleep(0.05)
    except WebSocketDisconnect:
        print("❌ Client disconnected")
    except Exception as e:
        # Top-level boundary for this connection: log and let the handler end.
        print(f"⚠️ Error: {e}")
if __name__ == "__main__":
    # When run as a script, serve the app with uvicorn on all interfaces,
    # port 7860 (the port named in the startup log message).
    print("\n🔥 Starting WebSocket server...")
    uvicorn.run(app, host="0.0.0.0", port=7860)