|
import marimo |
|
|
|
__generated_with = "0.11.16" |
|
app = marimo.App(width="medium") |
|
|
|
|
|
@app.cell |
|
def _(): |
|
import marimo as mo |
|
import os |
|
return mo, os |
|
|
|
|
|
@app.cell |
|
def _(): |
|
def get_markdown_content(file_path): |
|
with open(file_path, 'r', encoding='utf-8') as file: |
|
content = file.read() |
|
return content |
|
return (get_markdown_content,) |
|
|
|
|
|
@app.cell |
|
def _(get_markdown_content, mo): |
|
intro_text = get_markdown_content('intro_markdown/intro.md') |
|
intro_marimo = get_markdown_content('intro_markdown/intro_marimo.md') |
|
intro_notebook = get_markdown_content('intro_markdown/intro_notebook.md') |
|
intro_comparison = get_markdown_content('intro_markdown/intro_comparison.md') |
|
|
|
intro = mo.carousel([ |
|
mo.md(f"{intro_text}"), |
|
mo.md(f"{intro_marimo}"), |
|
mo.md(f"{intro_notebook}"), |
|
mo.md(f"{intro_comparison}"), |
|
]) |
|
|
|
mo.accordion({"## Notebook Introduction":intro}) |
|
return intro, intro_comparison, intro_marimo, intro_notebook, intro_text |
|
|
|
|
|
@app.cell |
|
def _(os): |
|
|
|
from typing import ( |
|
Any, Dict, List, Optional, Pattern, Set, Union, Tuple |
|
) |
|
from pathlib import Path |
|
from urllib.request import urlopen |
|
|
|
from rich.text import Text |
|
from rich import print |
|
from tqdm import tqdm |
|
from enum import Enum |
|
import pandas as pd |
|
import tempfile |
|
import requests |
|
import getpass |
|
import urllib3 |
|
import base64 |
|
import time |
|
import json |
|
import uuid |
|
import ssl |
|
import ast |
|
import re |
|
|
|
pd.set_option('display.max_columns', None) |
|
pd.set_option('display.max_rows', None) |
|
pd.set_option('display.max_colwidth', None) |
|
pd.set_option('display.width', None) |
|
|
|
|
|
os.environ['TMPDIR'] = '/tmp' |
|
|
|
|
|
tempfile.tempdir = '/tmp' |
|
return ( |
|
Any, |
|
Dict, |
|
Enum, |
|
List, |
|
Optional, |
|
Path, |
|
Pattern, |
|
Set, |
|
Text, |
|
Tuple, |
|
Union, |
|
ast, |
|
base64, |
|
getpass, |
|
json, |
|
pd, |
|
print, |
|
re, |
|
requests, |
|
ssl, |
|
tempfile, |
|
time, |
|
tqdm, |
|
urllib3, |
|
urlopen, |
|
uuid, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(mo): |
|
|
|
|
|
|
|
wx_platform_url = "https://api.dataplatform.cloud.ibm.com" |
|
regions = { |
|
"US": "https://us-south.ml.cloud.ibm.com", |
|
"EU": "https://eu-de.ml.cloud.ibm.com", |
|
"GB": "https://eu-gb.ml.cloud.ibm.com", |
|
"JP": "https://jp-tok.ml.cloud.ibm.com", |
|
"AU": "https://au-syd.ml.cloud.ibm.com", |
|
"CA": "https://ca-tor.ml.cloud.ibm.com" |
|
} |
|
|
|
|
|
client_instantiation_form = ( |
|
mo.md(''' |
|
###**watsonx.ai credentials:** |
|
|
|
{wx_region} |
|
|
|
{wx_api_key} |
|
|
|
{space_id} |
|
''').style(max_height="300px", overflow="auto", border_color="blue") |
|
.batch( |
|
wx_region = mo.ui.dropdown(regions, label="Select your watsonx.ai region:", value="US", searchable=True), |
|
wx_api_key = mo.ui.text(placeholder="Add your IBM Cloud api-key...", label="IBM Cloud Api-key:", kind="password"), |
|
|
|
space_id = mo.ui.text(placeholder="Add your watsonx.ai space_id...", label="Space_ID:", kind="text") |
|
,) |
|
.form(show_clear_button=True, bordered=False) |
|
) |
|
|
|
|
|
|
|
return client_instantiation_form, regions, wx_platform_url |
|
|
|
|
|
@app.cell |
|
def _(client_instantiation_form, mo): |
|
from ibm_watsonx_ai import APIClient, Credentials |
|
|
|
def setup_task_credentials(deployment_client): |
|
|
|
existing_credentials = deployment_client.task_credentials.get_details() |
|
|
|
|
|
if "resources" in existing_credentials and existing_credentials["resources"]: |
|
for cred in existing_credentials["resources"]: |
|
cred_id = deployment_client.task_credentials.get_id(cred) |
|
deployment_client.task_credentials.delete(cred_id) |
|
|
|
|
|
return deployment_client.task_credentials.store() |
|
|
|
if client_instantiation_form.value: |
|
|
|
wx_credentials = Credentials( |
|
url=client_instantiation_form.value["wx_region"], |
|
api_key=client_instantiation_form.value["wx_api_key"] |
|
) |
|
|
|
|
|
deployment_client = APIClient(credentials=wx_credentials, space_id=client_instantiation_form.value["space_id"]) |
|
|
|
task_credentials_details = setup_task_credentials(deployment_client) |
|
else: |
|
|
|
deployment_client = None |
|
task_credentials_details = None |
|
|
|
template_variant = mo.ui.dropdown(["Base","Stream Files to IBM COS [Example]"], label="Code Template:", value="Base") |
|
|
|
if deployment_client is not None: |
|
client_callout_kind = "success" |
|
else: |
|
client_callout_kind = "neutral" |
|
|
|
client_callout = mo.callout(template_variant, kind=client_callout_kind) |
|
|
|
|
|
return ( |
|
APIClient, |
|
Credentials, |
|
client_callout, |
|
client_callout_kind, |
|
deployment_client, |
|
setup_task_credentials, |
|
task_credentials_details, |
|
template_variant, |
|
wx_credentials, |
|
) |
|
|
|
|
|
@app.cell |
|
def _( |
|
client_callout, |
|
client_instantiation_form, |
|
deploy_fnc, |
|
deployment_definition, |
|
fm, |
|
function_editor, |
|
hw_selection_table, |
|
mo, |
|
purge_tabs, |
|
sc_m, |
|
schema_editors, |
|
selection_table, |
|
upload_func, |
|
): |
|
s1 = mo.md(f''' |
|
###**Instantiate your watsonx.ai client:** |
|
|
|
1. Select a region from the dropdown menu |
|
|
|
2. Provide an IBM Cloud Apikey and watsonx.ai deployment space id |
|
|
|
3. Once you submit, the area with the code template will turn green if successful |
|
|
|
4. Select a base (provide baseline format) or example code function template |
|
|
|
--- |
|
|
|
{client_instantiation_form} |
|
|
|
--- |
|
|
|
{client_callout} |
|
|
|
''') |
|
|
|
sc_tabs = mo.ui.tabs( |
|
{ |
|
"Schema Option Selection": sc_m, |
|
"Schema Definition": mo.md(f""" |
|
####**Edit the schema definitions you selected in the previous tab.**<br> |
|
{schema_editors}"""), |
|
} |
|
) |
|
|
|
s2 = mo.md(f'''###**Create your function from the template:** |
|
|
|
1. Use the code editor window to create a function to deploy |
|
<br> |
|
The function must: |
|
<br> |
|
--- Include a payload and score element |
|
<br> |
|
--- Have the same function name in both the score = <name>() segment and the Function Name input field below |
|
<br> |
|
--- Additional details can be found here -> [watsonx.ai - Writing deployable Python functions |
|
](https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/ml-deploy-py-function-write.html?utm_medium=Exinfluencer&utm_source=ibm_developer&utm_content=in_content_link&utm_term=10006555&utm_id=blogs_awb-tekton-optimizations-for-kubeflow-pipelines-2-0&context=wx&audience=wdp) |
|
|
|
3. Click submit, then proceed to select whether you wish to add: |
|
<br> |
|
--- An input schema (describing the format of the variables the function takes) **[Optional]** |
|
<br> |
|
--- An output schema (describing the format of the output results the function returns) **[Optional]** |
|
<br> |
|
--- An sample input example (showing an example of a mapping of the input and output schema to actual values.) **[Optional]** |
|
|
|
4. Fill in the function name field **(must be exactly the same as in the function editor)** |
|
|
|
5. Add a description and metadata tags **[Optional]** |
|
|
|
--- |
|
|
|
{function_editor} |
|
|
|
--- |
|
|
|
{sc_tabs} |
|
|
|
--- |
|
|
|
{fm} |
|
|
|
''') |
|
|
|
s3 = mo.md(f''' |
|
###**Review and Upload your function** |
|
|
|
1. Review the function metadata specs JSON |
|
|
|
2. Select a software specification if necessary (default for python functions is pre-selected), this is the runtime environment of python that your function will run in. Environments on watsonx.ai come pre-packaged with many different libraries, if necessary install new ones by adding them into the function as a `subprocess.check_output('pip install <package_name>', shell=True)` command. |
|
|
|
3. Once your are satisfied, click the upload function button and wait for the response. |
|
|
|
> If you see no table of software specs, you haven't activated your watsonx.ai client. |
|
|
|
--- |
|
|
|
{selection_table} |
|
|
|
--- |
|
|
|
{upload_func} |
|
|
|
''') |
|
|
|
s4 = mo.md(f''' |
|
###**Deploy your function:** |
|
|
|
1. Select a hardware specification (vCPUs/GB) that you want your function deployed on |
|
<br> |
|
--- XXS and XS cost the same (0.5 CUH per hour, so XS is the better option |
|
<br> |
|
--- Select larger instances for more resource intensive tasks or runnable jobs |
|
|
|
2. Select the type of deployment: |
|
<br> |
|
--- Function (Online) for always-on endpoints - Always available and low latency, but consume resources continuously for every hour they are deployed. |
|
<br> |
|
--- Batch (Batch) for runnable jobs - Only consume resources during job runs, but aren't as flexible to deploy. |
|
|
|
3. If you've selected Function, pick a completely unique (globally, not just your account) deployment serving name that will be in the endpoint url. |
|
|
|
4. Once your are satisfied, click the deploy function button and wait for the response. |
|
|
|
--- |
|
|
|
{hw_selection_table} |
|
|
|
--- |
|
|
|
{deployment_definition} |
|
|
|
--- |
|
|
|
{deploy_fnc} |
|
|
|
''') |
|
|
|
s5 = mo.md(f''' |
|
###**Helper Purge Functions:** |
|
|
|
These functions help you retrieve, select and delete deployments, data assets or repository assets (functions, models, etc.) that you have in the deployment space. This is meant to support fast cleanup. |
|
|
|
Select the tab based on what you want to delete, then click each of the buttons one by one after the previous gives a response. |
|
|
|
--- |
|
|
|
{purge_tabs} |
|
|
|
''') |
|
|
|
sections = mo.accordion( |
|
{ |
|
"Section 1: **watsonx.ai Credentials**": s1, |
|
"Section 2: **Function Creation**": s2, |
|
"Section 3: **Function Upload**": s3, |
|
"Section 4: **Function Deployment**": s4, |
|
"Section 5: **Helper Functions**": s5, |
|
}, |
|
multiple=True |
|
) |
|
|
|
sections |
|
return s1, s2, s3, s4, s5, sc_tabs, sections |
|
|
|
|
|
@app.cell |
|
def _(mo, template_variant): |
|
|
|
if template_variant.value == "Stream Files to IBM COS [Example]": |
|
with open("stream_files_to_cos.py", "r") as file: |
|
template = file.read() |
|
else: |
|
template = '''def your_function_name(): |
|
|
|
import subprocess |
|
subprocess.check_output('pip install gensim', shell=True) |
|
import gensim |
|
|
|
def score(input_data): |
|
message_from_input_payload = payload.get("input_data")[0].get("values")[0][0] |
|
response_message = "Received message - {0}".format(message_from_input_payload) |
|
|
|
# Score using the pre-defined model |
|
score_response = { |
|
'predictions': [{'fields': ['Response_message_field', 'installed_lib_version'], |
|
'values': [[response_message, gensim.__version__]] |
|
}] |
|
} |
|
return score_response |
|
|
|
return score |
|
|
|
score = your_function_name() |
|
''' |
|
|
|
function_editor = ( |
|
mo.md(''' |
|
#### **Create your function by editing the template:** |
|
|
|
{editor} |
|
|
|
''') |
|
.batch( |
|
editor = mo.ui.code_editor(value=template, language="python", min_height=50) |
|
) |
|
.form(show_clear_button=True, bordered=False) |
|
) |
|
|
|
|
|
return file, function_editor, template |
|
|
|
|
|
@app.cell |
|
def _(function_editor, mo, os): |
|
if function_editor.value: |
|
|
|
code = function_editor.value['editor'] |
|
|
|
namespace = {} |
|
|
|
exec(code, namespace) |
|
|
|
|
|
function_name = None |
|
for name, obj in namespace.items(): |
|
if callable(obj) and name != "__builtins__": |
|
function_name = name |
|
break |
|
|
|
if function_name: |
|
|
|
deployable_function = namespace[function_name] |
|
|
|
mo.md(f"Created deployable function from '{function_name}'") |
|
|
|
save_dir = "/tmp/notebook_functions" |
|
os.makedirs(save_dir, exist_ok=True) |
|
|
|
file_path = os.path.join(save_dir, f"{function_name}.py") |
|
with open(file_path, "w") as f: |
|
f.write(code) |
|
else: |
|
mo.md("No function found in the editor code") |
|
return ( |
|
code, |
|
deployable_function, |
|
f, |
|
file_path, |
|
function_name, |
|
name, |
|
namespace, |
|
obj, |
|
save_dir, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(deployment_client, mo, pd): |
|
if deployment_client: |
|
supported_specs = deployment_client.software_specifications.list()[ |
|
deployment_client.software_specifications.list()['STATE'] == 'supported' |
|
] |
|
|
|
|
|
supported_specs = supported_specs.reset_index(drop=True) |
|
|
|
|
|
framework_mapping = { |
|
"tensorflow_rt24.1-py3.11": "TensorFlow", |
|
"pytorch-onnx_rt24.1-py3.11": "PyTorch", |
|
"onnxruntime_opset_19": "ONNX or ONNXRuntime", |
|
"runtime-24.1-py3.11": "AI Services/Python Functions/Python Scripts", |
|
"autoai-ts_rt24.1-py3.11": "AutoAI", |
|
"autoai-kb_rt24.1-py3.11": "AutoAI", |
|
"runtime-24.1-py3.11-cuda": "CUDA-enabled (GPU) Python Runtime", |
|
"runtime-24.1-r4.3": "R Runtime 4.3", |
|
"spark-mllib_3.4": "Apache Spark 3.4", |
|
"autoai-rag_rt24.1-py3.11": "AutoAI RAG" |
|
} |
|
|
|
|
|
preferred_order = [ |
|
"runtime-24.1-py3.11", |
|
"runtime-24.1-py3.11-cuda", |
|
"runtime-24.1-r4.3", |
|
"ai-service-v5-software-specification", |
|
"autoai-rag_rt24.1-py3.11", |
|
"autoai-ts_rt24.1-py3.11", |
|
"autoai-kb_rt24.1-py3.11", |
|
"tensorflow_rt24.1-py3.11", |
|
"pytorch-onnx_rt24.1-py3.11", |
|
"onnxruntime_opset_19", |
|
"spark-mllib_3.4", |
|
] |
|
|
|
|
|
supported_specs['SORT_ORDER'] = supported_specs['NAME'].apply( |
|
lambda x: preferred_order.index(x) if x in preferred_order else len(preferred_order) |
|
) |
|
|
|
|
|
supported_specs = supported_specs.sort_values('SORT_ORDER').reset_index(drop=True) |
|
|
|
|
|
supported_specs = supported_specs.drop(columns=['SORT_ORDER']) |
|
|
|
|
|
if 'REPLACEMENT' in supported_specs.columns: |
|
supported_specs = supported_specs.drop(columns=['REPLACEMENT']) |
|
|
|
|
|
supported_specs['NOTES'] = supported_specs['NAME'].map(framework_mapping).fillna("Other") |
|
|
|
|
|
selection_table = mo.ui.table( |
|
supported_specs, |
|
selection="single", |
|
label="#### **Select a supported software_spec runtime for your function asset** (For Python Functions select - *'runtime-24.1-py3.11'* ):", |
|
initial_selection=[0], |
|
page_size=6 |
|
) |
|
else: |
|
sel_df = pd.DataFrame( |
|
data=[["ID", "Activate deployment_client."]], |
|
columns=["ID", "VALUE"] |
|
) |
|
|
|
selection_table = mo.ui.table( |
|
sel_df, |
|
selection="single", |
|
label="You haven't activated the Deployment_Client", |
|
initial_selection=[0] |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return ( |
|
framework_mapping, |
|
preferred_order, |
|
sel_df, |
|
selection_table, |
|
supported_specs, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(mo): |
|
input_schema_checkbox = mo.ui.checkbox(label="Add input schema (optional)") |
|
output_schema_checkbox = mo.ui.checkbox(label="Add output schema (optional)") |
|
sample_input_checkbox = mo.ui.checkbox(label="Add sample input example (optional)") |
|
return input_schema_checkbox, output_schema_checkbox, sample_input_checkbox |
|
|
|
|
|
@app.cell |
|
def _( |
|
input_schema_checkbox, |
|
mo, |
|
output_schema_checkbox, |
|
sample_input_checkbox, |
|
selection_table, |
|
template_variant, |
|
): |
|
if selection_table.value['ID'].iloc[0]: |
|
|
|
if template_variant.value == "Stream Files to IBM COS [Example]": |
|
fnc_nm = "stream_file_to_cos" |
|
else: |
|
fnc_nm = "your_function_name" |
|
|
|
uploaded_function_name = mo.ui.text(placeholder="<Must be the same as the name in editor>", label="Function Name:", kind="text", value=f"{fnc_nm}", full_width=False) |
|
tags_editor = mo.ui.array( |
|
[mo.ui.text(placeholder="Metadata Tags..."), mo.ui.text(), mo.ui.text()], |
|
label="Optional Metadata Tags" |
|
) |
|
software_spec = selection_table.value['ID'].iloc[0] |
|
|
|
description_input = mo.ui.text_area( |
|
placeholder="Write a description for your function...)", |
|
label="Description", |
|
max_length=256, |
|
rows=5, |
|
full_width=True |
|
) |
|
|
|
|
|
func_metadata=mo.hstack([ |
|
description_input, |
|
mo.hstack([ |
|
uploaded_function_name, |
|
tags_editor, |
|
], justify="start", gap=1, align="start", wrap=True) |
|
], |
|
widths=[0.6,0.4], |
|
gap=2.75 |
|
) |
|
|
|
schema_metadata=mo.hstack([ |
|
input_schema_checkbox, |
|
output_schema_checkbox, |
|
sample_input_checkbox |
|
], |
|
justify="center", gap=1, align="center", wrap=True |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fm = mo.vstack([ |
|
func_metadata, |
|
], |
|
align="center", |
|
gap=2 |
|
) |
|
sc_m = mo.vstack([ |
|
schema_metadata, |
|
mo.md("**Make sure to select the checkbox options before filling in descriptions and tags or they will reset.**") |
|
], |
|
align="center", |
|
gap=2 |
|
) |
|
return ( |
|
description_input, |
|
fm, |
|
fnc_nm, |
|
func_metadata, |
|
sc_m, |
|
schema_metadata, |
|
software_spec, |
|
tags_editor, |
|
uploaded_function_name, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(json, mo, template_variant): |
|
if template_variant.value == "Stream Files to IBM COS [Example]": |
|
from cos_stream_schema_examples import input_schema, output_schema, sample_input |
|
else: |
|
input_schema = [ |
|
{ |
|
'id': '1', |
|
'type': 'struct', |
|
'fields': [ |
|
{ |
|
'name': '<variable name 1>', |
|
'type': 'string', |
|
'nullable': False, |
|
'metadata': {} |
|
}, |
|
{ |
|
'name': '<variable name 2>', |
|
'type': 'string', |
|
'nullable': False, |
|
'metadata': {} |
|
} |
|
] |
|
} |
|
] |
|
|
|
output_schema = [ |
|
{ |
|
'id': '1', |
|
'type': 'struct', |
|
'fields': [ |
|
{ |
|
'name': '<output return name>', |
|
'type': 'string', |
|
'nullable': False, |
|
'metadata': {} |
|
} |
|
] |
|
} |
|
] |
|
|
|
sample_input = { |
|
'input_data': [ |
|
{ |
|
'fields': ['<variable name 1>', '<variable name 2>'], |
|
'values': [ |
|
['<sample input value for variable 1>', '<sample input value for variable 2>'] |
|
] |
|
} |
|
] |
|
} |
|
|
|
|
|
input_schema_editor = mo.ui.code_editor(value=json.dumps(input_schema, indent=4), language="python", min_height=25) |
|
output_schema_editor = mo.ui.code_editor(value=json.dumps(output_schema, indent=4), language="python", min_height=25) |
|
sample_input_editor = mo.ui.code_editor(value=json.dumps(sample_input, indent=4), language="python", min_height=25) |
|
|
|
schema_editors = mo.accordion( |
|
{ |
|
"""**Input Schema Metadata Editor**""": input_schema_editor, |
|
"""**Output Schema Metadata Editor**""": output_schema_editor, |
|
"""**Sample Input Metadata Editor**""": sample_input_editor |
|
}, multiple=True |
|
) |
|
|
|
|
|
return ( |
|
input_schema, |
|
input_schema_editor, |
|
output_schema, |
|
output_schema_editor, |
|
sample_input, |
|
sample_input_editor, |
|
schema_editors, |
|
) |
|
|
|
|
|
@app.cell |
|
def _( |
|
ast, |
|
deployment_client, |
|
description_input, |
|
function_editor, |
|
input_schema_checkbox, |
|
input_schema_editor, |
|
json, |
|
mo, |
|
os, |
|
output_schema_checkbox, |
|
output_schema_editor, |
|
sample_input_checkbox, |
|
sample_input_editor, |
|
selection_table, |
|
software_spec, |
|
tags_editor, |
|
uploaded_function_name, |
|
): |
|
get_upload_status, set_upload_status = mo.state("No uploads yet") |
|
|
|
function_meta = {} |
|
|
|
if selection_table.value['ID'].iloc[0] and deployment_client is not None: |
|
|
|
function_meta = { |
|
deployment_client.repository.FunctionMetaNames.NAME: f"{uploaded_function_name.value}" or "your_function_name", |
|
deployment_client.repository.FunctionMetaNames.SOFTWARE_SPEC_ID: software_spec or "45f12dfe-aa78-5b8d-9f38-0ee223c47309" |
|
} |
|
|
|
|
|
if tags_editor.value: |
|
|
|
filtered_tags = [tag for tag in tags_editor.value if tag and tag.strip()] |
|
if filtered_tags: |
|
function_meta[deployment_client.repository.FunctionMetaNames.TAGS] = filtered_tags |
|
|
|
|
|
if description_input.value: |
|
function_meta[deployment_client.repository.FunctionMetaNames.DESCRIPTION] = description_input.value |
|
|
|
|
|
if input_schema_checkbox.value: |
|
try: |
|
function_meta[deployment_client.repository.FunctionMetaNames.INPUT_DATA_SCHEMAS] = json.loads(input_schema_editor.value) |
|
except json.JSONDecodeError: |
|
|
|
function_meta[deployment_client.repository.FunctionMetaNames.INPUT_DATA_SCHEMAS] = ast.literal_eval(input_schema_editor.value) |
|
|
|
|
|
if output_schema_checkbox.value: |
|
try: |
|
function_meta[deployment_client.repository.FunctionMetaNames.OUTPUT_DATA_SCHEMAS] = json.loads(output_schema_editor.value) |
|
except json.JSONDecodeError: |
|
|
|
function_meta[deployment_client.repository.FunctionMetaNames.OUTPUT_DATA_SCHEMAS] = ast.literal_eval(output_schema_editor.value) |
|
|
|
|
|
if sample_input_checkbox.value: |
|
try: |
|
function_meta[deployment_client.repository.FunctionMetaNames.SAMPLE_SCORING_INPUT] = json.loads(sample_input_editor.value) |
|
except json.JSONDecodeError: |
|
|
|
function_meta[deployment_client.repository.FunctionMetaNames.SAMPLE_SCORING_INPUT] = ast.literal_eval(sample_input_editor.value) |
|
|
|
def upload_function(function_meta, use_function_object=True): |
|
""" |
|
Uploads a Python function to watsonx.ai as a deployable asset. |
|
Parameters: |
|
function_meta (dict): Metadata for the function |
|
use_function_object (bool): Whether to use function object (True) or file path (False) |
|
Returns: |
|
dict: Details of the uploaded function |
|
""" |
|
|
|
original_dir = os.getcwd() |
|
|
|
try: |
|
|
|
code_to_deploy = function_editor.value['editor'] |
|
|
|
func_name = uploaded_function_name.value or "your_function_name" |
|
|
|
function_meta[deployment_client.repository.FunctionMetaNames.NAME] = func_name |
|
|
|
save_dir = "/tmp/notebook_functions" |
|
os.makedirs(save_dir, exist_ok=True) |
|
file_path = f"{save_dir}/{func_name}.py" |
|
with open(file_path, "w", encoding="utf-8") as f: |
|
f.write(code_to_deploy) |
|
|
|
if use_function_object: |
|
|
|
import sys |
|
import importlib.util |
|
|
|
sys.path.append(save_dir) |
|
|
|
spec = importlib.util.spec_from_file_location(func_name, file_path) |
|
module = importlib.util.module_from_spec(spec) |
|
spec.loader.exec_module(module) |
|
|
|
function_object = getattr(module, func_name) |
|
|
|
|
|
os.chdir('/tmp') |
|
|
|
|
|
mo.md(f"Uploading function object: {func_name}") |
|
func_details = deployment_client.repository.store_function(function_object, function_meta) |
|
else: |
|
|
|
os.chdir('/tmp') |
|
|
|
|
|
mo.md(f"Uploading function from file: {file_path}") |
|
func_details = deployment_client.repository.store_function(file_path, function_meta) |
|
|
|
set_upload_status(f"Latest Upload - id - {func_details['metadata']['id']}") |
|
return func_details |
|
except Exception as e: |
|
set_upload_status(f"Error uploading function: {str(e)}") |
|
mo.md(f"Detailed error: {str(e)}") |
|
raise |
|
finally: |
|
|
|
os.chdir(original_dir) |
|
|
|
upload_status = mo.state("No uploads yet") |
|
|
|
upload_button = mo.ui.button( |
|
label="Upload Function", |
|
on_click=lambda _: upload_function(function_meta, use_function_object=True), |
|
kind="success", |
|
tooltip="Click to upload function to watsonx.ai" |
|
) |
|
|
|
|
|
return ( |
|
filtered_tags, |
|
function_meta, |
|
get_upload_status, |
|
set_upload_status, |
|
upload_button, |
|
upload_function, |
|
upload_status, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(get_upload_status, mo, upload_button): |
|
|
|
if upload_button.value: |
|
try: |
|
upload_result = upload_button.value |
|
artifact_id = upload_result['metadata']['id'] |
|
except Exception as e: |
|
mo.md(f"Error: {str(e)}") |
|
|
|
upload_func = mo.vstack([ |
|
upload_button, |
|
mo.md(f"**Status:** {get_upload_status()}") |
|
], justify="space-around", align="center") |
|
return artifact_id, upload_func, upload_result |
|
|
|
|
|
@app.cell |
|
def _(deployment_client, mo, pd, upload_button, uuid): |
|
def reorder_hardware_specifications(df): |
|
""" |
|
Reorders a hardware specifications dataframe by type and size of environment |
|
without hardcoding specific hardware types. |
|
|
|
Parameters: |
|
df (pandas.DataFrame): The hardware specifications dataframe to reorder |
|
|
|
Returns: |
|
pandas.DataFrame: Reordered dataframe with reset index |
|
""" |
|
|
|
result_df = df.copy() |
|
|
|
|
|
def get_sort_key(name): |
|
|
|
custom_order = [ |
|
"XXS", "XS", "S", "M", "L", "XL", |
|
"XS-Spark", "S-Spark", "M-Spark", "L-Spark", "XL-Spark", |
|
"K80", "K80x2", "K80x4", |
|
"V100", "V100x2", |
|
"WXaaS-XS", "WXaaS-S", "WXaaS-M", "WXaaS-L", "WXaaS-XL", |
|
"Default Spark", "Notebook Default Spark", "ML" |
|
] |
|
|
|
|
|
if name in custom_order: |
|
return (0, custom_order.index(name)) |
|
|
|
|
|
return (1, name) |
|
|
|
|
|
result_df['sort_key'] = result_df['NAME'].apply(get_sort_key) |
|
|
|
|
|
result_df = result_df.sort_values('sort_key').drop('sort_key', axis=1) |
|
|
|
|
|
result_df = result_df.reset_index(drop=True) |
|
|
|
return result_df |
|
|
|
if deployment_client and upload_button.value: |
|
|
|
hardware_specs = deployment_client.hardware_specifications.list() |
|
hardware_specs_df = reorder_hardware_specifications(hardware_specs) |
|
|
|
|
|
hw_selection_table = mo.ui.table( |
|
hardware_specs_df, |
|
selection="single", |
|
label="#### **Select a supported hardware_specification for your deployment** *(Default: 'XS' - 1vCPU_4GB Ram)*", |
|
initial_selection=[1], |
|
page_size=6, |
|
wrapped_columns=['DESCRIPTION'] |
|
) |
|
|
|
deployment_type = mo.ui.radio( |
|
options={"Function":"Online (Function Endpoint)","Runnable Job":"Batch (Runnable Jobs)"}, value="Function", label="Select the Type of Deployment:", inline=True |
|
) |
|
uuid_suffix = str(uuid.uuid4())[:4] |
|
|
|
deployment_name = mo.ui.text(value=f"deployed_func_{uuid_suffix}", label="Deployment Name:", placeholder="<Must be completely unique>") |
|
else: |
|
hw_df = pd.DataFrame( |
|
data=[["ID", "Activate deployment_client."]], |
|
columns=["ID", "VALUE"] |
|
) |
|
|
|
hw_selection_table = mo.ui.table( |
|
hw_df, |
|
selection="single", |
|
label="You haven't activated the Deployment_Client", |
|
initial_selection=[0] |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return ( |
|
deployment_name, |
|
deployment_type, |
|
hardware_specs, |
|
hardware_specs_df, |
|
hw_df, |
|
hw_selection_table, |
|
reorder_hardware_specifications, |
|
uuid_suffix, |
|
) |
|
|
|
|
|
@app.cell |
|
def _( |
|
artifact_id, |
|
deployment_client, |
|
deployment_details, |
|
deployment_name, |
|
deployment_type, |
|
hw_selection_table, |
|
mo, |
|
print, |
|
upload_button, |
|
): |
|
def deploy_function(artifact_id, deployment_type): |
|
""" |
|
Deploys a function asset to watsonx.ai. |
|
|
|
Parameters: |
|
artifact_id (str): ID of the function artifact to deploy |
|
deployment_type (object): Type of deployment (online or batch) |
|
|
|
Returns: |
|
dict: Details of the deployed function |
|
""" |
|
if not artifact_id: |
|
print("Error: No artifact ID provided. Please upload a function first.") |
|
return None |
|
|
|
if deployment_type.value == "Online (Function Endpoint)": |
|
deployment_props = { |
|
deployment_client.deployments.ConfigurationMetaNames.NAME: deployment_name.value, |
|
deployment_client.deployments.ConfigurationMetaNames.ONLINE: {}, |
|
deployment_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config}, |
|
deployment_client.deployments.ConfigurationMetaNames.SERVING_NAME: deployment_name.value, |
|
} |
|
else: |
|
deployment_props = { |
|
deployment_client.deployments.ConfigurationMetaNames.NAME: deployment_name.value, |
|
deployment_client.deployments.ConfigurationMetaNames.BATCH: {}, |
|
deployment_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config}, |
|
|
|
} |
|
|
|
try: |
|
print(deployment_props) |
|
|
|
asset_details = deployment_client.repository.get_details(artifact_id) |
|
print(f"Asset found: {asset_details['metadata']['name']} with ID: {asset_details['metadata']['id']}") |
|
|
|
|
|
deployed_function = deployment_client.deployments.create(artifact_id, deployment_props) |
|
print(f"Creating deployment from Asset: {artifact_id} with deployment properties {str(deployment_props)}") |
|
return deployed_function |
|
except Exception as e: |
|
print(f"Deployment error: {str(e)}") |
|
return None |
|
|
|
def get_deployment_id(deployed_function): |
|
deployment_id = deployment_client.deployments.get_uid(deployment_details) |
|
return deployment_id |
|
|
|
def get_deployment_info(deployment_id): |
|
deployment_info = deployment_client.deployments.get_details(deployment_id) |
|
return deployment_info |
|
|
|
deployment_status = mo.state("No deployments yet") |
|
|
|
if hw_selection_table.value['ID'].iloc[0]: |
|
selected_hw_config = hw_selection_table.value['ID'].iloc[0] |
|
|
|
deploy_button = mo.ui.button( |
|
label="Deploy Function", |
|
on_click=lambda _: deploy_function(artifact_id, deployment_type), |
|
kind="success", |
|
tooltip="Click to deploy function to watsonx.ai" |
|
) |
|
|
|
if deployment_client and upload_button.value: |
|
deployment_definition = mo.hstack([ |
|
deployment_type, |
|
deployment_name |
|
], justify="space-around") |
|
else: |
|
deployment_definition = mo.hstack([ |
|
"No Deployment Type Selected", |
|
"No Deployment Name Provided" |
|
], justify="space-around") |
|
|
|
|
|
return ( |
|
deploy_button, |
|
deploy_function, |
|
deployment_definition, |
|
deployment_status, |
|
get_deployment_id, |
|
get_deployment_info, |
|
selected_hw_config, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(deploy_button, deployment_definition, mo): |
|
_ = deployment_definition |
|
|
|
deploy_fnc = mo.vstack([ |
|
deploy_button, |
|
deploy_button.value |
|
], justify="space-around", align="center") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return (deploy_fnc,) |
|
|
|
|
|
@app.cell(hide_code=True) |
|
def _(deployment_client, mo): |
|
|
|
|
|
def get_deployment_list(): |
|
dep_df = deployment_client.deployments.list() |
|
|
|
return pd.DataFrame(dep_df) |
|
|
|
def get_deployment_ids(): |
|
if deployments_dataframe is not []: |
|
df = deployments_dataframe.value |
|
dep_list = df['ID'].tolist() |
|
else: |
|
dep_list = [] |
|
return dep_list |
|
|
|
def get_data_assets_list(): |
|
d_assets_df = deployment_client.data_assets.list() |
|
|
|
return pd.DataFrame(d_assets_df) |
|
|
|
def get_data_asset_ids(): |
|
if repository_dataframe is not []: |
|
df = data_assets_dataframe.value |
|
data_asset_list = df['ASSET_ID'].tolist() |
|
else: |
|
data_asset_list = [] |
|
return data_asset_list |
|
|
|
|
|
def get_repository_list(): |
|
rep_df = deployment_client.repository.list() |
|
|
|
return pd.DataFrame(rep_df) |
|
|
|
def get_repository_ids(): |
|
if repository_dataframe is not []: |
|
df = repository_dataframe.value |
|
repository_list = df['ID'].tolist() |
|
else: |
|
repository_list = [] |
|
return repository_list |
|
|
|
def delete_with_progress(ids_list, delete_function, item_type="items"): |
|
""" |
|
Generic wrapper that adds a progress bar to any deletion function |
|
|
|
Parameters: |
|
ids_list: List of IDs to delete |
|
delete_function: Function that deletes a single ID |
|
item_type: String describing what's being deleted (for display) |
|
""" |
|
with mo.status.progress_bar( |
|
total=len(ids_list) or 1, |
|
title=f"Purging {item_type}", |
|
subtitle=f"Deleting {item_type}...", |
|
completion_title="Purge Complete", |
|
completion_subtitle=f"Successfully deleted {len(ids_list)} {item_type}" |
|
) as progress: |
|
for item_id in ids_list: |
|
delete_function(item_id) |
|
progress.update(increment=1) |
|
return f"Deleted {len(ids_list)} {item_type} successfully" |
|
|
|
|
|
def delete_deployments(deployment_ids): |
|
return delete_with_progress( |
|
deployment_ids, |
|
lambda id: deployment_client.deployments.delete(id), |
|
"deployments" |
|
) |
|
|
|
def delete_data_assets(data_asset_ids): |
|
return delete_with_progress( |
|
data_asset_ids, |
|
lambda id: deployment_client.data_assets.delete(id), |
|
"data assets" |
|
) |
|
|
|
def delete_repository_items(repository_ids): |
|
return delete_with_progress( |
|
repository_ids, |
|
lambda id: deployment_client.repository.delete(id), |
|
"repository items" |
|
) |
|
return ( |
|
delete_data_assets, |
|
delete_deployments, |
|
delete_repository_items, |
|
delete_with_progress, |
|
get_data_asset_ids, |
|
get_data_assets_list, |
|
get_deployment_ids, |
|
get_deployment_list, |
|
get_repository_ids, |
|
get_repository_list, |
|
) |
|
|
|
@app.cell(hide_code=True) |
|
def _( |
|
delete_data_assets, |
|
delete_deployments, |
|
delete_repository_items, |
|
get_data_asset_ids, |
|
get_data_assets_list, |
|
get_deployment_ids, |
|
get_deployment_list, |
|
get_repository_ids, |
|
get_repository_list, |
|
mo, |
|
): |
|
|
|
get_data_assets_button = mo.ui.button( |
|
label="Get Data Assets Dataframe", |
|
on_click=lambda _: get_data_assets_list(), |
|
kind="neutral", |
|
) |
|
|
|
get_data_asset_id_list = mo.ui.button( |
|
label="Turn Dataframe into List of IDs", |
|
on_click=lambda _: get_data_asset_ids(), |
|
|
|
kind="neutral", |
|
) |
|
|
|
purge_data_assets = mo.ui.button( |
|
label="Purge Data Assets", |
|
on_click=lambda _: delete_data_assets(get_data_asset_id_list.value), |
|
kind="danger", |
|
) |
|
|
|
|
|
get_deployments_button = mo.ui.button( |
|
label="Get Deployments Dataframe", |
|
on_click=lambda _: get_deployment_list(), |
|
kind="neutral", |
|
) |
|
|
|
get_deployment_id_list = mo.ui.button( |
|
label="Turn Dataframe into List of IDs", |
|
on_click=lambda _: get_deployment_ids(), |
|
|
|
kind="neutral", |
|
) |
|
|
|
purge_deployments = mo.ui.button( |
|
label="Purge Deployments", |
|
on_click=lambda _: delete_deployments(get_deployment_id_list.value), |
|
kind="danger", |
|
) |
|
|
|
|
|
get_repository_button = mo.ui.button( |
|
label="Get Repository Dataframe", |
|
on_click=lambda _: get_repository_list(), |
|
kind="neutral", |
|
) |
|
|
|
get_repository_id_list = mo.ui.button( |
|
label="Turn Dataframe into List of IDs", |
|
on_click=lambda _: get_repository_ids(), |
|
|
|
kind="neutral", |
|
) |
|
|
|
purge_repository = mo.ui.button( |
|
label="Purge Repository Items", |
|
on_click=lambda _: delete_repository_items(get_repository_id_list.value), |
|
kind="danger", |
|
) |
|
return ( |
|
get_data_asset_id_list, |
|
get_data_assets_button, |
|
get_deployment_id_list, |
|
get_deployments_button, |
|
get_repository_button, |
|
get_repository_id_list, |
|
purge_data_assets, |
|
purge_deployments, |
|
purge_repository, |
|
) |
|
|
|
|
|
|
|
@app.cell |
|
def _(get_deployment_id_list, get_deployments_button, mo, purge_deployments): |
|
|
|
deployments_purge_stack = mo.hstack([get_deployments_button, get_deployment_id_list, purge_deployments]) |
|
deployments_purge_stack_results = mo.vstack([deployments_dataframe, get_deployment_id_list.value, purge_deployments.value]) |
|
|
|
deployments_purge_tab = mo.vstack([deployments_purge_stack, deployments_purge_stack_results]) |
|
|
|
return ( |
|
deployments_purge_stack, |
|
deployments_purge_stack_results, |
|
deployments_purge_tab, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(get_repository_button, get_repository_id_list, mo, purge_repository): |
|
|
|
repository_purge_stack = mo.hstack([get_repository_button, get_repository_id_list, purge_repository]) |
|
repository_purge_stack_results = mo.vstack([repository_dataframe, get_repository_id_list.value, purge_repository.value]) |
|
|
|
repository_purge_tab = mo.vstack([repository_purge_stack, repository_purge_stack_results]) |
|
return ( |
|
repository_purge_stack, |
|
repository_purge_stack_results, |
|
repository_purge_tab, |
|
) |
|
|
|
|
|
@app.cell |
|
def _(get_data_asset_id_list, get_data_assets_button, mo, purge_data_assets): |
|
|
|
data_assets_purge_stack = mo.hstack([get_data_assets_button, get_data_asset_id_list, purge_data_assets]) |
|
data_assets_purge_stack_results = mo.vstack([data_assets_dataframe, get_data_asset_id_list.value, purge_data_assets.value]) |
|
|
|
data_assets_purge_tab = mo.vstack([data_assets_purge_stack, data_assets_purge_stack_results]) |
|
return ( |
|
data_assets_purge_stack, |
|
data_assets_purge_stack_results, |
|
data_assets_purge_tab, |
|
) |
|
|
|
@app.cell |
|
def _(get_data_assets_button, get_repository_button, mo, purge_data_assets): |
|
deployments_dataframe = [] |
|
data_assets_dataframe = [] |
|
repository_dataframe = [] |
|
|
|
|
|
try: |
|
if 'get_deployments_button' in globals() and get_deployments_button.value is not None: |
|
deployments_dataframe = mo.ui.table(get_deployments_button.value, initial_selection=[0]) |
|
except (NameError, AttributeError): |
|
pass |
|
|
|
try: |
|
if 'get_data_assets_button' in globals() and get_data_assets_button.value is not None: |
|
data_assets_dataframe = mo.ui.table(get_data_assets_button.value, initial_selection=[0]) |
|
except (NameError, AttributeError): |
|
pass |
|
|
|
try: |
|
if 'get_repository_button' in globals() and get_repository_button.value is not None: |
|
repository_dataframe = mo.ui.table(get_repository_button.value, initial_selection=[0]) |
|
except (NameError, AttributeError): |
|
pass |
|
|
|
return ( |
|
deployments_dataframe, |
|
data_assets_dataframe, |
|
repository_dataframe, |
|
) |
|
|
|
|
|
|
|
@app.cell |
|
def _(data_assets_purge_tab, deployments_purge_tab, mo, repository_purge_tab): |
|
if deployments_purge_stack: |
|
purge_tabs = mo.ui.tabs( |
|
{"Purge Deployments": deployments_purge_tab, "Purge Repository Assets": repository_purge_tab,"Purge Data Assets": data_assets_purge_tab }, lazy=False |
|
) |
|
else: |
|
purge_tabs = mo.md("**Instantiate the watsonx.ai Deployment Space Client.**") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return (purge_tabs,) |
|
|
|
if __name__ == "__main__": |
|
app.run() |
|
|