Spaces:
Running
Running
import os
import re
import shlex
import subprocess

import gradio as gr
import requests
import yaml
from huggingface_hub import HfApi, get_token
# Command used both to launch the job and to probe its CLI with `--help`.
CMD = ["python", "run_job.py"]

# The Space title and emoji are read from the YAML front matter of README.md,
# i.e. the text between the first two "---" separator lines.
# The file is opened as UTF-8 explicitly: the front matter has an `emoji`
# entry, so the content is expected to be non-ASCII and must not depend on the
# platform's default encoding.
with open("README.md", encoding="utf-8") as f:
    METADATA = yaml.safe_load(f.read().split("---\n")[1])
TITLE = METADATA["title"]
EMOJI = METADATA["emoji"]

# Falls back to a hard-coded Space id when SPACE_ID is unset or empty.
spaceId = os.environ.get("SPACE_ID") or "lhoestq/run-duckdb"

# HELP is the decoded `--help` output of the job script (stdout, or stderr when
# stdout is empty), or False when the probe exits non-zero or cannot be run.
try:
    process = subprocess.run(CMD + ["--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    HELP = (process.stdout or process.stderr).decode() if process.returncode == 0 else False
except Exception:  # best-effort probe: the app must still start without help text
    HELP = False

# DRY_RUN is the exact spelling of the dry-run flag found in HELP
# ("--dry-run" or "--dry_run"), or False when HELP lists no such flag.
_dry_run_flag = re.search(r"--dry(-|_)run", HELP) if HELP else None
DRY_RUN = _dry_run_flag.group(0) if _dry_run_flag else False
def parse_log(line: str, pbars: dict[str, float]):
    """Route one log line either to the progress bars or to the text log.

    A line is treated as a progress-bar update when it contains a percentage
    (such as ``42%`` or ``3.5%``) and one of the bar characters appears within
    the 10 characters that follow the first ``%`` of the line.

    Args:
        line: One decoded line of job output.
        pbars: Mapping of bar description to completion ratio (1.0 means
            done). It is updated in place.

    Yields:
        ``""`` when the line was consumed as a progress update, otherwise the
        line itself, unchanged.
    """
    percent_match = re.search(r"\d+(?:\.\d+)?%", line)
    # NOTE(review): the "β" characters look like mis-encoded bar glyphs; they
    # are kept as found - confirm the intended characters.
    if percent_match and any(c in line.split("%")[1][:10] for c in "|ββ"):
        # Drop the bars that already reached 100%. The finished descriptions
        # are collected first: removing entries from a dict while iterating
        # over it raises RuntimeError.
        finished = [desc for desc, percent in pbars.items() if percent == 1.0]
        for desc in finished:
            del pbars[desc]
        percent = float(percent_match.group(0)[:-1]) / 100
        # Whatever precedes the percentage names the bar.
        desc = line[:percent_match.start()].strip() or "Progress"
        pbars[desc] = percent
        yield ""
    else:
        yield line
def dry_run(src, config, split, dst, query):
    """Run the job command locally with the dry-run flag and stream its output.

    Args:
        src: Source dataset.
        config: Subset of the source dataset.
        split: Split of the source dataset.
        dst: Destination dataset.
        query: SQL query to pass to the job.

    Yields:
        Dicts mapping Gradio components to their new values: first the command
        being run (with the progress labels hidden), then the accumulated
        output after every line printed by the command.

    Raises:
        gr.Error: If any of the five fields is empty.
    """
    if not all([src, config, split, dst, query]):
        raise gr.Error("Please fill source, destination and query.")
    args = ["--src", src, "--config", config, "--split", split, "--dst", dst, "--query", query, DRY_RUN]
    cmd = CMD + args
    # shlex.join quotes every argument that needs it, so the command shown in
    # the bash block is valid shell even when an argument contains quotes.
    logs = "Job:\n\n```bash\n" + shlex.join(cmd) + "\n```\nOutput:\n\n"
    yield {output_markdown: logs, progress_labels: gr.Label(visible=False)}
    # The context manager closes the pipe and waits for the child process,
    # also when this generator is closed before the output is exhausted.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as process:
        for line in iter(process.stdout.readline, b""):
            logs += line.decode()
            yield {output_markdown: logs}
def run(src, config, split, dst, query, oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None):
    """Submit the job through the jobs HTTP API and stream its logs to the UI.

    Args:
        src: Source dataset.
        config: Subset of the source dataset.
        split: Split of the source dataset.
        dst: Destination dataset.
        query: SQL query to pass to the job.
        oauth_token: OAuth token of the logged-in user, or None.
        profile: OAuth profile of the logged-in user, or None.

    Yields:
        Dicts mapping ``output_markdown`` to the accumulated logs and
        ``progress_labels`` to a label built from the current progress bars.

    Raises:
        gr.Error: If a field is empty, or if there is neither an OAuth login
            nor a locally stored token.
    """
    if not all([src, config, split, dst, query]):
        raise gr.Error("Please fill source, destination and query.")
    if oauth_token and profile:
        token = oauth_token.token
        username = profile.username
    elif (token := get_token()):
        username = HfApi().whoami(token=token)["name"]
    else:
        raise gr.Error("Please log in to run the job.")
    args = ["--src", src, "--config", config, "--split", split, "--dst", dst, "--query", query]
    cmd = CMD + args
    # shlex.join quotes every argument that needs it, so the command shown in
    # the bash block is valid shell even when an argument contains quotes.
    logs = "Job:\n\n```bash\n" + shlex.join(cmd) + "\n```\nOutput:\n\n"
    pbars = {}
    yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}
    auth_headers = {"Authorization": f"Bearer {token}"}
    # NOTE(review): the host below is not huggingface.co, yet it receives the
    # user's bearer token - confirm this is the intended API endpoint.
    resp = requests.post(
        f"https://huggingface.co/api/jobs/{username}",
        json={
            "spaceId": spaceId,
            "arguments": args,
            "command": CMD,
            "environment": {},
            "flavor": "cpu-basic",
        },
        headers=auth_headers,
    )
    # NOTE(review): the "β" characters in the status labels below look like
    # mis-encoded glyphs; they are kept as found - confirm the intended ones.
    if resp.status_code != 200:
        logs += resp.text
        pbars = {"Finished with an error β": 1.0}
    else:
        job_id = resp.json()["metadata"]["job_id"]
        # stream=True is required to read the body incrementally from `.raw`;
        # without it the body is already consumed when the call returns and
        # no log line is ever seen. The context manager releases the
        # connection, also when this generator is closed early.
        with requests.get(
            f"https://huggingface.co/api/jobs/{username}/{job_id}/logs-stream",
            headers=auth_headers,
            stream=True,
        ) as log_resp:
            for line in iter(log_resp.raw.readline, b""):
                # parse_log is a generator: join what it yields before
                # appending it to the log text.
                logs += "".join(parse_log(line.decode(), pbars=pbars))
                yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}
            stream_ok = log_resp.status_code == 200
        # NOTE(review): the job's own exit status is not available here, so
        # the outcome is derived from the log-stream response - confirm
        # against the jobs API whether a real job status can be queried.
        pbars = {"Finished" + (" β " if stream_ok else " with an error β"): 1.0}
    yield {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}
# Loader function names used to pick a library entry from the
# "compatible-libraries" response of the datasets server.
READ_FUNCTIONS = (
    "pl.read_parquet",
    "pl.read_csv",
    "pl.read_json",
)

# Value passed as `limit` when listing trending datasets.
NUM_TRENDING_DATASETS = 10
# NOTE(review): indentation was lost in this copy of the file; the nesting of
# the rows and columns below is reconstructed - confirm the intended layout.
with gr.Blocks() as demo:
    # Header: title on the left, login button on the right.
    with gr.Row():
        with gr.Column(scale=10):
            gr.Markdown(f"# {TITLE} {EMOJI}")
        with gr.Column():
            gr.LoginButton()

    # Source dataset selectors, a separator glyph, and the destination dataset.
    with gr.Row():
        with gr.Column(scale=10):
            with gr.Row():
                # Hidden component holding the loading codes of the source dataset.
                loading_codes_json = gr.JSON([], visible=False)
                dataset_dropdown = gr.Dropdown(
                    label="Source Dataset",
                    allow_custom_value=True,
                    scale=10,
                )
                subset_dropdown = gr.Dropdown(
                    info="Subset",
                    allow_custom_value=True,
                    show_label=False,
                    visible=False,
                )
                split_dropdown = gr.Dropdown(
                    info="Split",
                    allow_custom_value=True,
                    show_label=False,
                    visible=False,
                )
        with gr.Column(min_width=60):
            gr.HTML("<div style='font-size: 4em;'>β</div>")
        with gr.Column(scale=10):
            dst_dropdown = gr.Dropdown(label="Destination Dataset", allow_custom_value=True)

    query_textarea = gr.TextArea(
        label="SQL Query",
        placeholder="SELECT * FROM src;",
        value="SELECT * FROM src;",
        container=False,
        show_label=False,
    )

    # The dry-run button only exists when the job script advertises the flag.
    with gr.Row():
        run_button = gr.Button("Run", scale=10, variant="primary")
        if DRY_RUN:
            dry_run_button = gr.Button("Dry-Run")

    progress_labels = gr.Label(visible=False, label="Progress")
    output_markdown = gr.Markdown(label="Output logs")

    # Both buttons read the same five fields and write to the same outputs.
    job_inputs = [dataset_dropdown, subset_dropdown, split_dropdown, dst_dropdown, query_textarea]
    job_outputs = [progress_labels, output_markdown]
    run_button.click(run, inputs=job_inputs, outputs=job_outputs)
    if DRY_RUN:
        dry_run_button.click(dry_run, inputs=job_inputs, outputs=job_outputs)
def show_subset_dropdown(dataset: str):
    """Look up the subsets of a dataset and build the subset dropdown settings.

    Args:
        dataset: Dataset id; anything that is empty or has no ``/`` between a
            namespace and a name is treated as having no subsets, without
            querying the server.

    Returns:
        A ``(dropdown_kwargs, loading_codes)`` tuple. ``dropdown_kwargs`` is a
        dict with the ``choices``, ``value``, ``visible`` and ``key`` of the
        subset dropdown (visible only when there is more than one subset).
        ``loading_codes`` is the list of loading codes of the first returned
        library whose function is in ``READ_FUNCTIONS``, or ``[]``.
    """
    if not dataset or "/" not in dataset.strip().strip("/"):
        # Same two-element shape as the normal path: callers unpack the
        # result, so a bare list would make them fail.
        loading_codes = []
    else:
        # `params` lets requests escape the dataset name in the query string.
        resp = requests.get(
            "https://datasets-server.huggingface.co/compatible-libraries",
            params={"dataset": dataset},
            timeout=3,
        ).json()
        matching = [lib["loading_codes"] for lib in resp.get("libraries", []) if lib["function"] in READ_FUNCTIONS]
        loading_codes = (matching[0] if matching else []) or []
    subsets = [loading_code["config_name"] for loading_code in loading_codes]
    subset = subsets[0] if subsets else ""
    # The key changes whenever the loading codes change.
    return dict(choices=subsets, value=subset, visible=len(subsets) > 1, key=hash(str(loading_codes))), loading_codes
def show_split_dropdown(subset: str, loading_codes: list[dict]):
    """Build the split dropdown settings for one subset.

    Args:
        subset: Config name to look for among the loading codes.
        loading_codes: Entries with a ``config_name`` and an
            ``arguments["splits"]`` collection.

    Returns:
        A dict with the ``choices``, ``value``, ``visible`` and ``key`` of the
        split dropdown. The choices come from the first entry matching
        ``subset`` (empty when none matches), the value is the first choice or
        ``""``, and the dropdown is visible only with more than one split.
    """
    per_entry_splits = [
        list(entry["arguments"]["splits"])
        for entry in loading_codes
        if entry["config_name"] == subset
    ]
    splits = per_entry_splits[0] if per_entry_splits else []
    first_split = splits[0] if splits else ""
    return {
        "choices": splits,
        "value": first_split,
        "visible": len(splits) > 1,
        "key": hash(str(loading_codes) + subset),
    }
# NOTE(review): no event registration for this handler is visible in this copy
# of the file - confirm how it is wired to the UI.
def _fetch_datasets(request: gr.Request):
    """Build the initial state of the source dataset selectors.

    The dataset choices are a fixed default dataset followed by the trending
    datasets listed by the Hub (the default is not repeated). The subset and
    split dropdowns are filled in for the default dataset.

    Args:
        request: Incoming Gradio request (not used).

    Returns:
        A dict mapping the dataset, subset and split dropdowns and the loading
        codes component to their new values.
    """
    default_dataset = "CohereForAI/Global-MMLU"
    trending = HfApi().list_datasets(limit=NUM_TRENDING_DATASETS, sort="trendingScore", direction=-1)
    choices = [default_dataset]
    for info in trending:
        if info.id != default_dataset:
            choices.append(info.id)

    subset_kwargs, loading_codes = show_subset_dropdown(default_dataset)
    split_kwargs = show_split_dropdown(subset_kwargs["value"], loading_codes)
    return {
        dataset_dropdown: gr.Dropdown(choices=choices, value=default_dataset),
        loading_codes_json: loading_codes,
        subset_dropdown: gr.Dropdown(**subset_kwargs),
        split_dropdown: gr.Dropdown(**split_kwargs),
    }
def _show_subset_dropdown(dataset: str):
    """Refresh the subset and split dropdowns for the given dataset.

    Args:
        dataset: Dataset id whose subsets and splits should be listed.

    Returns:
        A dict mapping the subset and split dropdowns to their new values.
    """
    subset_kwargs, loading_codes = show_subset_dropdown(dataset)
    # The split choices follow the subset that was just selected by default.
    # NOTE(review): the loading codes are not part of the returned update -
    # confirm whether the hidden loading-codes component should be refreshed too.
    split_kwargs = show_split_dropdown(subset_kwargs["value"], loading_codes)
    updates = {}
    updates[subset_dropdown] = gr.Dropdown(**subset_kwargs)
    updates[split_dropdown] = gr.Dropdown(**split_kwargs)
    return updates
def _show_split_dropdown(dataset: str, subset: str, loading_codes: list[dict]):
    """Refresh the split dropdown for the given subset.

    Args:
        dataset: Dataset id (not used).
        subset: Config name whose splits should be listed.
        loading_codes: Loading codes to search for the subset.

    Returns:
        A dict mapping the split dropdown to its new value.
    """
    split_kwargs = show_split_dropdown(subset, loading_codes)
    return {split_dropdown: gr.Dropdown(**split_kwargs)}
# The help page is only added when the `--help` probe produced output.
if HELP:
    help_page = f"# Help\n\n```\n{HELP}\n```"
    with demo.route("Help", "/help"):
        gr.Markdown(help_page)

# NOTE(review): indentation was lost in this copy of the file; the Jobs page is
# assumed to be registered regardless of HELP - confirm.
with demo.route("Jobs", "/jobs"):
    gr.Markdown("# Jobs")

if __name__ == "__main__":
    # Listen on all network interfaces.
    demo.launch(server_name="0.0.0.0")