|
import marimo |
|
|
|
__generated_with = "0.13.0" |
|
app = marimo.App(width="medium") |
|
|
|
|
|
@app.cell
def _():
    # Bootstrap cell: hands `mo` (marimo) and `os` to every cell that lists
    # them as parameters.
    import os
    import marimo as mo

    return mo, os
|
|
|
|
|
@app.function |
|
def get_markdown_content(file_path):
    """Return the complete text of the file at ``file_path``, decoded as UTF-8."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        return handle.read()
|
|
|
|
|
@app.cell
def _(mo):
    # Read the four introduction pages from the intro_markdown/ folder.
    intro_text = get_markdown_content('intro_markdown/intro.md')
    intro_marimo = get_markdown_content('intro_markdown/intro_marimo.md')
    intro_notebook = get_markdown_content('intro_markdown/intro_notebook.md')
    intro_comparison = get_markdown_content('intro_markdown/intro_comparison.md')

    # One carousel slide per page, in the order the files were read.
    intro = mo.carousel(
        [
            mo.md(f"{page}")
            for page in (intro_text, intro_marimo, intro_notebook, intro_comparison)
        ]
    )

    # Collapsible wrapper around the carousel; kept as the cell's final bare
    # expression rather than being assigned to a name.
    mo.accordion({"## Notebook Introduction": intro})
    return
|
|
|
|
|
@app.cell
def _(os):
    # Shared imports plus the IBM Cloud / watsonx.ai helper functions that the
    # later cells receive through this cell's return tuple.
    from typing import Any, Dict, List, Optional, Pattern, Set, Union, Tuple
    from ibm_watsonx_ai import APIClient, Credentials
    from pathlib import Path
    import importlib.util
    import pandas as pd
    import mimetypes
    import requests
    import zipfile
    import polars
    import urllib3
    import tempfile
    import base64
    import certifi
    import uuid
    import time
    import json
    import sys
    import ssl
    import ast
    import io
    import re

    # Point temporary-file creation at a dedicated directory: exported through
    # TMPDIR, created if missing, and set on the tempfile module directly.
    os.environ['TMPDIR'] = '/tmp/notebook_functions'
    os.makedirs('/tmp/notebook_functions', exist_ok=True)
    tempfile.tempdir = '/tmp/notebook_functions'

    def get_iam_token(api_key):
        """
        Exchange an IBM Cloud API key for an IAM access token.

        Args:
            api_key: The IBM Cloud API key to exchange.

        Returns:
            The ``access_token`` field of the IAM token response.

        Raises:
            requests.HTTPError: If the IAM endpoint answers with an error status
                (for example when the API key is rejected).
            requests.Timeout: If the endpoint does not answer within 30 seconds.
        """
        response = requests.post(
            'https://iam.cloud.ibm.com/identity/token',
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            data={'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': api_key},
            verify=certifi.where(),
            # Without a timeout a stalled connection would block the cell forever.
            timeout=30,
        )
        # Fail with the HTTP error itself instead of a KeyError on the missing
        # 'access_token' field of an error payload.
        response.raise_for_status()
        return response.json()['access_token']

    def setup_task_credentials(client):
        """
        Replace the task credentials stored for ``client`` with a fresh one.

        Every credential listed under "resources" in the current details is
        deleted before a new one is stored.

        Args:
            client: An object exposing ``task_credentials`` with
                ``get_details``, ``get_id``, ``delete`` and ``store``.

        Returns:
            Whatever ``client.task_credentials.store()`` returns.
        """
        existing_credentials = client.task_credentials.get_details()

        if "resources" in existing_credentials and existing_credentials["resources"]:
            for cred in existing_credentials["resources"]:
                cred_id = client.task_credentials.get_id(cred)
                client.task_credentials.delete(cred_id)

        return client.task_credentials.store()

    def get_cred_value(key, creds_var_name="baked_in_creds", default=""):
        """
        Helper function to safely get a value from a credentials dictionary.

        Args:
            key: The key to look up in the credentials dictionary.
            creds_var_name: The variable name of the credentials dictionary.
            default: The default value to return if the key is not found.

        Returns:
            The value from the credentials dictionary if it exists and contains the key,
            otherwise returns the default value.
        """
        # The dictionary is looked up by name in globals(), so it only needs to
        # exist at call time.
        if creds_var_name in globals():
            creds_dict = globals()[creds_var_name]
            if isinstance(creds_dict, dict) and key in creds_dict:
                return creds_dict[key]
        return default

    return (
        APIClient,
        Credentials,
        ast,
        get_iam_token,
        importlib,
        json,
        pd,
        setup_task_credentials,
        sys,
        tempfile,
        uuid,
    )
|
|
|
|
|
@app.cell
def _(client_instantiation_form, os):
    # A falsy form value (presumably: nothing submitted yet) is normalised to None.
    client_setup = client_instantiation_form.value or None

    if client_setup is None:
        # No credentials available: blank the environment variable and
        # publish None for every derived setting.
        os.environ["WATSONX_APIKEY"] = ""
        project_id = None
        space_id = None
        wx_api_key = None
        wx_url = None
    else:
        wx_url = client_setup["wx_region"]
        wx_api_key = client_setup["wx_api_key"].strip()
        # Also exported through the environment under WATSONX_APIKEY.
        os.environ["WATSONX_APIKEY"] = wx_api_key

        # Ids are stripped of surrounding whitespace; a None entry stays None.
        _raw_project_id = client_setup["project_id"]
        project_id = _raw_project_id.strip() if _raw_project_id is not None else None

        _raw_space_id = client_setup["space_id"]
        space_id = _raw_space_id.strip() if _raw_space_id is not None else None
    return client_setup, project_id, space_id, wx_api_key, wx_url
|
|
|
|
|
@app.cell
def _(client_setup, get_iam_token, wx_api_key):
    # Fetch an IAM token only when a form submission with an API key exists.
    # `token` is not part of this cell's return value.
    _can_authenticate = client_setup and wx_api_key is not None
    token = get_iam_token(wx_api_key) if _can_authenticate else None
    return
|
|
|
|
|
@app.cell
def _(mo):
    wx_platform_url = "https://api.dataplatform.cloud.ibm.com"

    # Region label -> watsonx.ai endpoint; the dropdown below uses it as options.
    regions = {
        "US": "https://us-south.ml.cloud.ibm.com",
        "EU": "https://eu-de.ml.cloud.ibm.com",
        "GB": "https://eu-gb.ml.cloud.ibm.com",
        "JP": "https://jp-tok.ml.cloud.ibm.com",
        "AU": "https://au-syd.ml.cloud.ibm.com",
        "CA": "https://ca-tor.ml.cloud.ibm.com"
    }

    # Inputs handed to .batch(); the markdown template only has placeholders
    # for wx_region, wx_api_key and space_id.
    _form_fields = {
        "wx_region": mo.ui.dropdown(regions, label="Select your watsonx.ai region:", value="US", searchable=True),
        "wx_api_key": mo.ui.text(placeholder="Add your IBM Cloud api-key...", label="IBM Cloud Api-key:", kind="password"),
        "project_id": mo.ui.text(placeholder="Add your watsonx.ai project_id...", label="Project_ID:", kind="text"),
        "space_id": mo.ui.text(placeholder="Add your watsonx.ai space_id...", label="Space_ID:", kind="text"),
    }

    client_instantiation_form = (
        mo.md('''
    ###**watsonx.ai credentials:**

    {wx_region}

    {wx_api_key}

    {space_id}
    ''')
        .style(max_height="300px", overflow="auto", border_color="blue")
        .batch(**_form_fields)
        .form(show_clear_button=True, bordered=False)
    )

    return (client_instantiation_form,)
|
|
|
|
|
@app.cell
def _(
    APIClient,
    Credentials,
    client_setup,
    project_id,
    setup_task_credentials,
    space_id,
    wx_api_key,
    wx_url,
):
    # Build the watsonx.ai clients from the submitted form values; without a
    # submission every name is published as None.
    if client_setup:
        wx_credentials = Credentials(url=wx_url, api_key=wx_api_key)

        # One client per scope; a blank or None id leaves that client unset.
        project_client = (
            APIClient(credentials=wx_credentials, project_id=project_id)
            if project_id
            else None
        )
        deployment_client = (
            APIClient(credentials=wx_credentials, space_id=space_id)
            if space_id
            else None
        )

        # `client` always mirrors the deployment-space client.
        client = deployment_client

        # Task credentials are set up once, through the project client when it
        # exists and otherwise through the deployment client. The former third
        # branch ("both clients set") could never run because the first branch
        # already covers every case with a project client.
        _credentials_owner = (
            project_client if project_client is not None else deployment_client
        )
        if _credentials_owner is not None:
            task_credentials_details = setup_task_credentials(_credentials_owner)
        else:
            task_credentials_details = None
    else:
        wx_credentials = None
        project_client = None
        deployment_client = None
        task_credentials_details = None
        client = None
    return client, deployment_client, project_client
|
|
|
|
|
@app.cell
def _(deployment_client, project_client):
    # "success" as soon as either client exists, otherwise "neutral".
    _has_client = project_client is not None or deployment_client is not None
    client_callout_kind = "success" if _has_client else "neutral"
    return (client_callout_kind,)
|
|
|
|
|
@app.cell
def _(mo):
    # Names of the code templates offered in the dropdown; "Base" is preselected.
    template_variants = ["Base", "Stream Files to IBM COS [Example]"]
    template_variant = mo.ui.dropdown(
        template_variants,
        value="Base",
        label="Code Template:",
    )
    return (template_variant,)
|
|
|
|
|
@app.cell
def _(client_callout_kind, client_instantiation_form, mo, template_variant):
    # The template dropdown sits inside a callout whose kind comes from
    # client_callout_kind.
    client_callout = mo.callout(template_variant, kind=client_callout_kind)

    # Credentials form and callout side by side.
    client_stack = mo.hstack(
        [client_instantiation_form, client_callout],
        justify="space-around",
        align="center",
    )
    return (client_stack,)
|
|
|
|
|
@app.cell
def _(client_stack, mo):
    # Section 1 body: usage instructions followed by the credentials form /
    # template selector stack interpolated from `client_stack`.
    client_section = mo.md(f'''
    ###**Instantiate your watsonx.ai client:**

    1. Select a region from the dropdown menu

    2. Provide an IBM Cloud Apikey and watsonx.ai deployment space id

    3. Once you submit, the area with the code template will turn green if successful

    4. Select a base (provide baseline format) or example code function template

    ---

    {client_stack}

    ''')
    return (client_section,)
|
|
|
|
|
@app.cell
def _(mo, sc_m, schema_editors):
    # Second tab: a short heading followed by the schema editors.
    _definition_tab = mo.md(f"""
    ####**Edit the schema definitions you selected in the previous tab.**<br>
    {schema_editors}""")

    # Tab order follows dict order: selection first, definition second.
    sc_tabs = mo.ui.tabs(
        {
            "Schema Option Selection": sc_m,
            "Schema Definition": _definition_tab,
        }
    )
    return (sc_tabs,)
|
|
|
|
|
@app.cell
def _(fm, function_editor, mo, sc_tabs):
    # Section 2 body: instructions, then the code editor, the schema tabs and
    # the `fm` element, separated by horizontal rules.
    # Text fixes: the steps are numbered consecutively (the list used to jump
    # from 1 to 3) and "An sample" reads "A sample".
    function_section = mo.md(f'''###**Create your function from the template:**

    1. Use the code editor window to create a function to deploy
    <br>
    The function must:
    <br>
    --- Include a payload and score element
    <br>
    --- Have the same function name in both the score = <name>() segment and the Function Name input field below
    <br>
    --- Additional details can be found here -> [watsonx.ai - Writing deployable Python functions
    ](https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/ml-deploy-py-function-write.html?utm_medium=Exinfluencer&utm_source=ibm_developer&utm_content=in_content_link&utm_term=10006555&utm_id=blogs_awb-tekton-optimizations-for-kubeflow-pipelines-2-0&context=wx&audience=wdp)

    2. Click submit, then proceed to select whether you wish to add:
    <br>
    --- An input schema (describing the format of the variables the function takes) **[Optional]**
    <br>
    --- An output schema (describing the format of the output results the function returns) **[Optional]**
    <br>
    --- A sample input example (showing an example of a mapping of the input and output schema to actual values.) **[Optional]**

    3. Fill in the function name field **(must be exactly the same as in the function editor)**

    4. Add a description and metadata tags **[Optional]**

    ---

    {function_editor}

    ---

    {sc_tabs}

    ---

    {fm}

    ''')
    return (function_section,)
|
|
|
|
|
@app.cell
def _(mo, selection_table, upload_func):
    # Section 4 body: instructions, the software-spec selection table and the
    # upload control.
    # Text fix: "Once your are satisfied" -> "Once you are satisfied".
    upload_section = mo.md(f'''
    ###**Review and Upload your function**

    1. Review the function metadata specs JSON

    2. Select a software specification if necessary (default for python functions is pre-selected), this is the runtime environment of python that your function will run in. Environments on watsonx.ai come pre-packaged with many different libraries, if necessary install new ones by adding them into the function as a `subprocess.check_output('pip install <package_name>', shell=True)` command.

    3. Once you are satisfied, click the upload function button and wait for the response.

    > If you see no table of software specs, you haven't activated your watsonx.ai client.

    ---

    {selection_table}

    ---

    {upload_func}

    ''')
    return (upload_section,)
|
|
|
|
|
@app.cell
def _(deploy_fnc, deployment_definition, hw_selection_table, mo):
    # Section 5 body: instructions, the hardware-spec table, the deployment
    # definition inputs and the deploy control.
    # Text fixes: closed the dangling parenthesis after "0.5 CUH per hour" and
    # corrected "Once your are satisfied".
    deployment_section = mo.md(f'''
    ###**Deploy your function:**

    1. Select a hardware specification (vCPUs/GB) that you want your function deployed on
    <br>
    --- XXS and XS cost the same (0.5 CUH per hour), so XS is the better option
    <br>
    --- Select larger instances for more resource intensive tasks or runnable jobs

    2. Select the type of deployment:
    <br>
    --- Function (Online) for always-on endpoints - Always available and low latency, but consume resources continuously for every hour they are deployed.
    <br>
    --- Batch (Batch) for runnable jobs - Only consume resources during job runs, but aren't as flexible to deploy.

    3. If you've selected Function, pick a completely unique (globally, not just your account) deployment serving name that will be in the endpoint url.

    4. Once you are satisfied, click the deploy function button and wait for the response.

    ---

    {hw_selection_table}

    ---

    {deployment_definition}

    ---

    {deploy_fnc}

    ''')
    return (deployment_section,)
|
|
|
|
|
@app.cell
def _(mo, purge_tabs):
    # Section 6 body: explanatory text followed by the purge tabs interpolated
    # from `purge_tabs`.
    purging_section = mo.md(f'''
    ###**Helper Purge Functions:**

    These functions help you retrieve, select and delete deployments, data assets or repository assets (functions, models, etc.) that you have in the deployment space. This is meant to support fast cleanup.

    Select the tab based on what you want to delete, then click each of the buttons one by one after the previous gives a response.

    ---

    {purge_tabs}

    ''')

    return (purging_section,)
|
|
|
|
|
@app.cell
def _(
    json,
    mo,
    package_analysis_stack,
    package_meta,
    ss_asset_response,
    yaml_template,
):
    # Section 3 body: package check, YAML template selector, package/spec
    # metadata form and the JSON dump of the last software-spec response.
    # Text fix: "alognside" -> "alongside".
    packages_section = mo.md(f'''
    ###**If needed - Create a custom software-spec with added python packages**

    1. Check to see if the python library you want to use is already available inside watsonx.ai's runtime environment base specs for deployed functions by adding them as a comma separated list, e.g. - plotly, ibm-watsonx-ai==1.3.6, etc. into the text area.

    2. If you wish to see all the packages already present, check the box to return it alongside your validation results.

    ---

    {package_analysis_stack}

    ---

    3. If it isn't, you can create a custom software spec that adds a package_extension add-on to it. Use the template selector to see some examples, but when you are ready add your packages in the code editor after the "pip:" section.

    {yaml_template}

    4. Give your package extension and software spec names and descriptions and submit.

    5. You will find it in the Function Upload table afterward.

    ---

    {package_meta}

    ---

    Result:
    ```json
    {json.dumps(ss_asset_response, indent=2)}
    ```
    ''')
    return (packages_section,)
|
|
|
|
|
@app.cell
def _(client_section, mo):
    # Section 1 panel: the credentials UI inside a single-entry accordion.
    _panel = {"Section 1: **watsonx.ai Credentials**": client_section}
    ui_accordion_section_1 = mo.accordion(_panel)
    ui_accordion_section_1
    return
|
|
|
|
|
@app.cell
def _(function_section, mo):
    # Section 2 panel: the function-creation UI inside a single-entry accordion.
    _panel = {"Section 2: **Function Creation**": function_section}
    ui_accordion_section_2 = mo.accordion(_panel)
    ui_accordion_section_2
    return
|
|
|
|
|
@app.cell
def _(mo, packages_section):
    # Section 3 panel: the package-extension UI inside a single-entry accordion.
    _panel = {"Section 3: **Create a Package Extension (Optional)**": packages_section}
    ui_accordion_section_3 = mo.accordion(_panel)
    ui_accordion_section_3
    return
|
|
|
|
|
@app.cell
def _(mo, upload_section):
    # Section 4 panel: the function-upload UI inside a single-entry accordion.
    _panel = {"Section 4: **Function Upload**": upload_section}
    ui_accordion_section_4 = mo.accordion(_panel)
    ui_accordion_section_4
    return
|
|
|
|
|
@app.cell
def _(deployment_section, mo):
    # Section 5 panel: the deployment UI inside a single-entry accordion.
    _panel = {"Section 5: **Function Deployment**": deployment_section}
    ui_accordion_section_5 = mo.accordion(_panel)
    ui_accordion_section_5
    return
|
|
|
|
|
@app.cell
def _(mo, purging_section):
    # Section 6 panel: the purge helpers inside a single-entry accordion.
    _panel = {"Section 6: **Helper Functions**": purging_section}
    ui_accordion_section_6 = mo.accordion(_panel)
    ui_accordion_section_6
    return
|
|
|
|
|
@app.cell
def _(mo, template_variant):
    # Pick the starting code for the editor from the selected template variant.
    if template_variant.value == "Stream Files to IBM COS [Example]":
        # Explicit encoding so the example file reads the same on every platform.
        with open("stream_files_to_cos.py", "r", encoding="utf-8") as file:
            template = file.read()
    else:
        # The base template is assembled line by line so the indentation of the
        # generated source does not depend on how this cell is indented.
        # Fix: score() now takes `payload`, the name its body reads; it used to
        # be declared as score(input_data) and failed with a NameError.
        template = "\n".join([
            "def your_function_name():",
            "",
            "    import subprocess",
            "    subprocess.check_output('pip install gensim', shell=True)",
            "    import gensim",
            "",
            "    def score(payload):",
            "        message_from_input_payload = payload.get(\"input_data\")[0].get(\"values\")[0][0]",
            "        response_message = \"Received message - {0}\".format(message_from_input_payload)",
            "",
            "        # Score using the pre-defined model",
            "        score_response = {",
            "            'predictions': [{'fields': ['Response_message_field', 'installed_lib_version'],",
            "                             'values': [[response_message, gensim.__version__]]",
            "                            }]",
            "        }",
            "        return score_response",
            "",
            "    return score",
            "",
            "score = your_function_name()",
            "",
        ])

    # Code editor wrapped in a form; the editor content is exposed under the
    # 'editor' key of the form value.
    function_editor = (
        mo.md('''
    #### **Create your function by editing the template:**

    {editor}

    ''')
        .batch(
            editor = mo.ui.code_editor(value=template, language="python", min_height=200, theme="dark")
        )
        .form(show_clear_button=True, bordered=False)
    )

    return (function_editor,)
|
|
|
|
|
@app.cell
def _(ast, function_editor, mo, os):
    # Find the first top-level function in the submitted editor code and save
    # the code to /tmp/notebook_functions/<function_name>.py.
    function_name = None
    if function_editor.value:
        code = function_editor.value['editor']

        try:
            parsed_ast = ast.parse(code)

            # Name of the first top-level `def`, or None when there is none.
            function_name = next(
                (
                    node.name
                    for node in parsed_ast.body
                    if isinstance(node, ast.FunctionDef)
                ),
                None,
            )

            if function_name is not None:
                deployable_function = None
                # NOTE(review): the mo.md(...) results in this cell are not
                # assigned or returned, so they look like discarded status
                # messages - confirm whether they were meant to be displayed.
                mo.md(f"Found function: '{function_name}' (using file path approach)")

                save_dir = "/tmp/notebook_functions"
                os.makedirs(save_dir, exist_ok=True)

                file_path = os.path.join(save_dir, f"{function_name}.py")
                # UTF-8 keeps non-ASCII characters in the user's code from
                # failing on platforms with a different default encoding.
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(code)
            else:
                mo.md("No function found in the editor code")
        except SyntaxError as e:
            mo.md(f"Syntax error in function code: {str(e)}")
    return (function_name,)
|
|
|
|
|
@app.cell
def _():
    # Conda-style YAML snippets keyed by template name. Each one lists `pip`
    # as a dependency followed by a `pip:` section of packages.
    # NOTE(review): the indentation inside these literals was not recoverable
    # from the reviewed text and follows the usual conda layout - confirm it
    # matches the original.
    yaml_templates = {
        # Skeleton with placeholders only.
        "empty": """dependencies:
      - pip
      - pip:
        - <library_name>
        - ...
    """,
        "llama_index_and_scikit": """dependencies:
      - pip
      - pip:
        - scikit-learn==1.6.1
        - scikit-llm==1.4.1
        - plotly==6.0.1
        - altair==5.5.0
        - PyMuPDF==1.25.1
        - llama-index==0.12.24
        - llama-index-readers-file==0.4.6
        - llama-index-readers-web==0.3.8
    """,
        "data_product_hub": """dependencies:
      - pip
      - pip:
        - PyMuPDF==1.25.1
        - llama-index==0.12.24
        - llama-index-readers-file==0.4.6
        - llama-index-readers-web==0.3.8
        - plotly==6.0.1
        - altair==5.5.0
        - dask==2025.2.0
        - dph-services==0.4.0
    """,
        "watsonx_data": """dependencies:
      - pip
      - pip:
        - prestodb
        - PyMuPDF==1.25.1
        - llama-index==0.12.24
        - llama-index-readers-file==0.4.6
        - llama-index-readers-web==0.3.8
        - ibm-watsonxdata==0.4.0
    """
    }
    return (yaml_templates,)
|
|
|
|
|
@app.cell
def _(mo, yaml_templates):
    # Dropdown over the YAML templates; "empty" is preselected.
    yaml_template = mo.ui.dropdown(
        yaml_templates,
        value="empty",
        label="**Select a template:**",
        searchable=True,
    )
    return (yaml_template,)
|
|
|
|
|
@app.cell |
|
def _(tempfile): |
|
def create_yaml_tempfile(yaml_editor_value):
    """Write ``yaml_editor_value`` to a new ``.yaml`` temporary file.

    Args:
        yaml_editor_value: The content to write; it is converted with ``str()``.

    Returns:
        The path of the created file. The file is not deleted automatically
        (``delete=False``), so the caller owns its cleanup.
    """
    # Write through the handle NamedTemporaryFile already opened and let the
    # `with` block close it; the handle used to stay open while the same path
    # was opened a second time.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.yaml', delete=False, encoding='utf-8'
    ) as temp_file:
        temp_file.write(str(yaml_editor_value))
    return temp_file.name
|
return (create_yaml_tempfile,) |
|
|
|
|
|
@app.cell
def _(mo, yaml_template):
    # Display label -> package type value for the package-type dropdown.
    pkg_types = {"Conda Yaml":"conda_yml","Custom user library":"custom_library"}

    # Form inputs, created in the order they are passed to .batch(). The
    # markdown template has no placeholder for package_type.
    _yml_editor = mo.ui.code_editor(value=yaml_template.value, language="yaml", min_height=100, theme="dark")
    _package_name = mo.ui.text(
        placeholder="Python Package for...",
        label="Package Extension Name:",
        kind="text",
        value="Custom Python Package",
    )
    _software_spec_name = mo.ui.text(
        placeholder="Software Spec Name",
        label="Custom Software Spec Name:",
        kind="text",
        value="Extended Python Function Software Spec",
    )
    _package_description = mo.ui.text_area(
        placeholder="Write a description for your package.",
        label="Package Description:",
        value=" ",
    )
    _software_spec_description = mo.ui.text_area(
        placeholder="Write a description for your software spec.",
        label="Software Spec Description:",
        value=" ",
    )
    _package_type = mo.ui.dropdown(
        pkg_types,
        label="Select your package type:",
        value="Conda Yaml",
    )

    package_meta = (
        mo.md('''**Create your Conda YAML by editing the template:**

    {yml_editor}

    {package_name}

    {package_description}

    {software_spec_name}

    {software_spec_description}
    ''')
        .batch(
            yml_editor=_yml_editor,
            package_name=_package_name,
            software_spec_name=_software_spec_name,
            package_description=_package_description,
            software_spec_description=_software_spec_description,
            package_type=_package_type,
        )
        .form(show_clear_button=True, bordered=False)
    )
    return (package_meta,)
|
|
|
|
|
@app.cell
def _(mo):
    # Inputs of the package-check form: a free-text package list and a
    # checkbox asking for the full package list.
    _package_list = mo.ui.text_area(placeholder="Add packages as a comma separated list (with or without versions).")
    _return_full_list = mo.ui.checkbox(value=False, label="Return a full list of packages in the base software specification.")

    check_packages = (
        mo.md('''
    **Check if a package you want to use is in the base software_specification already:**

    {package_list}

    {return_full_list}
    ''')
        .batch(package_list=_package_list, return_full_list=_return_full_list)
        .form(show_clear_button=True, bordered=False)
    )
    return (check_packages,)
|
|
|
|
|
@app.cell
def _(check_packages):
    # Turn the submitted comma-separated package list into a clean list of
    # requirement strings.
    if check_packages.value is not None:
        packages = check_packages.value['package_list']
        full_list_return = check_packages.value['return_full_list']

        # Drop blank entries (empty input, trailing or doubled commas); they
        # used to be reported as a missing package with an empty name.
        _requested = [item.strip() for item in packages.split(',') if item.strip()]

        # With nothing to verify, an analysis is only useful when the full
        # package list was requested; otherwise publish None so no analysis
        # is run.
        verification_list = _requested if (_requested or full_list_return) else None
    else:
        packages = None
        verification_list = None
        full_list_return = None
    return full_list_return, verification_list
|
|
|
|
|
@app.cell
def _(
    analyze_software_spec,
    base_software_spec,
    full_list_return,
    verification_list,
    visualize_software_spec,
):
    # Run the package analysis only once a verification list exists.
    if verification_list is None:
        pkg_analysis = None
        package_df = None
    else:
        pkg_analysis = analyze_software_spec(
            base_software_spec,
            verification_list,
            return_full_sw_package_list=full_list_return,
        )
        # Tabular view of the analysis result.
        package_df = visualize_software_spec(pkg_analysis, verification_list)
    return (package_df,)
|
|
|
|
|
@app.cell
def _(check_packages, mo, package_df):
    # Package-check form on top, analysis table (possibly None) underneath.
    _stack_items = [check_packages, package_df]
    package_analysis_stack = mo.vstack(_stack_items, justify="center")
    return (package_analysis_stack,)
|
|
|
|
|
@app.cell
def _(client):
    # Without a client there is nothing to look up.
    if client is None:
        base_sw_spec_id = None
        base_software_spec = None
    else:
        # Fixed id of the base software specification whose details are fetched.
        base_sw_spec_id = "45f12dfe-aa78-5b8d-9f38-0ee223c47309"
        base_software_spec = client.software_specifications.get_details(base_sw_spec_id)
    return base_software_spec, base_sw_spec_id
|
|
|
|
|
@app.cell
def _(client, create_yaml_tempfile, package_meta, uuid):
    # Prepare the package-extension metadata and write the edited YAML to a
    # temporary file once the form is submitted and a client exists.
    if package_meta.value is not None and client is not None:
        # Four characters of a random UUID are appended to the name.
        pack_suffix = str(uuid.uuid4())[:4]
        pack_name = package_meta.value['package_name']

        # All three keys now come from the package-extension meta names; the
        # description key used to be taken from the software-specification
        # meta names instead.
        _meta_names = client.package_extensions.ConfigurationMetaNames
        pe_metadata = {
            _meta_names.NAME: f"{pack_name}_{pack_suffix}",
            _meta_names.TYPE: package_meta.value['package_type'],
            _meta_names.DESCRIPTION: package_meta.value['package_description'],
        }
        yaml_file_path = create_yaml_tempfile(package_meta.value['yml_editor'])
    else:
        pe_metadata = {}
        yaml_file_path = None

    return pe_metadata, yaml_file_path
|
|
|
|
|
@app.cell
def _(client, pe_metadata, yaml_file_path):
    # Store the package extension only when a YAML file was prepared.
    if yaml_file_path is None:
        pe_asset_details = None
        package_id = None
    else:
        pe_asset_details = client.package_extensions.store(
            meta_props=pe_metadata,
            file_path=yaml_file_path,
        )
        # The asset id is read from the metadata of the store response.
        package_id = pe_asset_details["metadata"]["asset_id"]
    return (package_id,)
|
|
|
|
|
@app.cell |
|
def _(): |
|
|
|
def analyze_software_spec(sw_spec_response, required_libraries, return_full_sw_package_list=False):
    """
    Compare a software specification with a list of required libraries.

    Args:
        sw_spec_response (dict): Software specification response; the packages are read from
            entity -> software_specification -> software_configuration -> included_packages
        required_libraries (list): Requirements written as "package_name" or "package_name==version"
        return_full_sw_package_list (bool): Whether to add the complete package list to the result

    Returns:
        dict: Analysis results with the keys:
            - present: libraries found (with a matching version when one was pinned) -> installed version
            - not_present: libraries not found in the software spec -> None
            - version_mismatch: libraries found with another version -> [sw_version, required_version]
            - sw_packages: (Optional) every package of the software spec, preceded by the
              libraries that are not present (mapped to None)

    Raises:
        ValueError: If the response lacks one of the expected keys.
    """
    # Installed package name -> version, straight from the spec response.
    try:
        spec_entries = sw_spec_response["entity"]["software_specification"]["software_configuration"]["included_packages"]
        installed = {entry["name"]: entry["version"] for entry in spec_entries}
    except KeyError as e:
        raise ValueError(f"Invalid software specification format: {e}")

    found, absent, mismatched = {}, {}, {}
    requested_names = set()

    for requirement in required_libraries:
        # "name==version" carries a pinned version; a bare name pins nothing.
        raw_name, separator, raw_version = requirement.partition("==")
        name = raw_name.strip()
        pinned = raw_version.strip() if separator else None
        requested_names.add(name)

        if name not in installed:
            absent[name] = None
        elif pinned is not None and pinned != installed[name]:
            mismatched[name] = [installed[name], pinned]
        else:
            found[name] = installed[name]

    result = {
        "present": found,
        "not_present": absent,
        "version_mismatch": mismatched,
    }

    if return_full_sw_package_list:
        def ordering(pkg_name):
            # Tier first (absent, mismatched, requested, everything else),
            # then alphabetical within a tier.
            if pkg_name in absent:
                tier = 0
            elif pkg_name in mismatched:
                tier = 1
            elif pkg_name in requested_names:
                tier = 2
            else:
                tier = 3
            return (tier, pkg_name)

        ranked = {pkg: installed[pkg] for pkg in sorted(installed, key=ordering)}

        # Absent libraries go in front of the installed ones, the most
        # recently recorded first.
        result["sw_packages"] = {
            **{pkg: None for pkg in reversed(list(absent))},
            **ranked,
        }

    return result
|
|
|
def visualize_software_spec(analysis_result, required_libraries=None):
    """
    Visualizes the results of analyze_software_spec in a DataFrame with status indicators.
    For use in Marimo notebooks.

    Args:
        analysis_result (dict): The result from analyze_software_spec function
        required_libraries (list, optional): The original list of required libraries
            used in analyze_software_spec

    Returns:
        pandas.DataFrame: Columns "Package", "Version" and "Status", sorted by
        status priority (missing, version mismatch, present, other) and then by
        package name. An analysis without any package yields an empty frame
        with the same columns.
    """
    import pandas as pd

    missing = analysis_result.get("not_present", {})
    mismatched = analysis_result.get("version_mismatch", {})
    present = analysis_result.get("present", {})

    # Names the caller asked for, with any "==version" suffix removed.
    requested = set()
    if required_libraries:
        for lib in required_libraries:
            requested.add(lib.split("==", 1)[0].strip())

    rows = []
    if "sw_packages" in analysis_result:
        # Full listing: every package of the spec, including unrequested ones.
        for package, version in analysis_result["sw_packages"].items():
            if package in missing:
                status, priority = "❌ Missing", 0
            elif package in mismatched:
                status, priority = "⚠️ Version Mismatch", 1
            elif package in requested:
                status, priority = "✅ Present", 2
            else:
                status, priority = "Other", 3

            rows.append({
                "Package": package,
                "Version": version if version is not None else "Not Present",
                "Status": status,
                "_priority": priority
            })
    else:
        # Compact listing: only the packages mentioned in the analysis.
        for package in {*missing, *mismatched, *present}:
            if package in missing:
                version, status, priority = "Not Present", "❌ Missing", 0
            elif package in mismatched:
                # First element is the version found in the software spec.
                version, status, priority = mismatched[package][0], "⚠️ Version Mismatch", 1
            else:
                version, status, priority = present[package], "✅ Present", 2

            rows.append({
                "Package": package,
                "Version": version,
                "Status": status,
                "_priority": priority
            })

    # Naming the columns keeps the sort below working when there are no rows;
    # a column-less empty frame raised KeyError on "_priority".
    df = pd.DataFrame(rows, columns=["Package", "Version", "Status", "_priority"])

    # "_priority" only drives the ordering and is dropped afterwards.
    df = df.sort_values(by=["_priority", "Package"]).drop("_priority", axis=1).reset_index(drop=True)

    return df
|
return analyze_software_spec, visualize_software_spec |
|
|
|
|
|
@app.cell
def _(
    base_sw_spec_id,
    client,
    get_selection_table_status,
    package_id,
    package_meta,
    set_selection_table_status,
    uuid,
):
    # Create a derived software specification from the base spec plus the
    # stored package extension. The state getter and setter used below are
    # now listed in the signature; the body referenced them without declaring
    # them.
    if package_id is not None:
        # Four characters of a random UUID are appended to the name.
        ss_suffix = str(uuid.uuid4())[:4]
        ss_name = package_meta.value['software_spec_name']

        _meta_names = client.software_specifications.ConfigurationMetaNames
        ss_metadata = {
            _meta_names.NAME: f"{ss_name}_{ss_suffix}",
            _meta_names.DESCRIPTION: package_meta.value['software_spec_description'],
            _meta_names.BASE_SOFTWARE_SPECIFICATION: {'guid': base_sw_spec_id},
            _meta_names.PACKAGE_EXTENSIONS: [{'guid': package_id}]
        }

        ss_asset_details = client.software_specifications.store(meta_props=ss_metadata)

        # Bump the counter held in the selection-table state after a store
        # call that returned something (None counts as zero).
        current_status = get_selection_table_status()
        if ss_asset_details is not None:
            new_status = 1 if current_status is None else current_status + 1
            set_selection_table_status(new_status)

        if ss_asset_details is not None and "metadata" in ss_asset_details:
            ss_asset_response = ss_asset_details["metadata"]
        else:
            ss_asset_response = {"error": "Unsuccessful Upload"}
    else:
        ss_metadata = {}
        ss_asset_details = None
        ss_asset_response = None

    return (ss_asset_response,)
|
|
|
|
|
@app.cell
def _(mo):
    # State slot for the selection-table status; starts at None.
    _status_state = mo.state(None)
    get_selection_table_status, set_selection_table_status = _status_state
    return get_selection_table_status, set_selection_table_status
|
|
|
@app.cell
def _(client, get_selection_table_status):
    # The status getter is read so this cell keeps referencing the state that
    # is bumped when a new software spec is stored.
    # NOTE(review): presumably that reference is what makes marimo re-run this
    # cell after the setter is called - confirm against the mo.state docs.
    _refresh_marker = get_selection_table_status()

    # The listing needs a client. The former `elif` branch called
    # client.software_specifications.list() in exactly the case where `client`
    # was falsy, which could only fail.
    specs_df = client.software_specifications.list() if client else None

    return (specs_df,)
|
|
|
|
|
@app.cell
def _(client, mo, pd, specs_df):
    if not client:
        # No client yet: show a one-row placeholder table instead.
        sel_df = pd.DataFrame(
            columns=["ID", "VALUE"],
            data=[["ID", "Activate client."]],
        )
        selection_table = mo.ui.table(
            sel_df,
            selection="single",
            label="You haven't activated the client",
            initial_selection=[0],
        )
    else:
        # Supported rows for the two Python runtimes, plus every derived spec.
        _is_supported = specs_df['STATE'] == 'supported'
        _is_python_runtime = specs_df['NAME'].isin(['runtime-24.1-py3.11', 'runtime-24.1-py3.11-cuda'])
        base_specs = specs_df[_is_supported & _is_python_runtime]
        derived_specs = specs_df[specs_df['TYPE'] == 'derived']

        supported_specs = pd.concat([base_specs, derived_specs]).reset_index(drop=True)

        # Spec name -> note shown in the NOTES column.
        framework_mapping = {
            "tensorflow_rt24.1-py3.11": "TensorFlow",
            "pytorch-onnx_rt24.1-py3.11": "PyTorch",
            "onnxruntime_opset_19": "ONNX or ONNXRuntime",
            "runtime-24.1-py3.11": "AI Services/Python Functions/Python Scripts",
            "autoai-ts_rt24.1-py3.11": "AutoAI",
            "autoai-kb_rt24.1-py3.11": "AutoAI",
            "runtime-24.1-py3.11-cuda": "CUDA-enabled (GPU) Python Runtime",
            "runtime-24.1-r4.3": "R Runtime 4.3",
            "spark-mllib_3.4": "Apache Spark 3.4",
            "autoai-rag_rt24.1-py3.11": "AutoAI RAG"
        }

        # Display order; names missing from this list sort after all of them.
        preferred_order = [
            "runtime-24.1-py3.11",
            "runtime-24.1-py3.11-cuda",
            "runtime-24.1-r4.3",
            "ai-service-v5-software-specification",
            "autoai-rag_rt24.1-py3.11",
            "autoai-ts_rt24.1-py3.11",
            "autoai-kb_rt24.1-py3.11",
            "tensorflow_rt24.1-py3.11",
            "pytorch-onnx_rt24.1-py3.11",
            "onnxruntime_opset_19",
            "spark-mllib_3.4",
        ]

        # Temporary SORT_ORDER column: position in preferred_order, or the
        # list length for any other name.
        supported_specs['SORT_ORDER'] = supported_specs['NAME'].apply(
            lambda spec_name: (
                preferred_order.index(spec_name)
                if spec_name in preferred_order
                else len(preferred_order)
            )
        )
        supported_specs = (
            supported_specs
            .sort_values('SORT_ORDER')
            .reset_index(drop=True)
            .drop(columns=['SORT_ORDER'])
        )

        # REPLACEMENT is dropped when the listing has that column.
        if 'REPLACEMENT' in supported_specs.columns:
            supported_specs = supported_specs.drop(columns=['REPLACEMENT'])

        supported_specs['NOTES'] = supported_specs['NAME'].map(framework_mapping).fillna("Other")

        # Single-selection table with the first row preselected.
        selection_table = mo.ui.table(
            supported_specs,
            selection="single",
            label="#### **Select a supported software_spec runtime for your function asset** (For Python Functions select - *'runtime-24.1-py3.11'* ):",
            initial_selection=[0],
            page_size=6,
        )

    return (selection_table,)
|
|
|
|
|
@app.cell
def _(mo):
    # Reactive state for the ID of the chosen software specification.
    # It stays None until another cell pushes a table selection into it.
    (get_selected_sw_spec, set_selected_sw_spec) = mo.state(None)
    return (get_selected_sw_spec, set_selected_sw_spec)
|
|
|
|
|
@app.cell
def _(selection_table, set_selected_sw_spec):
    # Mirror the software-spec table selection into the shared state.
    # The selection can be empty (e.g. the user deselects the row); in that
    # case the previously stored ID is kept instead of failing on `.iloc[0]`.
    _selected_sw_rows = selection_table.value
    if _selected_sw_rows is not None and len(_selected_sw_rows) > 0:
        set_selected_sw_spec(_selected_sw_rows['ID'].iloc[0])
    return
|
|
|
|
|
@app.cell
def _(mo):
    # Reactive state for the ID of the chosen hardware specification.
    # It stays None until another cell pushes a table selection into it.
    (get_selected_hw_spec, set_selected_hw_spec) = mo.state(None)
    return (get_selected_hw_spec, set_selected_hw_spec)
|
|
|
|
|
@app.cell
def _(hw_selection_table, set_selected_hw_spec):
    # Mirror the hardware-spec table selection into the shared state.
    # The selection can be empty (e.g. the user deselects the row); in that
    # case the previously stored ID is kept instead of failing on `.iloc[0]`.
    _selected_hw_rows = hw_selection_table.value
    if _selected_hw_rows is not None and len(_selected_hw_rows) > 0:
        set_selected_hw_spec(_selected_hw_rows['ID'].iloc[0])
    return
|
|
|
|
|
@app.cell
def _(mo):
    # Opt-in toggles: a schema is attached to the uploaded function's
    # metadata only when the matching checkbox is ticked.
    input_schema_checkbox = mo.ui.checkbox(
        label="Add input schema (optional)",
    )
    output_schema_checkbox = mo.ui.checkbox(
        label="Add output schema (optional)",
    )
    return (input_schema_checkbox, output_schema_checkbox)
|
|
|
|
|
@app.cell
def _(
    function_name,
    get_selected_sw_spec,
    input_schema_checkbox,
    mo,
    output_schema_checkbox,
    selection_table,
    template_variant,
):
    # Builds the metadata form (description, function name, tags) and the
    # schema checkbox row shown before a function is uploaded.
    if selection_table.value is not None:
        # Default function name: an explicit name wins, then the COS example
        # name, then a generic placeholder.
        if function_name is not None:
            fnc_nm = function_name
        elif template_variant.value == "Stream Files to IBM COS [Example]":
            fnc_nm = "stream_file_to_cos"
        else:
            fnc_nm = "your_function_name"

        # Snapshot of the software spec currently held in the shared state.
        software_spec = get_selected_sw_spec()

        uploaded_function_name = mo.ui.text(
            value=f"{fnc_nm}",
            label="Function Name:",
            placeholder="<Must be the same as the name in editor>",
            kind="text",
            full_width=False,
        )
        tags_editor = mo.ui.array(
            [
                mo.ui.text(placeholder="Metadata Tags..."),
                mo.ui.text(),
                mo.ui.text(),
            ],
            label="Optional Metadata Tags",
        )
        description_input = mo.ui.text_area(
            label="Description",
            placeholder="Write a description for your function...)",
            rows=5,
            max_length=256,
            full_width=True,
        )

        # Description on the left, name + tags grouped on the right.
        _name_and_tags = mo.hstack(
            [uploaded_function_name, tags_editor],
            justify="start",
            align="start",
            gap=1,
            wrap=True,
        )
        func_metadata = mo.hstack(
            [description_input, _name_and_tags],
            widths=[0.6, 0.4],
            gap=2.75,
        )

        schema_metadata = mo.hstack(
            [input_schema_checkbox, output_schema_checkbox],
            justify="center",
            align="center",
            gap=1,
            wrap=True,
        )

        fm = mo.vstack([func_metadata], align="center", gap=2)

        _reset_warning = mo.md(
            "**Make sure to select the checkbox options before filling in descriptions and tags or they will reset.**"
        )
        sc_m = mo.vstack([schema_metadata, _reset_warning], align="center", gap=2)
    return (
        description_input,
        fm,
        sc_m,
        software_spec,
        tags_editor,
        uploaded_function_name,
    )
|
|
|
|
|
@app.cell
def _(json, mo, template_variant):
    # Pick the starting schemas: the COS streaming example ships its own,
    # every other template gets generic placeholders to be edited by hand.
    if template_variant.value == "Stream Files to IBM COS [Example]":
        from cos_stream_schema_examples import input_schema, output_schema
    else:
        input_schema = [
            {
                'id': '1',
                'type': 'struct',
                'fields': [
                    {
                        'name': _input_field_name,
                        'type': 'string',
                        'nullable': False,
                        'metadata': {},
                    }
                    for _input_field_name in ('<variable name 1>', '<variable name 2>')
                ],
            }
        ]
        output_schema = [
            {
                'id': '1',
                'type': 'struct',
                'fields': [
                    {
                        'name': '<output return name>',
                        'type': 'string',
                        'nullable': False,
                        'metadata': {},
                    }
                ],
            }
        ]

    # Both editors share the same look; only their initial JSON text differs.
    _editor_options = {"language": "python", "min_height": 100, "theme": "dark"}
    input_schema_editor = mo.ui.code_editor(
        value=json.dumps(input_schema, indent=4), **_editor_options
    )
    output_schema_editor = mo.ui.code_editor(
        value=json.dumps(output_schema, indent=4), **_editor_options
    )

    _editor_sections = {
        "**Input Schema Metadata Editor**": input_schema_editor,
        "**Output Schema Metadata Editor**": output_schema_editor,
    }
    schema_editors = mo.accordion(_editor_sections, multiple=True)

    return (input_schema_editor, output_schema_editor, schema_editors)
|
|
|
|
|
@app.cell
def _(
    ast,
    client,
    description_input,
    function_editor,
    importlib,
    input_schema_checkbox,
    input_schema_editor,
    json,
    mo,
    os,
    output_schema_checkbox,
    output_schema_editor,
    software_spec,
    sys,
    tags_editor,
    uploaded_function_name,
):
    # Assembles the function metadata from the form widgets and exposes the
    # "Upload Function" button together with a status getter.
    get_upload_status, set_upload_status = mo.state("No uploads yet")

    def _parse_schema(schema_text):
        """Parse editor text as JSON, falling back to a Python literal.

        Raises whatever `ast.literal_eval` raises (ValueError / SyntaxError)
        when the text is neither valid JSON nor a valid literal.
        """
        try:
            return json.loads(schema_text)
        except json.JSONDecodeError:
            # Accept Python-literal syntax (single quotes, True/False) too.
            # literal_eval only evaluates literals, never arbitrary code.
            return ast.literal_eval(schema_text)

    function_meta = {}

    if software_spec and client is not None:
        _meta_names = client.repository.FunctionMetaNames

        function_meta = {
            _meta_names.NAME: f"{uploaded_function_name.value}" or "your_function_name",
            # software_spec is known to be truthy inside this branch.
            _meta_names.SOFTWARE_SPEC_ID: software_spec,
        }

        # Optional fields are only sent when the user actually filled them in.
        if tags_editor.value:
            filtered_tags = [tag for tag in tags_editor.value if tag and tag.strip()]
            if filtered_tags:
                function_meta[_meta_names.TAGS] = filtered_tags

        if description_input.value:
            function_meta[_meta_names.DESCRIPTION] = description_input.value

        if input_schema_checkbox.value:
            function_meta[_meta_names.INPUT_DATA_SCHEMAS] = _parse_schema(
                input_schema_editor.value
            )

        if output_schema_checkbox.value:
            function_meta[_meta_names.OUTPUT_DATA_SCHEMAS] = _parse_schema(
                output_schema_editor.value
            )

    def upload_function(function_meta, use_function_object=False):
        """
        Uploads a Python function to watsonx.ai as a deployable asset.

        The editor's code is written to ``/tmp/notebook_functions/<name>.py``
        and stored either as an imported function object or as a gzip of
        that file. The working directory is switched to the save directory
        for the upload and always restored afterwards.

        Parameters:
            function_meta (dict): Metadata for the function; its NAME entry is
                overwritten in place with the current function name
            use_function_object (bool): Whether to use function object (True) or file path (False)

        Returns:
            dict: Details of the uploaded function

        Raises:
            Exception: Any failure is recorded in the upload status and re-raised.
        """
        import gzip
        import shutil

        original_dir = os.getcwd()

        try:
            code_to_deploy = function_editor.value['editor']
            func_name = uploaded_function_name.value or "your_function_name"
            function_meta[client.repository.FunctionMetaNames.NAME] = func_name

            # Persist the editor contents so they can be imported or gzipped.
            save_dir = "/tmp/notebook_functions"
            os.makedirs(save_dir, exist_ok=True)
            file_path = f"{save_dir}/{func_name}.py"
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(code_to_deploy)

            if use_function_object:
                # Avoid growing sys.path with a duplicate entry on every call.
                if save_dir not in sys.path:
                    sys.path.append(save_dir)

                spec = importlib.util.spec_from_file_location(func_name, file_path)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                function_object = getattr(module, func_name)

                os.chdir(save_dir)
                func_details = client.repository.store_function(function_object, function_meta)
            else:
                os.chdir(save_dir)

                gz_path = f"{save_dir}/{func_name}.py.gz"
                with open(file_path, 'rb') as f_in, gzip.open(gz_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

                func_details = client.repository.store_function(gz_path, function_meta)

            set_upload_status(f"Latest Upload - id - {func_details['metadata']['id']}")
            return func_details
        except Exception as e:
            # Surface the failure through the status line, then let it propagate.
            set_upload_status(f"Error uploading function: {str(e)}")
            raise
        finally:
            os.chdir(original_dir)

    upload_button = mo.ui.button(
        label="Upload Function",
        on_click=lambda _: upload_function(function_meta, use_function_object=False),
        kind="success",
        tooltip="Click to upload function to watsonx.ai"
    )

    return get_upload_status, upload_button
|
|
|
|
|
@app.cell
def _(get_upload_status, mo, upload_button):
    # artifact_id must exist on every path. The deployment code checks it for
    # falsiness to detect "nothing uploaded yet", which an unbound name would
    # turn into a NameError instead.
    artifact_id = None

    if upload_button.value:
        try:
            upload_result = upload_button.value
            artifact_id = upload_result['metadata']['id']
        except (KeyError, TypeError, IndexError):
            # Unexpected result shape: keep artifact_id as None. The upload
            # status shown below is the user-facing channel for failures.
            artifact_id = None

    upload_func = mo.vstack([
        upload_button,
        mo.md(f"**Status:** {get_upload_status()}")
    ], justify="space-around", align="center")
    return artifact_id, upload_func
|
|
|
|
|
@app.cell
def _(client, mo, pd, upload_button, uuid):
    def reorder_hardware_specifications(df):
        """
        Reorders a hardware specifications dataframe by type and size of environment.

        Names listed in the built-in preferred order come first, in that
        order; every other name follows, sorted alphabetically.

        Parameters:
            df (pandas.DataFrame): The hardware specifications dataframe to reorder;
                must contain a 'NAME' column

        Returns:
            pandas.DataFrame: Reordered copy of the dataframe with reset index
        """
        custom_order = [
            "XXS", "XS", "S", "M", "L", "XL",
            "XS-Spark", "S-Spark", "M-Spark", "L-Spark", "XL-Spark",
            "K80", "K80x2", "K80x4",
            "V100", "V100x2",
            "WXaaS-XS", "WXaaS-S", "WXaaS-M", "WXaaS-L", "WXaaS-XL",
            "Default Spark", "Notebook Default Spark", "ML"
        ]
        # Built once per call so each row is ranked with a dict lookup
        # instead of a linear list search.
        rank_by_name = {name: rank for rank, name in enumerate(custom_order)}

        def get_sort_key(name):
            # (0, rank) sorts known names first; (1, name) sorts the rest by name.
            if name in rank_by_name:
                return (0, rank_by_name[name])
            return (1, name)

        result_df = df.copy()
        result_df['sort_key'] = result_df['NAME'].apply(get_sort_key)
        result_df = result_df.sort_values('sort_key').drop('sort_key', axis=1)
        return result_df.reset_index(drop=True)

    # Defined up front so every name this cell returns exists on the
    # fallback path as well.
    deployment_type = None
    deployment_name = None

    if client and upload_button.value:
        hardware_specs = client.hardware_specifications.list()
        hardware_specs_df = reorder_hardware_specifications(hardware_specs)

        # Preselect the advertised default ('XS') wherever it lands after
        # sorting, rather than assuming it is always the second row.
        _spec_names = hardware_specs_df['NAME'].tolist()
        _default_row = _spec_names.index('XS') if 'XS' in _spec_names else 0

        hw_selection_table = mo.ui.table(
            hardware_specs_df,
            selection="single",
            label="#### **Select a supported hardware_specification for your deployment** *(Default: 'XS' - 1vCPU_4GB Ram)*",
            initial_selection=[_default_row] if _spec_names else None,
            page_size=6,
            wrapped_columns=['DESCRIPTION']
        )

        deployment_type = mo.ui.radio(
            options={"Function":"Online (Function Endpoint)","Runnable Job":"Batch (Runnable Jobs)"}, value="Function", label="Select the Type of Deployment:", inline=True
        )

        # Short random suffix for the default deployment name.
        uuid_suffix = str(uuid.uuid4())[:4]
        deployment_name = mo.ui.text(value=f"deployed_func_{uuid_suffix}", label="Deployment Name:", placeholder="<Must be completely unique>")
    else:
        # Placeholder table shown until a client exists and a function was uploaded.
        hw_df = pd.DataFrame(
            data=[["ID", "Activate client."]],
            columns=["ID", "VALUE"]
        )

        hw_selection_table = mo.ui.table(
            hw_df,
            selection="single",
            label="You haven't activated the client",
            initial_selection=[0]
        )

    return deployment_name, deployment_type, hw_selection_table
|
|
|
|
|
@app.cell
def _(
    artifact_id,
    client,
    deployment_details,
    deployment_name,
    deployment_type,
    get_selected_hw_spec,
    hw_selection_table,
    mo,
    upload_button,
):
    def deploy_function(artifact_id, deployment_type):
        """
        Create a watsonx.ai deployment for an uploaded function asset.

        Parameters:
            artifact_id (str): ID of the function artifact to deploy
            deployment_type (object): Widget whose `.value` selects an online
                or a batch deployment

        Returns:
            dict: Details of the deployed function, or None when no artifact
            ID was given or the deployment failed (the error is printed)
        """
        if not artifact_id:
            print("Error: No artifact ID provided. Please upload a function first.")
            return None

        wants_online = deployment_type.value == "Online (Function Endpoint)"
        meta_names = client.deployments.ConfigurationMetaNames

        if wants_online:
            # Online deployments also get a serving name (same as the name).
            deployment_props = {
                meta_names.NAME: deployment_name.value,
                meta_names.ONLINE: {},
                meta_names.HARDWARE_SPEC: {"id": selected_hw_config},
                meta_names.SERVING_NAME: deployment_name.value,
            }
        else:
            deployment_props = {
                meta_names.NAME: deployment_name.value,
                meta_names.BATCH: {},
                meta_names.HARDWARE_SPEC: {"id": selected_hw_config},
            }

        try:
            print(deployment_props)

            # Look the asset up first so a bad ID fails before creation.
            asset_meta = client.repository.get_details(artifact_id)['metadata']
            print(f"Asset found: {asset_meta['name']} with ID: {asset_meta['id']}")

            deployed_function = client.deployments.create(artifact_id, deployment_props)
            print(f"Creating deployment from Asset: {artifact_id} with deployment properties {str(deployment_props)}")
        except Exception as e:
            print(f"Deployment error: {str(e)}")
            return None
        return deployed_function

    def get_deployment_id(deployed_function):
        # NOTE(review): the `deployed_function` parameter is unused; the ID is
        # read from the notebook-level `deployment_details` instead. Confirm
        # whether that is intended.
        return client.deployments.get_uid(deployment_details)

    def get_deployment_info(deployment_id):
        # Thin wrapper around the client's deployment detail lookup.
        return client.deployments.get_details(deployment_id)

    deployment_status = mo.state("No deployments yet")

    # Read by deploy_function above when the button is clicked.
    if hw_selection_table.value is not None:
        selected_hw_config = get_selected_hw_spec()

    deploy_button = mo.ui.button(
        label="Deploy Function",
        kind="success",
        tooltip="Click to deploy function to watsonx.ai",
        on_click=lambda _: deploy_function(artifact_id, deployment_type),
    )

    # Show the real widgets only once a client exists and something was
    # uploaded; otherwise show plain placeholder text.
    if client and upload_button.value:
        _definition_items = [deployment_type, deployment_name]
    else:
        _definition_items = [
            "No Deployment Type Selected",
            "No Deployment Name Provided",
        ]
    deployment_definition = mo.hstack(_definition_items, justify="space-around")

    return (deploy_button, deployment_definition)
|
|
|
|
|
@app.cell
def _(deploy_button, deployment_definition, mo):
    # Referenced only so this cell depends on deployment_definition.
    _ = deployment_definition

    # The deploy button with its latest click result rendered underneath.
    _deploy_items = [deploy_button, deploy_button.value]
    deploy_fnc = mo.vstack(_deploy_items, align="center", justify="space-around")

    return (deploy_fnc,)
|
|
|
|
|
@app.cell
def _(client, mo, pd, sys):
    # Listing, ID-extraction and bulk-delete helpers for each asset kind the
    # purge tabs manage. All of them talk to the shared `client`.

    def get_deployment_list():
        """Return the deployments as a dataframe without STATE/REPLACEMENT columns."""
        dep_df = client.deployments.list()
        dep_df = pd.DataFrame(dep_df)

        columns_to_drop = [col for col in dep_df.columns if 'STATE' in col or 'REPLACEMENT' in col]
        if columns_to_drop:
            dep_df = dep_df.drop(columns=columns_to_drop)
        return dep_df

    def get_deployment_ids(df):
        """Return the 'ID' column of a deployments dataframe as a list."""
        dep_list = df['ID'].tolist()
        return dep_list

    def get_data_assets_list():
        """Return the data assets as a dataframe."""
        data_a_df = client.data_assets.list()
        data_a_df = pd.DataFrame(data_a_df)
        return data_a_df

    def get_data_asset_ids(df):
        """Return the 'ASSET_ID' column of a data-assets dataframe as a list."""
        data_asset_list = df['ASSET_ID'].tolist()
        return data_asset_list

    def get_repository_list():
        """Return the repository assets as a dataframe without SPEC_STATE/SPEC_REPLACEMENT."""
        rep_list_df = client.repository.list()
        rep_list_df = pd.DataFrame(rep_list_df)

        columns_to_drop = [col for col in ['SPEC_STATE', 'SPEC_REPLACEMENT'] if col in rep_list_df.columns]
        if columns_to_drop:
            rep_list_df = rep_list_df.drop(columns=columns_to_drop)
        return rep_list_df

    def get_repository_ids(df):
        """Return the 'ID' column of a repository dataframe as a list."""
        repository_list = df['ID'].tolist()
        return repository_list

    def get_pkg_ext_list():
        """Return the package extensions as a dataframe."""
        pkg_ext_list_df = client.package_extensions.list()
        pkg_ext_list_df = pd.DataFrame(pkg_ext_list_df)
        return pkg_ext_list_df

    def get_pkg_ext_ids(df):
        """Return the 'ASSET_ID' column of a package-extensions dataframe as a list."""
        pkg_ext_id_list = df['ASSET_ID'].tolist()
        return pkg_ext_id_list

    def get_sws_list():
        """Return only the 'derived' software specifications, without STATE/REPLACEMENT."""
        sws_list_df = client.software_specifications.list()

        # Keep only rows whose TYPE is 'derived'.
        derived_sws_list_df = sws_list_df[
            (sws_list_df['TYPE'] == 'derived')
        ]
        sws_list_df = pd.DataFrame(derived_sws_list_df).reset_index(drop=True)

        columns_to_drop = [col for col in ['STATE', 'REPLACEMENT'] if col in sws_list_df.columns]
        if columns_to_drop:
            sws_list_df = sws_list_df.drop(columns=columns_to_drop)
        return sws_list_df

    def get_sws_ids(df):
        """Return the 'ID' column of a software-specifications dataframe as a list."""
        sws_id_list = df['ID'].tolist()
        return sws_id_list

    def delete_with_progress(ids_list, delete_function, item_type="items", display_errors=True):
        """
        Delete every ID in `ids_list` while showing a progress bar.

        Parameters:
            ids_list (list | None): IDs to delete; None is treated as empty
            delete_function (callable): Called once per ID to delete it
            item_type (str): Human-readable plural used in messages
            display_errors (bool): Print each failure and write a summary to stderr

        Returns:
            str: Summary with the number of successfully deleted items
        """
        # Callers pass a button value that may still be None when no ID list
        # has been produced yet; treat that as "nothing to delete" instead of
        # failing on len(None).
        ids_list = list(ids_list) if ids_list is not None else []
        errors = []

        with mo.status.progress_bar(
            total=len(ids_list) or 1,
            title=f"Purging {item_type}",
            subtitle=f"Deleting {item_type}...",
            completion_title="Purge Complete",
            # This text is fixed before any deletion runs, so it must not
            # claim a success count; the returned summary carries the real one.
            completion_subtitle=f"Finished processing {len(ids_list)} {item_type}",
            remove_on_exit=True
        ) as progress:
            for item_id in ids_list:
                try:
                    delete_function(item_id)
                except Exception as e:
                    # Best effort: record the failure and continue with the rest.
                    error_msg = f"Error deleting {item_type} with ID {item_id}: {str(e)}"
                    if display_errors:
                        print(error_msg)
                    errors.append((item_id, str(e)))
                finally:
                    progress.update(increment=1)

        if errors and display_errors:
            with mo.redirect_stderr():
                sys.stderr.write("\nErrors encountered during deletion:\n")
                for item_id, error in errors:
                    sys.stderr.write(f" - ID {item_id}: {error}\n")

        return f"Deleted {len(ids_list) - len(errors)} {item_type} successfully"

    # The lambdas resolve `client.<area>.delete` per item, so a client problem
    # is reported per ID by delete_with_progress instead of aborting the purge.
    def delete_deployments(deployment_ids):
        """Delete the given deployments; returns the summary string."""
        return delete_with_progress(
            deployment_ids,
            lambda item_id: client.deployments.delete(item_id),
            "deployments"
        )

    def delete_data_assets(data_asset_ids):
        """Delete the given data assets; returns the summary string."""
        return delete_with_progress(
            data_asset_ids,
            lambda item_id: client.data_assets.delete(item_id),
            "data assets"
        )

    def delete_repository_items(repository_ids):
        """Delete the given repository items; returns the summary string."""
        return delete_with_progress(
            repository_ids,
            lambda item_id: client.repository.delete(item_id),
            "repository items"
        )

    def delete_pkg_ext_items(pkg_ids):
        """Delete the given package extensions; returns the summary string."""
        return delete_with_progress(
            pkg_ids,
            lambda item_id: client.package_extensions.delete(item_id),
            "package extensions"
        )

    def delete_sws_items(sws_ids):
        """Delete the given software specifications; returns the summary string."""
        return delete_with_progress(
            sws_ids,
            lambda item_id: client.software_specifications.delete(item_id),
            "software specifications"
        )

    return (
        delete_data_assets,
        delete_deployments,
        delete_pkg_ext_items,
        delete_repository_items,
        delete_sws_items,
        get_data_asset_ids,
        get_data_assets_list,
        get_deployment_ids,
        get_deployment_list,
        get_pkg_ext_ids,
        get_pkg_ext_list,
        get_repository_ids,
        get_repository_list,
        get_sws_ids,
        get_sws_list,
    )
|
|
|
|
|
@app.cell
def _(
    get_data_assets_tab,
    get_deployments_tab,
    get_pkg_ext_tab,
    get_repository_tab,
    get_sws_tab,
    mo,
):
    # Each purge tab shows an interactive table once its state holds data,
    # and a "No Table Loaded" note while the state is still None.
    deployments_table = (
        mo.ui.table(get_deployments_tab())
        if get_deployments_tab() is not None
        else mo.md("No Table Loaded")
    )

    repository_table = (
        mo.ui.table(get_repository_tab())
        if get_repository_tab() is not None
        else mo.md("No Table Loaded")
    )

    data_assets_table = (
        mo.ui.table(get_data_assets_tab())
        if get_data_assets_tab() is not None
        else mo.md("No Table Loaded")
    )

    sws_table = (
        mo.ui.table(get_sws_tab())
        if get_sws_tab() is not None
        else mo.md("No Table Loaded")
    )

    pkg_ext_table = (
        mo.ui.table(get_pkg_ext_tab())
        if get_pkg_ext_tab() is not None
        else mo.md("No Table Loaded")
    )

    return (
        data_assets_table,
        deployments_table,
        pkg_ext_table,
        repository_table,
        sws_table,
    )
|
|
|
|
|
@app.cell
def _(
    deployments_table,
    get_deployment_id_list,
    get_deployments_button,
    mo,
    purge_deployments,
):
    # Button row on top; below it the table followed by the latest results
    # of the ID-list button and the purge button.
    deployments_purge_stack = mo.hstack(
        [get_deployments_button, get_deployment_id_list, purge_deployments]
    )
    deployments_purge_stack_results = mo.vstack(
        [deployments_table, get_deployment_id_list.value, purge_deployments.value]
    )

    deployments_purge_tab = mo.vstack(
        [deployments_purge_stack, deployments_purge_stack_results]
    )
    return (deployments_purge_tab,)
|
|
|
|
|
@app.cell
def _(
    get_repository_button,
    get_repository_id_list,
    mo,
    purge_repository,
    repository_table,
):
    # Button row on top; below it the table followed by the latest results
    # of the ID-list button and the purge button.
    repository_purge_stack = mo.hstack(
        [get_repository_button, get_repository_id_list, purge_repository]
    )
    repository_purge_stack_results = mo.vstack(
        [repository_table, get_repository_id_list.value, purge_repository.value]
    )

    repository_purge_tab = mo.vstack(
        [repository_purge_stack, repository_purge_stack_results]
    )
    return (repository_purge_tab,)
|
|
|
|
|
@app.cell
def _(
    data_assets_table,
    get_data_asset_id_list,
    get_data_assets_button,
    mo,
    purge_data_assets,
):
    # Button row on top; below it the table followed by the latest results
    # of the ID-list button and the purge button.
    data_assets_purge_stack = mo.hstack(
        [get_data_assets_button, get_data_asset_id_list, purge_data_assets]
    )
    data_assets_purge_stack_results = mo.vstack(
        [data_assets_table, get_data_asset_id_list.value, purge_data_assets.value]
    )

    data_assets_purge_tab = mo.vstack(
        [data_assets_purge_stack, data_assets_purge_stack_results]
    )
    return (data_assets_purge_tab,)
|
|
|
|
|
@app.cell
def _(get_sws_button, get_sws_id_list, mo, purge_sws, sws_table):
    # Button row on top; below it the table followed by the latest results
    # of the ID-list button and the purge button.
    sws_purge_stack = mo.hstack(
        [get_sws_button, get_sws_id_list, purge_sws]
    )
    sws_purge_stack_results = mo.vstack(
        [sws_table, get_sws_id_list.value, purge_sws.value]
    )

    sws_purge_stack_tab = mo.vstack(
        [sws_purge_stack, sws_purge_stack_results]
    )
    return (sws_purge_stack_tab,)
|
|
|
|
|
@app.cell
def _(
    get_pkg_ext_button,
    get_pkg_ext_id_list,
    mo,
    pkg_ext_table,
    purge_pkg_ext,
):
    # Button row on top; below it the table followed by the latest results
    # of the ID-list button and the purge button.
    pkg_ext_purge_stack = mo.hstack(
        [get_pkg_ext_button, get_pkg_ext_id_list, purge_pkg_ext]
    )
    pkg_ext_purge_stack_results = mo.vstack(
        [pkg_ext_table, get_pkg_ext_id_list.value, purge_pkg_ext.value]
    )

    pkg_ext_purge_tab = mo.vstack(
        [pkg_ext_purge_stack, pkg_ext_purge_stack_results]
    )
    return (pkg_ext_purge_tab,)
|
|
|
|
|
@app.cell
def _(
    data_assets_purge_tab,
    deployments_purge_tab,
    mo,
    pkg_ext_purge_tab,
    repository_purge_tab,
    sws_purge_stack_tab,
):
    # One tab per asset kind; the dict order is the tab order.
    _purge_tab_contents = {
        "Purge Deployments": deployments_purge_tab,
        "Purge Repository Assets": repository_purge_tab,
        "Purge Data Assets": data_assets_purge_tab,
        "Purge Software Specifications": sws_purge_stack_tab,
        "Purge Package Extensions": pkg_ext_purge_tab,
    }
    purge_tabs = mo.ui.tabs(_purge_tab_contents, lazy=False)

    return (purge_tabs,)
|
|
|
|
|
@app.cell
def _(mo):
    # State behind the deployments purge table; None means "not loaded yet".
    (get_deployments_tab, set_deployments_tab) = mo.state(None)
    return (get_deployments_tab, set_deployments_tab)
|
|
|
|
|
@app.cell
def _(mo):
    # State behind the repository purge table; None means "not loaded yet".
    (get_repository_tab, set_repository_tab) = mo.state(None)
    return (get_repository_tab, set_repository_tab)
|
|
|
|
|
@app.cell
def _(mo):
    # State behind the data-assets purge table; None means "not loaded yet".
    (get_data_assets_tab, set_data_assets_tab) = mo.state(None)
    return (get_data_assets_tab, set_data_assets_tab)
|
|
|
|
|
@app.cell
def _(mo):
    # State behind the package-extensions purge table; None means "not loaded yet".
    (get_pkg_ext_tab, set_pkg_ext_tab) = mo.state(None)
    return (get_pkg_ext_tab, set_pkg_ext_tab)
|
|
|
|
|
@app.cell
def _(mo):
    # State behind the software-specifications purge table; None means "not loaded yet".
    (get_sws_tab, set_sws_tab) = mo.state(None)
    return (get_sws_tab, set_sws_tab)
|
|
|
|
|
@app.cell
def _(
    data_assets_table,
    delete_data_assets,
    delete_deployments,
    delete_pkg_ext_items,
    delete_repository_items,
    delete_sws_items,
    deployments_table,
    get_data_asset_ids,
    get_data_assets_list,
    get_deployment_ids,
    get_deployment_list,
    get_pkg_ext_ids,
    get_pkg_ext_list,
    get_repository_ids,
    get_repository_list,
    get_sws_ids,
    get_sws_list,
    mo,
    pkg_ext_table,
    repository_table,
    set_data_assets_tab,
    set_deployments_tab,
    set_pkg_ext_tab,
    set_repository_tab,
    set_sws_tab,
    sws_table,
):
    # Three buttons per asset kind:
    #   1. "Get ... Dataframe"  - fetches the listing and stores it in the
    #      matching tab state (via on_change) so the table cell can render it.
    #   2. "Turn Dataframe into List of IDs" - extracts IDs from the table's
    #      current value.
    #   3. "Purge ..." - deletes the IDs produced by button 2.
    #
    # Guards: until a listing is loaded the "table" is a plain markdown
    # placeholder without a `.value`, and until button 2 is clicked its value
    # is None. Both cases resolve to an empty ID list instead of raising
    # inside the click handler.

    # --- Data assets ---
    get_data_assets_button = mo.ui.button(
        label="Get Data Assets Dataframe",
        on_click=lambda _: get_data_assets_list(),
        on_change=lambda value: set_data_assets_tab(value),
        kind="neutral",
    )

    get_data_asset_id_list = mo.ui.button(
        label="Turn Dataframe into List of IDs",
        on_click=lambda _: (
            get_data_asset_ids(data_assets_table.value)
            if hasattr(data_assets_table, "value")
            else []
        ),
        kind="neutral",
    )

    purge_data_assets = mo.ui.button(
        label="Purge Data Assets",
        on_click=lambda _: delete_data_assets(get_data_asset_id_list.value or []),
        kind="danger",
    )

    # --- Deployments ---
    get_deployments_button = mo.ui.button(
        label="Get Deployments Dataframe",
        on_click=lambda _: get_deployment_list(),
        on_change=lambda value: set_deployments_tab(value),
        kind="neutral",
    )

    get_deployment_id_list = mo.ui.button(
        label="Turn Dataframe into List of IDs",
        on_click=lambda _: (
            get_deployment_ids(deployments_table.value)
            if hasattr(deployments_table, "value")
            else []
        ),
        kind="neutral",
    )

    purge_deployments = mo.ui.button(
        label="Purge Deployments",
        on_click=lambda _: delete_deployments(get_deployment_id_list.value or []),
        kind="danger",
    )

    # --- Repository items ---
    get_repository_button = mo.ui.button(
        label="Get Repository Dataframe",
        on_click=lambda _: get_repository_list(),
        on_change=lambda value: set_repository_tab(value),
        kind="neutral",
    )

    get_repository_id_list = mo.ui.button(
        label="Turn Dataframe into List of IDs",
        on_click=lambda _: (
            get_repository_ids(repository_table.value)
            if hasattr(repository_table, "value")
            else []
        ),
        kind="neutral",
    )

    purge_repository = mo.ui.button(
        label="Purge Repository Items",
        on_click=lambda _: delete_repository_items(get_repository_id_list.value or []),
        kind="danger",
    )

    # --- Software specifications ---
    get_sws_button = mo.ui.button(
        label="Get Software Spec Dataframe",
        on_click=lambda _: get_sws_list(),
        on_change=lambda value: set_sws_tab(value),
        kind="neutral",
    )

    get_sws_id_list = mo.ui.button(
        label="Turn Dataframe into List of IDs",
        on_click=lambda _: (
            get_sws_ids(sws_table.value)
            if hasattr(sws_table, "value")
            else []
        ),
        kind="neutral",
    )

    purge_sws = mo.ui.button(
        label="Purge Software Specifications",
        on_click=lambda _: delete_sws_items(get_sws_id_list.value or []),
        kind="danger",
    )

    # --- Package extensions ---
    get_pkg_ext_button = mo.ui.button(
        label="Get Package Extensions Dataframe",
        on_click=lambda _: get_pkg_ext_list(),
        on_change=lambda value: set_pkg_ext_tab(value),
        kind="neutral",
    )

    get_pkg_ext_id_list = mo.ui.button(
        label="Turn Dataframe into List of IDs",
        on_click=lambda _: (
            get_pkg_ext_ids(pkg_ext_table.value)
            if hasattr(pkg_ext_table, "value")
            else []
        ),
        kind="neutral",
    )

    purge_pkg_ext = mo.ui.button(
        label="Purge Package Extensions",
        on_click=lambda _: delete_pkg_ext_items(get_pkg_ext_id_list.value or []),
        kind="danger",
    )

    return (
        get_data_asset_id_list,
        get_data_assets_button,
        get_deployment_id_list,
        get_deployments_button,
        get_pkg_ext_button,
        get_pkg_ext_id_list,
        get_repository_button,
        get_repository_id_list,
        get_sws_button,
        get_sws_id_list,
        purge_data_assets,
        purge_deployments,
        purge_pkg_ext,
        purge_repository,
        purge_sws,
    )
|
|
|
|
|
# Run the marimo app when this file is executed directly as a script.
if __name__ == "__main__":
    app.run()
|
|