# mask_beard_blur_app.py import os import subprocess import sys import importlib import pkg_resources def install_package(package, version=None): package_spec = f"{package}=={version}" if version else package print(f"Installing {package_spec}...") try: subprocess.check_call([sys.executable, "-m", "pip", "install", "--no-cache-dir", package_spec]) except subprocess.CalledProcessError as e: print(f"Failed to install {package_spec}: {e}") raise # Check and install required packages required_packages = { "opencv-python": None, "numpy": None, "gradio": None, "mediapipe": None, "tensorflow": None, "gitpython": None # For git operations } installed_packages = {pkg.key for pkg in pkg_resources.working_set} for package, version in required_packages.items(): if package not in installed_packages: install_package(package, version) # Now import all necessary packages import cv2 import numpy as np import gradio as gr import mediapipe as mp import tensorflow as tf from tensorflow.keras.preprocessing.image import img_to_array from tensorflow.keras.applications.mobilenet_v2 import preprocess_input import time from pathlib import Path import tempfile import git # Set TensorFlow to use memory growth to avoid consuming all GPU memory physical_devices = tf.config.list_physical_devices('GPU') if physical_devices: try: for device in physical_devices: tf.config.experimental.set_memory_growth(device, True) except: print("Memory growth setting failed") # Load face detection from MediaPipe (much faster than Haar cascades) mp_face_detection = mp.solutions.face_detection mp_drawing = mp.solutions.drawing_utils # Global variable for model mask_model = None def download_model_repo(): """Download the face mask detection model from GitHub""" repo_url = "https://github.com/misbah4064/face_mask_detection.git" repo_dir = "face_mask_detection" model_path = os.path.join(repo_dir, "mask_recog.h5") # Check if model already exists if os.path.exists(model_path): print(f"Model already exists at {model_path}") return model_path # Check if repository directory exists if os.path.exists(repo_dir): print(f"Repository directory already exists at {repo_dir}") else: print(f"Cloning repository from {repo_url}...") try: git.Repo.clone_from(repo_url, repo_dir) print("Repository cloned successfully") except Exception as e: print(f"Error cloning repository: {e}") # Try alternative method with subprocess try: subprocess.check_call(["git", "clone", repo_url]) print("Repository cloned with subprocess") except Exception as sub_e: print(f"Error with subprocess clone: {sub_e}") return None # Verify model file exists if os.path.exists(model_path): print(f"Model file found at {model_path}") return model_path else: print(f"Model file not found at {model_path}") return None def load_mask_model(): """Load the mask detection model once and cache it""" global mask_model if mask_model is None: try: # First try to download/access the model from GitHub model_path = download_model_repo() if model_path and os.path.exists(model_path): # Use standard TF model mask_model = tf.keras.models.load_model(model_path) print(f"Loaded {model_path} successfully") return True else: print("Failed to download or find the model") return False except Exception as e: print(f"Error loading model: {e}") return False return True def variance_of_laplacian(image): """Compute the variance of the Laplacian of the image (a measure of blur).""" return cv2.Laplacian(image, cv2.CV_64F).var() def is_image_blurry(image, threshold=100.0): """Determine if an image is blurry based on Laplacian variance""" gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) blur_score = variance_of_laplacian(gray) return blur_score < threshold, blur_score def detect_beard(face_image): """Detect beard using texture analysis of lower face region""" h, w = face_image.shape[:2] lower_face = face_image[h//2:, :] if lower_face.size == 0: return False, 0 # Convert to grayscale for texture analysis gray = cv2.cvtColor(lower_face, cv2.COLOR_BGR2GRAY) # Calculate standard deviation (texture measure) std_val = np.std(gray) # Calculate gradient magnitude (another texture measure) sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) gradient_magnitude = np.sqrt(sobelx**2 + sobely**2) gradient_mean = np.mean(gradient_magnitude) # Combined score beard_score = std_val * 0.5 + gradient_mean * 0.5 threshold = 45 # Adjustable threshold return beard_score > threshold, beard_score def predict_mask(face_img): """Predict if a face is wearing a mask""" global mask_model # Resize and preprocess face_rgb = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) face_resized = cv2.resize(face_rgb, (224, 224)) face_array = img_to_array(face_resized) face_array = np.expand_dims(face_array, axis=0) face_array = preprocess_input(face_array) # Use standard TF model preds = mask_model.predict(face_array, verbose=0) mask_prob = float(preds[0][0]) return mask_prob > 0.5, mask_prob def analyze_frame(frame, face_detector, min_detection_confidence=0.5, blur_threshold=100): """ Analyze a single frame for faces, masks, beards, and blur """ # Make a copy to avoid modifying the original annotated_frame = frame.copy() h, w = frame.shape[:2] # Convert to RGB for MediaPipe rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Detect faces results = face_detector.process(rgb_frame) # Blur detection for the whole frame is_blurry, blur_score = is_image_blurry(frame, blur_threshold) blur_status = "Blurry" if is_blurry else "Clear" blur_color = (0, 0, 255) if is_blurry else (0, 255, 0) # Add blur information cv2.putText(annotated_frame, f"Video Quality: {blur_status} ({blur_score:.1f})", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, blur_color, 2) face_count = 0 # Process detected faces if results.detections: for detection in results.detections: # Get face bounding box bbox = detection.location_data.relative_bounding_box x = int(bbox.xmin * w) y = int(bbox.ymin * h) face_width = int(bbox.width * w) face_height = int(bbox.height * h) # Ensure coordinates are within frame boundaries x = max(0, x) y = max(0, y) right = min(w, x + face_width) bottom = min(h, y + face_height) # Extract face face_img = frame[y:bottom, x:right] if face_img.size == 0: continue face_count += 1 # Predict mask has_mask, mask_prob = predict_mask(face_img) mask_status = "Mask" if has_mask else "No Mask" mask_color = (0, 255, 0) if has_mask else (0, 0, 255) # Draw face bounding box cv2.rectangle(annotated_frame, (x, y), (right, bottom), mask_color, 2) # Add mask information cv2.putText(annotated_frame, f"{mask_status}: {mask_prob:.2f}", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, mask_color, 2) # Detect beard only if no mask if not has_mask: has_beard, beard_score = detect_beard(face_img) beard_status = "Beard" if has_beard else "No Beard" cv2.putText(annotated_frame, f"{beard_status}: {beard_score:.1f}", (x, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 165, 0), 2) # Add face count cv2.putText(annotated_frame, f"Faces: {face_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) return annotated_frame def process_video(video_path, progress=gr.Progress(), min_detection_confidence=0.5, blur_threshold=100): """Process video file and return the path to the processed video""" if not load_mask_model(): return None, "Error: Could not load the mask detection model. Please check the console for details." # Create a temporary file for the output with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file: output_path = temp_file.name # Initialize video capture cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, "Error: Could not open video file." # Get video properties width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Initialize video writer with H.264 codec fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) # Initialize face detector with mp_face_detection.FaceDetection( model_selection=1, # 0 for short-range, 1 for full-range detection min_detection_confidence=min_detection_confidence ) as face_detector: # Process frames frame_count = 0 start_time = time.time() while True: ret, frame = cap.read() if not ret: break # Update progress progress((frame_count + 1) / total_frames, "Processing video...") # Process frame annotated_frame = analyze_frame(frame, face_detector, min_detection_confidence, blur_threshold) # Write to output video out.write(annotated_frame) frame_count += 1 # Clean up cap.release() out.release() # Calculate processing speed elapsed_time = time.time() - start_time processing_speed = frame_count / elapsed_time if elapsed_time > 0 else 0 return output_path, f"Processed {frame_count} frames in {elapsed_time:.1f} seconds ({processing_speed:.1f} FPS)" def process_webcam_frame(frame, min_detection_confidence, blur_threshold): """Process a single webcam frame""" if not load_mask_model(): return frame # Return original frame if model couldn't be loaded # Initialize face detector for each frame in webcam mode # This is less efficient but necessary for the Gradio webcam interface with mp_face_detection.FaceDetection( model_selection=1, min_detection_confidence=min_detection_confidence ) as face_detector: return analyze_frame(frame, face_detector, min_detection_confidence, blur_threshold) # Create Gradio interface with gr.Blocks(title="Enhanced Face Analysis System") as demo: gr.Markdown(""" # Advanced Face Analysis System This app detects and analyzes faces in videos to determine: * 😷 If a person is wearing a **mask** * 🧔 If a person has a **beard** (when no mask is present) * 🎥 The **quality/blurriness** of the video Upload a video or use your webcam for real-time analysis. """) with gr.Tabs(): with gr.TabItem("Video Upload"): with gr.Row(): with gr.Column(scale=1): video_input = gr.Video(label="Upload Video") with gr.Row(): min_confidence = gr.Slider( minimum=0.1, maximum=0.9, value=0.5, step=0.1, label="Face Detection Confidence" ) blur_threshold = gr.Slider( minimum=50, maximum=200, value=100, step=10, label="Blur Threshold" ) process_btn = gr.Button("Process Video") status_text = gr.Textbox(label="Processing Status") with gr.Column(scale=1): video_output = gr.Video(label="Processed Video") process_btn.click( fn=process_video, inputs=[video_input, min_confidence, blur_threshold], outputs=[video_output, status_text] ) with gr.TabItem("Webcam (Real-time)"): with gr.Row(): with gr.Column(scale=1): webcam_confidence = gr.Slider( minimum=0.1, maximum=0.9, value=0.5, step=0.1, label="Face Detection Confidence" ) webcam_blur = gr.Slider( minimum=50, maximum=200, value=100, step=10, label="Blur Threshold" ) with gr.Column(scale=2): webcam = gr.Image(sources=["webcam"], streaming=True, label="Webcam Feed") webcam.stream( fn=process_webcam_frame, inputs=[webcam_confidence, webcam_blur] ) gr.Markdown(""" ### How to Use 1. **Video Upload Tab**: Upload a video file and click "Process Video." Adjust sliders to tune detection parameters. 2. **Webcam Tab**: Allow camera access for real-time analysis. ### Tips - Higher face detection confidence gives fewer false positives but might miss some faces - Higher blur threshold means more tolerance for blurry video """) # Ensure the model is downloaded when the app starts def initialize_app(): print("Initializing app and downloading model...") if load_mask_model(): print("Model loaded successfully!") else: print("Failed to load model, some features may not work.") if __name__ == "__main__": initialize_app() demo.launch()