import gradio as gr
import cv2
import numpy as np
import tempfile
import os
import time

from scripts.inference import GazePredictor
from utils.ear_utils import BlinkDetector


def smooth_values(history, current_value, window_size=5):
    """Push current_value onto history and return the rolling mean.

    history is mutated in place and capped at window_size entries.
    Works for scalars (EAR, head pose) and NumPy arrays (the gaze
    2-vector) alike; returns 0 when there is nothing to average yet.
    """
    if current_value is not None:
        history.append(current_value)
        if len(history) > window_size:
            history.pop(0)
    return np.mean(history, axis=0) if history else 0

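# Example (illustrative only): smoothing a stream of scalar EAR readings
# with a 3-frame window; note that smooth_values mutates the list it is
# given, so the caller owns the history buffer:
#
#   history = []
#   for ear in (0.31, 0.29, 0.30):
#       smoothed = smooth_values(history, ear, window_size=3)

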
MODEL_PATH = os.path.join("models", "gaze_estimation_model.pth")


def analyze_video(input_video):
    """Annotate an uploaded video with gaze, blink, and drowsiness
    overlays, and return the path of the annotated copy."""
    cap = cv2.VideoCapture(input_video)
    gaze_predictor = GazePredictor(MODEL_PATH)
    blink_detector = BlinkDetector()
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Reserve a temporary .mp4 path; close the descriptor immediately so
    # cv2.VideoWriter can open the file itself.
    temp_fd, temp_path = tempfile.mkstemp(suffix='.mp4')
    os.close(temp_fd)
    out = None

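    # Heuristic thresholds. Units are inferred from how the values are
    # used below: times in seconds, blink rate in blinks per minute, and
    # the gaze/head stability bounds in the predictor's output units.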
    GAZE_STABILITY_THRESHOLD = 0.5
    TIME_THRESHOLD = 15
    BLINK_RATE_THRESHOLD = 1
    EYE_CLOSURE_THRESHOLD = 10
    HEAD_STABILITY_THRESHOLD = 0.05

    gaze_history = []
    head_history = []
    ear_history = []
    stable_gaze_time = 0
    stable_head_time = 0
    eye_closed_time = 0
    blink_count = 0
    start_time = 0
    is_unconscious = False

    frame_count = 0
    fps = cap.get(cv2.CAP_PROP_FPS) or 20  # fall back to 20 when the container reports no FPS

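    # Timestamps are derived from frame_count / fps rather than the wall
    # clock, so the results do not depend on processing speed.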
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        if start_time == 0:
            start_time = frame_count / fps

        head_pose_gaze, gaze_h, gaze_v = gaze_predictor.predict_gaze(frame)
        current_gaze = np.array([gaze_h, gaze_v])
        smoothed_gaze = smooth_values(gaze_history, current_gaze)

        ear, left_eye, right_eye, head_pose, left_iris, right_iris = blink_detector.detect_blinks(frame)
        if ear is None:
            cv2.putText(frame, "No face detected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            smoothed_head = smooth_values(head_history, None)
            smoothed_ear = smooth_values(ear_history, None)
        else:
            smoothed_head = smooth_values(head_history, head_pose)
            smoothed_ear = smooth_values(ear_history, ear)
            if smoothed_ear >= blink_detector.EAR_THRESHOLD:
                cv2.drawMarker(frame, left_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)
                cv2.drawMarker(frame, right_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)

        cv2.putText(frame, f"Gaze H: {smoothed_gaze[0]:.2f}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Gaze V: {smoothed_gaze[1]:.2f}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Head Pose: {smoothed_head:.2f}", (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"EAR: {smoothed_ear:.2f}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

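        # Stability timers: record when a signal first went still; the
        # alert logic below fires once it has stayed still for more than
        # TIME_THRESHOLD seconds.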
        if len(gaze_history) > 1:
            gaze_diff = np.linalg.norm(smoothed_gaze - gaze_history[-2])
            if gaze_diff < GAZE_STABILITY_THRESHOLD:
                if stable_gaze_time == 0:
                    stable_gaze_time = frame_count / fps
            else:
                stable_gaze_time = 0

        if len(head_history) > 1 and head_pose is not None:
            head_diff = abs(smoothed_head - head_history[-2])
            if head_diff < HEAD_STABILITY_THRESHOLD:
                if stable_head_time == 0:
                    stable_head_time = frame_count / fps
            else:
                stable_head_time = 0

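        # A closure that ends within 0.5 s counts as a blink; one that
        # lasts longer than EYE_CLOSURE_THRESHOLD seconds triggers the
        # warning overlay instead.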
        if ear is not None and smoothed_ear < blink_detector.EAR_THRESHOLD:
            if eye_closed_time == 0:
                eye_closed_time = frame_count / fps
            elif (frame_count / fps) - eye_closed_time > EYE_CLOSURE_THRESHOLD:
                cv2.putText(frame, "Eyes Closed", (10, 210), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        else:
            if eye_closed_time > 0 and (frame_count / fps) - eye_closed_time < 0.5:
                blink_count += 1
            eye_closed_time = 0

        elapsed_minutes = ((frame_count / fps) - start_time) / 60 if start_time > 0 else 0
        blink_rate = blink_count / elapsed_minutes if elapsed_minutes > 0 else 0
        cv2.putText(frame, f"Blink Rate: {blink_rate:.1f}/min", (10, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

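        # Two-of-four vote across the drowsiness heuristics: requiring at
        # least two signals keeps any single noisy cue from raising the
        # alarm on its own.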
        unconscious_conditions = [
            stable_gaze_time > 0 and (frame_count / fps) - stable_gaze_time > TIME_THRESHOLD,
            blink_rate < BLINK_RATE_THRESHOLD and elapsed_minutes > 1,
            eye_closed_time > 0 and (frame_count / fps) - eye_closed_time > EYE_CLOSURE_THRESHOLD,
            stable_head_time > 0 and (frame_count / fps) - stable_head_time > TIME_THRESHOLD,
        ]
        if sum(unconscious_conditions) >= 2:
            cv2.putText(frame, "Unconscious Detected", (10, 270), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            is_unconscious = True
        else:
            is_unconscious = False

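        # The writer is created lazily on the first frame so its size can
        # match the actual frame dimensions.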
        if out is None:
            h, w = frame.shape[:2]
            out = cv2.VideoWriter(temp_path, fourcc, fps, (w, h))
        out.write(frame)

    cap.release()
    if out:
        out.release()
        return temp_path
    # No frames were decoded; remove the empty temp file.
    os.remove(temp_path)
    return None


def process_webcam(state):
    """Process a single webcam frame per call and update the log output."""
    if state is None:
        # First call: initialize the models, the webcam, and all tracking
        # state.
        gaze_predictor = GazePredictor(MODEL_PATH)
        blink_detector = BlinkDetector()
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            return None, "Error: Could not open webcam.", None

        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

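        # Same heuristic thresholds as analyze_video, except that this
        # path measures elapsed time with the wall clock (time.time())
        # instead of frame counts.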
        GAZE_STABILITY_THRESHOLD = 0.5
        TIME_THRESHOLD = 15
        BLINK_RATE_THRESHOLD = 1
        EYE_CLOSURE_THRESHOLD = 10
        HEAD_STABILITY_THRESHOLD = 0.05

        gaze_history = []
        head_history = []
        ear_history = []
        stable_gaze_time = 0
        stable_head_time = 0
        eye_closed_time = 0
        blink_count = 0
        start_time = time.time()
        is_unconscious = False
        log_output = ""

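        # All mutable tracking state lives in one dict so it can
        # round-trip through gr.State between calls.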
        state = {
            "gaze_predictor": gaze_predictor,
            "blink_detector": blink_detector,
            "cap": cap,
            "gaze_history": gaze_history,
            "head_history": head_history,
            "ear_history": ear_history,
            "stable_gaze_time": stable_gaze_time,
            "stable_head_time": stable_head_time,
            "eye_closed_time": eye_closed_time,
            "blink_count": blink_count,
            "start_time": start_time,
            "is_unconscious": is_unconscious,
            "GAZE_STABILITY_THRESHOLD": GAZE_STABILITY_THRESHOLD,
            "TIME_THRESHOLD": TIME_THRESHOLD,
            "BLINK_RATE_THRESHOLD": BLINK_RATE_THRESHOLD,
            "EYE_CLOSURE_THRESHOLD": EYE_CLOSURE_THRESHOLD,
            "HEAD_STABILITY_THRESHOLD": HEAD_STABILITY_THRESHOLD,
            "log_output": log_output,
        }
        return state, "Initializing webcam...", None

    # Subsequent calls: restore everything from the state dict.
    cap = state["cap"]
    gaze_predictor = state["gaze_predictor"]
    blink_detector = state["blink_detector"]
    gaze_history = state["gaze_history"]
    head_history = state["head_history"]
    ear_history = state["ear_history"]
    log_output = state["log_output"]

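    # If the camera read fails, try reopening the device once before
    # giving up on this call.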
    ret, frame = cap.read()
    if not ret or frame is None:
        cap.release()
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            return state, log_output + "\nError: Could not read from webcam.", None
        state["cap"] = cap
        ret, frame = cap.read()
        if not ret or frame is None:
            return state, log_output + "\nError: Failed to capture frame after reinitialization.", None

    try:
        head_pose_gaze, gaze_h, gaze_v = gaze_predictor.predict_gaze(frame)
        current_gaze = np.array([gaze_h, gaze_v])
        smoothed_gaze = smooth_values(gaze_history, current_gaze)

        ear, left_eye, right_eye, head_pose, left_iris, right_iris = blink_detector.detect_blinks(frame)

        current_time = time.time()
        logs = []

        if ear is None:
            cv2.putText(frame, "No face detected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            smoothed_head = smooth_values(head_history, None)
            smoothed_ear = smooth_values(ear_history, None)
            logs.append("No face detected")
        else:
            smoothed_head = smooth_values(head_history, head_pose)
            smoothed_ear = smooth_values(ear_history, ear)
            if smoothed_ear >= blink_detector.EAR_THRESHOLD:
                cv2.drawMarker(frame, left_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)
                cv2.drawMarker(frame, right_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)

        cv2.putText(frame, f"Gaze H: {smoothed_gaze[0]:.2f}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Gaze V: {smoothed_gaze[1]:.2f}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Head Pose: {smoothed_head:.2f}", (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"EAR: {smoothed_ear:.2f}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        if len(gaze_history) > 1:
            gaze_diff = np.linalg.norm(smoothed_gaze - gaze_history[-2])
            if gaze_diff < state["GAZE_STABILITY_THRESHOLD"]:
                if state["stable_gaze_time"] == 0:
                    state["stable_gaze_time"] = current_time
            else:
                state["stable_gaze_time"] = 0

        if len(head_history) > 1 and head_pose is not None:
            head_diff = abs(smoothed_head - head_history[-2])
            if head_diff < state["HEAD_STABILITY_THRESHOLD"]:
                if state["stable_head_time"] == 0:
                    state["stable_head_time"] = current_time
            else:
                state["stable_head_time"] = 0

        if ear is not None and smoothed_ear < blink_detector.EAR_THRESHOLD:
            if state["eye_closed_time"] == 0:
                state["eye_closed_time"] = current_time
            elif current_time - state["eye_closed_time"] > state["EYE_CLOSURE_THRESHOLD"]:
                cv2.putText(frame, "Eyes Closed", (10, 210), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                logs.append("Eyes have been closed for an extended period")
        else:
            if state["eye_closed_time"] > 0 and current_time - state["eye_closed_time"] < 0.5:
                state["blink_count"] += 1
                logs.append("Blink detected")
            state["eye_closed_time"] = 0

        elapsed_seconds = current_time - state["start_time"]
        elapsed_minutes = elapsed_seconds / 60
        blink_rate = state["blink_count"] / elapsed_minutes if elapsed_minutes > 0 else 0
        cv2.putText(frame, f"Blink Rate: {blink_rate:.1f}/min", (10, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        logs.append(f"Blink rate: {blink_rate:.1f}/min")

        unconscious_conditions = [
            state["stable_gaze_time"] > 0 and current_time - state["stable_gaze_time"] > state["TIME_THRESHOLD"],
            blink_rate < state["BLINK_RATE_THRESHOLD"] and elapsed_minutes > 1,
            state["eye_closed_time"] > 0 and current_time - state["eye_closed_time"] > state["EYE_CLOSURE_THRESHOLD"],
            state["stable_head_time"] > 0 and current_time - state["stable_head_time"] > state["TIME_THRESHOLD"],
        ]
        if sum(unconscious_conditions) >= 2:
            cv2.putText(frame, "Unconscious Detected", (10, 270), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            state["is_unconscious"] = True
            logs.append("WARNING: Possible unconscious state detected!")
        else:
            state["is_unconscious"] = False

        logs.append(f"Gaze: ({smoothed_gaze[0]:.2f}, {smoothed_gaze[1]:.2f}) | Head: {smoothed_head:.2f} | EAR: {smoothed_ear:.2f}")
        log_text = "\n".join(logs)

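        # Keep only the most recent 20 log lines so the textbox stays
        # bounded.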
        log_lines = log_output.split("\n") if log_output else []
        log_lines.append(log_text)
        if len(log_lines) > 20:
            log_lines = log_lines[-20:]
        state["log_output"] = "\n".join(log_lines)

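        # OpenCV frames are BGR; Gradio's Image component expects RGB.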
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return state, state["log_output"], frame_rgb

    except Exception as e:
        error_msg = f"Error processing frame: {e}"
        return state, log_output + "\n" + error_msg, None


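# process_webcam is written to be invoked repeatedly by the live
# interface below, one captured frame per call, with gr.State carrying
# the tracker objects and counters between calls.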
def create_webcam_interface():
    log_output = gr.Textbox(label="Gaze Tracking Log", lines=10)
    processed_frame = gr.Image(label="Processed Frame")

    webcam_demo = gr.Interface(
        fn=process_webcam,
        inputs=[gr.State()],
        outputs=[gr.State(), log_output, processed_frame],
        live=True,
        title="Real-time Gaze Tracking",
    )
    return webcam_demo


def create_video_interface():
    video_demo = gr.Interface(
        fn=analyze_video,
        inputs=gr.Video(),
        outputs=gr.Video(),
        title="Video Analysis",
        description="Upload a video to analyze gaze and drowsiness.",
    )
    return video_demo


demo = gr.TabbedInterface(
    [create_video_interface(), create_webcam_interface()],
    ["Video Upload", "Webcam"],
    title="Gaze Tracker",
)

if __name__ == "__main__":
    demo.launch()