import gradio as gr
import json
import os
from pathlib import Path
import time
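
# The task file consumed below is assumed (inferred from the field accesses in this
# module, not from a published schema) to look roughly like:
#
# {
#   "task_name": "SomeRerankingTask",
#   "instructions": "Rank documents by their relevance to the query.",
#   "samples": [
#     {"id": "sample-1", "query": "...", "candidates": ["doc text 1", "doc text 2", "..."]}
#   ]
# }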
def create_reranking_interface(task_data):
    """Create a Gradio interface for reranking evaluation."""
    samples = task_data["samples"]
    results = {"task_name": task_data["task_name"], "task_type": "reranking", "annotations": []}
    completed_samples = {s["id"]: False for s in samples}

    # Load existing results if available
    output_path = f"{task_data['task_name']}_human_results.json"
    if os.path.exists(output_path):
        try:
            with open(output_path, "r") as f:
                saved_results = json.load(f)
            if "annotations" in saved_results:
                results["annotations"] = saved_results["annotations"]
                # Update completed_samples based on loaded data
                for annotation in saved_results["annotations"]:
                    sample_id = annotation.get("sample_id")
                    if sample_id and sample_id in completed_samples:
                        completed_samples[sample_id] = True
        except Exception as e:
            print(f"Error loading existing results: {e}")

    def save_ranking(rankings, sample_id):
        """Save the current set of rankings."""
        try:
            # Check if all documents have rankings
            if not rankings or len(rankings) == 0:
                return "⚠️ No rankings provided", f"Progress: {sum(completed_samples.values())}/{len(samples)}"

            all_ranked = all(r is not None and r != "" for r in rankings)
            if not all_ranked:
                return "⚠️ Please assign a rank to all documents before submitting", f"Progress: {sum(completed_samples.values())}/{len(samples)}"

            # Convert rankings to integers with better error handling
            try:
                processed_rankings = [int(r) for r in rankings]
            except ValueError:
                return "⚠️ Invalid ranking value. Please use only numbers.", f"Progress: {sum(completed_samples.values())}/{len(samples)}"

            # Check for duplicate rankings
            if len(set(processed_rankings)) != len(processed_rankings):
                return "⚠️ Each document must have a unique rank. Please review your rankings.", f"Progress: {sum(completed_samples.values())}/{len(samples)}"

            # Store this annotation in memory
            existing_idx = next((i for i, a in enumerate(results["annotations"]) if a["sample_id"] == sample_id), None)
            if existing_idx is not None:
                results["annotations"][existing_idx] = {
                    "sample_id": sample_id,
                    "rankings": processed_rankings
                }
            else:
                results["annotations"].append({
                    "sample_id": sample_id,
                    "rankings": processed_rankings
                })
            completed_samples[sample_id] = True

            # Always save to file for redundancy
            try:
                output_path = f"{task_data['task_name']}_human_results.json"
                with open(output_path, "w") as f:
                    json.dump(results, f, indent=2)
                return "✅ Rankings saved successfully", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
            except Exception as file_error:
                # If file saving fails, still mark as success since we saved in memory
                print(f"File save error: {file_error}")
                return "✅ Rankings saved in memory (file save failed)", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
        except Exception as e:
            # Return specific error message
            print(f"Save ranking error: {e}")
            return f"Error: {str(e)}", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(f"# {task_data['task_name']} - Human Reranking Evaluation")

        with gr.Accordion("Instructions", open=True):
            gr.Markdown("""
            ## Task Instructions

            {instructions}

            ### How to use this interface:
            1. Read the query at the top
            2. Review each document carefully
            3. Assign a rank to each document (1 = most relevant, higher numbers = less relevant)
            4. Each document must have a unique rank
            5. Click "Submit Rankings" when you're done with the current query
            6. Use "Previous" and "Next" to navigate between queries
            7. Your rankings are automatically saved when you submit or navigate
            """.format(instructions=task_data.get("instructions", "Rank documents by their relevance to the query.")))

        current_sample_id = gr.State(value=samples[0]["id"])
        current_state = gr.State(value={"auto_save_enabled": True, "last_saved": time.time()})

        with gr.Row():
            progress_text = gr.Textbox(label="Progress", value=f"Progress: 0/{len(samples)}", interactive=False)
            status_box = gr.Textbox(label="Status", value="Ready to start evaluation", interactive=False)
            auto_save_toggle = gr.Checkbox(label="Auto-save when navigating", value=True)

        with gr.Group():
            gr.Markdown("## Query:")
            query_text = gr.Textbox(value=samples[0]["query"], label="", interactive=False)

            gr.Markdown("## Documents to Rank:")

            # Create document displays and ranking inputs in synchronized pairs
            doc_containers = []
            ranking_inputs = []
            validation_indicators = []

            with gr.Column():
                # Quick ranking tools
                with gr.Row():
                    gr.Markdown("### Quick Ranking Options:")
                    sequential_btn = gr.Button("Rank in Order (1,2,3...)")
                    reverse_btn = gr.Button("Reverse Order (n,n-1,...)")
                    clear_btn = gr.Button("Clear All Rankings")

                # Document display with better UI for ranking
                for i, doc in enumerate(samples[0]["candidates"]):
                    with gr.Row():
                        with gr.Column(scale=4):
                            doc_box = gr.Textbox(
                                value=doc,
                                label=f"Document {i+1}",
                                interactive=False
                            )
                            doc_containers.append(doc_box)

                        with gr.Column(scale=1):
                            # Use Radio buttons for ranking rather than dropdowns
                            # This provides a more visual and error-resistant interface
                            rank_input = gr.Radio(
                                choices=[str(j) for j in range(1, len(samples[0]["candidates"]) + 1)],
                                label="Rank",
                                value="",
                                interactive=True
                            )
                            ranking_inputs.append(rank_input)

                        # Add validation indicator
                        with gr.Column(scale=1, min_width=50):
                            validation = gr.HTML(value="")
                            validation_indicators.append(validation)

        with gr.Row():
            prev_btn = gr.Button("← Previous Query", size="sm")
            submit_btn = gr.Button("Submit Rankings", size="lg", variant="primary")
            next_btn = gr.Button("Next Query →", size="sm")

        with gr.Row():
            save_btn = gr.Button("💾 Save All Results", variant="secondary")
            results_info = gr.HTML(value=f"<p>Results will be saved to <code>{task_data['task_name']}_human_results.json</code></p>")
        def validate_rankings(*rankings):
            """Validate rankings and return per-document indicators plus an overall flag."""
            results = []
            all_valid = True
            for rank in rankings:
                if rank is None or rank == "":
                    results.append("⚠️")
                    all_valid = False
                else:
                    results.append("✓")
            return results, all_valid

        def load_sample(sample_id):
            """Load a specific sample into the interface."""
            sample = next((s for s in samples if s["id"] == sample_id), None)
            if not sample:
                # Unknown sample: keep current values, return placeholder indicator values
                # (not the indicator components themselves) so the output count matches.
                return [query_text.value] + [d.value for d in doc_containers] + [""] * len(ranking_inputs) + [""] * len(validation_indicators) + [sample_id, progress_text.value, status_box.value]
            # Update query
            new_query = sample["query"]

            # Update documents
            new_docs = []
            for i, doc in enumerate(sample["candidates"]):
                if i < len(doc_containers):
                    new_docs.append(doc)

            # Initialize rankings
            new_rankings = [""] * len(ranking_inputs)

            # Check if this sample has already been annotated
            existing_annotation = next((a for a in results["annotations"] if a["sample_id"] == sample_id), None)
            if existing_annotation:
                # Restore previous rankings
                for i, rank in enumerate(existing_annotation["rankings"]):
                    if i < len(new_rankings) and rank is not None:
                        new_rankings[i] = str(rank)

            # Update progress
            current_idx = samples.index(sample)
            new_progress = f"Progress: {sum(completed_samples.values())}/{len(samples)}"
            new_status = f"Viewing query {current_idx + 1} of {len(samples)}"
            if completed_samples[sample_id]:
                new_status += " (already completed)"

            # Initialize validation indicators
            validation_results, _ = validate_rankings(*new_rankings)

            return [new_query] + new_docs + new_rankings + validation_results + [sample_id, new_progress, new_status]
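
        # Note: the list returned by load_sample must stay aligned with the outputs wired
        # to the navigation events below, i.e.
        # [query_text] + doc_containers + ranking_inputs + validation_indicators
        # + [current_sample_id, progress_text, status_box].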
        def auto_save_and_navigate(direction, current_id, auto_save, *rankings):
            """Save rankings if auto-save is enabled, then navigate."""
            # Extract rankings (drop any trailing non-ranking inputs)
            actual_rankings = rankings[:len(ranking_inputs)]

            # If auto-save is enabled, try to save the current rankings
            status_msg = ""
            progress_msg = f"Progress: {sum(completed_samples.values())}/{len(samples)}"
            if auto_save:
                # Only save if all rankings are provided
                validation_results, all_valid = validate_rankings(*actual_rankings)
                if all_valid:
                    status_msg, progress_msg = save_ranking(actual_rankings, current_id)

            # Navigate to the next/previous sample
            if direction == "next":
                new_id = next_sample(current_id)
            else:
                new_id = prev_sample(current_id)

            # Return the new sample ID and status message
            return new_id, status_msg, progress_msg

        def next_sample(current_id):
            """Return the ID of the next sample, or the current ID if at the end."""
            current_sample = next((s for s in samples if s["id"] == current_id), None)
            if not current_sample:
                return current_id
            current_idx = samples.index(current_sample)
            if current_idx < len(samples) - 1:
                return samples[current_idx + 1]["id"]
            return current_id

        def prev_sample(current_id):
            """Return the ID of the previous sample, or the current ID if at the start."""
            current_sample = next((s for s in samples if s["id"] == current_id), None)
            if not current_sample:
                return current_id
            current_idx = samples.index(current_sample)
            if current_idx > 0:
                return samples[current_idx - 1]["id"]
            return current_id

        def save_results():
            """Save all collected results to a file."""
            output_path = f"{task_data['task_name']}_human_results.json"
            try:
                with open(output_path, "w") as f:
                    json.dump(results, f, indent=2)
                current_state.value["last_saved"] = time.time()
                return f"✅ Results saved to {output_path} ({len(results['annotations'])} annotations)"
            except Exception as e:
                return f"Error saving results: {str(e)}"

        # Function to assign sequential ranks
        def assign_sequential_ranks():
            return [str(i + 1) for i in range(len(samples[0]["candidates"]))]

        # Function to assign reverse ranks
        def assign_reverse_ranks():
            n = len(samples[0]["candidates"])
            return [str(n - i) for i in range(n)]

        # Function to clear all rankings
        def clear_rankings():
            return ["" for _ in range(len(samples[0]["candidates"]))]
        # Define a function that collects all ranking values and validates them
        def submit_rankings(*args):
            # Get the last argument (sample_id) and the rankings
            if len(args) < 1:
                return "Error: No arguments provided", progress_text.value

            # Verify we have enough ranking inputs
            if len(args) < len(ranking_inputs) + 1:
                return "Error: Not enough ranking inputs provided", progress_text.value

            sample_id = args[-1]
            rankings = args[:len(ranking_inputs)]

            # Validate the rankings. The per-document indicators themselves are refreshed
            # by the change handlers attached to each ranking input, so only the overall
            # flag is needed here.
            _, all_valid = validate_rankings(*rankings)
            if not all_valid:
                return "⚠️ Please assign a rank to all documents before submitting", progress_text.value

            # Save the validated rankings
            status, progress = save_ranking(rankings, sample_id)
            return status, progress
        # Connect events - direct input/output connections for reliability
        submit_btn.click(
            submit_rankings,
            inputs=ranking_inputs + [current_sample_id],
            outputs=[status_box, progress_text]
        )
        # Apply auto-save before navigation if enabled. The direction is bound via a
        # lambda because Gradio event inputs must be components, not literal strings.
        next_btn.click(
            lambda current_id, auto_save, *rankings: auto_save_and_navigate("next", current_id, auto_save, *rankings),
            inputs=[current_sample_id, auto_save_toggle] + ranking_inputs,
            outputs=[current_sample_id, status_box, progress_text]
        ).then(
            load_sample,
            inputs=[current_sample_id],
            outputs=[query_text] + doc_containers + ranking_inputs + validation_indicators + [current_sample_id, progress_text, status_box]
        )

        prev_btn.click(
            lambda current_id, auto_save, *rankings: auto_save_and_navigate("prev", current_id, auto_save, *rankings),
            inputs=[current_sample_id, auto_save_toggle] + ranking_inputs,
            outputs=[current_sample_id, status_box, progress_text]
        ).then(
            load_sample,
            inputs=[current_sample_id],
            outputs=[query_text] + doc_containers + ranking_inputs + validation_indicators + [current_sample_id, progress_text, status_box]
        )
        # Connect quick ranking buttons
        sequential_btn.click(
            assign_sequential_ranks,
            outputs=ranking_inputs
        )

        reverse_btn.click(
            assign_reverse_ranks,
            outputs=ranking_inputs
        )

        clear_btn.click(
            clear_rankings,
            outputs=ranking_inputs
        )

        # Connect save button
        save_btn.click(save_results, outputs=[status_box])
        # Add validation on ranking changes. validate_rankings returns
        # (indicator_values, all_valid); the change handler only needs the indicator
        # values, one per validation indicator, so unpack the first element.
        for ranking in ranking_inputs:
            ranking.change(
                lambda *rankings: validate_rankings(*rankings)[0],
                inputs=ranking_inputs,
                outputs=validation_indicators
            )

        # Set up auto-save feature
        auto_save_toggle.change(
            lambda x: {"auto_save_enabled": x},
            inputs=[auto_save_toggle],
            outputs=[current_state]
        )

    return demo
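
# Illustrative usage (not part of the app flow below): create_reranking_interface can
# also be launched on its own, assuming a task file such as
# "AskUbuntuDupQuestions_human_eval.json" is present in the working directory:
#
#     with open("AskUbuntuDupQuestions_human_eval.json") as f:
#         create_reranking_interface(json.load(f)).launch()
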
# Main app with file upload capability and improved task management
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# MTEB Human Evaluation Demo")

    with gr.Tabs():
        with gr.TabItem("Demo"):
            gr.Markdown("""
            ## MTEB Human Evaluation Interface

            This interface allows you to evaluate the relevance of documents for reranking tasks.
            """)

            # Function to get the most recent task file
            def get_latest_task_file():
                # Check first in the uploaded_tasks directory
                os.makedirs("uploaded_tasks", exist_ok=True)
                uploaded_tasks = [f for f in os.listdir("uploaded_tasks") if f.endswith(".json")]

                if uploaded_tasks:
                    # Sort by modification time, newest first
                    uploaded_tasks.sort(key=lambda x: os.path.getmtime(os.path.join("uploaded_tasks", x)), reverse=True)
                    task_path = os.path.join("uploaded_tasks", uploaded_tasks[0])

                    # Verify this is a valid task file
                    try:
                        with open(task_path, "r") as f:
                            task_data = json.load(f)
                        if "task_name" in task_data and "samples" in task_data:
                            return task_path
                    except Exception:
                        pass

                # Look for task files in the current directory
                current_dir_tasks = [f for f in os.listdir(".") if f.endswith("_human_eval.json")]
                if current_dir_tasks:
                    # Sort by modification time, newest first
                    current_dir_tasks.sort(key=lambda x: os.path.getmtime(x), reverse=True)
                    return current_dir_tasks[0]

                # Fall back to the fixed example if available
                if os.path.exists("AskUbuntuDupQuestions_human_eval.json"):
                    return "AskUbuntuDupQuestions_human_eval.json"

                # No valid task file found
                return None

            # Load the task file
            task_file = get_latest_task_file()

            if task_file:
                try:
                    with open(task_file, "r") as f:
                        task_data = json.load(f)

                    # Show which task is currently loaded
                    gr.Markdown(f"**Current Task: {task_data['task_name']}** ({len(task_data['samples'])} samples)")

                    # Display the interface
                    reranking_demo = create_reranking_interface(task_data)
                except Exception as e:
                    gr.Markdown(f"**Error loading task: {str(e)}**")
                    gr.Markdown("Please upload a valid task file in the 'Upload & Evaluate' tab.")
            else:
                gr.Markdown("**No task file found**")
                gr.Markdown("Please upload a valid task file in the 'Upload & Evaluate' tab.")
with gr.TabItem("Upload & Evaluate"):
gr.Markdown("""
## Upload Your Own Task File
If you have a prepared task file, you can upload it here to create an evaluation interface.
""")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(label="Upload a task file (JSON)")
load_btn = gr.Button("Load Task")
message = gr.Textbox(label="Status", interactive=False)
# Add task list for previously uploaded tasks
gr.Markdown("### Previous Uploads")
# Function to list existing task files in the tasks directory
def list_task_files():
os.makedirs("uploaded_tasks", exist_ok=True)
tasks = [f for f in os.listdir("uploaded_tasks") if f.endswith(".json")]
if not tasks:
return "No task files uploaded yet."
return "\n".join([f"- {t}" for t in tasks])
task_list = gr.Markdown(list_task_files())
refresh_btn = gr.Button("Refresh List")
# Add results management section
gr.Markdown("### Results Management")
# Function to list existing result files
def list_result_files():
results = [f for f in os.listdir(".") if f.endswith("_human_results.json")]
if not results:
return "No result files available yet."
result_links = []
for r in results:
# Calculate completion stats
try:
with open(r, "r") as f:
result_data = json.load(f)
annotation_count = len(result_data.get("annotations", []))
task_name = result_data.get("task_name", "Unknown")
result_links.append(f"- {r} ({annotation_count} annotations for {task_name})")
except:
result_links.append(f"- {r}")
return "\n".join(result_links)
results_list = gr.Markdown(list_result_files())
download_results_btn = gr.Button("Download Results")
# Right side - will contain the actual interface
with gr.Column(scale=2):
task_container = gr.HTML()
loaded_task_info = gr.JSON(label="Loaded Task Information", visible=False)
            # Handle file upload and storage
            def handle_upload(file):
                if not file:
                    return "Please upload a task file", task_list.value, task_container.value, loaded_task_info.value

                try:
                    # Create directory if it doesn't exist
                    os.makedirs("uploaded_tasks", exist_ok=True)

                    # Read the uploaded file
                    with open(file.name, "r") as f:
                        task_data = json.load(f)

                    # Validate task format
                    if "task_name" not in task_data or "samples" not in task_data:
                        return "Invalid task file format. Must contain 'task_name' and 'samples' fields.", task_list.value, task_container.value, loaded_task_info.value

                    # Save to a consistent location
                    task_filename = f"uploaded_tasks/{task_data['task_name']}_task.json"
                    with open(task_filename, "w") as f:
                        json.dump(task_data, f, indent=2)

                    # Show task info
                    task_info = {
                        "task_name": task_data["task_name"],
                        "samples": len(task_data["samples"]),
                        "file_path": task_filename
                    }

                    status_message = (
                        f"Task '{task_data['task_name']}' uploaded successfully with "
                        f"{len(task_data['samples'])} samples. Please refresh the app and use the Demo tab to evaluate it."
                    )
                    task_html = f"""
                    <div style="padding: 20px; background-color: #f0f0f0; border-radius: 10px;">
                        <h3>Task uploaded successfully!</h3>
                        <p>Task Name: {task_data['task_name']}</p>
                        <p>Samples: {len(task_data['samples'])}</p>
                        <p>To evaluate this task:</p>
                        <ol>
                            <li>Refresh the app</li>
                            <li>The Demo tab will now use your uploaded task</li>
                            <li>Complete your evaluations</li>
                            <li>Results will be saved as {task_data['task_name']}_human_results.json</li>
                        </ol>
                    </div>
                    """
                    return status_message, list_task_files(), task_html, task_info
                except Exception as e:
                    return f"Error processing task file: {str(e)}", task_list.value, task_container.value, loaded_task_info.value
            # Function to prepare results for download
            def prepare_results_for_download():
                results = [f for f in os.listdir(".") if f.endswith("_human_results.json")]
                if not results:
                    return None

                # Create a zip file with all results
                import zipfile
                zip_path = "mteb_human_eval_results.zip"
                with zipfile.ZipFile(zip_path, 'w') as zipf:
                    for r in results:
                        zipf.write(r)
                return zip_path

            # Connect events
            load_btn.click(handle_upload, inputs=[file_input], outputs=[message, task_list, task_container, loaded_task_info])
            refresh_btn.click(list_task_files, outputs=[task_list])
            download_results_btn.click(prepare_results_for_download, outputs=[gr.File(label="Download Results")])
with gr.TabItem("Results Management"):
gr.Markdown("""
## Manage Evaluation Results
View, download, and analyze your evaluation results.
""")
# Function to load and display result stats
def get_result_stats():
results = [f for f in os.listdir(".") if f.endswith("_human_results.json")]
if not results:
return "No result files available yet."
stats = []
for r in results:
try:
with open(r, "r") as f:
result_data = json.load(f)
task_name = result_data.get("task_name", "Unknown")
annotations = result_data.get("annotations", [])
annotation_count = len(annotations)
# Calculate completion percentage
sample_ids = set(a.get("sample_id") for a in annotations)
# Try to get the total sample count from the corresponding task file
total_samples = 0
# Try uploaded_tasks directory first
task_file = f"uploaded_tasks/{task_name}_task.json"
if os.path.exists(task_file):
with open(task_file, "r") as f:
task_data = json.load(f)
total_samples = len(task_data.get("samples", []))
else:
# Try human_eval file in current directory
task_file = f"{task_name}_human_eval.json"
if os.path.exists(task_file):
with open(task_file, "r") as f:
task_data = json.load(f)
total_samples = len(task_data.get("samples", []))
completion = f"{len(sample_ids)}/{total_samples}" if total_samples else f"{len(sample_ids)} samples"
stats.append(f"### {task_name}\n- Annotations: {annotation_count}\n- Completion: {completion}\n- File: {r}")
except Exception as e:
stats.append(f"### {r}\n- Error loading results: {str(e)}")
return "\n\n".join(stats)
result_stats = gr.Markdown(get_result_stats())
refresh_results_btn = gr.Button("Refresh Results")
# Add download options
with gr.Row():
download_all_btn = gr.Button("Download All Results (ZIP)")
result_select = gr.Dropdown(choices=[f for f in os.listdir(".") if f.endswith("_human_results.json")], label="Select Result to Download")
download_selected_btn = gr.Button("Download Selected")
            # Connect events
            refresh_results_btn.click(get_result_stats, outputs=[result_stats])

            # Function to prepare all results for download as ZIP
            def prepare_all_results():
                import zipfile
                zip_path = "mteb_human_eval_results.zip"
                with zipfile.ZipFile(zip_path, 'w') as zipf:
                    for r in [f for f in os.listdir(".") if f.endswith("_human_results.json")]:
                        zipf.write(r)
                return zip_path

            # Function to return a single result file
            def get_selected_result(filename):
                if not filename:
                    return None
                if os.path.exists(filename):
                    return filename
                return None

            # Update dropdown when refreshing results
            def update_result_dropdown():
                return gr.Dropdown.update(choices=[f for f in os.listdir(".") if f.endswith("_human_results.json")])

            refresh_results_btn.click(update_result_dropdown, outputs=[result_select])
            download_all_btn.click(prepare_all_results, outputs=[gr.File(label="Download All Results")])
            download_selected_btn.click(get_selected_result, inputs=[result_select], outputs=[gr.File(label="Download Selected Result")])
if __name__ == "__main__":
demo.launch()