Natwar's picture
Update app.py
b1240b1 verified
raw
history blame contribute delete
14.6 kB
# mask_beard_blur_app.py
import os
import subprocess
import sys
import importlib
import pkg_resources
def install_package(package, version=None):
package_spec = f"{package}=={version}" if version else package
print(f"Installing {package_spec}...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "--no-cache-dir", package_spec])
except subprocess.CalledProcessError as e:
print(f"Failed to install {package_spec}: {e}")
raise
# Check and install required packages
required_packages = {
"opencv-python": None,
"numpy": None,
"gradio": None,
"mediapipe": None,
"tensorflow": None,
"gitpython": None # For git operations
}
installed_packages = {pkg.key for pkg in pkg_resources.working_set}
for package, version in required_packages.items():
if package not in installed_packages:
install_package(package, version)
# Now import all necessary packages
import cv2
import numpy as np
import gradio as gr
import mediapipe as mp
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
import time
from pathlib import Path
import tempfile
import git
# Set TensorFlow to use memory growth to avoid consuming all GPU memory
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
try:
for device in physical_devices:
tf.config.experimental.set_memory_growth(device, True)
except:
print("Memory growth setting failed")
# Load face detection from MediaPipe (much faster than Haar cascades)
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
# Global variable for model
mask_model = None
def download_model_repo():
"""Download the face mask detection model from GitHub"""
repo_url = "https://github.com/misbah4064/face_mask_detection.git"
repo_dir = "face_mask_detection"
model_path = os.path.join(repo_dir, "mask_recog.h5")
# Check if model already exists
if os.path.exists(model_path):
print(f"Model already exists at {model_path}")
return model_path
# Check if repository directory exists
if os.path.exists(repo_dir):
print(f"Repository directory already exists at {repo_dir}")
else:
print(f"Cloning repository from {repo_url}...")
try:
git.Repo.clone_from(repo_url, repo_dir)
print("Repository cloned successfully")
except Exception as e:
print(f"Error cloning repository: {e}")
# Try alternative method with subprocess
try:
subprocess.check_call(["git", "clone", repo_url])
print("Repository cloned with subprocess")
except Exception as sub_e:
print(f"Error with subprocess clone: {sub_e}")
return None
# Verify model file exists
if os.path.exists(model_path):
print(f"Model file found at {model_path}")
return model_path
else:
print(f"Model file not found at {model_path}")
return None
def load_mask_model():
"""Load the mask detection model once and cache it"""
global mask_model
if mask_model is None:
try:
# First try to download/access the model from GitHub
model_path = download_model_repo()
if model_path and os.path.exists(model_path):
# Use standard TF model
mask_model = tf.keras.models.load_model(model_path)
print(f"Loaded {model_path} successfully")
return True
else:
print("Failed to download or find the model")
return False
except Exception as e:
print(f"Error loading model: {e}")
return False
return True
def variance_of_laplacian(image):
"""Compute the variance of the Laplacian of the image (a measure of blur)."""
return cv2.Laplacian(image, cv2.CV_64F).var()
def is_image_blurry(image, threshold=100.0):
"""Determine if an image is blurry based on Laplacian variance"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
blur_score = variance_of_laplacian(gray)
return blur_score < threshold, blur_score
def detect_beard(face_image):
"""Detect beard using texture analysis of lower face region"""
h, w = face_image.shape[:2]
lower_face = face_image[h//2:, :]
if lower_face.size == 0:
return False, 0
# Convert to grayscale for texture analysis
gray = cv2.cvtColor(lower_face, cv2.COLOR_BGR2GRAY)
# Calculate standard deviation (texture measure)
std_val = np.std(gray)
# Calculate gradient magnitude (another texture measure)
sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
gradient_magnitude = np.sqrt(sobelx**2 + sobely**2)
gradient_mean = np.mean(gradient_magnitude)
# Combined score
beard_score = std_val * 0.5 + gradient_mean * 0.5
threshold = 45 # Adjustable threshold
return beard_score > threshold, beard_score
def predict_mask(face_img):
"""Predict if a face is wearing a mask"""
global mask_model
# Resize and preprocess
face_rgb = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
face_resized = cv2.resize(face_rgb, (224, 224))
face_array = img_to_array(face_resized)
face_array = np.expand_dims(face_array, axis=0)
face_array = preprocess_input(face_array)
# Use standard TF model
preds = mask_model.predict(face_array, verbose=0)
mask_prob = float(preds[0][0])
return mask_prob > 0.5, mask_prob
def analyze_frame(frame, face_detector, min_detection_confidence=0.5, blur_threshold=100):
"""
Analyze a single frame for faces, masks, beards, and blur
"""
# Make a copy to avoid modifying the original
annotated_frame = frame.copy()
h, w = frame.shape[:2]
# Convert to RGB for MediaPipe
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Detect faces
results = face_detector.process(rgb_frame)
# Blur detection for the whole frame
is_blurry, blur_score = is_image_blurry(frame, blur_threshold)
blur_status = "Blurry" if is_blurry else "Clear"
blur_color = (0, 0, 255) if is_blurry else (0, 255, 0)
# Add blur information
cv2.putText(annotated_frame, f"Video Quality: {blur_status} ({blur_score:.1f})",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, blur_color, 2)
face_count = 0
# Process detected faces
if results.detections:
for detection in results.detections:
# Get face bounding box
bbox = detection.location_data.relative_bounding_box
x = int(bbox.xmin * w)
y = int(bbox.ymin * h)
face_width = int(bbox.width * w)
face_height = int(bbox.height * h)
# Ensure coordinates are within frame boundaries
x = max(0, x)
y = max(0, y)
right = min(w, x + face_width)
bottom = min(h, y + face_height)
# Extract face
face_img = frame[y:bottom, x:right]
if face_img.size == 0:
continue
face_count += 1
# Predict mask
has_mask, mask_prob = predict_mask(face_img)
mask_status = "Mask" if has_mask else "No Mask"
mask_color = (0, 255, 0) if has_mask else (0, 0, 255)
# Draw face bounding box
cv2.rectangle(annotated_frame, (x, y), (right, bottom), mask_color, 2)
# Add mask information
cv2.putText(annotated_frame, f"{mask_status}: {mask_prob:.2f}",
(x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, mask_color, 2)
# Detect beard only if no mask
if not has_mask:
has_beard, beard_score = detect_beard(face_img)
beard_status = "Beard" if has_beard else "No Beard"
cv2.putText(annotated_frame, f"{beard_status}: {beard_score:.1f}",
(x, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 165, 0), 2)
# Add face count
cv2.putText(annotated_frame, f"Faces: {face_count}",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
return annotated_frame
def process_video(video_path, progress=gr.Progress(), min_detection_confidence=0.5, blur_threshold=100):
"""Process video file and return the path to the processed video"""
if not load_mask_model():
return None, "Error: Could not load the mask detection model. Please check the console for details."
# Create a temporary file for the output
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
output_path = temp_file.name
# Initialize video capture
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None, "Error: Could not open video file."
# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Initialize video writer with H.264 codec
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# Initialize face detector
with mp_face_detection.FaceDetection(
model_selection=1, # 0 for short-range, 1 for full-range detection
min_detection_confidence=min_detection_confidence
) as face_detector:
# Process frames
frame_count = 0
start_time = time.time()
while True:
ret, frame = cap.read()
if not ret:
break
# Update progress
progress((frame_count + 1) / total_frames, "Processing video...")
# Process frame
annotated_frame = analyze_frame(frame, face_detector, min_detection_confidence, blur_threshold)
# Write to output video
out.write(annotated_frame)
frame_count += 1
# Clean up
cap.release()
out.release()
# Calculate processing speed
elapsed_time = time.time() - start_time
processing_speed = frame_count / elapsed_time if elapsed_time > 0 else 0
return output_path, f"Processed {frame_count} frames in {elapsed_time:.1f} seconds ({processing_speed:.1f} FPS)"
def process_webcam_frame(frame, min_detection_confidence, blur_threshold):
"""Process a single webcam frame"""
if not load_mask_model():
return frame # Return original frame if model couldn't be loaded
# Initialize face detector for each frame in webcam mode
# This is less efficient but necessary for the Gradio webcam interface
with mp_face_detection.FaceDetection(
model_selection=1,
min_detection_confidence=min_detection_confidence
) as face_detector:
return analyze_frame(frame, face_detector, min_detection_confidence, blur_threshold)
# Create Gradio interface
with gr.Blocks(title="Enhanced Face Analysis System") as demo:
gr.Markdown("""
# Advanced Face Analysis System
This app detects and analyzes faces in videos to determine:
* 😷 If a person is wearing a **mask**
* 🧔 If a person has a **beard** (when no mask is present)
* 🎥 The **quality/blurriness** of the video
Upload a video or use your webcam for real-time analysis.
""")
with gr.Tabs():
with gr.TabItem("Video Upload"):
with gr.Row():
with gr.Column(scale=1):
video_input = gr.Video(label="Upload Video")
with gr.Row():
min_confidence = gr.Slider(
minimum=0.1, maximum=0.9, value=0.5, step=0.1,
label="Face Detection Confidence"
)
blur_threshold = gr.Slider(
minimum=50, maximum=200, value=100, step=10,
label="Blur Threshold"
)
process_btn = gr.Button("Process Video")
status_text = gr.Textbox(label="Processing Status")
with gr.Column(scale=1):
video_output = gr.Video(label="Processed Video")
process_btn.click(
fn=process_video,
inputs=[video_input, min_confidence, blur_threshold],
outputs=[video_output, status_text]
)
with gr.TabItem("Webcam (Real-time)"):
with gr.Row():
with gr.Column(scale=1):
webcam_confidence = gr.Slider(
minimum=0.1, maximum=0.9, value=0.5, step=0.1,
label="Face Detection Confidence"
)
webcam_blur = gr.Slider(
minimum=50, maximum=200, value=100, step=10,
label="Blur Threshold"
)
with gr.Column(scale=2):
webcam = gr.Image(sources=["webcam"], streaming=True, label="Webcam Feed")
webcam.stream(
fn=process_webcam_frame,
inputs=[webcam_confidence, webcam_blur]
)
gr.Markdown("""
### How to Use
1. **Video Upload Tab**: Upload a video file and click "Process Video." Adjust sliders to tune detection parameters.
2. **Webcam Tab**: Allow camera access for real-time analysis.
### Tips
- Higher face detection confidence gives fewer false positives but might miss some faces
- Higher blur threshold means more tolerance for blurry video
""")
# Ensure the model is downloaded when the app starts
def initialize_app():
print("Initializing app and downloading model...")
if load_mask_model():
print("Model loaded successfully!")
else:
print("Failed to load model, some features may not work.")
if __name__ == "__main__":
initialize_app()
demo.launch()