import gradio as gr
import json
import os
from pathlib import Path
import time
def create_reranking_interface(task_data):
"""Create a Gradio interface for reranking evaluation."""
samples = task_data["samples"]
results = {"task_name": task_data["task_name"], "task_type": "reranking", "annotations": []}
completed_samples = {s["id"]: False for s in samples}
# Load existing results if available
output_path = f"{task_data['task_name']}_human_results.json"
if os.path.exists(output_path):
try:
with open(output_path, "r") as f:
saved_results = json.load(f)
if "annotations" in saved_results:
results["annotations"] = saved_results["annotations"]
# Update completed_samples based on loaded data
for annotation in saved_results["annotations"]:
sample_id = annotation.get("sample_id")
if sample_id and sample_id in completed_samples:
completed_samples[sample_id] = True
except Exception as e:
print(f"Error loading existing results: {e}")
def save_ranking(rankings, sample_id):
"""Save the current set of rankings."""
try:
# Check if all documents have rankings
if not rankings or len(rankings) == 0:
return "⚠️ No rankings provided", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
all_ranked = all(r is not None and r != "" for r in rankings)
if not all_ranked:
return "⚠️ Please assign a rank to all documents before submitting", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
# Convert rankings to integers with better error handling
try:
processed_rankings = [int(r) for r in rankings]
except ValueError:
return "⚠️ Invalid ranking value. Please use only numbers.", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
# Check for duplicate rankings
if len(set(processed_rankings)) != len(processed_rankings):
return "⚠️ Each document must have a unique rank. Please review your rankings.", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
# Store this annotation in memory
existing_idx = next((i for i, a in enumerate(results["annotations"]) if a["sample_id"] == sample_id), None)
if existing_idx is not None:
results["annotations"][existing_idx] = {
"sample_id": sample_id,
"rankings": processed_rankings
}
else:
results["annotations"].append({
"sample_id": sample_id,
"rankings": processed_rankings
})
completed_samples[sample_id] = True
# Always save to file for redundancy
try:
output_path = f"{task_data['task_name']}_human_results.json"
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
return f"✅ Rankings saved successfully", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
except Exception as file_error:
# If file saving fails, still mark as success since we saved in memory
print(f"File save error: {file_error}")
return f"✅ Rankings saved in memory (file save failed)", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
except Exception as e:
# Return specific error message
print(f"Save ranking error: {e}")
return f"Error: {str(e)}", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
with gr.Blocks(theme=gr.themes.Soft()) as demo:
# Header section with title and progress indicators
with gr.Row(equal_height=True):
with gr.Column(scale=3):
gr.Markdown(f"# {task_data['task_name']} - Human Reranking Evaluation")
with gr.Column(scale=1):
progress_text = gr.Textbox(
label="Progress",
value=f"Progress: 0/{len(samples)}",
interactive=False
)
# Instructions in a collapsible section
with gr.Accordion("📋 Task Instructions", open=False):
gr.Markdown("""
## Task Instructions
{instructions}
### How to use this interface:
1. Read the query at the top
2. Review each document carefully
3. Assign a rank to each document (1 = most relevant, higher numbers = less relevant)
- Use the dropdown menus to select ranks
4. Each document must have a unique rank
5. Click "Submit Rankings" to save rankings for the current query
6. Use "Previous" and "Next" to navigate between queries
7. Your rankings are automatically saved when you submit or navigate (if auto-save is enabled)
8. Click "Save All Results" periodically to ensure all your work is saved to disk
**Button Explanations:**
- **Submit Rankings**: Saves rankings for the CURRENT query only
- **Save All Results**: Saves ALL submitted rankings to a file on disk
- **Auto-save**: When enabled, automatically saves rankings when navigating between queries
""".format(instructions=task_data.get("instructions", "Rank documents by their relevance to the query.")))
# Hidden state variables
current_sample_id = gr.State(value=samples[0]["id"])
auto_save_enabled = gr.State(value=True)
# Status and control section
with gr.Row(equal_height=True):
with gr.Column(scale=3):
status_box = gr.Textbox(
label="Status",
value="Ready to start evaluation",
interactive=False
)
with gr.Column(scale=1):
auto_save_toggle = gr.Checkbox(
label="Auto-save when navigating",
value=True
)
# Main content area
with gr.Group():
# Query section with clear visual distinction
with gr.Box():
gr.Markdown("## 📝 Query")
query_text = gr.Textbox(
value=samples[0]["query"],
label="",
interactive=False,
elem_classes=["query-text"]
)
# Documents section with improved layout
gr.Markdown("## 📄 Documents to Rank")
# Container for documents and rankings
doc_containers = []
ranking_inputs = []
validation_indicators = []
# Create a clean header with explanatory labels for quick ranking tools
gr.Markdown("### Quick Ranking Tools", elem_classes=["tools-header"])
with gr.Box(elem_classes=["tools-container"]):
with gr.Row(equal_height=True):
with gr.Column(scale=4):
sequential_btn = gr.Button("Rank 1,2,3... (Sequential)", variant="secondary")
with gr.Column(scale=4):
reverse_btn = gr.Button("Rank n,n-1... (Reverse)", variant="secondary")
with gr.Column(scale=3):
clear_btn = gr.Button("Clear All Rankings", variant="secondary")
with gr.Row():
gr.Markdown("Use these buttons to quickly assign rankings to all documents at once.", elem_classes=["tools-help"])
# Make document textboxes much smaller
with gr.Box():
for i, doc in enumerate(samples[0]["candidates"]):
row_class = "document-row-even" if i % 2 == 0 else "document-row-odd"
with gr.Row(equal_height=True, elem_classes=["document-row", row_class]):
with gr.Column(scale=1, min_width=50):
gr.HTML(f"
{i+1}
")
with gr.Column(scale=7):
doc_box = gr.Textbox(
value=doc,
label=f"Document {i+1}",
interactive=False,
elem_classes=["document-text"],
lines=2, # Reduce to only 2 visible lines
)
doc_containers.append(doc_box)
with gr.Column(scale=2):
# Dropdown for ranking
rank_input = gr.Dropdown(
choices=[str(j) for j in range(1, len(samples[0]["candidates"])+1)],
label=f"Rank",
value="",
elem_classes=["rank-dropdown"]
)
ranking_inputs.append(rank_input)
with gr.Column(scale=2):
# Validation indicator
validation = gr.HTML(value="")
validation_indicators.append(validation)
# Navigation and submission controls
with gr.Row(equal_height=True):
prev_btn = gr.Button("← Previous Query", size="sm")
submit_btn = gr.Button("Submit Rankings", size="lg", variant="primary")
next_btn = gr.Button("Next Query →", size="sm")
# Save results button
with gr.Row():
save_btn = gr.Button("💾 Save All Results", variant="secondary", size="sm")
results_info = gr.HTML(value=f"Results will be saved to {task_data['task_name']}_human_results.json
")
# CSS for styling
gr.HTML("""
""")
def validate_rankings(*rankings):
"""Simplified validation with less HTML for better performance."""
results = []
all_valid = True
for rank in rankings:
if rank is None or rank == "":
# Use simpler HTML with less styling for faster rendering
results.append("⚠️ Missing")
all_valid = False
else:
# Use simpler HTML with less styling for faster rendering
results.append("✓ Rank " + str(rank))
return results + [all_valid] # Return validation indicators and validity flag
def on_ranking_change(*rankings):
"""Simplified validation for better performance."""
validation_results = validate_rankings(*rankings)
return validation_results[:-1] # Return only the validation indicators
def submit_rankings(*args):
"""Submit rankings with more efficient validation."""
# Get the last argument (sample_id) and the rankings
if len(args) < 1:
return "Error: No arguments provided", progress_text.value
# Verify we have enough rankings
if len(args) < len(ranking_inputs) + 1:
return "Error: Not enough ranking inputs provided", progress_text.value
sample_id = args[-1]
rankings = args[:len(ranking_inputs)]
# First validate the rankings
validation_results = validate_rankings(*rankings)
all_valid = validation_results[-1] # Last item is validity flag
validation_indicators_values = validation_results[:-1] # Remove validity flag
# Update validation indicators - less frequently
# Only update if really needed
for i, result in enumerate(validation_indicators_values):
if i < len(validation_indicators):
validation_indicators[i].update(value=result)
# Check for duplicate rankings
if all_valid:
try:
processed_rankings = [int(r) for r in rankings]
if len(set(processed_rankings)) != len(processed_rankings):
dup_ranks = {}
for i, r in enumerate(processed_rankings):
if r in dup_ranks:
dup_ranks[r].append(i)
else:
dup_ranks[r] = [i]
# Use simpler HTML for duplicate messages
for rank, indices in dup_ranks.items():
if len(indices) > 1:
for idx in indices:
if idx < len(validation_indicators):
validation_indicators[idx].update(
value=f"⚠️ Duplicate {rank}"
)
return "⚠️ Each document must have a unique rank. Please fix duplicate rankings.", progress_text.value
except:
pass
# If not all valid, return error message
if not all_valid:
return "⚠️ Please assign a rank to all documents before submitting", progress_text.value
# Save the validated rankings
status, progress = save_ranking(rankings, sample_id)
# Provide clear success feedback - with simpler HTML
if "✅" in status:
for i in range(len(validation_indicators)):
validation_indicators[i].update(
value="✓ Saved"
)
return status, progress
def load_sample(sample_id):
"""Load a specific sample into the interface."""
sample = next((s for s in samples if s["id"] == sample_id), None)
if not sample:
return [query_text.value] + [d.value for d in doc_containers] + [""] * len(ranking_inputs) + [""] * len(validation_indicators) + [sample_id, progress_text.value, status_box.value]
# Update query
new_query = sample["query"]
# Update documents
new_docs = []
for i, doc in enumerate(sample["candidates"]):
if i < len(doc_containers):
new_docs.append(doc)
# Initialize rankings
new_rankings = [""] * len(ranking_inputs)
# Check if this sample has already been annotated
existing_annotation = next((a for a in results["annotations"] if a["sample_id"] == sample_id), None)
if existing_annotation:
# Restore previous rankings
for i, rank in enumerate(existing_annotation["rankings"]):
if i < len(new_rankings) and rank is not None:
new_rankings[i] = str(rank)
# Update progress
current_idx = samples.index(sample)
new_progress = f"Progress: {sum(completed_samples.values())}/{len(samples)}"
new_status = f"Viewing query {current_idx + 1} of {len(samples)}"
if completed_samples[sample_id]:
new_status += " (already completed)"
# Initialize validation indicators
validation_results = validate_rankings(*new_rankings)
validation_indicators_values = validation_results[:-1] # Remove validity flag
return [new_query] + new_docs + new_rankings + validation_indicators_values + [sample_id, new_progress, new_status]
def auto_save_and_navigate(direction, current_id, auto_save, *rankings):
"""Save rankings if auto-save is enabled, then navigate."""
# Extract rankings (remove validation indicators)
actual_rankings = rankings[:len(ranking_inputs)]
# If auto-save is enabled, try to save the current rankings
status_msg = ""
progress_msg = f"Progress: {sum(completed_samples.values())}/{len(samples)}"
if auto_save:
# Only save if all rankings are provided
validation_results = validate_rankings(*actual_rankings)
all_valid = validation_results[-1] # Last item is validity flag
if all_valid:
status_msg, progress_msg = save_ranking(actual_rankings, current_id)
# Navigate to the next/previous sample
if direction == "next":
new_id = next_sample(current_id)
else:
new_id = prev_sample(current_id)
# Return the new sample ID and status message
return new_id, status_msg, progress_msg
def next_sample(current_id):
"""Load the next sample."""
current_sample = next((s for s in samples if s["id"] == current_id), None)
if not current_sample:
return current_id
current_idx = samples.index(current_sample)
if current_idx < len(samples) - 1:
next_sample = samples[current_idx + 1]
return next_sample["id"]
return current_id
def prev_sample(current_id):
"""Load the previous sample."""
current_sample = next((s for s in samples if s["id"] == current_id), None)
if not current_sample:
return current_id
current_idx = samples.index(current_sample)
if current_idx > 0:
prev_sample = samples[current_idx - 1]
return prev_sample["id"]
return current_id
def save_results():
"""Save all collected results to a file."""
output_path = f"{task_data['task_name']}_human_results.json"
try:
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
return f"✅ Results saved to {output_path} ({len(results['annotations'])} annotations)"
except Exception as e:
return f"Error saving results: {str(e)}"
# Define functions for the quick ranking buttons
def assign_sequential_ranks():
values = [str(i+1) for i in range(len(samples[0]["candidates"]))]
# Skip validation until all ranks are assigned
return values
def assign_reverse_ranks():
n = len(samples[0]["candidates"])
values = [str(n-i) for i in range(n)]
# Skip validation until all ranks are assigned
return values
def clear_rankings():
values = [""] * len(samples[0]["candidates"])
# Clear validation indicators when clearing rankings
for indicator in validation_indicators:
indicator.update(value="")
return values
# Connect quick ranking buttons
sequential_btn.click(
fn=assign_sequential_ranks,
inputs=None,
outputs=ranking_inputs
)
reverse_btn.click(
fn=assign_reverse_ranks,
inputs=None,
outputs=ranking_inputs
)
clear_btn.click(
fn=clear_rankings,
inputs=None,
outputs=ranking_inputs
)
# Wire up events (Gradio 3.x syntax)
submit_btn.click(
fn=submit_rankings,
inputs=ranking_inputs + [current_sample_id],
outputs=[status_box, progress_text]
)
# Auto-save and navigate events
def handle_next(current_id, auto_save, *rankings):
# First, handle auto-save - only if needed
if auto_save and any(r != "" for r in rankings):
new_id, status, progress = auto_save_and_navigate("next", current_id, auto_save, *rankings)
else:
new_id = next_sample(current_id)
status, progress = "", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
# Then, load the new sample with minimal validation
outputs = load_sample(new_id)
# Update only status and progress if needed
if status:
outputs[-2] = progress
outputs[-1] = status
return outputs
def handle_prev(current_id, auto_save, *rankings):
# First, handle auto-save - only if needed
if auto_save and any(r != "" for r in rankings):
new_id, status, progress = auto_save_and_navigate("prev", current_id, auto_save, *rankings)
else:
new_id = prev_sample(current_id)
status, progress = "", f"Progress: {sum(completed_samples.values())}/{len(samples)}"
# Then, load the new sample with minimal validation
outputs = load_sample(new_id)
# Update only status and progress if needed
if status:
outputs[-2] = progress
outputs[-1] = status
return outputs
# Connect navigation with Gradio 3.x syntax
next_btn.click(
fn=handle_next,
inputs=[current_sample_id, auto_save_toggle] + ranking_inputs,
outputs=[query_text] + doc_containers + ranking_inputs + validation_indicators + [current_sample_id, progress_text, status_box]
)
prev_btn.click(
fn=handle_prev,
inputs=[current_sample_id, auto_save_toggle] + ranking_inputs,
outputs=[query_text] + doc_containers + ranking_inputs + validation_indicators + [current_sample_id, progress_text, status_box]
)
# Connect save button
save_btn.click(
fn=save_results,
inputs=None,
outputs=[status_box]
)
# Connect auto-save toggle
def update_auto_save(enabled):
return enabled
auto_save_toggle.change(
fn=update_auto_save,
inputs=[auto_save_toggle],
outputs=[auto_save_enabled]
)
# Reduce frequency of validation
# Only connect validation to the first ranking input to reduce event handlers
ranking_inputs[0].change(
fn=on_ranking_change,
inputs=ranking_inputs,
outputs=validation_indicators
)
# Helper function for ranking - sort documents by rankings
def rank_by_relevance(*args):
"""Sorts the documents by their current rankings for a clearer view."""
# Last argument is sample_id
sample_id = args[-1]
rankings = args[:-1]
# Check if we have valid rankings
valid_rankings = []
for i, r in enumerate(rankings):
if r is not None and r != "":
try:
valid_rankings.append((i, int(r)))
except:
pass
# If we don't have enough valid rankings, do nothing
if len(valid_rankings) < 2:
return [status_box.value]
# Sort by rank
valid_rankings.sort(key=lambda x: x[1])
# Generate message showing the ranking order
result = "Current ranking order:
"
for idx, _ in valid_rankings:
doc_text = doc_containers[idx].value
# Truncate if too long
if len(doc_text) > 100:
doc_text = doc_text[:97] + "..."
result += f"- Doc {idx+1}: {doc_text}
"
result += "
"
return [result]
return demo
# Main app with file upload capability and improved task management
def create_main_app():
with gr.Blocks(theme=gr.themes.Soft()) as app:
gr.Markdown("# MTEB Human Evaluation Demo")
task_container = gr.HTML()
loaded_task_info = gr.JSON(label="Loaded Task Information", visible=False)
# CSS for consistent styling throughout the app
gr.HTML("""
""")
tabs = gr.Tabs()
with tabs:
with gr.TabItem("Demo"):
gr.Markdown("""
## MTEB Human Evaluation Interface
This interface allows you to evaluate the relevance of documents for reranking tasks.
""", elem_classes=["section-header"])
# Function to get the most recent task file
def get_latest_task_file():
# Check first in uploaded_tasks directory
os.makedirs("uploaded_tasks", exist_ok=True)
uploaded_tasks = [f for f in os.listdir("uploaded_tasks") if f.endswith(".json")]
if uploaded_tasks:
# Sort by modification time, newest first
uploaded_tasks.sort(key=lambda x: os.path.getmtime(os.path.join("uploaded_tasks", x)), reverse=True)
task_path = os.path.join("uploaded_tasks", uploaded_tasks[0])
# Verify this is a valid task file
try:
with open(task_path, "r") as f:
task_data = json.load(f)
if "task_name" in task_data and "samples" in task_data:
return task_path
except:
pass
# Look for task files in the current directory
current_dir_tasks = [f for f in os.listdir(".") if f.endswith("_human_eval.json")]
if current_dir_tasks:
# Sort by modification time, newest first
current_dir_tasks.sort(key=lambda x: os.path.getmtime(x), reverse=True)
return current_dir_tasks[0]
# Fall back to fixed example if available
if os.path.exists("AskUbuntuDupQuestions_human_eval.json"):
return "AskUbuntuDupQuestions_human_eval.json"
# No valid task file found
return None
# Load the task file
task_file = get_latest_task_file()
with gr.Box(elem_classes=["content-box"]):
if task_file:
try:
with open(task_file, "r") as f:
task_data = json.load(f)
# Show which task is currently loaded
gr.Markdown(f"**Current Task: {task_data['task_name']}** ({len(task_data['samples'])} samples)")
# Display the interface
demo = create_reranking_interface(task_data)
task_container.update(value=f"Task loaded: {task_file}
")
except Exception as e:
gr.Markdown(f"**Error loading task: {str(e)}**", elem_classes=["status-message"])
gr.Markdown("Please upload a valid task file in the 'Upload & Evaluate' tab.")
else:
gr.Markdown("**No task file found**", elem_classes=["status-message"])
gr.Markdown("Please upload a valid task file in the 'Upload & Evaluate' tab.")
with gr.TabItem("Upload & Evaluate"):
gr.Markdown("""
## Upload Your Own Task File
If you have a prepared task file, you can upload it here to create an evaluation interface.
""", elem_classes=["section-header"])
with gr.Row():
with gr.Column(scale=1):
with gr.Box(elem_classes=["content-box"]):
file_input = gr.File(label="Upload a task file (JSON)")
load_btn = gr.Button("Load Task", variant="primary")
message = gr.Textbox(label="Status", interactive=False, elem_classes=["status-message"])
# Add task list for previously uploaded tasks
with gr.Box(elem_classes=["content-box"]):
gr.Markdown("### Previous Uploads", elem_classes=["section-header"])
# Function to list existing task files in the tasks directory
def list_task_files():
os.makedirs("uploaded_tasks", exist_ok=True)
tasks = [f for f in os.listdir("uploaded_tasks") if f.endswith(".json")]
if not tasks:
return "No task files uploaded yet."
return "\n".join([f"- {t}" for t in tasks])
task_list = gr.Markdown(list_task_files())
refresh_btn = gr.Button("Refresh List")
# Add results management section
with gr.Box(elem_classes=["content-box"]):
gr.Markdown("### Results Management", elem_classes=["section-header"])
# Function to list existing result files
def list_result_files():
results = [f for f in os.listdir(".") if f.endswith("_human_results.json")]
if not results:
return "No result files available yet."
result_links = []
for r in results:
# Calculate completion stats
try:
with open(r, "r") as f:
result_data = json.load(f)
annotation_count = len(result_data.get("annotations", []))
task_name = result_data.get("task_name", "Unknown")
result_links.append(f"- {r} ({annotation_count} annotations for {task_name})")
except:
result_links.append(f"- {r}")
return "\n".join(result_links)
results_list = gr.Markdown(list_result_files())
download_results_btn = gr.Button("Download Results")
# Handle file upload and storage
def handle_upload(file):
if not file:
return "Please upload a task file", task_list.value, ""
try:
# Create directory if it doesn't exist
os.makedirs("uploaded_tasks", exist_ok=True)
# Read the uploaded file
with open(file.name, "r") as f:
task_data = json.load(f)
# Validate task format
if "task_name" not in task_data or "samples" not in task_data:
return "Invalid task file format. Must contain 'task_name' and 'samples' fields.", task_list.value, ""
# Save to a consistent location
task_filename = f"uploaded_tasks/{task_data['task_name']}_task.json"
with open(task_filename, "w") as f:
json.dump(task_data, f, indent=2)
return f"✅ Task '{task_data['task_name']}' uploaded successfully with {len(task_data['samples'])} samples. Please refresh the app and use the Demo tab to evaluate it.", list_task_files(), f"""
Task uploaded successfully!
Task Name: {task_data['task_name']}
Samples: {len(task_data['samples'])}
To evaluate this task:
- Refresh the app
- The Demo tab will now use your uploaded task
- Complete your evaluations
- Results will be saved as {task_data['task_name']}_human_results.json
"""
except Exception as e:
return f"⚠️ Error processing task file: {str(e)}", task_list.value, ""
# Function to prepare results for download
def prepare_results_for_download():
results = [f for f in os.listdir(".") if f.endswith("_human_results.json")]
if not results:
return None
# Create a zip file with all results
import zipfile
zip_path = "mteb_human_eval_results.zip"
with zipfile.ZipFile(zip_path, 'w') as zipf:
for r in results:
zipf.write(r)
return zip_path
# Connect events
load_btn.click(
fn=handle_upload,
inputs=[file_input],
outputs=[message, task_list, task_container]
)
refresh_btn.click(
fn=list_task_files,
inputs=None,
outputs=[task_list]
)
download_results_btn.click(
fn=prepare_results_for_download,
inputs=None,
outputs=[gr.File(label="Download Results")]
)
with gr.TabItem("Results Management"):
gr.Markdown("""
## Manage Evaluation Results
View, download, and analyze your evaluation results.
""", elem_classes=["section-header"])
# Function to load and display result stats
def get_result_stats():
results = [f for f in os.listdir(".") if f.endswith("_human_results.json")]
if not results:
return "No result files available yet."
stats = []
for r in results:
try:
with open(r, "r") as f:
result_data = json.load(f)
task_name = result_data.get("task_name", "Unknown")
annotations = result_data.get("annotations", [])
annotation_count = len(annotations)
# Calculate completion percentage
sample_ids = set(a.get("sample_id") for a in annotations)
# Try to get the total sample count from the corresponding task file
total_samples = 0
# Try uploaded_tasks directory first
task_file = f"uploaded_tasks/{task_name}_task.json"
if os.path.exists(task_file):
with open(task_file, "r") as f:
task_data = json.load(f)
total_samples = len(task_data.get("samples", []))
else:
# Try human_eval file in current directory
task_file = f"{task_name}_human_eval.json"
if os.path.exists(task_file):
with open(task_file, "r") as f:
task_data = json.load(f)
total_samples = len(task_data.get("samples", []))
completion = f"{len(sample_ids)}/{total_samples}" if total_samples else f"{len(sample_ids)} samples"
stats.append(f"### {task_name}\n- Annotations: {annotation_count}\n- Completion: {completion}\n- File: {r}")
except Exception as e:
stats.append(f"### {r}\n- Error loading results: {str(e)}")
return "\n\n".join(stats)
with gr.Box(elem_classes=["content-box"]):
result_stats = gr.Markdown(get_result_stats())
refresh_results_btn = gr.Button("Refresh Results", variant="secondary")
# Add download options
with gr.Box(elem_classes=["content-box"]):
gr.Markdown("### Download Options", elem_classes=["section-header"])
with gr.Row():
download_all_btn = gr.Button("Download All Results (ZIP)", variant="primary")
result_select = gr.Dropdown(choices=[f for f in os.listdir(".") if f.endswith("_human_results.json")], label="Select Result to Download")
download_selected_btn = gr.Button("Download Selected", variant="secondary")
# Function to prepare all results for download as ZIP
def prepare_all_results():
import zipfile
zip_path = "mteb_human_eval_results.zip"
with zipfile.ZipFile(zip_path, 'w') as zipf:
for r in [f for f in os.listdir(".") if f.endswith("_human_results.json")]:
zipf.write(r)
return zip_path
# Function to return a single result file
def get_selected_result(filename):
if not filename:
return None
if os.path.exists(filename):
return filename
return None
# Update dropdown when refreshing results
def update_result_dropdown():
return gr.Dropdown.update(choices=[f for f in os.listdir(".") if f.endswith("_human_results.json")])
# Connect events
refresh_results_btn.click(
fn=get_result_stats,
inputs=None,
outputs=[result_stats]
)
refresh_results_btn.click(
fn=update_result_dropdown,
inputs=None,
outputs=[result_select]
)
download_all_btn.click(
fn=prepare_all_results,
inputs=None,
outputs=[gr.File(label="Download All Results")]
)
download_selected_btn.click(
fn=get_selected_result,
inputs=[result_select],
outputs=[gr.File(label="Download Selected Result")]
)
return app
# Create the app
demo = create_main_app()
if __name__ == "__main__":
demo.launch()