diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,4 +1,3 @@ - import gradio as gr import os import json @@ -28,29 +27,28 @@ else: # Configuration HF_DATASET_NAME = "navidved/channelb-raw-data" -AUDIO_DIR = "audio" +AUDIO_DIR = "audio" # Not actively used if paths are absolute or in dataset item SAVE_PATH = "annotations.json" -ALLOWED_USERS = ["vargha", "navidved", "userC"] # Added userC for testing 2nd phase with >1 annotator -REVIEWERS = ["vargha"] # First phase reviewers -ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS] # First phase annotators +ALLOWED_USERS = ["vargha", "navidved", "userC"] +REVIEWERS = ["vargha"] +ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS] CURRENT_USERNAME = None -PAGE_SIZE = 100 # Kept for pagination logic, though review might be sample by sample +PAGE_SIZE = 100 SAVE_INTERVAL = 10 # --- SECOND PHASE CONFIGURATION --- -SECOND_PHASE = True # Set to True to activate second phase review -SECOND_PHASE_REVIEW_MAPPING = {"navidved": "vargha"} # Populated if SECOND_PHASE is True. Maps: reviewer_username -> original_annotator_username -# Example: {"navidved": "userC"} means navidved reviews userC's work +SECOND_PHASE = False # Set to True to activate second phase review +SECOND_PHASE_REVIEW_MAPPING = {} # Populated if SECOND_PHASE is True. Maps: reviewer_username -> original_annotator_username # Global state variables -current_page = 0 -ds_iter = None -current_page_data = None -audio_backup = {} +current_page = 0 # Stores the USER-RELATIVE page index +ds_iter = None # No longer maintained globally for streaming robustness +current_page_data = None # Pandas DataFrame for the current page's data +audio_backup = {} # For undo_trim, if needed (simplified) annotation_count = 0 -unsaved_changes = {} # Primarily for first phase -total_samples = 0 -annotator_ranges = {} # Stores {annotator_username: (start_idx, end_idx)} for first phase +unsaved_changes = {} +total_samples = 0 # Total samples in the HF_DATASET_NAME +annotator_ranges = {} # Stores {annotator_username: (start_abs_idx, end_abs_idx)} # Pydantic data models class AudioTrim(BaseModel): @@ -58,32 +56,27 @@ class AudioTrim(BaseModel): end: float class Annotation(BaseModel): - annotator: str # Original annotator (first phase) + annotator: str annotated_subtitle: Optional[str] = None audio_trims: Optional[List[AudioTrim]] = None - - # First phase review fields is_first_phase_accepted: bool = False first_phase_reviewer_username: Optional[str] = None - - # Second phase review fields second_phase_reviewed_by: Optional[str] = None - second_phase_review_status: Optional[str] = None # "approved" or "rejected" + second_phase_review_status: Optional[str] = None second_phase_review_timestamp: Optional[datetime] = None - create_at: datetime update_at: datetime class Sample(BaseModel): - id: int + id: int # Absolute index in the dataset voice_name: str original_subtitle: str ignore_it: bool = False description: Optional[str] = None annotations: Optional[List[Annotation]] = None - is_approved_in_second_phase: bool = False # True if the primary annotation is approved in 2nd phase + is_approved_in_second_phase: bool = False -class DatasetModel(BaseModel): # Renamed to avoid conflict with datasets.Dataset +class DatasetModel(BaseModel): samples: Optional[List[Sample]] = None # Utility functions @@ -100,7 +93,6 @@ def load_saved_annotations(): # os.remove(SAVE_PATH) # Be cautious with auto-removing dataset_model = None - if dataset_model is None and token: try: hf_path = hf_hub_download( @@ -112,8 +104,7 @@ def load_saved_annotations(): with open(hf_path, "r", encoding="utf-8") as f: data = json.load(f) dataset_model = DatasetModel(**data) - # Cache it locally - with open(SAVE_PATH, "w", encoding="utf-8") as f: + with open(SAVE_PATH, "w", encoding="utf-8") as f: # Cache locally f.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) print("Loaded annotations from HF dataset repository and cached locally") except Exception as e: @@ -123,7 +114,6 @@ def load_saved_annotations(): if dataset_model is None: dataset_model = DatasetModel(samples=[]) print("Created new empty DatasetModel for annotations") - return dataset_model def save_annotations(dataset_model: DatasetModel): @@ -132,7 +122,7 @@ def save_annotations(dataset_model: DatasetModel): with open(SAVE_PATH, "w", encoding="utf-8") as f: f.write(dataset_model.model_dump_json(exclude_none=True, indent=4)) print(f"Saved annotations to {SAVE_PATH}") - annotation_count += 1 + annotation_count += 1 # This is a simple counter, not total annotations in file if annotation_count % SAVE_INTERVAL == 0 and token: push_json_to_hf() except Exception as e: @@ -164,73 +154,74 @@ def calculate_annotator_ranges(total_samples_val, annotators_list): extra_samples = total_samples_val % num_annotators ranges = {} - start = 0 + start_idx = 0 for i, annotator in enumerate(annotators_list): - end = start + samples_per_annotator - 1 + end_idx = start_idx + samples_per_annotator - 1 if i < extra_samples: - end += 1 - if end >= total_samples_val: # Ensure end does not exceed total_samples - end = total_samples_val -1 - if start <= end : # Ensure start is not greater than end - ranges[annotator] = (start, end) - start = end + 1 + end_idx += 1 + if end_idx >= total_samples_val: + end_idx = total_samples_val -1 + if start_idx <= end_idx: # Ensure valid range + ranges[annotator] = (start_idx, end_idx) + start_idx = end_idx + 1 + print(f"Calculated annotator ranges: {ranges}") return ranges def initialize_second_phase_assignments(): - global SECOND_PHASE_REVIEW_MAPPING, annotator_ranges - if not ANNOTATORS or len(ANNOTATORS) < 1: # Requires at least 1 annotator to review their own work, or 2 for cross-review + global SECOND_PHASE_REVIEW_MAPPING, annotator_ranges, total_samples + if not ANNOTATORS or len(ANNOTATORS) < 1: print("Not enough annotators for second phase review.") SECOND_PHASE_REVIEW_MAPPING = {} return - # Ensure annotator_ranges is populated if not annotator_ranges and total_samples > 0: - print("Populating annotator_ranges for second phase initialization.") + print("Populating annotator_ranges for second phase initialization (was empty).") annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + elif not annotator_ranges and total_samples <= 0: + print("Warning: Cannot initialize second phase assignments without total_samples and annotator_ranges.") + return if len(ANNOTATORS) == 1: - # Single annotator reviews their own work if that's the desired logic - # Or, this phase might not apply. For now, let's allow self-review. annotator = ANNOTATORS[0] SECOND_PHASE_REVIEW_MAPPING[annotator] = annotator print(f"Second phase: {annotator} will review their own work.") else: - # Cyclic assignment: annotator[i] reviews annotator[i-1]'s work - for i, reviewer_user in enumerate(ANNOTATORS): + for i, reviewer_user in enumerate(ANNOTATORS): # In 2nd phase, ANNOTATORS become reviewers of other ANNOTATORS original_annotator_idx = (i - 1 + len(ANNOTATORS)) % len(ANNOTATORS) original_annotator_user = ANNOTATORS[original_annotator_idx] SECOND_PHASE_REVIEW_MAPPING[reviewer_user] = original_annotator_user print(f"Second phase: {reviewer_user} will review {original_annotator_user}'s work.") - # Verify that original annotators have ranges for reviewer, original_annotator in SECOND_PHASE_REVIEW_MAPPING.items(): if original_annotator not in annotator_ranges: - print(f"Warning: Original annotator {original_annotator} has no range defined in annotator_ranges.") - # This could happen if total_samples was 0 or annotator_ranges wasn't calculated correctly. + print(f"Warning: Original annotator {original_annotator} (being reviewed by {reviewer}) has no range defined in annotator_ranges.") + def get_user_allowed_range(username): global annotator_ranges, total_samples if SECOND_PHASE: - if not SECOND_PHASE_REVIEW_MAPPING: # Ensure it's initialized + if not SECOND_PHASE_REVIEW_MAPPING: initialize_second_phase_assignments() original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(username) if original_annotator_to_review: - # The user `username` is reviewing `original_annotator_to_review`'s work. - # The range is the original work range of `original_annotator_to_review`. - if not annotator_ranges and total_samples > 0: # Lazy init for ranges if needed + if not annotator_ranges and total_samples > 0: annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) - - return annotator_ranges.get(original_annotator_to_review) - else: # User is not a designated reviewer in the second phase mapping - return None # Or (0,-1) to signify no access + + user_range = annotator_ranges.get(original_annotator_to_review) + # print(f"DEBUG: User {username} (reviewer) gets range of {original_annotator_to_review}: {user_range}") + return user_range + else: + # print(f"DEBUG: User {username} not in SECOND_PHASE_REVIEW_MAPPING or no original annotator assigned.") + return None else: # First Phase Logic - if get_user_role(username) == "reviewer": # First phase reviewers see everything + if get_user_role(username) == "reviewer": return (0, total_samples - 1) if total_samples > 0 else None - elif username in annotator_ranges: # First phase annotators see their assigned range + elif username in annotator_ranges: return annotator_ranges[username] else: + # print(f"DEBUG: User {username} not a reviewer and not in annotator_ranges for first phase.") return None def is_within_range(absolute_idx, allowed_range): @@ -238,171 +229,168 @@ def is_within_range(absolute_idx, allowed_range): return False return allowed_range[0] <= absolute_idx <= allowed_range[1] -def get_user_role(username): # This defines first-phase roles +def get_user_role(username): return "reviewer" if username in REVIEWERS else "annotator" -def init_dataset_iterator(): - global ds_iter - try: - # It's better to load the dataset on demand rather than keeping an iterator open. - # For streaming, iter(load_dataset(...)) is fine if used immediately. - # ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) - # ds_iter = iter(ds) - return True - except Exception as e: - print(f"Error initializing dataset iterator: {e}") - return False - -def load_page_data(page_num=0): - global current_page_data, current_page, total_samples - - # For streaming, we re-fetch and skip. - try: - ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) - temp_ds_iter = iter(ds) - except Exception as e: - print(f"Error loading dataset for page data: {e}") - current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page"]) - return current_page_data - - # Determine the actual range of samples the user can see - # This needs to be based on the full dataset indices, not just page logic - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not allowed_range: - print(f"User {CURRENT_USERNAME} has no allowed range.") - current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page"]) - return current_page_data - - # Calculate start and end absolute indices for the requested page, clipped by allowed_range - page_start_abs_idx = page_num * PAGE_SIZE - page_end_abs_idx = page_start_abs_idx + PAGE_SIZE - 1 - - # Effective start and end for fetching, considering user's total allowed range - effective_start_idx = max(page_start_abs_idx, allowed_range[0]) - effective_end_idx = min(page_end_abs_idx, allowed_range[1]) - - samples_on_page = [] - current_absolute_idx = 0 - - # Iterate through the dataset to find samples within the effective range for this page - # This can be slow for large datasets and large page_num with streaming. - # A non-streaming dataset or a more optimized way to seek would be better for large scale. - - idx_counter_for_page = 0 - for i, sample_data in enumerate(temp_ds_iter): - current_absolute_idx = i # Absolute index in the full dataset - - if current_absolute_idx > effective_end_idx : - break # Past the samples needed for this page and user range - - if current_absolute_idx >= effective_start_idx: - # This sample is within the user's allowed range and on the current conceptual page - sample_data['absolute_idx'] = current_absolute_idx - sample_data['id_within_page'] = idx_counter_for_page # relative index on current page view - samples_on_page.append(sample_data) - idx_counter_for_page +=1 - if len(samples_on_page) >= PAGE_SIZE : # Filled the page - break - - current_page = page_num - if samples_on_page: - current_page_data = pd.DataFrame(samples_on_page) - else: - # If no samples found (e.g., page is outside effective range) - current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) - print(f"No samples found for user {CURRENT_USERNAME} on page {page_num} within effective range {effective_start_idx}-{effective_end_idx}") - - gc.collect() - return current_page_data - +# init_dataset_iterator is not needed if we load on demand for streaming +# def init_dataset_iterator(): ... def get_dataset_info(): - global total_samples # Use global total_samples - if total_samples > 0: # If already fetched + global total_samples + if total_samples > 0: return {'num_samples': total_samples} try: - # Temporarily load to get info, can be slow for huge datasets if not streaming - # For streaming, num_examples might be None or -1, so actual iteration might be needed - info = load_dataset(HF_DATASET_NAME, streaming=True, split="train").info - # The 'num_examples' for a streaming dataset split might not be accurate or available. - # It's often -1 or None. You might need a way to get the true total count if it's crucial. - # For now, we'll use it if available, otherwise, it remains a challenge for pure streaming. - if hasattr(info, 'estimated_size') and info.estimated_size is not None: # Check an alternative if num_examples is not good - pass # Not directly number of samples - - # Fallback: iterate to count if num_examples is not reliable - # This is very inefficient and should be avoided if possible. - # A pre-calculated count or a different dataset split might be needed. - # For this example, we'll assume info.splits['train'].num_examples is somewhat usable - # or that a fixed total_samples is set if this is problematic. - - # Simplified: try to get from info, but acknowledge limitations - ds_info_obj = load_dataset(HF_DATASET_NAME, split="train") # Load non-streaming for info + # For streaming=True, info.num_examples might be unreliable. + # Load non-streaming just for info, then can use streaming for actual data. + ds_info_obj = load_dataset(HF_DATASET_NAME, split="train", streaming=False) # Load non-streaming for info num_samples_val = ds_info_obj.num_rows if num_samples_val and num_samples_val > 0: total_samples = num_samples_val + print(f"Dataset info: total_samples set to {total_samples}") return {'num_samples': total_samples} - - # If still no count, this is an issue for range calculations. - # For now, return -1, but this will break range logic. - print("Warning: Could not reliably determine total_samples from dataset info.") - return {'num_samples': -1} + else: # Fallback if num_rows is not reliable + print("Warning: ds_info_obj.num_rows was not positive. Trying iteration for count (may be slow).") + # Iterating a large streaming dataset to count is very inefficient. + # Consider alternative ways to get total_samples (e.g., hardcode, separate metadata file). + # For this example, if num_rows fails, we'll proceed with caution. + # If it's critical, this part needs a robust solution for getting total_samples. + ds_stream = load_dataset(HF_DATASET_NAME, split="train", streaming=True) + count = 0 + for _ in ds_stream: + count +=1 + if count > 0: + total_samples = count + print(f"Dataset info: total_samples set to {total_samples} by iteration.") + return {'num_samples': total_samples} + else: + print("Warning: Could not determine total_samples from dataset info or iteration.") + total_samples = -1 # Indicate failure + return {'num_samples': -1} except Exception as e: print(f"Error getting dataset info: {e}") + total_samples = -1 return {'num_samples': -1} - # Initial data load (moved after functions it calls are defined) -# init_dataset_iterator() # Iterator not maintained globally anymore for streaming robustness dataset_info = get_dataset_info() # This sets global total_samples if total_samples > 0: annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) - if SECOND_PHASE: - initialize_second_phase_assignments() # Initialize after annotator_ranges might be populated + if SECOND_PHASE: # Initialize only if SECOND_PHASE is true from the start + initialize_second_phase_assignments() else: print("Warning: total_samples is not positive. Annotation ranges and second phase assignments may be incorrect.") annotator_ranges = {} -# Load first page data for the initial user if any -# This should happen after login when CURRENT_USERNAME is set. -# current_page_data = load_page_data(0) # Moved to hf_login success path - def get_audio_path(audio_entry): if isinstance(audio_entry, dict): if "array" in audio_entry and "sampling_rate" in audio_entry: - return (audio_entry["sampling_rate"], audio_entry["array"]) # Return tuple for direct use + return (audio_entry["sampling_rate"], audio_entry["array"]) return audio_entry.get("path", None) if isinstance(audio_entry, str): if audio_entry.startswith("http://") or audio_entry.startswith("https://"): - return audio_entry # URL - if os.path.exists(audio_entry): # Absolute path return audio_entry - # Relative path (try joining with AUDIO_DIR if one is configured) - if AUDIO_DIR: + if os.path.exists(audio_entry): + return audio_entry + if AUDIO_DIR: # Not strictly necessary if paths are always absolute/in dataset joined_path = os.path.join(AUDIO_DIR, audio_entry) if os.path.exists(joined_path): return joined_path - return audio_entry # Return as is, might be a relative path resolvable by datasets - return None # Or handle unknown type + return audio_entry # Return as is (e.g. relative path for datasets lib) + return None + +def load_page_data(page_num_within_user_view=0): + global current_page_data, current_page, total_samples + + try: + ds = load_dataset(HF_DATASET_NAME, split="train", streaming=True) + temp_ds_iter = iter(ds) + except Exception as e: + print(f"Error loading dataset for page data: {e}") + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) + return current_page_data + + user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not user_allowed_range: + print(f"User {CURRENT_USERNAME} has no allowed range.") + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) + return current_page_data + + user_start_abs, user_end_abs = user_allowed_range + num_samples_for_user = user_end_abs - user_start_abs + 1 + + if num_samples_for_user <= 0: + print(f"User {CURRENT_USERNAME} has an invalid or empty allowed range: {user_allowed_range} (num_samples_for_user: {num_samples_for_user})") + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) + return current_page_data + + effective_start_idx = user_start_abs + (page_num_within_user_view * PAGE_SIZE) + + if effective_start_idx > user_end_abs: + print(f"Requested page {page_num_within_user_view} (abs start {effective_start_idx}) is beyond user {CURRENT_USERNAME}'s allowed samples end ({user_end_abs}).") + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) + current_page = page_num_within_user_view # Still update current_page to reflect the attempt + return current_page_data + + effective_end_idx = min(effective_start_idx + PAGE_SIZE - 1, user_end_abs) + + samples_on_page = [] + idx_counter_for_page = 0 + + print(f"Loading page {page_num_within_user_view} for user {CURRENT_USERNAME}. Effective absolute range for this page: [{effective_start_idx}-{effective_end_idx}] from user range [{user_start_abs}-{user_end_abs}]") + + current_dataset_absolute_idx = -1 + skipped_count = 0 + for sample_data in temp_ds_iter: + current_dataset_absolute_idx += 1 + + if current_dataset_absolute_idx < effective_start_idx: + skipped_count += 1 + if skipped_count % 1000 == 0: # Log progress of skipping if it's a lot + print(f" Skipping... at abs_idx {current_dataset_absolute_idx}, target start {effective_start_idx}") + continue + + if current_dataset_absolute_idx > effective_end_idx: + break + + sample_data['absolute_idx'] = current_dataset_absolute_idx + sample_data['id_within_page'] = idx_counter_for_page + samples_on_page.append(sample_data) + idx_counter_for_page +=1 + + if skipped_count > 0: print(f" Finished skipping {skipped_count} samples.") + + current_page = page_num_within_user_view + + if samples_on_page: + current_page_data = pd.DataFrame(samples_on_page) + print(f"Loaded {len(samples_on_page)} samples for page {page_num_within_user_view}. First abs_idx: {samples_on_page[0]['absolute_idx']}, Last abs_idx: {samples_on_page[-1]['absolute_idx']}.") + else: + current_page_data = pd.DataFrame(columns=["audio", "sentence", "id_within_page", "absolute_idx"]) + print(f"No samples found for user {CURRENT_USERNAME} on their page {page_num_within_user_view} (effective absolute range {effective_start_idx}-{effective_end_idx})") + + gc.collect() + return current_page_data # Core functions def save_sample_data(page_idx, idx_on_page, transcript, current_user_performing_action, accepted_flag=False): global current_page_data, unsaved_changes - if current_page_data is None or idx_on_page >= len(current_page_data): + if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): return "Invalid index or data not loaded for current page." actual_sample_info = current_page_data.iloc[idx_on_page] absolute_idx = actual_sample_info['absolute_idx'] - # First phase saving logic - allowed_range = get_user_allowed_range(current_user_performing_action) - if not is_within_range(absolute_idx, allowed_range) and not SECOND_PHASE: # In 2nd phase, this check is implicitly handled by page loading - return "You are not allowed to annotate this sample (out of range)." + # In first phase, range check is important. + # In second phase, page loading itself should restrict to allowed samples. + if not SECOND_PHASE: + allowed_range = get_user_allowed_range(current_user_performing_action) + if not is_within_range(absolute_idx, allowed_range): + return f"You are not allowed to annotate this sample {absolute_idx} (out of range {allowed_range})." - audio_entry_original = actual_sample_info["audio"] # This might be path or dict + audio_entry_original = actual_sample_info["audio"] voice_name = os.path.basename(str(get_audio_path(audio_entry_original) or f"sample_{absolute_idx}")) dataset_model = load_saved_annotations() @@ -419,45 +407,57 @@ def save_sample_data(page_idx, idx_on_page, transcript, current_user_performing_ dataset_model.samples.append(sample) now = datetime.now() - # In the first phase, current_user_performing_action is the annotator or reviewer. - # 'accepted_flag' is used if current_user_performing_action is a first-phase reviewer. - annotation = next((a for a in sample.annotations or [] if a.annotator == current_user_performing_action), None) + annotation = next((a for a in sample.annotations or [] if a.annotator == current_user_performing_action), None) # Find by actual annotator - if annotation: - annotation.annotated_subtitle = transcript.strip() - annotation.update_at = now - if get_user_role(current_user_performing_action) == "reviewer": # First phase reviewer + if get_user_role(current_user_performing_action) == "reviewer" and not SECOND_PHASE : # First phase reviewer action + # Reviewer might be acting on another's annotation or making their own. + # If accepted_flag is true, they are "accepting" *some* annotation for this sample. + # The current model is: if a reviewer saves, their input becomes *an* annotation. + # If they check "accept", this annotation is marked as accepted. + if annotation: + annotation.annotated_subtitle = transcript.strip() + annotation.update_at = now annotation.is_first_phase_accepted = accepted_flag annotation.first_phase_reviewer_username = current_user_performing_action if accepted_flag else None - else: - new_annotation_data = { - "annotator": current_user_performing_action, - "annotated_subtitle": transcript.strip(), - "create_at": now, - "update_at": now, - "is_first_phase_accepted": False # Default - } - if get_user_role(current_user_performing_action) == "reviewer": - new_annotation_data["is_first_phase_accepted"] = accepted_flag - if accepted_flag: - new_annotation_data["first_phase_reviewer_username"] = current_user_performing_action - - annotation = Annotation(**new_annotation_data) - sample.annotations = sample.annotations or [] - sample.annotations.append(annotation) + else: + annotation = Annotation( + annotator=current_user_performing_action, + annotated_subtitle=transcript.strip(), + create_at=now, + update_at=now, + is_first_phase_accepted=accepted_flag, + first_phase_reviewer_username=current_user_performing_action if accepted_flag else None + ) + sample.annotations = sample.annotations or [] + sample.annotations.append(annotation) + else: # Annotator in first phase, or any user in a context where the simple save applies + if annotation: + annotation.annotated_subtitle = transcript.strip() + annotation.update_at = now + # Annotators cannot set first_phase_accepted themselves + else: + annotation = Annotation( + annotator=current_user_performing_action, + annotated_subtitle=transcript.strip(), + create_at=now, + update_at=now, + is_first_phase_accepted=False # Default for new annotations by non-reviewers or non-accepting reviewers + ) + sample.annotations = sample.annotations or [] + sample.annotations.append(annotation) - if absolute_idx in unsaved_changes: + if absolute_idx in unsaved_changes: # Clear if it was marked as unsaved del unsaved_changes[absolute_idx] save_annotations(dataset_model) return f"✓ Saved annotation for sample {absolute_idx}" -def handle_second_phase_action(page_idx, idx_on_page, action: str): # action is "approved" or "rejected" +def handle_second_phase_action(page_idx, idx_on_page, action: str): global current_page_data, CURRENT_USERNAME if not SECOND_PHASE: return "Not in second phase." - if current_page_data is None or idx_on_page >= len(current_page_data): + if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): return "Invalid index or data not loaded for current page (second phase)." actual_sample_info = current_page_data.iloc[idx_on_page] @@ -465,36 +465,26 @@ def handle_second_phase_action(page_idx, idx_on_page, action: str): # action is original_annotator_to_review = SECOND_PHASE_REVIEW_MAPPING.get(CURRENT_USERNAME) if not original_annotator_to_review: - return "You are not assigned to review any user's work." + return f"User {CURRENT_USERNAME} is not assigned to review any user's work in SECOND_PHASE_REVIEW_MAPPING." dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if not sample: - # This case should ideally not happen if data is consistent. - # If it does, it means the sample exists in source dataset but not annotations.json. - # A reviewer in 2nd phase is reviewing existing annotation. return f"Error: Sample {absolute_idx} not found in annotations.json for review." - # Find the annotation made by the original_annotator_to_review annotation_to_review = next((ann for ann in sample.annotations or [] if ann.annotator == original_annotator_to_review), None) if not annotation_to_review: - # If original annotator did not make an annotation for this sample. - # Option 1: Create a placeholder annotation based on original_subtitle and review that. - # Option 2: Report error. For now, report error. - # This implies the first phase annotator skipped this item or it wasn't in their range correctly. - print(f"Warning: No prior annotation by {original_annotator_to_review} for sample {absolute_idx}. Reviewing original subtitle implicitly.") - # Let's create one if missing, based on original subtitle + print(f"Warning: No prior annotation by {original_annotator_to_review} for sample {absolute_idx}. Creating placeholder for review.") annotation_to_review = Annotation( annotator=original_annotator_to_review, - annotated_subtitle=sample.original_subtitle, # Use original subtitle - create_at=sample.annotations[0].create_at if sample.annotations else datetime.now(), # Approx original creation + annotated_subtitle=sample.original_subtitle, + create_at=datetime.now(), # Or try to find other annotation's timestamp update_at=datetime.now() ) sample.annotations = sample.annotations or [] sample.annotations.append(annotation_to_review) - annotation_to_review.second_phase_reviewed_by = CURRENT_USERNAME annotation_to_review.second_phase_review_status = action annotation_to_review.second_phase_review_timestamp = datetime.now() @@ -502,18 +492,18 @@ def handle_second_phase_action(page_idx, idx_on_page, action: str): # action is if action == "approved": sample.is_approved_in_second_phase = True - # If rejected, is_approved_in_second_phase could be set to False, or depend on other conditions. - # For now, only explicit approval sets it to True. + # else: # If rejected, is_approved_in_second_phase remains as is or set to False + # sample.is_approved_in_second_phase = False # Explicitly set to False on rejection save_annotations(dataset_model) return f"✓ Review ({action}) saved for sample {absolute_idx} (Original annotator: {original_annotator_to_review})" -def get_sample(page_idx, idx_on_page, current_user_displaying): # current_user_displaying is CURRENT_USERNAME - global current_page_data, unsaved_changes, total_samples +def get_sample(page_idx_user_relative, idx_on_page, current_user_displaying): + global current_page_data, total_samples if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): - return None, "", f"Invalid index. Range is 0-{len(current_page_data)-1}", "unreviewed", "white", True, False, "" + return None, "", f"Invalid index ({idx_on_page}) for current page data (len {len(current_page_data) if current_page_data is not None else 'None'}).", "unreviewed", "white", True, False, "" actual_sample_info = current_page_data.iloc[idx_on_page] absolute_idx = actual_sample_info['absolute_idx'] @@ -524,14 +514,22 @@ def get_sample(page_idx, idx_on_page, current_user_displaying): # current_user_d default_transcript = actual_sample_info["sentence"] transcript_to_display = default_transcript - # UI states - ui_reviewer_field = "unreviewed" # Textbox showing who annotated/reviewed + ui_reviewer_field = "unreviewed" ui_color = "white" - ui_editable = True # Transcript text area - ui_is_accepted_flag = False # For first phase checkmark logic, or second phase display - ui_status_message = f"Sample {absolute_idx+1}" - if total_samples > 0: - ui_status_message += f" of {total_samples}" + ui_editable = True + ui_is_accepted_flag = False + + # Build status message + status_prefix = "" + user_allowed_range = get_user_allowed_range(current_user_displaying) + if user_allowed_range: + user_start_abs, user_end_abs = user_allowed_range + current_sample_num_in_user_assignment = absolute_idx - user_start_abs + 1 + total_samples_for_user = user_end_abs - user_start_abs + 1 + status_prefix = f"Sample {current_sample_num_in_user_assignment} of {total_samples_for_user} for you (Abs Idx {absolute_idx})." + else: + status_prefix = f"Sample (Abs Idx {absolute_idx})." + dataset_model = load_saved_annotations() sample_from_json = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) @@ -545,355 +543,382 @@ def get_sample(page_idx, idx_on_page, current_user_displaying): # current_user_d ui_editable = False elif SECOND_PHASE: - ui_editable = False # Transcript not editable in 2nd phase + ui_editable = False original_annotator_being_reviewed = SECOND_PHASE_REVIEW_MAPPING.get(current_user_displaying) - if not original_annotator_being_reviewed: # Should not happen if UI is controlled properly - transcript_to_display = "Error: User not in review mapping." + if not original_annotator_being_reviewed: + transcript_to_display = "Error: You are not mapped to review any user." ui_color = "red" + ui_reviewer_field = "Error" else: ui_reviewer_field = f"Reviewing: {original_annotator_being_reviewed}" annotation_under_review = next((ann for ann in sample_from_json.annotations or [] if ann.annotator == original_annotator_being_reviewed), None) if annotation_under_review: transcript_to_display = annotation_under_review.annotated_subtitle or default_transcript - ui_is_accepted_flag = (annotation_under_review.second_phase_review_status == "approved") + ui_is_accepted_flag = (annotation_under_review.second_phase_review_status == "approved") # Reflects the action by THIS reviewer IF they reviewed if annotation_under_review.second_phase_reviewed_by: if annotation_under_review.second_phase_reviewed_by == current_user_displaying: - ui_color = "green" if annotation_under_review.second_phase_review_status == "approved" else "orange" # orange for rejected by current user - else: # Reviewed by someone else + ui_color = "green" if annotation_under_review.second_phase_review_status == "approved" else "orange" + else: ui_color = "gray" - ui_reviewer_field += f" (Reviewed by {annotation_under_review.second_phase_reviewed_by})" - else: # Pending review by current_user_displaying - ui_color = "yellow" # Indicates pending current user's review - else: # No annotation from original annotator for this sample - transcript_to_display = default_transcript # Show original dataset subtitle - ui_reviewer_field += " (Original annotator made no submission)" - ui_color = "lightgray" # Needs review, but based on original + ui_reviewer_field += f" (Already reviewed by {annotation_under_review.second_phase_reviewed_by} as {annotation_under_review.second_phase_review_status})" + else: + ui_color = "yellow" # Pending this user's review + else: + transcript_to_display = default_transcript + ui_reviewer_field += " (No submission by original annotator)" + ui_color = "lightgray" else: # First Phase Logic - # Check for an accepted annotation by a first-phase reviewer accepted_first_phase_annotation = next((a for a in sample_from_json.annotations or [] if a.is_first_phase_accepted and a.first_phase_reviewer_username), None) if accepted_first_phase_annotation: transcript_to_display = accepted_first_phase_annotation.annotated_subtitle or default_transcript - ui_reviewer_field = accepted_first_phase_annotation.first_phase_reviewer_username + ui_reviewer_field = f"Accepted by: {accepted_first_phase_annotation.first_phase_reviewer_username}" ui_color = "green" ui_is_accepted_flag = True - ui_editable = (get_user_role(current_user_displaying) == "reviewer") # Only 1st phase reviewer can edit accepted + ui_editable = (get_user_role(current_user_displaying) == "reviewer") else: - # Check for annotation by the current user (annotator or reviewer) user_specific_annotation = next((a for a in sample_from_json.annotations or [] if a.annotator == current_user_displaying), None) if user_specific_annotation: transcript_to_display = user_specific_annotation.annotated_subtitle or default_transcript - ui_reviewer_field = user_specific_annotation.annotator - ui_color = "yellow" if absolute_idx not in unsaved_changes else "pink" + ui_reviewer_field = f"Your draft (as {user_specific_annotation.annotator})" + ui_color = "yellow" ui_editable = True else: - # Check for annotations by other annotators (not current user, not accepted by reviewer) - # Display the first one found for a reviewer to potentially act on, or inform annotator - other_annotations = [a for a in sample_from_json.annotations or [] if a.annotator != current_user_displaying and not a.is_first_phase_accepted] + other_annotations = [a for a in sample_from_json.annotations or [] if not a.is_first_phase_accepted] # any unaccepted if other_annotations: - # If current user is a reviewer, they see the other annotator's work + # If current user is reviewer, they see the first other annotator's work if get_user_role(current_user_displaying) == "reviewer": other_ann_to_show = other_annotations[0] transcript_to_display = other_ann_to_show.annotated_subtitle or default_transcript - ui_reviewer_field = other_ann_to_show.annotator - ui_color = "blue" # Reviewer sees other's work + ui_reviewer_field = f"Draft by: {other_ann_to_show.annotator}" + ui_color = "blue" ui_editable = True - else: # Current user is an annotator, and another annotator worked on it - # This state is a bit ambiguous. Default to original if not assigned to this user. - # For simplicity, show original if it's not their saved work. - transcript_to_display = default_transcript - ui_reviewer_field = "labeled by another annotator" + else: # Annotator sees "labeled by another" if they didn't do it + transcript_to_display = default_transcript # Show original if not their work + ui_reviewer_field = f"Labeled by: {other_annotations[0].annotator}" ui_color = "lightblue" - ui_editable = False # Annotator cannot edit other annotator's unreviewed work - else: # No annotations at all, or only unreviewed by others and user is annotator - if absolute_idx in unsaved_changes: - transcript_to_display = unsaved_changes[absolute_idx] - ui_reviewer_field = current_user_displaying - ui_color = "pink" - ui_editable = True - # else, default_transcript, unreviewed, white, editable=True (already set) + ui_editable = False # Cannot edit others' unreviewed work if you are also annotator + # else default_transcript, unreviewed, white, editable=True already set - # If no sample_from_json, then it's a fresh sample from dataset - # transcript_to_display remains default_transcript. ui states remain default. - # This case is hit if annotations.json doesn't have this absolute_idx yet. - - # Status message update - current_page_for_status = page_idx + 1 # page_idx is 0-indexed - # If current_page_data has 'absolute_idx', we can use that - # page_num_from_abs = (absolute_idx // PAGE_SIZE) + 1 - - ui_status_message = f"{ui_status_message} - Page {current_page_for_status}" - if SECOND_PHASE : + # If absolute_idx in unsaved_changes, it's a visual cue, actual text is already from above logic. + if not SECOND_PHASE and absolute_idx in unsaved_changes: + ui_color = "pink" # Overrides previous color if unsaved changes by current user + + ui_status_message = f"{status_prefix} Page {page_idx_user_relative + 1} (User-view)." + if SECOND_PHASE: ui_status_message += " (Review Phase)" else: ui_status_message += " (Annotation Phase)" + + # For reviewer checkbox in first phase + show_accept_checkbox = not SECOND_PHASE and get_user_role(current_user_displaying) == "reviewer" - - return audio_val, transcript_to_display, ui_status_message, ui_reviewer_field, ui_color, ui_editable, ui_is_accepted_flag, default_transcript + return audio_val, transcript_to_display, ui_status_message, ui_reviewer_field, ui_color, ui_editable, ui_is_accepted_flag, default_transcript, gr.update(visible=show_accept_checkbox) -def load_interface_data(page_idx, idx_on_page): # Renamed from load_interface to avoid conflict - # This function is primarily a wrapper around get_sample for UI updates - audio, text, base_status, saved_reviewer_text, color, editable, accepted_flag, original_dataset_text = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) +def load_interface_data(page_idx_user_relative, idx_on_page): + audio, text, base_status, saved_reviewer_text, color, editable, accepted_flag, original_dataset_text, accept_cb_update = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) - # Audio backup logic (can be simplified or removed if not strictly needed for undo_trim) - # absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] if current_page_data is not None and idx_on_page < len(current_page_data) else -1 - # audio_entry_original = current_page_data.iloc[idx_on_page]["audio"] if current_page_data is not None and idx_on_page < len(current_page_data) else "" - # key = f"{absolute_idx}_{os.path.basename(str(get_audio_path(audio_entry_original) or 'unknown'))}" - # if key not in audio_backup and audio is not None: # Backup the audio playable value - # audio_backup[key] = audio - return ( - page_idx, # current_page_idx state - idx_on_page, # current_idx_on_page state - audio, # audio_player value - gr.update(value=text, interactive=editable), # transcript update - gr.update(value=saved_reviewer_text, elem_classes=[color]), # reviewer Textbox update - base_status, # status markdown update - original_dataset_text # original_transcript state + page_idx_user_relative, + idx_on_page, + audio, + gr.update(value=text, interactive=editable), + gr.update(value=saved_reviewer_text, elem_classes=[color]), + base_status, + original_dataset_text, + accept_cb_update, # For the first_phase_accept_cb visibility + accepted_flag # For the first_phase_accept_cb value state ) -# Navigation functions -def navigate_sample(page_idx, idx_on_page, direction: int): # direction: 1 for next, -1 for prev - global current_page_data, total_samples +def navigate_sample(page_idx_user_relative, idx_on_page, direction: int): + global current_page_data if current_page_data is None or len(current_page_data) == 0: - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No data loaded.", gr.update() + # This case might happen if initial load failed or user has no samples + user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) + err_msg = "No data loaded. Try reloading or check your assigned range." + if not user_allowed_range or user_allowed_range[0] > user_allowed_range[1]: + err_msg = "You have no samples assigned or your range is invalid." + + # Return a state that indicates error but doesn't crash UI + # The number of outputs must match `navigation_outputs_extended` + return page_idx_user_relative, idx_on_page, None, gr.update(value="Error", interactive=False), gr.update(value="Error"), err_msg, "", gr.update(visible=False), False + target_idx_on_page = idx_on_page + direction - - new_page_idx = page_idx + new_page_idx_user_relative = page_idx_user_relative new_idx_on_page = target_idx_on_page - if target_idx_on_page < 0: # Need to go to previous page - if page_idx > 0: - new_page_idx = page_idx - 1 - # Load new page data and set index to last item - temp_data = load_page_data(new_page_idx) + user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not user_allowed_range: # Should not happen if page data exists, but as a safeguard + return page_idx_user_relative, idx_on_page, gr.update(), gr.update(), gr.update(), "Error: No allowed range for navigation.", gr.update(), gr.update(visible=False), False + + + if target_idx_on_page < 0: + if page_idx_user_relative > 0: + new_page_idx_user_relative = page_idx_user_relative - 1 + temp_data = load_page_data(new_page_idx_user_relative) # This updates global current_page_data if temp_data is not None and not temp_data.empty: new_idx_on_page = len(temp_data) - 1 - else: # Previous page is empty or out of allowed range - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No more samples in this direction (prev page).", gr.update() - else: # Already on first item of first page - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "At the beginning of your assigned samples.", gr.update() + else: # Prev user-relative page is empty (should not happen if page_idx_user_relative > 0 and ranges are correct) + # Stay on current sample, show message + audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) + status = status + " [Already at the first sample of this page/range]" + return page_idx_user_relative, idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status, orig_text, cb_vis, acc_flag + else: # Already on first item of first user-relative page + audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) + status = status + " [At the beginning of your assigned samples]" + return page_idx_user_relative, idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status, orig_text, cb_vis, acc_flag - elif target_idx_on_page >= len(current_page_data): # Need to go to next page - new_page_idx = page_idx + 1 - temp_data = load_page_data(new_page_idx) # load_page_data updates current_page_data + elif target_idx_on_page >= len(current_page_data): + # Try to go to next user-relative page + new_page_idx_user_relative = page_idx_user_relative + 1 + temp_data = load_page_data(new_page_idx_user_relative) # Updates global current_page_data if temp_data is not None and not temp_data.empty: new_idx_on_page = 0 - else: # Next page is empty or out of allowed range - # Check if we are at the very end of the allowed samples - allowed_range = get_user_allowed_range(CURRENT_USERNAME) + else: # Next user-relative page is empty (means we are at the end of user's allowed samples) current_abs_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - if allowed_range and current_abs_idx >= allowed_range[1]: - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "At the end of your assigned samples.", gr.update() - else: - return page_idx, idx_on_page, gr.update(), gr.update(), gr.update(), "No more samples in this direction (next page).", gr.update() - - # If we switched page, current_page_data is already updated by load_page_data. - # If staying on same page, it's fine. - return load_interface_data(new_page_idx, new_idx_on_page) + is_at_very_end = user_allowed_range and current_abs_idx >= user_allowed_range[1] + + audio, text, status, rev, color, edit, acc_flag, orig_text, cb_vis = get_sample(page_idx_user_relative, idx_on_page, CURRENT_USERNAME) + if is_at_very_end: + status = status + " [At the end of your assigned samples]" + else: # Should ideally not be hit if temp_data is empty after trying next page + status = status + " [No more samples in this direction (next page empty)]" + return page_idx_user_relative, idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status, orig_text, cb_vis, acc_flag + return load_interface_data(new_page_idx_user_relative, new_idx_on_page) -def go_next_sample_wrapper(page_idx, idx_on_page): # Simpler wrapper for UI - return navigate_sample(page_idx, idx_on_page, 1) -def go_prev_sample_wrapper(page_idx, idx_on_page): # Simpler wrapper for UI - return navigate_sample(page_idx, idx_on_page, -1) +def go_next_sample_wrapper(page_idx_user_relative, idx_on_page): + return navigate_sample(page_idx_user_relative, idx_on_page, 1) +def go_prev_sample_wrapper(page_idx_user_relative, idx_on_page): + return navigate_sample(page_idx_user_relative, idx_on_page, -1) -def save_and_next_sample_first_phase(page_idx, idx_on_page, current_text, is_accepted_by_reviewer_flag): - # Note: `current_annotator_ui` (reviewer textbox value) is not who is performing action. - # CURRENT_USERNAME is performing the action. - # `is_accepted_by_reviewer_flag` is the checkbox state (true/false) if user is a reviewer. - # If user is an annotator, this flag might not be directly applicable or always false from UI. - - # Determine if the current user is acting as a first-phase reviewer to use the 'accepted' flag + +def save_and_next_sample_first_phase(page_idx_user_relative, idx_on_page, current_text, is_accepted_by_reviewer_flag): user_is_reviewer = get_user_role(CURRENT_USERNAME) == "reviewer" - save_msg = save_sample_data(page_idx, idx_on_page, current_text, CURRENT_USERNAME, - accepted_flag=is_accepted_by_reviewer_flag if user_is_reviewer else False) - print(save_msg) # Log save message - # Then navigate - return navigate_sample(page_idx, idx_on_page, 1) + accepted_to_save = is_accepted_by_reviewer_flag if user_is_reviewer else False + + save_msg = save_sample_data(page_idx_user_relative, idx_on_page, current_text, CURRENT_USERNAME, accepted_flag=accepted_to_save) + print(save_msg) + + # After saving, navigate to the next sample + # The navigation outputs will reload the UI for the next sample + return navigate_sample(page_idx_user_relative, idx_on_page, 1) -def review_and_next_sample_second_phase(page_idx, idx_on_page, review_action: str): - feedback_msg = handle_second_phase_action(page_idx, idx_on_page, review_action) - print(feedback_msg) # Log feedback message - # Then navigate - return navigate_sample(page_idx, idx_on_page, 1) +def review_and_next_sample_second_phase(page_idx_user_relative, idx_on_page, review_action: str): + feedback_msg = handle_second_phase_action(page_idx_user_relative, idx_on_page, review_action) + print(feedback_msg) + return navigate_sample(page_idx_user_relative, idx_on_page, 1) -def jump_to_absolute_idx(target_abs_idx_str, current_page_idx, current_idx_on_page): # Removed unused text/annotator params +def jump_to_absolute_idx(target_abs_idx_str, current_page_idx_user_relative, current_idx_on_page): global current_page_data + try: target_abs_idx = int(target_abs_idx_str) if target_abs_idx < 0: target_abs_idx = 0 - allowed_range = get_user_allowed_range(CURRENT_USERNAME) - if not is_within_range(target_abs_idx, allowed_range): - status_msg = f"Target index {target_abs_idx} is outside your assigned range {allowed_range}." - # Return current state with error message - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt - - new_page_idx = target_abs_idx // PAGE_SIZE - new_idx_on_page_conceptual = target_abs_idx % PAGE_SIZE # This is index on the conceptual new page + user_allowed_range = get_user_allowed_range(CURRENT_USERNAME) + if not user_allowed_range or not is_within_range(target_abs_idx, user_allowed_range): + status_msg = f"Target index {target_abs_idx} is outside your assigned range {user_allowed_range or 'N/A'}." + audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc + + user_start_abs, _ = user_allowed_range + offset_from_user_start = target_abs_idx - user_start_abs + + if offset_from_user_start < 0: + status_msg = f"Logic Error: Target index {target_abs_idx} has negative offset from user start {user_start_abs}." + print(status_msg) + audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc - # Load data for the new page - temp_page_data = load_page_data(new_page_idx) # This updates global current_page_data + new_user_relative_page_idx = offset_from_user_start // PAGE_SIZE + temp_page_data_df = load_page_data(new_user_relative_page_idx) # This updates global current_page_data - if temp_page_data is None or temp_page_data.empty: - status_msg = f"No data found for page {new_page_idx} containing index {target_abs_idx}." - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt + if temp_page_data_df is None or temp_page_data_df.empty: + status_msg = f"No data found for your page {new_user_relative_page_idx} (containing abs index {target_abs_idx})." + audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc - # Find the actual index on the loaded page for target_abs_idx - # The loaded page might not start exactly at new_page_idx * PAGE_SIZE if user's range is small. - # `load_page_data` now adds 'absolute_idx' and 'id_within_page' to `current_page_data` - - # Find the row with the matching absolute_idx in the newly loaded current_page_data matching_rows = current_page_data[current_page_data['absolute_idx'] == target_abs_idx] if not matching_rows.empty: - new_idx_on_page_actual = matching_rows.index[0] # This is the DataFrame index, should be same as 'id_within_page' + new_idx_on_page_actual = matching_rows.iloc[0]['id_within_page'] else: - # This means target_abs_idx, though in allowed_range, was not on the loaded page (e.g. page is sparse due to filtering) - # Fallback: load the first item of the page if target not found directly. - # Or better, report an issue. - status_msg = f"Index {target_abs_idx} is in range, but not found on page {new_page_idx}. Displaying start of page." - print(status_msg) # Log this - new_idx_on_page_actual = 0 # Default to first item of the loaded page - if current_page_data.empty : # Page is actually empty - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) # Revert to old view - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt - - return load_interface_data(new_page_idx, new_idx_on_page_actual) + status_msg = f"Index {target_abs_idx} (your page {new_user_relative_page_idx}) in range, but not found on loaded page. Displaying start of page." + print(status_msg) + new_idx_on_page_actual = 0 + if current_page_data.empty : + audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg + " (Page empty)", orig_txt, cb_vis, acc + + return load_interface_data(new_user_relative_page_idx, new_idx_on_page_actual) except ValueError: status_msg = "Invalid index format for jump." - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt except Exception as e: - status_msg = f"Error jumping to index: {e}" - print(status_msg) - audio, text, _, rev, color, edit, acc, orig_txt = get_sample(current_page_idx, current_idx_on_page, CURRENT_USERNAME) - return current_page_idx, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt + import traceback + status_msg = f"Error jumping to index: {str(e)}" + print(f"{status_msg}\n{traceback.format_exc()}") + + # Fallback for errors + audio, text, _, rev, color, edit, acc, orig_txt, cb_vis = get_sample(current_page_idx_user_relative, current_idx_on_page, CURRENT_USERNAME) + return current_page_idx_user_relative, current_idx_on_page, audio, gr.update(value=text, interactive=edit), gr.update(value=rev, elem_classes=[color]), status_msg, orig_txt, cb_vis, acc -# Audio editing functions (simplifying, assuming these are for phase 1 only) -def trim_audio_action(page_idx, idx_on_page, trim_start_str, trim_end_str): - # This function would need significant rework if used with the new get_sample returns. - # For now, let's assume it's for phase 1 and we fetch audio path differently or disable in phase 2. - # For simplicity in this modification, advanced audio ops might be limited. - if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Trimming disabled in Review Phase.", gr.Textbox() +# Audio editing functions +def trim_audio_action(page_idx_user_relative, idx_on_page, trim_start_str, trim_end_str): + # Outputs must match navigation_outputs_extended + default_return = lambda msg: (*load_interface_data(page_idx_user_relative, idx_on_page)[0:5], # page, idx, audio, transcript, reviewer + load_interface_data(page_idx_user_relative, idx_on_page)[5] + f" [{msg}]", # status + *load_interface_data(page_idx_user_relative, idx_on_page)[6:]) # original_text, cb_vis, cb_val - # Simplified: fetch audio path if possible - audio_val, transcript, base_status, saved_reviewer, color, editable, accepted, _ = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) - - if not isinstance(audio_val, str) or not os.path.exists(audio_val): - # Try to get original path from current_page_data for non-raw audio - if current_page_data is not None and idx_on_page < len(current_page_data): - audio_entry = current_page_data.iloc[idx_on_page]["audio"] - resolved_path = get_audio_path(audio_entry) - if isinstance(resolved_path, str) and os.path.exists(resolved_path): - audio_val = resolved_path - else: # If it's raw audio data (tuple) or URL, or non-existent path - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Trimming not supported for this audio format or it's not a local file.", transcript - else: - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Audio data not available for trimming.", transcript + if SECOND_PHASE: return default_return("Trimming disabled in Review Phase.") + if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): + return default_return("Audio data not available (page error).") + + audio_entry_from_df = current_page_data.iloc[idx_on_page]["audio"] + original_audio_path_info = get_audio_path(audio_entry_from_df) # This could be (sr, array) or path str + + # Trimming requires a loadable file path. If it's raw array, pydub needs it saved first or handled differently. + # For simplicity, assume get_audio_path gives a string path if it's not raw data. + if not isinstance(original_audio_path_info, str) or not (os.path.exists(original_audio_path_info) or original_audio_path_info.startswith("http")): + # If it is raw data, we might need to save it to a temp file to use pydub.from_file + # Or, if pydub can handle the array directly (it can via AudioSegment(data=..., sample_width=..., frame_rate=..., channels=...)) + # This part needs more robust handling if original_audio_path_info is (sr, array) often. + # For now, only proceed if it's a usable path. + if isinstance(original_audio_path_info, tuple): # Raw data + return default_return("Trimming raw audio array directly not yet implemented via pydub.from_file. Save first or use array ops.") + return default_return("Trimming not supported for this audio source or it's not a local/remote file.") absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) + # voice_name_original = os.path.basename(original_audio_path_info if isinstance(original_audio_path_info, str) else f"sample_{absolute_idx}") + voice_name_original = os.path.basename(str(get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) or f"sample_{absolute_idx}")) try: - audio_seg = AudioSegment.from_file(audio_val) + # If original_audio_path_info is a URL, pydub might need it downloaded first. + # Let's assume local paths for simplicity of example with from_file. + # For URLs, you'd fetch then load. + temp_dir_for_download = None + audio_to_load = original_audio_path_info + if isinstance(original_audio_path_info, str) and original_audio_path_info.startswith("http"): + temp_dir_for_download = tempfile.mkdtemp() + # Simple way to get filename from URL for extension + url_fname = original_audio_path_info.split("/")[-1].split("?")[0] + local_fpath = os.path.join(temp_dir_for_download, url_fname or "downloaded_audio") + + import requests + response = requests.get(original_audio_path_info, stream=True) + response.raise_for_status() + with open(local_fpath, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + audio_to_load = local_fpath + + audio_seg = AudioSegment.from_file(audio_to_load) start_ms = int(float(trim_start_str) * 1000) end_ms = int(float(trim_end_str) * 1000) trimmed_seg = audio_seg[start_ms:end_ms] os.makedirs("trimmed_audio", exist_ok=True) - trimmed_filename = f"trimmed_{absolute_idx}_{voice_name_original}" - # Ensure unique extension, wav is usually safe - if not trimmed_filename.lower().endswith(('.wav', '.mp3', '.flac')): - trimmed_filename += ".wav" + # Sanitize voice_name_original further if it comes from URL or complex path + safe_voice_name = re.sub(r'[^\w.-]', '_', voice_name_original) + trimmed_filename = f"trimmed_{absolute_idx}_{safe_voice_name}" + if not os.path.splitext(trimmed_filename)[1]: trimmed_filename += ".wav" # ensure extension trimmed_path = os.path.join("trimmed_audio", trimmed_filename) - # Export format might need to match original or be a standard like wav export_format = os.path.splitext(trimmed_path)[1][1:] - if not export_format: export_format = "wav" # Default if no extension + if not export_format: export_format = "wav" trimmed_seg.export(trimmed_path, format=export_format) + + if temp_dir_for_download: # Cleanup downloaded file + shutil.rmtree(temp_dir_for_download) + dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) - if not sample: # Should exist if we are editing it - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, "Error: Sample not found in annotations for trimming.", transcript + if not sample: return default_return(f"Error: Sample {absolute_idx} not found in annotations for trimming.") now = datetime.now() - # Associate trim with current user's annotation for this sample annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) - if not annotation: # Create if doesn't exist - annotation = Annotation( - annotator=CURRENT_USERNAME, - annotated_subtitle=transcript, # Current transcript - audio_trims=[AudioTrim(start=float(trim_start_str), end=float(trim_end_str))], - create_at=now, - update_at=now - ) + if not annotation: + annotation = Annotation(annotator=CURRENT_USERNAME, create_at=now, update_at=now) # Subtitle from UI sample.annotations = sample.annotations or [] sample.annotations.append(annotation) - else: - annotation.audio_trims = [AudioTrim(start=float(trim_start_str), end=float(trim_end_str))] - annotation.update_at = now + + annotation.audio_trims = [AudioTrim(start=float(trim_start_str), end=float(trim_end_str))] + annotation.update_at = now + # The transcript itself isn't changed by trim, user saves that separately. save_annotations(dataset_model) - new_status = f"{base_status} [Trimmed]" - return page_idx, idx_on_page, trimmed_path, transcript, saved_reviewer, new_status, transcript + + # Update UI to play the new trimmed_path + current_ui_state = load_interface_data(page_idx_user_relative, idx_on_page) + return (current_ui_state[0], current_ui_state[1], trimmed_path, # page,idx, new audio + current_ui_state[3], current_ui_state[4], # transcript, reviewer + current_ui_state[5] + " [Trimmed]", # status + *current_ui_state[6:]) # original_text, cb_vis, cb_val + except Exception as e: - return page_idx, idx_on_page, audio_val, transcript, saved_reviewer, f"Error trimming audio: {str(e)}", transcript + if temp_dir_for_download and os.path.exists(temp_dir_for_download): # Ensure cleanup on error too + shutil.rmtree(temp_dir_for_download) + return default_return(f"Error trimming audio: {str(e)}") -def undo_trim_action(page_idx, idx_on_page): - if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Undo Trim disabled in Review Phase.", gr.Textbox() +def undo_trim_action(page_idx_user_relative, idx_on_page): + default_return = lambda msg: (*load_interface_data(page_idx_user_relative, idx_on_page)[0:5], + load_interface_data(page_idx_user_relative, idx_on_page)[5] + f" [{msg}]", + *load_interface_data(page_idx_user_relative, idx_on_page)[6:]) + if SECOND_PHASE: return default_return("Undo Trim disabled in Review Phase.") - audio_val, transcript, base_status, saved_reviewer, color, editable, accepted, _ = get_sample(page_idx, idx_on_page, CURRENT_USERNAME) - absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) + if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): + return default_return("Audio data not available (page error).") + absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) if sample: - annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) # Trim is user-specific + annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) if annotation and annotation.audio_trims: annotation.audio_trims = None annotation.update_at = datetime.now() save_annotations(dataset_model) - # Restore original audio from backup or re-fetch from source dataset info - original_audio_path_or_data = current_page_data.iloc[idx_on_page]["audio"] # This is the source entry - restored_audio_val = get_audio_path(original_audio_path_or_data) + # Restore original audio path from current_page_data + original_audio_entry = current_page_data.iloc[idx_on_page]["audio"] + restored_audio_val = get_audio_path(original_audio_entry) - # key = f"{absolute_idx}_{voice_name_original}" - # orig_audio_backup = audio_backup.get(key) # Fetch from backup if available - # if not orig_audio_backup: # If not in backup, use the path from current_page_data - # orig_audio_backup = get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) + current_ui_state = load_interface_data(page_idx_user_relative, idx_on_page) + return (current_ui_state[0], current_ui_state[1], restored_audio_val, # page,idx, restored audio + current_ui_state[3], current_ui_state[4], # transcript, reviewer + current_ui_state[5] + " [Trim undone]", # status + *current_ui_state[6:]) - new_status = f"{base_status} [Trim undone]" - return page_idx, idx_on_page, restored_audio_val, transcript, saved_reviewer, new_status, transcript +def confirm_delete_audio_action(page_idx_user_relative, idx_on_page): + default_return = lambda msg: (*load_interface_data(page_idx_user_relative, idx_on_page)[0:5], + load_interface_data(page_idx_user_relative, idx_on_page)[5] + f" [{msg}]", + *load_interface_data(page_idx_user_relative, idx_on_page)[6:]) + if SECOND_PHASE: return default_return("Delete disabled in Review Phase.") -def confirm_delete_audio_action(page_idx, idx_on_page): - if SECOND_PHASE: return page_idx, idx_on_page, gr.Audio(), gr.Textbox(), gr.Textbox(), "Delete disabled in Review Phase.", gr.Textbox() + if current_page_data is None or idx_on_page < 0 or idx_on_page >= len(current_page_data): + return default_return("Audio data not available (page error).") absolute_idx = current_page_data.iloc[idx_on_page]['absolute_idx'] - voice_name_original = os.path.basename(str(current_page_data.iloc[idx_on_page]["audio"])) + voice_name_original = os.path.basename(str(get_audio_path(current_page_data.iloc[idx_on_page]["audio"]) or f"sample_{absolute_idx}")) + dataset_model = load_saved_annotations() sample = next((s for s in dataset_model.samples or [] if s.id == absolute_idx), None) @@ -904,45 +929,35 @@ def confirm_delete_audio_action(page_idx, idx_on_page): original_subtitle=current_page_data.iloc[idx_on_page]["sentence"], annotations=[] ) - dataset_model.samples = dataset_model.samples or [] + dataset_model.samples = dataset_model.samples or [] # Ensure list exists dataset_model.samples.append(sample) sample.ignore_it = True now = datetime.now() - # Create/update an annotation by CURRENT_USERNAME to mark this action - annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) deleted_text_marker = "AUDIO DELETED (This audio has been removed.)" + annotation = next((a for a in sample.annotations or [] if a.annotator == CURRENT_USERNAME), None) if annotation: annotation.annotated_subtitle = deleted_text_marker - annotation.audio_trims = None # Clear trims + annotation.audio_trims = None annotation.update_at = now - # Potentially clear review statuses if deletion overrides them else: - annotation = Annotation( - annotator=CURRENT_USERNAME, - annotated_subtitle=deleted_text_marker, - create_at=now, - update_at=now - ) + annotation = Annotation(annotator=CURRENT_USERNAME, annotated_subtitle=deleted_text_marker, create_at=now, update_at=now) sample.annotations = sample.annotations or [] sample.annotations.append(annotation) - save_annotations(dataset_model) - new_status = f"Sample {absolute_idx+1} [Audio deleted]" - if total_samples > 0: new_status += f" of {total_samples}" - - # Return values to update UI correctly after deletion - return page_idx, idx_on_page, None, deleted_text_marker, "deleted", new_status, deleted_text_marker + # After deleting, reload the interface for this item, which will show it as deleted + return load_interface_data(page_idx_user_relative, idx_on_page) -# Export functions (largely unchanged, ensure CURRENT_USERNAME context if it matters for export) +# Export functions def sanitize_string(s): if not isinstance(s, str): s = str(s) return re.sub(r'[^\w-./]', '_', s) def sanitize_sentence(s): if not isinstance(s, str): s = str(s) + # Basic sanitization, consider more robust methods if complex characters are common return s.encode('utf-8', errors='ignore').decode('utf-8') @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) @@ -953,201 +968,161 @@ def push_to_hub_with_retry(dataset_dict, repo_id, private=True, token_val=None): print(f"Pushing dataset to {repo_id}") dataset_dict.push_to_hub(repo_id, private=private, token=token_val) + def export_to_huggingface(repo_name_str, hf_token_for_export, progress=gr.Progress()): - # This export logic needs to be carefully reviewed. - # It rebuilds a dataset from HF_DATASET_NAME and applies annotations. - # It should reflect the FINAL state of annotations (e.g., after second phase review if applicable). - # The current logic uses CURRENT_USERNAME for annotation preference, which might not be ideal for a global export. - # It should ideally use the "winning" annotation (e.g., accepted by reviewer, or approved in 2nd phase). if not hf_token_for_export: return "Export failed: Hugging Face token is missing." + if not repo_name_str or len(repo_name_str.split('/')) != 2: + return "Export failed: Repository name must be in 'username/dataset-name' format." + try: start_time = time.time() - repo_name_str = sanitize_string(repo_name_str) + # repo_name_str = sanitize_string(repo_name_str) # Sanitization might be too aggressive for HF repo names print(f"Export started at {time.strftime('%Y-%m-%d %H:%M:%S')}") - dataset_model_annotations = load_saved_annotations() # Load all annotations - - # Use total_samples from global or re-fetch if necessary. - # The export should process all samples defined by total_samples. - # Let's assume total_samples is the definitive count. - if total_samples <= 0: - return "Export failed: Total number of samples is unknown or invalid." - - # export_total_samples = total_samples - # Using streaming for source, but collecting all data. This can be memory intensive. - # Consider processing in true streaming fashion if dataset is very large. + dataset_model_annotations = load_saved_annotations() - ds_source = load_dataset(HF_DATASET_NAME, split="train", streaming=False) # Load non-streaming for easier iteration up to total_samples + if total_samples <= 0: # Use global total_samples + # Try to fetch again if not set + info = get_dataset_info() + if total_samples <= 0: + return "Export failed: Total number of samples is unknown or invalid." + + # Load source dataset non-streamed for easier full iteration + ds_source = load_dataset(HF_DATASET_NAME, split="train", streaming=False) exported_data_list = [] progress(0, f"Preparing {total_samples} samples for export...") + num_processed_from_source = 0 for i, source_sample in enumerate(ds_source): - if i >= total_samples: break # Limit to known total_samples - - absolute_idx = i # Assuming source_sample is ordered and corresponds to index i + if i >= total_samples: break # Should not happen if ds_source.num_rows matches total_samples + num_processed_from_source +=1 - audio_entry = source_sample.get("audio") - sentence_val = source_sample.get("sentence", "") # Default original sentence + absolute_idx = i # Assuming source gives ordered samples matching index - # Determine final audio and sentence based on annotations - audio_dict_to_export = None # Default to no audio if deleted or issue + audio_entry = source_sample.get("audio") # This is {path: str} or {array:..., sampling_rate:...} + sentence_val = source_sample.get("sentence", "") - # Convert audio path/data from source_sample to array for export - # This part is tricky: we need to load audio content. - # For simplicity, this example will re-use get_audio_path and then load if it's a path. - raw_audio_data = None - audio_path_or_data = get_audio_path(audio_entry) - if isinstance(audio_path_or_data, tuple): # Raw audio from get_audio_path - raw_audio_data = {"array": audio_path_or_data[1], "sampling_rate": audio_path_or_data[0]} - elif isinstance(audio_path_or_data, str) and (os.path.exists(audio_path_or_data) or audio_path_or_data.startswith("http")): - # If it's a path, load it. This might be slow. - # For URLs, datasets library handles loading when building Dataset object. - # For local paths, we need to load into array. - if os.path.exists(audio_path_or_data): - try: - arr, sr = sf.read(audio_path_or_data) - raw_audio_data = {"array": arr, "sampling_rate": sr} - except Exception as e_load: - print(f"Warning: Could not load audio file {audio_path_or_data} for export: {e_load}") - # raw_audio_data remains None - else: # URL - raw_audio_data = audio_path_or_data # Pass URL directly, Audio feature will handle - - audio_dict_to_export = raw_audio_data - + # Base audio data for export (can be path string or dict for Audio feature) + # If source_sample['audio'] is like {'path': '...', 'bytes': None}, datasets lib handles it. + # If it's {'array': ..., 'sampling_rate': ...}, it's also fine. + audio_dict_to_export = audio_entry - # Check annotations for this sample annotation_data = next((s for s in dataset_model_annotations.samples or [] if s.id == absolute_idx), None) if annotation_data: if annotation_data.ignore_it: sentence_val = "AUDIO DELETED (This audio has been removed.)" - audio_dict_to_export = None # No audio + # Represent deleted audio: empty array or specific silent audio path + audio_dict_to_export = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} else: - # Determine the "best" annotation to use - # Priority: 1. Approved in 2nd phase, 2. Accepted in 1st phase by reviewer, 3. Annotator's latest best_ann = None if annotation_data.annotations: - # Check for 2nd phase approved - # This needs to find the annotation that WAS approved, not make a new one. - # The original annotator's submission that got approved. - if annotation_data.is_approved_in_second_phase: - # Find which annotation was approved. Iterate through them. - for ann in annotation_data.annotations: - if ann.second_phase_review_status == "approved": - best_ann = ann - break + approved_anns = [a for a in annotation_data.annotations if a.second_phase_review_status == "approved"] + if SECOND_PHASE and approved_anns: # Priority to 2nd phase approved + best_ann = sorted(approved_anns, key=lambda x: x.second_phase_review_timestamp, reverse=True)[0] if approved_anns else None - if not best_ann: # Check for 1st phase accepted - for ann in annotation_data.annotations: - if ann.is_first_phase_accepted: - best_ann = ann - break + if not best_ann: # Then 1st phase accepted + accepted_anns = [a for a in annotation_data.annotations if a.is_first_phase_accepted] + best_ann = sorted(accepted_anns, key=lambda x: x.update_at, reverse=True)[0] if accepted_anns else None - if not best_ann: # Fallback to any annotation (e.g., latest by timestamp or first found) - # This could be more sophisticated, e.g. latest updated. - # For now, take first one if multiple non-reviewed/accepted exist. - # Or, if a specific user's annotations are primary (e.g. CURRENT_USERNAME if this is a personal export) - # Let's assume any relevant annotation is fine if not formally accepted/approved. - # The original code used CURRENT_USERNAME's annotation. This might be too specific for a general export. - # Let's try to find *any* annotation from the list for the sample if no "accepted" one exists. - if annotation_data.annotations: - best_ann = sorted(annotation_data.annotations, key=lambda x: x.update_at, reverse=True)[0] # latest + if not best_ann: # Fallback to latest annotation if no formal approval/acceptance + best_ann = sorted(annotation_data.annotations, key=lambda x: x.update_at, reverse=True)[0] if best_ann: - sentence_val = best_ann.annotated_subtitle or sentence_val # Use annotated if available - # Handle trimmed audio if specified in best_ann - if best_ann.audio_trims and audio_dict_to_export: # Only if audio exists - # This part requires that trimmed audio files are accessible and named consistently - # The original trim_audio_action saves to "trimmed_audio/trimmed_{abs_idx}_{voice_name}" - # We need to reconstruct this path or have a direct reference. - # Assuming voice_name is from original sample. - original_voice_name = sanitize_string(os.path.basename(str(get_audio_path(audio_entry) or f"sample_{absolute_idx}"))) - trimmed_path_potential = os.path.join("trimmed_audio", f"trimmed_{absolute_idx}_{original_voice_name}") - # Ensure extension consistency for look up - if not os.path.splitext(trimmed_path_potential)[1]: trimmed_path_potential += ".wav" # common default - - if os.path.exists(trimmed_path_potential): - try: - arr, sr = sf.read(trimmed_path_potential) - audio_dict_to_export = {"array": arr, "sampling_rate": sr} - except Exception as e_trim_load: - print(f"Warning: Could not load trimmed audio {trimmed_path_potential}: {e_trim_load}") - # audio_dict_to_export remains as original loaded audio - # else: print(f"Trimmed audio path not found: {trimmed_path_potential}") - + sentence_val = best_ann.annotated_subtitle if best_ann.annotated_subtitle is not None else sentence_val + + if best_ann.audio_trims and audio_dict_to_export: # If audio exists and is trimmed + # Reconstruct trimmed audio path (must be consistent with saving) + original_voice_name_for_trim = os.path.basename(str(get_audio_path(audio_entry) or f"sample_{absolute_idx}")) + safe_voice_name_for_trim = re.sub(r'[^\w.-]', '_', original_voice_name_for_trim) + trimmed_fname_base = f"trimmed_{absolute_idx}_{safe_voice_name_for_trim}" + # Try common extensions or use one stored in AudioTrim if available + potential_trimmed_path = os.path.join("trimmed_audio", trimmed_fname_base + ".wav") # Assume wav for now + + if os.path.exists(potential_trimmed_path): + # Load trimmed audio into array/sr format for export + arr, sr = sf.read(potential_trimmed_path) + audio_dict_to_export = {"array": arr, "sampling_rate": sr} + else: + print(f"Warning: Trimmed audio file {potential_trimmed_path} not found for sample {absolute_idx}. Exporting original/untrimmed.") + exported_data_list.append({ - "audio": audio_dict_to_export, # This will be None if deleted or failed to load + "audio": audio_dict_to_export, "sentence": sanitize_sentence(sentence_val) }) - if (i + 1) % 100 == 0: # Progress update + if (i + 1) % 100 == 0: progress((i + 1) / total_samples, f"Processed {i+1}/{total_samples} samples") gc.collect() + + if num_processed_from_source != total_samples: + print(f"Warning: Processed {num_processed_from_source} from source, but expected total_samples {total_samples}.") + if not exported_data_list: return "No data to export after processing." - # Create Hugging Face Dataset from the collected data - # Filter out entries where audio is None if dataset schema requires audio - # final_export_list = [item for item in exported_data_list if item["audio"] is not None] - # Or handle audio being optional by schema. For Audio(), None might not be allowed if array is mandatory. - # Let's assume for now audio can be None (e.g. deleted). If Audio() cast fails, this needs adjustment. - # The Audio feature expects a path, dict with array/sr, or bytes. None might lead to issues. - # Handling: if audio_dict_to_export is None, replace with a dummy silent audio array or skip sample. - # For now, let's try passing None and see if cast_column handles it gracefully or errors. - # It's safer to ensure 'audio' is always a valid Audio structure or path. - # If audio is None (e.g. ignore_it=True), we should ensure the Audio feature can handle it. - # Typically, you might replace with a path to a very short silent audio file, or an empty array if supported. - + # Ensure audio format is compatible before creating Dataset for item in exported_data_list: - if item["audio"] is None: # If audio was marked for deletion / ignore_it - # Provide a placeholder that Audio() can cast, e.g. path to a tiny silent wav or empty array - # For simplicity, if datasets lib allows None for audio feature, this is fine. - # Otherwise, this needs a robust placeholder. - # A common practice is to provide a dictionary with a path to a universally accessible silent file, - # or an empty numpy array for 'array' and a common 'sampling_rate'. - # Let's try with an empty array. - item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} # Example placeholder - elif isinstance(item["audio"], str): # If it's a URL or path string - # The Audio feature will handle loading this. - pass - elif not (isinstance(item["audio"], dict) and "array" in item["audio"] and "sampling_rate" in item["audio"]): - print(f"Warning: Invalid audio format for export for a sample, replacing with silent audio: {item['audio']}") + if item["audio"] is None: # Should have been replaced by placeholder if deleted item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} + elif isinstance(item["audio"], dict) and 'path' in item["audio"] and item["audio"]['path'] is None: # Path is None, but dict exists + item["audio"] = {"array": np.array([], dtype=np.float32), "sampling_rate": 16000} # Replace with placeholder + try: + final_dataset = Dataset.from_list(exported_data_list) + final_dataset = final_dataset.cast_column("audio", Audio(sampling_rate=16000)) # Specify common sampling rate + except Exception as e_cast: + # Detailed error for casting issues + print(f"Error during Dataset.from_list or cast_column: {e_cast}") + # Inspect a few items from exported_data_list if casting fails + for idx, problematic_item in enumerate(exported_data_list[:5]): + print(f"Sample item {idx} for export: Audio type {type(problematic_item['audio'])}, Audio content: {str(problematic_item['audio'])[:200]}") + return f"Export failed during data conversion: {e_cast}. Check audio data formats." - final_dataset = Dataset.from_list(exported_data_list) - final_dataset = final_dataset.cast_column("audio", Audio()) # Cast to Audio feature type dataset_dict_export = DatasetDict({"train": final_dataset}) progress(0.95, "Uploading to Hugging Face...") + target_repo_id = f"{whoami(token=hf_token_for_export)['name']}/{repo_name_str.split('/')[-1]}" # Ensures user owns repo + push_to_hub_with_retry( dataset_dict=dataset_dict_export, - repo_id=repo_name_str, - private=True, # Assuming private, can be a parameter + repo_id=target_repo_id, # Use sanitized and correctly owned repo ID + private=True, token_val=hf_token_for_export ) - print(f"Upload done, total time: {time.time() - start_time:.2f}s") + end_time = time.time() + print(f"Upload done, total time: {end_time - start_time:.2f}s") progress(1.0, "Upload complete!") - return f"Exported to huggingface.co/datasets/{repo_name_str}" + return f"Exported to huggingface.co/datasets/{target_repo_id}" except Exception as e: - error_msg = f"Export failed: {str(e)}" import traceback + error_msg = f"Export failed: {str(e)}" print(f"{error_msg}\n{traceback.format_exc()}") return error_msg # Login function def hf_login(hf_token_val): - global CURRENT_USERNAME, token, current_page_data, total_samples, annotator_ranges + global CURRENT_USERNAME, token, current_page_data, total_samples, annotator_ranges, SECOND_PHASE_REVIEW_MAPPING - if not hf_token_val: # If user clears the box and clicks login - return gr.update(visible=True), gr.update(visible=False), "", "", "Login failed: Token cannot be empty." + if not hf_token_val: + # Keep main_container hidden. Return tuple must match login_outputs. + # The number of outputs is 16. + return (gr.update(visible=True), gr.update(visible=False), # login_container, main_container + gr.update(value="N/A"), hf_token_val, "Login failed: Token cannot be empty.", # reviewer_tb, hf_token_state, login_message + # Visibility updates (all default to not visible or specific phase logic) + gr.update(visible=not SECOND_PHASE), gr.update(interactive=not SECOND_PHASE), # save_next, transcript_tb + gr.update(visible=not SECOND_PHASE), gr.update(visible=not SECOND_PHASE), gr.update(visible=not SECOND_PHASE), # trim, undo, delete + gr.update(visible=False), # first_phase_accept_cb (depends on role) + gr.update(visible=SECOND_PHASE), gr.update(visible=SECOND_PHASE), # approve, reject + # Initial data states + 0, 0, None, gr.update(value=""), # page_idx, idx_on_page, audio_player, transcript_tb value + "Please log in.", "") # status_md, original_transcript_state try: user_info = whoami(token=hf_token_val) @@ -1155,139 +1130,124 @@ def hf_login(hf_token_val): if username in ALLOWED_USERS: CURRENT_USERNAME = username - token = hf_token_val # Store the validated token globally for other HF ops + token = hf_token_val # Store validated token - # Initialize/re-initialize dataset info and ranges based on logged-in user - # This ensures that if total_samples was not fetched, it's attempted again. + # (Re)-initialize dataset info and user-specific ranges/assignments ds_info = get_dataset_info() # Sets global total_samples - if total_samples > 0: - annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) - if SECOND_PHASE: - initialize_second_phase_assignments() # Depends on ANNOTATORS and their ranges - else: - # Handle case where total_samples is still unknown (critical for ranges) - return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, "Login successful, but failed to get dataset size. Cannot proceed." + if total_samples <= 0: + # Return tuple must match login_outputs. + return (gr.update(visible=True), gr.update(visible=False), + gr.update(value="Error"), hf_token_val, "Login OK, but failed to get dataset size. Cannot proceed.", + *( [gr.update(visible=False)] * 8 ), # visibility updates + 0,0,None,gr.update(value=""), "Error", "") - # Load initial page data for this user - current_page_data = load_page_data(0) # page 0 for the current user + annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + if SECOND_PHASE: + initialize_second_phase_assignments() # Depends on ANNOTATORS and ranges - # Determine initial UI state based on SECOND_PHASE - is_second_phase_active = SECOND_PHASE - - # Update visibility of components based on phase - updates = { - # Phase 1 components - "save_next_button_vis": not is_second_phase_active, - "transcript_interactive": not is_second_phase_active, - "trim_button_vis": not is_second_phase_active, - "undo_trim_button_vis": not is_second_phase_active, - "delete_button_vis": not is_second_phase_active, - "first_phase_accept_cb_vis": (not is_second_phase_active and get_user_role(CURRENT_USERNAME) == "reviewer"), - # Phase 2 components - "approve_button_vis": is_second_phase_active, - "reject_button_vis": is_second_phase_active, - } + user_allowed_range_check = get_user_allowed_range(CURRENT_USERNAME) + if not user_allowed_range_check or user_allowed_range_check[0] > user_allowed_range_check[1]: + # Return tuple must match login_outputs. + return (gr.update(visible=True), gr.update(visible=False), + gr.update(value="Error"), hf_token_val, f"Login OK, but user {CURRENT_USERNAME} has no samples assigned for {'review' if SECOND_PHASE else 'annotation'}.", + *( [gr.update(visible=False)] * 8 ), + 0,0,None,gr.update(value=""), "Error: No samples assigned.", "") + + + # Load initial page data (user-relative page 0) + current_page_data = load_page_data(0) # Page 0 for this user - initial_load = load_interface_data(0, 0) # Load data for the first sample (page 0, index 0 on page) + is_second_phase_active = SECOND_PHASE + user_is_first_phase_reviewer = not is_second_phase_active and get_user_role(CURRENT_USERNAME) == "reviewer" + + # Initial load for UI elements + # load_interface_data returns: page_idx, idx_on_page, audio, transcript_update, reviewer_update, status, orig_text, cb_visibility_update, cb_value + initial_load_tuple = load_interface_data(0, 0) # User-relative page 0, item 0 - # Return tuple for outputs matching login_button.click() - # login_container, main_container, reviewer_textbox (as initial state), hf_token_state, login_message, - # then all the visibility/interactivity updates return ( gr.update(visible=False), # login_container gr.update(visible=True), # main_container - initial_load[4], # reviewer_textbox gr.update object (initial_load[4] is reviewer text gr.update) + initial_load_tuple[4], # reviewer_tb gr.update object hf_token_val, # hf_token_state - f"Login successful! Welcome {CURRENT_USERNAME}. Phase: {'Review' if SECOND_PHASE else 'Annotation'}.", # login_message + f"Login successful! Welcome {CURRENT_USERNAME}. Phase: {'Review' if is_second_phase_active else 'Annotation'}.", # login_message - # UI component updates based on phase - gr.update(visible=updates["save_next_button_vis"]), - gr.update(interactive=updates["transcript_interactive"]), # This is for transcript Textarea - gr.update(visible=updates["trim_button_vis"]), - gr.update(visible=updates["undo_trim_button_vis"]), - gr.update(visible=updates["delete_button_vis"]), - gr.update(visible=updates["first_phase_accept_cb_vis"]), - gr.update(visible=updates["approve_button_vis"]), - gr.update(visible=updates["reject_button_vis"]), - - # Initial data for the interface elements from load_interface_data - initial_load[0], # page_idx_state - initial_load[1], # idx_on_page_state - initial_load[2], # audio_player - initial_load[3], # transcript (already includes interactivity) - # initial_load[4] is reviewer, already used above for initial value - initial_load[5], # status_md - initial_load[6], # original_transcript_state + gr.update(visible=not is_second_phase_active), # save_next_button + gr.update(interactive=not is_second_phase_active), # transcript_tb interactivity + gr.update(visible=not is_second_phase_active), # trim_button + gr.update(visible=not is_second_phase_active), # undo_trim_button + gr.update(visible=not is_second_phase_active), # delete_button + initial_load_tuple[7], # first_phase_accept_cb visibility update + gr.update(visible=is_second_phase_active), # approve_button + gr.update(visible=is_second_phase_active), # reject_button + + initial_load_tuple[0], # page_idx_state + initial_load_tuple[1], # idx_on_page_state + initial_load_tuple[2], # audio_player + initial_load_tuple[3], # transcript_tb (already a gr.update obj with value & interactivity) + initial_load_tuple[5], # status_md + initial_load_tuple[6], # original_transcript_state + # The first_phase_accept_cb value is handled by its direct output connection later ) - else: + else: # User not in ALLOWED_USERS CURRENT_USERNAME = None - return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, "User not authorized!" + return (gr.update(visible=True), gr.update(visible=False), + gr.update(value="N/A"), hf_token_val, "User not authorized!", + *( [gr.update(visible=False)] * 8 ), + 0,0,None,gr.update(value=""),"Not Authorized","") except Exception as e: CURRENT_USERNAME = None import traceback - print(f"Login failed: {str(e)}\n{traceback.format_exc()}") - return gr.update(visible=True), gr.update(visible=False), "", hf_token_val, f"Login failed: {str(e)}" - + login_err_msg = f"Login failed: {str(e)}" + print(f"{login_err_msg}\n{traceback.format_exc()}") + return (gr.update(visible=True), gr.update(visible=False), + gr.update(value="Error"), hf_token_val, login_err_msg, + *( [gr.update(visible=False)] * 8 ), + 0,0,None,gr.update(value=""), "Login Error","") -# Set initial values for UI elements before login (mostly empty or default) -init_page_idx = 0 -init_idx_on_page = 0 -init_audio_val = None -init_transcript_val = gr.update(value="", interactive=False) # Non-interactive before login -init_reviewer_val = gr.update(value="N/A", interactive=False) -init_status_val = "Please log in." -init_original_text_val = "" # Gradio Interface css = """ -.white { background-color: white; color: black; } -.yellow { background-color: yellow; color: black; } -.blue { background-color: lightblue; color: black; } /* Adjusted for readability */ -.green { background-color: lightgreen; color: black; } /* Adjusted for readability */ -.pink { background-color: pink; color: black; } -.red { background-color: #FF7F7F; color: black; } /* Softer red */ -.orange { background-color: orange; color: black; } -.gray { background-color: lightgray; color: black; } -.lightgray { background-color: #f0f0f0; color: black; } /* For very subtle states */ +.white { background-color: white; color: black; } .yellow { background-color: yellow; color: black; } +.blue { background-color: lightblue; color: black; } .green { background-color: lightgreen; color: black; } +.pink { background-color: pink; color: black; } .red { background-color: #FF7F7F; color: black; } +.orange { background-color: orange; color: black; } .gray { background-color: lightgray; color: black; } +.lightgray { background-color: #f0f0f0; color: black; } .reviewer-textbox input { text-align: center; font-weight: bold; } """ with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo: - hf_token_state = gr.State(token) # Store token for export or other uses - - # UI States for navigation and data - current_page_idx_state = gr.State(init_page_idx) - current_idx_on_page_state = gr.State(init_idx_on_page) - original_transcript_state = gr.State(init_original_text_val) # Stores original subtitle from dataset for current item + hf_token_state = gr.State(token) + current_page_idx_state = gr.State(0) # User-relative page index + current_idx_on_page_state = gr.State(0) + original_transcript_state = gr.State("") with gr.Column(visible=True, elem_id="login_container") as login_container: - gr.Markdown("## HF Authentication\nPlease enter your Hugging Face token (read & write permissions).") - hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", placeholder="Enter your HF token", value="") + gr.Markdown("## HF Authentication") + hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", value=os.getenv("hf_token","")) login_button = gr.Button("Login") login_message = gr.Markdown("") with gr.Column(visible=False, elem_id="main_container") as main_container: gr.Markdown("# ASR Dataset Labeling Interface") - status_md = gr.Markdown(init_status_val) # For status messages like "Sample x of y" + status_md = gr.Markdown("Please log in.") with gr.Row(): with gr.Column(scale=2): - audio_player = gr.Audio(value=init_audio_val, label="Audio Sample", autoplay=False) # Autoplay off initially - transcript_tb = gr.TextArea(value=init_transcript_val['value'], label="Transcript", lines=5, - interactive=init_transcript_val.get('interactive', False)) - reviewer_tb = gr.Textbox(value=init_reviewer_val['value'], label="Annotation Status / Reviewer", interactive=False, elem_classes=["white", "reviewer-textbox"]) + audio_player = gr.Audio(label="Audio Sample", autoplay=False) + transcript_tb = gr.TextArea(label="Transcript", lines=5, interactive=False) + reviewer_tb = gr.Textbox(label="Annotation Status / Reviewer", interactive=False, elem_classes=["white", "reviewer-textbox"]) with gr.Column(scale=1): gr.Markdown("### Navigation") prev_button = gr.Button("← Previous") - next_button = gr.Button("Next (no save)") # For first phase + next_button = gr.Button("Next (no save)") - # Phase 1 Buttons + # Phase 1 save_next_button = gr.Button("Save & Next", variant="primary", visible=not SECOND_PHASE) - first_phase_accept_cb = gr.Checkbox(label="Accept (Reviewer)", visible=(not SECOND_PHASE and CURRENT_USERNAME in REVIEWERS if CURRENT_USERNAME else False)) + first_phase_accept_cb = gr.Checkbox(label="Accept (Reviewer)", visible=False, value=False) # Visibility controlled by login & get_sample - # Phase 2 Buttons + # Phase 2 approve_button = gr.Button("Approve & Next", variant="primary", visible=SECOND_PHASE) reject_button = gr.Button("Reject & Next", variant="stop", visible=SECOND_PHASE) @@ -1298,94 +1258,89 @@ with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo: trim_button = gr.Button("Trim Audio", visible=not SECOND_PHASE) undo_trim_button = gr.Button("Undo Trim", visible=not SECOND_PHASE) delete_button = gr.Button("Mark Audio as Deleted", variant="stop", visible=not SECOND_PHASE) - # Confirm/Cancel for delete are managed dynamically with gr.Accordion("Advanced Navigation & Export", open=False): with gr.Row(): - jump_text_tb = gr.Textbox(label="Jump to Global Index", placeholder="Enter index number") + jump_text_tb = gr.Textbox(label="Jump to Global Index", placeholder="Enter dataset absolute index") jump_button = gr.Button("Jump") with gr.Row(): - hf_repo_name_tb = gr.Textbox(label="Export Repository Name (username/dataset-name)", - placeholder=f"{CURRENT_USERNAME}/my-annotated-dataset" if CURRENT_USERNAME else "your-username/asr-dataset") + default_repo_name = f"{CURRENT_USERNAME}/my-annotated-dataset" if CURRENT_USERNAME else "your-username/asr-dataset" + hf_repo_name_tb = gr.Textbox(label="Export Repository Name (username/dataset-name)", value=default_repo_name) hf_export_button = gr.Button("Export to Hugging Face", variant="primary") hf_export_status_md = gr.Markdown("") - # Define outputs for login_button carefully, matching the hf_login function's return tuple + # Outputs for login_button (16 outputs) login_outputs = [ login_container, main_container, reviewer_tb, hf_token_state, login_message, - # Visibility/interactivity updates save_next_button, transcript_tb, trim_button, undo_trim_button, delete_button, - first_phase_accept_cb, approve_button, reject_button, - # Initial data load updates + first_phase_accept_cb, # This will receive gr.update(visible=...) from login + approve_button, reject_button, current_page_idx_state, current_idx_on_page_state, audio_player, transcript_tb, status_md, original_transcript_state + # Note: transcript_tb appears twice: once for interactivity, once for value. Gradio handles this. + # The value for first_phase_accept_cb itself will be updated by navigation/load_interface_data ] + login_button.click(fn=hf_login, inputs=[hf_token_input], outputs=login_outputs) - login_button.click( - fn=hf_login, - inputs=[hf_token_input], - outputs=login_outputs - ) - - # Common outputs for navigation and actions - navigation_outputs = [ + # Common outputs for navigation and actions that reload sample view (9 outputs) + navigation_outputs_extended = [ current_page_idx_state, current_idx_on_page_state, - audio_player, transcript_tb, reviewer_tb, status_md, original_transcript_state + audio_player, transcript_tb, reviewer_tb, status_md, original_transcript_state, + first_phase_accept_cb, # For visibility + first_phase_accept_cb # For value ] - # Phase 1 actions + # Phase 1 save_next_button.click( fn=save_and_next_sample_first_phase, inputs=[current_page_idx_state, current_idx_on_page_state, transcript_tb, first_phase_accept_cb], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) - next_button.click( fn=go_next_sample_wrapper, inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs - ) # REMOVED the problematic .then() call here - + outputs=navigation_outputs_extended + ) prev_button.click( fn=go_prev_sample_wrapper, inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) - # Phase 2 actions + # Phase 2 approve_button.click( fn=review_and_next_sample_second_phase, inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("approved")], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) reject_button.click( fn=review_and_next_sample_second_phase, inputs=[current_page_idx_state, current_idx_on_page_state, gr.State("rejected")], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) # Audio tools (Phase 1) trim_button.click( fn=trim_audio_action, inputs=[current_page_idx_state, current_idx_on_page_state, trim_start_tb, trim_end_tb], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) undo_trim_button.click( fn=undo_trim_action, inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) delete_button.click( fn=confirm_delete_audio_action, inputs=[current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) # Jump and Export jump_button.click( fn=jump_to_absolute_idx, inputs=[jump_text_tb, current_page_idx_state, current_idx_on_page_state], - outputs=navigation_outputs + outputs=navigation_outputs_extended ) hf_export_button.click( fn=export_to_huggingface, @@ -1394,11 +1349,36 @@ with gr.Blocks(css=css, title="ASR Dataset Labeling Tool") as demo: queue=True ) -# Launch the interface if __name__ == "__main__": + # --- Global config override for testing --- + # SECOND_PHASE = True # <<<<<<< SET THIS TO True TO TEST SECOND PHASE + # ALLOWED_USERS = ["vargha", "navidved", "userC"] + # REVIEWERS = ["vargha"] # First phase reviewers + # ANNOTATORS = ["navidved", "userC"] # First phase annotators (become reviewers in 2nd phase if in this list) + + # Re-initialize based on potential overrides FOR TESTING + # In a real app, these would be set once at the top. + # ANNOTATORS = [user for user in ALLOWED_USERS if user not in REVIEWERS] + # if total_samples > 0: # If total_samples was determined + # annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + # if SECOND_PHASE: + # initialize_second_phase_assignments() + # else: + # print("Main block: total_samples not positive, ranges/assignments might be an issue if not loaded by login.") + + if SECOND_PHASE: - print("==== APPLICATION RUNNING IN SECOND PHASE (REVIEW MODE) ====") + print("==== APPLICATION LAUNCHING IN SECOND PHASE (REVIEW MODE) ====") + # Ensure assignments are made if they haven't been due to total_samples not being ready + if not SECOND_PHASE_REVIEW_MAPPING and total_samples > 0 and ANNOTATORS: + print("Late initialization of second phase assignments...") + if not annotator_ranges: # Should be populated by now if total_samples is known + annotator_ranges = calculate_annotator_ranges(total_samples, ANNOTATORS) + initialize_second_phase_assignments() + elif not SECOND_PHASE_REVIEW_MAPPING : + print("Warning: Second phase active, but review mapping is empty. Check total_samples and ANNOTATORS list.") + else: - print("==== APPLICATION RUNNING IN FIRST PHASE (ANNOTATION MODE) ====") + print("==== APPLICATION LAUNCHING IN FIRST PHASE (ANNOTATION MODE) ====") demo.queue().launch(debug=True, share=False) \ No newline at end of file