Spaces:
Sleeping
Sleeping
import io
import itertools
import os
import pathlib
import re
import shutil
import subprocess
from typing import List

import pandas as pd
from datasets import load_dataset, get_dataset_config_names
from loguru import logger
# Directory into which save_files() moves uploaded files.
UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
# Path to the YAML config file (not referenced in this part of the module).
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")

# Create the upload directory (and any missing parents) at import time;
# an already-existing directory is fine.
UPLOAD_DIRECTORY.mkdir(exist_ok=True, parents=True)

# Enabled stage names. Currently disabled:
#   "evaluate_models", "create_leaderboard",
#   "judge_answers"  (to re-enable once fixed)
STAGES = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "answer_generation",
]
def save_files(files: List[pathlib.Path]) -> str: | |
"""Save uploaded files to the UPLOAD_DIRECTORY safely""" | |
saved_paths = [] | |
for file in files: | |
try: | |
source_path = pathlib.Path(file) | |
destination_path = UPLOAD_DIRECTORY / source_path.name | |
if not source_path.exists(): | |
print(f"File not found: {source_path}") | |
continue # Skip missing files | |
shutil.move(str(source_path), str(destination_path)) | |
saved_paths.append(str(destination_path)) | |
except Exception as e: | |
print(f"Error moving file {file}: {e}") | |
return ( | |
f"Files saved to: {', '.join(saved_paths)}" | |
if saved_paths | |
else "No files were saved" | |
) | |
def _preview_config(dataset_name: str, config_name: str, n_rows: int) -> pd.DataFrame:
    """Stream the first ``n_rows`` rows of one config's train split into a DataFrame."""
    ds = load_dataset(dataset_name, name=config_name, split="train", streaming=True)
    # islice advances ONE iterator. Calling next(iter(ds)) repeatedly would restart
    # the stream each time and yield the first row n_rows times; islice also stops
    # cleanly if the split has fewer than n_rows rows.
    return pd.DataFrame(list(itertools.islice(ds, n_rows)))


def update_dataset(stages, hf_org, hf_prefix, *, n_rows: int = 5):
    """
    Load a small preview of each pipeline output present on the Hub.

    Args:
        stages: Collection of stage names; only stages contained in it are loaded.
        hf_org: Hub organisation/user owning the dataset.
        hf_prefix: Dataset name within ``hf_org``.
        n_rows: Maximum number of rows to preview per stage (keyword-only).

    Returns:
        ``(ingestion_df, summarization_df, single_hop_df, answers_df)``. Each
        holds up to ``n_rows`` rows of that config's ``train`` split, or is an
        empty DataFrame when the corresponding stage is not in ``stages``.
    """
    # Construct dataset name from config
    dataset_name = f"{hf_org}/{hf_prefix}"
    # TODO: add cache dir
    # (stage name, dataset config name), in return order.
    # TODO: why is the ingestion config "ingested" and not "ingestion"?
    # (does not match the other splits)
    previews = (
        ("ingestion", "ingested"),
        ("summarization", "summarization"),
        ("single_shot_question_generation", "single_shot_question_generation"),
        ("answer_generation", "answer_generation"),
    )
    return tuple(
        _preview_config(dataset_name, config_name, n_rows)
        if stage in stages
        else pd.DataFrame()
        for stage, config_name in previews
    )
class SubprocessManager: | |
def __init__(self, command): | |
self.command = command | |
self.process = None | |
self.output_stream = io.StringIO() | |
self.exit_code = None | |
def start_process(self, custom_env: dict | None): | |
"""Start the subprocess.""" | |
if self.is_running(): | |
logger.info("Process is already running") | |
return | |
self.output_stream = io.StringIO() | |
self.exit_code = None | |
try: | |
logger.info(f"Starting process with command: {' '.join(self.command)}") | |
self.process = subprocess.Popen( | |
self.command, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, # Combine stderr with stdout | |
text=True, | |
bufsize=1, | |
start_new_session=True, | |
env=custom_env, | |
) | |
os.set_blocking(self.process.stdout.fileno(), False) | |
logger.info(f"Started process with PID: {self.process.pid}") | |
except Exception as e: | |
logger.error(f"Failed to start process: {str(e)}") | |
return | |
def read_and_get_output(self): | |
"""Read subprocess output, capture it, and return log and completed stages.""" | |
current_output = "" | |
completed_stages = [] | |
if self.process and self.process.stdout: | |
try: | |
while True: | |
line = self.process.stdout.readline() | |
if line: | |
self.output_stream.write(line) | |
else: | |
break | |
except BlockingIOError: | |
pass | |
current_output = self.output_stream.getvalue() | |
completed_stages = list(set(re.findall(r"Successfully completed stage: (\w+)", current_output))) | |
return current_output, completed_stages | |
def stop_process(self): | |
"""Terminate the subprocess.""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGTERM to the Process") | |
try: | |
self.process.terminate() | |
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to terminate | |
logger.info(f"Process terminated by user with exit code {self.exit_code}") | |
except subprocess.TimeoutExpired: | |
logger.warning("Process did not terminate within timeout, sending SIGKILL") | |
self.kill_process() | |
def kill_process(self): | |
"""Forcefully kill the subprocess""" | |
if not self.is_running(): | |
logger.info("Process is not running") | |
return | |
logger.info("Sending SIGKILL to the Process") | |
try: | |
self.process.kill() | |
self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to be killed | |
logger.info(f"Process killed by user with exit code {self.exit_code}") | |
except subprocess.TimeoutExpired: | |
logger.error("Process could not be killed within timeout") | |
def is_running(self): | |
"""Check if the subprocess is still running""" | |
if self.process is None: | |
return False | |
return self.process.poll() is None | |
def get_exit_details(self): | |
"""Return exit code and reason if process has terminated""" | |
if self.process is None: | |
return None, "Process was never started" | |
if self.is_running(): | |
return None, "Process is still running" | |
if not self.exit_code is None and self.exit_code != 0 : | |
return self.exit_code, "Process exited abnormaly" | |
return self.exit_code, "Process exited normaly" | |