import gradio as gr
import cv2
import numpy as np
import tempfile
import os
import time

from scripts.inference import GazePredictor
from utils.ear_utils import BlinkDetector


def smooth_values(history, current_value, window_size=5):
    """Push current_value onto history (capped at window_size) and return the rolling mean.

    Works for both scalars (EAR, head pose) and arrays (gaze vectors); when
    current_value is None (e.g. no face detected) the mean of the existing
    history is returned so the displayed value degrades gracefully.
    """
    if current_value is not None:
        history.append(current_value)
        if len(history) > window_size:
            history.pop(0)
    if history:
        return np.mean(history, axis=0)
    return 0


MODEL_PATH = os.path.join("models", "gaze_estimation_model.pth")


def analyze_video(input_video):
    """Annotate an uploaded video with gaze, head pose, EAR and drowsiness overlays."""
    cap = cv2.VideoCapture(input_video)
    gaze_predictor = GazePredictor(MODEL_PATH)
    blink_detector = BlinkDetector()
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    temp_fd, temp_path = tempfile.mkstemp(suffix='.mp4')
    os.close(temp_fd)
    out = None

    GAZE_STABILITY_THRESHOLD = 0.5
    TIME_THRESHOLD = 15
    BLINK_RATE_THRESHOLD = 1
    EYE_CLOSURE_THRESHOLD = 10
    HEAD_STABILITY_THRESHOLD = 0.05

    gaze_history = []
    head_history = []
    ear_history = []
    stable_gaze_time = 0
    stable_head_time = 0
    eye_closed_time = 0
    blink_count = 0
    start_time = 0
    is_unconscious = False

    frame_count = 0
    fps = cap.get(cv2.CAP_PROP_FPS) or 20

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        if start_time == 0:
            start_time = frame_count / fps

        head_pose_gaze, gaze_h, gaze_v = gaze_predictor.predict_gaze(frame)
        current_gaze = np.array([gaze_h, gaze_v])
        smoothed_gaze = smooth_values(gaze_history, current_gaze)

        ear, left_eye, right_eye, head_pose, left_iris, right_iris = blink_detector.detect_blinks(frame)
        if ear is None:
            cv2.putText(frame, "No face detected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            smoothed_head = smooth_values(head_history, None)
            smoothed_ear = smooth_values(ear_history, None)
        else:
            smoothed_head = smooth_values(head_history, head_pose)
            smoothed_ear = smooth_values(ear_history, ear)
            if smoothed_ear >= blink_detector.EAR_THRESHOLD:
                cv2.drawMarker(frame, left_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)
                cv2.drawMarker(frame, right_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)

        cv2.putText(frame, f"Gaze H: {smoothed_gaze[0]:.2f}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Gaze V: {smoothed_gaze[1]:.2f}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"Head Pose: {smoothed_head:.2f}", (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"EAR: {smoothed_ear:.2f}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Gaze stability: a gaze that stops moving starts a timer.
        if len(gaze_history) > 1:
            gaze_diff = np.sqrt(np.sum((smoothed_gaze - gaze_history[-2]) ** 2))
            if gaze_diff < GAZE_STABILITY_THRESHOLD:
                if stable_gaze_time == 0:
                    stable_gaze_time = frame_count / fps
            else:
                stable_gaze_time = 0

        # Head stability: same idea for head pose.
        if len(head_history) > 1 and head_pose is not None:
            head_diff = abs(smoothed_head - head_history[-2])
            if head_diff < HEAD_STABILITY_THRESHOLD:
                if stable_head_time == 0:
                    stable_head_time = frame_count / fps
            else:
                stable_head_time = 0

        # Eye closure and blink counting: a closure shorter than 0.5 s counts as a blink.
        if ear is not None and smoothed_ear < blink_detector.EAR_THRESHOLD:
            if eye_closed_time == 0:
                eye_closed_time = frame_count / fps
            elif (frame_count / fps) - eye_closed_time > EYE_CLOSURE_THRESHOLD:
                cv2.putText(frame, "Eyes Closed", (10, 210), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        else:
            if eye_closed_time > 0 and (frame_count / fps) - eye_closed_time < 0.5:
                blink_count += 1
            eye_closed_time = 0

        elapsed_minutes = ((frame_count / fps) - start_time) / 60 if start_time > 0 else 0
        blink_rate = blink_count / elapsed_minutes if elapsed_minutes > 0 else 0
        cv2.putText(frame, f"Blink Rate: {blink_rate:.1f}/min", (10, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Flag unconsciousness only when at least two independent signals agree.
        unconscious_conditions = [
            stable_gaze_time > 0 and (frame_count / fps) - stable_gaze_time > TIME_THRESHOLD,
            blink_rate < BLINK_RATE_THRESHOLD and elapsed_minutes > 1,
            eye_closed_time > 0 and (frame_count / fps) - eye_closed_time > EYE_CLOSURE_THRESHOLD,
            stable_head_time > 0 and (frame_count / fps) - stable_head_time > TIME_THRESHOLD,
        ]
        if sum(unconscious_conditions) >= 2:
            cv2.putText(frame, "Unconscious Detected", (10, 270), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            is_unconscious = True
        else:
            is_unconscious = False

        if out is None:
            h, w = frame.shape[:2]
            out = cv2.VideoWriter(temp_path, fourcc, fps, (w, h))
        out.write(frame)

    cap.release()
    if out:
        out.release()
    return temp_path
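
# Quick sanity check for the offline path, independent of the Gradio UI.
# The clip path below is a placeholder; any cv2-readable video works:
#
#   annotated_path = analyze_video("samples/driver_clip.mp4")
#   print(f"Annotated video written to: {annotated_path}")
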
def process_webcam(state, log_output):
    """Process webcam frames in real time and update the log output."""
    if state is None:
        # First call: build the predictor/detector, open the camera and seed
        # the per-session counters. Gradio's gr.State round-trips this dict
        # between calls, so histories and timers persist across frames.
        gaze_predictor = GazePredictor(MODEL_PATH)
        blink_detector = BlinkDetector()
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            return None, None, "Error: Could not open webcam."

        state = {
            "gaze_predictor": gaze_predictor,
            "blink_detector": blink_detector,
            "cap": cap,
            "gaze_history": [],
            "head_history": [],
            "ear_history": [],
            "stable_gaze_time": 0,
            "stable_head_time": 0,
            "eye_closed_time": 0,
            "blink_count": 0,
            "start_time": time.time(),
            "is_unconscious": False,
            "GAZE_STABILITY_THRESHOLD": 0.5,
            "TIME_THRESHOLD": 15,
            "BLINK_RATE_THRESHOLD": 1,
            "EYE_CLOSURE_THRESHOLD": 10,
            "HEAD_STABILITY_THRESHOLD": 0.05,
        }

    # Extract state variables
    cap = state["cap"]
    gaze_predictor = state["gaze_predictor"]
    blink_detector = state["blink_detector"]
    gaze_history = state["gaze_history"]
    head_history = state["head_history"]
    ear_history = state["ear_history"]

    # Capture frame
    ret, frame = cap.read()
    if not ret:
        return state, None, log_output + "\nError: Could not read from webcam."
    # Process frame
    head_pose_gaze, gaze_h, gaze_v = gaze_predictor.predict_gaze(frame)
    current_gaze = np.array([gaze_h, gaze_v])
    smoothed_gaze = smooth_values(gaze_history, current_gaze)

    ear, left_eye, right_eye, head_pose, left_iris, right_iris = blink_detector.detect_blinks(frame)

    # Update display and logs
    current_time = time.time()
    logs = []

    if ear is None:
        cv2.putText(frame, "No face detected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        smoothed_head = smooth_values(head_history, None)
        smoothed_ear = smooth_values(ear_history, None)
        logs.append("No face detected")
    else:
        smoothed_head = smooth_values(head_history, head_pose)
        smoothed_ear = smooth_values(ear_history, ear)
        if smoothed_ear >= blink_detector.EAR_THRESHOLD:
            cv2.drawMarker(frame, left_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)
            cv2.drawMarker(frame, right_iris, (0, 255, 0), markerType=cv2.MARKER_CROSS, markerSize=10, thickness=2)

    # Add metrics to frame
    cv2.putText(frame, f"Gaze H: {smoothed_gaze[0]:.2f}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.putText(frame, f"Gaze V: {smoothed_gaze[1]:.2f}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.putText(frame, f"Head Pose: {smoothed_head:.2f}", (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.putText(frame, f"EAR: {smoothed_ear:.2f}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # Check for gaze stability
    if len(gaze_history) > 1:
        gaze_diff = np.sqrt(np.sum((smoothed_gaze - gaze_history[-2]) ** 2))
        if gaze_diff < state["GAZE_STABILITY_THRESHOLD"]:
            if state["stable_gaze_time"] == 0:
                state["stable_gaze_time"] = current_time
        else:
            state["stable_gaze_time"] = 0

    # Check for head stability
    if len(head_history) > 1 and head_pose is not None:
        head_diff = abs(smoothed_head - head_history[-2])
        if head_diff < state["HEAD_STABILITY_THRESHOLD"]:
            if state["stable_head_time"] == 0:
                state["stable_head_time"] = current_time
        else:
            state["stable_head_time"] = 0

    # Check for eye closure; a closure shorter than 0.5 s counts as a blink.
    if ear is not None and smoothed_ear < blink_detector.EAR_THRESHOLD:
        if state["eye_closed_time"] == 0:
            state["eye_closed_time"] = current_time
        elif current_time - state["eye_closed_time"] > state["EYE_CLOSURE_THRESHOLD"]:
            cv2.putText(frame, "Eyes Closed", (10, 210), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            logs.append("Eyes have been closed for an extended period")
    else:
        if state["eye_closed_time"] > 0 and current_time - state["eye_closed_time"] < 0.5:
            state["blink_count"] += 1
            logs.append("Blink detected")
        state["eye_closed_time"] = 0

    elapsed_seconds = current_time - state["start_time"]
    elapsed_minutes = elapsed_seconds / 60
    blink_rate = state["blink_count"] / elapsed_minutes if elapsed_minutes > 0 else 0
    cv2.putText(frame, f"Blink Rate: {blink_rate:.1f}/min", (10, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    logs.append(f"Blink rate: {blink_rate:.1f}/min")

    # Check for unconscious state: require at least two signals to agree.
    unconscious_conditions = [
        state["stable_gaze_time"] > 0 and current_time - state["stable_gaze_time"] > state["TIME_THRESHOLD"],
        blink_rate < state["BLINK_RATE_THRESHOLD"] and elapsed_minutes > 1,
        state["eye_closed_time"] > 0 and current_time - state["eye_closed_time"] > state["EYE_CLOSURE_THRESHOLD"],
        state["stable_head_time"] > 0 and current_time - state["stable_head_time"] > state["TIME_THRESHOLD"],
    ]
    if sum(unconscious_conditions) >= 2:
        cv2.putText(frame, "Unconscious Detected", (10, 270), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        state["is_unconscious"] = True
        logs.append("WARNING: Possible unconscious state detected!")
    else:
        state["is_unconscious"] = False
state["is_unconscious"] = False # Update log output with latest information logs.append(f"Gaze: ({smoothed_gaze[0]:.2f}, {smoothed_gaze[1]:.2f}) | Head: {smoothed_head:.2f} | EAR: {smoothed_ear:.2f}") log_text = "\n".join(logs) # Keep log_output to a reasonable size log_lines = log_output.split("\n") if log_output else [] log_lines.append(log_text) if len(log_lines) > 20: # Keep only last 20 entries log_lines = log_lines[-20:] updated_log = "\n".join(log_lines) # Convert from BGR to RGB for Gradio frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) return state, frame_rgb, updated_log def create_webcam_interface(): webcam = gr.Image(source="webcam", streaming=True) log_output = gr.Textbox(label="Gaze Tracking Log", lines=10) processed_frame = gr.Image(label="Processed Frame") webcam_demo = gr.Interface( fn=process_webcam, inputs=[gr.State(), gr.State("")], outputs=[gr.State(), processed_frame, log_output], live=True, title="Real-time Gaze Tracking" ) return webcam_demo def create_video_interface(): video_demo = gr.Interface( fn=analyze_video, inputs=gr.Video(), outputs=gr.Video(), title="Video Analysis", description="Upload a video to analyze gaze and drowsiness." ) return video_demo demo = gr.TabbedInterface( [create_video_interface(), create_webcam_interface()], ["Video Upload", "Webcam"], title="Gaze Tracker", description="Analyze gaze and detect drowsiness in videos or using webcam." ) if __name__ == "__main__": demo.launch()