import gradio as gr import os import json import pandas as pd from datasets import load_dataset, DatasetDict, Dataset, Audio from huggingface_hub import HfApi, whoami, login, hf_hub_download try: from huggingface_hub.utils import HfHubHTTPError # For newer versions except ImportError: from huggingface_hub.hf_api import HfHubHTTPError # For older versions (e.g., <0.5.0) import tempfile import shutil import gc import time import psutil from pydub import AudioSegment import soundfile as sf from tenacity import retry, stop_after_attempt, wait_exponential import re import numpy as np from pydantic import BaseModel from typing import Optional, List, Tuple from datetime import datetime import requests # Log in with Hugging Face token token = os.getenv("hf_token") if token: try: login(token) print("Successfully logged in using hf_token environment variable.") except Exception as e: print(f"Failed to login with hf_token environment variable: {e}") token = None # Ensure token is None if login fails else: print("Warning: hf_token environment variable not set. Hugging Face Hub operations might fail unless token is provided via UI.") # Configuration HF_DATASET_NAME = "navidved/channelb-raw-data" AUDIO_DIR = "audio" SAVE_PATH = "annotations.json" # Local filename for annotations ALLOWED_USERS = ["shahab7", "Amirnamini23", "Mohsen711", "mahya2025", "najmeh00", "sepehr21ar", "zahraemarati", "Moghim72", "amin76", "vargha", "navidved"] REVIEWERS = ["vargha", "navidved"] ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS] CURRENT_USERNAME = None PAGE_SIZE = 100 # SAVE_INTERVAL = 1 # FOR DEBUGGING: PUSH ON EVERY SAVE SAVE_INTERVAL = 10 # Normal operation: push every 10 saves # --- SECOND PHASE CONFIGURATION --- SECOND_PHASE = False SECOND_PHASE_REVIEW_MAPPING = {} # Global state variables current_page = 0 current_page_data = None audio_backup = {} annotation_count = 0 # Counts saves since login for the current session unsaved_changes = {} total_samples = 0 annotator_ranges = {} # Pydantic data models class AudioTrim(BaseModel): start: float end: float class Annotation(BaseModel): annotator: str annotated_subtitle: Optional[str] = None audio_trims: Optional[List[AudioTrim]] = None is_first_phase_accepted: bool = False first_phase_reviewer_username: Optional[str] = None second_phase_reviewed_by: Optional[str] = None second_phase_review_status: Optional[str] = None second_phase_review_timestamp: Optional[datetime] = None create_at: datetime update_at: datetime class Sample(BaseModel): id: int voice_name: str original_subtitle: str ignore_it: bool = False description: Optional[str] = None annotations: Optional[List[Annotation]] = None is_approved_in_second_phase: bool = False class DatasetModel(BaseModel): samples: Optional[List[Sample]] = None # Utility functions def load_saved_annotations(): dataset_model = None local_file_loaded_successfully = False annotations_filename_in_repo = os.path.basename(SAVE_PATH) # e.g., "annotations.json" if os.path.exists(SAVE_PATH): try: with open(SAVE_PATH, "r", encoding="utf-8") as f: data = json.load(f) if "samples" in data or not data: dataset_model = DatasetModel(**data) print(f"Loaded annotations from local JSON file: {SAVE_PATH}") local_file_loaded_successfully = True else: print(f"Local JSON file {SAVE_PATH} has incorrect structure. Ignoring.") except Exception as e: print(f"Error loading local JSON file '{SAVE_PATH}': {str(e)}. Will try HF Hub or create new.") try: corrupt_path = SAVE_PATH + ".corrupt." + datetime.now().strftime("%Y%m%d%H%M%S%f") os.rename(SAVE_PATH, corrupt_path) print(f"Renamed corrupt local file to {corrupt_path}") except OSError as re_e: print(f"Could not rename corrupt local file: {re_e}") global token # Access the global token, which should be set by hf_login if not local_file_loaded_successfully and token: print(f"Local annotations not loaded or not found/corrupt. Trying Hugging Face Hub for {annotations_filename_in_repo}...") try: hf_path = hf_hub_download( repo_id=HF_DATASET_NAME, filename=annotations_filename_in_repo, repo_type="dataset", token=os.getenv("hf_token") ) with open(hf_path, "r", encoding="utf-8") as f: data = json.load(f) dataset_model = DatasetModel(**data) with open(SAVE_PATH, "w", encoding="utf-8") as f_cache: f_cache.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) print(f"Loaded annotations from HF '{HF_DATASET_NAME}/{annotations_filename_in_repo}' and cached to '{SAVE_PATH}'.") except HfHubHTTPError as e: if e.response.status_code == 404: print(f"Annotations file '{annotations_filename_in_repo}' not found on HF repo '{HF_DATASET_NAME}'. This is normal if it's the first run or not pushed yet.") else: print(f"Error loading JSON file from HF repo '{HF_DATASET_NAME}/{annotations_filename_in_repo}': {str(e)}") except Exception as e: print(f"Unexpected error loading JSON file from HF repo '{HF_DATASET_NAME}/{annotations_filename_in_repo}': {str(e)}") if dataset_model is None: print("No valid annotations found locally or on HF Hub (or failed to load). Creating new empty DatasetModel.") dataset_model = DatasetModel(samples=[]) return dataset_model def push_json_to_hf(): global token # Use the globally set token from hf_login annotations_filename_in_repo = os.path.basename(SAVE_PATH) if not token: print("Push to HF: Aborted. Token not available/set.") return print(f"Push to HF: Attempting to upload '{SAVE_PATH}' as '{annotations_filename_in_repo}' to '{HF_DATASET_NAME}'.") try: user_details = whoami(token=token) print(f"Push to HF: Token confirmed for user '{user_details.get('name')}'.") except Exception as e_whoami: print(f"Push to HF: Token seems invalid or whoami failed. Error: {e_whoami}") print(f"Push to HF: Aborting upload due to token validation issue.") return try: api = HfApi() api.upload_file( path_or_fileobj=SAVE_PATH, # Local path to the file path_in_repo=annotations_filename_in_repo, # Name of the file in the repository repo_type="dataset", repo_id=HF_DATASET_NAME, token=os.getenv("hf_token"), commit_message=f"Updated {annotations_filename_in_repo} via annotation tool at {datetime.now().isoformat()}" ) print(f"Push to HF: Successfully uploaded '{annotations_filename_in_repo}' to Hugging Face repository '{HF_DATASET_NAME}'.") except Exception as e: print(f"Push to HF: Error uploading '{annotations_filename_in_repo}' to '{HF_DATASET_NAME}'. Error: {str(e)}") import traceback print("Push to HF: Traceback below:") traceback.print_exc() def save_annotations(dataset_model: DatasetModel): global annotation_count, token # Make sure we're using the global token # DEBUGGING PRINT print(f"Debug (save_annotations): annotation_count (before inc)={annotation_count}, SAVE_INTERVAL={SAVE_INTERVAL}, token_is_truthy={bool(token)}") try: with open(SAVE_PATH, "w", encoding="utf-8") as f: f.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) print(f"Saved annotations locally to {SAVE_PATH}") annotation_count += 1 # Increment after successful local save if token and (annotation_count % SAVE_INTERVAL == 0): print(f"Debug (save_annotations): Conditions met for HF push. Current annotation_count={annotation_count}.") push_json_to_hf() elif not token: print(f"Debug (save_annotations): HF push skipped. Token is not available. annotation_count={annotation_count}.") else: # Token is available, but interval not met print(f"Debug (save_annotations): HF push skipped. Interval not met. annotation_count={annotation_count}. " f"Need {(SAVE_INTERVAL - (annotation_count % SAVE_INTERVAL)) % SAVE_INTERVAL} more saves for next push (or 0 if at interval).") except Exception as e: print(f"Error in save_annotations (local save or triggering push): {str(e)}") import traceback print("Traceback for save_annotations error:") traceback.print_exc() def calculate_annotator_ranges(total_samples_val, annotators_list): num_annotators = len(annotators_list) if num_annotators == 0 or total_samples_val <= 0: return {} samples_per_annotator = total_samples_val // num_annotators extra_samples = total_samples_val % num_annotators ranges = {} start_idx = 0 for i, annotator in enumerate(annotators_list): end_idx = start_idx + samples_per_annotator - 1 if i < extra_samples: end_idx += 1 if end_idx >= total_samples_val: end_idx = total_samples_val -1 if start_idx <= end_idx: ranges[annotator] = (start_idx, end_idx) start_idx = end_idx + 1 print(f"Calculated annotator ranges: {ranges}") return ranges def initialize_second_phase_assignments(): global SECOND_PHASE_REVIEW_MAPPING, annotator_ranges, total_samples if not ANNOTATORS or len(ANNOTATORS) < 1: print("Not enough annotators for second phase review.") SECOND_PHASE_REVIEW_MAPPING = {} return if not annotator_ranges and total_samples > 0: print("Populating annotator_ranges for second phase initialization (was empty).") annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) elif not annotator_ranges and total_samples <= 0: print("Warning: Cannot initialize second phase assignments without total_samples and annotator_ranges.") return if len(ANNOTATORS) == 1: annotator = ANNOTATORS[0] SECOND_PHASE_REVIEW_MAPPING[annotator] = annotator print(f"Second phase: {annotator} will review their own work.") else: for i, reviewer_user in enumerate(ANNOTATORS): original_annotator_idx = (i - 1 + len(ANNOTATORS)) % len(ANNOTATORS) original_annotator_user = ANNOTATORS[original_annotator_idx] SECOND_PHASE_REVIEW_MAPPING[reviewer_user] = original_annotator_user print(f"Second phase: {reviewer_user} will review {original_annotator_user}'s work.") for reviewer, original_annotator in SECOND_PHASE_REVIEW_MAPPING.items(): if original_annotator not in annotator_ranges: print(f"Warning: Original annotator {original_annotator} (being reviewed by {reviewer}) has no range defined in annotator_ranges.") def get_user_allowed_range(username): global annotator_ranges, total_samples, ANNOTATORS # Ensure ANNOTATORS is accessible if SECOND_PHASE: if not SECOND_PHASE_REVIEW_MAPPING: # If empty, try to initialize # Need annotator_ranges for initialize_second_phase_assignments if not annotator_ranges and total_samples > 0 and ANNOTATORS: annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) initialize_second_phase_assignments() # This will populate SECOND_PHASE_REVIEW_MAPPING original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(username) if original_annotator_to_review: # Ensure annotator_ranges is populated if it wasn't before if not annotator_ranges and total_samples > 0 and ANNOTATORS: annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) user_range = annotator_ranges.get(original_annotator_to_review) return user_range else: # User not found in review mapping (e.g., a first-phase reviewer not part of ANNOTATORS cycle) return None # Or handle as appropriate, e.g., full range if they are a super-reviewer else: # First Phase Logic if get_user_role(username) == "reviewer": return (0, total_samples - 1) if total_samples > 0 else None # Ensure annotator_ranges is populated for annotators elif not annotator_ranges and total_samples > 0 and ANNOTATORS: annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) if username in annotator_ranges: return annotator_ranges[username] else: return None def is_within_range(absolute_idx, allowed_range): if allowed_range is None: return False return allowed_range[0] <= absolute_idx <= allowed_range[1] def get_user_role(username): return "reviewer" if username in REVIEWERS else "annotator" def get_dataset_info(): global total_samples if total_samples > 0: return {'num_samples': total_samples} try: print(f"Attempting to load dataset info for {HF_DATASET_NAME} (non-streaming)...") ds_info_obj = load_dataset(HF_DATASET_NAME, split="train", streaming=False) num_samples_val = ds_info_obj.num_rows if num_samples_val and num_samples_val > 0: total_samples = num_samples_val print(f"Dataset info: total_samples set to {total_samples}") return {'num_samples': total_samples} else: print(f"Warning: ds_info_obj.num_rows was not positive ({num_samples_val}). Trying iteration for count (may be slow).") ds_stream = load_dataset(HF_DATASET_NAME, split="train", streaming=True) count = 0 for _ in ds_stream: # This will iterate over the whole dataset if num_rows is wrong count +=1 if count % 10000 == 0: print(f"Counting by iteration... at {count}") # Progress for large datasets if count > 0: total_samples = count print(f"Dataset info: total_samples set to {total_samples} by iteration.") return {'num_samples': total_samples} else: print("Warning: Could not determine total_samples from dataset info or iteration.") total_samples = -1 return {'num_samples': -1} except Exception as e: print(f"Error getting dataset info for {HF_DATASET_NAME}: {e}") total_samples = -1 return {'num_samples': -1} # Initial data load attempt (will be re-attempted more robustly in hf_login) # dataset_info = get_dataset_info() # if total_samples > 0: # annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) # if SECOND_PHASE: # initialize_second_phase_assignments() # else: # print("Initial check: total_samples is not positive. Will rely on login process to set this.") # annotator_ranges = {} def get_audio_path(audio_entry): if isinstance(audio_entry, dict): if "array" in audio_entry and "sampling_rate" in audio_entry: return (audio_entry["sampling_rate"], audio_entry["array"]) return audio_entry.get("path", None) if isinstance(audio_entry, str): if audio_entry.startswith("http://") or audio_entry.startswith("https://"): return audio_entry if os.path.exists(audio_entry): return audio_entry if AUDIO_DIR: joined_path = os.path.join(AUDIO_DIR, audio_entry) if os.path.exists(joined_path): return joined_path return audio_entry return None def load_page_data(page_num_within_user_view=0): global current_page_data, current_page current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) current_page = page_num_within_user_view user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) if not user_allowed_range: print(f"User {CURRENT_USERNAME} has no allowed range.") return current_page_data user_start_abs, user_end_abs = user_allowed_range if user_start_abs < 0 or user_end_abs < 0 or user_start_abs > user_end_abs: print(f"User {CURRENT_USERNAME} has an invalid allowed range: {user_allowed_range}") return current_page_data page_global_start_idx = user_start_abs + (page_num_within_user_view * PAGE_SIZE) if page_global_start_idx > user_end_abs: print(f"Requested page {page_num_within_user_view} (abs start {page_global_start_idx}) is beyond user {CURRENT_USERNAME}'s allowed samples end ({user_end_abs}).") return current_page_data page_global_end_idx = min(page_global_start_idx + PAGE_SIZE - 1, user_end_abs) num_samples_on_this_page = page_global_end_idx - page_global_start_idx + 1 if num_samples_on_this_page <= 0: print(f"No samples for user {CURRENT_USERNAME} on their page {page_num_within_user_view}. Calculated range for page: [{page_global_start_idx}-{page_global_end_idx}]") return current_page_data print(f"Loading page {page_num_within_user_view} for user {CURRENT_USERNAME}. " f"Effective absolute dataset range for this page: [{page_global_start_idx}-{page_global_end_idx}] " f"(from user range [{user_start_abs}-{user_end_abs}]). " f"Will attempt to load {num_samples_on_this_page} samples.") try: ds_full = load_dataset(HF_DATASET_NAME, split="train", streaming=True, token=token if token else None) # Use token for private datasets ds_page_specific = ds_full.skip(page_global_start_idx) page_iterable = ds_page_specific.take(num_samples_on_this_page) except Exception as e: print(f"Error loading or processing dataset via skip/take for page data: {e}") return current_page_data samples_on_page_list = [] current_processing_abs_idx = page_global_start_idx for id_on_page_counter, sample_data_item in enumerate(page_iterable): sample_data_item['absolute_idx'] = current_processing_abs_idx sample_data_item['id_within_page'] = id_on_page_counter samples_on_page_list.append(sample_data_item) current_processing_abs_idx += 1 if id_on_page_counter + 1 >= num_samples_on_this_page: break if samples_on_page_list: current_page_data = pd.DataFrame(samples_on_page_list) print(f"Loaded {len(samples_on_page_list)} samples for page {page_num_within_user_view}. " f"First abs_idx: {samples_on_page_list[0]['absolute_idx']}, " f"Last abs_idx: {samples_on_page_list[-1]['absolute_idx']}.") else: print(f"No samples were loaded for page {page_num_within_user_view} (user: {CURRENT_USERNAME}) " f"despite expecting {num_samples_on_this_page} from range [{page_global_start_idx}-{page_global_end_idx}]. ") gc.collect() return current_page_data # Core functions (save_sample_data, handle_second_phase_action, get_sample, load_interface_data, navigation functions, jump, trim, export etc. remain largely the same as your previous version) # ... (Keep the rest of your functions from the previous version here) # For brevity, I'm omitting the bulk of the functions that were not directly related to the HF save issue or initial loading. # Make sure to include: # - save_sample_data # - handle_second_phase_action # - get_sample # - load_interface_data # - navigate_sample and its wrappers # - jump_to_absolute_idx # - trim_audio_action, undo_trim_action, confirm_delete_audio_action # - export_to_huggingface # - hf_login (ensure it correctly calls get_dataset_info, calculate_annotator_ranges, load_page_data, etc. *after* successful auth) def save_sample_data(page_idx, idx_on_page, transcript, current_user_performing_action, accepted_flag=False): global current_page_data, unsaved_changes if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): return "Invalid index or data not loaded for current page." actual_sample_info = current_page_data.iloc[idx_on_page] absolute_idx = actual_sample_info['absolute_idx'] if not SECOND_PHASE: allowed_range = get_user_allowed_range(current_user_performing_action) if not is_within_range(absolute_idx, allowed_range): return f"You are not allowed to annotate this sample {absolute_idx} (out of range {allowed_range})." audio_entry_original = actual_sample_info["audio"] voice_name = os.path.basename(str(get_audio_path(audio_entry_original) or f"sample_{absolute_idx}")) dataset_model = load_saved_annotations() # This will load existing or create new sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if not sample: sample = Sample( id=absolute_idx, voice_name=voice_name, original_subtitle=actual_sample_info["sentence"], annotations=[] ) dataset_model.samples = dataset_model.samples or [] dataset_model.samples.append(sample) now = datetime.now() annotation = next((a for a in sample.annotations or [] if a.annotator == current_user_performing_action), None) if get_user_role(current_user_performing_action) == "reviewer" and not SECOND_PHASE : if annotation: annotation.annotated_subtitle = transcript.strip() annotation.update_at = now annotation.is_first_phase_accepted = accepted_flag annotation.first_phase_reviewer_username = current_user_performing_action if accepted_flag else None else: annotation = Annotation( annotator=current_user_performing_action, annotated_subtitle=transcript.strip(), create_at=now, update_at=now, is_first_phase_accepted=accepted_flag, first_phase_reviewer_username=current_user_performing_action if accepted_flag else None ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) else: if annotation: annotation.annotated_subtitle = transcript.strip() annotation.update_at = now else: annotation = Annotation( annotator=current_user_performing_action, annotated_subtitle=transcript.strip(), create_at=now, update_at=now, is_first_phase_accepted=False ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) if absolute_idx in unsaved_changes: del unsaved_changes[absolute_idx] save_annotations(dataset_model) # This will save locally and potentially push to HF return f"✓ Saved annotation for sample {absolute_idx}" def handle_second_phase_action(page_idx, idx_on_page, action: str): global current_page_data, CURRENT_USERNAME if not SECOND_PHASE: return "Not in second phase." if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): return "Invalid index or data not loaded for current page (second phase)." actual_sample_info = current_page_data.iloc[idx_on_page] absolute_idx = actual_sample_info['absolute_idx'] original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(CURRENT_USERNAME) if not original_annotator_to_review: return f"User {CURRENT_USERNAME} is not assigned to review any user's work in SECOND_PHASE_REVIEW_MAPPING." dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if not sample: return f"Error: Sample {absolute_idx} not found in annotations.json for review." annotation_to_review = next((ann for ann in sample.annotations or [] if ann.annotator == original_annotator_to_review), None) if not annotation_to_review: print(f"Warning: No prior annotation by {original_annotator_to_review} for sample {absolute_idx}. Creating placeholder for review.") annotation_to_review = Annotation( annotator=original_annotator_to_review, annotated_subtitle=sample.original_subtitle, # Or actual_sample_info["sentence"] create_at=datetime.now(), update_at=datetime.now() ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation_to_review) annotation_to_review.second_phase_reviewed_by = CURRENT_USERNAME annotation_to_review.second_phase_review_status = action annotation_to_review.second_phase_review_timestamp = datetime.now() annotation_to_review.update_at = datetime.now() if action == "approved": sample.is_approved_in_second_phase = True # else: sample.is_approved_in_second_phase = False # Explicitly set to False on rejection save_annotations(dataset_model) return f"✓ Review ({action}) saved for sample {absolute_idx} (Original annotator: {original_annotator_to_review})" def get_sample(page_idx_user_relative, idx_on_page, current_user_displaying): global current_page_data, total_samples if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): # Default empty values for all expected return items return None, "", f"Invalid index ({idx_on_page}) for current page data (len {len(current_page_data) if current_page_data is not None else 'None'}).", "unreviewed", "white", True, False, "", gr.update(visible=False) actual_sample_info = current_page_data.iloc[idx_on_page] absolute_idx = actual_sample_info['absolute_idx'] audio_entry_original = actual_sample_info["audio"] audio_val = get_audio_path(audio_entry_original) default_transcript = actual_sample_info.get("sentence", "") # Use .get for safety transcript_to_display = default_transcript ui_reviewer_field = "unreviewed" ui_color = "white" ui_editable = True ui_is_accepted_flag = False status_prefix = "" user_allowed_range = get_user_allowed_range(current_user_displaying) if user_allowed_range: user_start_abs, user_end_abs = user_allowed_range # Ensure user_start_abs is valid before calculation if user_start_abs is not None and absolute_idx >= user_start_abs : current_sample_num_in_user_assignment = absolute_idx - user_start_abs + 1 total_samples_for_user = user_end_abs - user_start_abs + 1 status_prefix = f"Sample {current_sample_num_in_user_assignment} of {total_samples_for_user} for you (Abs Idx {absolute_idx})." else: # Fallback if range is odd or absolute_idx is somehow outside status_prefix = f"Sample (Abs Idx {absolute_idx}). Range issue for user stats." else: status_prefix = f"Sample (Abs Idx {absolute_idx}). No range assigned." dataset_model = load_saved_annotations() sample_from_json = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if sample_from_json: if sample_from_json.ignore_it: audio_val = None transcript_to_display = "AUDIO DELETED (This audio has been removed.)" ui_reviewer_field = "deleted" ui_color = "red" ui_editable = False elif SECOND_PHASE: ui_editable = False original_annotator_being_reviewed = SECOND_PHASE_REVIEW_MAPPING.get(current_user_displaying) if not original_annotator_being_reviewed: transcript_to_display = "Error: You are not mapped to review any user." ui_color = "red" ui_reviewer_field = "Error" else: ui_reviewer_field = f"Reviewing: {original_annotator_being_reviewed}" annotation_under_review = next((ann for ann in sample_from_json.annotations or [] if ann.annotator == original_annotator_being_reviewed), None) if annotation_under_review: transcript_to_display = annotation_under_review.annotated_subtitle or default_transcript ui_is_accepted_flag = (annotation_under_review.second_phase_review_status == "approved" and annotation_under_review.second_phase_reviewed_by == current_user_displaying) if annotation_under_review.second_phase_reviewed_by: if annotation_under_review.second_phase_reviewed_by == current_user_displaying: ui_color = "green" if annotation_under_review.second_phase_review_status == "approved" else "orange" else: ui_color = "gray" ui_reviewer_field += f" (Already reviewed by {annotation_under_review.second_phase_reviewed_by} as {annotation_under_review.second_phase_review_status})" else: ui_color = "yellow" else: transcript_to_display = default_transcript ui_reviewer_field += " (No submission by original annotator)" ui_color = "lightgray" else: # First Phase Logic accepted_first_phase_annotation = next((a for a in sample_from_json.annotations or [] if a.is_first_phase_accepted and a.first_phase_reviewer_username), None) if accepted_first_phase_annotation: transcript_to_display = accepted_first_phase_annotation.annotated_subtitle or default_transcript ui_reviewer_field = f"Accepted by: {accepted_first_phase_annotation.first_phase_reviewer_username}" ui_color = "green" ui_is_accepted_flag = True ui_editable = (get_user_role(current_user_displaying) == "reviewer") else: user_specific_annotation = next((a for a in sample_from_json.annotations or [] if a.annotator == current_user_displaying), None) if user_specific_annotation: transcript_to_display = user_specific_annotation.annotated_subtitle or default_transcript ui_reviewer_field = f"Your draft (as {user_specific_annotation.annotator})" ui_color = "yellow" ui_editable = True else: other_annotations = [a for a in sample_from_json.annotations or [] if not a.is_first_phase_accepted] if other_annotations: if get_user_role(current_user_displaying) == "reviewer": other_ann_to_show = other_annotations[0] transcript_to_display = other_ann_to_show.annotated_subtitle or default_transcript ui_reviewer_field = f"Draft by: {other_ann_to_show.annotator}" ui_color = "blue" ui_editable = True else: transcript_to_display = default_transcript ui_reviewer_field = f"Labeled by: {other_annotations[0].annotator}" ui_color = "lightblue" ui_editable = False if not SECOND_PHASE and absolute_idx in unsaved_changes: ui_color = "pink" ui_status_message = f"{status_prefix} Page {page_idx_user_relative + 1} (User-view)." if SECOND_PHASE: ui_status_message += " (Review Phase)" else: ui_status_message += " (Annotation Phase)" show_accept_checkbox = not SECOND_PHASE and get_user_role(current_user_displaying) == "reviewer" return audio_val, transcript_to_display, ui_status_message, ui_reviewer_field, ui_color, ui_editable, ui_is_accepted_flag, default_transcript, gr.update(visible=show_accept_checkbox) def load_interface_data(page_idx_user_relative, idx_on_page): # get_sample returns 9 items audio, text, base_status, saved_reviewer_text, color, editable, accepted_flag, original_dataset_text, accept_cb_visibility_update = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) return ( page_idx_user_relative, # 0 idx_on_page, # 1 audio, # 2 gr.update(value=text, interactive=editable), # 3 transcript_tb gr.update(value=saved_reviewer_text, elem_classes=[color]), # 4 reviewer_tb base_status, # 5 status_md original_dataset_text, # 6 original_transcript_state accept_cb_visibility_update, # 7 first_phase_accept_cb (visibility part) accepted_flag # 8 first_phase_accept_cb (value part) ) def navigate_sample(page_idx_user_relative, idx_on_page, direction: int): global current_page_data if current_page_data is None or len(current_page_data) == 0: user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) err_msg = "No data loaded. Try reloading or check your assigned range." if not user_allowed_range or (user_allowed_range[0] > user_allowed_range[1]): # check for invalid range err_msg = "You have no samples assigned or your range is invalid." # Return a 9-tuple consistent with load_interface_data's structure return page_idx_user_relative, idx_on_page, None, gr.update(value="Error", interactive=False), gr.update(value="Error"), err_msg, "", gr.update(visible=False), False target_idx_on_page = idx_on_page + direction new_page_idx_user_relative = page_idx_user_relative new_idx_on_page = target_idx_on_page user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) # This check should ideally not be hit if current_page_data exists, but good safeguard if not user_allowed_range: # Use get_sample to fetch current state with an error message current_state = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) # current_state is a 9-tuple: (audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis_update) return page_idx_user_relative, idx_on_page, current_state[0], gr.update(value=current_state[1], interactive=current_state[5]), gr.update(value=current_state[3], elem_classes=[current_state[4]]), "Error: No allowed range for navigation.", current_state[7], current_state[8], current_state[6] if target_idx_on_page < 0: # Moving to previous page or beginning of assignment if page_idx_user_relative > 0: new_page_idx_user_relative = page_idx_user_relative - 1 temp_data = load_page_data(new_page_idx_user_relative) if temp_data is not None and not temp_data.empty: new_idx_on_page = len(temp_data) - 1 else: # Previous page is empty (shouldn't happen if logic is correct) current_state = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) status = current_state[2] + " [Already at the first sample of this page/range]" return page_idx_user_relative, idx_on_page, current_state[0], gr.update(value=current_state[1], interactive=current_state[5]), gr.update(value=current_state[3], elem_classes=[current_state[4]]), status, current_state[7], current_state[8], current_state[6] else: # Already on first item of first user-relative page current_state = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) status = current_state[2] + " [At the beginning of your assigned samples]" return page_idx_user_relative, idx_on_page, current_state[0], gr.update(value=current_state[1], interactive=current_state[5]), gr.update(value=current_state[3], elem_classes=[current_state[4]]), status, current_state[7], current_state[8], current_state[6] elif target_idx_on_page >= len(current_page_data): # Moving to next page or end of assignment new_page_idx_user_relative = page_idx_user_relative + 1 temp_data = load_page_data(new_page_idx_user_relative) if temp_data is not None and not temp_data.empty: new_idx_on_page = 0 else: # Next user-relative page is empty (means we are at the end of user's allowed samples) current_abs_idx_check = -1 if current_page_data is not None and not current_page_data.empty and idx_on_page < len(current_page_data): current_abs_idx_check = current_page_data.iloc[idx_on_page]['absolute_idx'] is_at_very_end = user_allowed_range and current_abs_idx_check != -1 and current_abs_idx_check >= user_allowed_range[1] current_state = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) status = current_state[2] if is_at_very_end: status += " [At the end of your assigned samples]" else: status += " [No more samples in this direction (next page empty or end of assignment)]" return page_idx_user_relative, idx_on_page, current_state[0], gr.update(value=current_state[1], interactive=current_state[5]), gr.update(value=current_state[3], elem_classes=[current_state[4]]), status, current_state[7], current_state[8], current_state[6] # If navigation is within the current page or to a new valid page/index return load_interface_data(new_page_idx_user_relative, new_idx_on_page) def go_next_sample_wrapper(page_idx_user_relative, idx_on_page): return navigate_sample(page_idx_user_relative, idx_on_page, 1) def go_prev_sample_wrapper(page_idx_user_relative, idx_on_page): return navigate_sample(page_idx_user_relative, idx_on_page, -1) def save_and_next_sample_first_phase(page_idx_user_relative, idx_on_page, current_text, is_accepted_by_reviewer_flag): user_is_reviewer = get_user_role(CURRENT_USERNAME) == "reviewer" accepted_to_save = is_accepted_by_reviewer_flag if user_is_reviewer else False save_msg = save_sample_data(page_idx_user_relative, idx_on_page, current_text, CURRENT_USERNAME, accepted_flag=accepted_to_save) print(save_msg) return navigate_sample(page_idx_user_relative, idx_on_page, 1) def review_and_next_sample_second_phase(page_idx_user_relative, idx_on_page, review_action: str): feedback_msg = handle_second_phase_action(page_idx_user_relative, idx_on_page, review_action) print(feedback_msg) return navigate_sample(page_idx_user_relative, idx_on_page, 1) def jump_to_absolute_idx(target_abs_idx_str, current_page_idx_user_relative, current_idx_on_page): global current_page_data # Fallback return using current state if jump fails def _fallback_return(status_message_suffix=""): current_state = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME) status = current_state[2] + status_message_suffix return current_page_idx_user_relative, current_idx_on_page, current_state[0], gr.update(value=current_state[1], interactive=current_state[5]), gr.update(value=current_state[3], elem_classes=[current_state[4]]), status, current_state[7], current_state[8], current_state[6] try: target_abs_idx = int(target_abs_idx_str) if target_abs_idx < 0: target_abs_idx = 0 user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) if not user_allowed_range or not is_within_range(target_abs_idx, user_allowed_range): return _fallback_return(f" [Target index {target_abs_idx} is outside your assigned range {user_allowed_range or 'N/A'}.]") user_start_abs, _ = user_allowed_range offset_from_user_start = target_abs_idx - user_start_abs if offset_from_user_start < 0: return _fallback_return(f" [Logic Error: Target index {target_abs_idx} has negative offset from user start {user_start_abs}.]") new_user_relative_page_idx = offset_from_user_start // PAGE_SIZE # load_page_data updates global current_page_data and current_page temp_page_data_df = load_page_data(new_user_relative_page_idx) if temp_page_data_df is None or temp_page_data_df.empty: return _fallback_return(f" [No data found for your page {new_user_relative_page_idx} (containing abs index {target_abs_idx})].") # Calculate new_idx_on_page based on the target_abs_idx relative to the start of the loaded page # The loaded page (current_page_data) now starts at `user_start_abs + new_user_relative_page_idx * PAGE_SIZE` page_actual_start_abs = current_page_data.iloc[0]['absolute_idx'] if not current_page_data.empty else -1 if page_actual_start_abs == -1: # Should not happen if temp_page_data_df was not empty return _fallback_return(f" [Error: Page {new_user_relative_page_idx} loaded empty unexpectedly.]") new_idx_on_page_actual = target_abs_idx - page_actual_start_abs if not (0 <= new_idx_on_page_actual < len(current_page_data)): # This means target_abs_idx was in the user's range for this page, but the page didn't actually contain it # (e.g. dataset ended prematurely within this page's expected span) # Default to first item on the successfully loaded (but perhaps shorter) page. print(f"Warning: Target index {target_abs_idx} resulted in out-of-bounds id_on_page ({new_idx_on_page_actual}) for loaded page. Defaulting to 0.") new_idx_on_page_actual = 0 if current_page_data.empty: # Should be caught above return _fallback_return(f" [Page {new_user_relative_page_idx} is empty after load attempt for jump.]") return load_interface_data(new_user_relative_page_idx, new_idx_on_page_actual) except ValueError: return _fallback_return(" [Invalid index format for jump.]") except Exception as e: import traceback print(f"Error jumping to index: {str(e)}\n{traceback.format_exc()}") return _fallback_return(f" [Error jumping to index: {str(e)}]") def trim_audio_action(page_idx_user_relative, idx_on_page, trim_start_str, trim_end_str): def _return_current_state_with_message(msg_suffix): loaded_data = load_interface_data(page_idx_user_relative, idx_on_page) return (*loaded_data[0:5], loaded_data[5] + f" [{msg_suffix}]", *loaded_data[6:]) if SECOND_PHASE: return _return_current_state_with_message("Trimming disabled in Review Phase.") if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): return _return_current_state_with_message("Audio data not available (page error for trim).") actual_sample_info = current_page_data.iloc[idx_on_page] absolute_idx = actual_sample_info['absolute_idx'] original_audio_path_info = get_audio_path(actual_sample_info["audio"]) source_basename_for_trimmed_file = os.path.basename(str(original_audio_path_info)) if isinstance(original_audio_path_info, str) else f"sample_raw_data_{absolute_idx}" audio_seg = None temp_dir_for_download = None try: if isinstance(original_audio_path_info, tuple): sr, audio_array = original_audio_path_info if not isinstance(audio_array, np.ndarray): return _return_current_state_with_message("Raw audio data is not a numpy array.") if audio_array.size == 0: return _return_current_state_with_message("Cannot trim empty audio array.") audio_array = np.ascontiguousarray(audio_array) channels = 1 if audio_array.ndim == 1 else (audio_array.shape[1] if audio_array.ndim == 2 and audio_array.shape[1] in [1,2] else (audio_array.shape[0] if audio_array.ndim == 2 and audio_array.shape[0] in [1,2] else 0)) if channels == 0: return _return_current_state_with_message(f"Unsupported audio array shape or channels: {audio_array.shape}") if audio_array.ndim == 2 and audio_array.shape[0] < audio_array.shape[1] and audio_array.shape[0] in [1, 2]: audio_array = np.ascontiguousarray(audio_array.T) if audio_array.dtype == np.float32 or audio_array.dtype == np.float64: audio_array_int = (audio_array * np.iinfo(np.int16).max).astype(np.int16) elif audio_array.dtype == np.int16: audio_array_int = audio_array elif audio_array.dtype == np.int32: audio_array_int = (audio_array >> 16).astype(np.int16) else: return _return_current_state_with_message(f"Unsupported numpy array dtype for raw audio: {audio_array.dtype}") sample_width = audio_array_int.itemsize audio_seg = AudioSegment(data=audio_array_int.tobytes(), sample_width=sample_width, frame_rate=sr, channels=channels) elif isinstance(original_audio_path_info, str): audio_to_load = original_audio_path_info if not (os.path.exists(audio_to_load) or audio_to_load.startswith("http")): return _return_current_state_with_message("Audio file path is invalid, does not exist, or is not a valid URL.") if audio_to_load.startswith("http"): temp_dir_for_download = tempfile.mkdtemp() url_fname = audio_to_load.split("/")[-1].split("?")[0] local_fpath = os.path.join(temp_dir_for_download, url_fname or "downloaded_audio.tmp") response = requests.get(audio_to_load, stream=True); response.raise_for_status() with open(local_fpath, 'wb') as f: shutil.copyfileobj(response.raw, f) audio_to_load = local_fpath audio_seg = AudioSegment.from_file(audio_to_load) else: return _return_current_state_with_message("Trimming not supported for this audio source.") if audio_seg is None: return _return_current_state_with_message("Failed to load audio segment.") try: start_s, end_s = float(trim_start_str), float(trim_end_str) except ValueError: return _return_current_state_with_message("Invalid trim times: Start and End must be numbers.") start_ms, end_ms, audio_duration_ms = int(start_s * 1000), int(end_s * 1000), len(audio_seg) if not (0 <= start_ms < end_ms and end_ms <= audio_duration_ms): return _return_current_state_with_message(f"Invalid trim times: start={start_s}s, end={end_s}s for audio of {audio_duration_ms/1000.0:.2f}s.") trimmed_seg = audio_seg[start_ms:end_ms] os.makedirs("trimmed_audio", exist_ok=True) safe_voice_name = re.sub(r'[^\w.-]', '_', source_basename_for_trimmed_file) trimmed_filename = f"trimmed_{absolute_idx}_{safe_voice_name}" if not os.path.splitext(trimmed_filename)[1]: trimmed_filename += ".wav" trimmed_path = os.path.join("trimmed_audio", trimmed_filename) export_format = os.path.splitext(trimmed_path)[1][1:].lower() or "wav" trimmed_seg.export(trimmed_path, format=export_format) dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if not sample: sample = Sample(id=absolute_idx, voice_name=os.path.basename(str(get_audio_path(actual_sample_info["audio"]) or f"sample_{absolute_idx}")), original_subtitle=actual_sample_info["sentence"], annotations=[]) dataset_model.samples = dataset_model.samples or [] dataset_model.samples.append(sample) now = datetime.now() annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) if not annotation: annotation = Annotation(annotator=CURRENT_USERNAME, create_at=now, update_at=now) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) annotation.audio_trims = [AudioTrim(start=start_s, end=end_s)] annotation.update_at = now save_annotations(dataset_model) # Return full state, but with new audio path and status message loaded_data_after_trim = load_interface_data(page_idx_user_relative, idx_on_page) # The audio path needs to be overridden here to show the trimmed path return (loaded_data_after_trim[0], loaded_data_after_trim[1], trimmed_path, loaded_data_after_trim[3], loaded_data_after_trim[4], loaded_data_after_trim[5] + " [Trimmed]", *loaded_data_after_trim[6:]) except Exception as e: import traceback print(f"Error during trim_audio_action for abs_idx {absolute_idx}: {str(e)}\n{traceback.format_exc()}") return _return_current_state_with_message(f"Error trimming: {str(e)}") finally: if temp_dir_for_download and os.path.exists(temp_dir_for_download): shutil.rmtree(temp_dir_for_download) def undo_trim_action(page_idx_user_relative, idx_on_page): def _return_current_state_with_message(msg_suffix): return load_interface_data(page_idx_user_relative, idx_on_page)[0:5] + \ (load_interface_data(page_idx_user_relative, idx_on_page)[5] + f" [{msg_suffix}]",) + \ load_interface_data(page_idx_user_relative, idx_on_page)[6:] if SECOND_PHASE: return _return_current_state_with_message("Undo Trim disabled in Review Phase.") if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): return _return_current_state_with_message("Audio data not available (page error).") absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if sample: annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) if annotation and annotation.audio_trims: annotation.audio_trims = None annotation.update_at = datetime.now() save_annotations(dataset_model) return _return_current_state_with_message("Trim undone") # Reloads UI showing original audio def confirm_delete_audio_action(page_idx_user_relative, idx_on_page): def _return_current_state_with_message(msg_suffix=""): # Default to no suffix if just reloading loaded_data = load_interface_data(page_idx_user_relative, idx_on_page) return (*loaded_data[0:5], loaded_data[5] + f" [{msg_suffix}]" if msg_suffix else loaded_data[5], *loaded_data[6:]) if SECOND_PHASE: return _return_current_state_with_message("Delete disabled in Review Phase.") if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): return _return_current_state_with_message("Audio data not available (page error).") absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] voice_name_original = os.path.basename(str(get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) or f"sample_{absolute_idx}")) dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if not sample: sample = Sample(id=absolute_idx, voice_name=voice_name_original, original_subtitle=current_page_data.iloc[idx_on_page]["sentence"], annotations=[]) dataset_model.samples = dataset_model.samples or [] dataset_model.samples.append(sample) sample.ignore_it = True now = datetime.now() deleted_text_marker = "AUDIO DELETED (This audio has been removed.)" annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) if annotation: annotation.annotated_subtitle = deleted_text_marker annotation.audio_trims = None annotation.update_at = now else: annotation = Annotation(annotator=CURRENT_USERNAME, annotated_subtitle=deleted_text_marker, create_at=now, update_at=now) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) save_annotations(dataset_model) return _return_current_state_with_message() # Reload interface to show deleted status def sanitize_string(s): if not isinstance(s, str): s = str(s) return re.sub(r'[^\w-./]', '_', s) def sanitize_sentence(s): if not isinstance(s, str): s = str(s) return s.encode('utf-8', errors='ignore').decode('utf-8') @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token_val=None): if not token_val: print("Cannot push to hub: No token provided for push_to_hub_with_retry.") return print(f"Pushing dataset to {repo_id}") dataset_dict.push_to_hub(repo_id, private=private, token=token_val) # Make sure this token has write access def export_to_huggingface(repo_name_str, hf_token_for_export, progress=gr.Progress()): if not hf_token_for_export: return "Export failed: Hugging Face token is missing." if not repo_name_str or len(repo_name_str.split('/')) != 2: return "Export failed: Repository name must be in 'username/dataset-name' format." try: start_time = time.time() print(f"Export started at {time.strftime('%Y-%m-%d %H:%M:%S')}") dataset_model_annotations = load_saved_annotations() current_total_samples = total_samples if current_total_samples <= 0: info = get_dataset_info() current_total_samples = total_samples if current_total_samples <= 0: return "Export failed: Total number of samples is unknown or invalid." ds_source = load_dataset(HF_DATASET_NAME, split="train", streaming=False, token=hf_token_for_export) # Use token for private source iteration_limit = len(ds_source) if iteration_limit != current_total_samples: print(f"Warning: Source dataset length ({iteration_limit}) mismatches cached total_samples ({current_total_samples}). Using source length for export.") exported_data_list = [] progress(0, f"Preparing {iteration_limit} samples for export...") num_processed_from_source = 0 for i, source_sample in enumerate(ds_source): if i >= iteration_limit: break num_processed_from_source +=1 absolute_idx = i audio_entry = source_sample.get("audio") sentence_val = source_sample.get("sentence", "") audio_dict_to_export = audio_entry annotation_data = next((s for s in dataset_model_annotations.samples or [] if s.id == absolute_idx), None) if annotation_data: if annotation_data.ignore_it: sentence_val = "AUDIO DELETED (This audio has been removed.)" audio_dict_to_export = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} else: best_ann = None if annotation_data.annotations: approved_anns = [a for a in annotation_data.annotations if a.second_phase_review_status == "approved"] if SECOND_PHASE and approved_anns: best_ann = sorted(approved_anns, key=lambda x: x.second_phase_review_timestamp or datetime.min, reverse=True)[0] if not best_ann: accepted_anns = [a for a in annotation_data.annotations if a.is_first_phase_accepted] best_ann = sorted(accepted_anns, key=lambda x: x.update_at, reverse=True)[0] if accepted_anns else None if not best_ann: best_ann = sorted(annotation_data.annotations, key=lambda x: x.update_at, reverse=True)[0] if best_ann: sentence_val = best_ann.annotated_subtitle if best_ann.annotated_subtitle is not None else sentence_val if best_ann.audio_trims and audio_dict_to_export: original_audio_path_for_trim_lookup = get_audio_path(audio_entry) original_voice_name_for_trim = os.path.basename(str(original_audio_path_for_trim_lookup or f"sample_{absolute_idx}")) safe_voice_name_for_trim = re.sub(r'[^\w.-]', '_', original_voice_name_for_trim) trimmed_fname_base = f"trimmed_{absolute_idx}_{safe_voice_name_for_trim}" potential_trimmed_path = os.path.join("trimmed_audio", trimmed_fname_base + ".wav") if os.path.exists(potential_trimmed_path): try: arr, sr_trim = sf.read(potential_trimmed_path) # Renamed sr to sr_trim audio_dict_to_export = {"array": arr, "sampling_rate": sr_trim} except Exception as e_read_trim: print(f"Warning: Could not read trimmed audio file {potential_trimmed_path} for sample {absolute_idx}: {e_read_trim}.") # else: # Keep original audio_dict_to_export exported_data_list.append({ "audio": audio_dict_to_export, "sentence": sanitize_sentence(sentence_val) }) if (i + 1) % 100 == 0: progress((i + 1) / iteration_limit, f"Processed {i+1}/{iteration_limit} samples") gc.collect() if not exported_data_list: return "No data to export after processing." for item in exported_data_list: # Ensure audio format before creating Dataset audio_item = item["audio"] if audio_item is None or (isinstance(audio_item, dict) and audio_item.get('path') is None and audio_item.get('array') is None): item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} # Placeholder for missing/deleted try: final_dataset = Dataset.from_list(exported_data_list) # Cast audio, ensure all items have 'array' and 'sampling_rate' or valid 'path' final_dataset = final_dataset.cast_column("audio", Audio(sampling_rate=16000)) except Exception as e_cast: print(f"Error during Dataset.from_list or cast_column: {e_cast}") for idx_problem, problematic_item in enumerate(exported_data_list[:5]): print(f"Sample item {idx_problem} for export: Audio type {type(problematic_item['audio'])}, Content: {str(problematic_item['audio'])[:200]}") return f"Export failed during data conversion: {e_cast}." dataset_dict_export = DatasetDict({"train": final_dataset}) progress(0.95, "Uploading to Hugging Face...") try: current_hf_user = whoami(token=hf_token_for_export)['name'] except Exception as e_whoami_export: return f"Export failed: Could not verify Hugging Face user with provided token: {e_whoami_export}" dataset_name_part = repo_name_str.split('/')[-1] # Get 'my-annotated-dataset' from 'user/my-annotated-dataset' target_repo_id = f"{current_hf_user}/{dataset_name_part}" push_to_hub_with_retry(dataset_dict=dataset_dict_export, repo_id=target_repo_id, private=True, token_val=hf_token_for_export) end_time = time.time() print(f"Upload done, total time: {end_time - start_time:.2f}s") progress(1.0, "Upload complete!") return f"Exported to huggingface.co/datasets/{target_repo_id}" except Exception as e: import traceback error_msg = f"Export failed: {str(e)}" print(f"{error_msg}\n{traceback.format_exc()}") return error_msg def hf_login(hf_token_val_ui): global CURRENT_USERNAME, token, current_page_data, total_samples, annotator_ranges, SECOND_PHASE_REVIEW_MAPPING, annotation_count # Reset session-specific annotation count on new login annotation_count = 0 # Default state for UI elements on login failure or before successful load failed_login_transcript_update = gr.update(value="", interactive=False) def _failed_login_outputs(login_msg_text, reviewer_text_val="N/A"): # This function constructs the 19-tuple for login outputs return ( gr.update(visible=True), gr.update(visible=False), # login_container, main_container gr.update(value=reviewer_text_val), hf_token_val_ui, login_msg_text, # reviewer_tb, hf_token_state, login_message gr.update(visible=False), failed_login_transcript_update, # save_next_button, transcript_tb (interactive) gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), # trim, undo_trim, delete buttons gr.update(visible=False, value=False), # first_phase_accept_cb (vis & val) gr.update(visible=False), gr.update(visible=False), # approve_button, reject_button 0, 0, None, failed_login_transcript_update, # page_idx, idx_on_page, audio, transcript_tb (value) login_msg_text if "failed" in login_msg_text.lower() or "error" in login_msg_text.lower() else "Please log in.", # status_md "" # original_transcript_state ) if not hf_token_val_ui: return _failed_login_outputs("Login failed: Token cannot be empty.") try: print(f"Attempting login with token from UI...") user_info = whoami(token=hf_token_val_ui) username = user_info['name'] print(f"whoami successful for user: {username}") if username in ALLOWED_USERS: CURRENT_USERNAME = username token = hf_token_val_ui # IMPORTANT: Set the global token to the one provided in UI print(f"User '{CURRENT_USERNAME}' is in ALLOWED_USERS. Global token updated.") # Crucial: Fetch dataset info and ranges AFTER successful login & token set # Reset total_samples to ensure it's re-fetched with the new token if necessary total_samples = 0 ds_info = get_dataset_info() if total_samples <= 0: return _failed_login_outputs(f"Login OK for {CURRENT_USERNAME}, but failed to get dataset size. Cannot proceed.", reviewer_text_val="Error: No Dataset Size") annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) if SECOND_PHASE: # SECOND_PHASE_REVIEW_MAPPING.clear() # Clear previous mapping initialize_second_phase_assignments() # This uses global annotator_ranges user_allowed_range_check = get_user_allowed_range(CURRENT_USERNAME) if not user_allowed_range_check or user_allowed_range_check[0] > user_allowed_range_check[1]: return _failed_login_outputs(f"Login OK for {CURRENT_USERNAME}, but no samples assigned for {'review' if SECOND_PHASE else 'annotation'}.", reviewer_text_val="No Samples Assigned") current_page_data = load_page_data(0) # page_num_within_user_view = 0 # Check if page loading actually got data initial_idx_on_page = 0 if current_page_data is None or current_page_data.empty: print(f"Warning: Initial page load for user {CURRENT_USERNAME} resulted in no data.") # Attempt to load interface with (0,0) but expect "no data" messages from get_sample initial_idx_on_page = 0 # or handle as error if no data at all is critical # load_interface_data returns a 9-tuple initial_load_tuple = load_interface_data(current_page, initial_idx_on_page) is_second_phase_active = SECOND_PHASE # Structure for login_outputs (19 items) return ( gr.update(visible=False), # 0 login_container gr.update(visible=True), # 1 main_container initial_load_tuple[4], # 2 reviewer_tb (gr.update obj from load_interface_data) hf_token_val_ui, # 3 hf_token_state (value) -> updates the gr.State f"Login successful! Welcome {CURRENT_USERNAME}. Phase: {'Review' if is_second_phase_active else 'Annotation'}.", # 4 login_message gr.update(visible=not is_second_phase_active), # 5 save_next_button (visibility) initial_load_tuple[3], # 6 transcript_tb (gr.update obj for value and interactivity) gr.update(visible=not is_second_phase_active), # 7 trim_button (visibility) gr.update(visible=not is_second_phase_active), # 8 undo_trim_button (visibility) gr.update(visible=not is_second_phase_active), # 9 delete_button (visibility) gr.update(visible=initial_load_tuple[7]['visible'], value=initial_load_tuple[8]), # 10 first_phase_accept_cb (vis from [7], val from [8]) gr.update(visible=is_second_phase_active), # 11 approve_button (visibility) gr.update(visible=is_second_phase_active), # 12 reject_button (visibility) initial_load_tuple[0], # 13 current_page_idx_state (value) initial_load_tuple[1], # 14 current_idx_on_page_state (value) initial_load_tuple[2], # 15 audio_player (value or gr.update obj) initial_load_tuple[3], # 16 transcript_tb (can be same as 6, Gradio handles it) initial_load_tuple[5], # 17 status_md (value) initial_load_tuple[6] # 18 original_transcript_state (value) ) else: CURRENT_USERNAME = None token = None # Clear global token if auth fails or user not allowed return _failed_login_outputs(f"User '{username}' not in allowed user list.", reviewer_text_val="Unauthorized") except Exception as e: CURRENT_USERNAME = None token = None # Clear global token on any login exception import traceback login_err_msg = f"Login failed: {str(e)}" print(f"{login_err_msg}\n{traceback.format_exc()}") return _failed_login_outputs(login_err_msg, reviewer_text_val="Login Error") # Gradio Interface (largely same as your previous version) css = """ .white { background-color: white; color: black; } .yellow { background-color: yellow; color: black; } .blue { background-color: lightblue; color: black; } .green { background-color: lightgreen; color: black; } .pink { background-color: pink; color: black; } .red { background-color: #FF7F7F; color: black; } .orange { background-color: orange; color: black; } .gray { background-color: lightgray; color: black; } .lightgray { background-color: #f0f0f0; color: black; } .reviewer-textbox input { text-align: center; font-weight: bold; } """ with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo: # hf_token_state will store the token provided via UI and used for operations. # Initialize with env var 'token' if available, otherwise empty. # This gr.State is updated by the hf_login function's output. hf_token_state = gr.State(os.getenv("hf_token") or "") current_page_idx_state = gr.State(0) current_idx_on_page_state = gr.State(0) original_transcript_state = gr.State("") with gr.Column(visible=True, elem_id="login_container") as login_container: gr.Markdown("## HF Authentication") # hf_token_input default value is also from env var, or empty. hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", value="") login_button = gr.Button("Login") login_message = gr.Markdown("") with gr.Column(visible=False, elem_id="main_container") as main_container: gr.Markdown("# ASR Dataset Labeling Interface") status_md = gr.Markdown("Please log in.") with gr.Row(): with gr.Column(scale=2): audio_player = gr.Audio(label="Audio Sample", autoplay=False) transcript_tb = gr.TextArea(label="Transcript", lines=5, interactive=False) reviewer_tb = gr.Textbox(label="Annotation Status / Reviewer", interactive=False, elem_classes=["white", "reviewer-textbox"]) with gr.Column(scale=1): gr.Markdown("### Navigation") prev_button = gr.Button("← Previous") next_button = gr.Button("Next (no save)") save_next_button = gr.Button("Save & Next", variant="primary", visible=not SECOND_PHASE) first_phase_accept_cb = gr.Checkbox(label="Accept (Reviewer)", visible=False, value=False) approve_button = gr.Button("Approve & Next", variant="primary", visible=SECOND_PHASE) reject_button = gr.Button("Reject & Next", variant="stop", visible=SECOND_PHASE) gr.Markdown("### Audio Tools (Phase 1 only)") with gr.Row(): trim_start_tb = gr.Textbox(label="Trim Start (s)", placeholder="e.g., 1.5", scale=1) trim_end_tb = gr.Textbox(label="Trim End (s)", placeholder="e.g., 3.0", scale=1) trim_button = gr.Button("Trim Audio", visible=not SECOND_PHASE) undo_trim_button = gr.Button("Undo Trim", visible=not SECOND_PHASE) delete_button = gr.Button("Mark Audio as Deleted", variant="stop", visible=not SECOND_PHASE) with gr.Accordion("Advanced Navigation & Export", open=False): with gr.Row(): jump_text_tb = gr.Textbox(label="Jump to Global Index", placeholder="Enter dataset absolute index") jump_button = gr.Button("Jump") with gr.Row(): # Default repo name will be updated more accurately if user logs in. # For now, a generic placeholder. hf_repo_name_tb = gr.Textbox(label="Export Repository Name (your_hf_username/dataset-name)", value="your-hf-username/my-annotated-asr-dataset") hf_export_button = gr.Button("Export to Hugging Face", variant="primary") hf_export_status_md = gr.Markdown("") # Outputs for login_button (19 outputs) login_outputs = [ login_container, main_container, reviewer_tb, hf_token_state, login_message, # 0-4 save_next_button, transcript_tb, trim_button, undo_trim_button, delete_button, # 5-9 first_phase_accept_cb, # 10 (this receives a gr.update obj with 'visible' and 'value' keys) approve_button, reject_button, # 11-12 current_page_idx_state, current_idx_on_page_state, audio_player, # 13-15 transcript_tb, # 16 (target for transcript value, can be same as #6) status_md, original_transcript_state # 17-18 ] login_button.click(fn=hf_login, inputs=[hf_token_input], outputs=login_outputs) # Common outputs for navigation and actions that reload sample view (9 outputs from load_interface_data) # (page_idx_state, idx_on_page_state, audio_player, transcript_tb_update, reviewer_tb_update, # status_md, original_transcript_state, first_phase_accept_cb_vis_update, first_phase_accept_cb_val) navigation_outputs_extended = [ current_page_idx_state, current_idx_on_page_state, # States audio_player, transcript_tb, reviewer_tb, status_md, original_transcript_state, # UI components first_phase_accept_cb, # For visibility update (receives gr.update(visible=...)) first_phase_accept_cb # For value update (receives value directly, Gradio checkbox handles it) ] save_next_button.click( fn=save_and_next_sample_first_phase, inputs=[current_page_idx_state, current_idx_on_page_state, transcript_tb, first_phase_accept_cb], outputs=navigation_outputs_extended ) next_button.click( fn=go_next_sample_wrapper, inputs=[current_page_idx_state, current_idx_on_page_state], outputs=navigation_outputs_extended ) prev_button.click( fn=go_prev_sample_wrapper, inputs=[current_page_idx_state, current_idx_on_page_state], outputs=navigation_outputs_extended ) approve_button.click( fn=review_and_next_sample_second_phase, inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("approved")], outputs=navigation_outputs_extended ) reject_button.click( fn=review_and_next_sample_second_phase, inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("rejected")], outputs=navigation_outputs_extended ) trim_button.click( fn=trim_audio_action, inputs=[current_page_idx_state, current_idx_on_page_state, trim_start_tb, trim_end_tb], outputs=navigation_outputs_extended ) undo_trim_button.click( fn=undo_trim_action, inputs=[current_page_idx_state, current_idx_on_page_state], outputs=navigation_outputs_extended ) delete_button.click( fn=confirm_delete_audio_action, inputs=[current_page_idx_state, current_idx_on_page_state], outputs=navigation_outputs_extended ) jump_button.click( fn=jump_to_absolute_idx, inputs=[jump_text_tb, current_page_idx_state, current_idx_on_page_state], outputs=navigation_outputs_extended ) hf_export_button.click( fn=export_to_huggingface, inputs=[hf_repo_name_tb, hf_token_state], # Use hf_token_state here outputs=[hf_export_status_md], queue=True ) if __name__ == "__main__": # Initializations that don't depend on login token can be here # For example, setting SECOND_PHASE based on an env var or config file. # However, total_samples and annotator_ranges should primarily be determined *after* login, # as they might depend on the dataset accessible by the user's token. # Example: Override SECOND_PHASE for testing # os.environ['APP_SECOND_PHASE'] = "True" # SECOND_PHASE = os.getenv('APP_SECOND_PHASE', 'False').lower() == 'true' print(f"Application starting. Second phase mode: {SECOND_PHASE}") # Initial dataset info try (might fail if token needed and not globally set from env) # This is mostly for informational purposes before login, hf_login will do a more robust fetch. if total_samples <= 0: print("Main block: total_samples not yet set. Will be determined after login.") if SECOND_PHASE: print("==== APPLICATION LAUNCHING IN SECOND PHASE (REVIEW MODE) ====") # Initialization of SECOND_PHASE_REVIEW_MAPPING will happen after login, # once total_samples and annotator_ranges are confirmed. else: print("==== APPLICATION LAUNCHING IN FIRST PHASE (ANNOTATION MODE) ====") demo.queue().launch(debug=True, share=False) # Set share=True for public link