Maharshi Gor
Updated workflow APIs, code clean up and minor functions for hf pipeline support
f064c62
import glob | |
import json | |
import logging | |
import os | |
import re | |
import traceback | |
from datetime import datetime, timedelta, timezone | |
import gradio as gr | |
import yaml | |
from loguru import logger | |
from app_configs import DAILY_SUBMISSION_LIMIT_PER_USER | |
from display.formatting import styled_error, styled_message | |
from envs import API, EVAL_REQUESTS_PATH, EXAMPLES_PATH, OWNER, QUEUE_REPO | |
from shared.workflows.structs import TossupWorkflow, Workflow | |
from submission.structs import CompetitionType, Submission, SubmissionStatus | |
def get_user_submissions(username: str, competition_type: str, pattern: str | None = None) -> list[Submission]:
    """Load the stored submissions of one user for one competition type.

    Args:
        username: Name of the per-user directory under ``EVAL_REQUESTS_PATH``.
        competition_type: Only files whose name starts with
            ``"{competition_type}_"`` are considered.
        pattern: Optional substring that must occur in the file name.

    Returns:
        The parsed submissions in file-name order. The list is empty when the
        user has no directory. Files that cannot be read or parsed are logged
        and skipped.
    """
    out_dir = f"{EVAL_REQUESTS_PATH}/{username}"
    # isdir rather than exists: a stray regular file at this path must not
    # make os.listdir raise.
    if not os.path.isdir(out_dir):
        return []

    prefix = f"{competition_type}_"
    submissions = []
    # os.listdir returns entries in arbitrary order; sort so callers that pick
    # the first element get a stable result.
    for file in sorted(os.listdir(out_dir)):
        if not file.startswith(prefix):
            continue
        if pattern is not None and pattern not in file:
            continue
        try:
            with open(os.path.join(out_dir, file), "r") as f:
                submissions.append(Submission.from_dict(json.load(f)))
        except Exception as e:
            # Deliberately best-effort: one broken file must not hide the rest.
            logger.error(f"Error loading submission {file}: {e}")
    return submissions
def get_user_submission_names(competition_type: str, profile: gr.OAuthProfile | None) -> list[str]:
    """Return ``"username/model_name"`` for each submission of the logged-in user.

    An empty list is returned (and an info message logged) when no profile is
    given.
    """
    if profile is not None:
        names = []
        for sub in get_user_submissions(profile.username, competition_type):
            names.append(f"{sub.username}/{sub.model_name}")
        return names

    logger.info("No user profile provided. Returning empty list.")
    return []
def get_demo_example_submissions(competition_type: str) -> list[str]:
    """List the demo examples of a competition type as ``"{OWNER}/{name}"``.

    The names are the ``.yaml`` file names, without the extension, found in
    ``{EXAMPLES_PATH}/{competition_type}``.
    """
    yaml_paths = glob.glob(f"{EXAMPLES_PATH}/{competition_type}/*.yaml")
    example_names = []
    for path in yaml_paths:
        stem = os.path.basename(path).removesuffix(".yaml")
        example_names.append(f"{OWNER}/{stem}")
    return example_names
def get_user_submissions_by_date(
    username: str, competition_type: str, year: int, month: int, day: int
) -> list[Submission]:
    """Load a user's submissions for one competition type made on a given date.

    A file is selected when its name starts with ``"{competition_type}_"`` and
    contains the date formatted as ``YYYYMMDD``.

    Args:
        username: Name of the per-user directory under ``EVAL_REQUESTS_PATH``.
        competition_type: Competition type prefix of the submission files.
        year: Calendar year of the date to match.
        month: Calendar month of the date to match.
        day: Day of month of the date to match.

    Returns:
        The parsed submissions. The list is empty when the user has no
        directory. Files that cannot be read or parsed are logged and skipped.

    Raises:
        ValueError: If ``year``/``month``/``day`` do not form a valid date.
    """
    date_str = datetime(year, month, day).strftime("%Y%m%d")
    out_dir = f"{EVAL_REQUESTS_PATH}/{username}"
    if not os.path.exists(out_dir):
        # Always return a list so every code path has the same return type.
        return []

    prefix = f"{competition_type}_"
    submissions = []
    for file in os.listdir(out_dir):
        if not (file.startswith(prefix) and date_str in file):
            continue
        try:
            # Context manager so the handle is closed even if parsing fails.
            with open(os.path.join(out_dir, file), "r") as f:
                submissions.append(Submission.from_dict(json.load(f)))
        except Exception as e:
            # logger.exception records the message together with the traceback.
            logger.exception(f"Error loading submission {file}: {e}")
    return submissions
def get_user_submissions_today(username: str, competition_type: str) -> list[Submission]:
    """Get a user's submissions for a competition type made on the current UTC date."""
    now_utc = datetime.now(timezone.utc)
    year, month, day = now_utc.year, now_utc.month, now_utc.day
    return get_user_submissions_by_date(username, competition_type, year, month, day)
def get_time_until_next_submission(tz: timezone = timezone.utc) -> str:
    """Return the time left until the next midnight in ``tz``.

    Args:
        tz: Time zone whose midnight marks the start of the next day
            (defaults to UTC).

    Returns:
        A string of the form ``"{hours} hours {minutes} mins"``.
    """
    # Read the clock once so both ends of the interval use the same instant.
    now = datetime.now(tz)
    next_midnight = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)

    # total_seconds() includes the days component; timedelta.seconds alone
    # would report an exact 24h gap as zero.
    total_seconds = int((next_midnight - now).total_seconds())
    hours, leftover = divmod(total_seconds, 3600)
    minutes = leftover // 60
    return f"{hours} hours {minutes} mins"
def create_workflow_submission(
    username: str,
    model_name: str,
    description: str,
    workflow: Workflow,
    competition_type: CompetitionType,
) -> Submission:
    """
    Build a "simple_workflow" submission record.

    The submission id is composed of the competition type, the current UTC
    timestamp, the username and the lower-cased model name with spaces
    replaced by underscores.

    Args:
        username: Username of the user who created the submission
        model_name: Display name of the submission
        description: Detailed description of what the submission does
        workflow: The workflow configuration attached to the submission
        competition_type: Type of competition the submission is for

    Returns:
        The new Submission object, with status "submitted"
    """
    now = datetime.now(timezone.utc)
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    slug = model_name.lower().replace(" ", "_")
    iso_now = now.isoformat()

    return Submission(
        id=f"{competition_type}__{timestamp}__{username}__{slug}",
        model_name=model_name,
        username=username,
        description=description,
        competition_type=competition_type,
        submission_type="simple_workflow",
        workflow=workflow,
        status="submitted",
        created_at=iso_now,
        updated_at=iso_now,
    )
def create_hf_submission(
    username: str,
    model_name: str,
    description: str,
    competition_type: CompetitionType,
) -> Submission:
    """
    Build an "hf_pipeline" submission record.

    The submission id is composed of the competition type, the marker "hf",
    the current UTC timestamp, the username and the lower-cased model name
    with spaces replaced by underscores.

    Args:
        username: Username of the user who created the submission
        model_name: Name of the model
        description: Detailed description of what the submission does
        competition_type: Type of competition

    Returns:
        The new Submission object, with status "submitted"
    """
    now = datetime.now(timezone.utc)
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    slug = model_name.lower().replace(" ", "_")
    iso_now = now.isoformat()

    return Submission(
        id=f"{competition_type}__hf__{timestamp}__{username}__{slug}",
        model_name=model_name,
        username=username,
        description=description,
        competition_type=competition_type,
        submission_type="hf_pipeline",
        status="submitted",
        created_at=iso_now,
        updated_at=iso_now,
    )
def validate_model_name(model_name: str) -> tuple[bool, str]:
    """Check that a model name is safe to use in a submission id.

    A valid name has no spaces, consists only of ASCII letters, digits,
    underscores and hyphens, and starts with a letter.

    Args:
        model_name: The name to validate.

    Returns:
        ``(True, "")`` when the name is valid, otherwise ``(False, reason)``
        where ``reason`` is a user-facing explanation.
    """
    if " " in model_name:
        return False, "Model name cannot contain spaces."
    # fullmatch, not match with "$": "$" also matches just before a trailing
    # newline, which would let a name like "abc\n" through.
    if not re.fullmatch(r"[a-zA-Z0-9_-]+", model_name):
        return False, "Model name can only contain letters, numbers, underscores, and hyphens."
    if not re.match(r"[a-zA-Z]", model_name):
        return False, "Model name must start with a letter."
    return True, ""
def submit_model(
    model_name: str,
    description: str,
    workflow: Workflow,
    competition_type: CompetitionType,
    profile: gr.OAuthProfile | None,
) -> str:
    """
    Submit a workflow model for evaluation.

    Checks are performed in this order: the user is logged in, the daily
    submission limit has not been reached, the name is not already used by
    the user, and the name is valid. The submission is then serialized to
    JSON and uploaded to the queue dataset repository.

    Args:
        model_name: Display name of the submission
        description: Detailed description of what the submission does
        workflow: The workflow configuration of the model
        competition_type: Type of competition the submission is for
        profile: OAuth profile of the logged-in user, or None if not logged in

    Returns:
        Styled status message (success or error)
    """
    if profile is None:
        return styled_error("Authentication required. Please log in first to submit your model.")

    username = profile.username

    submitted_today = get_user_submissions_today(username, competition_type)
    if len(submitted_today) >= DAILY_SUBMISSION_LIMIT_PER_USER:
        time_str = get_time_until_next_submission()
        return styled_error(
            f"Daily submission limit of {DAILY_SUBMISSION_LIMIT_PER_USER} reached. Please try again in \n {time_str}."
        )

    existing_names = get_user_submission_names(competition_type, profile)
    if f"{username}/{model_name}" in existing_names:
        return styled_error(
            f"Submission Error!<br>'{model_name}' already exists. Please use a different name for your submission."
        )

    is_valid, error_msg = validate_model_name(model_name)
    if not is_valid:
        return styled_error(f"Submission Error! Invalid model name '{model_name}'.<br>{error_msg}")

    try:
        submission = create_workflow_submission(
            username=username,
            model_name=model_name,
            description=description,
            workflow=workflow,
            competition_type=competition_type,
        )

        # Serialize the submission for upload
        payload = json.dumps(submission.to_dict(), indent=2).encode()

        # Local path of the request file; the repo path is the part after "eval-queue/"
        out_path = f"{EVAL_REQUESTS_PATH}/{username}/{submission.id}.json"
        repo_path = out_path.split("eval-queue/")[1]

        # Upload to HuggingFace dataset
        API.upload_file(
            path_or_fileobj=payload,
            path_in_repo=repo_path,
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Add tossup submission {submission.id}",
        )

        return styled_message(
            f"Successfully submitted tossup model!<br>"
            f"Submission name: {username}/{model_name}<br>"
            f"Please wait for up to an hour for the model to show in the PENDING list."
        )
    except Exception as e:
        # Any failure while building or uploading is reported to the user
        traceback.print_exc()
        return styled_error(f"Error submitting model: {str(e)}")
def load_demo_example(model_name: str, competition_type: CompetitionType) -> Workflow | TossupWorkflow:
    """Load a demo example workflow from its YAML file.

    The file is read from ``{EXAMPLES_PATH}/{competition_type}/{model_name}.yaml``
    and validated as a ``TossupWorkflow`` for the "tossup" competition type,
    or as a ``Workflow`` otherwise.

    Raises:
        ValueError: If the example file does not exist.
    """
    filepath = f"{EXAMPLES_PATH}/{competition_type}/{model_name}.yaml"
    if not os.path.exists(filepath):
        raise ValueError(f"Demo example file {filepath} not found")

    with open(filepath, "r") as f:
        yaml_data = yaml.safe_load(f)

    workflow_cls = TossupWorkflow if competition_type == "tossup" else Workflow
    return workflow_cls.model_validate(yaml_data)
def load_submission(
    model_name: str, competition_type: CompetitionType, profile: gr.OAuthProfile | None
) -> Submission | str:
    """Load one of the logged-in user's submissions by its model name.

    Args:
        model_name: Model name of the submission to load.
        competition_type: Competition type the submission belongs to.
        profile: OAuth profile of the logged-in user, or None if not logged in.

    Returns:
        The matching Submission, or a styled error message when the user is
        not logged in or no submission with that model name exists.
    """
    if profile is None:
        message = "Authentication required. Please log in to view your submissions."
        logger.error(message)
        return styled_error(message)

    username = profile.username
    # Match on the stored model_name instead of a file-name substring: file
    # names hold the lower-cased name, so a substring lookup misses mixed-case
    # names and can pick up a different model whose name contains this one.
    matches = [s for s in get_user_submissions(username, competition_type) if s.model_name == model_name]
    if not matches:
        return styled_error(f"Submission {model_name} not found.")
    return matches[0]
if __name__ == "__main__":
    # Example usage
    from shared.workflows.factory import create_quizbowl_simple_step_initial_setup

    # Create workflow
    model_step = create_quizbowl_simple_step_initial_setup()
    model_step.model = "gpt-4"
    model_step.provider = "openai"
    model_step.temperature = 0.7
    workflow = Workflow(
        inputs=["question_text"],
        outputs={"answer": "A.answer", "confidence": "A.confidence"},
        steps={"A": model_step},
    )

    # Submit model
    # submit_model requires a profile argument. Without a logged-in
    # gr.OAuthProfile this prints the "Authentication required" message;
    # pass a real profile to actually upload.
    result = submit_model(
        # The name must satisfy validate_model_name (no spaces).
        model_name="gpt-4-tossup",
        description="A simple GPT-4 model for tossup questions",
        workflow=workflow,
        competition_type="tossup",
        profile=None,
    )
    print(result)
# %%