File size: 11,591 Bytes
8b00326
43b024e
73e0168
f045267
8b00326
73e0168
f045267
6017ce1
af91d08
73e0168
43b024e
f045267
31ee061
43b024e
d182492
53dd325
d182492
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f045267
73e0168
 
 
c6a3e13
73e0168
 
 
 
 
 
 
 
 
 
8b00326
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73e0168
 
55e8035
73e0168
55e8035
43b024e
 
af91d08
43b024e
f045267
bdb8322
43b024e
73e0168
43b024e
55e8035
73e0168
43b024e
 
 
 
 
 
 
273c97d
43b024e
 
 
 
 
 
 
d182492
43b024e
 
9088a0f
43b024e
 
 
 
 
 
 
 
 
8b00326
 
43b024e
 
8b00326
 
43b024e
8b00326
 
43b024e
8b00326
 
 
 
 
 
 
 
 
 
74560e6
 
 
 
 
43b024e
73e0168
af91d08
73e0168
 
f045267
 
73e0168
 
 
c6a3e13
 
73e0168
43b024e
d182492
73e0168
43b024e
73e0168
 
 
 
 
43b024e
73e0168
43b024e
73e0168
287c8b4
73e0168
 
 
 
 
af91d08
8b00326
af91d08
73e0168
af91d08
73e0168
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
af91d08
73e0168
d182492
af91d08
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d182492
af91d08
 
 
 
d182492
af91d08
f045267
 
73e0168
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import json
import os
import re
import subprocess
import time
import yaml

import gradio as gr
import pandas as pd
import requests
from huggingface_hub import HfApi, get_token


# Command sent as the job's "command" and also run locally for --help / dry runs.
CMD = ["python", "run_job.py"]
# Placeholder argument names displayed in the usage snippet.
ARG_NAMES = [
    "<src>",
    "<dst>",
    "<query>",
    "[-c config]",
    "[-s split]",
    "[-p private]",
]
# Use the SPACE_ID environment variable when it is set and non-empty.
SPACE_ID = os.environ.get("SPACE_ID") or "lhoestq/run-duckdb-jobs"

# Markdown template for the usage section of the main page. It is filled with
# `str.format`, so literal braces are doubled and {SPACE_ID}, {CMD} and
# {ARG_NAMES} are placeholders. Backslashes are escaped ("\\") so the rendered
# curl command keeps its shell line continuations; a bare backslash before a
# newline would be swallowed by Python and collapse the command onto one line.
# The token placeholder in the JSON body is quoted so the example is valid JSON
# once the placeholder is substituted.
CONTENT = """
## Usage: 

```bash
curl -L 'https://huggingface.co/api/jobs/<username>' \\
-H 'Content-Type: application/json' \\
-H 'Authorization: Bearer <hf_token>' \\
-d '{{
    "spaceId": "{SPACE_ID}", 
    "command": {CMD},
    "arguments": {ARG_NAMES},
    "environment": {{"HF_TOKEN": "<hf_token>"}},
    "flavor": "cpu-basic" 
}}'
```

## Example:
"""

# Load the Space metadata from README.md: the text between the first two
# "---\n" markers is parsed as YAML. The encoding is explicit because the
# metadata is expected to contain an emoji (the "emoji" key) and the platform
# default encoding is not guaranteed to be UTF-8.
with open("README.md", encoding="utf-8") as f:
    METADATA = yaml.safe_load(f.read().split("---\n")[1])
TITLE = METADATA["title"]
SHORT_DESCRIPTION = METADATA.get("short_description")  # optional: None when absent
EMOJI = METADATA["emoji"]

# Capture the help text of the job script by running it with `--help`.
# HELP is False when the command cannot be run, exits with a non-zero code,
# or its output cannot be decoded; otherwise it is the decoded stdout
# (or stderr when stdout is empty).
try:
    process = subprocess.run(CMD + ["--help"], capture_output=True)
    if process.returncode:
        HELP = False
    else:
        HELP = (process.stdout or process.stderr).decode()
except Exception:
    HELP = False

# DRY_RUN is the dry-run flag exactly as spelled in the help text
# ("--dry-run" or "--dry_run"), or False when no such flag is advertised.
m = re.search("--dry(-|_)run", HELP) if HELP else None
DRY_RUN = m.group(0) if m else False

def parse_log(line: str, pbars: dict[str, float] = None):
    if line.startswith("data: {"):
        data = json.loads(line[len("data: "):])
        data, timestamp = data["data"], data["timestamp"]
        if pbars is not None and data.startswith("===== Job started at"):
            pbars.pop("Starting βš™οΈ", None)
            pbars["Running πŸƒ"] = 0.0
            return f"[{timestamp}] {data}\n\n"
        elif pbars is not None and (percent_match := re.search("\\d+(?:\\.\\d+)?%", data)) and any(c in data.split("%")[1][:10] for c in "|β–ˆβ–Œ"):
            pbars.pop("Running πŸƒ", None)
            [pbars.pop(desc) for desc, percent in pbars.items() if percent == 1.]
            percent = float(percent_match.group(0)[:-1]) / 100
            desc = data[:percent_match.start()].strip() or "Progress"
            pbars[desc] = percent
        else:
            return f"[{timestamp}] {data}\n\n"
    return ""

def dry_run(src, config, split, dst, query):
    """Run the job command locally with the dry-run flag and stream its output.

    This is a generator: each yielded dict maps UI components to their new
    values. The first update shows the command, hides the progress label and
    opens the details accordion; then one update is yielded per line read from
    the subprocess's stdout.

    Args:
        src: Source dataset (required).
        config: Optional subset, passed as ``--config`` when truthy.
        split: Optional split, passed as ``--split`` when truthy.
        dst: Destination dataset (required).
        query: SQL query (required).

    Raises:
        gr.Error: If source, destination or query is empty.
    """
    if not all([src, dst, query]):
        raise gr.Error("Please fill source, destination and query.")
    # DRY_RUN holds the dry-run flag as detected in the script's help text.
    args = ["--src", src] + (["--config", config] if config else []) + (["--split", split] if split else []) + ["--dst", dst, "--query", query, DRY_RUN]
    cmd = CMD + args
    logs = "Job:\n\n```bash\n" + " ".join('"' + arg.replace('"', '\"""') + '"' if " " in arg else arg for arg in cmd) + "\n```\nOutput:\n\n"
    yield {output_markdown: logs, progress_labels: gr.Label(visible=False), details_accordion: gr.Accordion(open=True)}
    # The context manager closes the stdout pipe and waits for the child on
    # exit, including when the generator is closed before the output ends, so
    # no pipe or zombie process is left behind.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as process:
        for line in iter(process.stdout.readline, b""):
            logs += line.decode()
            yield {output_markdown: logs}

def run(src, config, split, dst, query, oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None):
    """Submit the job through the jobs API and stream its logs and progress.

    This is a generator: every yielded dict maps the output markdown and the
    progress label components to their current values.

    Args:
        src: Source dataset (required).
        config: Optional subset, passed as ``--config`` when truthy.
        split: Optional split, passed as ``--split`` when truthy.
        dst: Destination dataset (required).
        query: SQL query (required).
        oauth_token: OAuth token of the logged-in user, if any.
        profile: OAuth profile of the logged-in user, if any.

    Raises:
        gr.Error: If a required field is empty, or if no OAuth login and no
            locally stored token is available.
    """
    if not (src and dst and query):
        raise gr.Error("Please fill source, destination and query.")

    # Prefer the OAuth login; otherwise fall back to the locally stored token.
    if oauth_token and profile:
        token, username = oauth_token.token, profile.username
    else:
        token = get_token()
        if not token:
            raise gr.Error("Please log in to run the job.")
        username = HfApi().whoami(token=token)["name"]

    args = ["--src", src]
    if config:
        args += ["--config", config]
    if split:
        args += ["--split", split]
    args += ["--dst", dst, "--query", query]
    cmd = CMD + args

    # Human-readable rendering of the command for the logs panel.
    shown_args = ['"' + arg.replace('"', '\"""') + '"' if " " in arg else arg for arg in cmd]
    logs = "Job:\n\n```bash\n" + " ".join(shown_args) + "\n```\nOutput:\n\n"
    pbars = {}

    def _ui_update():
        # Built from the current values of `logs` and `pbars` at call time.
        return {output_markdown: logs, progress_labels: gr.Label(pbars, visible=bool(pbars))}

    yield _ui_update()

    # NOTE(review): every request below goes to huggingface.co and
    # carries the user's token — confirm this host is the intended endpoint.
    jobs_url = f"https://huggingface.co/api/jobs/{username}"
    auth_headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "spaceId": SPACE_ID,
        "arguments": args,
        "command": CMD,
        "environment": {"HF_TOKEN": token},
        "flavor": "cpu-basic",
    }
    resp = requests.post(jobs_url, json=payload, headers=auth_headers)
    if resp.status_code != 200:
        # Submission failed: show the response body and stop.
        logs += resp.text
        pbars = {"Finished with an error ❌": 1.0}
        yield _ui_update()
        return

    job_id = resp.json()["metadata"]["job_id"]
    pbars = {"Starting βš™οΈ": 0.0}
    yield _ui_update()

    # Stream the logs; parse_log updates `pbars` in place as it goes.
    resp = requests.get(f"{jobs_url}/{job_id}/logs-stream", headers=auth_headers, stream=True)
    for line in resp.iter_lines():
        logs += parse_log(line.decode("utf-8"), pbars=pbars)
        yield _ui_update()

    # Once the log stream ends, poll once per second until the job leaves the
    # RUNNING stage.
    while True:
        job_status = requests.get(f"{jobs_url}/{job_id}", headers=auth_headers).json()
        if job_status["status"]["stage"] != "RUNNING":
            break
        time.sleep(1)

    if job_status["status"]["stage"] == "COMPLETED":
        pbars = {"Finished βœ…": 1.0}
    else:
        logs += f'{job_status["status"]["message"]} ({job_status["status"]["error"]})'
        pbars = {"Finished with an error ❌": 1.0}
    yield _ui_update()


# Library functions whose entry in the "compatible-libraries" response is used
# to obtain a dataset's loading codes (subsets and splits).
READ_FUNCTIONS = ("pl.read_parquet", "pl.read_csv", "pl.read_json")
# Maximum number of datasets requested from the Hub, sorted by trending score,
# to populate the source dataset dropdown.
NUM_TRENDING_DATASETS = 10

with gr.Blocks() as demo:
    # Header row: title and optional short description next to the login button.
    with gr.Row():
        with gr.Column(scale=10):
            gr.Markdown(f"# {TITLE} {EMOJI}")
            if SHORT_DESCRIPTION:
                gr.Markdown(SHORT_DESCRIPTION)
        with gr.Column():
            gr.LoginButton()

    # Usage section rendered from the CONTENT template.
    usage_markdown = CONTENT.format(
        SPACE_ID=SPACE_ID,
        CMD=json.dumps(CMD),
        ARG_NAMES=json.dumps(ARG_NAMES),
    )
    gr.Markdown(usage_markdown)

    # Source dataset (with subset/split selectors) -> destination dataset.
    with gr.Row():
        with gr.Column(scale=10):
            with gr.Row():
                # Invisible component holding the loading codes of the selected dataset.
                loading_codes_json = gr.JSON([], visible=False)
                dataset_dropdown = gr.Dropdown(label="Source Dataset", allow_custom_value=True, scale=10)
                subset_dropdown = gr.Dropdown(info="Subset", allow_custom_value=True, show_label=False, visible=False)
                split_dropdown = gr.Dropdown(info="Split", allow_custom_value=True, show_label=False, visible=False)
        with gr.Column(min_width=60):
            gr.HTML("<div style='font-size: 4em;'>β†’</div>")
        with gr.Column(scale=10):
            dst_dropdown = gr.Dropdown(label="Destination Dataset", allow_custom_value=True)

    query_textarea = gr.Textbox(
        label="SQL Query",
        lines=2,
        max_lines=300,
        placeholder="SELECT * FROM src;",
        value="SELECT * FROM src;",
    )

    # The Dry-Run button only exists when a dry-run flag was detected.
    with gr.Row():
        run_button = gr.Button("Run", scale=10, variant="primary")
        if DRY_RUN:
            dry_run_button = gr.Button("Dry-Run")

    progress_labels = gr.Label(visible=False, label="Progress")
    with gr.Accordion("Details", open=False) as details_accordion:
        output_markdown = gr.Markdown(label="Output logs")

    # Both buttons read the same form fields and write to the same outputs.
    run_button.click(
        run,
        inputs=[dataset_dropdown, subset_dropdown, split_dropdown, dst_dropdown, query_textarea],
        outputs=[details_accordion, progress_labels, output_markdown],
    )
    if DRY_RUN:
        dry_run_button.click(
            dry_run,
            inputs=[dataset_dropdown, subset_dropdown, split_dropdown, dst_dropdown, query_textarea],
            outputs=[details_accordion, progress_labels, output_markdown],
        )

    def show_subset_dropdown(dataset: str):
        """Look up the subsets of a dataset and build the subset dropdown settings.

        Args:
            dataset: Dataset id typed or selected by the user.

        Returns:
            A ``(dropdown_kwargs, loading_codes)`` pair. ``dropdown_kwargs``
            holds the ``choices``, ``value``, ``visible`` and ``key`` arguments
            for the subset dropdown; ``loading_codes`` is the list taken from
            the first library in the response whose function is in
            READ_FUNCTIONS, or an empty list.
        """
        if not dataset or "/" not in dataset.strip().strip("/"):
            # Not a "namespace/name" id: skip the request. The result keeps the
            # same two-element shape as the normal path because every caller
            # unpacks it into (subsets, loading_codes).
            return dict(choices=[], value="", visible=False, key=hash(str([]))), []
        # Pass the dataset id through `params` so it is URL-encoded.
        resp = requests.get(
            "https://datasets-server.huggingface.co/compatible-libraries",
            params={"dataset": dataset},
            timeout=3,
        ).json()
        libraries = resp.get("libraries", [])
        loading_codes = next((lib["loading_codes"] for lib in libraries if lib["function"] in READ_FUNCTIONS), None) or []
        subsets = [loading_code["config_name"] for loading_code in loading_codes]
        subset = subsets[0] if subsets else ""
        # The dropdown is only worth showing when there is a real choice to make.
        return dict(choices=subsets, value=subset, visible=len(subsets) > 1, key=hash(str(loading_codes))), loading_codes

    def show_split_dropdown(subset: str, loading_codes: list[dict]):
        """Build the split dropdown settings for one subset.

        Args:
            subset: Name of the selected subset.
            loading_codes: Loading codes of the dataset; each entry provides
                ``config_name`` and ``arguments["splits"]``.

        Returns:
            A dict with the ``choices``, ``value``, ``visible`` and ``key``
            arguments for the split dropdown. The choices are the splits of
            the first loading code whose ``config_name`` equals ``subset``,
            or an empty list when none matches.
        """
        matching_splits = []
        for code in loading_codes:
            if code["config_name"] == subset:
                matching_splits.append(list(code["arguments"]["splits"]))
        splits = matching_splits[0] if matching_splits else []
        split = splits[0] if splits else ""
        return {
            "choices": splits,
            "value": split,
            "visible": len(splits) > 1,
            "key": hash(str(loading_codes) + subset),
        }

    @demo.load(outputs=[dataset_dropdown, loading_codes_json, subset_dropdown, split_dropdown])
    def _fetch_datasets(request: gr.Request):
        """Populate the source dataset, subset and split selectors on page load.

        The default dataset is listed first, followed by the trending datasets
        returned by the Hub (without repeating the default one).
        """
        dataset = "CohereForAI/Global-MMLU"
        trending = HfApi().list_datasets(limit=NUM_TRENDING_DATASETS, sort="trendingScore", direction=-1)
        datasets = [dataset]
        datasets.extend(ds.id for ds in trending if ds.id != dataset)
        subset_kwargs, loading_codes = show_subset_dropdown(dataset)
        split_kwargs = show_split_dropdown(subset_kwargs["value"], loading_codes)
        return {
            dataset_dropdown: gr.Dropdown(choices=datasets, value=dataset),
            loading_codes_json: loading_codes,
            subset_dropdown: gr.Dropdown(**subset_kwargs),
            split_dropdown: gr.Dropdown(**split_kwargs),
        }
    
    @dataset_dropdown.select(inputs=[dataset_dropdown], outputs=[loading_codes_json, subset_dropdown, split_dropdown])
    def _show_subset_dropdown(dataset: str):
        """Refresh the subset and split selectors when a source dataset is selected.

        The loading codes of the newly selected dataset are also written to
        `loading_codes_json`: the subset selection handler reads them from that
        component, so leaving it untouched would make it resolve splits against
        the previously loaded dataset.
        """
        subsets, loading_codes = show_subset_dropdown(dataset)
        splits = show_split_dropdown(subsets["value"], loading_codes)
        return {
            loading_codes_json: loading_codes,
            subset_dropdown: gr.Dropdown(**subsets),
            split_dropdown: gr.Dropdown(**splits),
        }
    
    @subset_dropdown.select(inputs=[dataset_dropdown, subset_dropdown, loading_codes_json], outputs=[split_dropdown])
    def _show_split_dropdown(dataset: str, subset: str, loading_codes: list[dict]):
        """Refresh the split selector when a subset is selected.

        `dataset` is received from the event inputs but not used here.
        """
        split_kwargs = show_split_dropdown(subset, loading_codes)
        return {split_dropdown: gr.Dropdown(**split_kwargs)}

# The Help page is only added when the job script's help text was captured.
if HELP:
    with demo.route("Help", "/help"):
        help_markdown = f"# Help\n\n```\n{HELP}\n```"
        gr.Markdown(help_markdown)

with demo.route("Jobs", "/jobs") as page:
    gr.Markdown("# Jobs")
    jobs_dataframe = gr.DataFrame(datatype="markdown")

    @page.load(outputs=[jobs_dataframe])
    def list_jobs(oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None):
        """List the user's jobs that were created for this Space.

        Returns:
            A DataFrame with one row per job whose input ``spaceId`` equals
            SPACE_ID, or an empty DataFrame with a single "Log in to see jobs"
            column when neither an OAuth login nor a local token is available.
        """
        # Prefer the OAuth login; otherwise fall back to the locally stored token.
        if oauth_token and profile:
            token, username = oauth_token.token, profile.username
        else:
            token = get_token()
            if not token:
                return pd.DataFrame({"Log in to see jobs": []})
            username = HfApi().whoami(token=token)["name"]

        jobs_url = f"https://huggingface.co/api/jobs/{username}"
        resp = requests.get(jobs_url, headers={"Authorization": f"Bearer {token}"})

        rows = []
        for job in resp.json():
            extra = job["compute"]["spec"]["extra"]
            if extra["input"]["spaceId"] != SPACE_ID:
                continue
            job_id = job["metadata"]["id"]
            rows.append(
                {
                    "id": job_id,
                    "created_at": job["metadata"]["created_at"],
                    "stage": job["compute"]["status"]["stage"],
                    # Markdown link to the job's log stream.
                    "output": f"[logs]({jobs_url}/{job_id}/logs-stream)",
                    "command": str(extra["command"]),
                    "args": str(extra["args"]),
                }
            )
        return pd.DataFrame(rows)

if __name__ == "__main__":
    # Start the app only when this file is executed as a script; "0.0.0.0" is
    # passed as the host name for the server to bind to.
    demo.launch(server_name="0.0.0.0")