Spaces:
Sleeping
Sleeping
import io | |
import os | |
import re | |
import shutil | |
import pathlib | |
import subprocess | |
from typing import List, Union, Optional | |
from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
import pandas as pd | |
from loguru import logger | |
import gradio as gr | |
from datasets import load_dataset | |
from yourbench_space import PATH | |
# Maps each pipeline stage identifier to the human-readable label used for it
# elsewhere in this module (see map_stage_names). Insertion order is significant:
# STAGES below is derived from it.
STAGE_DISPLAY_MAP = {
    "ingestion": "Process Input Docs",
    "upload_ingest_to_hub": "Upload Dataset to Hub",
    "summarization": "Summarize Documents",
    "chunking": "Chunk Documents",
    "single_shot_question_generation": "Generate Single Shot Questions",
    "multi_hop_question_generation": "Generate Multi Hop Questions",
    "lighteval": "Generate Lighteval Subset",
}

# Stage identifiers, in the same order as the keys of STAGE_DISPLAY_MAP.
# Dicts preserve insertion order, so this is the exact list of keys above.
STAGES = list(STAGE_DISPLAY_MAP)
def to_commentable_yaml(obj):
    """
    Deep-convert plain dicts/lists into ruamel.yaml comment-capable containers.

    Dicts become CommentedMap and lists become CommentedSeq, at every nesting
    level, so that comments can be attached before the structure is dumped.
    Any other value is returned unchanged.
    """
    if isinstance(obj, list):
        converted_items = [to_commentable_yaml(item) for item in obj]
        return CommentedSeq(converted_items)

    if isinstance(obj, dict):
        converted_pairs = {key: to_commentable_yaml(value) for key, value in obj.items()}
        return CommentedMap(converted_pairs)

    # Neither a dict nor a list: nothing to wrap.
    return obj
def map_stage_names(stages: list[str]) -> list[str]:
    """Translate stage identifiers into their display labels.

    Identifiers without an entry in STAGE_DISPLAY_MAP are passed through as-is.
    """
    display_names = []
    for stage_key in stages:
        display_names.append(STAGE_DISPLAY_MAP.get(stage_key, stage_key))
    return display_names
def is_running_locally() -> bool:
    """
    Tell whether the app runs outside a Hugging Face Space.

    Returns True when the SPACE_ID environment variable is absent, False otherwise.
    """
    # SPACE_ID is set in Hugging Face Spaces
    return "SPACE_ID" not in os.environ
def save_files(
    oauth_token: gr.OAuthToken | None, session_state: gr.State, files: List[pathlib.Path]
) -> str | None:
    """Move uploaded files into ``{PATH}/{session uuid}/uploaded_files``.

    Args:
        oauth_token: Login token; may be None. A missing token is only accepted
            when the app is running locally (see is_running_locally).
        session_state: State object whose ``.value`` is used as the session
            directory name.
        files: Uploaded file objects. Each item's ``.name`` attribute is used as
            the source path (presumably Gradio temp-file wrappers — verify
            against the caller). ``None`` or an empty list is accepted.

    Returns:
        A status message listing the saved paths, ``"No files were saved"`` when
        nothing was moved, or None when the login check fails (a Gradio warning
        is shown in that case).
    """
    if oauth_token is None and not is_running_locally():
        gr.Warning("You need to log in to use this Space")
        return None

    # Nothing uploaded: avoid iterating over None and avoid creating an empty directory.
    if not files:
        return "No files were saved"

    upload_directory_uuid = pathlib.Path(f"{PATH}/{session_state.value}/uploaded_files")
    try:
        # The destination is the same for every file, so create it once up front.
        upload_directory_uuid.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating upload directory {upload_directory_uuid}: {e}")
        return "No files were saved"

    saved_paths = []
    for file_name in [uploaded.name for uploaded in files]:
        try:
            source_path = pathlib.Path(file_name)
            if not source_path.exists():
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files

            # Only the base name is kept, so the file always lands inside the upload directory.
            destination_path = upload_directory_uuid / source_path.name
            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))
        except Exception as e:
            # Best effort: one failing file must not prevent the others from being saved.
            logger.error(f"Error moving file {file_name}: {e}")

    return f"Files saved to: {', '.join(saved_paths)}" if saved_paths else "No files were saved"
def update_dataset(stages: list, hf_org: str, hf_prefix: str, oauth_token: gr.OAuthToken):
    """
    Build small preview DataFrames for the pipeline stages listed in `stages`.

    `stages` holds stage display names (values of STAGE_DISPLAY_MAP). For each
    previewable stage that is present, the matching configuration of the
    `{hf_org}/{hf_prefix}` dataset is opened in streaming mode with the user's
    token, restricted to a few columns, and its first rows are loaded.

    Returns a 5-tuple of DataFrames in the fixed order
    (ingestion, summarization, single_shot, multi_hop, lighteval); a stage that
    is absent from `stages` yields an empty DataFrame.
    """
    # Construct dataset name from config
    dataset_name = f"{hf_org}/{hf_prefix}"

    # (stage key, dataset configuration name, columns to keep, rows to preview)
    # The order of this table is the order of the returned tuple.
    preview_specs = (
        ("upload_ingest_to_hub", "ingested", "document_text", 1),
        ("summarization", "summarized", ["document_summary", "summarization_model"], 5),
        (
            "single_shot_question_generation",
            "single_shot_questions",
            ["question", "self_answer", "estimated_difficulty"],
            5,
        ),
        (
            "multi_hop_question_generation",
            "multi_hop_questions",
            ["question", "self_answer", "estimated_difficulty"],
            5,
        ),
        ("lighteval", "lighteval", ["question", "ground_truth_answer", "question_category", "kind"], 5),
    )

    previews = []
    for stage_key, config_name, columns, row_count in preview_specs:
        if STAGE_DISPLAY_MAP[stage_key] not in stages:
            previews.append(pd.DataFrame())
            continue

        streamed = load_dataset(
            dataset_name, name=config_name, split="train", streaming=True, token=oauth_token.token
        )
        selected = streamed.select_columns(columns)
        previews.append(pd.DataFrame(selected.take(row_count)))

    return tuple(previews)
def should_enable_eval_tab(stages):
    """Return True when the lighteval stage's display name is present in `stages`."""
    lighteval_label = STAGE_DISPLAY_MAP["lighteval"]
    logger.info(f"Stages received: {stages}")
    logger.info(f"Lighteval stage name: {lighteval_label}")
    return lighteval_label in stages
def on_generation_succsess(stages):
    """Produce the two Gradio updates to apply once generation has finished.

    `stages` is a collection of stage display names (None is treated as empty).
    When the lighteval stage is among them, a success toast is shown and the
    returned updates select index 2 on the first component and make the second
    one interactive; otherwise the first update is a no-op and the second
    component is left visible but non-interactive.
    """
    lighteval_ready = STAGE_DISPLAY_MAP["lighteval"] in (stages or [])

    if not lighteval_ready:
        return gr.update(), gr.update(interactive=False, visible=True)

    gr.Success("🌟 Your Dataset is ready for evaluation!")
    return gr.update(selected=2), gr.update(interactive=True, visible=True)
class SubprocessManagerGroup:
    """Instantiates one manager per user (should be used as a singleton class).

    Managers are stored in ``self.managers`` keyed by session uid. Every method
    accepts either the uid string itself or a ``gr.State`` whose ``.value`` is
    the uid.
    """

    def __init__(self):
        self.managers: dict[str, "SubprocessManager"] = {}

    @staticmethod
    def grab_uuid(uid: Union[str, gr.State]):
        """If a gradio session state is provided, we pull the uuid from its value - else we assume the str is the uuid"""
        # Declared static so it works both as SubprocessManagerGroup.grab_uuid(x)
        # and as instance.grab_uuid(x).
        if isinstance(uid, gr.State):
            uid = uid.value
        return uid

    def create(self, uid: Union[str, gr.State]):
        """Register a fresh manager for `uid`, replacing any existing one."""
        uid = self.grab_uuid(uid)
        # Tear the previous manager down BEFORE building the new one. Otherwise the
        # old manager is finalized after the new one has created its working
        # directory, and its __del__ wipes that same directory (both use the same
        # per-uid path).
        self.remove(uid)
        self.managers[uid] = SubprocessManager(uid)

    def get(self, uid: Union[str, "gr.State"]) -> Optional["SubprocessManager"]:
        """Return the manager registered for `uid`, or None if there is none."""
        uid = self.grab_uuid(uid)
        return self.managers.get(uid)

    def remove(self, uid: Union[str, gr.State]):
        """Stop the process of `uid`'s manager, delete its workdir and unregister it (no-op if unknown)."""
        uid = self.grab_uuid(uid)
        if manager := self.managers.get(uid):
            manager.stop_process()
            manager.clean_workdir()
            del self.managers[uid]

    def clean_workdir(self, uid: Union[str, gr.State]):
        """Delete the working directory of `uid`'s manager (no-op if unknown)."""
        if manager := self.get(uid):
            manager.clean_workdir()

    def start_process(self, uid: Union[str, gr.State], custom_env: dict | None):
        """Start the subprocess of `uid`'s manager with the given environment.

        Raises:
            KeyError: if no manager has been created for `uid`.
        """
        uid = self.grab_uuid(uid)
        self.managers[uid].start_process(custom_env=custom_env)

    def stop_process(self, uid: Union[str, gr.State]):
        """Terminate the subprocess of `uid`'s manager (no-op if unknown)."""
        if manager := self.get(uid):
            manager.stop_process()

    def kill_process(self, uid: Union[str, gr.State]):
        """Forcefully kill the subprocess of `uid`'s manager (no-op if unknown)."""
        if manager := self.get(uid):
            manager.kill_process()

    def read_and_get_output(self, uid: Union[str, gr.State]):
        """Return ``(log_text, completed_stage_labels)`` for `uid`.

        Returns ``("", [])`` when `uid` is None or when no manager is registered
        for it (e.g. the session was already removed), instead of raising.
        """
        if uid is None:
            return "", []
        manager = self.get(uid)
        if manager is None:
            return "", []
        return manager.read_and_get_output()

    def is_running(self, uid: Union[str, gr.State]) -> bool:
        """Return True if `uid` has a manager whose subprocess is still running."""
        if manager := self.get(uid):
            return manager.is_running()
        return False
class SubprocessManager: | |
def __init__(self, session_uid: str): | |
self.session_uid = session_uid | |
self.path = pathlib.Path(f"{PATH}/{session_uid}") | |
self.path.mkdir(parents=True, exist_ok=True) | |
self.config_path = pathlib.Path(f"{self.path}/config.yml") | |
self.command = ["uv", "run", "yourbench", "run", "--config", str(self.config_path)] | |
self.process = None | |
self.output_stream = io.StringIO() | |
self.exit_code = None | |
def start_process(self, custom_env: dict | None): | |
"""Start the subprocess.""" | |
if self.is_running(): | |
logger.info("Process is already running") | |
return | |
self.output_stream = io.StringIO() | |
self.exit_code = None | |
try: | |
logger.info(f"Starting process with command: {' '.join(self.command)}") | |
self.process = subprocess.Popen( | |
self.command, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, # Combine stderr with stdout | |
text=True, | |
bufsize=1, | |
start_new_session=True, | |
env=custom_env, | |
) | |
os.set_blocking(self.process.stdout.fileno(), False) | |
logger.info(f"Started process with PID: {self.process.pid}") | |
except Exception as e: | |
logger.error(f"Failed to start process: {str(e)}") | |
return | |
def read_and_get_output(self): | |
"""Read subprocess output, capture it, and return log and completed stages.""" | |
current_output = "" | |
completed_stages = [] | |
if self.process and self.process.stdout: | |
try: | |
while True: | |
line = self.process.stdout.readline() | |
if line: | |
self.output_stream.write(line) | |
else: | |
break | |
except BlockingIOError: | |
pass | |
current_output = self.output_stream.getvalue() | |
completed_stages = list(set(re.findall(r"Completed stage: '([^']*)'", current_output))) | |
return current_output, map_stage_names(completed_stages) | |
def clean_workdir(self): | |
shutil.rmtree(self.path, ignore_errors=True) | |
def stop_process(self): | |
"""Terminate the subprocess.""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGTERM to the Process") | |
try: | |
self.process.terminate() | |
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to terminate | |
logger.info(f"Process terminated by user with exit code {self.exit_code}") | |
except subprocess.TimeoutExpired: | |
logger.warning("Process did not terminate within timeout, sending SIGKILL") | |
self.kill_process() | |
def kill_process(self): | |
"""Forcefully kill the subprocess""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGKILL to the Process") | |
try: | |
self.process.kill() | |
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to be killed | |
logger.info(f"Process killed by user with exit code {self.exit_code}") | |
except subprocess.TimeoutExpired: | |
logger.error("Process could not be killed within timeout") | |
def is_running(self): | |
"""Check if the subprocess is still running""" | |
if self.process is None: | |
return False | |
return self.process.poll() is None | |
def get_exit_details(self): | |
"""Return exit code and reason if process has terminated""" | |
if self.process is None: | |
return None, "Process was never started" | |
if self.is_running(): | |
return None, "Process is still running" | |
if self.exit_code is not None and self.exit_code != 0: | |
return self.exit_code, "Process exited abnormaly" | |
return self.exit_code, "Process exited normaly" | |
def __del__(self): | |
"""Stop the process when object is deleted""" | |
if self.process: | |
self.process.kill() | |
self.clean_workdir() | |