KRISH09bha's picture
Create app.py
dea3c82 verified
raw
history blame
4.11 kB
import cv2
import numpy as np
import torch
import base64
import asyncio
from collections import deque
from ultralytics import YOLO
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
# Load YOLO model with optimized settings
device = "cuda" if torch.cuda.is_available() else "cpu"
model = YOLO("yolov11s-face.pt").to(device)
# Constants for distance estimation
KNOWN_DISTANCE = 50 # cm
KNOWN_FACE_WIDTH = 14 # cm
REF_IMAGE_FACE_WIDTH = 120 # Reference face width in pixels at the known distance
FOCAL_LENGTH = (REF_IMAGE_FACE_WIDTH * KNOWN_DISTANCE) / KNOWN_FACE_WIDTH
SCALING_FACTOR = 2.0 # Adjust based on testing
# FastAPI initialization
app = FastAPI()
# Optimized tracking of previous detections using a deque
MAX_HISTORY = 10
detected_people_history = deque(maxlen=MAX_HISTORY)
DISTANCE_THRESHOLD = 30 # cm
@app.on_event("startup")
async def startup_event():
print("\nπŸš€ WebSocket API is running on ws://0.0.0.0:7860/ws\n")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket for real-time face detection"""
await websocket.accept()
print("βœ… Client connected")
try:
while True:
frame_data = await websocket.receive_text()
frame_bytes = base64.b64decode(frame_data)
image_np = np.frombuffer(frame_bytes, np.uint8)
frame = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
# Resize frame for faster inference
h, w, _ = frame.shape
resized_frame = cv2.resize(frame, (w // 2, h // 2))
# Run YOLO model
results = model(resized_frame, imgsz=320, half=True, verbose=False)
new_people_data = {}
change_detected = False
person_id = 1
frame_width = resized_frame.shape[1]
for result in results:
for box in result.boxes.data.tolist():
x1, y1, x2, y2, conf, _ = box[:6]
x1, y1, x2, y2 = map(int, [x1 * 2, y1 * 2, x2 * 2, y2 * 2])
if conf > 0.5:
center_x = (x1 + x2) // 2
face_width_pixels = x2 - x1
if center_x < frame_width // 3:
position = "Left"
elif center_x > 2 * frame_width // 3:
position = "Right"
else:
position = "Center"
estimated_distance = (
(FOCAL_LENGTH * KNOWN_FACE_WIDTH) / face_width_pixels
) * SCALING_FACTOR if face_width_pixels > 0 else -1
new_people_data[f"person{person_id}"] = {
"distance_cm": round(estimated_distance, 2),
"position": position,
}
if detected_people_history:
prev_data = detected_people_history[-1].get(f"person{person_id}")
if (
not prev_data
or prev_data["position"] != position
or abs(prev_data["distance_cm"] - estimated_distance) > DISTANCE_THRESHOLD
):
change_detected = True
person_id += 1
if not detected_people_history or len(new_people_data) != len(detected_people_history[-1]):
change_detected = True
if change_detected:
detected_people_history.append(new_people_data)
await websocket.send_json({"people": new_people_data})
else:
await websocket.send_json({"people": []})
await asyncio.sleep(0.05)
except WebSocketDisconnect:
print("❌ Client disconnected")
except Exception as e:
print(f"⚠️ Error: {e}")
if __name__ == "__main__":
print("\nπŸ”₯ Starting WebSocket server...")
uvicorn.run(app, host="0.0.0.0", port=7860)