Spaces:
Sleeping
Sleeping
import os | |
import streamlit as st | |
from ultralytics import YOLO | |
import cv2 | |
import random | |
import time | |
from gtts import gTTS | |
import pygame | |
import tempfile | |
import threading | |
from datetime import datetime, timedelta | |
# Initialize pygame mixer for audio alerts | |
pygame.mixer.quit() | |
pygame.mixer.init() | |
# Load YOLOv8 model | |
yolo = YOLO("yolov8n.pt") | |
# Streamlit app layout | |
st.set_page_config(page_title="Assistive Vision App", layout="wide") | |
st.title("Real-Time Object Detection with Assistive Vision") | |
st.write("This app detects objects in real-time using a webcam feed and provides audio feedback for visually impaired people.") | |
# Placeholder for video feed | |
video_placeholder = st.empty() | |
# User controls | |
start_detection = st.button("Start Detection") | |
stop_detection = st.button("Stop Detection") | |
enable_audio = st.checkbox("Enable Audio Alerts", value=True) | |
# Audio alerts settings | |
alert_categories = {"person", "cat", "dog", "knife", "fire", "gun"} | |
last_alert_time = {} | |
alert_cooldown = timedelta(seconds=10) # Cooldown for audio alerts | |
# Create a directory for temporary audio files | |
audio_temp_dir = "audio_temp_files" | |
if not os.path.exists(audio_temp_dir): | |
os.makedirs(audio_temp_dir) | |
def play_audio_alert(label, position): | |
"""Generate and play an audio alert.""" | |
phrases = [ | |
f"Be careful, there's a {label} on your {position}.", | |
f"Watch out! {label} detected on your {position}.", | |
f"Alert! A {label} is on your {position}.", | |
] | |
alert_text = random.choice(phrases) | |
temp_audio_path = os.path.join(audio_temp_dir, f"alert_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp3") | |
tts = gTTS(alert_text) | |
tts.save(temp_audio_path) | |
try: | |
pygame.mixer.music.load(temp_audio_path) | |
pygame.mixer.music.play() | |
def cleanup_audio(): | |
while pygame.mixer.music.get_busy(): | |
time.sleep(0.1) | |
pygame.mixer.music.stop() | |
if os.path.exists(temp_audio_path): | |
os.remove(temp_audio_path) | |
threading.Thread(target=cleanup_audio, daemon=True).start() | |
except Exception as e: | |
print(f"Audio playback error: {e}") | |
def process_frame(frame, enable_audio): | |
"""Process a single video frame.""" | |
results = yolo(frame) | |
result = results[0] | |
detected_objects = {} | |
for box in result.boxes: | |
x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
label = result.names[int(box.cls[0])] | |
# Only alert for specific categories | |
if enable_audio and label not in alert_categories: | |
continue | |
# Determine object position | |
frame_center_x = frame.shape[1] // 2 | |
object_center_x = (x1 + x2) // 2 | |
position = "left" if object_center_x < frame_center_x else "right" | |
detected_objects[label] = position | |
# Draw bounding box and label on the frame | |
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) | |
cv2.putText(frame, f"{label}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) | |
return detected_objects, frame | |
# Real-time detection logic | |
if start_detection: | |
st.success("Starting real-time object detection...") | |
try: | |
video_capture = cv2.VideoCapture(0) | |
if not video_capture.isOpened(): | |
st.error("Error: Could not access the webcam. Please check your webcam settings.") | |
else: | |
while not stop_detection: | |
ret, frame = video_capture.read() | |
if not ret: | |
st.error("Error: Unable to read from webcam. Please check your webcam.") | |
break | |
detected_objects, processed_frame = process_frame(frame, enable_audio) | |
# Convert frame to RGB for Streamlit display | |
frame_rgb = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB) | |
video_placeholder.image(frame_rgb, channels="RGB", use_column_width=True) | |
# Play audio alerts | |
if enable_audio: | |
current_time = datetime.now() | |
for label, position in detected_objects.items(): | |
if label not in last_alert_time or current_time - last_alert_time[label] > alert_cooldown: | |
play_audio_alert(label, position) | |
last_alert_time[label] = current_time | |
time.sleep(0.1) | |
except Exception as e: | |
st.error(f"An error occurred: {e}") | |
finally: | |
video_capture.release() | |
cv2.destroyAllWindows() | |
pygame.mixer.quit() | |
elif stop_detection: | |
st.warning("Real-time object detection stopped.") | |