MilanM's picture
Update app_v2.py
2b31766 verified
import marimo
__generated_with = "0.11.16"
app = marimo.App(width="medium")
@app.cell
def _():
import marimo as mo
import os
return mo, os
@app.cell
def _():
def get_markdown_content(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return content
return (get_markdown_content,)
@app.cell
def _(get_markdown_content, mo):
intro_text = get_markdown_content('intro_markdown/intro.md')
intro_marimo = get_markdown_content('intro_markdown/intro_marimo.md')
intro_notebook = get_markdown_content('intro_markdown/intro_notebook.md')
intro_comparison = get_markdown_content('intro_markdown/intro_comparison.md')
intro = mo.carousel([
mo.md(f"{intro_text}"),
mo.md(f"{intro_marimo}"),
mo.md(f"{intro_notebook}"),
mo.md(f"{intro_comparison}"),
])
mo.accordion({"## Notebook Introduction":intro})
return intro, intro_comparison, intro_marimo, intro_notebook, intro_text
@app.cell
def _(os):
### Imports
from typing import (
Any, Dict, List, Optional, Pattern, Set, Union, Tuple
)
from pathlib import Path
from urllib.request import urlopen
# from rich.markdown import Markdown as Markd
from rich.text import Text
from rich import print
from tqdm import tqdm
from enum import Enum
import pandas as pd
import tempfile
import requests
import getpass
import urllib3
import base64
import time
import json
import uuid
import ssl
import ast
import re
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', None)
# Set explicit temporary directory
os.environ['TMPDIR'] = '/tmp'
# Make sure Python's tempfile module also uses this directory
tempfile.tempdir = '/tmp'
return (
Any,
Dict,
Enum,
List,
Optional,
Path,
Pattern,
Set,
Text,
Tuple,
Union,
ast,
base64,
getpass,
json,
pd,
print,
re,
requests,
ssl,
tempfile,
time,
tqdm,
urllib3,
urlopen,
uuid,
)
@app.cell
def _(mo):
### Credentials for the watsonx.ai SDK client
# Endpoints
wx_platform_url = "https://api.dataplatform.cloud.ibm.com"
regions = {
"US": "https://us-south.ml.cloud.ibm.com",
"EU": "https://eu-de.ml.cloud.ibm.com",
"GB": "https://eu-gb.ml.cloud.ibm.com",
"JP": "https://jp-tok.ml.cloud.ibm.com",
"AU": "https://au-syd.ml.cloud.ibm.com",
"CA": "https://ca-tor.ml.cloud.ibm.com"
}
# Create a form with multiple elements
client_instantiation_form = (
mo.md('''
###**watsonx.ai credentials:**
{wx_region}
{wx_api_key}
{space_id}
''').style(max_height="300px", overflow="auto", border_color="blue")
.batch(
wx_region = mo.ui.dropdown(regions, label="Select your watsonx.ai region:", value="US", searchable=True),
wx_api_key = mo.ui.text(placeholder="Add your IBM Cloud api-key...", label="IBM Cloud Api-key:", kind="password"),
# project_id = mo.ui.text(placeholder="Add your watsonx.ai project_id...", label="Project_ID:", kind="text"),
space_id = mo.ui.text(placeholder="Add your watsonx.ai space_id...", label="Space_ID:", kind="text")
,)
.form(show_clear_button=True, bordered=False)
)
# client_instantiation_form
return client_instantiation_form, regions, wx_platform_url
@app.cell
def _(client_instantiation_form, mo):
from ibm_watsonx_ai import APIClient, Credentials
def setup_task_credentials(deployment_client):
# Get existing task credentials
existing_credentials = deployment_client.task_credentials.get_details()
# Delete existing credentials if any
if "resources" in existing_credentials and existing_credentials["resources"]:
for cred in existing_credentials["resources"]:
cred_id = deployment_client.task_credentials.get_id(cred)
deployment_client.task_credentials.delete(cred_id)
# Store new credentials
return deployment_client.task_credentials.store()
if client_instantiation_form.value:
### Instantiate the watsonx.ai client
wx_credentials = Credentials(
url=client_instantiation_form.value["wx_region"],
api_key=client_instantiation_form.value["wx_api_key"]
)
# project_client = APIClient(credentials=wx_credentials, project_id=client_instantiation_form.value["project_id"])
deployment_client = APIClient(credentials=wx_credentials, space_id=client_instantiation_form.value["space_id"])
task_credentials_details = setup_task_credentials(deployment_client)
else:
# project_client = None
deployment_client = None
task_credentials_details = None
template_variant = mo.ui.dropdown(["Base","Stream Files to IBM COS [Example]"], label="Code Template:", value="Base")
if deployment_client is not None:
client_callout_kind = "success"
else:
client_callout_kind = "neutral"
client_callout = mo.callout(template_variant, kind=client_callout_kind)
# client_callout
return (
APIClient,
Credentials,
client_callout,
client_callout_kind,
deployment_client,
setup_task_credentials,
task_credentials_details,
template_variant,
wx_credentials,
)
@app.cell
def _(
client_callout,
client_instantiation_form,
deploy_fnc,
deployment_definition,
fm,
function_editor,
hw_selection_table,
mo,
purge_tabs,
sc_m,
schema_editors,
selection_table,
upload_func,
):
s1 = mo.md(f'''
###**Instantiate your watsonx.ai client:**
1. Select a region from the dropdown menu
2. Provide an IBM Cloud Apikey and watsonx.ai deployment space id
3. Once you submit, the area with the code template will turn green if successful
4. Select a base (provide baseline format) or example code function template
---
{client_instantiation_form}
---
{client_callout}
''')
sc_tabs = mo.ui.tabs(
{
"Schema Option Selection": sc_m,
"Schema Definition": mo.md(f"""
####**Edit the schema definitions you selected in the previous tab.**<br>
{schema_editors}"""),
}
)
s2 = mo.md(f'''###**Create your function from the template:**
1. Use the code editor window to create a function to deploy
<br>
The function must:
<br>
--- Include a payload and score element
<br>
--- Have the same function name in both the score = <name>() segment and the Function Name input field below
<br>
--- Additional details can be found here -> [watsonx.ai - Writing deployable Python functions
](https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/ml-deploy-py-function-write.html?utm_medium=Exinfluencer&utm_source=ibm_developer&utm_content=in_content_link&utm_term=10006555&utm_id=blogs_awb-tekton-optimizations-for-kubeflow-pipelines-2-0&context=wx&audience=wdp)
3. Click submit, then proceed to select whether you wish to add:
<br>
--- An input schema (describing the format of the variables the function takes) **[Optional]**
<br>
--- An output schema (describing the format of the output results the function returns) **[Optional]**
<br>
--- An sample input example (showing an example of a mapping of the input and output schema to actual values.) **[Optional]**
4. Fill in the function name field **(must be exactly the same as in the function editor)**
5. Add a description and metadata tags **[Optional]**
---
{function_editor}
---
{sc_tabs}
---
{fm}
''')
s3 = mo.md(f'''
###**Review and Upload your function**
1. Review the function metadata specs JSON
2. Select a software specification if necessary (default for python functions is pre-selected), this is the runtime environment of python that your function will run in. Environments on watsonx.ai come pre-packaged with many different libraries, if necessary install new ones by adding them into the function as a `subprocess.check_output('pip install <package_name>', shell=True)` command.
3. Once your are satisfied, click the upload function button and wait for the response.
> If you see no table of software specs, you haven't activated your watsonx.ai client.
---
{selection_table}
---
{upload_func}
''')
s4 = mo.md(f'''
###**Deploy your function:**
1. Select a hardware specification (vCPUs/GB) that you want your function deployed on
<br>
--- XXS and XS cost the same (0.5 CUH per hour, so XS is the better option
<br>
--- Select larger instances for more resource intensive tasks or runnable jobs
2. Select the type of deployment:
<br>
--- Function (Online) for always-on endpoints - Always available and low latency, but consume resources continuously for every hour they are deployed.
<br>
--- Batch (Batch) for runnable jobs - Only consume resources during job runs, but aren't as flexible to deploy.
3. If you've selected Function, pick a completely unique (globally, not just your account) deployment serving name that will be in the endpoint url.
4. Once your are satisfied, click the deploy function button and wait for the response.
---
{hw_selection_table}
---
{deployment_definition}
---
{deploy_fnc}
''')
s5 = mo.md(f'''
###**Helper Purge Functions:**
These functions help you retrieve, select and delete deployments, data assets or repository assets (functions, models, etc.) that you have in the deployment space. This is meant to support fast cleanup.
Select the tab based on what you want to delete, then click each of the buttons one by one after the previous gives a response.
---
{purge_tabs}
''')
sections = mo.accordion(
{
"Section 1: **watsonx.ai Credentials**": s1,
"Section 2: **Function Creation**": s2,
"Section 3: **Function Upload**": s3,
"Section 4: **Function Deployment**": s4,
"Section 5: **Helper Functions**": s5,
},
multiple=True
)
sections
return s1, s2, s3, s4, s5, sc_tabs, sections
@app.cell
def _(mo, template_variant):
# Template for WatsonX.ai deployable function
if template_variant.value == "Stream Files to IBM COS [Example]":
with open("stream_files_to_cos.py", "r") as file:
template = file.read()
else:
template = '''def your_function_name():
import subprocess
subprocess.check_output('pip install gensim', shell=True)
import gensim
def score(input_data):
message_from_input_payload = payload.get("input_data")[0].get("values")[0][0]
response_message = "Received message - {0}".format(message_from_input_payload)
# Score using the pre-defined model
score_response = {
'predictions': [{'fields': ['Response_message_field', 'installed_lib_version'],
'values': [[response_message, gensim.__version__]]
}]
}
return score_response
return score
score = your_function_name()
'''
function_editor = (
mo.md('''
#### **Create your function by editing the template:**
{editor}
''')
.batch(
editor = mo.ui.code_editor(value=template, language="python", min_height=50)
)
.form(show_clear_button=True, bordered=False)
)
# function_editor
return file, function_editor, template
@app.cell
def _(function_editor, mo, os):
if function_editor.value:
# Get the edited code from the function editor
code = function_editor.value['editor']
# Create a namespace to execute the code in
namespace = {}
# Execute the code
exec(code, namespace)
# Find the first function defined in the namespace
function_name = None
for name, obj in namespace.items():
if callable(obj) and name != "__builtins__":
function_name = name
break
if function_name:
# Instantiate the deployable function
deployable_function = namespace[function_name]
# Now deployable_function contains the score function
mo.md(f"Created deployable function from '{function_name}'")
# Create the directory if it doesn't exist
save_dir = "/tmp/notebook_functions"
os.makedirs(save_dir, exist_ok=True)
# Save the function code to a file
file_path = os.path.join(save_dir, f"{function_name}.py")
with open(file_path, "w") as f:
f.write(code)
else:
mo.md("No function found in the editor code")
return (
code,
deployable_function,
f,
file_path,
function_name,
name,
namespace,
obj,
save_dir,
)
@app.cell
def _(deployment_client, mo, pd):
if deployment_client:
supported_specs = deployment_client.software_specifications.list()[
deployment_client.software_specifications.list()['STATE'] == 'supported'
]
# Reset the index to start from 0
supported_specs = supported_specs.reset_index(drop=True)
# Create a mapping dictionary for framework names based on software specifications
framework_mapping = {
"tensorflow_rt24.1-py3.11": "TensorFlow",
"pytorch-onnx_rt24.1-py3.11": "PyTorch",
"onnxruntime_opset_19": "ONNX or ONNXRuntime",
"runtime-24.1-py3.11": "AI Services/Python Functions/Python Scripts",
"autoai-ts_rt24.1-py3.11": "AutoAI",
"autoai-kb_rt24.1-py3.11": "AutoAI",
"runtime-24.1-py3.11-cuda": "CUDA-enabled (GPU) Python Runtime",
"runtime-24.1-r4.3": "R Runtime 4.3",
"spark-mllib_3.4": "Apache Spark 3.4",
"autoai-rag_rt24.1-py3.11": "AutoAI RAG"
}
# Define the preferred order for items to appear at the top
preferred_order = [
"runtime-24.1-py3.11",
"runtime-24.1-py3.11-cuda",
"runtime-24.1-r4.3",
"ai-service-v5-software-specification",
"autoai-rag_rt24.1-py3.11",
"autoai-ts_rt24.1-py3.11",
"autoai-kb_rt24.1-py3.11",
"tensorflow_rt24.1-py3.11",
"pytorch-onnx_rt24.1-py3.11",
"onnxruntime_opset_19",
"spark-mllib_3.4",
]
# Create a new column for sorting
supported_specs['SORT_ORDER'] = supported_specs['NAME'].apply(
lambda x: preferred_order.index(x) if x in preferred_order else len(preferred_order)
)
# Sort the DataFrame by the new column
supported_specs = supported_specs.sort_values('SORT_ORDER').reset_index(drop=True)
# Drop the sorting column as it's no longer needed
supported_specs = supported_specs.drop(columns=['SORT_ORDER'])
# Drop the REPLACEMENT column if it exists and add NOTES column
if 'REPLACEMENT' in supported_specs.columns:
supported_specs = supported_specs.drop(columns=['REPLACEMENT'])
# Add NOTES column with framework information
supported_specs['NOTES'] = supported_specs['NAME'].map(framework_mapping).fillna("Other")
# Create a table with single-row selection
selection_table = mo.ui.table(
supported_specs,
selection="single", # Only allow selecting one row
label="#### **Select a supported software_spec runtime for your function asset** (For Python Functions select - *'runtime-24.1-py3.11'* ):",
initial_selection=[0], # Now selecting the first row, which should be runtime-24.1-py3.11
page_size=6
)
else:
sel_df = pd.DataFrame(
data=[["ID", "Activate deployment_client."]],
columns=["ID", "VALUE"]
)
selection_table = mo.ui.table(
sel_df,
selection="single", # Only allow selecting one row
label="You haven't activated the Deployment_Client",
initial_selection=[0]
)
# # Display the table
# mo.md(f"""---
# <br>
# <br>
# {selection_table}
# <br>
# <br>
# ---
# <br>
# <br>
# """)
return (
framework_mapping,
preferred_order,
sel_df,
selection_table,
supported_specs,
)
@app.cell
def _(mo):
input_schema_checkbox = mo.ui.checkbox(label="Add input schema (optional)")
output_schema_checkbox = mo.ui.checkbox(label="Add output schema (optional)")
sample_input_checkbox = mo.ui.checkbox(label="Add sample input example (optional)")
return input_schema_checkbox, output_schema_checkbox, sample_input_checkbox
@app.cell
def _(
input_schema_checkbox,
mo,
output_schema_checkbox,
sample_input_checkbox,
selection_table,
template_variant,
):
if selection_table.value['ID'].iloc[0]:
# Create the input fields
if template_variant.value == "Stream Files to IBM COS [Example]":
fnc_nm = "stream_file_to_cos"
else:
fnc_nm = "your_function_name"
uploaded_function_name = mo.ui.text(placeholder="<Must be the same as the name in editor>", label="Function Name:", kind="text", value=f"{fnc_nm}", full_width=False)
tags_editor = mo.ui.array(
[mo.ui.text(placeholder="Metadata Tags..."), mo.ui.text(), mo.ui.text()],
label="Optional Metadata Tags"
)
software_spec = selection_table.value['ID'].iloc[0]
description_input = mo.ui.text_area(
placeholder="Write a description for your function...)",
label="Description",
max_length=256,
rows=5,
full_width=True
)
func_metadata=mo.hstack([
description_input,
mo.hstack([
uploaded_function_name,
tags_editor,
], justify="start", gap=1, align="start", wrap=True)
],
widths=[0.6,0.4],
gap=2.75
)
schema_metadata=mo.hstack([
input_schema_checkbox,
output_schema_checkbox,
sample_input_checkbox
],
justify="center", gap=1, align="center", wrap=True
)
# Display the metadata inputs
# mo.vstack([
# func_metadata,
# mo.md("**Make sure to click the checkboxes before filling in descriptions and tags or they will reset.**"),
# schema_metadata
# ],
# align="center",
# gap=2
# )
fm = mo.vstack([
func_metadata,
],
align="center",
gap=2
)
sc_m = mo.vstack([
schema_metadata,
mo.md("**Make sure to select the checkbox options before filling in descriptions and tags or they will reset.**")
],
align="center",
gap=2
)
return (
description_input,
fm,
fnc_nm,
func_metadata,
sc_m,
schema_metadata,
software_spec,
tags_editor,
uploaded_function_name,
)
@app.cell
def _(json, mo, template_variant):
if template_variant.value == "Stream Files to IBM COS [Example]":
from cos_stream_schema_examples import input_schema, output_schema, sample_input
else:
input_schema = [
{
'id': '1',
'type': 'struct',
'fields': [
{
'name': '<variable name 1>',
'type': 'string',
'nullable': False,
'metadata': {}
},
{
'name': '<variable name 2>',
'type': 'string',
'nullable': False,
'metadata': {}
}
]
}
]
output_schema = [
{
'id': '1',
'type': 'struct',
'fields': [
{
'name': '<output return name>',
'type': 'string',
'nullable': False,
'metadata': {}
}
]
}
]
sample_input = {
'input_data': [
{
'fields': ['<variable name 1>', '<variable name 2>'],
'values': [
['<sample input value for variable 1>', '<sample input value for variable 2>']
]
}
]
}
input_schema_editor = mo.ui.code_editor(value=json.dumps(input_schema, indent=4), language="python", min_height=25)
output_schema_editor = mo.ui.code_editor(value=json.dumps(output_schema, indent=4), language="python", min_height=25)
sample_input_editor = mo.ui.code_editor(value=json.dumps(sample_input, indent=4), language="python", min_height=25)
schema_editors = mo.accordion(
{
"""**Input Schema Metadata Editor**""": input_schema_editor,
"""**Output Schema Metadata Editor**""": output_schema_editor,
"""**Sample Input Metadata Editor**""": sample_input_editor
}, multiple=True
)
# schema_editors
return (
input_schema,
input_schema_editor,
output_schema,
output_schema_editor,
sample_input,
sample_input_editor,
schema_editors,
)
@app.cell
def _(
ast,
deployment_client,
description_input,
function_editor,
input_schema_checkbox,
input_schema_editor,
json,
mo,
os,
output_schema_checkbox,
output_schema_editor,
sample_input_checkbox,
sample_input_editor,
selection_table,
software_spec,
tags_editor,
uploaded_function_name,
):
get_upload_status, set_upload_status = mo.state("No uploads yet")
function_meta = {}
if selection_table.value['ID'].iloc[0] and deployment_client is not None:
# Start with the base required fields
function_meta = {
deployment_client.repository.FunctionMetaNames.NAME: f"{uploaded_function_name.value}" or "your_function_name",
deployment_client.repository.FunctionMetaNames.SOFTWARE_SPEC_ID: software_spec or "45f12dfe-aa78-5b8d-9f38-0ee223c47309"
}
# Add optional fields if they exist
if tags_editor.value:
# Filter out empty strings from the tags list
filtered_tags = [tag for tag in tags_editor.value if tag and tag.strip()]
if filtered_tags: # Only add if there are non-empty tags
function_meta[deployment_client.repository.FunctionMetaNames.TAGS] = filtered_tags
if description_input.value:
function_meta[deployment_client.repository.FunctionMetaNames.DESCRIPTION] = description_input.value
# Add input schema if checkbox is checked
if input_schema_checkbox.value:
try:
function_meta[deployment_client.repository.FunctionMetaNames.INPUT_DATA_SCHEMAS] = json.loads(input_schema_editor.value)
except json.JSONDecodeError:
# If JSON parsing fails, try Python literal evaluation as fallback
function_meta[deployment_client.repository.FunctionMetaNames.INPUT_DATA_SCHEMAS] = ast.literal_eval(input_schema_editor.value)
# Add output schema if checkbox is checked
if output_schema_checkbox.value:
try:
function_meta[deployment_client.repository.FunctionMetaNames.OUTPUT_DATA_SCHEMAS] = json.loads(output_schema_editor.value)
except json.JSONDecodeError:
# If JSON parsing fails, try Python literal evaluation as fallback
function_meta[deployment_client.repository.FunctionMetaNames.OUTPUT_DATA_SCHEMAS] = ast.literal_eval(output_schema_editor.value)
# Add sample input if checkbox is checked
if sample_input_checkbox.value:
try:
function_meta[deployment_client.repository.FunctionMetaNames.SAMPLE_SCORING_INPUT] = json.loads(sample_input_editor.value)
except json.JSONDecodeError:
# If JSON parsing fails, try Python literal evaluation as fallback
function_meta[deployment_client.repository.FunctionMetaNames.SAMPLE_SCORING_INPUT] = ast.literal_eval(sample_input_editor.value)
def upload_function(function_meta, use_function_object=True):
"""
Uploads a Python function to watsonx.ai as a deployable asset.
Parameters:
function_meta (dict): Metadata for the function
use_function_object (bool): Whether to use function object (True) or file path (False)
Returns:
dict: Details of the uploaded function
"""
# Store the original working directory
original_dir = os.getcwd()
try:
# Create temp file from the code in the editor
code_to_deploy = function_editor.value['editor']
# This function is defined elsewhere in the notebook
func_name = uploaded_function_name.value or "your_function_name"
# Ensure function_meta has the correct function name
function_meta[deployment_client.repository.FunctionMetaNames.NAME] = func_name
# Save the file locally first
save_dir = "/tmp/notebook_functions"
os.makedirs(save_dir, exist_ok=True)
file_path = f"{save_dir}/{func_name}.py"
with open(file_path, "w", encoding="utf-8") as f:
f.write(code_to_deploy)
if use_function_object:
# Import the function from the file
import sys
import importlib.util
# Add the directory to Python's path
sys.path.append(save_dir)
# Import the module
spec = importlib.util.spec_from_file_location(func_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Get the function object
function_object = getattr(module, func_name)
# Change to /tmp directory before calling IBM Watson SDK functions
os.chdir('/tmp')
# Upload the function object
mo.md(f"Uploading function object: {func_name}")
func_details = deployment_client.repository.store_function(function_object, function_meta)
else:
# Change to /tmp directory before calling IBM Watson SDK functions
os.chdir('/tmp')
# Upload using the file path approach
mo.md(f"Uploading function from file: {file_path}")
func_details = deployment_client.repository.store_function(file_path, function_meta)
set_upload_status(f"Latest Upload - id - {func_details['metadata']['id']}")
return func_details
except Exception as e:
set_upload_status(f"Error uploading function: {str(e)}")
mo.md(f"Detailed error: {str(e)}")
raise
finally:
# Always change back to the original directory, even if an exception occurs
os.chdir(original_dir)
upload_status = mo.state("No uploads yet")
upload_button = mo.ui.button(
label="Upload Function",
on_click=lambda _: upload_function(function_meta, use_function_object=True),
kind="success",
tooltip="Click to upload function to watsonx.ai"
)
# function_meta
return (
filtered_tags,
function_meta,
get_upload_status,
set_upload_status,
upload_button,
upload_function,
upload_status,
)
@app.cell
def _(get_upload_status, mo, upload_button):
# Upload your function
if upload_button.value:
try:
upload_result = upload_button.value
artifact_id = upload_result['metadata']['id']
except Exception as e:
mo.md(f"Error: {str(e)}")
upload_func = mo.vstack([
upload_button,
mo.md(f"**Status:** {get_upload_status()}")
], justify="space-around", align="center")
return artifact_id, upload_func, upload_result
@app.cell
def _(deployment_client, mo, pd, upload_button, uuid):
def reorder_hardware_specifications(df):
"""
Reorders a hardware specifications dataframe by type and size of environment
without hardcoding specific hardware types.
Parameters:
df (pandas.DataFrame): The hardware specifications dataframe to reorder
Returns:
pandas.DataFrame: Reordered dataframe with reset index
"""
# Create a copy to avoid modifying the original dataframe
result_df = df.copy()
# Define a function to extract the base type and size
def get_sort_key(name):
# Create a custom ordering list
custom_order = [
"XXS", "XS", "S", "M", "L", "XL",
"XS-Spark", "S-Spark", "M-Spark", "L-Spark", "XL-Spark",
"K80", "K80x2", "K80x4",
"V100", "V100x2",
"WXaaS-XS", "WXaaS-S", "WXaaS-M", "WXaaS-L", "WXaaS-XL",
"Default Spark", "Notebook Default Spark", "ML"
]
# If name is in the custom order list, use its index
if name in custom_order:
return (0, custom_order.index(name))
# For any name not in the custom order, put it at the end
return (1, name)
# Add a temporary column for sorting
result_df['sort_key'] = result_df['NAME'].apply(get_sort_key)
# Sort the dataframe and drop the temporary column
result_df = result_df.sort_values('sort_key').drop('sort_key', axis=1)
# Reset the index
result_df = result_df.reset_index(drop=True)
return result_df
if deployment_client and upload_button.value:
hardware_specs = deployment_client.hardware_specifications.list()
hardware_specs_df = reorder_hardware_specifications(hardware_specs)
# Create a table with single-row selection
hw_selection_table = mo.ui.table(
hardware_specs_df,
selection="single", # Only allow selecting one row
label="#### **Select a supported hardware_specification for your deployment** *(Default: 'XS' - 1vCPU_4GB Ram)*",
initial_selection=[1],
page_size=6,
wrapped_columns=['DESCRIPTION']
)
deployment_type = mo.ui.radio(
options={"Function":"Online (Function Endpoint)","Runnable Job":"Batch (Runnable Jobs)"}, value="Function", label="Select the Type of Deployment:", inline=True
)
uuid_suffix = str(uuid.uuid4())[:4]
deployment_name = mo.ui.text(value=f"deployed_func_{uuid_suffix}", label="Deployment Name:", placeholder="<Must be completely unique>")
else:
hw_df = pd.DataFrame(
data=[["ID", "Activate deployment_client."]],
columns=["ID", "VALUE"]
)
hw_selection_table = mo.ui.table(
hw_df,
selection="single", # Only allow selecting one row
label="You haven't activated the Deployment_Client",
initial_selection=[0]
)
# mo.md(f"""
# <br>
# <br>
# {upload_func}
# <br>
# <br>
# ---
# {hw_selection_table}
# <br>
# <br>
# """)
return (
deployment_name,
deployment_type,
hardware_specs,
hardware_specs_df,
hw_df,
hw_selection_table,
reorder_hardware_specifications,
uuid_suffix,
)
@app.cell
def _(
artifact_id,
deployment_client,
deployment_details,
deployment_name,
deployment_type,
hw_selection_table,
mo,
print,
upload_button,
):
def deploy_function(artifact_id, deployment_type):
"""
Deploys a function asset to watsonx.ai.
Parameters:
artifact_id (str): ID of the function artifact to deploy
deployment_type (object): Type of deployment (online or batch)
Returns:
dict: Details of the deployed function
"""
if not artifact_id:
print("Error: No artifact ID provided. Please upload a function first.")
return None
if deployment_type.value == "Online (Function Endpoint)": # Changed from "Online (Function Endpoint)"
deployment_props = {
deployment_client.deployments.ConfigurationMetaNames.NAME: deployment_name.value,
deployment_client.deployments.ConfigurationMetaNames.ONLINE: {},
deployment_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config},
deployment_client.deployments.ConfigurationMetaNames.SERVING_NAME: deployment_name.value,
}
else: # "Runnable Job" instead of "Batch (Runnable Jobs)"
deployment_props = {
deployment_client.deployments.ConfigurationMetaNames.NAME: deployment_name.value,
deployment_client.deployments.ConfigurationMetaNames.BATCH: {},
deployment_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config},
# batch does not use serving names
}
try:
print(deployment_props)
# First, get the asset details to confirm it exists
asset_details = deployment_client.repository.get_details(artifact_id)
print(f"Asset found: {asset_details['metadata']['name']} with ID: {asset_details['metadata']['id']}")
# Create the deployment
deployed_function = deployment_client.deployments.create(artifact_id, deployment_props)
print(f"Creating deployment from Asset: {artifact_id} with deployment properties {str(deployment_props)}")
return deployed_function
except Exception as e:
print(f"Deployment error: {str(e)}")
return None
def get_deployment_id(deployed_function):
deployment_id = deployment_client.deployments.get_uid(deployment_details)
return deployment_id
def get_deployment_info(deployment_id):
deployment_info = deployment_client.deployments.get_details(deployment_id)
return deployment_info
deployment_status = mo.state("No deployments yet")
if hw_selection_table.value['ID'].iloc[0]:
selected_hw_config = hw_selection_table.value['ID'].iloc[0]
deploy_button = mo.ui.button(
label="Deploy Function",
on_click=lambda _: deploy_function(artifact_id, deployment_type),
kind="success",
tooltip="Click to deploy function to watsonx.ai"
)
if deployment_client and upload_button.value:
deployment_definition = mo.hstack([
deployment_type,
deployment_name
], justify="space-around")
else:
deployment_definition = mo.hstack([
"No Deployment Type Selected",
"No Deployment Name Provided"
], justify="space-around")
# deployment_definition
return (
deploy_button,
deploy_function,
deployment_definition,
deployment_status,
get_deployment_id,
get_deployment_info,
selected_hw_config,
)
@app.cell
def _(deploy_button, deployment_definition, mo):
_ = deployment_definition
deploy_fnc = mo.vstack([
deploy_button,
deploy_button.value
], justify="space-around", align="center")
# mo.md(f"""
# {deployment_definition}
# <br>
# <br>
# {deploy_fnc}
# ---
# """)
return (deploy_fnc,)
@app.cell(hide_code=True)
def _(deployment_client, mo):
### Functions to List , Get ID's as a list and Purge of Assets
def get_deployment_list():
dep_df = deployment_client.deployments.list()
# deployment_df = mo.ui.table(dep_df, initial_selection=[0])
return pd.DataFrame(dep_df)
def get_deployment_ids():
if deployments_dataframe is not []:
df = deployments_dataframe.value
dep_list = df['ID'].tolist()
else:
dep_list = []
return dep_list
def get_data_assets_list():
d_assets_df = deployment_client.data_assets.list()
# data_assets_df = mo.ui.table(d_assets_df, initial_selection=[0])
return pd.DataFrame(d_assets_df)
def get_data_asset_ids():
if repository_dataframe is not []:
df = data_assets_dataframe.value
data_asset_list = df['ASSET_ID'].tolist()
else:
data_asset_list = []
return data_asset_list
### List Repository Assets, Get ID's as a list and Purge Repository Assets (AI Services, Functions, Models, etc.)
def get_repository_list():
rep_df = deployment_client.repository.list()
# repository_df = mo.ui.table(rep_df, initial_selection=[0])
return pd.DataFrame(rep_df)
def get_repository_ids():
if repository_dataframe is not []:
df = repository_dataframe.value
repository_list = df['ID'].tolist()
else:
repository_list = []
return repository_list
def delete_with_progress(ids_list, delete_function, item_type="items"):
"""
Generic wrapper that adds a progress bar to any deletion function
Parameters:
ids_list: List of IDs to delete
delete_function: Function that deletes a single ID
item_type: String describing what's being deleted (for display)
"""
with mo.status.progress_bar(
total=len(ids_list) or 1,
title=f"Purging {item_type}",
subtitle=f"Deleting {item_type}...",
completion_title="Purge Complete",
completion_subtitle=f"Successfully deleted {len(ids_list)} {item_type}"
) as progress:
for item_id in ids_list:
delete_function(item_id)
progress.update(increment=1)
return f"Deleted {len(ids_list)} {item_type} successfully"
# Use with existing deletion functions
def delete_deployments(deployment_ids):
return delete_with_progress(
deployment_ids,
lambda id: deployment_client.deployments.delete(id),
"deployments"
)
def delete_data_assets(data_asset_ids):
return delete_with_progress(
data_asset_ids,
lambda id: deployment_client.data_assets.delete(id),
"data assets"
)
def delete_repository_items(repository_ids):
return delete_with_progress(
repository_ids,
lambda id: deployment_client.repository.delete(id),
"repository items"
)
return (
delete_data_assets,
delete_deployments,
delete_repository_items,
delete_with_progress,
get_data_asset_ids,
get_data_assets_list,
get_deployment_ids,
get_deployment_list,
get_repository_ids,
get_repository_list,
)
@app.cell(hide_code=True)
def _(
delete_data_assets,
delete_deployments,
delete_repository_items,
get_data_asset_ids,
get_data_assets_list,
get_deployment_ids,
get_deployment_list,
get_repository_ids,
get_repository_list,
mo,
):
### Temporary Function Purge - Assets
get_data_assets_button = mo.ui.button(
label="Get Data Assets Dataframe",
on_click=lambda _: get_data_assets_list(),
kind="neutral",
)
get_data_asset_id_list = mo.ui.button(
label="Turn Dataframe into List of IDs",
on_click=lambda _: get_data_asset_ids(),
# on_click=lambda _: get_data_asset_ids(data_assets_dataframe.value),
kind="neutral",
)
purge_data_assets = mo.ui.button(
label="Purge Data Assets",
on_click=lambda _: delete_data_assets(get_data_asset_id_list.value),
kind="danger",
)
### Temporary Function Purge - Deployments
get_deployments_button = mo.ui.button(
label="Get Deployments Dataframe",
on_click=lambda _: get_deployment_list(),
kind="neutral",
)
get_deployment_id_list = mo.ui.button(
label="Turn Dataframe into List of IDs",
on_click=lambda _: get_deployment_ids(),
# on_click=lambda _: get_deployment_ids(deployments_dataframe.value),
kind="neutral",
)
purge_deployments = mo.ui.button(
label="Purge Deployments",
on_click=lambda _: delete_deployments(get_deployment_id_list.value),
kind="danger",
)
### Repository Items Purge
get_repository_button = mo.ui.button(
label="Get Repository Dataframe",
on_click=lambda _: get_repository_list(),
kind="neutral",
)
get_repository_id_list = mo.ui.button(
label="Turn Dataframe into List of IDs",
on_click=lambda _: get_repository_ids(),
# on_click=lambda _: get_repository_ids(repository_dataframe.value),
kind="neutral",
)
purge_repository = mo.ui.button(
label="Purge Repository Items",
on_click=lambda _: delete_repository_items(get_repository_id_list.value),
kind="danger",
)
return (
get_data_asset_id_list,
get_data_assets_button,
get_deployment_id_list,
get_deployments_button,
get_repository_button,
get_repository_id_list,
purge_data_assets,
purge_deployments,
purge_repository,
)
@app.cell
def _(get_deployment_id_list, get_deployments_button, mo, purge_deployments):
deployments_purge_stack = mo.hstack([get_deployments_button, get_deployment_id_list, purge_deployments])
deployments_purge_stack_results = mo.vstack([deployments_dataframe, get_deployment_id_list.value, purge_deployments.value])
deployments_purge_tab = mo.vstack([deployments_purge_stack, deployments_purge_stack_results])
return (
deployments_purge_stack,
deployments_purge_stack_results,
deployments_purge_tab,
)
@app.cell
def _(get_repository_button, get_repository_id_list, mo, purge_repository):
repository_purge_stack = mo.hstack([get_repository_button, get_repository_id_list, purge_repository])
repository_purge_stack_results = mo.vstack([repository_dataframe, get_repository_id_list.value, purge_repository.value])
repository_purge_tab = mo.vstack([repository_purge_stack, repository_purge_stack_results])
return (
repository_purge_stack,
repository_purge_stack_results,
repository_purge_tab,
)
@app.cell
def _(get_data_asset_id_list, get_data_assets_button, mo, purge_data_assets):
data_assets_purge_stack = mo.hstack([get_data_assets_button, get_data_asset_id_list, purge_data_assets])
data_assets_purge_stack_results = mo.vstack([data_assets_dataframe, get_data_asset_id_list.value, purge_data_assets.value])
data_assets_purge_tab = mo.vstack([data_assets_purge_stack, data_assets_purge_stack_results])
return (
data_assets_purge_stack,
data_assets_purge_stack_results,
data_assets_purge_tab,
)
@app.cell
def _(get_data_assets_button, get_repository_button, mo, purge_data_assets):
deployments_dataframe = []
data_assets_dataframe = []
repository_dataframe = []
# Only try to update if the buttons exist and have values
try:
if 'get_deployments_button' in globals() and get_deployments_button.value is not None:
deployments_dataframe = mo.ui.table(get_deployments_button.value, initial_selection=[0])
except (NameError, AttributeError):
pass
try:
if 'get_data_assets_button' in globals() and get_data_assets_button.value is not None:
data_assets_dataframe = mo.ui.table(get_data_assets_button.value, initial_selection=[0])
except (NameError, AttributeError):
pass
try:
if 'get_repository_button' in globals() and get_repository_button.value is not None:
repository_dataframe = mo.ui.table(get_repository_button.value, initial_selection=[0])
except (NameError, AttributeError):
pass
return (
deployments_dataframe,
data_assets_dataframe,
repository_dataframe,
)
@app.cell
def _(data_assets_purge_tab, deployments_purge_tab, mo, repository_purge_tab):
if deployments_purge_stack:
purge_tabs = mo.ui.tabs(
{"Purge Deployments": deployments_purge_tab, "Purge Repository Assets": repository_purge_tab,"Purge Data Assets": data_assets_purge_tab }, lazy=False
)
else:
purge_tabs = mo.md("**Instantiate the watsonx.ai Deployment Space Client.**")
# asset_purge = mo.accordion(
# {
# """<br>
# #### **Supporting Cleanup Functionality, lists of different assets and purge them if needed** *(purges all detected)*
# <br>""": purge_tabs,
# }
# )
# asset_purge
return (purge_tabs,)
if __name__ == "__main__":
app.run()