diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -15,26 +15,31 @@
 from tenacity import retry, stop_after_attempt, wait_exponential
 import re
 import numpy as np
 from pydantic import BaseModel
-from typing import Optional, List
+from typing import Optional, List, Tuple
 from datetime import datetime
 
 # Log in with Hugging Face token
 token = os.getenv("hf_token")
-login(token)
+if token:
+    login(token)
+else:
+    print("Warning: hf_token environment variable not set. Hugging Face Hub operations might fail.")
 
 # Configuration
 HF_DATASET_NAME = "navidved/channelb-raw-data"
 AUDIO_DIR = "audio"
 SAVE_PATH = "annotations.json"
-ALLOWED_USERS = ["vargha", "navidved"]
-REVIEWERS = ["vargha"]
-ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS]
+ALLOWED_USERS = ["vargha", "navidved", "userC"]  # Added userC for testing 2nd phase with >1 annotator
+REVIEWERS = ["vargha"]  # First phase reviewers
+ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS]  # First phase annotators
 CURRENT_USERNAME = None
-PAGE_SIZE = 100
+PAGE_SIZE = 100  # Kept for pagination logic, though review might be sample by sample
 SAVE_INTERVAL = 10
-# Future: Define CHUNK_SIZE for splitting dataset into sets assigned to two annotators
-CHUNK_SIZE = 100
-SECOND_PHASE = False  # Toggle for second phase
+
+# --- SECOND PHASE CONFIGURATION ---
+SECOND_PHASE = False  # Set to True to activate second phase review
+SECOND_PHASE_REVIEW_MAPPING = {}  # Populated if SECOND_PHASE is True. Maps: reviewer_username -> original_annotator_username
+# Example: {"navidved": "userC"} means navidved reviews userC's work
 
 # Global state variables
 current_page = 0
@@ -42,11 +47,9 @@
 ds_iter = None
 current_page_data = None
 audio_backup = {}
 annotation_count = 0
-unsaved_changes = {}
+unsaved_changes = {}  # Primarily for first phase
 total_samples = 0
-annotator_ranges = None
-SPLITS_PER_USER = {}  # Maps users to their assigned splits for review
-SPLIT_ASSIGNMENTS = {}  # Maps splits to their assigned reviewers
+annotator_ranges = {}  # Stores {annotator_username: (start_idx, end_idx)} for first phase
 
 # Pydantic data models
 class AudioTrim(BaseModel):
@@ -54,14 +57,21 @@
     end: float
 
 class Annotation(BaseModel):
-    annotator: str
+    annotator: str  # Original annotator (first phase)
     annotated_subtitle: Optional[str] = None
     audio_trims: Optional[List[AudioTrim]] = None
-    accepted: bool = False
+
+    # First phase review fields
+    is_first_phase_accepted: bool = False
+    first_phase_reviewer_username: Optional[str] = None
+
+    # Second phase review fields
+    second_phase_reviewed_by: Optional[str] = None
+    second_phase_review_status: Optional[str] = None  # "approved" or "rejected"
+    second_phase_review_timestamp: Optional[datetime] = None
+
     create_at: datetime
     update_at: datetime
-    # Future: Add feedback field for approved/rejected
-    # feedback: Optional[str] = None  # "approved" or "rejected"
 
 class Sample(BaseModel):
     id: int
@@ -70,26 +80,27 @@
     ignore_it: bool = False
     description: Optional[str] = None
     annotations: Optional[List[Annotation]] = None
-    # Future: Add accepted field at Sample level
-    # accepted: bool = False  # True if both assigned annotators approve
+    is_approved_in_second_phase: bool = False  # True if the primary annotation is approved in 2nd phase
 
-class Dataset(BaseModel):
+class DatasetModel(BaseModel):  # Renamed to avoid conflict with datasets.Dataset
    samples: Optional[List[Sample]] = None
 
 # Utility functions
 def load_saved_annotations():
-    dataset = None
+
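# --- Illustrative sketch (not part of the original diff) ---------------------
# A minimal example of how the Pydantic models above are meant to be used:
# build one Sample carrying a first-phase annotation that a reviewer accepted,
# then serialize it the same way save_annotations() does.  Field names follow
# the models in this diff; the concrete values are made up for illustration.
def _sketch_build_sample() -> str:
    now = datetime.now()
    ann = Annotation(
        annotator="navidved",                      # first-phase annotator
        annotated_subtitle="corrected transcript",
        is_first_phase_accepted=True,              # set by a first-phase reviewer
        first_phase_reviewer_username="vargha",
        create_at=now,
        update_at=now,
    )
    sample = Sample(
        id=0,
        voice_name="sample_0.wav",
        original_subtitle="original transcript",
        annotations=[ann],
    )
    dataset_model = DatasetModel(samples=[sample])
    # Same serialization call used by save_annotations()
    return dataset_model.model_dump_json(exclude_none=True, indent=4)
# -----------------------------------------------------------------------------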
dataset_model = None if os.path.exists(SAVE_PATH): try: with open(SAVE_PATH, "r", encoding="utf-8") as f: data = json.load(f) - dataset = Dataset(**data) + dataset_model = DatasetModel(**data) print("Loaded annotations from local JSON file") except Exception as e: print(f"Error loading local JSON file: {str(e)}. Removing invalid file.") - os.remove(SAVE_PATH) - - if dataset is None: + # os.remove(SAVE_PATH) # Be cautious with auto-removing + dataset_model = None + + + if dataset_model is None and token: try: hf_path = hf_hub_download( repo_id=HF_DATASET_NAME, @@ -99,32 +110,37 @@ def load_saved_annotations(): ) with open(hf_path, "r", encoding="utf-8") as f: data = json.load(f) - dataset = Dataset(**data) + dataset_model = DatasetModel(**data) + # Cache it locally with open(SAVE_PATH, "w", encoding="utf-8") as f: - f.write(dataset.model_dump_json(exclude_none=True, indent=4)) + f.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) print("Loaded annotations from HF dataset repository and cached locally") except Exception as e: print(f"Error loading JSON file from HF repo: {str(e)}") - - if dataset is None: - dataset = Dataset(samples=[]) - print("Created new empty Dataset for annotations") - - return dataset + dataset_model = None -def save_annotations(dataset: Dataset): + if dataset_model is None: + dataset_model = DatasetModel(samples=[]) + print("Created new empty DatasetModel for annotations") + + return dataset_model + +def save_annotations(dataset_model: DatasetModel): global annotation_count try: with open(SAVE_PATH, "w", encoding="utf-8") as f: - f.write(dataset.model_dump_json(exclude_none=True, indent=4)) + f.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) print(f"Saved annotations to {SAVE_PATH}") annotation_count += 1 - if annotation_count % SAVE_INTERVAL == 0: + if annotation_count % SAVE_INTERVAL == 0 and token: push_json_to_hf() except Exception as e: print(f"Error saving annotations: {str(e)}") def push_json_to_hf(): + if not token: + print("Cannot push to HF: token not available.") + return try: api = HfApi() api.upload_file( @@ -138,861 +154,1258 @@ def push_json_to_hf(): except Exception as e: print(f"Error uploading JSON file: {str(e)}") -def calculate_annotator_ranges(total_samples, annotators): - num_annotators = len(annotators) - if num_annotators == 0: +def calculate_annotator_ranges(total_samples_val, annotators_list): + num_annotators = len(annotators_list) + if num_annotators == 0 or total_samples_val <= 0: return {} - samples_per_annotator = total_samples // num_annotators - extra_samples = total_samples % num_annotators + + samples_per_annotator = total_samples_val // num_annotators + extra_samples = total_samples_val % num_annotators ranges = {} start = 0 - for i, annotator in enumerate(annotators): + for i, annotator in enumerate(annotators_list): end = start + samples_per_annotator - 1 if i < extra_samples: end += 1 - ranges[annotator] = (start, end) + if end >= total_samples_val: # Ensure end does not exceed total_samples + end = total_samples_val -1 + if start <= end : # Ensure start is not greater than end + ranges[annotator] = (start, end) start = end + 1 return ranges -def assign_splits_for_second_phase(): - global SPLITS_PER_USER, SPLIT_ASSIGNMENTS - if not SECOND_PHASE: +def initialize_second_phase_assignments(): + global SECOND_PHASE_REVIEW_MAPPING, annotator_ranges + if not ANNOTATORS or len(ANNOTATORS) < 1: # Requires at least 1 annotator to review their own work, or 2 for cross-review + print("Not enough annotators 
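# --- Illustrative sketch (not part of the original diff) ---------------------
# Worked example of the range-splitting arithmetic in calculate_annotator_ranges:
# 10 samples shared by 3 annotators -> 10 // 3 = 3 each, 10 % 3 = 1 extra, so the
# first annotator gets one additional sample:
#
#   calculate_annotator_ranges(10, ["a", "b", "c"])
#   == {"a": (0, 3), "b": (4, 6), "c": (7, 9)}
#
# The same contiguous-block policy as a standalone helper (hypothetical name):
def _sketch_split_ranges(total: int, annotators: list) -> dict:
    per, extra = divmod(total, len(annotators))
    ranges, start = {}, 0
    for i, name in enumerate(annotators):
        end = start + per - 1 + (1 if i < extra else 0)
        ranges[name] = (start, min(end, total - 1))
        start = end + 1
    return ranges
# -----------------------------------------------------------------------------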
for second phase review.") + SECOND_PHASE_REVIEW_MAPPING = {} return + + # Ensure annotator_ranges is populated + if not annotator_ranges and total_samples > 0: + print("Populating annotator_ranges for second phase initialization.") + annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + + + if len(ANNOTATORS) == 1: + # Single annotator reviews their own work if that's the desired logic + # Or, this phase might not apply. For now, let's allow self-review. + annotator = ANNOTATORS[0] + SECOND_PHASE_REVIEW_MAPPING[annotator] = annotator + print(f"Second phase: {annotator} will review their own work.") + else: + # Cyclic assignment: annotator[i] reviews annotator[i-1]'s work + for i, reviewer_user in enumerate(ANNOTATORS): + original_annotator_idx = (i - 1 + len(ANNOTATORS)) % len(ANNOTATORS) + original_annotator_user = ANNOTATORS[original_annotator_idx] + SECOND_PHASE_REVIEW_MAPPING[reviewer_user] = original_annotator_user + print(f"Second phase: {reviewer_user} will review {original_annotator_user}'s work.") - # Calculate splits based on annotators - total_annotators = len(ANNOTATORS) - splits = {} - for i, annotator in enumerate(ANNOTATORS): - # Each annotator's work becomes a split - splits[f"split_{i}"] = { - "range": annotator_ranges[annotator], - "original_annotator": annotator - } - - # Assign splits to different annotators for review - for i, (split_name, split_info) in enumerate(splits.items()): - reviewer = ANNOTATORS[(i + 1) % total_annotators] # Assign to next annotator - SPLITS_PER_USER[reviewer] = split_name - SPLIT_ASSIGNMENTS[split_name] = { - "reviewer": reviewer, - "range": split_info["range"], - "original_annotator": split_info["original_annotator"] - } + # Verify that original annotators have ranges + for reviewer, original_annotator in SECOND_PHASE_REVIEW_MAPPING.items(): + if original_annotator not in annotator_ranges: + print(f"Warning: Original annotator {original_annotator} has no range defined in annotator_ranges.") + # This could happen if total_samples was 0 or annotator_ranges wasn't calculated correctly. def get_user_allowed_range(username): - if not SECOND_PHASE: - if get_user_role(username) == "reviewer": - return (0, total_samples - 1) - elif username in annotator_ranges: + global annotator_ranges, total_samples + if SECOND_PHASE: + if not SECOND_PHASE_REVIEW_MAPPING: # Ensure it's initialized + initialize_second_phase_assignments() + + original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(username) + if original_annotator_to_review: + # The user `username` is reviewing `original_annotator_to_review`'s work. + # The range is the original work range of `original_annotator_to_review`. 
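# --- Illustrative sketch (not part of the original diff) ---------------------
# The cyclic assignment above maps ANNOTATORS[i] -> ANNOTATORS[i - 1].  With the
# ALLOWED_USERS/REVIEWERS configured in this file, ANNOTATORS == ["navidved", "userC"],
# so the mapping is a simple swap:
#
#   {"navidved": "userC", "userC": "navidved"}
#
# The same rule as a standalone helper (hypothetical name, mirrors the loop above):
def _sketch_cyclic_review_mapping(annotators: list) -> dict:
    n = len(annotators)
    if n == 1:
        return {annotators[0]: annotators[0]}  # self-review fallback, as above
    return {annotators[i]: annotators[(i - 1) % n] for i in range(n)}
# -----------------------------------------------------------------------------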
+ if not annotator_ranges and total_samples > 0: # Lazy init for ranges if needed + annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + + return annotator_ranges.get(original_annotator_to_review) + else: # User is not a designated reviewer in the second phase mapping + return None # Or (0,-1) to signify no access + else: # First Phase Logic + if get_user_role(username) == "reviewer": # First phase reviewers see everything + return (0, total_samples - 1) if total_samples > 0 else None + elif username in annotator_ranges: # First phase annotators see their assigned range return annotator_ranges[username] - return None - - # In second phase, users can only access their assigned split - if username not in SPLITS_PER_USER: - return None - split = SPLIT_ASSIGNMENTS[SPLITS_PER_USER[username]] - return split["range"] + else: + return None def is_within_range(absolute_idx, allowed_range): if allowed_range is None: return False return allowed_range[0] <= absolute_idx <= allowed_range[1] -# # Future: Function to assign two annotators per sample based on chunking -# def get_annotators_for_sample(sample_idx, chunk_size, annotators): -# N = len(annotators) -# k = sample_idx // chunk_size # Determine the chunk index -# annotator1 = annotators[k % N] -# annotator2 = annotators[(k + 1) % N] -# return [annotator1, annotator2] - -# Future: Check if all assigned chunks are completed for an annotator -# def all_chunks_assigned_and_completed(username, completed_chunks): -# total_chunks = total_samples // CHUNK_SIZE + (1 if total_samples % CHUNK_SIZE else 0) -# user_chunks = set() -# for idx in range(total_samples): -# if username in get_annotators_for_sample(idx, CHUNK_SIZE, ANNOTATORS): -# chunk_idx = idx // CHUNK_SIZE -# user_chunks.add(chunk_idx) -# return user_chunks.issubset(completed_chunks.get(username, set())) - -# # Future: Get next unseen chunk for an annotator -# def get_next_unseen_chunk(username, total_chunks, completed_chunks): -# user_completed = completed_chunks.get(username, set()) -# for chunk_idx in range(total_chunks): -# if chunk_idx not in user_completed: -# return chunk_idx -# return None - -# Future: Assign a chunk to an annotator -# def assign_chunk_to_annotator(username, chunk_idx, annotators): -# completed_chunks.setdefault(username, set()) -# completed_chunks[username].add(chunk_idx) - -def get_user_role(username): +def get_user_role(username): # This defines first-phase roles return "reviewer" if username in REVIEWERS else "annotator" def init_dataset_iterator(): global ds_iter try: - ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) - ds_iter = iter(ds) + # It's better to load the dataset on demand rather than keeping an iterator open. + # For streaming, iter(load_dataset(...)) is fine if used immediately. 
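# --- Illustrative sketch (not part of the original diff) ---------------------
# How the two phases change what a user may see, assuming total_samples == 10
# and ANNOTATORS == ["navidved", "userC"] (so ranges are (0, 4) and (5, 9)):
#
#   First phase:   get_user_allowed_range("vargha")   -> (0, 9)   # reviewer sees all
#                  get_user_allowed_range("navidved") -> (0, 4)   # own slice
#   Second phase:  get_user_allowed_range("navidved") -> (5, 9)   # userC's slice, under review
#
# is_within_range() is then just an inclusive bounds check:
def _sketch_range_checks() -> None:
    assert is_within_range(4, (0, 4))
    assert not is_within_range(5, (0, 4))
    assert not is_within_range(3, None)  # no assigned range -> no access
# -----------------------------------------------------------------------------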
+ # ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) + # ds_iter = iter(ds) return True except Exception as e: print(f"Error initializing dataset iterator: {e}") return False def load_page_data(page_num=0): - global ds_iter, current_page_data, current_page - if ds_iter is None: - if not init_dataset_iterator(): - return pd.DataFrame(columns=["audio", "sentence"]) - if page_num < current_page: - ds_iter = iter(load_dataset(HF_DATASET_NAME, split="train", streaming=True)) - current_page = 0 - samples_to_skip = page_num * PAGE_SIZE - (current_page * PAGE_SIZE) if page_num > current_page else 0 - for _ in range(samples_to_skip): - try: - next(ds_iter) - except StopIteration: - break - samples = [] - for _ in range(PAGE_SIZE): - try: - sample = next(ds_iter) - samples.append(sample) - except StopIteration: - break - current_page = page_num - current_page_data = pd.DataFrame(samples) + global current_page_data, current_page, total_samples - if CURRENT_USERNAME and get_user_role(CURRENT_USERNAME) == "annotator": - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if allowed_range: - start_idx = page_num * PAGE_SIZE - current_page_data['absolute_idx'] = range(start_idx, start_idx + len(current_page_data)) - current_page_data = current_page_data[ - (current_page_data['absolute_idx'] >= allowed_range[0]) & - (current_page_data['absolute_idx'] <= allowed_range[1]) - ] - current_page_data = current_page_data.drop(columns=['absolute_idx']) - - # Future: For annotators, filter samples based on assigned chunks - if get_user_role(CURRENT_USERNAME) == "annotator": - samples = [] + # For streaming, we re-fetch and skip. + try: ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) - ds_iter = iter(ds) - absolute_start_idx = page_num * PAGE_SIZE - count = 0 - for i, sample in enumerate(ds_iter): - if i < absolute_start_idx: - continue - assigned = get_annotators_for_sample(i, CHUNK_SIZE, ANNOTATORS) - if CURRENT_USERNAME in assigned: - samples.append(sample) - count += 1 - if count == PAGE_SIZE: - break - current_page_data = pd.DataFrame(samples) + temp_ds_iter = iter(ds) + except Exception as e: + print(f"Error loading dataset for page data: {e}") + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page"]) + return current_page_data + + # Determine the actual range of samples the user can see + # This needs to be based on the full dataset indices, not just page logic + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not allowed_range: + print(f"User {CURRENT_USERNAME} has no allowed range.") + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page"]) + return current_page_data + + # Calculate start and end absolute indices for the requested page, clipped by allowed_range + page_start_abs_idx = page_num * PAGE_SIZE + page_end_abs_idx = page_start_abs_idx + PAGE_SIZE - 1 + + # Effective start and end for fetching, considering user's total allowed range + effective_start_idx = max(page_start_abs_idx, allowed_range[0]) + effective_end_idx = min(page_end_abs_idx, allowed_range[1]) + + samples_on_page = [] + current_absolute_idx = 0 + + # Iterate through the dataset to find samples within the effective range for this page + # This can be slow for large datasets and large page_num with streaming. + # A non-streaming dataset or a more optimized way to seek would be better for large scale. 
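# --- Illustrative sketch (not part of the original diff) ---------------------
# Worked example of the page-window clipping used above, assuming PAGE_SIZE == 100
# and an annotator whose allowed_range is (150, 399):
#
#   page 1 -> raw window (100, 199) -> clipped to (150, 199)   # first visible page
#   page 2 -> raw window (200, 299) -> clipped to (200, 299)
#   page 4 -> raw window (400, 499) -> clipped to (400, 399)   # empty: start > end
#
# The same clipping as a pure helper (hypothetical name):
def _sketch_clip_page_window(page_num: int, page_size: int, allowed: tuple) -> tuple:
    start = page_num * page_size
    end = start + page_size - 1
    return max(start, allowed[0]), min(end, allowed[1])  # empty when start > end
# -----------------------------------------------------------------------------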
+ + idx_counter_for_page = 0 + for i, sample_data in enumerate(temp_ds_iter): + current_absolute_idx = i # Absolute index in the full dataset + + if current_absolute_idx > effective_end_idx : + break # Past the samples needed for this page and user range + + if current_absolute_idx >= effective_start_idx: + # This sample is within the user's allowed range and on the current conceptual page + sample_data['absolute_idx'] = current_absolute_idx + sample_data['id_within_page'] = idx_counter_for_page # relative index on current page view + samples_on_page.append(sample_data) + idx_counter_for_page +=1 + if len(samples_on_page) >= PAGE_SIZE : # Filled the page + break + + current_page = page_num + if samples_on_page: + current_page_data = pd.DataFrame(samples_on_page) + else: + # If no samples found (e.g., page is outside effective range) + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) + print(f"No samples found for user {CURRENT_USERNAME} on page {page_num} within effective range {effective_start_idx}-{effective_end_idx}") gc.collect() return current_page_data + def get_dataset_info(): + global total_samples # Use global total_samples + if total_samples > 0: # If already fetched + return {'num_samples': total_samples} try: - info = load_dataset(HF_DATASET_NAME, split="train", streaming=True).info - if hasattr(info, 'splits') and 'train' in info.splits: - return {'num_samples': info.splits['train'].num_examples} + # Temporarily load to get info, can be slow for huge datasets if not streaming + # For streaming, num_examples might be None or -1, so actual iteration might be needed + info = load_dataset(HF_DATASET_NAME, streaming=True, split="train").info + # The 'num_examples' for a streaming dataset split might not be accurate or available. + # It's often -1 or None. You might need a way to get the true total count if it's crucial. + # For now, we'll use it if available, otherwise, it remains a challenge for pure streaming. + if hasattr(info, 'estimated_size') and info.estimated_size is not None: # Check an alternative if num_examples is not good + pass # Not directly number of samples + + # Fallback: iterate to count if num_examples is not reliable + # This is very inefficient and should be avoided if possible. + # A pre-calculated count or a different dataset split might be needed. + # For this example, we'll assume info.splits['train'].num_examples is somewhat usable + # or that a fixed total_samples is set if this is problematic. + + # Simplified: try to get from info, but acknowledge limitations + ds_info_obj = load_dataset(HF_DATASET_NAME, split="train") # Load non-streaming for info + num_samples_val = ds_info_obj.num_rows + if num_samples_val and num_samples_val > 0: + total_samples = num_samples_val + return {'num_samples': total_samples} + + # If still no count, this is an issue for range calculations. + # For now, return -1, but this will break range logic. 
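# --- Illustrative sketch (not part of the original diff) ---------------------
# The loop above walks the stream from index 0 on every page load.  If the
# installed `datasets` version exposes IterableDataset.skip()/take() (recent
# releases do), the same effective window could be fetched without manually
# iterating over the skipped prefix.  Hypothetical alternative, for comparison:
def _sketch_fetch_window(effective_start_idx: int, effective_end_idx: int) -> list:
    n = effective_end_idx - effective_start_idx + 1
    if n <= 0:
        return []
    ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True)
    return list(ds.skip(effective_start_idx).take(n))
# -----------------------------------------------------------------------------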
+ print("Warning: Could not reliably determine total_samples from dataset info.") return {'num_samples': -1} + except Exception as e: print(f"Error getting dataset info: {e}") return {'num_samples': -1} -init_dataset_iterator() -current_page_data = load_page_data(0) -dataset_info = get_dataset_info() -total_samples = dataset_info.get('num_samples', -1) + +# Initial data load (moved after functions it calls are defined) +# init_dataset_iterator() # Iterator not maintained globally anymore for streaming robustness +dataset_info = get_dataset_info() # This sets global total_samples if total_samples > 0: annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + if SECOND_PHASE: + initialize_second_phase_assignments() # Initialize after annotator_ranges might be populated else: + print("Warning: total_samples is not positive. Annotation ranges and second phase assignments may be incorrect.") annotator_ranges = {} -assign_splits_for_second_phase() + +# Load first page data for the initial user if any +# This should happen after login when CURRENT_USERNAME is set. +# current_page_data = load_page_data(0) # Moved to hf_login success path + def get_audio_path(audio_entry): if isinstance(audio_entry, dict): if "array" in audio_entry and "sampling_rate" in audio_entry: - return (audio_entry["sampling_rate"], audio_entry["array"]) + return (audio_entry["sampling_rate"], audio_entry["array"]) # Return tuple for direct use return audio_entry.get("path", None) if isinstance(audio_entry, str): if audio_entry.startswith("http://") or audio_entry.startswith("https://"): + return audio_entry # URL + if os.path.exists(audio_entry): # Absolute path return audio_entry - if os.path.exists(audio_entry): - return audio_entry - joined_path = os.path.join(AUDIO_DIR, audio_entry) - if os.path.exists(joined_path): - return joined_path - return audio_entry - + # Relative path (try joining with AUDIO_DIR if one is configured) + if AUDIO_DIR: + joined_path = os.path.join(AUDIO_DIR, audio_entry) + if os.path.exists(joined_path): + return joined_path + return audio_entry # Return as is, might be a relative path resolvable by datasets + return None # Or handle unknown type # Core functions -def save_sample_data(page_idx, idx, transcript, reviewer, accepted=False): +def save_sample_data(page_idx, idx_on_page, transcript, current_user_performing_action, accepted_flag=False): global current_page_data, unsaved_changes - if idx >= len(current_page_data): - return "Invalid index" - absolute_idx = page_idx * PAGE_SIZE + idx - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not is_within_range(absolute_idx, allowed_range): - return "You are not allowed to annotate this sample." - audio_entry = current_page_data.iloc[idx]["audio"] - voice_name = os.path.basename(str(audio_entry)) - dataset = load_saved_annotations() - sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) + if current_page_data is None or idx_on_page >= len(current_page_data): + return "Invalid index or data not loaded for current page." + + actual_sample_info = current_page_data.iloc[idx_on_page] + absolute_idx = actual_sample_info['absolute_idx'] + + # First phase saving logic + allowed_range = get_user_allowed_range(current_user_performing_action) + if not is_within_range(absolute_idx, allowed_range) and not SECOND_PHASE: # In 2nd phase, this check is implicitly handled by page loading + return "You are not allowed to annotate this sample (out of range)." 
+ + audio_entry_original = actual_sample_info["audio"] # This might be path or dict + voice_name = os.path.basename(str(get_audio_path(audio_entry_original) or f"sample_{absolute_idx}")) + + dataset_model = load_saved_annotations() + sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) + if not sample: sample = Sample( id=absolute_idx, voice_name=voice_name, - original_subtitle=current_page_data.iloc[idx]["sentence"], + original_subtitle=actual_sample_info["sentence"], annotations=[] ) - dataset.samples = dataset.samples or [] - dataset.samples.append(sample) - + dataset_model.samples = dataset_model.samples or [] + dataset_model.samples.append(sample) + now = datetime.now() - annotation = next((a for a in sample.annotations or [] if a.annotator == reviewer), None) + # In the first phase, current_user_performing_action is the annotator or reviewer. + # 'accepted_flag' is used if current_user_performing_action is a first-phase reviewer. + annotation = next((a for a in sample.annotations or [] if a.annotator == current_user_performing_action), None) + if annotation: annotation.annotated_subtitle = transcript.strip() annotation.update_at = now - if get_user_role(reviewer) == "reviewer": - annotation.accepted = accepted + if get_user_role(current_user_performing_action) == "reviewer": # First phase reviewer + annotation.is_first_phase_accepted = accepted_flag + annotation.first_phase_reviewer_username = current_user_performing_action if accepted_flag else None else: - annotation = Annotation( - annotator=reviewer, - annotated_subtitle=transcript.strip(), - create_at=now, - update_at=now, - accepted=accepted if get_user_role(reviewer) == "reviewer" else False - ) + new_annotation_data = { + "annotator": current_user_performing_action, + "annotated_subtitle": transcript.strip(), + "create_at": now, + "update_at": now, + "is_first_phase_accepted": False # Default + } + if get_user_role(current_user_performing_action) == "reviewer": + new_annotation_data["is_first_phase_accepted"] = accepted_flag + if accepted_flag: + new_annotation_data["first_phase_reviewer_username"] = current_user_performing_action + + annotation = Annotation(**new_annotation_data) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) - + if absolute_idx in unsaved_changes: del unsaved_changes[absolute_idx] - - save_annotations(dataset) + + save_annotations(dataset_model) return f"✓ Saved annotation for sample {absolute_idx}" - # Future: Modified save_sample_data with feedback and dual annotator logic - # assigned_annotators = get_annotators_for_sample(absolute_idx, CHUNK_SIZE, ANNOTATORS) - # if CURRENT_USERNAME not in assigned_annotators: - # return "You are not assigned to this sample." - - annotation = Annotation( - annotator=CURRENT_USERNAME, - annotated_subtitle=transcript.strip(), - feedback=feedback, # "approved" or "rejected" - create_at=now, - update_at=now - ) - sample.annotations = sample.annotations or [] - sample.annotations.append(annotation) +def handle_second_phase_action(page_idx, idx_on_page, action: str): # action is "approved" or "rejected" + global current_page_data, CURRENT_USERNAME - # Check if both annotators have approved - annotations = [a for a in sample.annotations if a.annotator in assigned_annotators] - if len(annotations) == 2 and all(a.feedback == "approved" for a in annotations): - sample.accepted = True + if not SECOND_PHASE: + return "Not in second phase." 
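# --- Illustrative sketch (not part of the original diff) ---------------------
# save_sample_data() above relies on a find-or-create ("upsert") pattern twice:
# once for the Sample keyed by its absolute index, and once for the Annotation
# keyed by annotator.  The same pattern factored into a hypothetical helper:
def _sketch_get_or_create_annotation(sample: Sample, annotator: str) -> Annotation:
    ann = next((a for a in sample.annotations or [] if a.annotator == annotator), None)
    if ann is None:
        now = datetime.now()
        ann = Annotation(annotator=annotator, create_at=now, update_at=now)
        sample.annotations = sample.annotations or []
        sample.annotations.append(ann)
    return ann
# -----------------------------------------------------------------------------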
+ if current_page_data is None or idx_on_page >= len(current_page_data): + return "Invalid index or data not loaded for current page (second phase)." + + actual_sample_info = current_page_data.iloc[idx_on_page] + absolute_idx = actual_sample_info['absolute_idx'] - if absolute_idx in unsaved_changes: - del unsaved_changes[absolute_idx] + original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(CURRENT_USERNAME) + if not original_annotator_to_review: + return "You are not assigned to review any user's work." + + dataset_model = load_saved_annotations() + sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) + if not sample: + # This case should ideally not happen if data is consistent. + # If it does, it means the sample exists in source dataset but not annotations.json. + # A reviewer in 2nd phase is reviewing existing annotation. + return f"Error: Sample {absolute_idx} not found in annotations.json for review." + + # Find the annotation made by the original_annotator_to_review + annotation_to_review = next((ann for ann in sample.annotations or [] if ann.annotator == original_annotator_to_review), None) + + if not annotation_to_review: + # If original annotator did not make an annotation for this sample. + # Option 1: Create a placeholder annotation based on original_subtitle and review that. + # Option 2: Report error. For now, report error. + # This implies the first phase annotator skipped this item or it wasn't in their range correctly. + print(f"Warning: No prior annotation by {original_annotator_to_review} for sample {absolute_idx}. Reviewing original subtitle implicitly.") + # Let's create one if missing, based on original subtitle + annotation_to_review = Annotation( + annotator=original_annotator_to_review, + annotated_subtitle=sample.original_subtitle, # Use original subtitle + create_at=sample.annotations[0].create_at if sample.annotations else datetime.now(), # Approx original creation + update_at=datetime.now() + ) + sample.annotations = sample.annotations or [] + sample.annotations.append(annotation_to_review) + + + annotation_to_review.second_phase_reviewed_by = CURRENT_USERNAME + annotation_to_review.second_phase_review_status = action + annotation_to_review.second_phase_review_timestamp = datetime.now() + annotation_to_review.update_at = datetime.now() + + if action == "approved": + sample.is_approved_in_second_phase = True + # If rejected, is_approved_in_second_phase could be set to False, or depend on other conditions. + # For now, only explicit approval sets it to True. + + save_annotations(dataset_model) + return f"✓ Review ({action}) saved for sample {absolute_idx} (Original annotator: {original_annotator_to_review})" + + +def get_sample(page_idx, idx_on_page, current_user_displaying): # current_user_displaying is CURRENT_USERNAME + global current_page_data, unsaved_changes, total_samples - save_annotations(dataset) - # Mark chunk as completed if fully annotated - chunk_idx = absolute_idx // CHUNK_SIZE - completed_chunks.setdefault(CURRENT_USERNAME, set()) - completed_chunks[CURRENT_USERNAME].add(chunk_idx) - return f"✓ Saved annotation for sample {absolute_idx}" + if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): + return None, "", f"Invalid index. 
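# --- Illustrative sketch (not part of the original diff) ---------------------
# After handle_second_phase_action(..., "approved"), the reviewed annotation is
# expected to carry the reviewer's stamp (values illustrative):
#
#   annotation.second_phase_reviewed_by      == "navidved"
#   annotation.second_phase_review_status    == "approved"
#   annotation.second_phase_review_timestamp -> datetime of the decision
#   sample.is_approved_in_second_phase       == True      # only on approval
#
# Minimal stamping helper with the same effect (hypothetical name):
def _sketch_stamp_second_phase(sample: Sample, ann: Annotation, reviewer: str, action: str) -> None:
    now = datetime.now()
    ann.second_phase_reviewed_by = reviewer
    ann.second_phase_review_status = action          # "approved" or "rejected"
    ann.second_phase_review_timestamp = now
    ann.update_at = now
    if action == "approved":
        sample.is_approved_in_second_phase = True
# -----------------------------------------------------------------------------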
Range is 0-{len(current_page_data)-1}", "unreviewed", "white", True, False, "" -def get_sample(page_idx, idx, current_username): - global current_page_data, unsaved_changes - if idx < 0 or idx >= len(current_page_data): - return None, "", f"Invalid index. Range is 0-{len(current_page_data)-1}", "unreviewed", "white", True, False - absolute_idx = page_idx * PAGE_SIZE + idx - audio_entry = current_page_data.iloc[idx]["audio"] - voice_name = os.path.basename(str(audio_entry)) - dataset = load_saved_annotations() - - sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) - audio_val = get_audio_path(audio_entry) - transcript = current_page_data.iloc[idx]["sentence"] - reviewer = "unreviewed" - color = "white" - editable = True - accepted = False + actual_sample_info = current_page_data.iloc[idx_on_page] + absolute_idx = actual_sample_info['absolute_idx'] - if sample: - if sample.ignore_it: + audio_entry_original = actual_sample_info["audio"] + audio_val = get_audio_path(audio_entry_original) + + default_transcript = actual_sample_info["sentence"] + transcript_to_display = default_transcript + + # UI states + ui_reviewer_field = "unreviewed" # Textbox showing who annotated/reviewed + ui_color = "white" + ui_editable = True # Transcript text area + ui_is_accepted_flag = False # For first phase checkmark logic, or second phase display + ui_status_message = f"Sample {absolute_idx+1}" + if total_samples > 0: + ui_status_message += f" of {total_samples}" + + dataset_model = load_saved_annotations() + sample_from_json = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) + + if sample_from_json: + if sample_from_json.ignore_it: audio_val = None - transcript = "AUDIO DELETED (This audio has been removed.)" - reviewer = "deleted" - color = "red" - editable = False - else: - accepted_annotation = next((a for a in sample.annotations if a.accepted), None) - if accepted_annotation: - transcript = accepted_annotation.annotated_subtitle or transcript - reviewer = accepted_annotation.annotator - color = "green" - editable = (get_user_role(current_username) == "reviewer") - accepted = True + transcript_to_display = "AUDIO DELETED (This audio has been removed.)" + ui_reviewer_field = "deleted" + ui_color = "red" + ui_editable = False + + elif SECOND_PHASE: + ui_editable = False # Transcript not editable in 2nd phase + original_annotator_being_reviewed = SECOND_PHASE_REVIEW_MAPPING.get(current_user_displaying) + + if not original_annotator_being_reviewed: # Should not happen if UI is controlled properly + transcript_to_display = "Error: User not in review mapping." 
+ ui_color = "red" else: - user_annotation = next((a for a in sample.annotations if a.annotator == current_username), None) - if user_annotation: - transcript = user_annotation.annotated_subtitle or transcript - reviewer = user_annotation.annotator - color = "yellow" if absolute_idx not in unsaved_changes else "pink" - editable = True + ui_reviewer_field = f"Reviewing: {original_annotator_being_reviewed}" + annotation_under_review = next((ann for ann in sample_from_json.annotations or [] if ann.annotator == original_annotator_being_reviewed), None) + + if annotation_under_review: + transcript_to_display = annotation_under_review.annotated_subtitle or default_transcript + ui_is_accepted_flag = (annotation_under_review.second_phase_review_status == "approved") + + if annotation_under_review.second_phase_reviewed_by: + if annotation_under_review.second_phase_reviewed_by == current_user_displaying: + ui_color = "green" if annotation_under_review.second_phase_review_status == "approved" else "orange" # orange for rejected by current user + else: # Reviewed by someone else + ui_color = "gray" + ui_reviewer_field += f" (Reviewed by {annotation_under_review.second_phase_reviewed_by})" + else: # Pending review by current_user_displaying + ui_color = "yellow" # Indicates pending current user's review + else: # No annotation from original annotator for this sample + transcript_to_display = default_transcript # Show original dataset subtitle + ui_reviewer_field += " (Original annotator made no submission)" + ui_color = "lightgray" # Needs review, but based on original + + else: # First Phase Logic + # Check for an accepted annotation by a first-phase reviewer + accepted_first_phase_annotation = next((a for a in sample_from_json.annotations or [] if a.is_first_phase_accepted and a.first_phase_reviewer_username), None) + + if accepted_first_phase_annotation: + transcript_to_display = accepted_first_phase_annotation.annotated_subtitle or default_transcript + ui_reviewer_field = accepted_first_phase_annotation.first_phase_reviewer_username + ui_color = "green" + ui_is_accepted_flag = True + ui_editable = (get_user_role(current_user_displaying) == "reviewer") # Only 1st phase reviewer can edit accepted + else: + # Check for annotation by the current user (annotator or reviewer) + user_specific_annotation = next((a for a in sample_from_json.annotations or [] if a.annotator == current_user_displaying), None) + if user_specific_annotation: + transcript_to_display = user_specific_annotation.annotated_subtitle or default_transcript + ui_reviewer_field = user_specific_annotation.annotator + ui_color = "yellow" if absolute_idx not in unsaved_changes else "pink" + ui_editable = True else: - other_annotations = [a for a in sample.annotations if a.annotator != current_username] + # Check for annotations by other annotators (not current user, not accepted by reviewer) + # Display the first one found for a reviewer to potentially act on, or inform annotator + other_annotations = [a for a in sample_from_json.annotations or [] if a.annotator != current_user_displaying and not a.is_first_phase_accepted] if other_annotations: - if get_user_role(current_username) == "reviewer": - other_annotation = other_annotations[0] - transcript = other_annotation.annotated_subtitle or transcript - reviewer = other_annotation.annotator - else: - transcript = current_page_data.iloc[idx]["sentence"] - reviewer = "labeled by another annotator" - color = "blue" - editable = (get_user_role(current_username) == "reviewer") - else: + # If 
current user is a reviewer, they see the other annotator's work + if get_user_role(current_user_displaying) == "reviewer": + other_ann_to_show = other_annotations[0] + transcript_to_display = other_ann_to_show.annotated_subtitle or default_transcript + ui_reviewer_field = other_ann_to_show.annotator + ui_color = "blue" # Reviewer sees other's work + ui_editable = True + else: # Current user is an annotator, and another annotator worked on it + # This state is a bit ambiguous. Default to original if not assigned to this user. + # For simplicity, show original if it's not their saved work. + transcript_to_display = default_transcript + ui_reviewer_field = "labeled by another annotator" + ui_color = "lightblue" + ui_editable = False # Annotator cannot edit other annotator's unreviewed work + else: # No annotations at all, or only unreviewed by others and user is annotator if absolute_idx in unsaved_changes: - transcript = unsaved_changes[absolute_idx] - reviewer = current_username - color = "pink" - editable = True - else: - transcript = current_page_data.iloc[idx]["sentence"] - reviewer = "unreviewed" - color = "white" - editable = True - - status = f"Sample {absolute_idx+1}" - if total_samples > 0: - status += f" of {total_samples}" - return audio_val, transcript, status, reviewer, color, editable, accepted - -def load_interface(page_idx, idx): - audio, text, base_status, saved_reviewer, color, editable, accepted = get_sample(page_idx, idx, CURRENT_USERNAME) - absolute_idx = page_idx * PAGE_SIZE + idx - audio_entry = current_page_data.iloc[idx]["audio"] - key = f"{absolute_idx}_{os.path.basename(str(audio_entry))}" - if key not in audio_backup: - audio_backup[key] = audio - status = f"{base_status} - Page {page_idx+1} - Reviewer: {saved_reviewer}" + transcript_to_display = unsaved_changes[absolute_idx] + ui_reviewer_field = current_user_displaying + ui_color = "pink" + ui_editable = True + # else, default_transcript, unreviewed, white, editable=True (already set) + + # If no sample_from_json, then it's a fresh sample from dataset + # transcript_to_display remains default_transcript. ui states remain default. + # This case is hit if annotations.json doesn't have this absolute_idx yet. 
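# --- Illustrative sketch (not part of the original diff) ---------------------
# Reference legend for the colour states produced by get_sample() above, kept as
# a constant for documentation only (hypothetical name, no new behaviour):
_SKETCH_COLOR_LEGEND = {
    "white": "fresh sample, no annotation yet",
    "pink": "unsaved local edit by the current user",
    "yellow": "saved by the current user (phase 1) / pending my review (phase 2)",
    "green": "accepted by a first-phase reviewer / approved by me in phase 2",
    "orange": "rejected by me in the second phase",
    "blue": "another annotator's work, shown to a first-phase reviewer",
    "lightblue": "another annotator's unreviewed work, shown to an annotator",
    "gray": "already reviewed by a different second-phase reviewer",
    "lightgray": "second phase, original annotator made no submission",
    "red": "sample marked ignore_it, or an error state",
}
# -----------------------------------------------------------------------------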
+ + # Status message update + current_page_for_status = page_idx + 1 # page_idx is 0-indexed + # If current_page_data has 'absolute_idx', we can use that + # page_num_from_abs = (absolute_idx // PAGE_SIZE) + 1 + + ui_status_message = f"{ui_status_message} - Page {current_page_for_status}" + if SECOND_PHASE : + ui_status_message += " (Review Phase)" + else: + ui_status_message += " (Annotation Phase)" + + + return audio_val, transcript_to_display, ui_status_message, ui_reviewer_field, ui_color, ui_editable, ui_is_accepted_flag, default_transcript + + +def load_interface_data(page_idx, idx_on_page): # Renamed from load_interface to avoid conflict + # This function is primarily a wrapper around get_sample for UI updates + audio, text, base_status, saved_reviewer_text, color, editable, accepted_flag, original_dataset_text = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) + + # Audio backup logic (can be simplified or removed if not strictly needed for undo_trim) + # absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] if current_page_data is not None and idx_on_page < len(current_page_data) else -1 + # audio_entry_original = current_page_data.iloc[idx_on_page]["audio"] if current_page_data is not None and idx_on_page < len(current_page_data) else "" + # key = f"{absolute_idx}_{os.path.basename(str(get_audio_path(audio_entry_original) or 'unknown'))}" + # if key not in audio_backup and audio is not None: # Backup the audio playable value + # audio_backup[key] = audio + return ( - page_idx, - idx, - audio, - gr.update(value=text, interactive=editable), - gr.update(value=saved_reviewer, elem_classes=[color], interactive=False), - status, - text + page_idx, # current_page_idx state + idx_on_page, # current_idx_on_page state + audio, # audio_player value + gr.update(value=text, interactive=editable), # transcript update + gr.update(value=saved_reviewer_text, elem_classes=[color]), # reviewer Textbox update + base_status, # status markdown update + original_dataset_text # original_transcript state ) # Navigation functions -def next_sample(page_idx, idx): - global current_page_data - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - absolute_idx = page_idx * PAGE_SIZE + idx - if is_within_range(absolute_idx + 1, allowed_range): - if idx < len(current_page_data) - 1: - new_page_idx = page_idx - new_idx = idx + 1 - else: - new_page_idx = page_idx + 1 - load_page_data(new_page_idx) - if len(current_page_data) > 0: - new_idx = 0 - else: - return page_idx, idx, gr.update(), gr.update(), gr.update(), "No more samples in your range.", gr.update() - else: - return page_idx, idx, gr.update(), gr.update(), gr.update(), "Next sample is outside your assigned range.", gr.update() - return load_interface(new_page_idx, new_idx) +def navigate_sample(page_idx, idx_on_page, direction: int): # direction: 1 for next, -1 for prev + global current_page_data, total_samples + + if current_page_data is None or len(current_page_data) == 0: + return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No data loaded.", gr.update() -def go_next_without_save(page_idx, idx, current_text, current_annotator, original_transcript): - global current_page_data, unsaved_changes - absolute_idx = page_idx * PAGE_SIZE + idx - if current_text.strip() != original_transcript.strip(): - unsaved_changes[absolute_idx] = current_text - audio, text, base_status, saved_reviewer, color, editable, accepted = get_sample(page_idx, idx, CURRENT_USERNAME) - status = f"{base_status} - Page {page_idx+1} - Reviewer: 
{saved_reviewer} [Unsaved changes]" - return ( - page_idx, - idx, - audio, - gr.update(value=text, interactive=editable), - gr.update(value=saved_reviewer, elem_classes=["pink"], interactive=False), - status, - original_transcript - ) - return next_sample(page_idx, idx) + target_idx_on_page = idx_on_page + direction + + new_page_idx = page_idx + new_idx_on_page = target_idx_on_page -def prev_sample(page_idx, idx, current_text, current_annotator, original_transcript): - global current_page_data, unsaved_changes - absolute_idx = page_idx * PAGE_SIZE + idx - if current_text.strip() != original_transcript.strip(): - save_sample_data(page_idx, idx, current_text, CURRENT_USERNAME, False) - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if is_within_range(absolute_idx - 1, allowed_range): - if idx > 0: - new_page_idx = page_idx - new_idx = idx - 1 - else: - if page_idx > 0: - new_page_idx = page_idx - 1 - new_data = load_page_data(new_page_idx) - if len(new_data) > 0: - new_idx = len(new_data) - 1 - else: - return page_idx, idx, gr.update(), gr.update(), gr.update(), "No previous samples in your range.", gr.update() + if target_idx_on_page < 0: # Need to go to previous page + if page_idx > 0: + new_page_idx = page_idx - 1 + # Load new page data and set index to last item + temp_data = load_page_data(new_page_idx) + if temp_data is not None and not temp_data.empty: + new_idx_on_page = len(temp_data) - 1 + else: # Previous page is empty or out of allowed range + return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No more samples in this direction (prev page).", gr.update() + else: # Already on first item of first page + return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "At the beginning of your assigned samples.", gr.update() + + elif target_idx_on_page >= len(current_page_data): # Need to go to next page + new_page_idx = page_idx + 1 + temp_data = load_page_data(new_page_idx) # load_page_data updates current_page_data + if temp_data is not None and not temp_data.empty: + new_idx_on_page = 0 + else: # Next page is empty or out of allowed range + # Check if we are at the very end of the allowed samples + allowed_range = get_user_allowed_range(CURRENT_USERNAME) + current_abs_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] + if allowed_range and current_abs_idx >= allowed_range[1]: + return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "At the end of your assigned samples.", gr.update() else: - return page_idx, idx, gr.update(), gr.update(), gr.update(), "No previous samples in your range.", gr.update() - else: - return page_idx, idx, gr.update(), gr.update(), gr.update(), "Previous sample is outside your assigned range.", gr.update() - return load_interface(new_page_idx, new_idx) - -def jump_to(target_idx, page_idx, idx, current_text, current_annotator, original_transcript): - global unsaved_changes - absolute_idx = page_idx * PAGE_SIZE + idx - if current_text.strip() != original_transcript.strip(): - save_sample_data(page_idx, idx, current_text, CURRENT_USERNAME, False) + return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No more samples in this direction (next page).", gr.update() + + # If we switched page, current_page_data is already updated by load_page_data. + # If staying on same page, it's fine. 
+ return load_interface_data(new_page_idx, new_idx_on_page) + + +def go_next_sample_wrapper(page_idx, idx_on_page): # Simpler wrapper for UI + return navigate_sample(page_idx, idx_on_page, 1) + +def go_prev_sample_wrapper(page_idx, idx_on_page): # Simpler wrapper for UI + return navigate_sample(page_idx, idx_on_page, -1) + + +def save_and_next_sample_first_phase(page_idx, idx_on_page, current_text, is_accepted_by_reviewer_flag): + # Note: `current_annotator_ui` (reviewer textbox value) is not who is performing action. + # CURRENT_USERNAME is performing the action. + # `is_accepted_by_reviewer_flag` is the checkbox state (true/false) if user is a reviewer. + # If user is an annotator, this flag might not be directly applicable or always false from UI. + + # Determine if the current user is acting as a first-phase reviewer to use the 'accepted' flag + user_is_reviewer = get_user_role(CURRENT_USERNAME) == "reviewer" + save_msg = save_sample_data(page_idx, idx_on_page, current_text, CURRENT_USERNAME, + accepted_flag=is_accepted_by_reviewer_flag if user_is_reviewer else False) + print(save_msg) # Log save message + # Then navigate + return navigate_sample(page_idx, idx_on_page, 1) + + +def review_and_next_sample_second_phase(page_idx, idx_on_page, review_action: str): + feedback_msg = handle_second_phase_action(page_idx, idx_on_page, review_action) + print(feedback_msg) # Log feedback message + # Then navigate + return navigate_sample(page_idx, idx_on_page, 1) + + +def jump_to_absolute_idx(target_abs_idx_str, current_page_idx, current_idx_on_page): # Removed unused text/annotator params + global current_page_data try: - target_idx = int(target_idx) - if target_idx < 0: - target_idx = 0 + target_abs_idx = int(target_abs_idx_str) + if target_abs_idx < 0: target_abs_idx = 0 + allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not is_within_range(target_idx, allowed_range): - return page_idx, idx, gr.update(), gr.update(), gr.update(), "Target index is outside your assigned range.", gr.update() - new_page_idx = target_idx // PAGE_SIZE - new_idx = target_idx % PAGE_SIZE - new_data = load_page_data(new_page_idx) - if new_idx >= len(new_data): - new_idx = len(new_data) - 1 if len(new_data) > 0 else 0 - return load_interface(new_page_idx, new_idx) - except: - return load_interface(page_idx, idx) - -def save_and_next_sample(page_idx, idx, current_text, current_annotator, original_transcript): - save_sample_data(page_idx, idx, current_text, current_annotator, False) - return next_sample(page_idx, idx) - -# Audio editing functions -def trim_audio_action(page_idx, idx, trim_start, trim_end, current_text, current_annotator, original_transcript): - audio, transcript, base_status, saved_reviewer, color, editable, accepted = get_sample(page_idx, idx, CURRENT_USERNAME) - absolute_idx = page_idx * PAGE_SIZE + idx - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not is_within_range(absolute_idx, allowed_range): - return page_idx, idx, audio, transcript, saved_reviewer, "Sample is outside your assigned range.", transcript - temp_audio_file = None - if isinstance(audio, tuple): - sample_rate, audio_array = audio - try: - if np.issubdtype(audio_array.dtype, np.floating): - audio_array = (audio_array * 32767).astype(np.int16) - else: - audio_array = np.array(audio_array) - temp_audio_file = os.path.join(tempfile.gettempdir(), f"temp_{page_idx}{idx}.wav") - sf.write(temp_audio_file, audio_array, sample_rate) - audio = temp_audio_file - except Exception as e: - return page_idx, idx, audio, 
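# --- Illustrative sketch (not part of the original diff) ---------------------
# One possible way to wire the two phase-specific handlers above to Gradio
# buttons.  All component names (page_state, idx_state, audio_player,
# transcript_tb, reviewer_tb, status_md, original_tb, save_next_btn,
# accepted_cb, approve_btn, reject_btn) are hypothetical; the real UI layout
# lives further down in app.py.
def _sketch_wire_buttons(page_state, idx_state, audio_player, transcript_tb,
                         reviewer_tb, status_md, original_tb,
                         save_next_btn, accepted_cb, approve_btn, reject_btn):
    # Output order matches the 7-tuple returned by load_interface_data()/navigate_sample()
    outputs = [page_state, idx_state, audio_player, transcript_tb,
               reviewer_tb, status_md, original_tb]
    if SECOND_PHASE:
        approve_btn.click(lambda p, i: review_and_next_sample_second_phase(p, i, "approved"),
                          inputs=[page_state, idx_state], outputs=outputs)
        reject_btn.click(lambda p, i: review_and_next_sample_second_phase(p, i, "rejected"),
                         inputs=[page_state, idx_state], outputs=outputs)
    else:
        save_next_btn.click(save_and_next_sample_first_phase,
                            inputs=[page_state, idx_state, transcript_tb, accepted_cb],
                            outputs=outputs)
# -----------------------------------------------------------------------------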
transcript, saved_reviewer, f"Error converting raw audio: {str(e)}", transcript - if not isinstance(audio, str) or not os.path.exists(audio): - return page_idx, idx, audio, transcript, saved_reviewer, "Trimming not supported for this audio format.", transcript + if not is_within_range(target_abs_idx, allowed_range): + status_msg = f"Target index {target_abs_idx} is outside your assigned range {allowed_range}." + # Return current state with error message + audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt + + new_page_idx = target_abs_idx // PAGE_SIZE + new_idx_on_page_conceptual = target_abs_idx % PAGE_SIZE # This is index on the conceptual new page + + # Load data for the new page + temp_page_data = load_page_data(new_page_idx) # This updates global current_page_data + + if temp_page_data is None or temp_page_data.empty: + status_msg = f"No data found for page {new_page_idx} containing index {target_abs_idx}." + audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt + + # Find the actual index on the loaded page for target_abs_idx + # The loaded page might not start exactly at new_page_idx * PAGE_SIZE if user's range is small. + # `load_page_data` now adds 'absolute_idx' and 'id_within_page' to `current_page_data` + + # Find the row with the matching absolute_idx in the newly loaded current_page_data + matching_rows = current_page_data[current_page_data['absolute_idx'] == target_abs_idx] + if not matching_rows.empty: + new_idx_on_page_actual = matching_rows.index[0] # This is the DataFrame index, should be same as 'id_within_page' + else: + # This means target_abs_idx, though in allowed_range, was not on the loaded page (e.g. page is sparse due to filtering) + # Fallback: load the first item of the page if target not found directly. + # Or better, report an issue. + status_msg = f"Index {target_abs_idx} is in range, but not found on page {new_page_idx}. Displaying start of page." + print(status_msg) # Log this + new_idx_on_page_actual = 0 # Default to first item of the loaded page + if current_page_data.empty : # Page is actually empty + audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) # Revert to old view + return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt + + return load_interface_data(new_page_idx, new_idx_on_page_actual) + + except ValueError: + status_msg = "Invalid index format for jump." 
+ audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt + except Exception as e: + status_msg = f"Error jumping to index: {e}" + print(status_msg) + audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt + + +# Audio editing functions (simplifying, assuming these are for phase 1 only) +def trim_audio_action(page_idx, idx_on_page, trim_start_str, trim_end_str): + # This function would need significant rework if used with the new get_sample returns. + # For now, let's assume it's for phase 1 and we fetch audio path differently or disable in phase 2. + # For simplicity in this modification, advanced audio ops might be limited. + if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Trimming disabled in Review Phase.", gr.Textbox() + + # Simplified: fetch audio path if possible + audio_val, transcript, base_status, saved_reviewer, color, editable, accepted, _ = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) + + if not isinstance(audio_val, str) or not os.path.exists(audio_val): + # Try to get original path from current_page_data for non-raw audio + if current_page_data is not None and idx_on_page < len(current_page_data): + audio_entry = current_page_data.iloc[idx_on_page]["audio"] + resolved_path = get_audio_path(audio_entry) + if isinstance(resolved_path, str) and os.path.exists(resolved_path): + audio_val = resolved_path + else: # If it's raw audio data (tuple) or URL, or non-existent path + return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Trimming not supported for this audio format or it's not a local file.", transcript + else: + return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Audio data not available for trimming.", transcript + + + absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] + voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) + + try: - audio_seg = AudioSegment.from_file(audio) - start_ms = int(float(trim_start) * 1000) - end_ms = int(float(trim_end) * 1000) + audio_seg = AudioSegment.from_file(audio_val) + start_ms = int(float(trim_start_str) * 1000) + end_ms = int(float(trim_end_str) * 1000) trimmed_seg = audio_seg[start_ms:end_ms] + os.makedirs("trimmed_audio", exist_ok=True) - voice_name = os.path.basename(str(current_page_data.iloc[idx]["audio"])) - trimmed_path = os.path.join("trimmed_audio", f"trimmed_{absolute_idx}_{voice_name}") - trimmed_seg.export(trimmed_path, format="wav") + trimmed_filename = f"trimmed_{absolute_idx}_{voice_name_original}" + # Ensure unique extension, wav is usually safe + if not trimmed_filename.lower().endswith(('.wav', '.mp3', '.flac')): + trimmed_filename += ".wav" + trimmed_path = os.path.join("trimmed_audio", trimmed_filename) - dataset = load_saved_annotations() - sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) - if not sample: - sample = Sample( - id=absolute_idx, - voice_name=voice_name, - original_subtitle=current_page_data.iloc[idx]["sentence"], - annotations=[] - ) - dataset.samples = dataset.samples or [] - 
dataset.samples.append(sample) + # Export format might need to match original or be a standard like wav + export_format = os.path.splitext(trimmed_path)[1][1:] + if not export_format: export_format = "wav" # Default if no extension + + trimmed_seg.export(trimmed_path, format=export_format) + + dataset_model = load_saved_annotations() + sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) + if not sample: # Should exist if we are editing it + return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Error: Sample not found in annotations for trimming.", transcript now = datetime.now() - annotation = next((a for a in sample.annotations or [] if a.annotator == saved_reviewer), None) - if annotation: - annotation.audio_trims = [AudioTrim(start=float(trim_start), end=float(trim_end))] - annotation.update_at = now - else: + # Associate trim with current user's annotation for this sample + annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) + if not annotation: # Create if doesn't exist annotation = Annotation( - annotator=saved_reviewer, - audio_trims=[AudioTrim(start=float(trim_start), end=float(trim_end))], + annotator=CURRENT_USERNAME, + annotated_subtitle=transcript, # Current transcript + audio_trims=[AudioTrim(start=float(trim_start_str), end=float(trim_end_str))], create_at=now, update_at=now ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) + else: + annotation.audio_trims = [AudioTrim(start=float(trim_start_str), end=float(trim_end_str))] + annotation.update_at = now - save_annotations(dataset) + save_annotations(dataset_model) new_status = f"{base_status} [Trimmed]" - return page_idx, idx, trimmed_path, transcript, saved_reviewer, new_status, transcript + return page_idx, idx_on_page, trimmed_path, transcript, saved_reviewer, new_status, transcript except Exception as e: - return page_idx, idx, audio, transcript, saved_reviewer, f"Error trimming audio: {str(e)}", transcript + return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, f"Error trimming audio: {str(e)}", transcript -def undo_trim_action(page_idx, idx, current_text, current_annotator, original_transcript): - audio, transcript, base_status, saved_reviewer, _, _, _ = get_sample(page_idx, idx, CURRENT_USERNAME) - absolute_idx = page_idx * PAGE_SIZE + idx - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not is_within_range(absolute_idx, allowed_range): - return page_idx, idx, audio, transcript, saved_reviewer, "Sample is outside your assigned range.", transcript - - voice_name = os.path.basename(str(current_page_data.iloc[idx]["audio"])) - dataset = load_saved_annotations() - sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) + +def undo_trim_action(page_idx, idx_on_page): + if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Undo Trim disabled in Review Phase.", gr.Textbox() + + audio_val, transcript, base_status, saved_reviewer, color, editable, accepted, _ = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) + absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] + voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) + + dataset_model = load_saved_annotations() + sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if sample: - annotation = next((a for a in sample.annotations or [] if a.annotator == saved_reviewer), None) + 
annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) # Trim is user-specific if annotation and annotation.audio_trims: annotation.audio_trims = None annotation.update_at = datetime.now() - save_annotations(dataset) + save_annotations(dataset_model) + + # Restore original audio from backup or re-fetch from source dataset info + original_audio_path_or_data = current_page_data.iloc[idx_on_page]["audio"] # This is the source entry + restored_audio_val = get_audio_path(original_audio_path_or_data) - orig_audio = audio_backup.get(f"{absolute_idx}_{voice_name}", audio) + # key = f"{absolute_idx}_{voice_name_original}" + # orig_audio_backup = audio_backup.get(key) # Fetch from backup if available + # if not orig_audio_backup: # If not in backup, use the path from current_page_data + # orig_audio_backup = get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) + new_status = f"{base_status} [Trim undone]" - return page_idx, idx, orig_audio, transcript, saved_reviewer, new_status, transcript + return page_idx, idx_on_page, restored_audio_val, transcript, saved_reviewer, new_status, transcript -def confirm_delete_audio(page_idx, idx, current_text, current_annotator, original_transcript): - absolute_idx = page_idx * PAGE_SIZE + idx - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not is_within_range(absolute_idx, allowed_range): - return page_idx, idx, gr.update(), gr.update(), gr.update(), "Sample is outside your assigned range.", gr.update() - - voice_name = os.path.basename(str(current_page_data.iloc[idx]["audio"])) - dataset = load_saved_annotations() - sample = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) + +def confirm_delete_audio_action(page_idx, idx_on_page): + if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Delete disabled in Review Phase.", gr.Textbox() + + absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] + voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) + + dataset_model = load_saved_annotations() + sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if not sample: sample = Sample( id=absolute_idx, - voice_name=voice_name, - original_subtitle=current_page_data.iloc[idx]["sentence"], + voice_name=voice_name_original, + original_subtitle=current_page_data.iloc[idx_on_page]["sentence"], annotations=[] ) - dataset.samples = dataset.samples or [] - dataset.samples.append(sample) - + dataset_model.samples = dataset_model.samples or [] + dataset_model.samples.append(sample) + sample.ignore_it = True now = datetime.now() + # Create/update an annotation by CURRENT_USERNAME to mark this action annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) + deleted_text_marker = "AUDIO DELETED (This audio has been removed.)" if annotation: - annotation.annotated_subtitle = "AUDIO DELETED (This audio has been removed.)" - annotation.audio_trims = None + annotation.annotated_subtitle = deleted_text_marker + annotation.audio_trims = None # Clear trims annotation.update_at = now + # Potentially clear review statuses if deletion overrides them else: annotation = Annotation( annotator=CURRENT_USERNAME, - annotated_subtitle="AUDIO DELETED (This audio has been removed.)", + annotated_subtitle=deleted_text_marker, create_at=now, update_at=now ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) + + 
save_annotations(dataset_model) - save_annotations(dataset) new_status = f"Sample {absolute_idx+1} [Audio deleted]" - if total_samples > 0: - new_status += f" of {total_samples}" - return page_idx, idx, None, annotation.annotated_subtitle, "deleted", new_status, annotation.annotated_subtitle + if total_samples > 0: new_status += f" of {total_samples}" + + # Return values to update UI correctly after deletion + return page_idx, idx_on_page, None, deleted_text_marker, "deleted", new_status, deleted_text_marker + -# Export functions +# Export functions (largely unchanged, ensure CURRENT_USERNAME context if it matters for export) def sanitize_string(s): - if not isinstance(s, str): - s = str(s) - return re.sub(r'[^\w\-\./]', '_', s) + if not isinstance(s, str): s = str(s) + return re.sub(r'[^\w-./]', '_', s) def sanitize_sentence(s): - if not isinstance(s, str): - s = str(s) + if not isinstance(s, str): s = str(s) return s.encode('utf-8', errors='ignore').decode('utf-8') @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) -def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token=None): +def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token_val=None): + if not token_val: + print("Cannot push to hub: No token provided for push_to_hub_with_retry.") + return print(f"Pushing dataset to {repo_id}") - dataset_dict.push_to_hub(repo_id, private=private, token=token) + dataset_dict.push_to_hub(repo_id, private=private, token=token_val) -def export_to_huggingface(repo_name, token, progress=gr.Progress()): +def export_to_huggingface(repo_name_str, hf_token_for_export, progress=gr.Progress()): + # This export logic needs to be carefully reviewed. + # It rebuilds a dataset from HF_DATASET_NAME and applies annotations. + # It should reflect the FINAL state of annotations (e.g., after second phase review if applicable). + # The current logic uses CURRENT_USERNAME for annotation preference, which might not be ideal for a global export. + # It should ideally use the "winning" annotation (e.g., accepted by reviewer, or approved in 2nd phase). + if not hf_token_for_export: + return "Export failed: Hugging Face token is missing." try: start_time = time.time() - repo_name = sanitize_string(repo_name) + repo_name_str = sanitize_string(repo_name_str) print(f"Export started at {time.strftime('%Y-%m-%d %H:%M:%S')}") - dataset = load_saved_annotations() - total_samples_export = get_dataset_info().get('num_samples', -1) - if total_samples_export <= 0: - total_samples_export = 1500 - chunk_size = 100 - num_chunks = (total_samples_export + chunk_size - 1) // chunk_size - print(f"Total samples: {total_samples_export}, chunks: {num_chunks}") - progress(0, f"Total samples: {total_samples_export}, chunks: {num_chunks}") - with tempfile.TemporaryDirectory() as temp_dir: - ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) - ds_iter = iter(ds) - all_datasets = [] - processed_samples = 0 + dataset_model_annotations = load_saved_annotations() # Load all annotations + + # Use total_samples from global or re-fetch if necessary. + # The export should process all samples defined by total_samples. + # Let's assume total_samples is the definitive count. + if total_samples <= 0: + return "Export failed: Total number of samples is unknown or invalid." + + # export_total_samples = total_samples + # Using streaming for source, but collecting all data. This can be memory intensive. + # Consider processing in true streaming fashion if dataset is very large. 
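# A minimal sketch of the streaming alternative mentioned in the comment above:
# iterate the source dataset lazily and stop at the known sample count instead of
# materializing everything in memory. The helper name is illustrative (not part of
# this patch) and assumes the module-level HF_DATASET_NAME plus a known total count.
from itertools import islice
from datasets import load_dataset

def iter_source_samples(limit: int):
    # streaming=True returns an IterableDataset; islice stops after `limit` items
    ds_stream = load_dataset(HF_DATASET_NAME, split="train", streaming=True)
    for absolute_idx, source_sample in enumerate(islice(ds_stream, limit)):
        yield absolute_idx, source_sample

# usage: for absolute_idx, source_sample in iter_source_samples(total_samples): ...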
+ + ds_source = load_dataset(HF_DATASET_NAME, split="train", streaming=False) # Load non-streaming for easier iteration up to total_samples + + exported_data_list = [] + progress(0, f"Preparing {total_samples} samples for export...") + + for i, source_sample in enumerate(ds_source): + if i >= total_samples: break # Limit to known total_samples + + absolute_idx = i # Assuming source_sample is ordered and corresponds to index i + + audio_entry = source_sample.get("audio") + sentence_val = source_sample.get("sentence", "") # Default original sentence + + # Determine final audio and sentence based on annotations + audio_dict_to_export = None # Default to no audio if deleted or issue - for chunk_idx in range(num_chunks): - chunk_data = [] - for _ in range(chunk_size): + # Convert audio path/data from source_sample to array for export + # This part is tricky: we need to load audio content. + # For simplicity, this example will re-use get_audio_path and then load if it's a path. + raw_audio_data = None + audio_path_or_data = get_audio_path(audio_entry) + if isinstance(audio_path_or_data, tuple): # Raw audio from get_audio_path + raw_audio_data = {"array": audio_path_or_data[1], "sampling_rate": audio_path_or_data[0]} + elif isinstance(audio_path_or_data, str) and (os.path.exists(audio_path_or_data) or audio_path_or_data.startswith("http")): + # If it's a path, load it. This might be slow. + # For URLs, datasets library handles loading when building Dataset object. + # For local paths, we need to load into array. + if os.path.exists(audio_path_or_data): try: - sample = next(ds_iter) - absolute_idx = chunk_idx * chunk_size + len(chunk_data) - audio_entry = sample["audio"] - audio_dict = None - audio_key = None - - if isinstance(audio_entry, dict) and "array" in audio_entry: - if not np.all(np.isfinite(audio_entry["array"])): - audio_dict = None - else: - audio_dict = { - "array": audio_entry["array"], - "sampling_rate": audio_entry["sampling_rate"] - } - audio_key = audio_entry.get("path", f"sample_{absolute_idx}.mp3") - audio_key = sanitize_string(audio_key) - elif isinstance(audio_entry, str): - if audio_entry.startswith("http://") or audio_entry.startswith("https://"): - audio_dict = None - audio_key = sanitize_string(audio_entry) - else: - resolved_path = get_audio_path(audio_entry) - if os.path.exists(resolved_path): - try: - audio_array, sample_rate = sf.read(resolved_path) - if not np.all(np.isfinite(audio_array)): - audio_dict = None - else: - audio_dict = { - "array": audio_array, - "sampling_rate": sample_rate - } - audio_key = sanitize_string(resolved_path) - except: - audio_dict = None - audio_key = sanitize_string(resolved_path) - else: - audio_dict = None - audio_key = sanitize_string(resolved_path) - else: - audio_dict = None - audio_key = sanitize_string(str(audio_entry)) - - sentence = sample.get("sentence", "") - sample_data = next((s for s in dataset.samples or [] if s.id == absolute_idx), None) - if sample_data: - if sample_data.ignore_it: - audio_dict = None - sentence = "AUDIO DELETED (This audio has been removed.)" - elif sample_data.annotations: - annotation = next((a for a in sample_data.annotations if a.accepted), None) or \ - next((a for a in sample_data.annotations if a.annotator == CURRENT_USERNAME), None) - if annotation and annotation.annotated_subtitle: - sentence = annotation.annotated_subtitle - if annotation.audio_trims: - trimmed_path = os.path.join("trimmed_audio", f"trimmed_{absolute_idx}_{audio_key}") - if os.path.exists(trimmed_path): - audio_array, 
sample_rate = sf.read(trimmed_path) - audio_dict = { - "array": audio_array, - "sampling_rate": sample_rate - } + arr, sr = sf.read(audio_path_or_data) + raw_audio_data = {"array": arr, "sampling_rate": sr} + except Exception as e_load: + print(f"Warning: Could not load audio file {audio_path_or_data} for export: {e_load}") + # raw_audio_data remains None + else: # URL + raw_audio_data = audio_path_or_data # Pass URL directly, Audio feature will handle + + audio_dict_to_export = raw_audio_data + + + # Check annotations for this sample + annotation_data = next((s for s in dataset_model_annotations.samples or [] if s.id == absolute_idx), None) + + if annotation_data: + if annotation_data.ignore_it: + sentence_val = "AUDIO DELETED (This audio has been removed.)" + audio_dict_to_export = None # No audio + else: + # Determine the "best" annotation to use + # Priority: 1. Approved in 2nd phase, 2. Accepted in 1st phase by reviewer, 3. Annotator's latest + best_ann = None + if annotation_data.annotations: + # Check for 2nd phase approved + # This needs to find the annotation that WAS approved, not make a new one. + # The original annotator's submission that got approved. + if annotation_data.is_approved_in_second_phase: + # Find which annotation was approved. Iterate through them. + for ann in annotation_data.annotations: + if ann.second_phase_review_status == "approved": + best_ann = ann + break - chunk_data.append({ - "audio": audio_dict, - "sentence": sanitize_sentence(sentence) - }) + if not best_ann: # Check for 1st phase accepted + for ann in annotation_data.annotations: + if ann.is_first_phase_accepted: + best_ann = ann + break - gc.collect() - except StopIteration: - break - - if chunk_data: - chunk_dataset = Dataset.from_list(chunk_data) - chunk_dataset = chunk_dataset.cast_column("audio", Audio()) - chunk_path = os.path.join(temp_dir, f"chunk_{chunk_idx}.parquet") - chunk_dataset.to_parquet(chunk_path) - all_datasets.append(chunk_path) - processed_samples += len(chunk_data) - progress(processed_samples / total_samples_export, f"Processed {processed_samples}/{total_samples_export}") - - del chunk_data - gc.collect() + if not best_ann: # Fallback to any annotation (e.g., latest by timestamp or first found) + # This could be more sophisticated, e.g. latest updated. + # For now, take first one if multiple non-reviewed/accepted exist. + # Or, if a specific user's annotations are primary (e.g. CURRENT_USERNAME if this is a personal export) + # Let's assume any relevant annotation is fine if not formally accepted/approved. + # The original code used CURRENT_USERNAME's annotation. This might be too specific for a general export. + # Let's try to find *any* annotation from the list for the sample if no "accepted" one exists. + if annotation_data.annotations: + best_ann = sorted(annotation_data.annotations, key=lambda x: x.update_at, reverse=True)[0] # latest + + if best_ann: + sentence_val = best_ann.annotated_subtitle or sentence_val # Use annotated if available + # Handle trimmed audio if specified in best_ann + if best_ann.audio_trims and audio_dict_to_export: # Only if audio exists + # This part requires that trimmed audio files are accessible and named consistently + # The original trim_audio_action saves to "trimmed_audio/trimmed_{abs_idx}_{voice_name}" + # We need to reconstruct this path or have a direct reference. + # Assuming voice_name is from original sample. 
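# One way to factor the selection priority described above into a helper
# (illustrative, not part of this patch): second-phase approved first, then
# first-phase accepted, then the most recently updated annotation as a fallback.
def select_best_annotation(sample: Sample) -> Optional[Annotation]:
    anns = sample.annotations or []
    if not anns:
        return None
    approved = next((a for a in anns if a.second_phase_review_status == "approved"), None)
    if approved:
        return approved
    accepted = next((a for a in anns if a.is_first_phase_accepted), None)
    if accepted:
        return accepted
    # fallback: latest annotation by update timestamp
    return max(anns, key=lambda a: a.update_at)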
+ original_voice_name = sanitize_string(os.path.basename(str(get_audio_path(audio_entry) or f"sample_{absolute_idx}"))) + trimmed_path_potential = os.path.join("trimmed_audio", f"trimmed_{absolute_idx}_{original_voice_name}") + # Ensure extension consistency for look up + if not os.path.splitext(trimmed_path_potential)[1]: trimmed_path_potential += ".wav" # common default + + if os.path.exists(trimmed_path_potential): + try: + arr, sr = sf.read(trimmed_path_potential) + audio_dict_to_export = {"array": arr, "sampling_rate": sr} + except Exception as e_trim_load: + print(f"Warning: Could not load trimmed audio {trimmed_path_potential}: {e_trim_load}") + # audio_dict_to_export remains as original loaded audio + # else: print(f"Trimmed audio path not found: {trimmed_path_potential}") + + exported_data_list.append({ + "audio": audio_dict_to_export, # This will be None if deleted or failed to load + "sentence": sanitize_sentence(sentence_val) + }) - if all_datasets: - combined_dataset = Dataset.from_parquet([p for p in all_datasets]) - dataset_dict = DatasetDict({"train": combined_dataset}) - progress(0.95, "Uploading to Hugging Face...") - push_to_hub_with_retry( - dataset_dict=dataset_dict, - repo_id=repo_name, - private=True, - token=token - ) - print(f"Upload done, total time: {time.time() - start_time:.2f}s") - progress(1.0, "Upload complete!") - return f"Exported to huggingface.co/datasets/{repo_name}" - else: - return "No data to export." + if (i + 1) % 100 == 0: # Progress update + progress((i + 1) / total_samples, f"Processed {i+1}/{total_samples} samples") + gc.collect() + + if not exported_data_list: + return "No data to export after processing." + + # Create Hugging Face Dataset from the collected data + # Filter out entries where audio is None if dataset schema requires audio + # final_export_list = [item for item in exported_data_list if item["audio"] is not None] + # Or handle audio being optional by schema. For Audio(), None might not be allowed if array is mandatory. + # Let's assume for now audio can be None (e.g. deleted). If Audio() cast fails, this needs adjustment. + # The Audio feature expects a path, dict with array/sr, or bytes. None might lead to issues. + # Handling: if audio_dict_to_export is None, replace with a dummy silent audio array or skip sample. + # For now, let's try passing None and see if cast_column handles it gracefully or errors. + # It's safer to ensure 'audio' is always a valid Audio structure or path. + # If audio is None (e.g. ignore_it=True), we should ensure the Audio feature can handle it. + # Typically, you might replace with a path to a very short silent audio file, or an empty array if supported. + + for item in exported_data_list: + if item["audio"] is None: # If audio was marked for deletion / ignore_it + # Provide a placeholder that Audio() can cast, e.g. path to a tiny silent wav or empty array + # For simplicity, if datasets lib allows None for audio feature, this is fine. + # Otherwise, this needs a robust placeholder. + # A common practice is to provide a dictionary with a path to a universally accessible silent file, + # or an empty numpy array for 'array' and a common 'sampling_rate'. + # Let's try with an empty array. + item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} # Example placeholder + elif isinstance(item["audio"], str): # If it's a URL or path string + # The Audio feature will handle loading this. 
+ pass + elif not (isinstance(item["audio"], dict) and "array" in item["audio"] and "sampling_rate" in item["audio"]): + print(f"Warning: Invalid audio format for export for a sample, replacing with silent audio: {item['audio']}") + item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} + + + final_dataset = Dataset.from_list(exported_data_list) + final_dataset = final_dataset.cast_column("audio", Audio()) # Cast to Audio feature type + + dataset_dict_export = DatasetDict({"train": final_dataset}) + + progress(0.95, "Uploading to Hugging Face...") + push_to_hub_with_retry( + dataset_dict=dataset_dict_export, + repo_id=repo_name_str, + private=True, # Assuming private, can be a parameter + token_val=hf_token_for_export + ) + print(f"Upload done, total time: {time.time() - start_time:.2f}s") + progress(1.0, "Upload complete!") + return f"Exported to huggingface.co/datasets/{repo_name_str}" + except Exception as e: error_msg = f"Export failed: {str(e)}" - print(error_msg) + import traceback + print(f"{error_msg}\n{traceback.format_exc()}") return error_msg + # Login function -def hf_login(hf_token): - global CURRENT_USERNAME +def hf_login(hf_token_val): + global CURRENT_USERNAME, token, current_page_data, total_samples, annotator_ranges + + if not hf_token_val: # If user clears the box and clicks login + return gr.update(visible=True), gr.update(visible=False), "", "", "Login failed: Token cannot be empty." + try: - username = whoami(token=hf_token)['name'] + user_info = whoami(token=hf_token_val) + username = user_info['name'] + if username in ALLOWED_USERS: CURRENT_USERNAME = username - load_page_data(0) - return gr.update(visible=False), gr.update(visible=True), username, hf_token, "Login successful!" + token = hf_token_val # Store the validated token globally for other HF ops + + # Initialize/re-initialize dataset info and ranges based on logged-in user + # This ensures that if total_samples was not fetched, it's attempted again. + ds_info = get_dataset_info() # Sets global total_samples + if total_samples > 0: + annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + if SECOND_PHASE: + initialize_second_phase_assignments() # Depends on ANNOTATORS and their ranges + else: + # Handle case where total_samples is still unknown (critical for ranges) + return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, "Login successful, but failed to get dataset size. Cannot proceed." 
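# Quick worked example of the first-phase split computed above (illustrative,
# standalone; uses the two annotators configured in this file). With 11 samples the
# remainder goes to the first annotator, and ranges are contiguous and non-overlapping.
example_ranges = calculate_annotator_ranges(11, ["navidved", "userC"])
assert example_ranges == {"navidved": (0, 5), "userC": (6, 10)}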
+ + + # Load initial page data for this user + current_page_data = load_page_data(0) # page 0 for the current user + + # Determine initial UI state based on SECOND_PHASE + is_second_phase_active = SECOND_PHASE + + # Update visibility of components based on phase + updates = { + # Phase 1 components + "save_next_button_vis": not is_second_phase_active, + "transcript_interactive": not is_second_phase_active, + "trim_button_vis": not is_second_phase_active, + "undo_trim_button_vis": not is_second_phase_active, + "delete_button_vis": not is_second_phase_active, + "first_phase_accept_cb_vis": (not is_second_phase_active and get_user_role(CURRENT_USERNAME) == "reviewer"), + # Phase 2 components + "approve_button_vis": is_second_phase_active, + "reject_button_vis": is_second_phase_active, + } + + initial_load = load_interface_data(0, 0) # Load data for the first sample (page 0, index 0 on page) + + # Return tuple for outputs matching login_button.click() + # login_container, main_container, reviewer_textbox (as initial state), hf_token_state, login_message, + # then all the visibility/interactivity updates + return ( + gr.update(visible=False), # login_container + gr.update(visible=True), # main_container + initial_load[4], # reviewer_textbox gr.update object (initial_load[4] is reviewer text gr.update) + hf_token_val, # hf_token_state + f"Login successful! Welcome {CURRENT_USERNAME}. Phase: {'Review' if SECOND_PHASE else 'Annotation'}.", # login_message + + # UI component updates based on phase + gr.update(visible=updates["save_next_button_vis"]), + gr.update(interactive=updates["transcript_interactive"]), # This is for transcript Textarea + gr.update(visible=updates["trim_button_vis"]), + gr.update(visible=updates["undo_trim_button_vis"]), + gr.update(visible=updates["delete_button_vis"]), + gr.update(visible=updates["first_phase_accept_cb_vis"]), + gr.update(visible=updates["approve_button_vis"]), + gr.update(visible=updates["reject_button_vis"]), + + # Initial data for the interface elements from load_interface_data + initial_load[0], # page_idx_state + initial_load[1], # idx_on_page_state + initial_load[2], # audio_player + initial_load[3], # transcript (already includes interactivity) + # initial_load[4] is reviewer, already used above for initial value + initial_load[5], # status_md + initial_load[6], # original_transcript_state + ) + else: - return gr.update(visible=True), gr.update(visible=False), "", hf_token, "User not authorized!" + CURRENT_USERNAME = None + return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, "User not authorized!" except Exception as e: - return gr.update(visible=True), gr.update(visible=False), "", hf_token, f"Login failed: {str(e)}" + CURRENT_USERNAME = None + import traceback + print(f"Login failed: {str(e)}\n{traceback.format_exc()}") + return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, f"Login failed: {str(e)}" -# Set initial values -if len(current_page_data) > 0: - init_values = load_interface(0, 0) -else: - init_values = (0, 0, None, gr.update(value="", interactive=True), gr.update(value="unreviewed", elem_classes=["white"], interactive=False), "No data available.", "") + +# Set initial values for UI elements before login (mostly empty or default) +init_page_idx = 0 +init_idx_on_page = 0 +init_audio_val = None +init_transcript_val = gr.update(value="", interactive=False) # Non-interactive before login +init_reviewer_val = gr.update(value="N/A", interactive=False) +init_status_val = "Please log in." 
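# Minimal illustration (not app code) of the Gradio rule the login wiring relies on:
# a handler's returned tuple is mapped positionally onto its `outputs` list, which is
# why the order of values returned by hf_login must match login_outputs exactly.
import gradio as gr

def _demo_handler(name):
    # one return value per output component, in the same order as `outputs`
    return gr.update(value=f"Hello {name}"), gr.update(visible=True)

with gr.Blocks() as _demo_blocks:
    _name_tb = gr.Textbox(label="Name")
    _greeting_tb = gr.Textbox(label="Greeting")
    _extra_md = gr.Markdown("Greeted!", visible=False)
    gr.Button("Go").click(fn=_demo_handler, inputs=[_name_tb],
                          outputs=[_greeting_tb, _extra_md])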
+init_original_text_val = "" # Gradio Interface css = """ -.white { background-color: white; } -.yellow { background-color: yellow; } -.blue { background-color: blue; color: white; } -.green { background-color: green; color: white; } -.pink { background-color: pink; } -.red { background-color: red; color: white; } +.white { background-color: white; color: black; } +.yellow { background-color: yellow; color: black; } +.blue { background-color: lightblue; color: black; } /* Adjusted for readability */ +.green { background-color: lightgreen; color: black; } /* Adjusted for readability */ +.pink { background-color: pink; color: black; } +.red { background-color: #FF7F7F; color: black; } /* Softer red */ +.orange { background-color: orange; color: black; } +.gray { background-color: lightgray; color: black; } +.lightgray { background-color: #f0f0f0; color: black; } /* For very subtle states */ +.reviewer-textbox input { text-align: center; font-weight: bold; } """ -with gr.Blocks(css=css, title="ASR Dataset Labeling with HF Authentication") as demo: - hf_token_state = gr.State("") - +with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo: + hf_token_state = gr.State(token) # Store token for export or other uses + + # UI States for navigation and data + current_page_idx_state = gr.State(init_page_idx) + current_idx_on_page_state = gr.State(init_idx_on_page) + original_transcript_state = gr.State(init_original_text_val) # Stores original subtitle from dataset for current item + with gr.Column(visible=True, elem_id="login_container") as login_container: - gr.Markdown("## HF Authentication\nPlease enter your Hugging Face token to proceed.") - hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", placeholder="Enter your HF token") + gr.Markdown("## HF Authentication\nPlease enter your Hugging Face token (read & write permissions).") + hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", placeholder="Enter your HF token", value=token or "") login_button = gr.Button("Login") login_message = gr.Markdown("") - + with gr.Column(visible=False, elem_id="main_container") as main_container: gr.Markdown("# ASR Dataset Labeling Interface") - gr.Markdown("Listen to audio and edit transcriptions. 
Changes are saved via the Save & Next button.") - with gr.Row(): - current_page_idx = gr.State(value=init_values[0]) - current_idx = gr.State(value=init_values[1]) - original_transcript = gr.State(value=init_values[6]) - with gr.Column(): - audio_player = gr.Audio(value=init_values[2], label="Audio", autoplay=True) - transcript = gr.TextArea(value=init_values[3], label="Transcript", lines=5, placeholder="Edit transcript here...") - reviewer = gr.Textbox(value=init_values[4], label="Reviewer", placeholder="Reviewer (auto-filled)", interactive=False, elem_classes=["white"]) - status = gr.Markdown(value=init_values[5]) + status_md = gr.Markdown(init_status_val) # For status messages like "Sample x of y" with gr.Row(): - prev_button = gr.Button("← Previous") - save_next_button = gr.Button("Save & Next", variant="primary") - next_button = gr.Button("Next") - # Future: Add feedback input for annotators - # feedback = gr.Radio( - # ["approved", "rejected"], - # label="Feedback", - # visible=(get_user_role(CURRENT_USERNAME) == "annotator") - # ) - - with gr.Row(): - trim_start = gr.Textbox(label="Trim Start (seconds)", placeholder="e.g., 1.5") - trim_end = gr.Textbox(label="Trim End (seconds)", placeholder="e.g., 3.0") - trim_button = gr.Button("Trim Audio", variant="primary") - undo_trim_button = gr.Button("Undo Trim") - - with gr.Row(): - delete_button = gr.Button("Delete Audio", variant="stop") - with gr.Row(): - confirm_delete_button = gr.Button("Confirm Delete", visible=False) - cancel_delete_button = gr.Button("Cancel Delete", visible=False) - - with gr.Row(): - jump_text = gr.Textbox(label="Jump to Global Index", placeholder="Enter index number") - jump_button = gr.Button("Jump") - with gr.Row(): - hf_repo_name = gr.Textbox(label="Repository Name (username/dataset-name)", placeholder="e.g., your-username/asr-dataset") - with gr.Row(): - hf_export_button = gr.Button("Export to Hugging Face", variant="primary") - hf_export_status = gr.Markdown("") - - # Event handlers - save_next_button.click( - fn=save_and_next_sample, - inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript], - outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] - ) - next_button.click( - fn=go_next_without_save, - inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript], - outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] - ) - prev_button.click( - fn=prev_sample, - inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript], - outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] - ) - jump_button.click( - fn=jump_to, - inputs=[jump_text, current_page_idx, current_idx, transcript, reviewer, original_transcript], - outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] - ) - trim_button.click( - fn=trim_audio_action, - inputs=[current_page_idx, current_idx, trim_start, trim_end, transcript, reviewer, original_transcript], - outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] - ) - undo_trim_button.click( - fn=undo_trim_action, - inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript], - outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] - ) - delete_button.click( - fn=lambda: (gr.update(visible=True), 
gr.update(visible=True)), - inputs=None, - outputs=[confirm_delete_button, cancel_delete_button] - ) - confirm_delete_button.click( - fn=confirm_delete_audio, - inputs=[current_page_idx, current_idx, transcript, reviewer, original_transcript], - outputs=[current_page_idx, current_idx, audio_player, transcript, reviewer, status, original_transcript] - ) - confirm_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=confirm_delete_button) - confirm_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=cancel_delete_button) - cancel_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=confirm_delete_button) - cancel_delete_button.click(lambda: gr.update(visible=False), inputs=None, outputs=cancel_delete_button) - - hf_export_button.click(fn=export_to_huggingface, inputs=[hf_repo_name, hf_token_state], outputs=[hf_export_status], queue=False) - + with gr.Column(scale=2): + audio_player = gr.Audio(value=init_audio_val, label="Audio Sample", autoplay=False) # Autoplay off initially + transcript_tb = gr.TextArea(value=init_transcript_val['value'], label="Transcript", lines=5, + interactive=init_transcript_val.get('interactive', False)) + reviewer_tb = gr.Textbox(value=init_reviewer_val['value'], label="Annotation Status / Reviewer", interactive=False, elem_classes=["white", "reviewer-textbox"]) + + with gr.Column(scale=1): + gr.Markdown("### Navigation") + prev_button = gr.Button("← Previous") + next_button = gr.Button("Next (no save)") # For first phase + + # Phase 1 Buttons + save_next_button = gr.Button("Save & Next", variant="primary", visible=not SECOND_PHASE) + first_phase_accept_cb = gr.Checkbox(label="Accept (Reviewer)", visible=(not SECOND_PHASE and CURRENT_USERNAME in REVIEWERS if CURRENT_USERNAME else False)) + + # Phase 2 Buttons + approve_button = gr.Button("Approve & Next", variant="primary", visible=SECOND_PHASE) + reject_button = gr.Button("Reject & Next", variant="stop", visible=SECOND_PHASE) + + gr.Markdown("### Audio Tools (Phase 1 only)") + with gr.Row(): + trim_start_tb = gr.Textbox(label="Trim Start (s)", placeholder="e.g., 1.5", scale=1) + trim_end_tb = gr.Textbox(label="Trim End (s)", placeholder="e.g., 3.0", scale=1) + trim_button = gr.Button("Trim Audio", visible=not SECOND_PHASE) + undo_trim_button = gr.Button("Undo Trim", visible=not SECOND_PHASE) + delete_button = gr.Button("Mark Audio as Deleted", variant="stop", visible=not SECOND_PHASE) + # Confirm/Cancel for delete are managed dynamically + + with gr.Accordion("Advanced Navigation & Export", open=False): + with gr.Row(): + jump_text_tb = gr.Textbox(label="Jump to Global Index", placeholder="Enter index number") + jump_button = gr.Button("Jump") + with gr.Row(): + hf_repo_name_tb = gr.Textbox(label="Export Repository Name (username/dataset-name)", + placeholder=f"{CURRENT_USERNAME}/my-annotated-dataset" if CURRENT_USERNAME else "your-username/asr-dataset") + hf_export_button = gr.Button("Export to Hugging Face", variant="primary") + hf_export_status_md = gr.Markdown("") + + # Define outputs for login_button carefully, matching the hf_login function's return tuple + login_outputs = [ + login_container, main_container, reviewer_tb, hf_token_state, login_message, + # Visibility/interactivity updates + save_next_button, transcript_tb, trim_button, undo_trim_button, delete_button, + first_phase_accept_cb, approve_button, reject_button, + # Initial data load updates + current_page_idx_state, current_idx_on_page_state, audio_player, transcript_tb, # 
transcript_tb updated twice, once for interactivity, once for value + status_md, original_transcript_state + ] + # Need to ensure transcript_tb gets value update from initial_load too. + # hf_login returns initial_load[3] which is gr.update(value=text, interactive=editable) for transcript. + # So, one update to transcript_tb should be sufficient if it carries both value and interactivity. + login_button.click( fn=hf_login, inputs=[hf_token_input], - outputs=[login_container, main_container, reviewer, hf_token_state, login_message] + outputs=login_outputs + ) + + # Common outputs for navigation and actions + navigation_outputs = [ + current_page_idx_state, current_idx_on_page_state, + audio_player, transcript_tb, reviewer_tb, status_md, original_transcript_state + ] + + # Phase 1 actions + save_next_button.click( + fn=save_and_next_sample_first_phase, + inputs=[current_page_idx_state, current_idx_on_page_state, transcript_tb, first_phase_accept_cb], + outputs=navigation_outputs + ) + # 'Next (no save)' button (only for Phase 1) + next_button.click( + fn=go_next_sample_wrapper, # This simple nav doesn't save unsaved changes. User should be aware. + inputs=[current_page_idx_state, current_idx_on_page_state], + outputs=navigation_outputs + ).then( # Add a small JS to clear unsaved changes marker if any (conceptual) + None, None, None, _js="() => { /* Clear unsaved visual cues if any */ }" + ) + + prev_button.click( + fn=go_prev_sample_wrapper, # Similarly, does not auto-save. + inputs=[current_page_idx_state, current_idx_on_page_state], + outputs=navigation_outputs + ) + + # Phase 2 actions + approve_button.click( + fn=review_and_next_sample_second_phase, + inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("approved")], # Pass action string + outputs=navigation_outputs + ) + reject_button.click( + fn=review_and_next_sample_second_phase, + inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("rejected")], # Pass action string + outputs=navigation_outputs ) -demo.launch() + # Audio tools (Phase 1) + trim_button.click( + fn=trim_audio_action, + inputs=[current_page_idx_state, current_idx_on_page_state, trim_start_tb, trim_end_tb], + outputs=navigation_outputs # Outputs audio_player, status_md primarily + ) + undo_trim_button.click( + fn=undo_trim_action, + inputs=[current_page_idx_state, current_idx_on_page_state], + outputs=navigation_outputs + ) + delete_button.click( # This will be a confirmable action + fn=confirm_delete_audio_action, # Direct action for simplicity, could add confirmation dialog + inputs=[current_page_idx_state, current_idx_on_page_state], + outputs=navigation_outputs + ) + + # Jump and Export + jump_button.click( + fn=jump_to_absolute_idx, + inputs=[jump_text_tb, current_page_idx_state, current_idx_on_page_state], + outputs=navigation_outputs + ) + hf_export_button.click( + fn=export_to_huggingface, + inputs=[hf_repo_name_tb, hf_token_state], + outputs=[hf_export_status_md], + queue=True # Export can be long + ) + +# Launch the interface +if __name__ == "__main__": + # For testing, you might want to set SECOND_PHASE here or via environment variable + # Example: os.environ.get("APP_SECOND_PHASE", "False").lower() == "true" + # SECOND_PHASE = True # Force second phase for testing + if SECOND_PHASE: + print("==== APPLICATION RUNNING IN SECOND PHASE (REVIEW MODE) ====") + else: + print("==== APPLICATION RUNNING IN FIRST PHASE (ANNOTATION MODE) ====") + + demo.queue().launch(debug=True, share=False) # Share=True for ngrok link if needed \ No 
newline at end of file
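# A sketch of the environment-variable toggle suggested in the __main__ comment above.
# APP_SECOND_PHASE is the name used in that comment; note the flag must be read before
# the Blocks UI is constructed, since button visibility is decided from SECOND_PHASE
# at construction time rather than at launch.
import os

SECOND_PHASE = os.environ.get("APP_SECOND_PHASE", "False").lower() == "true"
if SECOND_PHASE:
    print("==== APPLICATION RUNNING IN SECOND PHASE (REVIEW MODE) ====")
else:
    print("==== APPLICATION RUNNING IN FIRST PHASE (ANNOTATION MODE) ====")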