Spaces:
Sleeping
Sleeping
Clémentine
added a singleton-like class to manage all managers per session, plus session state management. Also fixes secret passing to the new leaderboard space
133c6d8
import io | |
import os | |
import re | |
import pathlib | |
import shutil | |
import subprocess | |
import gradio as gr | |
import pandas as pd | |
from collections import defaultdict | |
from datasets import load_dataset | |
from loguru import logger | |
from typing import List, Union | |
# Pipeline stage names, listed in the order they appear in the pipeline config.
STAGES = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "answer_generation",
    # Stages currently switched off:
    # "evaluate_models",
    # "create_leaderboard",
    # "judge_answers",  # to uncomment when fixed
]
def save_files(session_state: gr.State, files: List[pathlib.Path]) -> str:
    """Move uploaded files into the session's upload directory.

    Files are moved (not copied) to ``/app/<session uid>/uploaded_files``,
    keeping only their base name. The operation is best-effort: a file that
    is missing or cannot be moved is logged and skipped, it never aborts the
    remaining files.

    Args:
        session_state: Session state whose ``.value`` holds the session uid.
        files: Paths of the uploaded files; ``None`` or an empty list is
            accepted and treated as "nothing to save".

    Returns:
        A human-readable status message listing the saved paths, or
        ``"No files were saved"`` when nothing was moved.
    """
    # Nothing to do for None or an empty selection (iterating None would raise).
    if not files:
        return "No files were saved"

    session_uid = session_state.value
    upload_directory = pathlib.Path(f"/app/{session_uid}/uploaded_files")

    # The destination directory is the same for every file: create it once.
    try:
        upload_directory.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Error creating upload directory {upload_directory}: {e}")
        return "No files were saved"

    saved_paths = []
    for file in files:
        try:
            source_path = pathlib.Path(file)
            if not source_path.exists():
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files

            # Only the base name is kept, so the file always lands directly
            # inside the session's upload directory.
            destination_path = upload_directory / source_path.name
            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))
        except Exception as e:
            # Deliberately broad: one bad file must not prevent the others
            # from being saved.
            logger.error(f"Error moving file {file}: {e}")

    if not saved_paths:
        return "No files were saved"
    return f"Files saved to: {', '.join(saved_paths)}"
def update_dataset(stages, hf_org, hf_prefix):
    """Load preview dataframes for the requested pipeline stages.

    The dataset ``"{hf_org}/{hf_prefix}"`` is read once per requested stage,
    using the stage's configuration name and the ``train`` split. Stages that
    are not listed in ``stages`` yield an empty dataframe.

    Args:
        stages: Collection of stage names to load previews for.
        hf_org: Organisation part of the dataset name.
        hf_prefix: Name part of the dataset name.

    Returns:
        A 4-tuple ``(ingestion_df, summarization_df, single_hop_df,
        answers_df)`` of pandas dataframes.
    """
    # Local import: only this function needs it.
    from itertools import islice

    preview_rows = 5  # maximum number of rows shown per streamed stage

    def head(streamed_dataset):
        # Take the first rows from ONE iterator. Calling next(iter(ds))
        # repeatedly would restart the stream each time and return the
        # first row over and over.
        return pd.DataFrame(list(islice(streamed_dataset, preview_rows)))

    ingestion_df = pd.DataFrame()
    summarization_df = pd.DataFrame()
    single_hop_df = pd.DataFrame()
    answers_df = pd.DataFrame()

    # Construct dataset name from config
    dataset_name = f"{hf_org}/{hf_prefix}"

    if "ingestion" in stages:
        # TODO: why is the key "ingested" and not "ingestion"? (does not match the other splits)
        ingestion_ds = load_dataset(
            dataset_name, name="ingested", split="train"
        ).select_columns("document_text")
        first_row = ingestion_ds[0]  # only one row
        try:
            ingestion_df = pd.DataFrame(first_row)
        except ValueError:
            # A mapping of scalar values cannot be turned into a dataframe
            # without an index; wrap it so it becomes a single-row frame.
            ingestion_df = pd.DataFrame([first_row])

    if "summarization" in stages:
        summarization_ds = load_dataset(
            dataset_name, name="summarization", split="train", streaming=True
        ).select_columns(
            ["raw_document_summary", "document_summary", "summarization_model"]
        )
        summarization_df = head(summarization_ds)

    if "single_shot_question_generation" in stages:
        single_hop_ds = load_dataset(
            dataset_name,
            name="single_shot_question_generation",
            split="train",
            streaming=True,
        )
        single_hop_df = head(single_hop_ds)

    if "answer_generation" in stages:
        answers_ds = load_dataset(
            dataset_name, name="answer_generation", split="train", streaming=True
        )
        answers_df = head(answers_ds)

    return (ingestion_df, summarization_df, single_hop_df, answers_df)
class SubprocessManagerGroup:
    """Instantiates one manager per user (should be used as a singleton class).

    Every public method accepts either the session uid as a string or a
    gradio session state whose ``.value`` holds that uid. Methods that look
    up an existing manager raise ``KeyError`` when the uid is unknown.
    """

    def __init__(self):
        # Maps session uid -> the manager in charge of that session.
        self.managers: dict[str, SubprocessManager] = {}

    @staticmethod
    def grab_uuid(uid: Union[str, gr.State]):
        """If a gradio session state is provided, we pull the uuid from its value - else we assume the str is the uuid"""
        if isinstance(uid, gr.State):
            uid = uid.value
        return uid

    def create(self, uid: Union[str, gr.State]):
        """Create (or replace) the manager registered for this session."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid] = SubprocessManager(uid)

    def get(self, uid: Union[str, gr.State]) -> "SubprocessManager":
        """Return the manager registered for this session."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        return self.managers[uid]

    def remove(self, uid: Union[str, gr.State]):
        """Forget the manager registered for this session."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        del self.managers[uid]

    def start_process(self, uid: Union[str, gr.State], custom_env: dict | None = None):
        """Start this session's subprocess.

        ``custom_env`` is forwarded unchanged to the session's manager, whose
        ``start_process`` takes it as its first argument.
        """
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid].start_process(custom_env)

    def stop_process(self, uid: Union[str, gr.State]):
        """Ask this session's manager to stop its subprocess."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid].stop_process()

    def kill_process(self, uid: Union[str, gr.State]):
        """Ask this session's manager to kill its subprocess."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid].kill_process()

    def read_and_get_output(self, uid: Union[str, gr.State]):
        """Return whatever this session's manager reports from ``read_and_get_output``."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        return self.managers[uid].read_and_get_output()
class SubprocessManager: | |
def __init__(self, session_uid: str): | |
self.session_uid = session_uid | |
self.path = pathlib.Path(f"/app/{session_uid}") | |
self.path.mkdir(parents=True, exist_ok=True) | |
self.config_path = pathlib.Path(f"/app/{session_uid}/config.yml") | |
self.command = ["uv", "run", "yourbench", f"--config", self.config_path] | |
self.process = None | |
self.output_stream = io.StringIO() | |
self.exit_code = None | |
def start_process(self, custom_env: dict | None): | |
"""Start the subprocess.""" | |
if self.is_running(): | |
logger.info("Process is already running") | |
return | |
self.output_stream = io.StringIO() | |
self.exit_code = None | |
try: | |
logger.info(f"Starting process with command: {' '.join(self.command)}") | |
self.process = subprocess.Popen( | |
self.command, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, # Combine stderr with stdout | |
text=True, | |
bufsize=1, | |
start_new_session=True, | |
env=custom_env, | |
) | |
os.set_blocking(self.process.stdout.fileno(), False) | |
logger.info(f"Started process with PID: {self.process.pid}") | |
except Exception as e: | |
logger.error(f"Failed to start process: {str(e)}") | |
return | |
def read_and_get_output(self): | |
"""Read subprocess output, capture it, and return log and completed stages.""" | |
current_output = "" | |
completed_stages = [] | |
if self.process and self.process.stdout: | |
try: | |
while True: | |
line = self.process.stdout.readline() | |
if line: | |
self.output_stream.write(line) | |
else: | |
break | |
except BlockingIOError: | |
pass | |
current_output = self.output_stream.getvalue() | |
completed_stages = list(set(re.findall(r"Successfully completed stage: (\w+)", current_output))) | |
return current_output, completed_stages | |
def stop_process(self): | |
"""Terminate the subprocess.""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGTERM to the Process") | |
try: | |
self.process.terminate() | |
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to terminate | |
logger.info(f"Process terminated by user with exit code {self.exit_code}") | |
except subprocess.TimeoutExpired: | |
logger.warning("Process did not terminate within timeout, sending SIGKILL") | |
self.kill_process() | |
def kill_process(self): | |
"""Forcefully kill the subprocess""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGKILL to the Process") | |
try: | |
self.process.kill() | |
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to be killed | |
logger.info(f"Process killed by user with exit code {self.exit_code}") | |
except subprocess.TimeoutExpired: | |
logger.error("Process could not be killed within timeout") | |
def is_running(self): | |
"""Check if the subprocess is still running""" | |
if self.process is None: | |
return False | |
return self.process.poll() is None | |
def get_exit_details(self): | |
"""Return exit code and reason if process has terminated""" | |
if self.process is None: | |
return None, "Process was never started" | |
if self.is_running(): | |
return None, "Process is still running" | |
if not self.exit_code is None and self.exit_code != 0 : | |
return self.exit_code, "Process exited abnormaly" | |
return self.exit_code, "Process exited normaly" | |