|
|
|
|
|
import os |
|
import subprocess |
|
import sys |
|
import importlib |
|
import pkg_resources |
|
|
|
def install_package(package, version=None):
    """Install a distribution with pip, using the running interpreter.

    Args:
        package: Distribution name handed to ``pip install``.
        version: Optional exact version. When truthy the requirement becomes
            ``package==version``; otherwise pip chooses the version.

    Raises:
        subprocess.CalledProcessError: Re-raised (after a message is printed)
            when pip exits with a non-zero status.
    """
    if version:
        requirement = f"{package}=={version}"
    else:
        requirement = package

    print(f"Installing {requirement}...")

    # Invoke pip through the current interpreter so the package lands in the
    # same environment this script is running in.
    pip_command = [
        sys.executable, "-m", "pip", "install", "--no-cache-dir", requirement,
    ]
    try:
        subprocess.check_call(pip_command)
    except subprocess.CalledProcessError as pip_error:
        print(f"Failed to install {requirement}: {pip_error}")
        raise
|
|
|
|
|
# Distributions this script needs, mapped to an optional pinned version
# (None lets pip pick the version).
required_packages = {
    "opencv-python": None,
    "numpy": None,
    "gradio": None,
    "mediapipe": None,
    "tensorflow": None,
    "gitpython": None,
}

# Keys of every distribution pkg_resources can see in the current environment.
installed_packages = {pkg.key for pkg in pkg_resources.working_set}

# Install only what is missing; anything already present is left untouched.
for package, version in required_packages.items():
    if package in installed_packages:
        continue
    install_package(package, version)
|
|
|
|
|
import cv2 |
|
import numpy as np |
|
import gradio as gr |
|
import mediapipe as mp |
|
import tensorflow as tf |
|
from tensorflow.keras.preprocessing.image import img_to_array |
|
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input |
|
import time |
|
from pathlib import Path |
|
import tempfile |
|
import git |
|
|
|
|
|
# Ask TensorFlow to allocate GPU memory on demand for every visible GPU.
# Nothing is configured when no GPU is listed.
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    try:
        for device in physical_devices:
            tf.config.experimental.set_memory_growth(device, True)
    except (RuntimeError, ValueError) as e:
        # Best-effort tuning: the app still works without memory growth, so
        # report the reason and carry on. Only the errors TensorFlow documents
        # for this call are caught, so Ctrl-C and real bugs are not swallowed.
        print(f"Memory growth setting failed: {e}")

# Short aliases for the MediaPipe solution modules used by this script.
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils

# Cached mask classifier; stays None until load_mask_model() succeeds.
mask_model = None
|
|
|
def download_model_repo():
    """Make sure the face mask model file exists locally, cloning if needed.

    The model is expected at ``face_mask_detection/mask_recog.h5`` relative to
    the current working directory. If the file is missing and the repository
    directory does not exist, the repository is cloned with GitPython, falling
    back to the ``git`` command line when that fails. An existing repository
    directory is never re-cloned.

    Returns:
        The relative path to the model file when it exists after the call,
        otherwise ``None``.
    """
    repo_url = "https://github.com/misbah4064/face_mask_detection.git"
    repo_dir = "face_mask_detection"
    model_path = os.path.join(repo_dir, "mask_recog.h5")

    # Fast path: the model was fetched by an earlier run.
    if os.path.exists(model_path):
        print(f"Model already exists at {model_path}")
        return model_path

    if os.path.exists(repo_dir):
        print(f"Repository directory already exists at {repo_dir}")
    else:
        print(f"Cloning repository from {repo_url}...")
        try:
            git.Repo.clone_from(repo_url, repo_dir)
            print("Repository cloned successfully")
        except Exception as e:
            # Deliberately broad: any GitPython failure should trigger the
            # command-line fallback rather than abort the download.
            print(f"Error cloning repository: {e}")
            try:
                # Pass the target directory explicitly so the fallback always
                # clones into repo_dir instead of relying on git's default
                # directory name happening to match it.
                subprocess.check_call(["git", "clone", repo_url, repo_dir])
                print("Repository cloned with subprocess")
            except (subprocess.CalledProcessError, OSError) as sub_e:
                # CalledProcessError: git exited non-zero.
                # OSError: the git executable could not be started.
                print(f"Error with subprocess clone: {sub_e}")
                return None

    if os.path.exists(model_path):
        print(f"Model file found at {model_path}")
        return model_path

    print(f"Model file not found at {model_path}")
    return None
|
|
|
def load_mask_model():
    """Load the mask classifier into the module-level cache if not yet loaded.

    Returns:
        True when ``mask_model`` is available (already cached or just loaded),
        False when the model file could not be obtained or loading raised.
    """
    global mask_model

    # Already cached by an earlier call: nothing to do.
    if mask_model is not None:
        return True

    try:
        weights_file = download_model_repo()
        if not (weights_file and os.path.exists(weights_file)):
            print("Failed to download or find the model")
            return False

        mask_model = tf.keras.models.load_model(weights_file)
        print(f"Loaded {weights_file} successfully")
        return True
    except Exception as load_error:
        print(f"Error loading model: {load_error}")
        return False
|
|
|
def variance_of_laplacian(image):
    """Return the variance of the image's Laplacian, used as a blur measure."""
    laplacian_response = cv2.Laplacian(image, cv2.CV_64F)
    return laplacian_response.var()
|
|
|
def is_image_blurry(image, threshold=100.0):
    """Classify an image as blurry by comparing Laplacian variance to a cutoff.

    Args:
        image: Image converted with ``COLOR_BGR2GRAY`` before scoring.
        threshold: Scores strictly below this value count as blurry.

    Returns:
        A ``(is_blurry, blur_score)`` pair.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    sharpness = variance_of_laplacian(grayscale)
    too_soft = sharpness < threshold
    return too_soft, sharpness
|
|
|
def detect_beard(face_image, threshold=45):
    """Guess whether a face has a beard from the texture of its lower half.

    The score is the average of two texture measures taken on the grayscale
    lower half of the crop: the standard deviation of pixel intensities and
    the mean Sobel gradient magnitude.

    Args:
        face_image: Face crop; converted with ``COLOR_BGR2GRAY``, so a
            3-channel BGR array is expected.
        threshold: Score above which the face is reported as bearded.
            Defaults to 45, the value previously hard-coded here.

    Returns:
        A ``(has_beard, beard_score)`` pair of plain ``bool`` and ``float``.
        An empty lower-face region yields ``(False, 0.0)``.
    """
    height = face_image.shape[0]
    lower_face = face_image[height // 2:, :]

    # Nothing to analyse (zero-height or zero-width crop).
    if lower_face.size == 0:
        return False, 0.0

    gray = cv2.cvtColor(lower_face, cv2.COLOR_BGR2GRAY)

    # Texture measure 1: overall spread of pixel intensities.
    intensity_spread = np.std(gray)

    # Texture measure 2: mean edge strength from horizontal/vertical Sobel.
    grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    edge_strength = np.mean(np.sqrt(grad_x**2 + grad_y**2))

    # Equal-weight blend of both measures; converted to built-in types so
    # callers do not receive NumPy scalars.
    beard_score = float(intensity_spread * 0.5 + edge_strength * 0.5)
    return beard_score > threshold, beard_score
|
|
|
def predict_mask(face_img):
    """Run the cached mask classifier on a single face crop.

    Args:
        face_img: Face crop; converted with ``COLOR_BGR2RGB`` before being
            resized to 224x224 and preprocessed for MobileNetV2.

    Returns:
        A ``(has_mask, mask_prob)`` pair, where ``mask_prob`` is the first
        value of the model's prediction for the crop and ``has_mask`` is
        ``mask_prob > 0.5``.
    """
    # Build a one-image batch in the format the network is fed with.
    rgb_face = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
    resized_face = cv2.resize(rgb_face, (224, 224))
    batch = np.expand_dims(img_to_array(resized_face), axis=0)
    batch = preprocess_input(batch)

    # mask_model is the module-level cache filled by load_mask_model().
    scores = mask_model.predict(batch, verbose=0)

    mask_prob = float(scores[0][0])
    has_mask = mask_prob > 0.5
    return has_mask, mask_prob
|
|
|
def _clip_face_box(bbox, frame_w, frame_h):
    """Convert a relative bounding box to pixel corners clipped to the frame."""
    left = int(bbox.xmin * frame_w)
    top = int(bbox.ymin * frame_h)

    # Far edges are derived from the UNCLAMPED origin. Clamping the origin
    # first would push the far edge outwards for faces that start off-frame.
    right = left + int(bbox.width * frame_w)
    bottom = top + int(bbox.height * frame_h)

    left = max(0, left)
    top = max(0, top)
    # Never let a far edge fall before the near edge: a negative value would
    # be read as a from-the-end index when slicing the frame.
    right = max(left, min(frame_w, right))
    bottom = max(top, min(frame_h, bottom))
    return left, top, right, bottom


def analyze_frame(frame, face_detector, min_detection_confidence=0.5, blur_threshold=100):
    """Annotate one frame with blur, face, mask and beard information.

    Args:
        frame: Image to analyse; converted with ``COLOR_BGR2RGB`` for the
            detector, so BGR channel order is expected.
        face_detector: Object whose ``process(rgb_image)`` returns a result
            with a ``detections`` attribute (a MediaPipe face detector in
            this file).
        min_detection_confidence: Not used inside this function; kept so
            existing callers keep working. The detector passed in is already
            configured by the caller.
        blur_threshold: Laplacian-variance cutoff forwarded to
            ``is_image_blurry``.

    Returns:
        A copy of ``frame`` with the overlays drawn on it. The input frame is
        not modified.
    """
    annotated_frame = frame.copy()
    h, w = frame.shape[:2]

    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb_frame)

    # Whole-frame sharpness banner.
    is_blurry, blur_score = is_image_blurry(frame, blur_threshold)
    blur_status = "Blurry" if is_blurry else "Clear"
    blur_color = (0, 0, 255) if is_blurry else (0, 255, 0)
    cv2.putText(annotated_frame, f"Video Quality: {blur_status} ({blur_score:.1f})",
                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, blur_color, 2)

    face_count = 0

    for detection in results.detections or []:
        bbox = detection.location_data.relative_bounding_box
        x, y, right, bottom = _clip_face_box(bbox, w, h)

        face_img = frame[y:bottom, x:right]
        if face_img.size == 0:
            # Box lies entirely outside the frame or has no area.
            continue

        face_count += 1

        has_mask, mask_prob = predict_mask(face_img)
        mask_status = "Mask" if has_mask else "No Mask"
        mask_color = (0, 255, 0) if has_mask else (0, 0, 255)

        cv2.rectangle(annotated_frame, (x, y), (right, bottom), mask_color, 2)

        # Keep the label baseline inside the frame for faces at the top edge.
        cv2.putText(annotated_frame, f"{mask_status}: {mask_prob:.2f}",
                    (x, max(y - 10, 20)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, mask_color, 2)

        # Beard analysis only runs when no mask is reported.
        if not has_mask:
            has_beard, beard_score = detect_beard(face_img)
            beard_status = "Beard" if has_beard else "No Beard"
            # Same idea at the bottom edge of the frame.
            cv2.putText(annotated_frame, f"{beard_status}: {beard_score:.1f}",
                        (x, min(bottom + 20, h - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 165, 0), 2)

    cv2.putText(annotated_frame, f"Faces: {face_count}",
                (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    return annotated_frame
|
|
|
def process_video(video_path, progress=gr.Progress(), min_detection_confidence=0.5, blur_threshold=100):
    """Annotate every frame of a video file and write the result to a temp MP4.

    Args:
        video_path: Path of the video to read.
        progress: Gradio progress tracker, called once per frame.
        min_detection_confidence: Confidence threshold given to the MediaPipe
            face detector.
        blur_threshold: Laplacian-variance cutoff forwarded to
            ``analyze_frame``.

    Returns:
        ``(output_path, status_message)`` on success, or
        ``(None, error_message)`` when the model, the input video or the
        output writer could not be set up.
    """
    if not load_mask_model():
        return None, "Error: Could not load the mask detection model. Please check the console for details."

    # An empty/None path (nothing uploaded) is reported like an unreadable file.
    cap = cv2.VideoCapture(video_path) if video_path else None
    if cap is None or not cap.isOpened():
        if cap is not None:
            cap.release()
        return None, "Error: Could not open video file."

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Some sources report 0 (or NaN) FPS; fall back to a usable rate
            # instead of handing an invalid value to the writer.
            fps = 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Create the output file only once the input is known to be readable,
        # so a bad upload does not leave an empty temp file behind.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            output_path = temp_file.name

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            out.release()
            out = None
            os.remove(output_path)
            return None, "Error: Could not create output video file."

        with mp_face_detection.FaceDetection(
            model_selection=1,
            min_detection_confidence=min_detection_confidence
        ) as face_detector:
            frame_count = 0
            start_time = time.time()

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if total_frames > 0:
                    # The reported frame count can be an estimate, so cap the
                    # fraction at 1.0.
                    progress(min(1.0, (frame_count + 1) / total_frames), "Processing video...")
                else:
                    # Total unknown: report steps done without a total rather
                    # than dividing by zero.
                    progress((frame_count + 1, None), "Processing video...")

                annotated_frame = analyze_frame(frame, face_detector, min_detection_confidence, blur_threshold)
                out.write(annotated_frame)
                frame_count += 1
    finally:
        # Always release the native handles, including when a frame fails.
        cap.release()
        if out is not None:
            out.release()

    elapsed_time = time.time() - start_time
    processing_speed = frame_count / elapsed_time if elapsed_time > 0 else 0

    return output_path, f"Processed {frame_count} frames in {elapsed_time:.1f} seconds ({processing_speed:.1f} FPS)"
|
|
|
def process_webcam_frame(frame, min_detection_confidence, blur_threshold):
    """Annotate a single webcam frame coming from the Gradio UI.

    Args:
        frame: Frame from the Gradio image component, or ``None`` when no
            frame is available. Gradio supplies NumPy images in RGB order.
        min_detection_confidence: Confidence threshold given to the MediaPipe
            face detector.
        blur_threshold: Laplacian-variance cutoff forwarded to
            ``analyze_frame``.

    Returns:
        The annotated frame in RGB order; the unmodified input when the mask
        model cannot be loaded; ``None`` when ``frame`` is ``None``.
    """
    # The stream can fire before the camera has produced an image.
    if frame is None:
        return None

    if not load_mask_model():
        return frame

    # analyze_frame works on BGR images (it converts BGR -> RGB itself), so
    # convert on the way in and back on the way out. Without this the
    # detector and classifier see swapped channels and the overlay colours
    # come out wrong in the browser.
    bgr_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    with mp_face_detection.FaceDetection(
        model_selection=1,
        min_detection_confidence=min_detection_confidence
    ) as face_detector:
        annotated_bgr = analyze_frame(bgr_frame, face_detector, min_detection_confidence, blur_threshold)

    return cv2.cvtColor(annotated_bgr, cv2.COLOR_BGR2RGB)
|
|
|
|
|
# Gradio UI: one tab for uploaded videos, one for live webcam analysis.
with gr.Blocks(title="Enhanced Face Analysis System") as demo:
    gr.Markdown("""
    # Advanced Face Analysis System

    This app detects and analyzes faces in videos to determine:

    * 😷 If a person is wearing a **mask**
    * 🧔 If a person has a **beard** (when no mask is present)
    * 🎥 The **quality/blurriness** of the video

    Upload a video or use your webcam for real-time analysis.
    """)

    with gr.Tabs():
        with gr.TabItem("Video Upload"):
            with gr.Row():
                with gr.Column(scale=1):
                    video_input = gr.Video(label="Upload Video")
                    with gr.Row():
                        min_confidence = gr.Slider(
                            minimum=0.1, maximum=0.9, value=0.5, step=0.1,
                            label="Face Detection Confidence"
                        )
                        blur_threshold = gr.Slider(
                            minimum=50, maximum=200, value=100, step=10,
                            label="Blur Threshold"
                        )
                    process_btn = gr.Button("Process Video")
                    status_text = gr.Textbox(label="Processing Status")

                with gr.Column(scale=1):
                    video_output = gr.Video(label="Processed Video")

            # process_video's `progress` parameter is not listed here: Gradio
            # supplies it from the gr.Progress() default in the signature.
            process_btn.click(
                fn=process_video,
                inputs=[video_input, min_confidence, blur_threshold],
                outputs=[video_output, status_text]
            )

        with gr.TabItem("Webcam (Real-time)"):
            with gr.Row():
                with gr.Column(scale=1):
                    webcam_confidence = gr.Slider(
                        minimum=0.1, maximum=0.9, value=0.5, step=0.1,
                        label="Face Detection Confidence"
                    )
                    webcam_blur = gr.Slider(
                        minimum=50, maximum=200, value=100, step=10,
                        label="Blur Threshold"
                    )

                with gr.Column(scale=2):
                    webcam = gr.Image(sources=["webcam"], streaming=True, label="Webcam Feed")
                    webcam_output = gr.Image(label="Processed Feed")

            # The handler takes (frame, confidence, blur), so the webcam image
            # must be the first input, and an output component is needed for
            # the annotated frame to be shown at all.
            webcam.stream(
                fn=process_webcam_frame,
                inputs=[webcam, webcam_confidence, webcam_blur],
                outputs=[webcam_output]
            )

    gr.Markdown("""
    ### How to Use

    1. **Video Upload Tab**: Upload a video file and click "Process Video." Adjust sliders to tune detection parameters.
    2. **Webcam Tab**: Allow camera access for real-time analysis.

    ### Tips

    - Higher face detection confidence gives fewer false positives but might miss some faces
    - Higher blur threshold is stricter: more frames are flagged as blurry
    """)
|
|
|
|
|
def initialize_app():
    """Load the mask model up front and print whether it succeeded."""
    print("Initializing app and downloading model...")
    model_ready = load_mask_model()
    if not model_ready:
        print("Failed to load model, some features may not work.")
        return
    print("Model loaded successfully!")


# Script entry point: warm up the model, then start the Gradio server.
if __name__ == "__main__":
    initialize_app()
    demo.launch()