diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -15,31 +15,25 @@ from tenacity import retry, stop_after_attempt, wait_exponential import re import numpy as np from pydantic import BaseModel -from typing import Optional, List, Tuple +from typing import Optional, List from datetime import datetime # Log in with Hugging Face token token = os.getenv("hf_token") -if token: - login(token) -else: - print("Warning: hf_token environment variable not set. Hugging Face Hub operations might fail.") +login(token) # Configuration HF_DATASET_NAME = "navidved/channelb-raw-data" AUDIO_DIR = "audio" SAVE_PATH = "annotations.json" -ALLOWED_USERS = ["vargha", "navidved", "userC"] # Added userC for testing 2nd phase with >1 annotator -REVIEWERS = ["vargha"] # First phase reviewers -ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS] # First phase annotators +ALLOWED_USERS = ["vargha", "navidved"] +REVIEWERS = ["vargha"] +ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS] CURRENT_USERNAME = None -PAGE_SIZE = 100 # Kept for pagination logic, though review might be sample by sample +PAGE_SIZE = 100 SAVE_INTERVAL = 10 - -# --- SECOND PHASE CONFIGURATION --- -SECOND_PHASE = False # Set to True to activate second phase review -SECOND_PHASE_REVIEW_MAPPING = {} # Populated if SECOND_PHASE is True. Maps: reviewer_username -> original_annotator_username -# Example: {"navidved": "userC"} means navidved reviews userC's work +# Future: Define CHUNK_SIZE for splitting dataset into sets assigned to two annotators +CHUNK_SIZE = 100 # Global state variables current_page = 0 @@ -47,9 +41,11 @@ ds_iter = None current_page_data = None audio_backup = {} annotation_count = 0 -unsaved_changes = {} # Primarily for first phase +unsaved_changes = {} total_samples = 0 -annotator_ranges = {} # Stores {annotator_username: (start_idx, end_idx)} for first phase +annotator_ranges = None +# Future: Track completed chunks for dynamic assignment +# completed_chunks = {} # Dict mapping annotator to set of completed chunk indices # Pydantic data models class AudioTrim(BaseModel): @@ -57,21 +53,14 @@ class AudioTrim(BaseModel): end: float class Annotation(BaseModel): - annotator: str # Original annotator (first phase) + annotator: str annotated_subtitle: Optional[str] = None audio_trims: Optional[List[AudioTrim]] = None - - # First phase review fields - is_first_phase_accepted: bool = False - first_phase_reviewer_username: Optional[str] = None - - # Second phase review fields - second_phase_reviewed_by: Optional[str] = None - second_phase_review_status: Optional[str] = None # "approved" or "rejected" - second_phase_review_timestamp: Optional[datetime] = None - + accepted: bool = False create_at: datetime update_at: datetime + # Future: Add feedback field for approved/rejected + # feedback: Optional[str] = None # "approved" or "rejected" class Sample(BaseModel): id: int @@ -80,27 +69,26 @@ class Sample(BaseModel): ignore_it: bool = False description: Optional[str] = None annotations: Optional[List[Annotation]] = None - is_approved_in_second_phase: bool = False # True if the primary annotation is approved in 2nd phase + # Future: Add accepted field at Sample level + # accepted: bool = False # True if both assigned annotators approve -class DatasetModel(BaseModel): # Renamed to avoid conflict with datasets.Dataset +class Dataset(BaseModel): samples: Optional[List[Sample]] = None # Utility functions def load_saved_annotations(): - dataset_model = None + dataset = None if 
os.path.exists(SAVE_PATH): try: with open(SAVE_PATH, "r", encoding="utf-8") as f: data = json.load(f) - dataset_model = DatasetModel(**data) + dataset = Dataset(**data) print("Loaded annotations from local JSON file") except Exception as e: print(f"Error loading local JSON file: {str(e)}. Removing invalid file.") - # os.remove(SAVE_PATH) # Be cautious with auto-removing - dataset_model = None - - - if dataset_model is None and token: + os.remove(SAVE_PATH) + + if dataset is None: try: hf_path = hf_hub_download( repo_id=HF_DATASET_NAME, @@ -110,37 +98,32 @@ def load_saved_annotations(): ) with open(hf_path, "r", encoding="utf-8") as f: data = json.load(f) - dataset_model = DatasetModel(**data) - # Cache it locally + dataset = Dataset(**data) with open(SAVE_PATH, "w", encoding="utf-8") as f: - f.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) + f.write(dataset.model_dump_json(exclude_none=True, indent=4)) print("Loaded annotations from HF dataset repository and cached locally") except Exception as e: print(f"Error loading JSON file from HF repo: {str(e)}") - dataset_model = None - - if dataset_model is None: - dataset_model = DatasetModel(samples=[]) - print("Created new empty DatasetModel for annotations") - - return dataset_model + + if dataset is None: + dataset = Dataset(samples=[]) + print("Created new empty Dataset for annotations") + + return dataset -def save_annotations(dataset_model: DatasetModel): +def save_annotations(dataset: Dataset): global annotation_count try: with open(SAVE_PATH, "w", encoding="utf-8") as f: - f.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) + f.write(dataset.model_dump_json(exclude_none=True, indent=4)) print(f"Saved annotations to {SAVE_PATH}") annotation_count += 1 - if annotation_count % SAVE_INTERVAL == 0 and token: + if annotation_count % SAVE_INTERVAL == 0: push_json_to_hf() except Exception as e: print(f"Error saving annotations: {str(e)}") def push_json_to_hf(): - if not token: - print("Cannot push to HF: token not available.") - return try: api = HfApi() api.upload_file( @@ -154,1258 +137,795 @@ def push_json_to_hf(): except Exception as e: print(f"Error uploading JSON file: {str(e)}") -def calculate_annotator_ranges(total_samples_val, annotators_list): - num_annotators = len(annotators_list) - if num_annotators == 0 or total_samples_val <= 0: +def calculate_annotator_ranges(total_samples, annotators): + num_annotators = len(annotators) + if num_annotators == 0: return {} - - samples_per_annotator = total_samples_val // num_annotators - extra_samples = total_samples_val % num_annotators + samples_per_annotator = total_samples // num_annotators + extra_samples = total_samples % num_annotators ranges = {} start = 0 - for i, annotator in enumerate(annotators_list): + for i, annotator in enumerate(annotators): end = start + samples_per_annotator - 1 if i < extra_samples: end += 1 - if end >= total_samples_val: # Ensure end does not exceed total_samples - end = total_samples_val -1 - if start <= end : # Ensure start is not greater than end - ranges[annotator] = (start, end) + ranges[annotator] = (start, end) start = end + 1 return ranges -def initialize_second_phase_assignments(): - global SECOND_PHASE_REVIEW_MAPPING, annotator_ranges - if not ANNOTATORS or len(ANNOTATORS) < 1: # Requires at least 1 annotator to review their own work, or 2 for cross-review - print("Not enough annotators for second phase review.") - SECOND_PHASE_REVIEW_MAPPING = {} - return - - # Ensure annotator_ranges is populated - if not 
annotator_ranges and total_samples > 0: - print("Populating annotator_ranges for second phase initialization.") - annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) - - - if len(ANNOTATORS) == 1: - # Single annotator reviews their own work if that's the desired logic - # Or, this phase might not apply. For now, let's allow self-review. - annotator = ANNOTATORS[0] - SECOND_PHASE_REVIEW_MAPPING[annotator] = annotator - print(f"Second phase: {annotator} will review their own work.") - else: - # Cyclic assignment: annotator[i] reviews annotator[i-1]'s work - for i, reviewer_user in enumerate(ANNOTATORS): - original_annotator_idx = (i - 1 + len(ANNOTATORS)) % len(ANNOTATORS) - original_annotator_user = ANNOTATORS[original_annotator_idx] - SECOND_PHASE_REVIEW_MAPPING[reviewer_user] = original_annotator_user - print(f"Second phase: {reviewer_user} will review {original_annotator_user}'s work.") - - # Verify that original annotators have ranges - for reviewer, original_annotator in SECOND_PHASE_REVIEW_MAPPING.items(): - if original_annotator not in annotator_ranges: - print(f"Warning: Original annotator {original_annotator} has no range defined in annotator_ranges.") - # This could happen if total_samples was 0 or annotator_ranges wasn't calculated correctly. - def get_user_allowed_range(username): - global annotator_ranges, total_samples - if SECOND_PHASE: - if not SECOND_PHASE_REVIEW_MAPPING: # Ensure it's initialized - initialize_second_phase_assignments() - - original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(username) - if original_annotator_to_review: - # The user `username` is reviewing `original_annotator_to_review`'s work. - # The range is the original work range of `original_annotator_to_review`. - if not annotator_ranges and total_samples > 0: # Lazy init for ranges if needed - annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) - - return annotator_ranges.get(original_annotator_to_review) - else: # User is not a designated reviewer in the second phase mapping - return None # Or (0,-1) to signify no access - else: # First Phase Logic - if get_user_role(username) == "reviewer": # First phase reviewers see everything - return (0, total_samples - 1) if total_samples > 0 else None - elif username in annotator_ranges: # First phase annotators see their assigned range - return annotator_ranges[username] - else: - return None + if get_user_role(username) == "reviewer": + return (0, total_samples - 1) + elif username in annotator_ranges: + return annotator_ranges[username] + else: + return None def is_within_range(absolute_idx, allowed_range): if allowed_range is None: return False return allowed_range[0] <= absolute_idx <= allowed_range[1] -def get_user_role(username): # This defines first-phase roles +def get_user_role(username): return "reviewer" if username in REVIEWERS else "annotator" def init_dataset_iterator(): global ds_iter try: - # It's better to load the dataset on demand rather than keeping an iterator open. - # For streaming, iter(load_dataset(...)) is fine if used immediately. - # ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) - # ds_iter = iter(ds) + ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) + ds_iter = iter(ds) return True except Exception as e: print(f"Error initializing dataset iterator: {e}") return False def load_page_data(page_num=0): - global current_page_data, current_page, total_samples - - # For streaming, we re-fetch and skip. 
- try: - ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) - temp_ds_iter = iter(ds) - except Exception as e: - print(f"Error loading dataset for page data: {e}") - current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page"]) - return current_page_data - - # Determine the actual range of samples the user can see - # This needs to be based on the full dataset indices, not just page logic - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not allowed_range: - print(f"User {CURRENT_USERNAME} has no allowed range.") - current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page"]) - return current_page_data - - # Calculate start and end absolute indices for the requested page, clipped by allowed_range - page_start_abs_idx = page_num * PAGE_SIZE - page_end_abs_idx = page_start_abs_idx + PAGE_SIZE - 1 - - # Effective start and end for fetching, considering user's total allowed range - effective_start_idx = max(page_start_abs_idx, allowed_range[0]) - effective_end_idx = min(page_end_abs_idx, allowed_range[1]) - - samples_on_page = [] - current_absolute_idx = 0 - - # Iterate through the dataset to find samples within the effective range for this page - # This can be slow for large datasets and large page_num with streaming. - # A non-streaming dataset or a more optimized way to seek would be better for large scale. - - idx_counter_for_page = 0 - for i, sample_data in enumerate(temp_ds_iter): - current_absolute_idx = i # Absolute index in the full dataset - - if current_absolute_idx > effective_end_idx : - break # Past the samples needed for this page and user range - - if current_absolute_idx >= effective_start_idx: - # This sample is within the user's allowed range and on the current conceptual page - sample_data['absolute_idx'] = current_absolute_idx - sample_data['id_within_page'] = idx_counter_for_page # relative index on current page view - samples_on_page.append(sample_data) - idx_counter_for_page +=1 - if len(samples_on_page) >= PAGE_SIZE : # Filled the page - break - + global ds_iter, current_page_data, current_page + if ds_iter is None: + if not init_dataset_iterator(): + return pd.DataFrame(columns=["audio", "sentence"]) + if page_num < current_page: + ds_iter = iter(load_dataset(HF_DATASET_NAME, split="train", streaming=True)) + current_page = 0 + samples_to_skip = page_num * PAGE_SIZE - (current_page * PAGE_SIZE) if page_num > current_page else 0 + for _ in range(samples_to_skip): + try: + next(ds_iter) + except StopIteration: + break + samples = [] + for _ in range(PAGE_SIZE): + try: + sample = next(ds_iter) + samples.append(sample) + except StopIteration: + break current_page = page_num - if samples_on_page: - current_page_data = pd.DataFrame(samples_on_page) - else: - # If no samples found (e.g., page is outside effective range) - current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) - print(f"No samples found for user {CURRENT_USERNAME} on page {page_num} within effective range {effective_start_idx}-{effective_end_idx}") + current_page_data = pd.DataFrame(samples) + + if CURRENT_USERNAME and get_user_role(CURRENT_USERNAME) == "annotator": + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if allowed_range: + start_idx = page_num * PAGE_SIZE + current_page_data['absolute_idx'] = range(start_idx, start_idx + len(current_page_data)) + current_page_data = current_page_data[ + (current_page_data['absolute_idx'] >= allowed_range[0]) & + 
(current_page_data['absolute_idx'] <= allowed_range[1]) + ] + current_page_data = current_page_data.drop(columns=['absolute_idx']) - gc.collect() - return current_page_data - def get_dataset_info(): - global total_samples # Use global total_samples - if total_samples > 0: # If already fetched - return {'num_samples': total_samples} try: - # Temporarily load to get info, can be slow for huge datasets if not streaming - # For streaming, num_examples might be None or -1, so actual iteration might be needed - info = load_dataset(HF_DATASET_NAME, streaming=True, split="train").info - # The 'num_examples' for a streaming dataset split might not be accurate or available. - # It's often -1 or None. You might need a way to get the true total count if it's crucial. - # For now, we'll use it if available, otherwise, it remains a challenge for pure streaming. - if hasattr(info, 'estimated_size') and info.estimated_size is not None: # Check an alternative if num_examples is not good - pass # Not directly number of samples - - # Fallback: iterate to count if num_examples is not reliable - # This is very inefficient and should be avoided if possible. - # A pre-calculated count or a different dataset split might be needed. - # For this example, we'll assume info.splits['train'].num_examples is somewhat usable - # or that a fixed total_samples is set if this is problematic. - - # Simplified: try to get from info, but acknowledge limitations - ds_info_obj = load_dataset(HF_DATASET_NAME, split="train") # Load non-streaming for info - num_samples_val = ds_info_obj.num_rows - if num_samples_val and num_samples_val > 0: - total_samples = num_samples_val - return {'num_samples': total_samples} - - # If still no count, this is an issue for range calculations. - # For now, return -1, but this will break range logic. - print("Warning: Could not reliably determine total_samples from dataset info.") + info = load_dataset(HF_DATASET_NAME, split="train", streaming=True).info + if hasattr(info, 'splits') and 'train' in info.splits: + return {'num_samples': info.splits['train'].num_examples} return {'num_samples': -1} - except Exception as e: print(f"Error getting dataset info: {e}") return {'num_samples': -1} - -# Initial data load (moved after functions it calls are defined) -# init_dataset_iterator() # Iterator not maintained globally anymore for streaming robustness -dataset_info = get_dataset_info() # This sets global total_samples +init_dataset_iterator() +current_page_data = load_page_data(0) +dataset_info = get_dataset_info() +total_samples = dataset_info.get('num_samples', -1) if total_samples > 0: annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) - if SECOND_PHASE: - initialize_second_phase_assignments() # Initialize after annotator_ranges might be populated else: - print("Warning: total_samples is not positive. Annotation ranges and second phase assignments may be incorrect.") annotator_ranges = {} -# Load first page data for the initial user if any -# This should happen after login when CURRENT_USERNAME is set. 
-# current_page_data = load_page_data(0) # Moved to hf_login success path
-
-
-
 def get_audio_path(audio_entry):
     if isinstance(audio_entry, dict):
         if "array" in audio_entry and "sampling_rate" in audio_entry:
-            return (audio_entry["sampling_rate"], audio_entry["array"]) # Return tuple for direct use
+            return (audio_entry["sampling_rate"], audio_entry["array"])
         return audio_entry.get("path", None)
     if isinstance(audio_entry, str):
         if audio_entry.startswith("http://") or audio_entry.startswith("https://"):
-            return audio_entry # URL
-        if os.path.exists(audio_entry): # Absolute path
             return audio_entry
-        # Relative path (try joining with AUDIO_DIR if one is configured)
-        if AUDIO_DIR:
-            joined_path = os.path.join(AUDIO_DIR, audio_entry)
-            if os.path.exists(joined_path):
-                return joined_path
-        return audio_entry # Return as is, might be a relative path resolvable by datasets
-    return None # Or handle unknown type
+        if os.path.exists(audio_entry):
+            return audio_entry
+        joined_path = os.path.join(AUDIO_DIR, audio_entry)
+        if os.path.exists(joined_path):
+            return joined_path
+        return audio_entry
+
 # Core functions
-def save_sample_data(page_idx, idx_on_page, transcript, current_user_performing_action, accepted_flag=False):
+def save_sample_data(page_idx, idx, transcript, reviewer, accepted=False):
     global current_page_data, unsaved_changes
+    if idx >= len(current_page_data):
+        return "Invalid index"
+    absolute_idx = page_idx * PAGE_SIZE + idx
+    allowed_range = get_user_allowed_range(CURRENT_USERNAME)
+    if not is_within_range(absolute_idx, allowed_range):
+        return "You are not allowed to annotate this sample."
+    audio_entry = current_page_data.iloc[idx]["audio"]
+    voice_name = os.path.basename(str(audio_entry))
+    dataset = load_saved_annotations()
-    if current_page_data is None or idx_on_page >= len(current_page_data):
-        return "Invalid index or data not loaded for current page."
-
-    actual_sample_info = current_page_data.iloc[idx_on_page]
-    absolute_idx = actual_sample_info['absolute_idx']
-
-    # First phase saving logic
-    allowed_range = get_user_allowed_range(current_user_performing_action)
-    if not is_within_range(absolute_idx, allowed_range) and not SECOND_PHASE: # In 2nd phase, this check is implicitly handled by page loading
-        return "You are not allowed to annotate this sample (out of range)."
-
-    audio_entry_original = actual_sample_info["audio"] # This might be path or dict
-    voice_name = os.path.basename(str(get_audio_path(audio_entry_original) or f"sample_{absolute_idx}"))
-
-    dataset_model = load_saved_annotations()
-    sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None)
-
+    sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None)
     if not sample:
         sample = Sample(
             id=absolute_idx,
             voice_name=voice_name,
-            original_subtitle=actual_sample_info["sentence"],
+            original_subtitle=current_page_data.iloc[idx]["sentence"],
             annotations=[]
         )
-        dataset_model.samples = dataset_model.samples or []
-        dataset_model.samples.append(sample)
-
-    now = datetime.now()
-    # In the first phase, current_user_performing_action is the annotator or reviewer.
-    # 'accepted_flag' is used if current_user_performing_action is a first-phase reviewer.
- annotation = next((a for a in sample.annotations or [] if a.annotator == current_user_performing_action), None) + dataset.samples = dataset.samples or [] + dataset.samples.append(sample) + now = datetime.now() + annotation = next((a for a in sample.annotations or [] if a.annotator == reviewer), None) if annotation: annotation.annotated_subtitle = transcript.strip() annotation.update_at = now - if get_user_role(current_user_performing_action) == "reviewer": # First phase reviewer - annotation.is_first_phase_accepted = accepted_flag - annotation.first_phase_reviewer_username = current_user_performing_action if accepted_flag else None + if get_user_role(reviewer) == "reviewer": + annotation.accepted = accepted else: - new_annotation_data = { - "annotator": current_user_performing_action, - "annotated_subtitle": transcript.strip(), - "create_at": now, - "update_at": now, - "is_first_phase_accepted": False # Default - } - if get_user_role(current_user_performing_action) == "reviewer": - new_annotation_data["is_first_phase_accepted"] = accepted_flag - if accepted_flag: - new_annotation_data["first_phase_reviewer_username"] = current_user_performing_action - - annotation = Annotation(**new_annotation_data) + annotation = Annotation( + annotator=reviewer, + annotated_subtitle=transcript.strip(), + create_at=now, + update_at=now, + accepted=accepted if get_user_role(reviewer) == "reviewer" else False + ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) - + if absolute_idx in unsaved_changes: del unsaved_changes[absolute_idx] - - save_annotations(dataset_model) - return f"✓ Saved annotation for sample {absolute_idx}" - -def handle_second_phase_action(page_idx, idx_on_page, action: str): # action is "approved" or "rejected" - global current_page_data, CURRENT_USERNAME - - if not SECOND_PHASE: - return "Not in second phase." - if current_page_data is None or idx_on_page >= len(current_page_data): - return "Invalid index or data not loaded for current page (second phase)." - - actual_sample_info = current_page_data.iloc[idx_on_page] - absolute_idx = actual_sample_info['absolute_idx'] - - original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(CURRENT_USERNAME) - if not original_annotator_to_review: - return "You are not assigned to review any user's work." - - dataset_model = load_saved_annotations() - sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) - if not sample: - # This case should ideally not happen if data is consistent. - # If it does, it means the sample exists in source dataset but not annotations.json. - # A reviewer in 2nd phase is reviewing existing annotation. - return f"Error: Sample {absolute_idx} not found in annotations.json for review." - - # Find the annotation made by the original_annotator_to_review - annotation_to_review = next((ann for ann in sample.annotations or [] if ann.annotator == original_annotator_to_review), None) - - if not annotation_to_review: - # If original annotator did not make an annotation for this sample. - # Option 1: Create a placeholder annotation based on original_subtitle and review that. - # Option 2: Report error. For now, report error. - # This implies the first phase annotator skipped this item or it wasn't in their range correctly. - print(f"Warning: No prior annotation by {original_annotator_to_review} for sample {absolute_idx}. 
Reviewing original subtitle implicitly.") - # Let's create one if missing, based on original subtitle - annotation_to_review = Annotation( - annotator=original_annotator_to_review, - annotated_subtitle=sample.original_subtitle, # Use original subtitle - create_at=sample.annotations[0].create_at if sample.annotations else datetime.now(), # Approx original creation - update_at=datetime.now() - ) - sample.annotations = sample.annotations or [] - sample.annotations.append(annotation_to_review) - - - annotation_to_review.second_phase_reviewed_by = CURRENT_USERNAME - annotation_to_review.second_phase_review_status = action - annotation_to_review.second_phase_review_timestamp = datetime.now() - annotation_to_review.update_at = datetime.now() - - if action == "approved": - sample.is_approved_in_second_phase = True - # If rejected, is_approved_in_second_phase could be set to False, or depend on other conditions. - # For now, only explicit approval sets it to True. - - save_annotations(dataset_model) - return f"✓ Review ({action}) saved for sample {absolute_idx} (Original annotator: {original_annotator_to_review})" - - -def get_sample(page_idx, idx_on_page, current_user_displaying): # current_user_displaying is CURRENT_USERNAME - global current_page_data, unsaved_changes, total_samples - - if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): - return None, "", f"Invalid index. Range is 0-{len(current_page_data)-1}", "unreviewed", "white", True, False, "" - - actual_sample_info = current_page_data.iloc[idx_on_page] - absolute_idx = actual_sample_info['absolute_idx'] - - audio_entry_original = actual_sample_info["audio"] - audio_val = get_audio_path(audio_entry_original) - default_transcript = actual_sample_info["sentence"] - transcript_to_display = default_transcript - - # UI states - ui_reviewer_field = "unreviewed" # Textbox showing who annotated/reviewed - ui_color = "white" - ui_editable = True # Transcript text area - ui_is_accepted_flag = False # For first phase checkmark logic, or second phase display - ui_status_message = f"Sample {absolute_idx+1}" - if total_samples > 0: - ui_status_message += f" of {total_samples}" + save_annotations(dataset) + return f"✓ Saved annotation for sample {absolute_idx}" - dataset_model = load_saved_annotations() - sample_from_json = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) +def get_sample(page_idx, idx, current_username): + global current_page_data, unsaved_changes + if idx < 0 or idx >= len(current_page_data): + return None, "", f"Invalid index. 
Range is 0-{len(current_page_data)-1}", "unreviewed", "white", True, False + absolute_idx = page_idx * PAGE_SIZE + idx + audio_entry = current_page_data.iloc[idx]["audio"] + voice_name = os.path.basename(str(audio_entry)) + dataset = load_saved_annotations() + + sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) + audio_val = get_audio_path(audio_entry) + transcript = current_page_data.iloc[idx]["sentence"] + reviewer = "unreviewed" + color = "white" + editable = True + accepted = False + + # === Second phase: restrict to approve/reject only === + if second_phase: + editable = False # Disable editing transcript - if sample_from_json: - if sample_from_json.ignore_it: + if sample: + if sample.ignore_it: audio_val = None - transcript_to_display = "AUDIO DELETED (This audio has been removed.)" - ui_reviewer_field = "deleted" - ui_color = "red" - ui_editable = False - - elif SECOND_PHASE: - ui_editable = False # Transcript not editable in 2nd phase - original_annotator_being_reviewed = SECOND_PHASE_REVIEW_MAPPING.get(current_user_displaying) - - if not original_annotator_being_reviewed: # Should not happen if UI is controlled properly - transcript_to_display = "Error: User not in review mapping." - ui_color = "red" - else: - ui_reviewer_field = f"Reviewing: {original_annotator_being_reviewed}" - annotation_under_review = next((ann for ann in sample_from_json.annotations or [] if ann.annotator == original_annotator_being_reviewed), None) - - if annotation_under_review: - transcript_to_display = annotation_under_review.annotated_subtitle or default_transcript - ui_is_accepted_flag = (annotation_under_review.second_phase_review_status == "approved") - - if annotation_under_review.second_phase_reviewed_by: - if annotation_under_review.second_phase_reviewed_by == current_user_displaying: - ui_color = "green" if annotation_under_review.second_phase_review_status == "approved" else "orange" # orange for rejected by current user - else: # Reviewed by someone else - ui_color = "gray" - ui_reviewer_field += f" (Reviewed by {annotation_under_review.second_phase_reviewed_by})" - else: # Pending review by current_user_displaying - ui_color = "yellow" # Indicates pending current user's review - else: # No annotation from original annotator for this sample - transcript_to_display = default_transcript # Show original dataset subtitle - ui_reviewer_field += " (Original annotator made no submission)" - ui_color = "lightgray" # Needs review, but based on original - - else: # First Phase Logic - # Check for an accepted annotation by a first-phase reviewer - accepted_first_phase_annotation = next((a for a in sample_from_json.annotations or [] if a.is_first_phase_accepted and a.first_phase_reviewer_username), None) - - if accepted_first_phase_annotation: - transcript_to_display = accepted_first_phase_annotation.annotated_subtitle or default_transcript - ui_reviewer_field = accepted_first_phase_annotation.first_phase_reviewer_username - ui_color = "green" - ui_is_accepted_flag = True - ui_editable = (get_user_role(current_user_displaying) == "reviewer") # Only 1st phase reviewer can edit accepted + transcript = "AUDIO DELETED (This audio has been removed.)" + reviewer = "deleted" + color = "red" + editable = False + else: + accepted_annotation = next((a for a in sample.annotations if a.accepted), None) + if accepted_annotation: + transcript = accepted_annotation.annotated_subtitle or transcript + reviewer = accepted_annotation.annotator + color = "green" + editable = 
(get_user_role(current_username) == "reviewer") and not second_phase + accepted = True else: - # Check for annotation by the current user (annotator or reviewer) - user_specific_annotation = next((a for a in sample_from_json.annotations or [] if a.annotator == current_user_displaying), None) - if user_specific_annotation: - transcript_to_display = user_specific_annotation.annotated_subtitle or default_transcript - ui_reviewer_field = user_specific_annotation.annotator - ui_color = "yellow" if absolute_idx not in unsaved_changes else "pink" - ui_editable = True + user_annotation = next((a for a in sample.annotations if a.annotator == current_username), None) + if user_annotation: + transcript = user_annotation.annotated_subtitle or transcript + reviewer = user_annotation.annotator + color = "yellow" if absolute_idx not in unsaved_changes else "pink" + editable = not second_phase else: - # Check for annotations by other annotators (not current user, not accepted by reviewer) - # Display the first one found for a reviewer to potentially act on, or inform annotator - other_annotations = [a for a in sample_from_json.annotations or [] if a.annotator != current_user_displaying and not a.is_first_phase_accepted] + other_annotations = [a for a in sample.annotations if a.annotator != current_username] if other_annotations: - # If current user is a reviewer, they see the other annotator's work - if get_user_role(current_user_displaying) == "reviewer": - other_ann_to_show = other_annotations[0] - transcript_to_display = other_ann_to_show.annotated_subtitle or default_transcript - ui_reviewer_field = other_ann_to_show.annotator - ui_color = "blue" # Reviewer sees other's work - ui_editable = True - else: # Current user is an annotator, and another annotator worked on it - # This state is a bit ambiguous. Default to original if not assigned to this user. - # For simplicity, show original if it's not their saved work. - transcript_to_display = default_transcript - ui_reviewer_field = "labeled by another annotator" - ui_color = "lightblue" - ui_editable = False # Annotator cannot edit other annotator's unreviewed work - else: # No annotations at all, or only unreviewed by others and user is annotator + if get_user_role(current_username) == "reviewer": + other_annotation = other_annotations[0] + transcript = other_annotation.annotated_subtitle or transcript + reviewer = other_annotation.annotator + else: + transcript = current_page_data.iloc[idx]["sentence"] + reviewer = "labeled by another annotator" + color = "blue" + editable = (get_user_role(current_username) == "reviewer") and not second_phase + else: if absolute_idx in unsaved_changes: - transcript_to_display = unsaved_changes[absolute_idx] - ui_reviewer_field = current_user_displaying - ui_color = "pink" - ui_editable = True - # else, default_transcript, unreviewed, white, editable=True (already set) - - # If no sample_from_json, then it's a fresh sample from dataset - # transcript_to_display remains default_transcript. ui states remain default. - # This case is hit if annotations.json doesn't have this absolute_idx yet. 
- - # Status message update - current_page_for_status = page_idx + 1 # page_idx is 0-indexed - # If current_page_data has 'absolute_idx', we can use that - # page_num_from_abs = (absolute_idx // PAGE_SIZE) + 1 - - ui_status_message = f"{ui_status_message} - Page {current_page_for_status}" - if SECOND_PHASE : - ui_status_message += " (Review Phase)" - else: - ui_status_message += " (Annotation Phase)" - - - return audio_val, transcript_to_display, ui_status_message, ui_reviewer_field, ui_color, ui_editable, ui_is_accepted_flag, default_transcript - - -def load_interface_data(page_idx, idx_on_page): # Renamed from load_interface to avoid conflict - # This function is primarily a wrapper around get_sample for UI updates - audio, text, base_status, saved_reviewer_text, color, editable, accepted_flag, original_dataset_text = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) - - # Audio backup logic (can be simplified or removed if not strictly needed for undo_trim) - # absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] if current_page_data is not None and idx_on_page < len(current_page_data) else -1 - # audio_entry_original = current_page_data.iloc[idx_on_page]["audio"] if current_page_data is not None and idx_on_page < len(current_page_data) else "" - # key = f"{absolute_idx}_{os.path.basename(str(get_audio_path(audio_entry_original) or 'unknown'))}" - # if key not in audio_backup and audio is not None: # Backup the audio playable value - # audio_backup[key] = audio - + transcript = unsaved_changes[absolute_idx] + reviewer = current_username + color = "pink" + editable = not second_phase + else: + transcript = current_page_data.iloc[idx]["sentence"] + reviewer = "unreviewed" + color = "white" + editable = not second_phase + + status = f"Sample {absolute_idx+1}" + if total_samples > 0: + status += f" of {total_samples}" + return audio_val, transcript, status, reviewer, color, editable, accepted + +def load_interface(page_idx, idx): + audio, text, base_status, saved_reviewer, color, editable, accepted = get_sample(page_idx, idx, CURRENT_USERNAME) + absolute_idx = page_idx * PAGE_SIZE + idx + audio_entry = current_page_data.iloc[idx]["audio"] + key = f"{absolute_idx}_{os.path.basename(str(audio_entry))}" + if key not in audio_backup: + audio_backup[key] = audio + status = f"{base_status} - Page {page_idx+1} - Reviewer: {saved_reviewer}" return ( - page_idx, # current_page_idx state - idx_on_page, # current_idx_on_page state - audio, # audio_player value - gr.update(value=text, interactive=editable), # transcript update - gr.update(value=saved_reviewer_text, elem_classes=[color]), # reviewer Textbox update - base_status, # status markdown update - original_dataset_text # original_transcript state + page_idx, + idx, + audio, + gr.update(value=text, interactive=editable), + gr.update(value=saved_reviewer, elem_classes=[color], interactive=False), + status, + text ) # Navigation functions -def navigate_sample(page_idx, idx_on_page, direction: int): # direction: 1 for next, -1 for prev - global current_page_data, total_samples - - if current_page_data is None or len(current_page_data) == 0: - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No data loaded.", gr.update() - - target_idx_on_page = idx_on_page + direction - - new_page_idx = page_idx - new_idx_on_page = target_idx_on_page - - if target_idx_on_page < 0: # Need to go to previous page - if page_idx > 0: - new_page_idx = page_idx - 1 - # Load new page data and set index to last item - temp_data = 
load_page_data(new_page_idx) - if temp_data is not None and not temp_data.empty: - new_idx_on_page = len(temp_data) - 1 - else: # Previous page is empty or out of allowed range - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No more samples in this direction (prev page).", gr.update() - else: # Already on first item of first page - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "At the beginning of your assigned samples.", gr.update() - - elif target_idx_on_page >= len(current_page_data): # Need to go to next page - new_page_idx = page_idx + 1 - temp_data = load_page_data(new_page_idx) # load_page_data updates current_page_data - if temp_data is not None and not temp_data.empty: - new_idx_on_page = 0 - else: # Next page is empty or out of allowed range - # Check if we are at the very end of the allowed samples - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - current_abs_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - if allowed_range and current_abs_idx >= allowed_range[1]: - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "At the end of your assigned samples.", gr.update() - else: - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No more samples in this direction (next page).", gr.update() - - # If we switched page, current_page_data is already updated by load_page_data. - # If staying on same page, it's fine. - return load_interface_data(new_page_idx, new_idx_on_page) - - -def go_next_sample_wrapper(page_idx, idx_on_page): # Simpler wrapper for UI - return navigate_sample(page_idx, idx_on_page, 1) - -def go_prev_sample_wrapper(page_idx, idx_on_page): # Simpler wrapper for UI - return navigate_sample(page_idx, idx_on_page, -1) - - -def save_and_next_sample_first_phase(page_idx, idx_on_page, current_text, is_accepted_by_reviewer_flag): - # Note: `current_annotator_ui` (reviewer textbox value) is not who is performing action. - # CURRENT_USERNAME is performing the action. - # `is_accepted_by_reviewer_flag` is the checkbox state (true/false) if user is a reviewer. - # If user is an annotator, this flag might not be directly applicable or always false from UI. - - # Determine if the current user is acting as a first-phase reviewer to use the 'accepted' flag - user_is_reviewer = get_user_role(CURRENT_USERNAME) == "reviewer" - save_msg = save_sample_data(page_idx, idx_on_page, current_text, CURRENT_USERNAME, - accepted_flag=is_accepted_by_reviewer_flag if user_is_reviewer else False) - print(save_msg) # Log save message - # Then navigate - return navigate_sample(page_idx, idx_on_page, 1) - - -def review_and_next_sample_second_phase(page_idx, idx_on_page, review_action: str): - feedback_msg = handle_second_phase_action(page_idx, idx_on_page, review_action) - print(feedback_msg) # Log feedback message - # Then navigate - return navigate_sample(page_idx, idx_on_page, 1) - - -def jump_to_absolute_idx(target_abs_idx_str, current_page_idx, current_idx_on_page): # Removed unused text/annotator params +def next_sample(page_idx, idx): global current_page_data - try: - target_abs_idx = int(target_abs_idx_str) - if target_abs_idx < 0: target_abs_idx = 0 - - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not is_within_range(target_abs_idx, allowed_range): - status_msg = f"Target index {target_abs_idx} is outside your assigned range {allowed_range}." 
- # Return current state with error message - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt - - new_page_idx = target_abs_idx // PAGE_SIZE - new_idx_on_page_conceptual = target_abs_idx % PAGE_SIZE # This is index on the conceptual new page - - # Load data for the new page - temp_page_data = load_page_data(new_page_idx) # This updates global current_page_data - - if temp_page_data is None or temp_page_data.empty: - status_msg = f"No data found for page {new_page_idx} containing index {target_abs_idx}." - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt - - # Find the actual index on the loaded page for target_abs_idx - # The loaded page might not start exactly at new_page_idx * PAGE_SIZE if user's range is small. - # `load_page_data` now adds 'absolute_idx' and 'id_within_page' to `current_page_data` - - # Find the row with the matching absolute_idx in the newly loaded current_page_data - matching_rows = current_page_data[current_page_data['absolute_idx'] == target_abs_idx] - if not matching_rows.empty: - new_idx_on_page_actual = matching_rows.index[0] # This is the DataFrame index, should be same as 'id_within_page' + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + absolute_idx = page_idx * PAGE_SIZE + idx + if is_within_range(absolute_idx + 1, allowed_range): + if idx < len(current_page_data) - 1: + new_page_idx = page_idx + new_idx = idx + 1 else: - # This means target_abs_idx, though in allowed_range, was not on the loaded page (e.g. page is sparse due to filtering) - # Fallback: load the first item of the page if target not found directly. - # Or better, report an issue. - status_msg = f"Index {target_abs_idx} is in range, but not found on page {new_page_idx}. Displaying start of page." - print(status_msg) # Log this - new_idx_on_page_actual = 0 # Default to first item of the loaded page - if current_page_data.empty : # Page is actually empty - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) # Revert to old view - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt - - return load_interface_data(new_page_idx, new_idx_on_page_actual) - - except ValueError: - status_msg = "Invalid index format for jump." 
- audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt - except Exception as e: - status_msg = f"Error jumping to index: {e}" - print(status_msg) - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt - + new_page_idx = page_idx + 1 + load_page_data(new_page_idx) + if len(current_page_data) > 0: + new_idx = 0 + else: + return page_idx, idx, gr.update(), gr.update(), gr.update(), "No more samples in your range.", gr.update() + else: + return page_idx, idx, gr.update(), gr.update(), gr.update(), "Next sample is outside your assigned range.", gr.update() + return load_interface(new_page_idx, new_idx) -# Audio editing functions (simplifying, assuming these are for phase 1 only) -def trim_audio_action(page_idx, idx_on_page, trim_start_str, trim_end_str): - # This function would need significant rework if used with the new get_sample returns. - # For now, let's assume it's for phase 1 and we fetch audio path differently or disable in phase 2. - # For simplicity in this modification, advanced audio ops might be limited. - if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Trimming disabled in Review Phase.", gr.Textbox() +def go_next_without_save(page_idx, idx, current_text, current_annotator, original_transcript): + global current_page_data, unsaved_changes + absolute_idx = page_idx * PAGE_SIZE + idx + if current_text.strip() != original_transcript.strip(): + unsaved_changes[absolute_idx] = current_text + audio, text, base_status, saved_reviewer, color, editable, accepted = get_sample(page_idx, idx, CURRENT_USERNAME) + status = f"{base_status} - Page {page_idx+1} - Reviewer: {saved_reviewer} [Unsaved changes]" + return ( + page_idx, + idx, + audio, + gr.update(value=text, interactive=editable), + gr.update(value=saved_reviewer, elem_classes=["pink"], interactive=False), + status, + original_transcript + ) + return next_sample(page_idx, idx) - # Simplified: fetch audio path if possible - audio_val, transcript, base_status, saved_reviewer, color, editable, accepted, _ = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) - - if not isinstance(audio_val, str) or not os.path.exists(audio_val): - # Try to get original path from current_page_data for non-raw audio - if current_page_data is not None and idx_on_page < len(current_page_data): - audio_entry = current_page_data.iloc[idx_on_page]["audio"] - resolved_path = get_audio_path(audio_entry) - if isinstance(resolved_path, str) and os.path.exists(resolved_path): - audio_val = resolved_path - else: # If it's raw audio data (tuple) or URL, or non-existent path - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Trimming not supported for this audio format or it's not a local file.", transcript +def prev_sample(page_idx, idx, current_text, current_annotator, original_transcript): + global current_page_data, unsaved_changes + absolute_idx = page_idx * PAGE_SIZE + idx + if current_text.strip() != original_transcript.strip(): + save_sample_data(page_idx, idx, current_text, CURRENT_USERNAME, False) + allowed_range = get_user_allowed_range(CURRENT_USERNAME) 
+ if is_within_range(absolute_idx - 1, allowed_range): + if idx > 0: + new_page_idx = page_idx + new_idx = idx - 1 else: - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Audio data not available for trimming.", transcript - - - absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) - - + if page_idx > 0: + new_page_idx = page_idx - 1 + new_data = load_page_data(new_page_idx) + if len(new_data) > 0: + new_idx = len(new_data) - 1 + else: + return page_idx, idx, gr.update(), gr.update(), gr.update(), "No previous samples in your range.", gr.update() + else: + return page_idx, idx, gr.update(), gr.update(), gr.update(), "No previous samples in your range.", gr.update() + else: + return page_idx, idx, gr.update(), gr.update(), gr.update(), "Previous sample is outside your assigned range.", gr.update() + return load_interface(new_page_idx, new_idx) + +def jump_to(target_idx, page_idx, idx, current_text, current_annotator, original_transcript): + global unsaved_changes + absolute_idx = page_idx * PAGE_SIZE + idx + if current_text.strip() != original_transcript.strip(): + save_sample_data(page_idx, idx, current_text, CURRENT_USERNAME, False) try: - audio_seg = AudioSegment.from_file(audio_val) - start_ms = int(float(trim_start_str) * 1000) - end_ms = int(float(trim_end_str) * 1000) + target_idx = int(target_idx) + if target_idx < 0: + target_idx = 0 + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not is_within_range(target_idx, allowed_range): + return page_idx, idx, gr.update(), gr.update(), gr.update(), "Target index is outside your assigned range.", gr.update() + new_page_idx = target_idx // PAGE_SIZE + new_idx = target_idx % PAGE_SIZE + new_data = load_page_data(new_page_idx) + if new_idx >= len(new_data): + new_idx = len(new_data) - 1 if len(new_data) > 0 else 0 + return load_interface(new_page_idx, new_idx) + except: + return load_interface(page_idx, idx) + +def save_and_next_sample(page_idx, idx, current_text, current_annotator, original_transcript): + save_sample_data(page_idx, idx, current_text, current_annotator, False) + return next_sample(page_idx, idx) + +# Audio editing functions +def trim_audio_action(page_idx, idx, trim_start, trim_end, current_text, current_annotator, original_transcript): + audio, transcript, base_status, saved_reviewer, _, _, _ = get_sample(page_idx, idx, CURRENT_USERNAME) + absolute_idx = page_idx * PAGE_SIZE + idx + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not is_within_range(absolute_idx, allowed_range): + return page_idx, idx, audio, transcript, saved_reviewer, "Sample is outside your assigned range.", transcript + temp_audio_file = None + if isinstance(audio, tuple): + sample_rate, audio_array = audio + try: + if np.issubdtype(audio_array.dtype, np.floating): + audio_array = (audio_array * 32767).astype(np.int16) + else: + audio_array = np.array(audio_array) + temp_audio_file = os.path.join(tempfile.gettempdir(), f"temp_{page_idx}{idx}.wav") + sf.write(temp_audio_file, audio_array, sample_rate) + audio = temp_audio_file + except Exception as e: + return page_idx, idx, audio, transcript, saved_reviewer, f"Error converting raw audio: {str(e)}", transcript + if not isinstance(audio, str) or not os.path.exists(audio): + return page_idx, idx, audio, transcript, saved_reviewer, "Trimming not supported for this audio format.", transcript + try: + audio_seg = AudioSegment.from_file(audio) + start_ms = 
int(float(trim_start) * 1000) + end_ms = int(float(trim_end) * 1000) trimmed_seg = audio_seg[start_ms:end_ms] - os.makedirs("trimmed_audio", exist_ok=True) - trimmed_filename = f"trimmed_{absolute_idx}_{voice_name_original}" - # Ensure unique extension, wav is usually safe - if not trimmed_filename.lower().endswith(('.wav', '.mp3', '.flac')): - trimmed_filename += ".wav" - trimmed_path = os.path.join("trimmed_audio", trimmed_filename) + voice_name = os.path.basename(str(current_page_data.iloc[idx]["audio"])) + trimmed_path = os.path.join("trimmed_audio", f"trimmed_{absolute_idx}_{voice_name}") + trimmed_seg.export(trimmed_path, format="wav") - # Export format might need to match original or be a standard like wav - export_format = os.path.splitext(trimmed_path)[1][1:] - if not export_format: export_format = "wav" # Default if no extension - - trimmed_seg.export(trimmed_path, format=export_format) - - dataset_model = load_saved_annotations() - sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) - if not sample: # Should exist if we are editing it - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Error: Sample not found in annotations for trimming.", transcript + dataset = load_saved_annotations() + sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) + if not sample: + sample = Sample( + id=absolute_idx, + voice_name=voice_name, + original_subtitle=current_page_data.iloc[idx]["sentence"], + annotations=[] + ) + dataset.samples = dataset.samples or [] + dataset.samples.append(sample) now = datetime.now() - # Associate trim with current user's annotation for this sample - annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) - if not annotation: # Create if doesn't exist + annotation = next((a for a in sample.annotations or [] if a.annotator == saved_reviewer), None) + if annotation: + annotation.audio_trims = [AudioTrim(start=float(trim_start), end=float(trim_end))] + annotation.update_at = now + else: annotation = Annotation( - annotator=CURRENT_USERNAME, - annotated_subtitle=transcript, # Current transcript - audio_trims=[AudioTrim(start=float(trim_start_str), end=float(trim_end_str))], + annotator=saved_reviewer, + audio_trims=[AudioTrim(start=float(trim_start), end=float(trim_end))], create_at=now, update_at=now ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) - else: - annotation.audio_trims = [AudioTrim(start=float(trim_start_str), end=float(trim_end_str))] - annotation.update_at = now - save_annotations(dataset_model) + save_annotations(dataset) new_status = f"{base_status} [Trimmed]" - return page_idx, idx_on_page, trimmed_path, transcript, saved_reviewer, new_status, transcript + return page_idx, idx, trimmed_path, transcript, saved_reviewer, new_status, transcript except Exception as e: - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, f"Error trimming audio: {str(e)}", transcript - + return page_idx, idx, audio, transcript, saved_reviewer, f"Error trimming audio: {str(e)}", transcript -def undo_trim_action(page_idx, idx_on_page): - if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Undo Trim disabled in Review Phase.", gr.Textbox() - - audio_val, transcript, base_status, saved_reviewer, color, editable, accepted, _ = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) - absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - voice_name_original = 
os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) - - dataset_model = load_saved_annotations() - sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) +def undo_trim_action(page_idx, idx, current_text, current_annotator, original_transcript): + audio, transcript, base_status, saved_reviewer, _, _, _ = get_sample(page_idx, idx, CURRENT_USERNAME) + absolute_idx = page_idx * PAGE_SIZE + idx + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not is_within_range(absolute_idx, allowed_range): + return page_idx, idx, audio, transcript, saved_reviewer, "Sample is outside your assigned range.", transcript + + voice_name = os.path.basename(str(current_page_data.iloc[idx]["audio"])) + dataset = load_saved_annotations() + sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) if sample: - annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) # Trim is user-specific + annotation = next((a for a in sample.annotations or [] if a.annotator == saved_reviewer), None) if annotation and annotation.audio_trims: annotation.audio_trims = None annotation.update_at = datetime.now() - save_annotations(dataset_model) - - # Restore original audio from backup or re-fetch from source dataset info - original_audio_path_or_data = current_page_data.iloc[idx_on_page]["audio"] # This is the source entry - restored_audio_val = get_audio_path(original_audio_path_or_data) + save_annotations(dataset) - # key = f"{absolute_idx}_{voice_name_original}" - # orig_audio_backup = audio_backup.get(key) # Fetch from backup if available - # if not orig_audio_backup: # If not in backup, use the path from current_page_data - # orig_audio_backup = get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) - + orig_audio = audio_backup.get(f"{absolute_idx}_{voice_name}", audio) new_status = f"{base_status} [Trim undone]" - return page_idx, idx_on_page, restored_audio_val, transcript, saved_reviewer, new_status, transcript - - -def confirm_delete_audio_action(page_idx, idx_on_page): - if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Delete disabled in Review Phase.", gr.Textbox() - - absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) + return page_idx, idx, orig_audio, transcript, saved_reviewer, new_status, transcript - dataset_model = load_saved_annotations() - sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) +def confirm_delete_audio(page_idx, idx, current_text, current_annotator, original_transcript): + absolute_idx = page_idx * PAGE_SIZE + idx + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not is_within_range(absolute_idx, allowed_range): + return page_idx, idx, gr.update(), gr.update(), gr.update(), "Sample is outside your assigned range.", gr.update() + + voice_name = os.path.basename(str(current_page_data.iloc[idx]["audio"])) + dataset = load_saved_annotations() + sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) if not sample: sample = Sample( id=absolute_idx, - voice_name=voice_name_original, - original_subtitle=current_page_data.iloc[idx_on_page]["sentence"], + voice_name=voice_name, + original_subtitle=current_page_data.iloc[idx]["sentence"], annotations=[] ) - dataset_model.samples = dataset_model.samples or [] - dataset_model.samples.append(sample) - + dataset.samples = 
dataset.samples or [] + dataset.samples.append(sample) + sample.ignore_it = True now = datetime.now() - # Create/update an annotation by CURRENT_USERNAME to mark this action annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) - deleted_text_marker = "AUDIO DELETED (This audio has been removed.)" if annotation: - annotation.annotated_subtitle = deleted_text_marker - annotation.audio_trims = None # Clear trims + annotation.annotated_subtitle = "AUDIO DELETED (This audio has been removed.)" + annotation.audio_trims = None annotation.update_at = now - # Potentially clear review statuses if deletion overrides them else: annotation = Annotation( annotator=CURRENT_USERNAME, - annotated_subtitle=deleted_text_marker, + annotated_subtitle="AUDIO DELETED (This audio has been removed.)", create_at=now, update_at=now ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) - - save_annotations(dataset_model) + save_annotations(dataset) new_status = f"Sample {absolute_idx+1} [Audio deleted]" - if total_samples > 0: new_status += f" of {total_samples}" - - # Return values to update UI correctly after deletion - return page_idx, idx_on_page, None, deleted_text_marker, "deleted", new_status, deleted_text_marker - + if total_samples > 0: + new_status += f" of {total_samples}" + return page_idx, idx, None, annotation.annotated_subtitle, "deleted", new_status, annotation.annotated_subtitle -# Export functions (largely unchanged, ensure CURRENT_USERNAME context if it matters for export) +# Export functions def sanitize_string(s): - if not isinstance(s, str): s = str(s) - return re.sub(r'[^\w-./]', '_', s) + if not isinstance(s, str): + s = str(s) + return re.sub(r'[^\w\-\./]', '_', s) def sanitize_sentence(s): - if not isinstance(s, str): s = str(s) + if not isinstance(s, str): + s = str(s) return s.encode('utf-8', errors='ignore').decode('utf-8') @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) -def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token_val=None): - if not token_val: - print("Cannot push to hub: No token provided for push_to_hub_with_retry.") - return +def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token=None): print(f"Pushing dataset to {repo_id}") - dataset_dict.push_to_hub(repo_id, private=private, token=token_val) + dataset_dict.push_to_hub(repo_id, private=private, token=token) -def export_to_huggingface(repo_name_str, hf_token_for_export, progress=gr.Progress()): - # This export logic needs to be carefully reviewed. - # It rebuilds a dataset from HF_DATASET_NAME and applies annotations. - # It should reflect the FINAL state of annotations (e.g., after second phase review if applicable). - # The current logic uses CURRENT_USERNAME for annotation preference, which might not be ideal for a global export. - # It should ideally use the "winning" annotation (e.g., accepted by reviewer, or approved in 2nd phase). - if not hf_token_for_export: - return "Export failed: Hugging Face token is missing." 
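# A minimal sketch of the selection priority described in the removed comments above:
# prefer an annotation approved in the second review phase, then one accepted by a
# first-phase reviewer, then the most recently updated one. `select_export_annotation`
# is a hypothetical helper, not part of app.py; the getattr() guards cover fields that
# only exist in the richer Annotation model.
def select_export_annotation(sample):
    anns = sample.annotations or []
    if not anns:
        return None
    # 1. An annotation approved in the second review phase wins outright.
    for ann in anns:
        if getattr(ann, "second_phase_review_status", None) == "approved":
            return ann
    # 2. Otherwise, an annotation accepted by a first-phase reviewer.
    for ann in anns:
        if getattr(ann, "is_first_phase_accepted", False):
            return ann
    # 3. Fall back to the most recently updated annotation.
    return max(anns, key=lambda a: a.update_at)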
+def export_to_huggingface(repo_name, token, progress=gr.Progress()): try: start_time = time.time() - repo_name_str = sanitize_string(repo_name_str) + repo_name = sanitize_string(repo_name) print(f"Export started at {time.strftime('%Y-%m-%d %H:%M:%S')}") + dataset = load_saved_annotations() + total_samples_export = get_dataset_info().get('num_samples', -1) + if total_samples_export <= 0: + total_samples_export = 1500 + chunk_size = 100 + num_chunks = (total_samples_export + chunk_size - 1) // chunk_size + print(f"Total samples: {total_samples_export}, chunks: {num_chunks}") + progress(0, f"Total samples: {total_samples_export}, chunks: {num_chunks}") - dataset_model_annotations = load_saved_annotations() # Load all annotations - - # Use total_samples from global or re-fetch if necessary. - # The export should process all samples defined by total_samples. - # Let's assume total_samples is the definitive count. - if total_samples <= 0: - return "Export failed: Total number of samples is unknown or invalid." - - # export_total_samples = total_samples - # Using streaming for source, but collecting all data. This can be memory intensive. - # Consider processing in true streaming fashion if dataset is very large. - - ds_source = load_dataset(HF_DATASET_NAME, split="train", streaming=False) # Load non-streaming for easier iteration up to total_samples - - exported_data_list = [] - progress(0, f"Preparing {total_samples} samples for export...") - - for i, source_sample in enumerate(ds_source): - if i >= total_samples: break # Limit to known total_samples - - absolute_idx = i # Assuming source_sample is ordered and corresponds to index i - - audio_entry = source_sample.get("audio") - sentence_val = source_sample.get("sentence", "") # Default original sentence - - # Determine final audio and sentence based on annotations - audio_dict_to_export = None # Default to no audio if deleted or issue + with tempfile.TemporaryDirectory() as temp_dir: + ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) + ds_iter = iter(ds) + all_datasets = [] + processed_samples = 0 - # Convert audio path/data from source_sample to array for export - # This part is tricky: we need to load audio content. - # For simplicity, this example will re-use get_audio_path and then load if it's a path. - raw_audio_data = None - audio_path_or_data = get_audio_path(audio_entry) - if isinstance(audio_path_or_data, tuple): # Raw audio from get_audio_path - raw_audio_data = {"array": audio_path_or_data[1], "sampling_rate": audio_path_or_data[0]} - elif isinstance(audio_path_or_data, str) and (os.path.exists(audio_path_or_data) or audio_path_or_data.startswith("http")): - # If it's a path, load it. This might be slow. - # For URLs, datasets library handles loading when building Dataset object. - # For local paths, we need to load into array. 
- if os.path.exists(audio_path_or_data): + for chunk_idx in range(num_chunks): + chunk_data = [] + for _ in range(chunk_size): try: - arr, sr = sf.read(audio_path_or_data) - raw_audio_data = {"array": arr, "sampling_rate": sr} - except Exception as e_load: - print(f"Warning: Could not load audio file {audio_path_or_data} for export: {e_load}") - # raw_audio_data remains None - else: # URL - raw_audio_data = audio_path_or_data # Pass URL directly, Audio feature will handle - - audio_dict_to_export = raw_audio_data - - - # Check annotations for this sample - annotation_data = next((s for s in dataset_model_annotations.samples or [] if s.id == absolute_idx), None) - - if annotation_data: - if annotation_data.ignore_it: - sentence_val = "AUDIO DELETED (This audio has been removed.)" - audio_dict_to_export = None # No audio - else: - # Determine the "best" annotation to use - # Priority: 1. Approved in 2nd phase, 2. Accepted in 1st phase by reviewer, 3. Annotator's latest - best_ann = None - if annotation_data.annotations: - # Check for 2nd phase approved - # This needs to find the annotation that WAS approved, not make a new one. - # The original annotator's submission that got approved. - if annotation_data.is_approved_in_second_phase: - # Find which annotation was approved. Iterate through them. - for ann in annotation_data.annotations: - if ann.second_phase_review_status == "approved": - best_ann = ann - break + sample = next(ds_iter) + absolute_idx = chunk_idx * chunk_size + len(chunk_data) + audio_entry = sample["audio"] + audio_dict = None + audio_key = None - if not best_ann: # Check for 1st phase accepted - for ann in annotation_data.annotations: - if ann.is_first_phase_accepted: - best_ann = ann - break + if isinstance(audio_entry, dict) and "array" in audio_entry: + if not np.all(np.isfinite(audio_entry["array"])): + audio_dict = None + else: + audio_dict = { + "array": audio_entry["array"], + "sampling_rate": audio_entry["sampling_rate"] + } + audio_key = audio_entry.get("path", f"sample_{absolute_idx}.mp3") + audio_key = sanitize_string(audio_key) + elif isinstance(audio_entry, str): + if audio_entry.startswith("http://") or audio_entry.startswith("https://"): + audio_dict = None + audio_key = sanitize_string(audio_entry) + else: + resolved_path = get_audio_path(audio_entry) + if os.path.exists(resolved_path): + try: + audio_array, sample_rate = sf.read(resolved_path) + if not np.all(np.isfinite(audio_array)): + audio_dict = None + else: + audio_dict = { + "array": audio_array, + "sampling_rate": sample_rate + } + audio_key = sanitize_string(resolved_path) + except: + audio_dict = None + audio_key = sanitize_string(resolved_path) + else: + audio_dict = None + audio_key = sanitize_string(resolved_path) + else: + audio_dict = None + audio_key = sanitize_string(str(audio_entry)) - if not best_ann: # Fallback to any annotation (e.g., latest by timestamp or first found) - # This could be more sophisticated, e.g. latest updated. - # For now, take first one if multiple non-reviewed/accepted exist. - # Or, if a specific user's annotations are primary (e.g. CURRENT_USERNAME if this is a personal export) - # Let's assume any relevant annotation is fine if not formally accepted/approved. - # The original code used CURRENT_USERNAME's annotation. This might be too specific for a general export. - # Let's try to find *any* annotation from the list for the sample if no "accepted" one exists. 
- if annotation_data.annotations: - best_ann = sorted(annotation_data.annotations, key=lambda x: x.update_at, reverse=True)[0] # latest - - if best_ann: - sentence_val = best_ann.annotated_subtitle or sentence_val # Use annotated if available - # Handle trimmed audio if specified in best_ann - if best_ann.audio_trims and audio_dict_to_export: # Only if audio exists - # This part requires that trimmed audio files are accessible and named consistently - # The original trim_audio_action saves to "trimmed_audio/trimmed_{abs_idx}_{voice_name}" - # We need to reconstruct this path or have a direct reference. - # Assuming voice_name is from original sample. - original_voice_name = sanitize_string(os.path.basename(str(get_audio_path(audio_entry) or f"sample_{absolute_idx}"))) - trimmed_path_potential = os.path.join("trimmed_audio", f"trimmed_{absolute_idx}_{original_voice_name}") - # Ensure extension consistency for look up - if not os.path.splitext(trimmed_path_potential)[1]: trimmed_path_potential += ".wav" # common default - - if os.path.exists(trimmed_path_potential): - try: - arr, sr = sf.read(trimmed_path_potential) - audio_dict_to_export = {"array": arr, "sampling_rate": sr} - except Exception as e_trim_load: - print(f"Warning: Could not load trimmed audio {trimmed_path_potential}: {e_trim_load}") - # audio_dict_to_export remains as original loaded audio - # else: print(f"Trimmed audio path not found: {trimmed_path_potential}") - - exported_data_list.append({ - "audio": audio_dict_to_export, # This will be None if deleted or failed to load - "sentence": sanitize_sentence(sentence_val) - }) + sentence = sample.get("sentence", "") + sample_data = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) + if sample_data: + if sample_data.ignore_it: + audio_dict = None + sentence = "AUDIO DELETED (This audio has been removed.)" + elif sample_data.annotations: + annotation = next((a for a in sample_data.annotations if a.accepted), None) or \ + next((a for a in sample_data.annotations if a.annotator == CURRENT_USERNAME), None) + if annotation and annotation.annotated_subtitle: + sentence = annotation.annotated_subtitle + if annotation.audio_trims: + trimmed_path = os.path.join("trimmed_audio", f"trimmed_{absolute_idx}_{audio_key}") + if os.path.exists(trimmed_path): + audio_array, sample_rate = sf.read(trimmed_path) + audio_dict = { + "array": audio_array, + "sampling_rate": sample_rate + } + + chunk_data.append({ + "audio": audio_dict, + "sentence": sanitize_sentence(sentence) + }) + + gc.collect() + except StopIteration: + break + + if chunk_data: + chunk_dataset = Dataset.from_list(chunk_data) + chunk_dataset = chunk_dataset.cast_column("audio", Audio()) + chunk_path = os.path.join(temp_dir, f"chunk_{chunk_idx}.parquet") + chunk_dataset.to_parquet(chunk_path) + all_datasets.append(chunk_path) + processed_samples += len(chunk_data) + progress(processed_samples / total_samples_export, f"Processed {processed_samples}/{total_samples_export}") + + del chunk_data + gc.collect() - if (i + 1) % 100 == 0: # Progress update - progress((i + 1) / total_samples, f"Processed {i+1}/{total_samples} samples") - gc.collect() - - if not exported_data_list: - return "No data to export after processing." - - # Create Hugging Face Dataset from the collected data - # Filter out entries where audio is None if dataset schema requires audio - # final_export_list = [item for item in exported_data_list if item["audio"] is not None] - # Or handle audio being optional by schema. 
For Audio(), None might not be allowed if array is mandatory. - # Let's assume for now audio can be None (e.g. deleted). If Audio() cast fails, this needs adjustment. - # The Audio feature expects a path, dict with array/sr, or bytes. None might lead to issues. - # Handling: if audio_dict_to_export is None, replace with a dummy silent audio array or skip sample. - # For now, let's try passing None and see if cast_column handles it gracefully or errors. - # It's safer to ensure 'audio' is always a valid Audio structure or path. - # If audio is None (e.g. ignore_it=True), we should ensure the Audio feature can handle it. - # Typically, you might replace with a path to a very short silent audio file, or an empty array if supported. - - for item in exported_data_list: - if item["audio"] is None: # If audio was marked for deletion / ignore_it - # Provide a placeholder that Audio() can cast, e.g. path to a tiny silent wav or empty array - # For simplicity, if datasets lib allows None for audio feature, this is fine. - # Otherwise, this needs a robust placeholder. - # A common practice is to provide a dictionary with a path to a universally accessible silent file, - # or an empty numpy array for 'array' and a common 'sampling_rate'. - # Let's try with an empty array. - item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} # Example placeholder - elif isinstance(item["audio"], str): # If it's a URL or path string - # The Audio feature will handle loading this. - pass - elif not (isinstance(item["audio"], dict) and "array" in item["audio"] and "sampling_rate" in item["audio"]): - print(f"Warning: Invalid audio format for export for a sample, replacing with silent audio: {item['audio']}") - item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} - - - final_dataset = Dataset.from_list(exported_data_list) - final_dataset = final_dataset.cast_column("audio", Audio()) # Cast to Audio feature type - - dataset_dict_export = DatasetDict({"train": final_dataset}) - - progress(0.95, "Uploading to Hugging Face...") - push_to_hub_with_retry( - dataset_dict=dataset_dict_export, - repo_id=repo_name_str, - private=True, # Assuming private, can be a parameter - token_val=hf_token_for_export - ) - print(f"Upload done, total time: {time.time() - start_time:.2f}s") - progress(1.0, "Upload complete!") - return f"Exported to huggingface.co/datasets/{repo_name_str}" - + if all_datasets: + combined_dataset = Dataset.from_parquet([p for p in all_datasets]) + dataset_dict = DatasetDict({"train": combined_dataset}) + progress(0.95, "Uploading to Hugging Face...") + push_to_hub_with_retry( + dataset_dict=dataset_dict, + repo_id=repo_name, + private=True, + token=token + ) + print(f"Upload done, total time: {time.time() - start_time:.2f}s") + progress(1.0, "Upload complete!") + return f"Exported to huggingface.co/datasets/{repo_name}" + else: + return "No data to export." except Exception as e: error_msg = f"Export failed: {str(e)}" - import traceback - print(f"{error_msg}\n{traceback.format_exc()}") + print(error_msg) return error_msg - # Login function -def hf_login(hf_token_val): - global CURRENT_USERNAME, token, current_page_data, total_samples, annotator_ranges - - if not hf_token_val: # If user clears the box and clicks login - return gr.update(visible=True), gr.update(visible=False), "", "", "Login failed: Token cannot be empty." 
- +def hf_login(hf_token): + global CURRENT_USERNAME try: - user_info = whoami(token=hf_token_val) - username = user_info['name'] - + username = whoami(token=hf_token)['name'] if username in ALLOWED_USERS: CURRENT_USERNAME = username - token = hf_token_val # Store the validated token globally for other HF ops - - # Initialize/re-initialize dataset info and ranges based on logged-in user - # This ensures that if total_samples was not fetched, it's attempted again. - ds_info = get_dataset_info() # Sets global total_samples - if total_samples > 0: - annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) - if SECOND_PHASE: - initialize_second_phase_assignments() # Depends on ANNOTATORS and their ranges - else: - # Handle case where total_samples is still unknown (critical for ranges) - return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, "Login successful, but failed to get dataset size. Cannot proceed." - - - # Load initial page data for this user - current_page_data = load_page_data(0) # page 0 for the current user - - # Determine initial UI state based on SECOND_PHASE - is_second_phase_active = SECOND_PHASE - - # Update visibility of components based on phase - updates = { - # Phase 1 components - "save_next_button_vis": not is_second_phase_active, - "transcript_interactive": not is_second_phase_active, - "trim_button_vis": not is_second_phase_active, - "undo_trim_button_vis": not is_second_phase_active, - "delete_button_vis": not is_second_phase_active, - "first_phase_accept_cb_vis": (not is_second_phase_active and get_user_role(CURRENT_USERNAME) == "reviewer"), - # Phase 2 components - "approve_button_vis": is_second_phase_active, - "reject_button_vis": is_second_phase_active, - } - - initial_load = load_interface_data(0, 0) # Load data for the first sample (page 0, index 0 on page) - - # Return tuple for outputs matching login_button.click() - # login_container, main_container, reviewer_textbox (as initial state), hf_token_state, login_message, - # then all the visibility/interactivity updates - return ( - gr.update(visible=False), # login_container - gr.update(visible=True), # main_container - initial_load[4], # reviewer_textbox gr.update object (initial_load[4] is reviewer text gr.update) - hf_token_val, # hf_token_state - f"Login successful! Welcome {CURRENT_USERNAME}. Phase: {'Review' if SECOND_PHASE else 'Annotation'}.", # login_message - - # UI component updates based on phase - gr.update(visible=updates["save_next_button_vis"]), - gr.update(interactive=updates["transcript_interactive"]), # This is for transcript Textarea - gr.update(visible=updates["trim_button_vis"]), - gr.update(visible=updates["undo_trim_button_vis"]), - gr.update(visible=updates["delete_button_vis"]), - gr.update(visible=updates["first_phase_accept_cb_vis"]), - gr.update(visible=updates["approve_button_vis"]), - gr.update(visible=updates["reject_button_vis"]), - - # Initial data for the interface elements from load_interface_data - initial_load[0], # page_idx_state - initial_load[1], # idx_on_page_state - initial_load[2], # audio_player - initial_load[3], # transcript (already includes interactivity) - # initial_load[4] is reviewer, already used above for initial value - initial_load[5], # status_md - initial_load[6], # original_transcript_state - ) - + load_page_data(0) + return gr.update(visible=False), gr.update(visible=True), username, hf_token, "Login successful!" 
else: - CURRENT_USERNAME = None - return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, "User not authorized!" + return gr.update(visible=True), gr.update(visible=False), "", hf_token, "User not authorized!" except Exception as e: - CURRENT_USERNAME = None - import traceback - print(f"Login failed: {str(e)}\n{traceback.format_exc()}") - return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, f"Login failed: {str(e)}" - - -# Set initial values for UI elements before login (mostly empty or default) -init_page_idx = 0 -init_idx_on_page = 0 -init_audio_val = None -init_transcript_val = gr.update(value="", interactive=False) # Non-interactive before login -init_reviewer_val = gr.update(value="N/A", interactive=False) -init_status_val = "Please log in." -init_original_text_val = "" + return gr.update(visible=True), gr.update(visible=False), "", hf_token, f"Login failed: {str(e)}" + +# Add a Gradio State for second_phase +def is_reviewer(): + return CURRENT_USERNAME in REVIEWERS + +# === Second Phase Toggle === +second_phase = False # Set to True to activate second phase + +# === Assign review splits for second phase === +def get_second_phase_ranges(annotator_ranges): + # Swap each annotator's range with the next annotator (cyclic) + annotators = list(annotator_ranges.keys()) + if len(annotators) < 2: + return annotator_ranges # Nothing to swap + swapped = {} + for i, annotator in enumerate(annotators): + next_annotator = annotators[(i + 1) % len(annotators)] + swapped[annotator] = annotator_ranges[next_annotator] + return swapped + +# Set initial values +if len(current_page_data) > 0: + init_values = load_interface(0, 0) +else: + init_values = (0, 0, None, gr.update(value="", interactive=True), gr.update(value="unreviewed", elem_classes=["white"], interactive=False), "No data available.", "") # Gradio Interface css = """ -.white { background-color: white; color: black; } -.yellow { background-color: yellow; color: black; } -.blue { background-color: lightblue; color: black; } /* Adjusted for readability */ -.green { background-color: lightgreen; color: black; } /* Adjusted for readability */ -.pink { background-color: pink; color: black; } -.red { background-color: #FF7F7F; color: black; } /* Softer red */ -.orange { background-color: orange; color: black; } -.gray { background-color: lightgray; color: black; } -.lightgray { background-color: #f0f0f0; color: black; } /* For very subtle states */ -.reviewer-textbox input { text-align: center; font-weight: bold; } +.white { background-color: white; } +.yellow { background-color: yellow; } +.blue { background-color: blue; color: white; } +.green { background-color: green; color: white; } +.pink { background-color: pink; } +.red { background-color: red; color: white; } """ -with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo: - hf_token_state = gr.State(token) # Store token for export or other uses - - # UI States for navigation and data - current_page_idx_state = gr.State(init_page_idx) - current_idx_on_page_state = gr.State(init_idx_on_page) - original_transcript_state = gr.State(init_original_text_val) # Stores original subtitle from dataset for current item - +with gr.Blocks(css=css, title="ASR Dataset Labeling with HF Authentication") as demo: + hf_token_state = gr.State("") + with gr.Column(visible=True, elem_id="login_container") as login_container: - gr.Markdown("## HF Authentication\nPlease enter your Hugging Face token (read & write permissions).") - hf_token_input = 
gr.Textbox(label="Hugging Face Token", type="password", placeholder="Enter your HF token", value=token or "") + gr.Markdown("## HF Authentication\nPlease enter your Hugging Face token to proceed.") + hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", placeholder="Enter your HF token") login_button = gr.Button("Login") login_message = gr.Markdown("") - + with gr.Column(visible=False, elem_id="main_container") as main_container: gr.Markdown("# ASR Dataset Labeling Interface") - status_md = gr.Markdown(init_status_val) # For status messages like "Sample x of y" + gr.Markdown("Listen to audio and edit transcriptions. Changes are saved via the Save & Next button.") + with gr.Row(): + current_page_idx = gr.State(value=init_values[0]) + current_idx = gr.State(value=init_values[1]) + original_transcript = gr.State(value=init_values[6]) + with gr.Column(): + audio_player = gr.Audio(value=init_values[2], label="Audio", autoplay=True) + transcript = gr.TextArea(value=init_values[3], label="Transcript", lines=5, placeholder="Edit transcript here...", interactive=not second_phase) + reviewer = gr.Textbox(value=init_values[4], label="Reviewer", placeholder="Reviewer (auto-filled)", interactive=False, elem_classes=["white"]) + status = gr.Markdown(value=init_values[5]) with gr.Row(): - with gr.Column(scale=2): - audio_player = gr.Audio(value=init_audio_val, label="Audio Sample", autoplay=False) # Autoplay off initially - transcript_tb = gr.TextArea(value=init_transcript_val['value'], label="Transcript", lines=5, - interactive=init_transcript_val.get('interactive', False)) - reviewer_tb = gr.Textbox(value=init_reviewer_val['value'], label="Annotation Status / Reviewer", interactive=False, elem_classes=["white", "reviewer-textbox"]) - - with gr.Column(scale=1): - gr.Markdown("### Navigation") - prev_button = gr.Button("← Previous") - next_button = gr.Button("Next (no save)") # For first phase - - # Phase 1 Buttons - save_next_button = gr.Button("Save & Next", variant="primary", visible=not SECOND_PHASE) - first_phase_accept_cb = gr.Checkbox(label="Accept (Reviewer)", visible=(not SECOND_PHASE and CURRENT_USERNAME in REVIEWERS if CURRENT_USERNAME else False)) - - # Phase 2 Buttons - approve_button = gr.Button("Approve & Next", variant="primary", visible=SECOND_PHASE) - reject_button = gr.Button("Reject & Next", variant="stop", visible=SECOND_PHASE) - - gr.Markdown("### Audio Tools (Phase 1 only)") - with gr.Row(): - trim_start_tb = gr.Textbox(label="Trim Start (s)", placeholder="e.g., 1.5", scale=1) - trim_end_tb = gr.Textbox(label="Trim End (s)", placeholder="e.g., 3.0", scale=1) - trim_button = gr.Button("Trim Audio", visible=not SECOND_PHASE) - undo_trim_button = gr.Button("Undo Trim", visible=not SECOND_PHASE) - delete_button = gr.Button("Mark Audio as Deleted", variant="stop", visible=not SECOND_PHASE) - # Confirm/Cancel for delete are managed dynamically - - with gr.Accordion("Advanced Navigation & Export", open=False): - with gr.Row(): - jump_text_tb = gr.Textbox(label="Jump to Global Index", placeholder="Enter index number") - jump_button = gr.Button("Jump") - with gr.Row(): - hf_repo_name_tb = gr.Textbox(label="Export Repository Name (username/dataset-name)", - placeholder=f"{CURRENT_USERNAME}/my-annotated-dataset" if CURRENT_USERNAME else "your-username/asr-dataset") - hf_export_button = gr.Button("Export to Hugging Face", variant="primary") - hf_export_status_md = gr.Markdown("") - - # Define outputs for login_button carefully, matching the hf_login function's return tuple - 
login_outputs = [
-        login_container, main_container, reviewer_tb, hf_token_state, login_message,
-        # Visibility/interactivity updates
-        save_next_button, transcript_tb, trim_button, undo_trim_button, delete_button,
-        first_phase_accept_cb, approve_button, reject_button,
-        # Initial data load updates
-        current_page_idx_state, current_idx_on_page_state, audio_player, transcript_tb, # transcript_tb updated twice, once for interactivity, once for value
-        status_md, original_transcript_state
-    ]
-    # Need to ensure transcript_tb gets value update from initial_load too.
-    # hf_login returns initial_load[3] which is gr.update(value=text, interactive=editable) for transcript.
-    # So, one update to transcript_tb should be sufficient if it carries both value and interactivity.
+            # Second phase: Approve/Reject buttons
+            approve_button = gr.Button("Approve", variant="primary", visible=second_phase)
+            reject_button = gr.Button("Reject", variant="stop", visible=second_phase)
+            prev_button = gr.Button("← Previous", visible=not second_phase)
+            save_next_button = gr.Button("Save & Next", variant="primary", visible=not second_phase)
+            next_button = gr.Button("Next", visible=not second_phase)
+
+        # Phase toggle; the .change() wiring below binds to this checkbox
+        second_phase_checkbox = gr.Checkbox(label="Second phase (review mode)", value=second_phase)
+
+        with gr.Row():
+            trim_start = gr.Textbox(label="Trim Start (seconds)", placeholder="e.g., 1.5")
+            trim_end = gr.Textbox(label="Trim End (seconds)", placeholder="e.g., 3.0")
+            trim_button = gr.Button("Trim Audio", variant="primary")
+            undo_trim_button = gr.Button("Undo Trim")
+
+        with gr.Row():
+            delete_button = gr.Button("Delete Audio", variant="stop")
+        with gr.Row():
+            confirm_delete_button = gr.Button("Confirm Delete", visible=False)
+            cancel_delete_button = gr.Button("Cancel Delete", visible=False)
+
+        with gr.Row():
+            jump_text = gr.Textbox(label="Jump to Global Index", placeholder="Enter index number")
+            jump_button = gr.Button("Jump")
+        with gr.Row():
+            hf_repo_name = gr.Textbox(label="Repository Name (username/dataset-name)", placeholder="e.g., your-username/asr-dataset")
+        with gr.Row():
+            hf_export_button = gr.Button("Export to Hugging Face", variant="primary")
+            hf_export_status = gr.Markdown("")
+
+    # Event handlers
+    def update_phase_ui(is_second_phase):
+        # Hide annotation controls, show approve/reject in second phase
+        return (
+            gr.update(visible=not is_second_phase),
+            gr.update(visible=not is_second_phase),
+            gr.update(visible=not is_second_phase),
+            gr.update(visible=is_second_phase),
+            gr.update(visible=is_second_phase)
+        )
+    second_phase_checkbox.change(
+        fn=update_phase_ui,
+        inputs=[second_phase_checkbox],
+        outputs=[prev_button, save_next_button, next_button, approve_button, reject_button]
+    )
+    if not second_phase:
+        save_next_button.click(
+            fn=save_and_next_sample,
+            inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript],
+            outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript]
+        )
+        next_button.click(
+            fn=go_next_without_save,
+            inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript],
+            outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript]
+        )
+        prev_button.click(
+            fn=prev_sample,
+            inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript],
+            outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript]
+        )
+    else:
+        approve_button.click(
+            fn=approve_sample,
+            inputs=[current_page_idx, current_idx, reviewer],
+            outputs=[current_page_idx, current_idx, audio_player, 
transcript, reviewer, status, original_transcript] + ) + reject_button.click( + fn=reject_sample, + inputs=[current_page_idx, current_idx, reviewer], + outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] + ) + jump_button.click( + fn=jump_to, + inputs=[jump_text, current_page_idx, current_idx, transcript, reviewer, original_transcript], + outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] + ) + trim_button.click( + fn=trim_audio_action, + inputs=[current_page_idx, current_idx, trim_start, trim_end, transcript, reviewer, original_transcript], + outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] + ) + undo_trim_button.click( + fn=undo_trim_action, + inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript], + outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] + ) + delete_button.click( + fn=lambda: (gr.update(visible=True), gr.update(visible=True)), + inputs=None, + outputs=[confirm_delete_button, cancel_delete_button] + ) + confirm_delete_button.click( + fn=confirm_delete_audio, + inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript], + outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] + ) + confirm_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=confirm_delete_button) + confirm_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=cancel_delete_button) + cancel_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=confirm_delete_button) + cancel_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=cancel_delete_button) + + hf_export_button.click(fn=export_to_huggingface, inputs=[hf_repo_name, hf_token_state], outputs=[hf_export_status], queue=False) + login_button.click( fn=hf_login, inputs=[hf_token_input], - outputs=login_outputs - ) - - # Common outputs for navigation and actions - navigation_outputs = [ - current_page_idx_state, current_idx_on_page_state, - audio_player, transcript_tb, reviewer_tb, status_md, original_transcript_state - ] - - # Phase 1 actions - save_next_button.click( - fn=save_and_next_sample_first_phase, - inputs=[current_page_idx_state, current_idx_on_page_state, transcript_tb, first_phase_accept_cb], - outputs=navigation_outputs - ) - # 'Next (no save)' button (only for Phase 1) - next_button.click( - fn=go_next_sample_wrapper, # This simple nav doesn't save unsaved changes. User should be aware. - inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs - ).then( # Add a small JS to clear unsaved changes marker if any (conceptual) - None, None, None, _js="() => { /* Clear unsaved visual cues if any */ }" - ) - - prev_button.click( - fn=go_prev_sample_wrapper, # Similarly, does not auto-save. 
- inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs - ) - - # Phase 2 actions - approve_button.click( - fn=review_and_next_sample_second_phase, - inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("approved")], # Pass action string - outputs=navigation_outputs - ) - reject_button.click( - fn=review_and_next_sample_second_phase, - inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("rejected")], # Pass action string - outputs=navigation_outputs + outputs=[login_container, main_container, reviewer, hf_token_state, login_message] ) - # Audio tools (Phase 1) - trim_button.click( - fn=trim_audio_action, - inputs=[current_page_idx_state, current_idx_on_page_state, trim_start_tb, trim_end_tb], - outputs=navigation_outputs # Outputs audio_player, status_md primarily - ) - undo_trim_button.click( - fn=undo_trim_action, - inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs - ) - delete_button.click( # This will be a confirmable action - fn=confirm_delete_audio_action, # Direct action for simplicity, could add confirmation dialog - inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs - ) - - # Jump and Export - jump_button.click( - fn=jump_to_absolute_idx, - inputs=[jump_text_tb, current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs - ) - hf_export_button.click( - fn=export_to_huggingface, - inputs=[hf_repo_name_tb, hf_token_state], - outputs=[hf_export_status_md], - queue=True # Export can be long - ) - -# Launch the interface -if __name__ == "__main__": - # For testing, you might want to set SECOND_PHASE here or via environment variable - # Example: os.environ.get("APP_SECOND_PHASE", "False").lower() == "true" - # SECOND_PHASE = True # Force second phase for testing - if SECOND_PHASE: - print("==== APPLICATION RUNNING IN SECOND PHASE (REVIEW MODE) ====") - else: - print("==== APPLICATION RUNNING IN FIRST PHASE (ANNOTATION MODE) ====") - - demo.queue().launch(debug=True, share=False) # Share=True for ngrok link if needed \ No newline at end of file +demo.launch()
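# Illustrative sketch, not part of app.py: how the second-phase helpers defined above are
# meant to combine. The annotator names and ranges are hypothetical; with the cyclic swap
# each annotator reviews a range that a different annotator labelled in the first phase.
def _second_phase_assignment_example():
    use_second_phase = os.environ.get("APP_SECOND_PHASE", "False").lower() == "true"
    first_phase_ranges = {"annotator_a": (0, 749), "annotator_b": (750, 1499)}
    if use_second_phase:
        # get_second_phase_ranges returns {"annotator_a": (750, 1499), "annotator_b": (0, 749)}
        return get_second_phase_ranges(first_phase_ranges)
    return first_phase_ranges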