# audio-labelling / app.py
import gradio as gr
import os
import json
import pandas as pd
from datasets import load_dataset, DatasetDict, Dataset, Audio
from huggingface_hub import HfApi, whoami, login, hf_hub_download
import tempfile
import shutil
import gc
import time
import psutil
from pydub import AudioSegment
import soundfile as sf
from tenacity import retry, stop_after_attempt, wait_exponential
import re
import numpy as np
from pydantic import BaseModel
from typing import Optional, List, Tuple
from datetime import datetime
# Log in with Hugging Face token
token = os.getenv("hf_token")
if token:
login(token)
else:
print("Warning: hf_token environment variable not set. Hugging Face Hub operations might fail.")
# Configuration
HF_DATASET_NAME = "navidved/channelb-raw-data"
AUDIO_DIR = "audio" # Not actively used if paths are absolute or in dataset item
SAVE_PATH = "annotations.json"
ALLOWED_USERS = ["vargha", "navidved", "userC"]
REVIEWERS = ["vargha"]
ANNOTATORS = list(ALLOWED_USERS) # All allowed users (reviewers included) are treated as annotators when splitting ranges
CURRENT_USERNAME = None
PAGE_SIZE = 100
SAVE_INTERVAL = 10
# --- SECOND PHASE CONFIGURATION ---
SECOND_PHASE = False # Set to True to activate second phase review
SECOND_PHASE_REVIEW_MAPPING = {"navidved": "vargha"} # Populated if SECOND_PHASE is True. Maps: reviewer_username -> original_annotator_username
# Global state variables
current_page = 0 # Stores the USER-RELATIVE page index
ds_iter = None # No longer maintained globally for streaming robustness
current_page_data = None # Pandas DataFrame for the current page's data
audio_backup = {} # For undo_trim, if needed (simplified)
annotation_count = 0
unsaved_changes = {}
total_samples = 0 # Total samples in the HF_DATASET_NAME
annotator_ranges = {} # Stores {annotator_username: (start_abs_idx, end_abs_idx)}
# Pydantic data models
class AudioTrim(BaseModel):
start: float
end: float
class Annotation(BaseModel):
annotator: str
annotated_subtitle: Optional[str] = None
audio_trims: Optional[List[AudioTrim]] = None
is_first_phase_accepted: bool = False
first_phase_reviewer_username: Optional[str] = None
second_phase_reviewed_by: Optional[str] = None
second_phase_review_status: Optional[str] = None
second_phase_review_timestamp: Optional[datetime] = None
create_at: datetime
update_at: datetime
class Sample(BaseModel):
id: int # Absolute index in the dataset
voice_name: str
original_subtitle: str
ignore_it: bool = False
description: Optional[str] = None
annotations: Optional[List[Annotation]] = None
is_approved_in_second_phase: bool = False
class DatasetModel(BaseModel):
samples: Optional[List[Sample]] = None
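# Illustrative shape of SAVE_PATH (annotations.json), as serialized from the models
# above with exclude_none=True (all values below are made up):
# {
#   "samples": [
#     {
#       "id": 0,
#       "voice_name": "sample_0.wav",
#       "original_subtitle": "...",
#       "ignore_it": false,
#       "is_approved_in_second_phase": false,
#       "annotations": [
#         {
#           "annotator": "navidved",
#           "annotated_subtitle": "...",
#           "is_first_phase_accepted": false,
#           "create_at": "2024-01-01T12:00:00",
#           "update_at": "2024-01-01T12:00:00"
#         }
#       ]
#     }
#   ]
# }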
# Utility functions
def load_saved_annotations():
dataset_model = None
if os.path.exists(SAVE_PATH):
try:
with open(SAVE_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
dataset_model = DatasetModel(**data)
print("Loaded annotations from local JSON file")
except Exception as e:
            print(f"Error loading local JSON file: {str(e)}. Ignoring it and trying the Hub copy instead.")
# os.remove(SAVE_PATH) # Be cautious with auto-removing
dataset_model = None
if dataset_model is None and token:
try:
hf_path = hf_hub_download(
repo_id=HF_DATASET_NAME,
filename=SAVE_PATH,
repo_type="dataset",
token=token
)
with open(hf_path, "r", encoding="utf-8") as f:
data = json.load(f)
dataset_model = DatasetModel(**data)
with open(SAVE_PATH, "w", encoding="utf-8") as f: # Cache locally
f.write(dataset_model.model_dump_json(exclude_none=True, indent=4))
print("Loaded annotations from HF dataset repository and cached locally")
except Exception as e:
print(f"Error loading JSON file from HF repo: {str(e)}")
dataset_model = None
if dataset_model is None:
dataset_model = DatasetModel(samples=[])
print("Created new empty DatasetModel for annotations")
return dataset_model
def save_annotations(dataset_model: DatasetModel):
global annotation_count
try:
with open(SAVE_PATH, "w", encoding="utf-8") as f:
f.write(dataset_model.model_dump_json(exclude_none=True, indent=4))
print(f"Saved annotations to {SAVE_PATH}")
annotation_count += 1 # This is a simple counter, not total annotations in file
if annotation_count % SAVE_INTERVAL == 0 and token:
push_json_to_hf()
except Exception as e:
print(f"Error saving annotations: {str(e)}")
def push_json_to_hf():
if not token:
print("Cannot push to HF: token not available.")
return
try:
api = HfApi()
api.upload_file(
path_or_fileobj=SAVE_PATH,
path_in_repo=SAVE_PATH,
repo_type="dataset",
repo_id=HF_DATASET_NAME,
token=token
)
print("Uploaded annotations.json to Hugging Face repository")
except Exception as e:
print(f"Error uploading JSON file: {str(e)}")
def calculate_annotator_ranges(total_samples_val, annotators_list):
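    # Splits indices [0, total_samples_val - 1] contiguously among the annotators,
    # with earlier annotators taking one extra sample each until the remainder is
    # exhausted. Illustrative example: 10 samples and annotators ["a", "b", "c"]
    # yield {"a": (0, 3), "b": (4, 6), "c": (7, 9)}.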
num_annotators = len(annotators_list)
if num_annotators == 0 or total_samples_val <= 0:
return {}
samples_per_annotator = total_samples_val // num_annotators
extra_samples = total_samples_val % num_annotators
ranges = {}
start_idx = 0
for i, annotator in enumerate(annotators_list):
end_idx = start_idx + samples_per_annotator - 1
if i < extra_samples:
end_idx += 1
if end_idx >= total_samples_val:
end_idx = total_samples_val -1
if start_idx <= end_idx: # Ensure valid range
ranges[annotator] = (start_idx, end_idx)
start_idx = end_idx + 1
print(f"Calculated annotator ranges: {ranges}")
return ranges
def initialize_second_phase_assignments():
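    # With multiple annotators, each user reviews the annotator preceding them in
    # ANNOTATORS (a simple rotation). Illustrative example:
    # ANNOTATORS = ["a", "b", "c"] -> {"a": "c", "b": "a", "c": "b"}.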
global SECOND_PHASE_REVIEW_MAPPING, annotator_ranges, total_samples
    if not ANNOTATORS:
print("Not enough annotators for second phase review.")
SECOND_PHASE_REVIEW_MAPPING = {}
return
if not annotator_ranges and total_samples > 0:
print("Populating annotator_ranges for second phase initialization (was empty).")
annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
elif not annotator_ranges and total_samples <= 0:
print("Warning: Cannot initialize second phase assignments without total_samples and annotator_ranges.")
return
if len(ANNOTATORS) == 1:
annotator = ANNOTATORS[0]
SECOND_PHASE_REVIEW_MAPPING[annotator] = annotator
print(f"Second phase: {annotator} will review their own work.")
else:
for i, reviewer_user in enumerate(ANNOTATORS): # In 2nd phase, ANNOTATORS become reviewers of other ANNOTATORS
original_annotator_idx = (i - 1 + len(ANNOTATORS)) % len(ANNOTATORS)
original_annotator_user = ANNOTATORS[original_annotator_idx]
SECOND_PHASE_REVIEW_MAPPING[reviewer_user] = original_annotator_user
print(f"Second phase: {reviewer_user} will review {original_annotator_user}'s work.")
for reviewer, original_annotator in SECOND_PHASE_REVIEW_MAPPING.items():
if original_annotator not in annotator_ranges:
print(f"Warning: Original annotator {original_annotator} (being reviewed by {reviewer}) has no range defined in annotator_ranges.")
def get_user_allowed_range(username):
global annotator_ranges, total_samples
if SECOND_PHASE:
if not SECOND_PHASE_REVIEW_MAPPING:
initialize_second_phase_assignments()
original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(username)
if original_annotator_to_review:
if not annotator_ranges and total_samples > 0:
annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
user_range = annotator_ranges.get(original_annotator_to_review)
# print(f"DEBUG: User {username} (reviewer) gets range of {original_annotator_to_review}: {user_range}")
return user_range
else:
# print(f"DEBUG: User {username} not in SECOND_PHASE_REVIEW_MAPPING or no original annotator assigned.")
return None
else: # First Phase Logic
if get_user_role(username) == "reviewer":
return (0, total_samples - 1) if total_samples > 0 else None
elif username in annotator_ranges:
return annotator_ranges[username]
else:
# print(f"DEBUG: User {username} not a reviewer and not in annotator_ranges for first phase.")
return None
def is_within_range(absolute_idx, allowed_range):
if allowed_range is None:
return False
return allowed_range[0] <= absolute_idx <= allowed_range[1]
def get_user_role(username):
return "reviewer" if username in REVIEWERS else "annotator"
# init_dataset_iterator is not needed if we load on demand for streaming
# def init_dataset_iterator(): ...
def get_dataset_info():
global total_samples
if total_samples > 0:
return {'num_samples': total_samples}
try:
# For streaming=True, info.num_examples might be unreliable.
# Load non-streaming just for info, then can use streaming for actual data.
ds_info_obj = load_dataset(HF_DATASET_NAME, split="train", streaming=False) # Load non-streaming for info
num_samples_val = ds_info_obj.num_rows
if num_samples_val and num_samples_val > 0:
total_samples = num_samples_val
print(f"Dataset info: total_samples set to {total_samples}")
return {'num_samples': total_samples}
else: # Fallback if num_rows is not reliable
print("Warning: ds_info_obj.num_rows was not positive. Trying iteration for count (may be slow).")
# Iterating a large streaming dataset to count is very inefficient.
# Consider alternative ways to get total_samples (e.g., hardcode, separate metadata file).
# For this example, if num_rows fails, we'll proceed with caution.
# If it's critical, this part needs a robust solution for getting total_samples.
ds_stream = load_dataset(HF_DATASET_NAME, split="train", streaming=True)
count = 0
for _ in ds_stream:
count +=1
if count > 0:
total_samples = count
print(f"Dataset info: total_samples set to {total_samples} by iteration.")
return {'num_samples': total_samples}
else:
print("Warning: Could not determine total_samples from dataset info or iteration.")
total_samples = -1 # Indicate failure
return {'num_samples': -1}
except Exception as e:
print(f"Error getting dataset info: {e}")
total_samples = -1
return {'num_samples': -1}
# Initial data load (moved after functions it calls are defined)
dataset_info = get_dataset_info() # This sets global total_samples
if total_samples > 0:
annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
if SECOND_PHASE: # Initialize only if SECOND_PHASE is true from the start
initialize_second_phase_assignments()
else:
print("Warning: total_samples is not positive. Annotation ranges and second phase assignments may be incorrect.")
annotator_ranges = {}
def get_audio_path(audio_entry):
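    # Normalizes an audio entry into something gr.Audio / pydub can consume:
    # a (sampling_rate, numpy array) tuple for in-memory audio, an existing local
    # path, an HTTP(S) URL, a path joined with AUDIO_DIR, the raw string as-is,
    # or None if the entry is unusable.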
if isinstance(audio_entry, dict):
if "array" in audio_entry and "sampling_rate" in audio_entry:
return (audio_entry["sampling_rate"], audio_entry["array"])
return audio_entry.get("path", None)
if isinstance(audio_entry, str):
if audio_entry.startswith("http://") or audio_entry.startswith("https://"):
return audio_entry
if os.path.exists(audio_entry):
return audio_entry
if AUDIO_DIR: # Not strictly necessary if paths are always absolute/in dataset
joined_path = os.path.join(AUDIO_DIR, audio_entry)
if os.path.exists(joined_path):
return joined_path
return audio_entry # Return as is (e.g. relative path for datasets lib)
return None
def load_page_data(page_num_within_user_view=0):
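    # Streams the source split and collects up to PAGE_SIZE samples for the given
    # user-relative page, returning (and storing in the global current_page_data)
    # a DataFrame with "audio", "sentence", "id_within_page" and "absolute_idx".
    # Note: skipping is done by plain iteration; datasets' IterableDataset.skip(n)
    # could replace the manual skip loop, but the straightforward loop is kept here.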
global current_page_data, current_page, total_samples
try:
ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True)
temp_ds_iter = iter(ds)
except Exception as e:
print(f"Error loading dataset for page data: {e}")
current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"])
return current_page_data
user_allowed_range = get_user_allowed_range(CURRENT_USERNAME)
if not user_allowed_range:
print(f"User {CURRENT_USERNAME} has no allowed range.")
current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"])
return current_page_data
user_start_abs, user_end_abs = user_allowed_range
num_samples_for_user = user_end_abs - user_start_abs + 1
if num_samples_for_user <= 0:
print(f"User {CURRENT_USERNAME} has an invalid or empty allowed range: {user_allowed_range} (num_samples_for_user: {num_samples_for_user})")
current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"])
return current_page_data
effective_start_idx = user_start_abs + (page_num_within_user_view * PAGE_SIZE)
if effective_start_idx > user_end_abs:
print(f"Requested page {page_num_within_user_view} (abs start {effective_start_idx}) is beyond user {CURRENT_USERNAME}'s allowed samples end ({user_end_abs}).")
current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"])
current_page = page_num_within_user_view # Still update current_page to reflect the attempt
return current_page_data
effective_end_idx = min(effective_start_idx + PAGE_SIZE - 1, user_end_abs)
samples_on_page = []
idx_counter_for_page = 0
print(f"Loading page {page_num_within_user_view} for user {CURRENT_USERNAME}. Effective absolute range for this page: [{effective_start_idx}-{effective_end_idx}] from user range [{user_start_abs}-{user_end_abs}]")
current_dataset_absolute_idx = -1
skipped_count = 0
for sample_data in temp_ds_iter:
current_dataset_absolute_idx += 1
if current_dataset_absolute_idx < effective_start_idx:
skipped_count += 1
if skipped_count % 1000 == 0: # Log progress of skipping if it's a lot
print(f" Skipping... at abs_idx {current_dataset_absolute_idx}, target start {effective_start_idx}")
continue
if current_dataset_absolute_idx > effective_end_idx:
break
sample_data['absolute_idx'] = current_dataset_absolute_idx
sample_data['id_within_page'] = idx_counter_for_page
samples_on_page.append(sample_data)
idx_counter_for_page +=1
if skipped_count > 0: print(f" Finished skipping {skipped_count} samples.")
current_page = page_num_within_user_view
if samples_on_page:
current_page_data = pd.DataFrame(samples_on_page)
print(f"Loaded {len(samples_on_page)} samples for page {page_num_within_user_view}. First abs_idx: {samples_on_page[0]['absolute_idx']}, Last abs_idx: {samples_on_page[-1]['absolute_idx']}.")
else:
current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"])
print(f"No samples found for user {CURRENT_USERNAME} on their page {page_num_within_user_view} (effective absolute range {effective_start_idx}-{effective_end_idx})")
gc.collect()
return current_page_data
# Core functions
def save_sample_data(page_idx, idx_on_page, transcript, current_user_performing_action, accepted_flag=False):
global current_page_data, unsaved_changes
if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
return "Invalid index or data not loaded for current page."
actual_sample_info = current_page_data.iloc[idx_on_page]
absolute_idx = actual_sample_info['absolute_idx']
# In first phase, range check is important.
# In second phase, page loading itself should restrict to allowed samples.
if not SECOND_PHASE:
allowed_range = get_user_allowed_range(current_user_performing_action)
if not is_within_range(absolute_idx, allowed_range):
return f"You are not allowed to annotate this sample {absolute_idx} (out of range {allowed_range})."
audio_entry_original = actual_sample_info["audio"]
voice_name = os.path.basename(str(get_audio_path(audio_entry_original) or f"sample_{absolute_idx}"))
dataset_model = load_saved_annotations()
sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
if not sample:
sample = Sample(
id=absolute_idx,
voice_name=voice_name,
original_subtitle=actual_sample_info["sentence"],
annotations=[]
)
dataset_model.samples = dataset_model.samples or []
dataset_model.samples.append(sample)
now = datetime.now()
annotation = next((a for a in sample.annotations or [] if a.annotator == current_user_performing_action), None) # Find by actual annotator
if get_user_role(current_user_performing_action) == "reviewer" and not SECOND_PHASE : # First phase reviewer action
# Reviewer might be acting on another's annotation or making their own.
# If accepted_flag is true, they are "accepting" *some* annotation for this sample.
# The current model is: if a reviewer saves, their input becomes *an* annotation.
# If they check "accept", this annotation is marked as accepted.
if annotation:
annotation.annotated_subtitle = transcript.strip()
annotation.update_at = now
annotation.is_first_phase_accepted = accepted_flag
annotation.first_phase_reviewer_username = current_user_performing_action if accepted_flag else None
else:
annotation = Annotation(
annotator=current_user_performing_action,
annotated_subtitle=transcript.strip(),
create_at=now,
update_at=now,
is_first_phase_accepted=accepted_flag,
first_phase_reviewer_username=current_user_performing_action if accepted_flag else None
)
sample.annotations = sample.annotations or []
sample.annotations.append(annotation)
else: # Annotator in first phase, or any user in a context where the simple save applies
if annotation:
annotation.annotated_subtitle = transcript.strip()
annotation.update_at = now
# Annotators cannot set first_phase_accepted themselves
else:
annotation = Annotation(
annotator=current_user_performing_action,
annotated_subtitle=transcript.strip(),
create_at=now,
update_at=now,
is_first_phase_accepted=False # Default for new annotations by non-reviewers or non-accepting reviewers
)
sample.annotations = sample.annotations or []
sample.annotations.append(annotation)
if absolute_idx in unsaved_changes: # Clear if it was marked as unsaved
del unsaved_changes[absolute_idx]
save_annotations(dataset_model)
return f"✓ Saved annotation for sample {absolute_idx}"
def handle_second_phase_action(page_idx, idx_on_page, action: str):
global current_page_data, CURRENT_USERNAME
if not SECOND_PHASE:
return "Not in second phase."
if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
return "Invalid index or data not loaded for current page (second phase)."
actual_sample_info = current_page_data.iloc[idx_on_page]
absolute_idx = actual_sample_info['absolute_idx']
original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(CURRENT_USERNAME)
if not original_annotator_to_review:
return f"User {CURRENT_USERNAME} is not assigned to review any user's work in SECOND_PHASE_REVIEW_MAPPING."
dataset_model = load_saved_annotations()
sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
if not sample:
return f"Error: Sample {absolute_idx} not found in annotations.json for review."
annotation_to_review = next((ann for ann in sample.annotations or [] if ann.annotator == original_annotator_to_review), None)
if not annotation_to_review:
print(f"Warning: No prior annotation by {original_annotator_to_review} for sample {absolute_idx}. Creating placeholder for review.")
annotation_to_review = Annotation(
annotator=original_annotator_to_review,
annotated_subtitle=sample.original_subtitle,
create_at=datetime.now(), # Or try to find other annotation's timestamp
update_at=datetime.now()
)
sample.annotations = sample.annotations or []
sample.annotations.append(annotation_to_review)
annotation_to_review.second_phase_reviewed_by = CURRENT_USERNAME
annotation_to_review.second_phase_review_status = action
annotation_to_review.second_phase_review_timestamp = datetime.now()
annotation_to_review.update_at = datetime.now()
if action == "approved":
sample.is_approved_in_second_phase = True
# else: # If rejected, is_approved_in_second_phase remains as is or set to False
# sample.is_approved_in_second_phase = False # Explicitly set to False on rejection
save_annotations(dataset_model)
return f"✓ Review ({action}) saved for sample {absolute_idx} (Original annotator: {original_annotator_to_review})"
def get_sample(page_idx_user_relative, idx_on_page, current_user_displaying):
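    # Returns a 9-tuple: (audio, transcript_text, status_message, reviewer_field,
    # color_class, editable_flag, accepted_flag, original_dataset_sentence,
    # accept-checkbox visibility update). Error paths must keep the same arity.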
global current_page_data, total_samples
if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
        return None, "", f"Invalid index ({idx_on_page}) for current page data (len {len(current_page_data) if current_page_data is not None else 'None'}).", "unreviewed", "white", True, False, "", gr.update(visible=False)
actual_sample_info = current_page_data.iloc[idx_on_page]
absolute_idx = actual_sample_info['absolute_idx']
audio_entry_original = actual_sample_info["audio"]
audio_val = get_audio_path(audio_entry_original)
default_transcript = actual_sample_info["sentence"]
transcript_to_display = default_transcript
ui_reviewer_field = "unreviewed"
ui_color = "white"
ui_editable = True
ui_is_accepted_flag = False
# Build status message
status_prefix = ""
user_allowed_range = get_user_allowed_range(current_user_displaying)
if user_allowed_range:
user_start_abs, user_end_abs = user_allowed_range
current_sample_num_in_user_assignment = absolute_idx - user_start_abs + 1
total_samples_for_user = user_end_abs - user_start_abs + 1
status_prefix = f"Sample {current_sample_num_in_user_assignment} of {total_samples_for_user} for you (Abs Idx {absolute_idx})."
else:
status_prefix = f"Sample (Abs Idx {absolute_idx})."
dataset_model = load_saved_annotations()
sample_from_json = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
if sample_from_json:
if sample_from_json.ignore_it:
audio_val = None
transcript_to_display = "AUDIO DELETED (This audio has been removed.)"
ui_reviewer_field = "deleted"
ui_color = "red"
ui_editable = False
elif SECOND_PHASE:
ui_editable = False
original_annotator_being_reviewed = SECOND_PHASE_REVIEW_MAPPING.get(current_user_displaying)
if not original_annotator_being_reviewed:
transcript_to_display = "Error: You are not mapped to review any user."
ui_color = "red"
ui_reviewer_field = "Error"
else:
ui_reviewer_field = f"Reviewing: {original_annotator_being_reviewed}"
annotation_under_review = next((ann for ann in sample_from_json.annotations or [] if ann.annotator == original_annotator_being_reviewed), None)
if annotation_under_review:
transcript_to_display = annotation_under_review.annotated_subtitle or default_transcript
ui_is_accepted_flag = (annotation_under_review.second_phase_review_status == "approved") # Reflects the action by THIS reviewer IF they reviewed
if annotation_under_review.second_phase_reviewed_by:
if annotation_under_review.second_phase_reviewed_by == current_user_displaying:
ui_color = "green" if annotation_under_review.second_phase_review_status == "approved" else "orange"
else:
ui_color = "gray"
ui_reviewer_field += f" (Already reviewed by {annotation_under_review.second_phase_reviewed_by} as {annotation_under_review.second_phase_review_status})"
else:
ui_color = "yellow" # Pending this user's review
else:
transcript_to_display = default_transcript
ui_reviewer_field += " (No submission by original annotator)"
ui_color = "lightgray"
else: # First Phase Logic
accepted_first_phase_annotation = next((a for a in sample_from_json.annotations or [] if a.is_first_phase_accepted and a.first_phase_reviewer_username), None)
if accepted_first_phase_annotation:
transcript_to_display = accepted_first_phase_annotation.annotated_subtitle or default_transcript
ui_reviewer_field = f"Accepted by: {accepted_first_phase_annotation.first_phase_reviewer_username}"
ui_color = "green"
ui_is_accepted_flag = True
ui_editable = (get_user_role(current_user_displaying) == "reviewer")
else:
user_specific_annotation = next((a for a in sample_from_json.annotations or [] if a.annotator == current_user_displaying), None)
if user_specific_annotation:
transcript_to_display = user_specific_annotation.annotated_subtitle or default_transcript
ui_reviewer_field = f"Your draft (as {user_specific_annotation.annotator})"
ui_color = "yellow"
ui_editable = True
else:
other_annotations = [a for a in sample_from_json.annotations or [] if not a.is_first_phase_accepted] # any unaccepted
if other_annotations:
# If current user is reviewer, they see the first other annotator's work
if get_user_role(current_user_displaying) == "reviewer":
other_ann_to_show = other_annotations[0]
transcript_to_display = other_ann_to_show.annotated_subtitle or default_transcript
ui_reviewer_field = f"Draft by: {other_ann_to_show.annotator}"
ui_color = "blue"
ui_editable = True
else: # Annotator sees "labeled by another" if they didn't do it
transcript_to_display = default_transcript # Show original if not their work
ui_reviewer_field = f"Labeled by: {other_annotations[0].annotator}"
ui_color = "lightblue"
ui_editable = False # Cannot edit others' unreviewed work if you are also annotator
# else default_transcript, unreviewed, white, editable=True already set
# If absolute_idx in unsaved_changes, it's a visual cue, actual text is already from above logic.
if not SECOND_PHASE and absolute_idx in unsaved_changes:
ui_color = "pink" # Overrides previous color if unsaved changes by current user
ui_status_message = f"{status_prefix} Page {page_idx_user_relative + 1} (User-view)."
if SECOND_PHASE:
ui_status_message += " (Review Phase)"
else:
ui_status_message += " (Annotation Phase)"
# For reviewer checkbox in first phase
show_accept_checkbox = not SECOND_PHASE and get_user_role(current_user_displaying) == "reviewer"
return audio_val, transcript_to_display, ui_status_message, ui_reviewer_field, ui_color, ui_editable, ui_is_accepted_flag, default_transcript, gr.update(visible=show_accept_checkbox)
def load_interface_data(page_idx_user_relative, idx_on_page):
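    # Repackages get_sample()'s tuple in the order expected by
    # navigation_outputs_extended: (page_idx, idx_on_page, audio, transcript update,
    # reviewer update, status text, original sentence, accept-checkbox visibility
    # update, accept-checkbox value).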
audio, text, base_status, saved_reviewer_text, color, editable, accepted_flag, original_dataset_text, accept_cb_update = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME)
return (
page_idx_user_relative,
idx_on_page,
audio,
gr.update(value=text, interactive=editable),
gr.update(value=saved_reviewer_text, elem_classes=[color]),
base_status,
original_dataset_text,
accept_cb_update, # For the first_phase_accept_cb visibility
accepted_flag # For the first_phase_accept_cb value state
)
def navigate_sample(page_idx_user_relative, idx_on_page, direction: int):
global current_page_data
if current_page_data is None or len(current_page_data) == 0:
# This case might happen if initial load failed or user has no samples
user_allowed_range = get_user_allowed_range(CURRENT_USERNAME)
err_msg = "No data loaded. Try reloading or check your assigned range."
if not user_allowed_range or user_allowed_range[0] > user_allowed_range[1]:
err_msg = "You have no samples assigned or your range is invalid."
# Return a state that indicates error but doesn't crash UI
# The number of outputs must match `navigation_outputs_extended`
return page_idx_user_relative, idx_on_page, None, gr.update(value="Error", interactive=False), gr.update(value="Error"), err_msg, "", gr.update(visible=False), False
target_idx_on_page = idx_on_page + direction
new_page_idx_user_relative = page_idx_user_relative
new_idx_on_page = target_idx_on_page
user_allowed_range = get_user_allowed_range(CURRENT_USERNAME)
if not user_allowed_range: # Should not happen if page data exists, but as a safeguard
return page_idx_user_relative, idx_on_page, gr.update(), gr.update(), gr.update(), "Error: No allowed range for navigation.", gr.update(), gr.update(visible=False), False
if target_idx_on_page < 0:
if page_idx_user_relative > 0:
new_page_idx_user_relative = page_idx_user_relative - 1
temp_data = load_page_data(new_page_idx_user_relative) # This updates global current_page_data
if temp_data is not None and not temp_data.empty:
new_idx_on_page = len(temp_data) - 1
else: # Prev user-relative page is empty (should not happen if page_idx_user_relative > 0 and ranges are correct)
# Stay on current sample, show message
audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME)
status = status + " [Already at the first sample of this page/range]"
return page_idx_user_relative, idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status, orig_text, cb_vis, acc_flag
else: # Already on first item of first user-relative page
audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME)
status = status + " [At the beginning of your assigned samples]"
return page_idx_user_relative, idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status, orig_text, cb_vis, acc_flag
elif target_idx_on_page >= len(current_page_data):
# Try to go to next user-relative page
new_page_idx_user_relative = page_idx_user_relative + 1
temp_data = load_page_data(new_page_idx_user_relative) # Updates global current_page_data
if temp_data is not None and not temp_data.empty:
new_idx_on_page = 0
else: # Next user-relative page is empty (means we are at the end of user's allowed samples)
current_abs_idx = current_page_data.iloc[idx_on_page]['absolute_idx']
is_at_very_end = user_allowed_range and current_abs_idx >= user_allowed_range[1]
audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME)
if is_at_very_end:
status = status + " [At the end of your assigned samples]"
else: # Should ideally not be hit if temp_data is empty after trying next page
status = status + " [No more samples in this direction (next page empty)]"
return page_idx_user_relative, idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status, orig_text, cb_vis, acc_flag
return load_interface_data(new_page_idx_user_relative, new_idx_on_page)
def go_next_sample_wrapper(page_idx_user_relative, idx_on_page):
return navigate_sample(page_idx_user_relative, idx_on_page, 1)
def go_prev_sample_wrapper(page_idx_user_relative, idx_on_page):
return navigate_sample(page_idx_user_relative, idx_on_page, -1)
def save_and_next_sample_first_phase(page_idx_user_relative, idx_on_page, current_text, is_accepted_by_reviewer_flag):
user_is_reviewer = get_user_role(CURRENT_USERNAME) == "reviewer"
accepted_to_save = is_accepted_by_reviewer_flag if user_is_reviewer else False
save_msg = save_sample_data(page_idx_user_relative, idx_on_page, current_text, CURRENT_USERNAME, accepted_flag=accepted_to_save)
print(save_msg)
# After saving, navigate to the next sample
# The navigation outputs will reload the UI for the next sample
return navigate_sample(page_idx_user_relative, idx_on_page, 1)
def review_and_next_sample_second_phase(page_idx_user_relative, idx_on_page, review_action: str):
feedback_msg = handle_second_phase_action(page_idx_user_relative, idx_on_page, review_action)
print(feedback_msg)
return navigate_sample(page_idx_user_relative, idx_on_page, 1)
def jump_to_absolute_idx(target_abs_idx_str, current_page_idx_user_relative, current_idx_on_page):
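    # Maps an absolute dataset index to the user-relative page
    # (offset_from_user_start // PAGE_SIZE) and the row on that page, reloads the
    # page, and refreshes the UI; out-of-range targets keep the current sample.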
global current_page_data
try:
target_abs_idx = int(target_abs_idx_str)
if target_abs_idx < 0: target_abs_idx = 0
user_allowed_range = get_user_allowed_range(CURRENT_USERNAME)
if not user_allowed_range or not is_within_range(target_abs_idx, user_allowed_range):
status_msg = f"Target index {target_abs_idx} is outside your assigned range {user_allowed_range or 'N/A'}."
audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME)
return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc
user_start_abs, _ = user_allowed_range
offset_from_user_start = target_abs_idx - user_start_abs
if offset_from_user_start < 0:
status_msg = f"Logic Error: Target index {target_abs_idx} has negative offset from user start {user_start_abs}."
print(status_msg)
audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME)
return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc
new_user_relative_page_idx = offset_from_user_start // PAGE_SIZE
temp_page_data_df = load_page_data(new_user_relative_page_idx) # This updates global current_page_data
if temp_page_data_df is None or temp_page_data_df.empty:
status_msg = f"No data found for your page {new_user_relative_page_idx} (containing abs index {target_abs_idx})."
audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME)
return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc
matching_rows = current_page_data[current_page_data['absolute_idx'] == target_abs_idx]
if not matching_rows.empty:
new_idx_on_page_actual = matching_rows.iloc[0]['id_within_page']
else:
status_msg = f"Index {target_abs_idx} (your page {new_user_relative_page_idx}) in range, but not found on loaded page. Displaying start of page."
print(status_msg)
new_idx_on_page_actual = 0
if current_page_data.empty :
audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME)
return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg + " (Page empty)", orig_txt, cb_vis, acc
return load_interface_data(new_user_relative_page_idx, new_idx_on_page_actual)
except ValueError:
status_msg = "Invalid index format for jump."
except Exception as e:
import traceback
status_msg = f"Error jumping to index: {str(e)}"
print(f"{status_msg}\n{traceback.format_exc()}")
# Fallback for errors
audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME)
return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc
# Audio editing functions
def trim_audio_action(page_idx_user_relative, idx_on_page, trim_start_str, trim_end_str):
# Outputs must match navigation_outputs_extended
    def default_return(msg):
        # Reload the current sample once and append msg to its status text.
        state = load_interface_data(page_idx_user_relative, idx_on_page)
        return (*state[0:5], state[5] + f" [{msg}]", *state[6:])
if SECOND_PHASE: return default_return("Trimming disabled in Review Phase.")
if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
return default_return("Audio data not available (page error).")
audio_entry_from_df = current_page_data.iloc[idx_on_page]["audio"]
original_audio_path_info = get_audio_path(audio_entry_from_df) # This could be (sr, array) or path str
# Trimming requires a loadable file path. If it's raw array, pydub needs it saved first or handled differently.
# For simplicity, assume get_audio_path gives a string path if it's not raw data.
if not isinstance(original_audio_path_info, str) or not (os.path.exists(original_audio_path_info) or original_audio_path_info.startswith("http")):
# If it is raw data, we might need to save it to a temp file to use pydub.from_file
# Or, if pydub can handle the array directly (it can via AudioSegment(data=..., sample_width=..., frame_rate=..., channels=...))
# This part needs more robust handling if original_audio_path_info is (sr, array) often.
# For now, only proceed if it's a usable path.
if isinstance(original_audio_path_info, tuple): # Raw data
return default_return("Trimming raw audio array directly not yet implemented via pydub.from_file. Save first or use array ops.")
return default_return("Trimming not supported for this audio source or it's not a local/remote file.")
absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx']
# voice_name_original = os.path.basename(original_audio_path_info if isinstance(original_audio_path_info, str) else f"sample_{absolute_idx}")
voice_name_original = os.path.basename(str(get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) or f"sample_{absolute_idx}"))
try:
# If original_audio_path_info is a URL, pydub might need it downloaded first.
# Let's assume local paths for simplicity of example with from_file.
# For URLs, you'd fetch then load.
temp_dir_for_download = None
audio_to_load = original_audio_path_info
if isinstance(original_audio_path_info, str) and original_audio_path_info.startswith("http"):
temp_dir_for_download = tempfile.mkdtemp()
# Simple way to get filename from URL for extension
url_fname = original_audio_path_info.split("/")[-1].split("?")[0]
local_fpath = os.path.join(temp_dir_for_download, url_fname or "downloaded_audio")
import requests
response = requests.get(original_audio_path_info, stream=True)
response.raise_for_status()
with open(local_fpath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
audio_to_load = local_fpath
audio_seg = AudioSegment.from_file(audio_to_load)
start_ms = int(float(trim_start_str) * 1000)
end_ms = int(float(trim_end_str) * 1000)
trimmed_seg = audio_seg[start_ms:end_ms]
os.makedirs("trimmed_audio", exist_ok=True)
# Sanitize voice_name_original further if it comes from URL or complex path
safe_voice_name = re.sub(r'[^\w.-]', '_', voice_name_original)
trimmed_filename = f"trimmed_{absolute_idx}_{safe_voice_name}"
if not os.path.splitext(trimmed_filename)[1]: trimmed_filename += ".wav" # ensure extension
trimmed_path = os.path.join("trimmed_audio", trimmed_filename)
export_format = os.path.splitext(trimmed_path)[1][1:]
if not export_format: export_format = "wav"
trimmed_seg.export(trimmed_path, format=export_format)
if temp_dir_for_download: # Cleanup downloaded file
shutil.rmtree(temp_dir_for_download)
dataset_model = load_saved_annotations()
sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
if not sample: return default_return(f"Error: Sample {absolute_idx} not found in annotations for trimming.")
now = datetime.now()
annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None)
if not annotation:
annotation = Annotation(annotator=CURRENT_USERNAME, create_at=now, update_at=now) # Subtitle from UI
sample.annotations = sample.annotations or []
sample.annotations.append(annotation)
annotation.audio_trims = [AudioTrim(start=float(trim_start_str), end=float(trim_end_str))]
annotation.update_at = now
# The transcript itself isn't changed by trim, user saves that separately.
save_annotations(dataset_model)
# Update UI to play the new trimmed_path
current_ui_state = load_interface_data(page_idx_user_relative, idx_on_page)
return (current_ui_state[0], current_ui_state[1], trimmed_path, # page,idx, new audio
current_ui_state[3], current_ui_state[4], # transcript, reviewer
current_ui_state[5] + " [Trimmed]", # status
*current_ui_state[6:]) # original_text, cb_vis, cb_val
except Exception as e:
if temp_dir_for_download and os.path.exists(temp_dir_for_download): # Ensure cleanup on error too
shutil.rmtree(temp_dir_for_download)
return default_return(f"Error trimming audio: {str(e)}")
def undo_trim_action(page_idx_user_relative, idx_on_page):
    def default_return(msg):
        # Reload the current sample once and append msg to its status text.
        state = load_interface_data(page_idx_user_relative, idx_on_page)
        return (*state[0:5], state[5] + f" [{msg}]", *state[6:])
if SECOND_PHASE: return default_return("Undo Trim disabled in Review Phase.")
if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
return default_return("Audio data not available (page error).")
absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx']
dataset_model = load_saved_annotations()
sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
if sample:
annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None)
if annotation and annotation.audio_trims:
annotation.audio_trims = None
annotation.update_at = datetime.now()
save_annotations(dataset_model)
# Restore original audio path from current_page_data
original_audio_entry = current_page_data.iloc[idx_on_page]["audio"]
restored_audio_val = get_audio_path(original_audio_entry)
current_ui_state = load_interface_data(page_idx_user_relative, idx_on_page)
return (current_ui_state[0], current_ui_state[1], restored_audio_val, # page,idx, restored audio
current_ui_state[3], current_ui_state[4], # transcript, reviewer
current_ui_state[5] + " [Trim undone]", # status
*current_ui_state[6:])
def confirm_delete_audio_action(page_idx_user_relative, idx_on_page):
    def default_return(msg):
        # Reload the current sample once and append msg to its status text.
        state = load_interface_data(page_idx_user_relative, idx_on_page)
        return (*state[0:5], state[5] + f" [{msg}]", *state[6:])
if SECOND_PHASE: return default_return("Delete disabled in Review Phase.")
if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data):
return default_return("Audio data not available (page error).")
absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx']
voice_name_original = os.path.basename(str(get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) or f"sample_{absolute_idx}"))
dataset_model = load_saved_annotations()
sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
if not sample:
sample = Sample(
id=absolute_idx,
voice_name=voice_name_original,
original_subtitle=current_page_data.iloc[idx_on_page]["sentence"],
annotations=[]
)
dataset_model.samples = dataset_model.samples or [] # Ensure list exists
dataset_model.samples.append(sample)
sample.ignore_it = True
now = datetime.now()
deleted_text_marker = "AUDIO DELETED (This audio has been removed.)"
annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None)
if annotation:
annotation.annotated_subtitle = deleted_text_marker
annotation.audio_trims = None
annotation.update_at = now
else:
annotation = Annotation(annotator=CURRENT_USERNAME, annotated_subtitle=deleted_text_marker, create_at=now, update_at=now)
sample.annotations = sample.annotations or []
sample.annotations.append(annotation)
save_annotations(dataset_model)
# After deleting, reload the interface for this item, which will show it as deleted
return load_interface_data(page_idx_user_relative, idx_on_page)
# Export functions
def sanitize_string(s):
if not isinstance(s, str): s = str(s)
    return re.sub(r'[^\w./-]', '_', s)  # '-' placed last in the class so it is literal, not a range
def sanitize_sentence(s):
if not isinstance(s, str): s = str(s)
# Basic sanitization, consider more robust methods if complex characters are common
return s.encode('utf-8', errors='ignore').decode('utf-8')
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token_val=None):
if not token_val:
print("Cannot push to hub: No token provided for push_to_hub_with_retry.")
return
print(f"Pushing dataset to {repo_id}")
dataset_dict.push_to_hub(repo_id, private=private, token=token_val)
def export_to_huggingface(repo_name_str, hf_token_for_export, progress=gr.Progress()):
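    # Rebuilds the dataset by merging the source split with annotations.json.
    # Per sample, the exported transcript is chosen in this priority order:
    # second-phase approved annotation, then first-phase accepted annotation,
    # then the most recently updated annotation, else the original sentence.
    # Samples flagged ignore_it are exported with the deletion marker and an
    # empty audio array; saved trims are substituted from trimmed_audio/ if found.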
if not hf_token_for_export:
return "Export failed: Hugging Face token is missing."
if not repo_name_str or len(repo_name_str.split('/')) != 2:
return "Export failed: Repository name must be in 'username/dataset-name' format."
try:
start_time = time.time()
# repo_name_str = sanitize_string(repo_name_str) # Sanitization might be too aggressive for HF repo names
print(f"Export started at {time.strftime('%Y-%m-%d %H:%M:%S')}")
dataset_model_annotations = load_saved_annotations()
if total_samples <= 0: # Use global total_samples
# Try to fetch again if not set
info = get_dataset_info()
if total_samples <= 0:
return "Export failed: Total number of samples is unknown or invalid."
# Load source dataset non-streamed for easier full iteration
ds_source = load_dataset(HF_DATASET_NAME, split="train", streaming=False)
exported_data_list = []
progress(0, f"Preparing {total_samples} samples for export...")
num_processed_from_source = 0
for i, source_sample in enumerate(ds_source):
if i >= total_samples: break # Should not happen if ds_source.num_rows matches total_samples
num_processed_from_source +=1
absolute_idx = i # Assuming source gives ordered samples matching index
audio_entry = source_sample.get("audio") # This is {path: str} or {array:..., sampling_rate:...}
sentence_val = source_sample.get("sentence", "")
# Base audio data for export (can be path string or dict for Audio feature)
# If source_sample['audio'] is like {'path': '...', 'bytes': None}, datasets lib handles it.
# If it's {'array': ..., 'sampling_rate': ...}, it's also fine.
audio_dict_to_export = audio_entry
annotation_data = next((s for s in dataset_model_annotations.samples or [] if s.id == absolute_idx), None)
if annotation_data:
if annotation_data.ignore_it:
sentence_val = "AUDIO DELETED (This audio has been removed.)"
# Represent deleted audio: empty array or specific silent audio path
audio_dict_to_export = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000}
else:
best_ann = None
if annotation_data.annotations:
approved_anns = [a for a in annotation_data.annotations if a.second_phase_review_status == "approved"]
if SECOND_PHASE and approved_anns: # Priority to 2nd phase approved
best_ann = sorted(approved_anns, key=lambda x: x.second_phase_review_timestamp, reverse=True)[0] if approved_anns else None
if not best_ann: # Then 1st phase accepted
accepted_anns = [a for a in annotation_data.annotations if a.is_first_phase_accepted]
best_ann = sorted(accepted_anns, key=lambda x: x.update_at, reverse=True)[0] if accepted_anns else None
if not best_ann: # Fallback to latest annotation if no formal approval/acceptance
best_ann = sorted(annotation_data.annotations, key=lambda x: x.update_at, reverse=True)[0]
if best_ann:
sentence_val = best_ann.annotated_subtitle if best_ann.annotated_subtitle is not None else sentence_val
if best_ann.audio_trims and audio_dict_to_export: # If audio exists and is trimmed
# Reconstruct trimmed audio path (must be consistent with saving)
original_voice_name_for_trim = os.path.basename(str(get_audio_path(audio_entry) or f"sample_{absolute_idx}"))
safe_voice_name_for_trim = re.sub(r'[^\w.-]', '_', original_voice_name_for_trim)
trimmed_fname_base = f"trimmed_{absolute_idx}_{safe_voice_name_for_trim}"
# Try common extensions or use one stored in AudioTrim if available
potential_trimmed_path = os.path.join("trimmed_audio", trimmed_fname_base + ".wav") # Assume wav for now
if os.path.exists(potential_trimmed_path):
# Load trimmed audio into array/sr format for export
arr, sr = sf.read(potential_trimmed_path)
audio_dict_to_export = {"array": arr, "sampling_rate": sr}
else:
print(f"Warning: Trimmed audio file {potential_trimmed_path} not found for sample {absolute_idx}. Exporting original/untrimmed.")
exported_data_list.append({
"audio": audio_dict_to_export,
"sentence": sanitize_sentence(sentence_val)
})
if (i + 1) % 100 == 0:
progress((i + 1) / total_samples, f"Processed {i+1}/{total_samples} samples")
gc.collect()
if num_processed_from_source != total_samples:
print(f"Warning: Processed {num_processed_from_source} from source, but expected total_samples {total_samples}.")
if not exported_data_list:
return "No data to export after processing."
# Ensure audio format is compatible before creating Dataset
for item in exported_data_list:
if item["audio"] is None: # Should have been replaced by placeholder if deleted
item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000}
elif isinstance(item["audio"], dict) and 'path' in item["audio"] and item["audio"]['path'] is None: # Path is None, but dict exists
item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} # Replace with placeholder
try:
final_dataset = Dataset.from_list(exported_data_list)
final_dataset = final_dataset.cast_column("audio", Audio(sampling_rate=16000)) # Specify common sampling rate
except Exception as e_cast:
# Detailed error for casting issues
print(f"Error during Dataset.from_list or cast_column: {e_cast}")
# Inspect a few items from exported_data_list if casting fails
for idx, problematic_item in enumerate(exported_data_list[:5]):
print(f"Sample item {idx} for export: Audio type {type(problematic_item['audio'])}, Audio content: {str(problematic_item['audio'])[:200]}")
return f"Export failed during data conversion: {e_cast}. Check audio data formats."
dataset_dict_export = DatasetDict({"train": final_dataset})
progress(0.95, "Uploading to Hugging Face...")
target_repo_id = f"{whoami(token=hf_token_for_export)['name']}/{repo_name_str.split('/')[-1]}" # Ensures user owns repo
push_to_hub_with_retry(
dataset_dict=dataset_dict_export,
repo_id=target_repo_id, # Use sanitized and correctly owned repo ID
private=True,
token_val=hf_token_for_export
)
end_time = time.time()
print(f"Upload done, total time: {end_time - start_time:.2f}s")
progress(1.0, "Upload complete!")
return f"Exported to huggingface.co/datasets/{target_repo_id}"
except Exception as e:
import traceback
error_msg = f"Export failed: {str(e)}"
print(f"{error_msg}\n{traceback.format_exc()}")
return error_msg
# Login function
def hf_login(hf_token_val):
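    # Validates the token via whoami(), checks ALLOWED_USERS membership,
    # (re)computes total_samples, annotator ranges and (if enabled) second-phase
    # assignments, loads the user's first page, and returns a 19-element tuple
    # matching login_outputs.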
global CURRENT_USERNAME, token, current_page_data, total_samples, annotator_ranges, SECOND_PHASE_REVIEW_MAPPING
if not hf_token_val:
# Keep main_container hidden. Return tuple must match login_outputs.
        # The tuple below has 19 elements, one per component in login_outputs.
return (gr.update(visible=True), gr.update(visible=False), # login_container, main_container
gr.update(value="N/A"), hf_token_val, "Login failed: Token cannot be empty.", # reviewer_tb, hf_token_state, login_message
# Visibility updates (all default to not visible or specific phase logic)
gr.update(visible=not SECOND_PHASE), gr.update(interactive=not SECOND_PHASE), # save_next, transcript_tb
gr.update(visible=not SECOND_PHASE), gr.update(visible=not SECOND_PHASE), gr.update(visible=not SECOND_PHASE), # trim, undo, delete
gr.update(visible=False), # first_phase_accept_cb (depends on role)
gr.update(visible=SECOND_PHASE), gr.update(visible=SECOND_PHASE), # approve, reject
# Initial data states
0, 0, None, gr.update(value=""), # page_idx, idx_on_page, audio_player, transcript_tb value
"Please log in.", "") # status_md, original_transcript_state
try:
user_info = whoami(token=hf_token_val)
username = user_info['name']
if username in ALLOWED_USERS:
CURRENT_USERNAME = username
token = hf_token_val # Store validated token
# (Re)-initialize dataset info and user-specific ranges/assignments
ds_info = get_dataset_info() # Sets global total_samples
if total_samples <= 0:
# Return tuple must match login_outputs.
return (gr.update(visible=True), gr.update(visible=False),
gr.update(value="Error"), hf_token_val, "Login OK, but failed to get dataset size. Cannot proceed.",
*( [gr.update(visible=False)] * 8 ), # visibility updates
0,0,None,gr.update(value=""), "Error", "")
annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
if SECOND_PHASE:
initialize_second_phase_assignments() # Depends on ANNOTATORS and ranges
user_allowed_range_check = get_user_allowed_range(CURRENT_USERNAME)
if not user_allowed_range_check or user_allowed_range_check[0] > user_allowed_range_check[1]:
# Return tuple must match login_outputs.
return (gr.update(visible=True), gr.update(visible=False),
gr.update(value="Error"), hf_token_val, f"Login OK, but user {CURRENT_USERNAME} has no samples assigned for {'review' if SECOND_PHASE else 'annotation'}.",
*( [gr.update(visible=False)] * 8 ),
0,0,None,gr.update(value=""), "Error: No samples assigned.", "")
# Load initial page data (user-relative page 0)
current_page_data = load_page_data(0) # Page 0 for this user
is_second_phase_active = SECOND_PHASE
user_is_first_phase_reviewer = not is_second_phase_active and get_user_role(CURRENT_USERNAME) == "reviewer"
# Initial load for UI elements
# load_interface_data returns: page_idx, idx_on_page, audio, transcript_update, reviewer_update, status, orig_text, cb_visibility_update, cb_value
initial_load_tuple = load_interface_data(0, 0) # User-relative page 0, item 0
return (
gr.update(visible=False), # login_container
gr.update(visible=True), # main_container
initial_load_tuple[4], # reviewer_tb gr.update object
hf_token_val, # hf_token_state
f"Login successful! Welcome {CURRENT_USERNAME}. Phase: {'Review' if is_second_phase_active else 'Annotation'}.", # login_message
gr.update(visible=not is_second_phase_active), # save_next_button
gr.update(interactive=not is_second_phase_active), # transcript_tb interactivity
gr.update(visible=not is_second_phase_active), # trim_button
gr.update(visible=not is_second_phase_active), # undo_trim_button
gr.update(visible=not is_second_phase_active), # delete_button
initial_load_tuple[7], # first_phase_accept_cb visibility update
gr.update(visible=is_second_phase_active), # approve_button
gr.update(visible=is_second_phase_active), # reject_button
initial_load_tuple[0], # page_idx_state
initial_load_tuple[1], # idx_on_page_state
initial_load_tuple[2], # audio_player
initial_load_tuple[3], # transcript_tb (already a gr.update obj with value & interactivity)
initial_load_tuple[5], # status_md
initial_load_tuple[6], # original_transcript_state
# The first_phase_accept_cb value is handled by its direct output connection later
)
else: # User not in ALLOWED_USERS
CURRENT_USERNAME = None
return (gr.update(visible=True), gr.update(visible=False),
gr.update(value="N/A"), hf_token_val, "User not authorized!",
*( [gr.update(visible=False)] * 8 ),
0,0,None,gr.update(value=""),"Not Authorized","")
except Exception as e:
CURRENT_USERNAME = None
import traceback
login_err_msg = f"Login failed: {str(e)}"
print(f"{login_err_msg}\n{traceback.format_exc()}")
return (gr.update(visible=True), gr.update(visible=False),
gr.update(value="Error"), hf_token_val, login_err_msg,
*( [gr.update(visible=False)] * 8 ),
0,0,None,gr.update(value=""), "Login Error","")
# Gradio Interface
css = """
.white { background-color: white; color: black; } .yellow { background-color: yellow; color: black; }
.blue { background-color: lightblue; color: black; } .green { background-color: lightgreen; color: black; }
.pink { background-color: pink; color: black; } .red { background-color: #FF7F7F; color: black; }
.orange { background-color: orange; color: black; } .gray { background-color: lightgray; color: black; }
.lightgray { background-color: #f0f0f0; color: black; }
.reviewer-textbox input { text-align: center; font-weight: bold; }
"""
with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo:
hf_token_state = gr.State(token)
current_page_idx_state = gr.State(0) # User-relative page index
current_idx_on_page_state = gr.State(0)
original_transcript_state = gr.State("")
with gr.Column(visible=True, elem_id="login_container") as login_container:
gr.Markdown("## HF Authentication")
hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", value="")
login_button = gr.Button("Login")
login_message = gr.Markdown("")
with gr.Column(visible=False, elem_id="main_container") as main_container:
gr.Markdown("# ASR Dataset Labeling Interface")
status_md = gr.Markdown("Please log in.")
with gr.Row():
with gr.Column(scale=2):
audio_player = gr.Audio(label="Audio Sample", autoplay=False)
transcript_tb = gr.TextArea(label="Transcript", lines=5, interactive=False)
reviewer_tb = gr.Textbox(label="Annotation Status / Reviewer", interactive=False, elem_classes=["white", "reviewer-textbox"])
with gr.Column(scale=1):
gr.Markdown("### Navigation")
prev_button = gr.Button("← Previous")
next_button = gr.Button("Next (no save)")
# Phase 1
save_next_button = gr.Button("Save & Next", variant="primary", visible=not SECOND_PHASE)
first_phase_accept_cb = gr.Checkbox(label="Accept (Reviewer)", visible=False, value=False) # Visibility controlled by login & get_sample
# Phase 2
approve_button = gr.Button("Approve & Next", variant="primary", visible=SECOND_PHASE)
reject_button = gr.Button("Reject & Next", variant="stop", visible=SECOND_PHASE)
gr.Markdown("### Audio Tools (Phase 1 only)")
with gr.Row():
trim_start_tb = gr.Textbox(label="Trim Start (s)", placeholder="e.g., 1.5", scale=1)
trim_end_tb = gr.Textbox(label="Trim End (s)", placeholder="e.g., 3.0", scale=1)
trim_button = gr.Button("Trim Audio", visible=not SECOND_PHASE)
undo_trim_button = gr.Button("Undo Trim", visible=not SECOND_PHASE)
delete_button = gr.Button("Mark Audio as Deleted", variant="stop", visible=not SECOND_PHASE)
with gr.Accordion("Advanced Navigation & Export", open=False):
with gr.Row():
jump_text_tb = gr.Textbox(label="Jump to Global Index", placeholder="Enter dataset absolute index")
jump_button = gr.Button("Jump")
with gr.Row():
default_repo_name = f"{CURRENT_USERNAME}/my-annotated-dataset" if CURRENT_USERNAME else "your-username/asr-dataset"
hf_repo_name_tb = gr.Textbox(label="Export Repository Name (username/dataset-name)", value=default_repo_name)
hf_export_button = gr.Button("Export to Hugging Face", variant="primary")
hf_export_status_md = gr.Markdown("")
    # Outputs for login_button (19 components)
login_outputs = [
login_container, main_container, reviewer_tb, hf_token_state, login_message,
save_next_button, transcript_tb, trim_button, undo_trim_button, delete_button,
first_phase_accept_cb, # This will receive gr.update(visible=...) from login
approve_button, reject_button,
current_page_idx_state, current_idx_on_page_state, audio_player, transcript_tb,
status_md, original_transcript_state
# Note: transcript_tb appears twice: once for interactivity, once for value. Gradio handles this.
# The value for first_phase_accept_cb itself will be updated by navigation/load_interface_data
]
login_button.click(fn=hf_login, inputs=[hf_token_input], outputs=login_outputs)
# Common outputs for navigation and actions that reload sample view (9 outputs)
navigation_outputs_extended = [
current_page_idx_state, current_idx_on_page_state,
audio_player, transcript_tb, reviewer_tb, status_md, original_transcript_state,
first_phase_accept_cb, # For visibility
first_phase_accept_cb # For value
]
# Phase 1
save_next_button.click(
fn=save_and_next_sample_first_phase,
inputs=[current_page_idx_state, current_idx_on_page_state, transcript_tb, first_phase_accept_cb],
outputs=navigation_outputs_extended
)
next_button.click(
fn=go_next_sample_wrapper,
inputs=[current_page_idx_state, current_idx_on_page_state],
outputs=navigation_outputs_extended
)
prev_button.click(
fn=go_prev_sample_wrapper,
inputs=[current_page_idx_state, current_idx_on_page_state],
outputs=navigation_outputs_extended
)
# Phase 2
approve_button.click(
fn=review_and_next_sample_second_phase,
inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("approved")],
outputs=navigation_outputs_extended
)
reject_button.click(
fn=review_and_next_sample_second_phase,
inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("rejected")],
outputs=navigation_outputs_extended
)
# Audio tools (Phase 1)
trim_button.click(
fn=trim_audio_action,
inputs=[current_page_idx_state, current_idx_on_page_state, trim_start_tb, trim_end_tb],
outputs=navigation_outputs_extended
)
undo_trim_button.click(
fn=undo_trim_action,
inputs=[current_page_idx_state, current_idx_on_page_state],
outputs=navigation_outputs_extended
)
delete_button.click(
fn=confirm_delete_audio_action,
inputs=[current_page_idx_state, current_idx_on_page_state],
outputs=navigation_outputs_extended
)
# Jump and Export
jump_button.click(
fn=jump_to_absolute_idx,
inputs=[jump_text_tb, current_page_idx_state, current_idx_on_page_state],
outputs=navigation_outputs_extended
)
hf_export_button.click(
fn=export_to_huggingface,
inputs=[hf_repo_name_tb, hf_token_state],
outputs=[hf_export_status_md],
queue=True
)
if __name__ == "__main__":
# --- Global config override for testing ---
# SECOND_PHASE = True # <<<<<<< SET THIS TO True TO TEST SECOND PHASE
# ALLOWED_USERS = ["vargha", "navidved", "userC"]
# REVIEWERS = ["vargha"] # First phase reviewers
# ANNOTATORS = ["navidved", "userC"] # First phase annotators (become reviewers in 2nd phase if in this list)
# Re-initialize based on potential overrides FOR TESTING
# In a real app, these would be set once at the top.
# ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS]
# if total_samples > 0: # If total_samples was determined
# annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
# if SECOND_PHASE:
# initialize_second_phase_assignments()
# else:
# print("Main block: total_samples not positive, ranges/assignments might be an issue if not loaded by login.")
if SECOND_PHASE:
print("==== APPLICATION LAUNCHING IN SECOND PHASE (REVIEW MODE) ====")
# Ensure assignments are made if they haven't been due to total_samples not being ready
if not SECOND_PHASE_REVIEW_MAPPING and total_samples > 0 and ANNOTATORS:
print("Late initialization of second phase assignments...")
if not annotator_ranges: # Should be populated by now if total_samples is known
annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS)
initialize_second_phase_assignments()
elif not SECOND_PHASE_REVIEW_MAPPING :
print("Warning: Second phase active, but review mapping is empty. Check total_samples and ANNOTATORS list.")
else:
print("==== APPLICATION LAUNCHING IN FIRST PHASE (ANNOTATION MODE) ====")
demo.queue().launch(debug=True, share=False)