from DepthEstimator import DepthEstimator
import numpy as np
import os
from GenerateCaptions import generate_caption
import re
from config import LOGS_DIR
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
import torch
from PIL import Image, ImageDraw, ImageFont
import spacy
import gc
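
# SoundMapper ties the pipeline together: it normalizes depth maps produced by
# DepthEstimator, pulls sound-source nouns out of generated captions, grounds
# those nouns in the image with Grounding DINO, and assigns each detection to a
# near/medium/far depth zone. The spaCy model, Grounding DINO, and the depth
# maps are loaded lazily, so they are only created when first needed.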
class SoundMapper:
    def __init__(self):
        self.depth_estimator = DepthEstimator()
        # Depth maps as dicts {"predicted_depth": tensor, "depth": PIL.Image},
        # loaded lazily via _load_depth_maps()
        self.device = "cuda"
        # self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir)
        self.map_list = None
        self.image_dir = self.depth_estimator.image_dir
        # Lazily-loaded models (see _load_nlp and detect_objects)
        # self.nlp = spacy.load("en_core_web_sm")
        self.nlp = None
        self.dino = None
        self.dino_processor = None
        # self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-tiny").to(self.device)
        # self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny")
    def _load_nlp(self):
        # Lazily load the spaCy model on first use
        if self.nlp is None:
            self.nlp = spacy.load("en_core_web_sm")
        return self.nlp
    def _load_depth_maps(self):
        # Lazily run depth estimation over the image directory on first use
        if self.map_list is None:
            self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir)
        return self.map_list
    def process_depth_maps(self) -> list:
        """Normalize each depth map to [0, 1] and keep the original image alongside it."""
        depth_maps = self._load_depth_maps()
        processed_maps = []
        for item in depth_maps:
            depth_map = item["depth"]
            depth_array = np.array(depth_map)
            normalization = depth_array / 255.0
            processed_maps.append({
                "original": depth_map,
                "normalization": normalization
            })
        return processed_maps
    # def create_depth_zone(self, processed_maps: list, num_zones=3):
    #     zones_data = []
    #     for depth_data in processed_maps:
    #         normalized = depth_data["normalization"]
    #         thresholds = np.linspace(0, 1, num_zones + 1)
    #         zones = []
    #         for i in range(num_zones):
    #             zone_mask = (normalized >= thresholds[i]) & (normalized < thresholds[i+1])
    #             zone_percentage = zone_mask.sum() / zone_mask.size
    #             zones.append({
    #                 "range": (thresholds[i], thresholds[i+1]),
    #                 "percentage": zone_percentage,
    #                 "mask": zone_mask
    #             })
    #         zones_data.append(zones)
    #     return zones_data
    def detect_sound_sources(self, caption_text: str) -> dict:
        """
        Extract nouns and their sound descriptions from caption text.
        Returns a dictionary mapping nouns to their descriptions.
        """
        sound_sources = {}
        nlp = self._load_nlp()
        print("\n[DEBUG] Beginning sound source detection")
        print(f"Raw caption text length: {len(caption_text)}")
        print(f"First 100 chars: {caption_text[:100]}...")
        # Split the caption by newlines to separate entries
        lines = caption_text.strip().split('\n')
        print(f"Found {len(lines)} lines after splitting")
        for i, line in enumerate(lines):
            # Skip empty lines
            if not line.strip():
                continue
            print(f"Processing line {i}: {line[:50]}{'...' if len(line) > 50 else ''}")
            # Check if the line matches the expected format (Noun: description)
            if ':' in line:
                parts = line.split(':', 1)  # Split only on the first colon
                # Clean up the noun part: lowercase and strip whitespace
                noun_part = parts[0].strip().lower()
                # Remove list numbering (e.g., "1. ", "2. ", etc.)
                noun_part = re.sub(r'^\d+\.\s*', '', noun_part)
                description = parts[1].strip()
                # Clean any markdown formatting
                noun = re.sub(r'[*()]', '', noun_part).strip()
                description = re.sub(r'[*()]', '', description).strip()
                # Keep only the part of the description before an em dash or hyphen separator
                if ' — ' in description:
                    description = description.split(' — ', 1)[0].strip()
                elif ' - ' in description:
                    description = description.split(' - ', 1)[0].strip()
                print(f"  - Found potential noun: '{noun}' with description: '{description[:30]}...'")
                # Skip if the noun contains invalid characters or is too short
                if '##' not in noun and len(noun) > 1 and noun[0].isalpha():
                    sound_sources[noun] = description
                    print("  √ Added to sound sources")
                else:
                    print("  × Skipped (invalid format)")
        # If no structured format was found, fall back to extracting nouns from the text
        if not sound_sources:
            print("No structured format found, falling back to noun extraction")
            all_nouns = []
            doc = nlp(caption_text)
            for token in doc:
                if token.pos_ == "NOUN" and len(token.text) > 1:
                    if token.text[0].isalpha():
                        all_nouns.append(token.text.lower())
                        print(f"  - Extracted noun: '{token.text.lower()}'")
            for noun in all_nouns:
                sound_sources[noun] = ""  # Empty description
        print(f"[DEBUG] Final detected sound sources: {list(sound_sources.keys())}")
        return sound_sources
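    # The parser above expects caption text shaped roughly like this
    # (illustrative example only, not actual generate_caption output):
    #
    #   1. Traffic: cars passing with a low engine hum - close by
    #   2. Trees: leaves rustling in the wind
    #
    # Each line is "Noun: description"; list numbering, markdown characters, and
    # anything after an em dash or " - " separator are stripped, so only the
    # noun-to-description mapping is kept.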
    def map_bbox_to_depth_zone(self, bbox, depth_map, num_zones=3):
        """Return the depth zone index (0 = near ... num_zones - 1 = far) for the region covered by bbox."""
        x1, y1, x2, y2 = [int(coord) for coord in bbox]
        height, width = depth_map.shape
        # Clamp the box to the image bounds
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(width, x2), min(height, y2)
        depth_roi = depth_map[y1:y2, x1:x2]
        if depth_roi.size == 0:
            return num_zones - 1
        mean_depth = np.mean(depth_roi)
        thresholds = self.create_histogram_depth_zones(depth_map, num_zones)
        for i in range(num_zones):
            if thresholds[i] <= mean_depth < thresholds[i+1]:
                return i
        return num_zones - 1
    def detect_objects(self, nouns: list, image: Image):
        # Drop tokenizer artifacts and obviously invalid nouns before building the prompt
        filtered_nouns = []
        for noun in nouns:
            if '##' not in noun and len(noun) > 1 and noun[0].isalpha():
                filtered_nouns.append(noun)
        print(f"Detecting objects for nouns: {filtered_nouns}")
        # Lazily load Grounding DINO on first use; otherwise move it back onto the GPU
        if self.dino is None:
            self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-base").to(self.device)
            self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base")
        else:
            self.dino = self.dino.to(self.device)
        # Grounding DINO takes the candidate phrases as a single "."-separated text prompt
        text_prompt = " . ".join(filtered_nouns)
        inputs = self.dino_processor(images=image, text=text_prompt, return_tensors="pt").to(self.device)
        with torch.no_grad():
            outputs = self.dino(**inputs)
        results = self.dino_processor.post_process_grounded_object_detection(
            outputs,
            inputs.input_ids,
            box_threshold=0.25,
            text_threshold=0.25,
            target_sizes=[image.size[::-1]]
        )
        result = results[0]
        labels = result["labels"]
        bboxes = result["boxes"]
        clean_labels = []
        for label in labels:
            # Strip wordpiece fragments (e.g. "##ing") and split labels that fused several nouns
            clean_label = re.sub(r'##\w+', '', label)
            clean_label = self._split_combined_words(clean_label, filtered_nouns)
            clean_labels.append(clean_label)
        # Move the model off the GPU and free the intermediate tensors
        self.dino = self.dino.to("cpu")
        torch.cuda.empty_cache()
        del inputs, outputs, results
        print(f"Detected objects: {clean_labels}")
        return (clean_labels, bboxes)
    def _split_combined_words(self, text, nouns=None):
        # Split label words that contain one of the known nouns as a substring back into those nouns
        nlp = self._load_nlp()
        if nouns is None:
            known_words = set()
            doc = nlp(text)
            for token in doc:
                if token.pos_ == "NOUN" and len(token.text) > 1:
                    known_words.add(token.text.lower())
        else:
            known_words = set(nouns)
        result = []
        for word in text.split():
            if word in known_words:
                result.append(word)
                continue
            found = False
            for known in known_words:
                if known in word and len(known) > 2:
                    result.append(known)
                    found = True
            if not found:
                result.append(word)
        return " ".join(result)
    def process_dino_labels(self, labels):
        # Reduce raw detector labels to a de-duplicated list of lowercase nouns
        processed_labels = []
        nlp = self._load_nlp()
        for label in labels:
            if label.startswith('##'):
                continue
            label = re.sub(r'[*()]', '', label).strip()
            parts = label.split()
            for part in parts:
                if part.startswith('##'):
                    continue
                doc = nlp(part)
                for token in doc:
                    if token.pos_ == "NOUN" and len(token.text) > 1:
                        processed_labels.append(token.text.lower())
        # Preserve order while removing duplicates
        unique_labels = []
        for label in processed_labels:
            if label not in unique_labels:
                unique_labels.append(label)
        return unique_labels
    def create_histogram_depth_zones(self, depth_map, num_zones=3):
        """Compute zone thresholds so that each zone covers roughly the same share of pixels."""
        # Using 50 bins because it is faster
        hist, bin_edge = np.histogram(depth_map.flatten(), bins=50, range=(0, 1))
        cumulative = np.cumsum(hist) / np.sum(hist)
        thresholds = [0.0]
        for i in range(1, num_zones):
            # Pick the bin edge whose cumulative pixel share is closest to i/num_zones
            target = i / num_zones
            idx = np.argmin(np.abs(cumulative - target))
            thresholds.append(bin_edge[idx + 1])
        thresholds.append(1.0)
        return thresholds
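    # Illustrative note on the method above: for a roughly uniform depth map the
    # thresholds come out near [0.0, 0.34, 0.66, 1.0] (close to an even split of
    # the value range), while for skewed depth distributions they shift toward
    # wherever the pixel mass sits, so each zone still holds about a third of
    # the pixels rather than a third of the value range.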
    def analyze_object_depths(self, image_path, depth_map, lat=None, lon=None, caption_data=None, all_objects=False):
        """
        Detect caption-derived sound sources in the image and estimate their depth zones.
        lat/lon are only used to generate a caption when caption_data is not provided.
        """
        image = Image.open(image_path)
        if caption_data is None:
            caption = generate_caption(lat, lon)
            if not caption:
                print(f"Failed to generate caption for {image_path}")
                return []
            caption_text = caption.get("sound_description", "")
        else:
            caption_text = caption_data.get("sound_description", "")
        # Debug: print the raw caption text
        print(f"\n[DEBUG] Raw caption text for {os.path.basename(image_path)}:")
        print(caption_text)
        print("-" * 50)
        if not caption_text:
            print(f"No caption text available for {image_path}")
            return []
        # Extract nouns and their sound descriptions
        sound_sources = self.detect_sound_sources(caption_text)
        # Debug: print the extracted sound sources
        print("[DEBUG] Extracted sound sources:")
        for noun, desc in sound_sources.items():
            print(f"  - {noun}: {desc}")
        print("-" * 50)
        if not sound_sources:
            print(f"No sound sources detected in caption for {image_path}")
            return []
        # Only the nouns are needed for object detection
        nouns = list(sound_sources.keys())
        # Debug: print the list of nouns being used for detection
        print(f"[DEBUG] Nouns for object detection: {nouns}")
        print("-" * 50)
        labels, bboxes = self.detect_objects(nouns, image)
        if len(labels) == 0 or len(bboxes) == 0:
            print(f"No objects detected in {image_path}")
            return []
        object_data = []
        known_objects = set(nouns) if nouns else set()
        for label, bbox in zip(labels, bboxes):
            if '##' in label:
                continue
            x1, y1, x2, y2 = [int(coord) for coord in bbox]
            height, width = depth_map.shape
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(width, x2), min(height, y2)
            depth_roi = depth_map[y1:y2, x1:x2]
            if depth_roi.size == 0:
                continue
            mean_depth = np.mean(depth_roi)
            # Match the detected label back to one of the caption nouns
            matched_noun = None
            matched_desc = None
            for word in label.split():
                word = word.lower()
                if word in sound_sources:
                    matched_noun = word
                    matched_desc = sound_sources[word]
                    break
            if matched_noun is None:
                for noun in sound_sources:
                    if noun in label.lower():
                        matched_noun = noun
                        matched_desc = sound_sources[noun]
                        break
            if matched_noun is None:
                for word in label.split():
                    if len(word) > 1 and word[0].isalpha() and '##' not in word:
                        matched_noun = word.lower()
                        matched_desc = ""  # No description available
                        break
            if matched_noun:
                thresholds = self.create_histogram_depth_zones(depth_map, num_zones=3)
                zone = 0  # The default is 0, which is the closest zone
                for zone_idx in range(3):
                    if thresholds[zone_idx] <= mean_depth < thresholds[zone_idx + 1]:
                        zone = zone_idx
                        break
                object_data.append({
                    "original_label": matched_noun,
                    "bbox": bbox.tolist(),
                    "depth_zone": zone,
                    "zone_description": ["near", "medium", "far"][zone],
                    "mean_depth": mean_depth,
                    "weight": 1.0 - mean_depth,
                    "sound_description": matched_desc
                })
        if all_objects:
            # Return every matched object, closest first
            object_data.sort(key=lambda x: x["mean_depth"])
            return object_data
        else:
            if not object_data:
                return []
            # Otherwise return only the closest object
            closest_object = min(object_data, key=lambda x: x["mean_depth"])
            return [closest_object]
    def cleanup(self):
        # Release models and cached data, then free GPU and Python memory
        if hasattr(self, 'depth_estimator') and self.depth_estimator is not None:
            del self.depth_estimator
            self.depth_estimator = None
        if self.map_list is not None:
            del self.map_list
            self.map_list = None
        if self.dino is not None:
            self.dino = self.dino.to("cpu")
            del self.dino
            self.dino = None
            del self.dino_processor
            self.dino_processor = None
        if self.nlp is not None:
            del self.nlp
            self.nlp = None
        torch.cuda.empty_cache()
        gc.collect()
    def test_object_depth_analysis(self):
        """
        Test the object depth analysis on all images in the directory.
        """
        # Process depth maps first
        processed_maps = self.process_depth_maps()
        # Get the list of original image paths
        image_dir = self.depth_estimator.image_dir
        image_paths = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.endswith(".jpg")]
        results = []
        # For each image and its corresponding depth map
        for image_path, processed_map in zip(image_paths, processed_maps):
            # Extract the normalized depth map
            depth_map = processed_map["normalization"]
            # Analyze objects and their depths (no coordinates or caption data are
            # passed here, so generate_caption is called without a location)
            object_depths = self.analyze_object_depths(image_path, depth_map)
            # Store results
            results.append({
                "image_path": image_path,
                "object_depths": object_depths
            })
            # Print some information for debugging
            print(f"Analyzed {image_path}:")
            for obj in object_depths:
                print(f"  - {obj['original_label']} (Zone: {obj['zone_description']})")
        return results
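
# Minimal usage sketch (illustrative addition, not part of the original module):
# run the full pipeline over the configured image directory and release
# resources when done. Assumes the images, caption service, and model weights
# referenced above are available in this environment.
if __name__ == "__main__":
    mapper = SoundMapper()
    try:
        analysis = mapper.test_object_depth_analysis()
        for entry in analysis:
            print(entry["image_path"], [obj["original_label"] for obj in entry["object_depths"]])
    finally:
        mapper.cleanup()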