Spaces:
Running
Running
import gradio as gr | |
import os | |
import json | |
import pandas as pd | |
from datasets import load_dataset, DatasetDict, Dataset, Audio | |
from huggingface_hub import HfApi, whoami, login, hf_hub_download | |
import tempfile | |
import shutil | |
import gc | |
import time | |
import psutil | |
from pydub import AudioSegment | |
import soundfile as sf | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
import re | |
import numpy as np | |
from pydantic import BaseModel | |
from typing import Optional, List, Tuple | |
from datetime import datetime | |
# Authenticate against the Hugging Face Hub with the token taken from the
# "hf_token" environment variable; without it the app only prints a warning.
token = os.getenv("hf_token")
if not token:
    print("Warning: hf_token environment variable not set. Hugging Face Hub operations might fail.")
else:
    login(token)
# Configuration
HF_DATASET_NAME = "navidved/channelb-raw-data"
AUDIO_DIR = "audio"  # Fallback directory that get_audio_path() joins relative paths onto
SAVE_PATH = "annotations.json"
ALLOWED_USERS = [
    "vargha",
    "navidved",
    "userC",
]
REVIEWERS = ["vargha"]
ANNOTATORS = list(ALLOWED_USERS)  # Separate list object holding every allowed user
CURRENT_USERNAME = None
PAGE_SIZE = 100
SAVE_INTERVAL = 10  # save_annotations() pushes to the Hub on every SAVE_INTERVAL-th save

# --- SECOND PHASE CONFIGURATION ---
# Flip to True to activate second phase review.
SECOND_PHASE = False
# Maps reviewer_username -> original_annotator_username.
SECOND_PHASE_REVIEW_MAPPING = {"navidved": "vargha"}

# Global state variables
current_page = 0  # USER-RELATIVE page index
ds_iter = None  # No longer maintained globally for streaming robustness
current_page_data = None  # pandas DataFrame holding the current page's rows
audio_backup = {}  # For undo_trim, if needed (simplified)
annotation_count = 0
unsaved_changes = {}
total_samples = 0  # Total samples in HF_DATASET_NAME
annotator_ranges = {}  # {annotator_username: (start_abs_idx, end_abs_idx)}
# Pydantic data models
class AudioTrim(BaseModel):
    """A start/end pair stored in an annotation's ``audio_trims`` list."""

    # NOTE(review): units are presumably seconds into the clip — confirm
    # against the code that creates trims (not visible in this part of the file).
    start: float
    end: float
class Annotation(BaseModel):
    """One user's annotation of a sample, plus first/second phase review fields."""

    annotator: str  # Username the annotation is stored under
    annotated_subtitle: Optional[str] = None  # Transcript text saved for the sample
    audio_trims: Optional[List[AudioTrim]] = None
    # First phase: set when a reviewer saves with the accept flag checked.
    is_first_phase_accepted: bool = False
    first_phase_reviewer_username: Optional[str] = None
    # Second phase: filled in by handle_second_phase_action().
    second_phase_reviewed_by: Optional[str] = None
    second_phase_review_status: Optional[str] = None  # The action string, e.g. "approved"
    second_phase_review_timestamp: Optional[datetime] = None
    create_at: datetime
    update_at: datetime
class Sample(BaseModel):
    """A dataset sample together with every annotation recorded for it."""

    id: int  # Absolute index in the dataset
    voice_name: str  # Base name derived from the sample's audio entry
    original_subtitle: str  # The dataset's own "sentence" value
    ignore_it: bool = False  # When True, get_sample() shows the audio as deleted
    description: Optional[str] = None
    annotations: Optional[List[Annotation]] = None
    is_approved_in_second_phase: bool = False
class DatasetModel(BaseModel):
    """Root object serialized to and parsed from the annotations JSON file."""

    samples: Optional[List[Sample]] = None
# Utility functions | |
def load_saved_annotations():
    """Load the annotation store, preferring the local file over the Hub copy.

    Resolution order:
      1. Parse ``SAVE_PATH`` from local disk when it exists.
      2. If that yields nothing and a token is available, download
         ``SAVE_PATH`` from the ``HF_DATASET_NAME`` dataset repo and cache it
         locally.
      3. Otherwise start from an empty ``DatasetModel``.

    An unreadable local file is never deleted. A copy is kept at
    ``SAVE_PATH + ".invalid"`` because the Hub fallback (or the next save)
    rewrites ``SAVE_PATH`` and would otherwise destroy its contents.

    Returns:
        DatasetModel: always a model instance, never ``None``.
    """
    dataset_model = None

    if os.path.exists(SAVE_PATH):
        try:
            with open(SAVE_PATH, "r", encoding="utf-8") as f:
                data = json.load(f)
            dataset_model = DatasetModel(**data)
            print("Loaded annotations from local JSON file")
        except Exception as e:
            # Preserve whatever is in the broken file before anything
            # overwrites it; a failed backup must not block loading.
            backup_path = f"{SAVE_PATH}.invalid"
            try:
                shutil.copy2(SAVE_PATH, backup_path)
                backup_note = f"Copy kept at {backup_path}."
            except OSError as backup_error:
                backup_note = f"Could not back it up: {backup_error}."
            print(f"Error loading local JSON file: {str(e)}. Ignoring invalid file. {backup_note}")
            dataset_model = None

    if dataset_model is None and token:
        try:
            hf_path = hf_hub_download(
                repo_id=HF_DATASET_NAME,
                filename=SAVE_PATH,
                repo_type="dataset",
                token=token
            )
            with open(hf_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            dataset_model = DatasetModel(**data)
            with open(SAVE_PATH, "w", encoding="utf-8") as f:  # Cache locally
                f.write(dataset_model.model_dump_json(exclude_none=True, indent=4))
            print("Loaded annotations from HF dataset repository and cached locally")
        except Exception as e:
            print(f"Error loading JSON file from HF repo: {str(e)}")
            dataset_model = None

    if dataset_model is None:
        dataset_model = DatasetModel(samples=[])
        print("Created new empty DatasetModel for annotations")

    return dataset_model
def save_annotations(dataset_model: DatasetModel):
    """Write the annotation store to ``SAVE_PATH`` and periodically push it.

    The JSON is serialized first and written to a sibling temporary file,
    which is then moved over ``SAVE_PATH`` with ``os.replace``. A failure
    during serialization or writing therefore leaves the previous file
    untouched instead of truncating it.

    Every ``SAVE_INTERVAL``-th successful save (counted per process in the
    global ``annotation_count``) also uploads the file via
    ``push_json_to_hf()`` when a token is available.

    Args:
        dataset_model: the full annotation model to persist.

    Errors are printed, not raised.
    """
    global annotation_count
    tmp_path = f"{SAVE_PATH}.tmp"
    try:
        # Serialize before touching the disk so a serialization error cannot
        # leave a partial file behind.
        payload = dataset_model.model_dump_json(exclude_none=True, indent=4)
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, SAVE_PATH)
        print(f"Saved annotations to {SAVE_PATH}")
        annotation_count += 1  # Simple save counter, not the number of annotations in the file
        if annotation_count % SAVE_INTERVAL == 0 and token:
            push_json_to_hf()
    except Exception as e:
        print(f"Error saving annotations: {str(e)}")
def push_json_to_hf():
    """Upload the local annotations file to the dataset repo.

    Does nothing but print a notice when no token is available. Upload
    errors are printed, not raised.
    """
    if token:
        try:
            hub_client = HfApi()
            upload_kwargs = {
                "path_or_fileobj": SAVE_PATH,
                "path_in_repo": SAVE_PATH,
                "repo_type": "dataset",
                "repo_id": HF_DATASET_NAME,
                "token": token,
            }
            hub_client.upload_file(**upload_kwargs)
            print("Uploaded annotations.json to Hugging Face repository")
        except Exception as upload_error:
            print(f"Error uploading JSON file: {str(upload_error)}")
    else:
        print("Cannot push to HF: token not available.")
def calculate_annotator_ranges(total_samples_val, annotators_list):
    """Split ``[0, total_samples_val)`` into contiguous inclusive ranges.

    Samples are divided as evenly as possible in list order; the first
    ``total % count`` annotators receive one extra sample. Annotators whose
    share is empty get no entry.

    Args:
        total_samples_val: number of samples to distribute.
        annotators_list: usernames, in assignment order.

    Returns:
        dict mapping username -> ``(start_idx, end_idx)`` (both inclusive);
        empty when there are no annotators or no samples.
    """
    annotator_total = len(annotators_list)
    if annotator_total == 0 or total_samples_val <= 0:
        return {}

    base_share, remainder = divmod(total_samples_val, annotator_total)
    last_valid_idx = total_samples_val - 1

    ranges = {}
    cursor = 0
    for position, annotator in enumerate(annotators_list):
        share = base_share + 1 if position < remainder else base_share
        stop = min(cursor + share - 1, last_valid_idx)
        if stop >= cursor:  # Skip empty shares
            ranges[annotator] = (cursor, stop)
        cursor = stop + 1

    print(f"Calculated annotator ranges: {ranges}")
    return ranges
def initialize_second_phase_assignments():
    """Fill ``SECOND_PHASE_REVIEW_MAPPING`` (reviewer -> original annotator).

    With a single annotator that user reviews their own work; otherwise each
    annotator reviews the annotator that precedes them in ``ANNOTATORS``
    (the first one reviews the last). ``annotator_ranges`` is computed here
    if it is still empty and ``total_samples`` is known.
    """
    global SECOND_PHASE_REVIEW_MAPPING, annotator_ranges, total_samples

    if not ANNOTATORS:
        print("Not enough annotators for second phase review.")
        SECOND_PHASE_REVIEW_MAPPING = {}
        return

    if not annotator_ranges:
        if total_samples <= 0:
            print("Warning: Cannot initialize second phase assignments without total_samples and annotator_ranges.")
            return
        print("Populating annotator_ranges for second phase initialization (was empty).")
        annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)

    if len(ANNOTATORS) == 1:
        sole_user = ANNOTATORS[0]
        SECOND_PHASE_REVIEW_MAPPING[sole_user] = sole_user
        print(f"Second phase: {sole_user} will review their own work.")
    else:
        # In 2nd phase, ANNOTATORS become reviewers of other ANNOTATORS.
        for position, reviewer_user in enumerate(ANNOTATORS):
            # Index -1 wraps to the last annotator for position 0.
            original_annotator_user = ANNOTATORS[position - 1]
            SECOND_PHASE_REVIEW_MAPPING[reviewer_user] = original_annotator_user
            print(f"Second phase: {reviewer_user} will review {original_annotator_user}'s work.")

    for reviewer, original_annotator in SECOND_PHASE_REVIEW_MAPPING.items():
        if original_annotator in annotator_ranges:
            continue
        print(f"Warning: Original annotator {original_annotator} (being reviewed by {reviewer}) has no range defined in annotator_ranges.")
def get_user_allowed_range(username):
    """Return the inclusive ``(start, end)`` absolute range ``username`` may access.

    First phase: reviewers get the whole dataset, annotators get their own
    entry from ``annotator_ranges``. Second phase: the user gets the range of
    the annotator they are mapped to review. ``None`` means no access.
    """
    global annotator_ranges, total_samples

    if not SECOND_PHASE:
        # First phase logic
        if get_user_role(username) == "reviewer":
            return (0, total_samples - 1) if total_samples > 0 else None
        return annotator_ranges.get(username)

    if not SECOND_PHASE_REVIEW_MAPPING:
        initialize_second_phase_assignments()

    reviewed_user = SECOND_PHASE_REVIEW_MAPPING.get(username)
    if not reviewed_user:
        return None

    # Ranges may not have been computed yet when the mapping was preconfigured.
    if not annotator_ranges and total_samples > 0:
        annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
    return annotator_ranges.get(reviewed_user)
def is_within_range(absolute_idx, allowed_range):
    """Tell whether ``absolute_idx`` lies inside the inclusive ``allowed_range``.

    A ``None`` range contains nothing.
    """
    return allowed_range is not None and allowed_range[0] <= absolute_idx <= allowed_range[1]
def get_user_role(username):
    """Return "reviewer" for names listed in REVIEWERS, "annotator" otherwise."""
    if username in REVIEWERS:
        return "reviewer"
    return "annotator"
# init_dataset_iterator is not needed if we load on demand for streaming | |
# def init_dataset_iterator(): ... | |
def get_dataset_info():
    """Determine the size of the train split and cache it in ``total_samples``.

    Uses the cached value when it is already positive. Otherwise loads the
    split non-streaming and reads ``num_rows``; if that is not positive, it
    counts rows by iterating a streaming load (slow for large datasets).

    Returns:
        dict: ``{'num_samples': n}``, with ``n == -1`` (and
        ``total_samples == -1``) when the size could not be determined.
    """
    global total_samples

    if total_samples > 0:
        return {'num_samples': total_samples}

    try:
        # Non-streaming load is used only to read the row count.
        full_split = load_dataset(HF_DATASET_NAME, split="train", streaming=False)
        row_count = full_split.num_rows

        if not (row_count and row_count > 0):
            print("Warning: ds_info_obj.num_rows was not positive. Trying iteration for count (may be slow).")
            streamed_split = load_dataset(HF_DATASET_NAME, split="train", streaming=True)
            iterated_count = sum(1 for _ in streamed_split)
            if iterated_count <= 0:
                print("Warning: Could not determine total_samples from dataset info or iteration.")
                total_samples = -1  # Indicate failure
                return {'num_samples': -1}
            total_samples = iterated_count
            print(f"Dataset info: total_samples set to {total_samples} by iteration.")
            return {'num_samples': total_samples}

        total_samples = row_count
        print(f"Dataset info: total_samples set to {total_samples}")
        return {'num_samples': total_samples}
    except Exception as e:
        print(f"Error getting dataset info: {e}")
        total_samples = -1
        return {'num_samples': -1}
# Initial data load: resolve the dataset size once at import time, then derive
# the per-annotator ranges (and second phase assignments, if enabled) from it.
dataset_info = get_dataset_info()  # Also sets the global total_samples
if total_samples <= 0:
    print("Warning: total_samples is not positive. Annotation ranges and second phase assignments may be incorrect.")
    annotator_ranges = {}
else:
    annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
    if SECOND_PHASE:  # Only when SECOND_PHASE is enabled from the start
        initialize_second_phase_assignments()
def get_audio_path(audio_entry):
    """Turn a dataset audio entry into a value usable as an audio source.

    Returns:
        - ``(sampling_rate, array)`` for a dict carrying both keys;
        - the dict's ``"path"`` value (or ``None``) for any other dict;
        - for a string: the string itself if it is an http(s) URL or an
          existing path, else the ``AUDIO_DIR``-joined path when that
          exists, else the string unchanged;
        - ``None`` for any other type.
    """
    if isinstance(audio_entry, dict):
        has_waveform = "array" in audio_entry and "sampling_rate" in audio_entry
        if has_waveform:
            return (audio_entry["sampling_rate"], audio_entry["array"])
        return audio_entry.get("path", None)

    if not isinstance(audio_entry, str):
        return None

    if audio_entry.startswith(("http://", "https://")):
        return audio_entry
    if os.path.exists(audio_entry):
        return audio_entry
    if AUDIO_DIR:
        candidate = os.path.join(AUDIO_DIR, audio_entry)
        if os.path.exists(candidate):
            return candidate
    # Hand the string back untouched (e.g. a relative path for the datasets lib).
    return audio_entry
def load_page_data(page_num_within_user_view=0):
    """Load one page of the current user's samples into ``current_page_data``.

    The train split is streamed from the start; rows before the page's first
    absolute index are skipped and iteration stops after the page's last one.
    Each kept row gains ``absolute_idx`` (position in the dataset) and
    ``id_within_page`` (position on the page).

    Args:
        page_num_within_user_view: zero-based page number relative to the
            start of the user's allowed range.

    Returns:
        pandas.DataFrame: the page rows; an empty frame with the expected
        columns when loading fails, the user has no usable range, or the page
        lies beyond the range.
    """
    global current_page_data, current_page, total_samples

    def _blank_page():
        return pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"])

    try:
        streamed_split = load_dataset(HF_DATASET_NAME, split="train", streaming=True)
        row_stream = iter(streamed_split)
    except Exception as e:
        print(f"Error loading dataset for page data: {e}")
        current_page_data = _blank_page()
        return current_page_data

    user_allowed_range = get_user_allowed_range(CURRENT_USERNAME)
    if not user_allowed_range:
        print(f"User {CURRENT_USERNAME} has no allowed range.")
        current_page_data = _blank_page()
        return current_page_data

    range_first, range_last = user_allowed_range
    assigned_total = range_last - range_first + 1
    if assigned_total <= 0:
        print(f"User {CURRENT_USERNAME} has an invalid or empty allowed range: {user_allowed_range} (num_samples_for_user: {assigned_total})")
        current_page_data = _blank_page()
        return current_page_data

    page_first = range_first + (page_num_within_user_view * PAGE_SIZE)
    if page_first > range_last:
        print(f"Requested page {page_num_within_user_view} (abs start {page_first}) is beyond user {CURRENT_USERNAME}'s allowed samples end ({range_last}).")
        current_page_data = _blank_page()
        current_page = page_num_within_user_view  # Record the attempted page anyway
        return current_page_data

    page_last = min(page_first + PAGE_SIZE - 1, range_last)
    print(f"Loading page {page_num_within_user_view} for user {CURRENT_USERNAME}. Effective absolute range for this page: [{page_first}-{page_last}] from user range [{range_first}-{range_last}]")

    page_rows = []
    skipped_count = 0
    for dataset_position, record in enumerate(row_stream):
        if dataset_position < page_first:
            skipped_count += 1
            if skipped_count % 1000 == 0:  # Progress log for long skips
                print(f" Skipping... at abs_idx {dataset_position}, target start {page_first}")
            continue
        if dataset_position > page_last:
            break
        record['absolute_idx'] = dataset_position
        record['id_within_page'] = len(page_rows)
        page_rows.append(record)

    if skipped_count > 0:
        print(f" Finished skipping {skipped_count} samples.")

    current_page = page_num_within_user_view
    if page_rows:
        current_page_data = pd.DataFrame(page_rows)
        print(f"Loaded {len(page_rows)} samples for page {page_num_within_user_view}. First abs_idx: {page_rows[0]['absolute_idx']}, Last abs_idx: {page_rows[-1]['absolute_idx']}.")
    else:
        current_page_data = _blank_page()
        print(f"No samples found for user {CURRENT_USERNAME} on their page {page_num_within_user_view} (effective absolute range {page_first}-{page_last})")

    gc.collect()
    return current_page_data
# Core functions | |
def save_sample_data(page_idx, idx_on_page, transcript, current_user_performing_action, accepted_flag=False):
    """Store ``transcript`` as the acting user's annotation for one sample.

    The sample is identified by its row on the currently loaded page. A
    ``Sample`` record is created in the annotation store if none exists, and
    the acting user's own ``Annotation`` is created or updated. In the first
    phase a reviewer's save also records whether they accepted the sample;
    annotators can never set the accepted flag.

    Args:
        page_idx: user-relative page index (unused here; kept for callers).
        idx_on_page: row position within ``current_page_data``.
        transcript: text to save; ``None`` is treated as an empty string.
        current_user_performing_action: username doing the save.
        accepted_flag: reviewer's accept checkbox state (first phase only).

    Returns:
        str: a human-readable status message (success or the reason nothing
        was saved).
    """
    global current_page_data, unsaved_changes

    if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
        return "Invalid index or data not loaded for current page."

    actual_sample_info = current_page_data.iloc[idx_on_page]
    # The value read back from the DataFrame may be a numpy integer; use a
    # plain int for the stored id and for the messages below.
    absolute_idx = int(actual_sample_info['absolute_idx'])

    # In first phase, range check is important.
    # In second phase, page loading itself should restrict to allowed samples.
    if not SECOND_PHASE:
        allowed_range = get_user_allowed_range(current_user_performing_action)
        if not is_within_range(absolute_idx, allowed_range):
            return f"You are not allowed to annotate this sample {absolute_idx} (out of range {allowed_range})."

    cleaned_transcript = (transcript or "").strip()

    # Name the sample after its file path. get_audio_path() returns a
    # (sampling_rate, array) tuple for decoded audio, which must not be
    # stringified into a file name, so prefer the entry's own "path".
    audio_entry_original = actual_sample_info["audio"]
    if isinstance(audio_entry_original, dict):
        name_source = audio_entry_original.get("path")
    else:
        name_source = get_audio_path(audio_entry_original)
    if not isinstance(name_source, str) or not name_source:
        name_source = f"sample_{absolute_idx}"
    voice_name = os.path.basename(name_source)

    dataset_model = load_saved_annotations()
    sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
    if not sample:
        sample = Sample(
            id=absolute_idx,
            voice_name=voice_name,
            original_subtitle=actual_sample_info["sentence"],
            annotations=[]
        )
        dataset_model.samples = dataset_model.samples or []
        dataset_model.samples.append(sample)

    now = datetime.now()
    # Each user only ever edits the annotation stored under their own name.
    annotation = next((a for a in sample.annotations or [] if a.annotator == current_user_performing_action), None)
    if annotation:
        annotation.annotated_subtitle = cleaned_transcript
        annotation.update_at = now
    else:
        annotation = Annotation(
            annotator=current_user_performing_action,
            annotated_subtitle=cleaned_transcript,
            create_at=now,
            update_at=now,
            is_first_phase_accepted=False
        )
        sample.annotations = sample.annotations or []
        sample.annotations.append(annotation)

    # Only a first-phase reviewer may set or clear acceptance; an annotator's
    # save leaves whatever acceptance state the annotation already had.
    is_first_phase_reviewer = get_user_role(current_user_performing_action) == "reviewer" and not SECOND_PHASE
    if is_first_phase_reviewer:
        accepted = bool(accepted_flag)
        annotation.is_first_phase_accepted = accepted
        annotation.first_phase_reviewer_username = current_user_performing_action if accepted else None

    if absolute_idx in unsaved_changes:  # Clear if it was marked as unsaved
        del unsaved_changes[absolute_idx]

    save_annotations(dataset_model)
    return f"✓ Saved annotation for sample {absolute_idx}"
def handle_second_phase_action(page_idx, idx_on_page, action: str):
    """Record the current user's second-phase review of one sample.

    The review is attached to the annotation made by the annotator that
    ``CURRENT_USERNAME`` is mapped to in ``SECOND_PHASE_REVIEW_MAPPING``. If
    that annotator never submitted one, a placeholder annotation carrying the
    original subtitle is created so the review can be stored.

    The sample-level ``is_approved_in_second_phase`` flag follows the latest
    review: ``True`` for ``"approved"``, ``False`` for any other action, so a
    reversed decision does not leave a stale approval behind.

    Args:
        page_idx: user-relative page index (unused here; kept for callers).
        idx_on_page: row position within ``current_page_data``.
        action: review outcome string; ``"approved"`` marks approval.

    Returns:
        str: a human-readable status message.
    """
    global current_page_data, CURRENT_USERNAME

    if not SECOND_PHASE:
        return "Not in second phase."
    if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
        return "Invalid index or data not loaded for current page (second phase)."

    actual_sample_info = current_page_data.iloc[idx_on_page]
    absolute_idx = actual_sample_info['absolute_idx']

    original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(CURRENT_USERNAME)
    if not original_annotator_to_review:
        return f"User {CURRENT_USERNAME} is not assigned to review any user's work in SECOND_PHASE_REVIEW_MAPPING."

    dataset_model = load_saved_annotations()
    sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
    if not sample:
        return f"Error: Sample {absolute_idx} not found in annotations.json for review."

    # One timestamp per review so all fields written below agree exactly.
    review_time = datetime.now()

    annotation_to_review = next((ann for ann in sample.annotations or [] if ann.annotator == original_annotator_to_review), None)
    if not annotation_to_review:
        print(f"Warning: No prior annotation by {original_annotator_to_review} for sample {absolute_idx}. Creating placeholder for review.")
        annotation_to_review = Annotation(
            annotator=original_annotator_to_review,
            annotated_subtitle=sample.original_subtitle,
            create_at=review_time,
            update_at=review_time
        )
        sample.annotations = sample.annotations or []
        sample.annotations.append(annotation_to_review)

    annotation_to_review.second_phase_reviewed_by = CURRENT_USERNAME
    annotation_to_review.second_phase_review_status = action
    annotation_to_review.second_phase_review_timestamp = review_time
    annotation_to_review.update_at = review_time

    sample.is_approved_in_second_phase = (action == "approved")

    save_annotations(dataset_model)
    return f"✓ Review ({action}) saved for sample {absolute_idx} (Original annotator: {original_annotator_to_review})"
def get_sample(page_idx_user_relative, idx_on_page, current_user_displaying):
    """Collect everything the UI needs to render one sample of the current page.

    Args:
        page_idx_user_relative: user-relative page index; only used in the
            status text.
        idx_on_page: row position within ``current_page_data``.
        current_user_displaying: username the sample is rendered for.

    Returns:
        A 9-tuple ``(audio, transcript, status_message, reviewer_field,
        color, editable, accepted_flag, original_transcript,
        accept_checkbox_update)``. The invalid-index case returns the same
        number of values (with the accept checkbox hidden), because callers
        unpack exactly nine.
    """
    global current_page_data, total_samples

    if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
        page_len = len(current_page_data) if current_page_data is not None else 'None'
        return (
            None,
            "",
            f"Invalid index ({idx_on_page}) for current page data (len {page_len}).",
            "unreviewed",
            "white",
            True,
            False,
            "",
            gr.update(visible=False),
        )

    actual_sample_info = current_page_data.iloc[idx_on_page]
    absolute_idx = actual_sample_info['absolute_idx']
    audio_entry_original = actual_sample_info["audio"]
    audio_val = get_audio_path(audio_entry_original)
    default_transcript = actual_sample_info["sentence"]

    # Defaults describe a sample nobody has touched yet.
    transcript_to_display = default_transcript
    ui_reviewer_field = "unreviewed"
    ui_color = "white"
    ui_editable = True
    ui_is_accepted_flag = False

    # Build status message
    status_prefix = ""
    user_allowed_range = get_user_allowed_range(current_user_displaying)
    if user_allowed_range:
        user_start_abs, user_end_abs = user_allowed_range
        current_sample_num_in_user_assignment = absolute_idx - user_start_abs + 1
        total_samples_for_user = user_end_abs - user_start_abs + 1
        status_prefix = f"Sample {current_sample_num_in_user_assignment} of {total_samples_for_user} for you (Abs Idx {absolute_idx})."
    else:
        status_prefix = f"Sample (Abs Idx {absolute_idx})."

    dataset_model = load_saved_annotations()
    sample_from_json = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)

    if sample_from_json:
        if sample_from_json.ignore_it:
            audio_val = None
            transcript_to_display = "AUDIO DELETED (This audio has been removed.)"
            ui_reviewer_field = "deleted"
            ui_color = "red"
            ui_editable = False
        elif SECOND_PHASE:
            # Review phase: text is read-only, colour reflects review state.
            ui_editable = False
            original_annotator_being_reviewed = SECOND_PHASE_REVIEW_MAPPING.get(current_user_displaying)
            if not original_annotator_being_reviewed:
                transcript_to_display = "Error: You are not mapped to review any user."
                ui_color = "red"
                ui_reviewer_field = "Error"
            else:
                ui_reviewer_field = f"Reviewing: {original_annotator_being_reviewed}"
                annotation_under_review = next((ann for ann in sample_from_json.annotations or [] if ann.annotator == original_annotator_being_reviewed), None)
                if annotation_under_review:
                    transcript_to_display = annotation_under_review.annotated_subtitle or default_transcript
                    ui_is_accepted_flag = (annotation_under_review.second_phase_review_status == "approved")
                    if annotation_under_review.second_phase_reviewed_by:
                        if annotation_under_review.second_phase_reviewed_by == current_user_displaying:
                            ui_color = "green" if annotation_under_review.second_phase_review_status == "approved" else "orange"
                        else:
                            ui_color = "gray"
                            ui_reviewer_field += f" (Already reviewed by {annotation_under_review.second_phase_reviewed_by} as {annotation_under_review.second_phase_review_status})"
                    else:
                        ui_color = "yellow"  # Pending this user's review
                else:
                    transcript_to_display = default_transcript
                    ui_reviewer_field += " (No submission by original annotator)"
                    ui_color = "lightgray"
        else:  # First Phase Logic
            # Priority: an accepted annotation, then the viewer's own draft,
            # then somebody else's unaccepted draft.
            accepted_first_phase_annotation = next((a for a in sample_from_json.annotations or [] if a.is_first_phase_accepted and a.first_phase_reviewer_username), None)
            if accepted_first_phase_annotation:
                transcript_to_display = accepted_first_phase_annotation.annotated_subtitle or default_transcript
                ui_reviewer_field = f"Accepted by: {accepted_first_phase_annotation.first_phase_reviewer_username}"
                ui_color = "green"
                ui_is_accepted_flag = True
                ui_editable = (get_user_role(current_user_displaying) == "reviewer")
            else:
                user_specific_annotation = next((a for a in sample_from_json.annotations or [] if a.annotator == current_user_displaying), None)
                if user_specific_annotation:
                    transcript_to_display = user_specific_annotation.annotated_subtitle or default_transcript
                    ui_reviewer_field = f"Your draft (as {user_specific_annotation.annotator})"
                    ui_color = "yellow"
                    ui_editable = True
                else:
                    other_annotations = [a for a in sample_from_json.annotations or [] if not a.is_first_phase_accepted]  # any unaccepted
                    if other_annotations:
                        if get_user_role(current_user_displaying) == "reviewer":
                            # A reviewer sees the first other annotator's work.
                            other_ann_to_show = other_annotations[0]
                            transcript_to_display = other_ann_to_show.annotated_subtitle or default_transcript
                            ui_reviewer_field = f"Draft by: {other_ann_to_show.annotator}"
                            ui_color = "blue"
                            ui_editable = True
                        else:
                            # An annotator only learns that someone else labeled it.
                            transcript_to_display = default_transcript
                            ui_reviewer_field = f"Labeled by: {other_annotations[0].annotator}"
                            ui_color = "lightblue"
                            ui_editable = False
                    # else: the untouched-sample defaults set above remain.

    # Unsaved changes are only a visual cue; the text comes from the logic above.
    if not SECOND_PHASE and absolute_idx in unsaved_changes:
        ui_color = "pink"

    ui_status_message = f"{status_prefix} Page {page_idx_user_relative + 1} (User-view)."
    if SECOND_PHASE:
        ui_status_message += " (Review Phase)"
    else:
        ui_status_message += " (Annotation Phase)"

    # The accept checkbox exists only for reviewers in the first phase.
    show_accept_checkbox = not SECOND_PHASE and get_user_role(current_user_displaying) == "reviewer"
    return audio_val, transcript_to_display, ui_status_message, ui_reviewer_field, ui_color, ui_editable, ui_is_accepted_flag, default_transcript, gr.update(visible=show_accept_checkbox)
def load_interface_data(page_idx_user_relative, idx_on_page):
    """Fetch one sample for CURRENT_USERNAME and shape it into the UI outputs.

    Returns a 9-tuple: page index, index on page, audio, transcript textbox
    update, reviewer field update, status text, original dataset text, accept
    checkbox visibility update, and accept checkbox value.
    """
    sample_view = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME)
    (
        audio,
        text,
        base_status,
        saved_reviewer_text,
        color,
        editable,
        accepted_flag,
        original_dataset_text,
        accept_cb_update,
    ) = sample_view

    transcript_update = gr.update(value=text, interactive=editable)
    reviewer_update = gr.update(value=saved_reviewer_text, elem_classes=[color])
    return (
        page_idx_user_relative,
        idx_on_page,
        audio,
        transcript_update,
        reviewer_update,
        base_status,
        original_dataset_text,
        accept_cb_update,  # Visibility of the first-phase accept checkbox
        accepted_flag,  # Value of the first-phase accept checkbox
    )
def _stay_on_current_sample(page_idx_user_relative, idx_on_page, status_suffix):
    """Re-render the current sample with ``status_suffix`` appended to its status."""
    audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME)
    return (
        page_idx_user_relative,
        idx_on_page,
        audio,
        gr.update(value=text, interactive=edit),
        gr.update(value=rev, elem_classes=[color]),
        status + status_suffix,
        orig_text,
        cb_vis,
        acc_flag,
    )


def navigate_sample(page_idx_user_relative, idx_on_page, direction: int):
    """Move ``direction`` samples from the current one, crossing pages as needed.

    When the move leaves the current page, the neighbouring user-relative
    page is loaded. If that page turns out to be empty (start or end of the
    user's range), the previously loaded page is restored and the current
    sample is shown again with an explanatory status suffix. Without the
    restore, ``load_page_data`` would leave an empty frame in
    ``current_page_data`` and the current sample could no longer be indexed.

    Args:
        page_idx_user_relative: current user-relative page index.
        idx_on_page: current row position on that page.
        direction: signed step, e.g. ``1`` for next and ``-1`` for previous.

    Returns:
        The 9 UI outputs: page index, index on page, audio, transcript
        update, reviewer field update, status text, original text, accept
        checkbox visibility update, accept checkbox value.
    """
    global current_page_data, current_page

    if current_page_data is None or len(current_page_data) == 0:
        # Initial load failed or the user has no samples: report it without
        # crashing the UI.
        user_allowed_range = get_user_allowed_range(CURRENT_USERNAME)
        err_msg = "No data loaded. Try reloading or check your assigned range."
        if not user_allowed_range or user_allowed_range[0] > user_allowed_range[1]:
            err_msg = "You have no samples assigned or your range is invalid."
        return page_idx_user_relative, idx_on_page, None, gr.update(value="Error", interactive=False), gr.update(value="Error"), err_msg, "", gr.update(visible=False), False

    target_idx_on_page = idx_on_page + direction
    new_page_idx_user_relative = page_idx_user_relative
    new_idx_on_page = target_idx_on_page

    user_allowed_range = get_user_allowed_range(CURRENT_USERNAME)
    if not user_allowed_range:  # Safeguard; should not happen when page data exists
        return page_idx_user_relative, idx_on_page, gr.update(), gr.update(), gr.update(), "Error: No allowed range for navigation.", gr.update(), gr.update(visible=False), False

    # load_page_data() overwrites these globals even when the requested page
    # is empty, so remember them for a possible rollback.
    previous_page_data = current_page_data
    previous_page_number = current_page

    if target_idx_on_page < 0:
        if page_idx_user_relative <= 0:
            # Already on the first item of the first user-relative page.
            return _stay_on_current_sample(page_idx_user_relative, idx_on_page, " [At the beginning of your assigned samples]")

        new_page_idx_user_relative = page_idx_user_relative - 1
        temp_data = load_page_data(new_page_idx_user_relative)
        if temp_data is not None and not temp_data.empty:
            new_idx_on_page = len(temp_data) - 1
        else:
            current_page_data = previous_page_data
            current_page = previous_page_number
            return _stay_on_current_sample(page_idx_user_relative, idx_on_page, " [Already at the first sample of this page/range]")
    elif target_idx_on_page >= len(current_page_data):
        new_page_idx_user_relative = page_idx_user_relative + 1
        temp_data = load_page_data(new_page_idx_user_relative)
        if temp_data is not None and not temp_data.empty:
            new_idx_on_page = 0
        else:
            # Next page is empty: we are at the end of the user's samples.
            current_page_data = previous_page_data
            current_page = previous_page_number
            current_abs_idx = current_page_data.iloc[idx_on_page]['absolute_idx']
            is_at_very_end = user_allowed_range and current_abs_idx >= user_allowed_range[1]
            if is_at_very_end:
                suffix = " [At the end of your assigned samples]"
            else:
                suffix = " [No more samples in this direction (next page empty)]"
            return _stay_on_current_sample(page_idx_user_relative, idx_on_page, suffix)

    return load_interface_data(new_page_idx_user_relative, new_idx_on_page)
def go_next_sample_wrapper(page_idx_user_relative, idx_on_page):
    """Advance to the sample after the current one."""
    return navigate_sample(page_idx_user_relative, idx_on_page, direction=1)
def go_prev_sample_wrapper(page_idx_user_relative, idx_on_page):
    """Step back to the sample before the current one."""
    return navigate_sample(page_idx_user_relative, idx_on_page, direction=-1)
def save_and_next_sample_first_phase(page_idx_user_relative, idx_on_page, current_text, is_accepted_by_reviewer_flag):
    """Save the transcript for the current sample, then move to the next one.

    Args:
        page_idx_user_relative: User-relative page index of the sample.
        idx_on_page: Row index of the sample on that page.
        current_text: Transcript text taken from the UI.
        is_accepted_by_reviewer_flag: State of the "Accept (Reviewer)"
            checkbox; only honoured when the logged-in user has the
            "reviewer" role, otherwise ``False`` is saved.

    Returns:
        Whatever ``navigate_sample`` returns for a step of ``+1``.
    """
    if get_user_role(CURRENT_USERNAME) == "reviewer":
        accepted_to_save = is_accepted_by_reviewer_flag
    else:
        # Non-reviewers can never mark an annotation as accepted.
        accepted_to_save = False

    save_outcome = save_sample_data(
        page_idx_user_relative,
        idx_on_page,
        current_text,
        CURRENT_USERNAME,
        accepted_flag=accepted_to_save,
    )
    print(save_outcome)

    # The navigation result reloads the UI for the following sample.
    return navigate_sample(page_idx_user_relative, idx_on_page, 1)
def review_and_next_sample_second_phase(page_idx_user_relative, idx_on_page, review_action: str):
    """Record a second-phase review decision and move to the next sample.

    ``review_action`` is passed straight to ``handle_second_phase_action``
    (the UI wires it to "approved" or "rejected"); the feedback message is
    printed and the result of ``navigate_sample`` with step ``+1`` is returned.
    """
    print(handle_second_phase_action(page_idx_user_relative, idx_on_page, review_action))
    return navigate_sample(page_idx_user_relative, idx_on_page, 1)
def jump_to_absolute_idx(target_abs_idx_str, current_page_idx_user_relative, current_idx_on_page):
    """Jump the UI to the sample with the given dataset-absolute index.

    Args:
        target_abs_idx_str: Index typed by the user; parsed with ``int`` and
            clamped to be non-negative.
        current_page_idx_user_relative: Page currently shown (user-relative).
        current_idx_on_page: Row currently shown on that page.

    Returns:
        A 9-tuple in the order of ``navigation_outputs_extended``. On success
        this is the result of ``load_interface_data`` for the target sample;
        on any failure the currently displayed sample is returned again with
        an explanatory status message.
    """
    global current_page_data

    def stay_on_current_sample(message):
        # Re-read the sample that is already displayed and report `message`
        # in the status slot instead of the sample's own status.
        audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(
            current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME
        )
        return (
            current_page_idx_user_relative,
            current_idx_on_page,
            audio,
            gr.update(value=text, interactive=edit),
            gr.update(value=rev, elem_classes=[color]),
            message,
            orig_txt,
            cb_vis,
            acc,
        )

    try:
        target_abs_idx = max(int(target_abs_idx_str), 0)

        allowed_range = get_user_allowed_range(CURRENT_USERNAME)
        if not allowed_range or not is_within_range(target_abs_idx, allowed_range):
            return stay_on_current_sample(
                f"Target index {target_abs_idx} is outside your assigned range {allowed_range or 'N/A'}."
            )

        user_start_abs, _ = allowed_range
        offset_from_user_start = target_abs_idx - user_start_abs
        if offset_from_user_start < 0:
            logic_error_msg = f"Logic Error: Target index {target_abs_idx} has negative offset from user start {user_start_abs}."
            print(logic_error_msg)
            return stay_on_current_sample(logic_error_msg)

        target_page = offset_from_user_start // PAGE_SIZE
        # load_page_data also refreshes the global current_page_data.
        loaded_page = load_page_data(target_page)
        if loaded_page is None or loaded_page.empty:
            return stay_on_current_sample(
                f"No data found for your page {target_page} (containing abs index {target_abs_idx})."
            )

        hits = current_page_data[current_page_data['absolute_idx'] == target_abs_idx]
        if hits.empty:
            not_found_msg = f"Index {target_abs_idx} (your page {target_page}) in range, but not found on loaded page. Displaying start of page."
            print(not_found_msg)
            if current_page_data.empty:
                return stay_on_current_sample(not_found_msg + " (Page empty)")
            row_on_page = 0
        else:
            row_on_page = hits.iloc[0]['id_within_page']

        return load_interface_data(target_page, row_on_page)
    except ValueError:
        status_msg = "Invalid index format for jump."
    except Exception as e:
        import traceback
        status_msg = f"Error jumping to index: {str(e)}"
        print(f"{status_msg}\n{traceback.format_exc()}")

    # Fallback for errors: keep showing the current sample.
    return stay_on_current_sample(status_msg)
# Audio editing functions | |
def trim_audio_action(page_idx_user_relative, idx_on_page, trim_start_str, trim_end_str):
    """Trim the current sample's audio and record the trim in the annotations.

    The audio is loaded with pydub from a local path (or downloaded first when
    the path starts with "http"), sliced to the requested range, written to
    ``trimmed_audio/trimmed_<absolute_idx>_<sanitized name>`` and the range is
    stored as the current user's ``audio_trims``.

    Args:
        page_idx_user_relative: User-relative page index of the sample.
        idx_on_page: Row index of the sample on the loaded page.
        trim_start_str: Trim start in seconds, as typed in the UI.
        trim_end_str: Trim end in seconds, as typed in the UI.

    Returns:
        A tuple in the order of ``navigation_outputs_extended``. On success
        the audio slot holds the trimmed file path and the status ends with
        " [Trimmed]"; otherwise the unchanged sample is returned with the
        reason appended to the status.
    """
    def default_return(msg):
        # Build the UI state once and append the message to the status slot.
        ui_state = load_interface_data(page_idx_user_relative, idx_on_page)
        return (*ui_state[:5], ui_state[5] + f" [{msg}]", *ui_state[6:])

    if SECOND_PHASE:
        return default_return("Trimming disabled in Review Phase.")
    if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
        return default_return("Audio data not available (page error).")

    page_row = current_page_data.iloc[idx_on_page]
    # get_audio_path may yield a path string or raw (sr, array) data.
    original_audio_path_info = get_audio_path(page_row["audio"])

    # pydub.from_file needs something loadable from disk or downloadable.
    is_usable_path = isinstance(original_audio_path_info, str) and (
        os.path.exists(original_audio_path_info) or original_audio_path_info.startswith("http")
    )
    if not is_usable_path:
        if isinstance(original_audio_path_info, tuple):  # Raw data
            return default_return("Trimming raw audio array directly not yet implemented via pydub.from_file. Save first or use array ops.")
        return default_return("Trimming not supported for this audio source or it's not a local/remote file.")

    # Validate the trim range before doing any download or decoding work.
    try:
        trim_start = float(trim_start_str)
        trim_end = float(trim_end_str)
        start_ms = int(trim_start * 1000)
        end_ms = int(trim_end * 1000)
    except (TypeError, ValueError, OverflowError):
        return default_return("Error trimming audio: trim start and end must be numbers (seconds).")

    absolute_idx = page_row['absolute_idx']
    voice_name_original = os.path.basename(str(original_audio_path_info or f"sample_{absolute_idx}"))

    temp_dir_for_download = None
    try:
        audio_to_load = original_audio_path_info
        if original_audio_path_info.startswith("http"):
            temp_dir_for_download = tempfile.mkdtemp()
            # Keep the URL's file name so pydub can infer the format from it.
            url_fname = original_audio_path_info.split("/")[-1].split("?")[0]
            local_fpath = os.path.join(temp_dir_for_download, url_fname or "downloaded_audio")
            import requests
            with requests.get(original_audio_path_info, stream=True, timeout=30) as response:
                response.raise_for_status()
                with open(local_fpath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            audio_to_load = local_fpath

        audio_seg = AudioSegment.from_file(audio_to_load)
        trimmed_seg = audio_seg[start_ms:end_ms]
        if len(trimmed_seg) == 0:
            # Do not save or record a trim that leaves no audio at all.
            return default_return("Error trimming audio: the selected range contains no audio.")

        os.makedirs("trimmed_audio", exist_ok=True)
        # The file name must stay reconstructible by the export step.
        safe_voice_name = re.sub(r'[^\w.-]', '_', voice_name_original)
        trimmed_filename = f"trimmed_{absolute_idx}_{safe_voice_name}"
        if not os.path.splitext(trimmed_filename)[1]:
            trimmed_filename += ".wav"  # ensure extension
        trimmed_path = os.path.join("trimmed_audio", trimmed_filename)
        export_format = os.path.splitext(trimmed_path)[1][1:] or "wav"
        trimmed_seg.export(trimmed_path, format=export_format)

        dataset_model = load_saved_annotations()
        sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
        if sample is None:
            return default_return(f"Error: Sample {absolute_idx} not found in annotations for trimming.")

        now = datetime.now()
        annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None)
        if annotation is None:
            annotation = Annotation(annotator=CURRENT_USERNAME, create_at=now, update_at=now)
            sample.annotations = sample.annotations or []
            sample.annotations.append(annotation)
        annotation.audio_trims = [AudioTrim(start=trim_start, end=trim_end)]
        annotation.update_at = now
        # The transcript itself isn't changed by trim; the user saves that separately.
        save_annotations(dataset_model)

        # Update the UI so it plays the newly written trimmed file.
        current_ui_state = load_interface_data(page_idx_user_relative, idx_on_page)
        return (
            current_ui_state[0], current_ui_state[1], trimmed_path,  # page, idx, new audio
            current_ui_state[3], current_ui_state[4],                # transcript, reviewer
            current_ui_state[5] + " [Trimmed]",                      # status
            *current_ui_state[6:],                                   # original_text, cb_vis, cb_val
        )
    except Exception as e:
        return default_return(f"Error trimming audio: {str(e)}")
    finally:
        # Always remove the download directory, on success and on error.
        if temp_dir_for_download and os.path.exists(temp_dir_for_download):
            shutil.rmtree(temp_dir_for_download, ignore_errors=True)
def undo_trim_action(page_idx_user_relative, idx_on_page):
    """Remove the current user's stored trim for the displayed sample.

    Args:
        page_idx_user_relative: User-relative page index of the sample.
        idx_on_page: Row index of the sample on the loaded page.

    Returns:
        A tuple in the order of ``navigation_outputs_extended`` whose audio
        slot holds the original audio from the loaded page. The status ends
        with " [Trim undone]" when a trim was actually removed and with
        " [No trim to undo]" when the user had no trim stored.
    """
    def default_return(msg):
        # Build the UI state once and append the message to the status slot.
        ui_state = load_interface_data(page_idx_user_relative, idx_on_page)
        return (*ui_state[:5], ui_state[5] + f" [{msg}]", *ui_state[6:])

    if SECOND_PHASE:
        return default_return("Undo Trim disabled in Review Phase.")
    if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
        return default_return("Audio data not available (page error).")

    page_row = current_page_data.iloc[idx_on_page]
    absolute_idx = page_row['absolute_idx']

    dataset_model = load_saved_annotations()
    sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
    annotation = None
    if sample:
        annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None)

    trim_removed = False
    if annotation and annotation.audio_trims:
        annotation.audio_trims = None
        annotation.update_at = datetime.now()
        save_annotations(dataset_model)
        trim_removed = True

    # Restore the original audio value from the loaded page data.
    restored_audio_val = get_audio_path(page_row["audio"])
    current_ui_state = load_interface_data(page_idx_user_relative, idx_on_page)
    # Only claim success when something was really changed on disk.
    status_suffix = " [Trim undone]" if trim_removed else " [No trim to undo]"
    return (
        current_ui_state[0], current_ui_state[1], restored_audio_val,  # page, idx, restored audio
        current_ui_state[3], current_ui_state[4],                      # transcript, reviewer
        current_ui_state[5] + status_suffix,                           # status
        *current_ui_state[6:],
    )
def confirm_delete_audio_action(page_idx_user_relative, idx_on_page):
    """Mark the displayed sample's audio as deleted in the saved annotations.

    Sets ``ignore_it`` on the sample (creating the sample entry if it does
    not exist yet), replaces the current user's transcript with a fixed
    "AUDIO DELETED" marker and clears any stored trim.

    Args:
        page_idx_user_relative: User-relative page index of the sample.
        idx_on_page: Row index of the sample on the loaded page.

    Returns:
        The result of ``load_interface_data`` for the same sample after the
        change was saved, or the unchanged sample with a reason appended to
        the status when deletion is not possible.
    """
    def default_return(msg):
        # Build the UI state once and append the message to the status slot.
        ui_state = load_interface_data(page_idx_user_relative, idx_on_page)
        return (*ui_state[:5], ui_state[5] + f" [{msg}]", *ui_state[6:])

    if SECOND_PHASE:
        return default_return("Delete disabled in Review Phase.")
    if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
        return default_return("Audio data not available (page error).")

    page_row = current_page_data.iloc[idx_on_page]
    absolute_idx = page_row['absolute_idx']
    voice_name_original = os.path.basename(str(get_audio_path(page_row["audio"]) or f"sample_{absolute_idx}"))

    dataset_model = load_saved_annotations()
    sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
    if not sample:
        sample = Sample(
            id=absolute_idx,
            voice_name=voice_name_original,
            original_subtitle=page_row["sentence"],
            annotations=[],
        )
        dataset_model.samples = dataset_model.samples or []  # Ensure list exists
        dataset_model.samples.append(sample)
    sample.ignore_it = True

    now = datetime.now()
    deleted_text_marker = "AUDIO DELETED (This audio has been removed.)"
    annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None)
    if annotation:
        annotation.annotated_subtitle = deleted_text_marker
        annotation.audio_trims = None
        annotation.update_at = now
    else:
        annotation = Annotation(
            annotator=CURRENT_USERNAME,
            annotated_subtitle=deleted_text_marker,
            create_at=now,
            update_at=now,
        )
        sample.annotations = sample.annotations or []
        sample.annotations.append(annotation)
    save_annotations(dataset_model)

    # Reload the interface for this item so the UI reflects the saved change.
    return load_interface_data(page_idx_user_relative, idx_on_page)
# Export functions | |
def sanitize_string(s):
    """Replace every character that is not a word character, ``-``, ``.`` or ``/`` with ``_``.

    Non-string input is converted with ``str`` first.

    The hyphen is placed last in the character class on purpose: written as
    ``[^\\w-./]`` the ``-`` is parsed as a range operator after ``\\w`` and
    ``re`` rejects the pattern with "bad character range".
    """
    if not isinstance(s, str):
        s = str(s)
    return re.sub(r'[^\w./-]', '_', s)
def sanitize_sentence(s):
    """Return ``s`` as text with anything that cannot be UTF-8 encoded removed.

    Non-string input is converted with ``str`` first. The text is round-tripped
    through UTF-8 with ``errors='ignore'``, which drops characters that have no
    UTF-8 encoding (for example lone surrogates) and leaves everything else
    untouched.
    """
    text = s if isinstance(s, str) else str(s)
    utf8_bytes = text.encode('utf-8', errors='ignore')
    return utf8_bytes.decode('utf-8')
def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token_val=None,
                           max_attempts=3, base_delay=2.0):
    """Push ``dataset_dict`` to the Hub, retrying failed attempts.

    Args:
        dataset_dict: Object exposing ``push_to_hub(repo_id, private=, token=)``.
        repo_id: Target repository id.
        private: Passed through to ``push_to_hub``.
        token_val: Token used for the push. When it is empty nothing is
            pushed and the function returns after printing a notice.
        max_attempts: Total number of attempts (values below 1 count as 1).
        base_delay: Seconds to wait after the first failure; the wait doubles
            after every further failure.

    Raises:
        Exception: Whatever ``push_to_hub`` raised on the final attempt.
    """
    if not token_val:
        print("Cannot push to hub: No token provided for push_to_hub_with_retry.")
        return

    attempts = max(1, int(max_attempts))
    print(f"Pushing dataset to {repo_id}")
    for attempt in range(1, attempts + 1):
        try:
            dataset_dict.push_to_hub(repo_id, private=private, token=token_val)
            return
        except Exception as push_err:
            if attempt == attempts:
                # Out of attempts: let the caller see the real failure.
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Push attempt {attempt}/{attempts} to {repo_id} failed: {push_err}. Retrying in {delay:.1f}s...")
            time.sleep(delay)
def _pick_annotation_for_export(annotations):
    """Choose which annotation of a sample is exported, or ``None`` if there is none.

    Priority: the most recently reviewed second-phase "approved" annotation
    (only while SECOND_PHASE is on), then the most recently updated
    first-phase accepted annotation, then the most recently updated one.
    """
    if not annotations:
        return None
    if SECOND_PHASE:
        approved = [a for a in annotations if a.second_phase_review_status == "approved"]
        if approved:
            return max(approved, key=lambda a: a.second_phase_review_timestamp)
    accepted = [a for a in annotations if a.is_first_phase_accepted]
    if accepted:
        return max(accepted, key=lambda a: a.update_at)
    return max(annotations, key=lambda a: a.update_at)


def _trimmed_audio_export_path(absolute_idx, audio_entry):
    """Rebuild the path under which ``trim_audio_action`` stores a trimmed file.

    Mirrors the naming used when trimming: the sanitized original file name is
    kept including its extension, and ".wav" is appended only when the name
    has no extension at all.
    """
    voice_name = os.path.basename(str(get_audio_path(audio_entry) or f"sample_{absolute_idx}"))
    safe_voice_name = re.sub(r'[^\w.-]', '_', voice_name)
    trimmed_filename = f"trimmed_{absolute_idx}_{safe_voice_name}"
    if not os.path.splitext(trimmed_filename)[1]:
        trimmed_filename += ".wav"
    return os.path.join("trimmed_audio", trimmed_filename)


def export_to_huggingface(repo_name_str, hf_token_for_export, progress=gr.Progress()):
    """Merge saved annotations into the source dataset and push it to the Hub.

    For every source sample the exported sentence and audio are taken from the
    saved annotations when present: samples flagged ``ignore_it`` are exported
    with a deletion marker and empty audio, otherwise the annotation chosen by
    ``_pick_annotation_for_export`` supplies the sentence and, if it has a trim
    whose file exists, the trimmed audio.

    Args:
        repo_name_str: Target name in "username/dataset-name" form. Only the
            dataset-name part is used; the owner is taken from the token.
        hf_token_for_export: Hugging Face token used for ``whoami`` and the push.
        progress: Gradio progress tracker.

    Returns:
        A human-readable status string (success message or failure reason).
    """
    if not hf_token_for_export:
        return "Export failed: Hugging Face token is missing."
    if not repo_name_str or len(repo_name_str.split('/')) != 2:
        return "Export failed: Repository name must be in 'username/dataset-name' format."

    try:
        start_time = time.time()
        print(f"Export started at {time.strftime('%Y-%m-%d %H:%M:%S')}")
        dataset_model_annotations = load_saved_annotations()

        if total_samples <= 0:
            # Try to fetch again if not set; this is expected to refresh the
            # module-level total_samples that is re-read just below.
            get_dataset_info()
            if total_samples <= 0:
                return "Export failed: Total number of samples is unknown or invalid."

        # Index annotations by sample id once instead of scanning the whole
        # list for every source sample. setdefault keeps the first entry for
        # a duplicated id, like the linear search it replaces.
        annotations_by_id = {}
        for saved_sample in dataset_model_annotations.samples or []:
            annotations_by_id.setdefault(saved_sample.id, saved_sample)

        # Load source dataset non-streamed for easier full iteration.
        ds_source = load_dataset(HF_DATASET_NAME, split="train", streaming=False)
        exported_data_list = []
        progress(0, f"Preparing {total_samples} samples for export...")
        num_processed_from_source = 0

        for i, source_sample in enumerate(ds_source):
            if i >= total_samples:
                break
            num_processed_from_source += 1
            absolute_idx = i  # Assumes the source yields samples in index order.
            audio_entry = source_sample.get("audio")
            sentence_val = source_sample.get("sentence", "")
            audio_dict_to_export = audio_entry

            annotation_data = annotations_by_id.get(absolute_idx)
            if annotation_data is not None:
                if annotation_data.ignore_it:
                    sentence_val = "AUDIO DELETED (This audio has been removed.)"
                    # Deleted audio is represented by an empty array.
                    audio_dict_to_export = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000}
                else:
                    best_ann = _pick_annotation_for_export(annotation_data.annotations)
                    if best_ann is not None:
                        if best_ann.annotated_subtitle is not None:
                            sentence_val = best_ann.annotated_subtitle
                        if best_ann.audio_trims and audio_dict_to_export:
                            trimmed_path = _trimmed_audio_export_path(absolute_idx, audio_entry)
                            if os.path.exists(trimmed_path):
                                try:
                                    arr, sr = sf.read(trimmed_path)
                                except RuntimeError as read_err:
                                    # One unreadable file must not abort the whole export.
                                    print(f"Warning: Could not read trimmed audio file {trimmed_path} for sample {absolute_idx} ({read_err}). Exporting original/untrimmed.")
                                else:
                                    audio_dict_to_export = {"array": arr, "sampling_rate": sr}
                            else:
                                print(f"Warning: Trimmed audio file {trimmed_path} not found for sample {absolute_idx}. Exporting original/untrimmed.")

            exported_data_list.append({
                "audio": audio_dict_to_export,
                "sentence": sanitize_sentence(sentence_val),
            })

            if (i + 1) % 100 == 0:
                progress((i + 1) / total_samples, f"Processed {i+1}/{total_samples} samples")
                gc.collect()

        if num_processed_from_source != total_samples:
            print(f"Warning: Processed {num_processed_from_source} from source, but expected total_samples {total_samples}.")
        if not exported_data_list:
            return "No data to export after processing."

        # Replace missing audio with the empty placeholder before building the Dataset.
        for item in exported_data_list:
            audio_value = item["audio"]
            path_is_none = isinstance(audio_value, dict) and 'path' in audio_value and audio_value['path'] is None
            if audio_value is None or path_is_none:
                item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000}

        try:
            final_dataset = Dataset.from_list(exported_data_list)
            final_dataset = final_dataset.cast_column("audio", Audio(sampling_rate=16000))
        except Exception as e_cast:
            print(f"Error during Dataset.from_list or cast_column: {e_cast}")
            # Show a few items to help diagnose the conversion failure.
            for idx, problematic_item in enumerate(exported_data_list[:5]):
                print(f"Sample item {idx} for export: Audio type {type(problematic_item['audio'])}, Audio content: {str(problematic_item['audio'])[:200]}")
            return f"Export failed during data conversion: {e_cast}. Check audio data formats."

        dataset_dict_export = DatasetDict({"train": final_dataset})
        progress(0.95, "Uploading to Hugging Face...")
        # The owner part always comes from the token so the user owns the repo.
        target_repo_id = f"{whoami(token=hf_token_for_export)['name']}/{repo_name_str.split('/')[-1]}"
        push_to_hub_with_retry(
            dataset_dict=dataset_dict_export,
            repo_id=target_repo_id,
            private=True,
            token_val=hf_token_for_export,
        )
        end_time = time.time()
        print(f"Upload done, total time: {end_time - start_time:.2f}s")
        progress(1.0, "Upload complete!")
        return f"Exported to huggingface.co/datasets/{target_repo_id}"
    except Exception as e:
        import traceback
        error_msg = f"Export failed: {str(e)}"
        print(f"{error_msg}\n{traceback.format_exc()}")
        return error_msg
# Login function | |
def hf_login(hf_token_val):
    """Validate a Hugging Face token and produce the UI state for the login click.

    On success the module-level CURRENT_USERNAME, token, annotator_ranges and
    current_page_data are updated and the first sample of the user's first
    page is loaded.

    Args:
        hf_token_val: Token typed into the login form.

    Returns:
        A 19-tuple ordered like ``login_outputs``: login/main container
        visibility, reviewer textbox, token state, login message, eight
        per-control updates (save/next, transcript, trim, undo, delete,
        accept checkbox, approve, reject), then page index, index on page,
        audio, transcript value, status and original transcript.
    """
    global CURRENT_USERNAME, token, current_page_data, total_samples, annotator_ranges, SECOND_PHASE_REVIEW_MAPPING

    def login_failure(reviewer_value, login_msg, status):
        # Keep the login form visible and the main container hidden.
        return (
            gr.update(visible=True), gr.update(visible=False),      # login_container, main_container
            gr.update(value=reviewer_value), hf_token_val, login_msg,
            gr.update(visible=False),                                # save_next_button
            # transcript_tb: only lock it. Hiding it here would stick, because
            # a later successful login never sends visible=True for it.
            gr.update(interactive=False),
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=False),  # trim, undo, delete
            gr.update(visible=False),                                # first_phase_accept_cb
            gr.update(visible=False), gr.update(visible=False),      # approve, reject
            0, 0, None, gr.update(value=""),                         # page_idx, idx_on_page, audio, transcript value
            status, "",                                              # status_md, original_transcript_state
        )

    if not hf_token_val:
        return (
            gr.update(visible=True), gr.update(visible=False),       # login_container, main_container
            gr.update(value="N/A"), hf_token_val, "Login failed: Token cannot be empty.",
            gr.update(visible=not SECOND_PHASE), gr.update(interactive=not SECOND_PHASE),  # save_next, transcript_tb
            gr.update(visible=not SECOND_PHASE), gr.update(visible=not SECOND_PHASE), gr.update(visible=not SECOND_PHASE),  # trim, undo, delete
            gr.update(visible=False),                                # first_phase_accept_cb (depends on role)
            gr.update(visible=SECOND_PHASE), gr.update(visible=SECOND_PHASE),  # approve, reject
            0, 0, None, gr.update(value=""),                         # page_idx, idx_on_page, audio, transcript value
            "Please log in.", "",                                    # status_md, original_transcript_state
        )

    try:
        user_info = whoami(token=hf_token_val)
        username = user_info['name']
        if username not in ALLOWED_USERS:
            CURRENT_USERNAME = None
            return login_failure("N/A", "User not authorized!", "Not Authorized")

        CURRENT_USERNAME = username
        token = hf_token_val  # Store validated token

        # (Re)-initialize dataset info; this is expected to refresh the
        # module-level total_samples checked on the next line.
        get_dataset_info()
        if total_samples <= 0:
            return login_failure(
                "Error",
                "Login OK, but failed to get dataset size. Cannot proceed.",
                "Error",
            )

        annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
        if SECOND_PHASE:
            initialize_second_phase_assignments()  # Depends on ANNOTATORS and ranges

        user_allowed_range_check = get_user_allowed_range(CURRENT_USERNAME)
        if not user_allowed_range_check or user_allowed_range_check[0] > user_allowed_range_check[1]:
            return login_failure(
                "Error",
                f"Login OK, but user {CURRENT_USERNAME} has no samples assigned for {'review' if SECOND_PHASE else 'annotation'}.",
                "Error: No samples assigned.",
            )

        # Load initial page data (user-relative page 0).
        current_page_data = load_page_data(0)
        is_second_phase_active = SECOND_PHASE

        # load_interface_data returns: page_idx, idx_on_page, audio,
        # transcript_update, reviewer_update, status, orig_text,
        # cb_visibility_update, cb_value.
        initial_load_tuple = load_interface_data(0, 0)

        return (
            gr.update(visible=False),                                # login_container
            gr.update(visible=True),                                 # main_container
            initial_load_tuple[4],                                   # reviewer_tb
            hf_token_val,                                            # hf_token_state
            f"Login successful! Welcome {CURRENT_USERNAME}. Phase: {'Review' if is_second_phase_active else 'Annotation'}.",  # login_message
            gr.update(visible=not is_second_phase_active),           # save_next_button
            # Always make the transcript visible again on a successful login.
            gr.update(visible=True, interactive=not is_second_phase_active),  # transcript_tb
            gr.update(visible=not is_second_phase_active),           # trim_button
            gr.update(visible=not is_second_phase_active),           # undo_trim_button
            gr.update(visible=not is_second_phase_active),           # delete_button
            initial_load_tuple[7],                                   # first_phase_accept_cb visibility
            gr.update(visible=is_second_phase_active),               # approve_button
            gr.update(visible=is_second_phase_active),               # reject_button
            initial_load_tuple[0],                                   # page_idx_state
            initial_load_tuple[1],                                   # idx_on_page_state
            initial_load_tuple[2],                                   # audio_player
            initial_load_tuple[3],                                   # transcript_tb (value & interactivity)
            initial_load_tuple[5],                                   # status_md
            initial_load_tuple[6],                                   # original_transcript_state
        )
    except Exception as e:
        CURRENT_USERNAME = None
        import traceback
        login_err_msg = f"Login failed: {str(e)}"
        print(f"{login_err_msg}\n{traceback.format_exc()}")
        return login_failure("Error", login_err_msg, "Login Error")
# Gradio Interface
css = """
.white { background-color: white; color: black; } .yellow { background-color: yellow; color: black; }
.blue { background-color: lightblue; color: black; } .green { background-color: lightgreen; color: black; }
.pink { background-color: pink; color: black; } .red { background-color: #FF7F7F; color: black; }
.orange { background-color: orange; color: black; } .gray { background-color: lightgray; color: black; }
.lightgray { background-color: #f0f0f0; color: black; }
.reviewer-textbox input { text-align: center; font-weight: bold; }
"""

with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo:
    # ---- Per-session state ----
    hf_token_state = gr.State(token)
    current_page_idx_state = gr.State(0)  # User-relative page index
    current_idx_on_page_state = gr.State(0)
    original_transcript_state = gr.State("")

    # ---- Login form (shown first) ----
    with gr.Column(visible=True, elem_id="login_container") as login_container:
        gr.Markdown("## HF Authentication")
        hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", value="")
        login_button = gr.Button("Login")
        login_message = gr.Markdown("")

    # ---- Main labeling view (revealed by a successful login) ----
    with gr.Column(visible=False, elem_id="main_container") as main_container:
        gr.Markdown("# ASR Dataset Labeling Interface")
        status_md = gr.Markdown("Please log in.")

        with gr.Row():
            with gr.Column(scale=2):
                audio_player = gr.Audio(label="Audio Sample", autoplay=False)
                transcript_tb = gr.TextArea(label="Transcript", lines=5, interactive=False)
                reviewer_tb = gr.Textbox(
                    label="Annotation Status / Reviewer",
                    interactive=False,
                    elem_classes=["white", "reviewer-textbox"],
                )
            with gr.Column(scale=1):
                gr.Markdown("### Navigation")
                prev_button = gr.Button("← Previous")
                next_button = gr.Button("Next (no save)")
                # Phase 1 controls
                save_next_button = gr.Button("Save & Next", variant="primary", visible=not SECOND_PHASE)
                # Visibility of the checkbox is driven by login and sample loading.
                first_phase_accept_cb = gr.Checkbox(label="Accept (Reviewer)", visible=False, value=False)
                # Phase 2 controls
                approve_button = gr.Button("Approve & Next", variant="primary", visible=SECOND_PHASE)
                reject_button = gr.Button("Reject & Next", variant="stop", visible=SECOND_PHASE)

        gr.Markdown("### Audio Tools (Phase 1 only)")
        with gr.Row():
            trim_start_tb = gr.Textbox(label="Trim Start (s)", placeholder="e.g., 1.5", scale=1)
            trim_end_tb = gr.Textbox(label="Trim End (s)", placeholder="e.g., 3.0", scale=1)
            trim_button = gr.Button("Trim Audio", visible=not SECOND_PHASE)
            undo_trim_button = gr.Button("Undo Trim", visible=not SECOND_PHASE)
            delete_button = gr.Button("Mark Audio as Deleted", variant="stop", visible=not SECOND_PHASE)

        with gr.Accordion("Advanced Navigation & Export", open=False):
            with gr.Row():
                jump_text_tb = gr.Textbox(label="Jump to Global Index", placeholder="Enter dataset absolute index")
                jump_button = gr.Button("Jump")
            with gr.Row():
                if CURRENT_USERNAME:
                    default_repo_name = f"{CURRENT_USERNAME}/my-annotated-dataset"
                else:
                    default_repo_name = "your-username/asr-dataset"
                hf_repo_name_tb = gr.Textbox(
                    label="Export Repository Name (username/dataset-name)",
                    value=default_repo_name,
                )
                hf_export_button = gr.Button("Export to Hugging Face", variant="primary")
            hf_export_status_md = gr.Markdown("")

    # ---- Event wiring ----
    # Outputs for login_button (19 entries; hf_login returns a matching tuple).
    # transcript_tb is listed twice: once for interactivity, once for its value.
    # The value of first_phase_accept_cb is set later by the navigation outputs.
    login_outputs = [
        login_container, main_container, reviewer_tb, hf_token_state, login_message,
        save_next_button, transcript_tb, trim_button, undo_trim_button, delete_button,
        first_phase_accept_cb,
        approve_button, reject_button,
        current_page_idx_state, current_idx_on_page_state, audio_player, transcript_tb,
        status_md, original_transcript_state,
    ]
    login_button.click(fn=hf_login, inputs=[hf_token_input], outputs=login_outputs)

    # Common outputs for navigation and actions that reload the sample view
    # (9 entries; the checkbox appears once for visibility and once for value).
    navigation_outputs_extended = [
        current_page_idx_state, current_idx_on_page_state,
        audio_player, transcript_tb, reviewer_tb, status_md, original_transcript_state,
        first_phase_accept_cb,
        first_phase_accept_cb,
    ]

    # Every sample-level handler starts with the current (page, row) position.
    position_inputs = [current_page_idx_state, current_idx_on_page_state]

    # Phase 1
    save_next_button.click(
        fn=save_and_next_sample_first_phase,
        inputs=[*position_inputs, transcript_tb, first_phase_accept_cb],
        outputs=navigation_outputs_extended,
    )
    for nav_button, nav_handler in (
        (next_button, go_next_sample_wrapper),
        (prev_button, go_prev_sample_wrapper),
    ):
        nav_button.click(fn=nav_handler, inputs=[*position_inputs], outputs=navigation_outputs_extended)

    # Phase 2
    for review_button, review_verdict in (
        (approve_button, "approved"),
        (reject_button, "rejected"),
    ):
        review_button.click(
            fn=review_and_next_sample_second_phase,
            inputs=[*position_inputs, gr.State(review_verdict)],
            outputs=navigation_outputs_extended,
        )

    # Audio tools (Phase 1)
    trim_button.click(
        fn=trim_audio_action,
        inputs=[*position_inputs, trim_start_tb, trim_end_tb],
        outputs=navigation_outputs_extended,
    )
    for audio_tool_button, audio_tool_handler in (
        (undo_trim_button, undo_trim_action),
        (delete_button, confirm_delete_audio_action),
    ):
        audio_tool_button.click(
            fn=audio_tool_handler,
            inputs=[*position_inputs],
            outputs=navigation_outputs_extended,
        )

    # Jump and Export
    jump_button.click(
        fn=jump_to_absolute_idx,
        inputs=[jump_text_tb, *position_inputs],
        outputs=navigation_outputs_extended,
    )
    hf_export_button.click(
        fn=export_to_huggingface,
        inputs=[hf_repo_name_tb, hf_token_state],
        outputs=[hf_export_status_md],
        queue=True,
    )
if __name__ == "__main__":
    # Testing hook (disabled): to exercise the review flow locally, override
    # the module-level settings before launching, for example
    #   SECOND_PHASE = True
    #   ALLOWED_USERS = ["vargha", "navidved", "userC"]
    #   REVIEWERS = ["vargha"]
    #   ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS]
    # and, once total_samples is positive, recompute annotator_ranges with
    # calculate_annotator_ranges(total_samples, ANNOTATORS) and call
    # initialize_second_phase_assignments() when SECOND_PHASE is set.
    # In a real deployment these values are set once at the top of the file.
    if not SECOND_PHASE:
        print("==== APPLICATION LAUNCHING IN FIRST PHASE (ANNOTATION MODE) ====")
    else:
        print("==== APPLICATION LAUNCHING IN SECOND PHASE (REVIEW MODE) ====")
        if not SECOND_PHASE_REVIEW_MAPPING:
            # The review mapping is still empty: build it now if the dataset
            # size and annotator list are available, otherwise warn.
            if total_samples > 0 and ANNOTATORS:
                print("Late initialization of second phase assignments...")
                # Ranges should normally exist once total_samples is known;
                # compute them here only if they are still missing.
                if not annotator_ranges:
                    annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
                initialize_second_phase_assignments()
            else:
                print("Warning: Second phase active, but review mapping is empty. Check total_samples and ANNOTATORS list.")

    # Enable the request queue, then start the server without a public share link.
    demo.queue().launch(debug=True, share=False)