# Page header captured together with the file (not part of the program):
#   MilanM's picture
#   Update main_app.py
#   6fd1666 verified
# marimo notebook module: the cells below register themselves on `app` via decorators.
import marimo
# Presumably the marimo version that generated this file — TODO confirm.
__generated_with = "0.13.0"
# Application object created with the "medium" width setting.
app = marimo.App(width="medium")
@app.cell
def _():
    # Import marimo (as `mo`) and `os` once and hand both to the cells that ask for them.
    import os
    import marimo as mo

    return (mo, os)
@app.function
def get_markdown_content(file_path):
    """Return the complete text of the UTF-8 encoded file at ``file_path``."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        return handle.read()
@app.cell
def _(mo):
    # Markdown files for the intro carousel, listed in display order.
    _slide_paths = (
        'intro_markdown/intro.md',
        'intro_markdown/intro_marimo.md',
        'intro_markdown/intro_notebook.md',
        'intro_markdown/intro_comparison.md',
    )
    # One rendered markdown page per file.
    _slides = [mo.md(f"{get_markdown_content(_path)}") for _path in _slide_paths]
    intro = mo.carousel(_slides)
    # The carousel is wrapped in a collapsible accordion entry.
    mo.accordion({"## Notebook Introduction": intro})
    return
# Cell: library imports plus temporary-directory setup; the helper functions
# that follow belong to the same cell.
@app.cell
def _(os):
### Imports
from typing import Any, Dict, List, Optional, Pattern, Set, Union, Tuple
from ibm_watsonx_ai import APIClient, Credentials
from pathlib import Path
import importlib.util
import pandas as pd
import mimetypes
import requests
import zipfile
import polars
import urllib3
import tempfile
# NOTE(review): importlib.util was already imported a few lines above; this repeat is redundant.
import importlib.util
import base64
import certifi
import uuid
import time
import json
import sys
import ssl
import ast
import io
import re
# Set explicit temporary directory
os.environ['TMPDIR'] = '/tmp/notebook_functions'
# Create the directory if it doesn't exist
os.makedirs('/tmp/notebook_functions', exist_ok=True)
# Make sure Python's tempfile module also uses this directory
tempfile.tempdir = '/tmp/notebook_functions'
def get_iam_token(api_key, timeout=30):
    """Exchange an IBM Cloud API key for an IAM access token.

    Args:
        api_key: IBM Cloud API key, sent as the ``apikey`` form field.
        timeout: Seconds to wait for the IAM endpoint before giving up
            (the original call had no timeout and could block forever).

    Returns:
        The ``access_token`` value of the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status
            (for example an invalid API key). Previously this surfaced as a
            ``KeyError`` on the missing ``access_token`` field.
        requests.RequestException: On connection problems or timeout.
    """
    response = requests.post(
        'https://iam.cloud.ibm.com/identity/token',
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
        data={'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': api_key},
        verify=certifi.where(),
        timeout=timeout,
    )
    # Fail with the HTTP status instead of an opaque KeyError further down.
    response.raise_for_status()
    return response.json()['access_token']
def setup_task_credentials(client):
    """Replace every stored task credential of ``client`` with a fresh one.

    Existing entries listed under ``"resources"`` in
    ``client.task_credentials.get_details()`` are deleted one by one, then
    the result of ``client.task_credentials.store()`` is returned.
    """
    task_credentials = client.task_credentials
    details = task_credentials.get_details()

    has_existing = "resources" in details and details["resources"]
    if has_existing:
        for entry in details["resources"]:
            task_credentials.delete(task_credentials.get_id(entry))

    return task_credentials.store()
def get_cred_value(key, creds_var_name="baked_in_creds", default=""):
    """Look up ``key`` in a module-level credentials dictionary.

    Helper for working with preset credentials.

    Args:
        key: Entry to read from the credentials dictionary.
        creds_var_name: Name of the global variable holding the dictionary.
        default: Value returned when the variable is absent, is not a
            dictionary, or does not contain ``key``.

    Returns:
        The stored value for ``key``, or ``default``.
    """
    # A missing global yields None, which fails the dict check below.
    candidate = globals().get(creds_var_name)
    if not isinstance(candidate, dict):
        return default
    if key not in candidate:
        return default
    return candidate[key]
# Names handed to other cells. get_cred_value is defined above but is not
# part of this tuple.
return (
APIClient,
Credentials,
ast,
get_iam_token,
importlib,
json,
pd,
setup_task_credentials,
sys,
tempfile,
uuid,
)
@app.cell
def _(client_instantiation_form, os):
    # Any falsy form value is treated as "no setup available".
    client_setup = client_instantiation_form.value or None

    ### Extract Credential Variables:
    if client_setup is None:
        # No credentials: blank the environment variable and reset every output.
        os.environ["WATSONX_APIKEY"] = ""
        project_id = None
        space_id = None
        wx_api_key = None
        wx_url = None
    else:
        wx_url = client_setup["wx_region"]
        wx_api_key = client_setup["wx_api_key"].strip()
        os.environ["WATSONX_APIKEY"] = wx_api_key
        _raw_project = client_setup["project_id"]
        project_id = None if _raw_project is None else _raw_project.strip()
        _raw_space = client_setup["space_id"]
        space_id = None if _raw_space is None else _raw_space.strip()
    return client_setup, project_id, space_id, wx_api_key, wx_url
@app.cell
def _(client_setup, get_iam_token, wx_api_key):
    # An IAM token is requested only when a setup exists and an API key was extracted.
    if not client_setup or wx_api_key is None:
        token = None
    else:
        token = get_iam_token(wx_api_key)
    return
@app.cell
def _(mo):
    ### Credentials for the watsonx.ai SDK client
    # Endpoints
    wx_platform_url = "https://api.dataplatform.cloud.ibm.com"
    regions = dict(
        US="https://us-south.ml.cloud.ibm.com",
        EU="https://eu-de.ml.cloud.ibm.com",
        GB="https://eu-gb.ml.cloud.ibm.com",
        JP="https://jp-tok.ml.cloud.ibm.com",
        AU="https://au-syd.ml.cloud.ibm.com",
        CA="https://ca-tor.ml.cloud.ibm.com",
    )

    # Markdown layout of the form; the placeholders are filled by .batch() below.
    _layout = mo.md('''
###**watsonx.ai credentials:**
{wx_region}
{wx_api_key}
{space_id}
''').style(max_height="300px", overflow="auto", border_color="blue")

    # Input elements keyed by the name under which their value appears in the form result.
    # NOTE(review): project_id has no placeholder in the layout above — confirm that is intended.
    _fields = dict(
        wx_region=mo.ui.dropdown(regions, label="Select your watsonx.ai region:", value="US", searchable=True),
        wx_api_key=mo.ui.text(placeholder="Add your IBM Cloud api-key...", label="IBM Cloud Api-key:", kind="password"),
        project_id=mo.ui.text(placeholder="Add your watsonx.ai project_id...", label="Project_ID:", kind="text"),
        space_id=mo.ui.text(placeholder="Add your watsonx.ai space_id...", label="Space_ID:", kind="text"),
    )

    # Create a form with multiple elements
    client_instantiation_form = _layout.batch(**_fields).form(
        show_clear_button=True,
        bordered=False,
    )
    return (client_instantiation_form,)
@app.cell
def _(
    APIClient,
    Credentials,
    client_setup,
    project_id,
    setup_task_credentials,
    space_id,
    wx_api_key,
    wx_url,
):
    ### Instantiate the watsonx.ai client
    # Defaults used when no setup was submitted or an id is missing.
    wx_credentials = None
    project_client = None
    deployment_client = None
    task_credentials_details = None
    client = None

    if client_setup:
        wx_credentials = Credentials(
            url=wx_url,
            api_key=wx_api_key
        )
        if project_id:
            project_client = APIClient(credentials=wx_credentials, project_id=project_id)
        if space_id:
            deployment_client = APIClient(credentials=wx_credentials, space_id=space_id)
            # `client` is the deployment-space client; it stays None without a space id.
            client = deployment_client

        # Task credentials are set up once, preferring the project client.
        # (The former third branch, "both clients present", could never run
        # because the project-client branch already covered that case.)
        _task_client = project_client if project_client is not None else deployment_client
        if _task_client is not None:
            task_credentials_details = setup_task_credentials(_task_client)
    return client, deployment_client, project_client
@app.cell
def _(deployment_client, project_client):
    # "success" as soon as at least one client exists, otherwise "neutral".
    _no_client = project_client is None and deployment_client is None
    client_callout_kind = "neutral" if _no_client else "success"
    return (client_callout_kind,)
@app.cell
def _(mo):
    # Code templates offered in the dropdown; "Base" is preselected.
    template_variants = ["Base", "Stream Files to IBM COS [Example]"]
    template_variant = mo.ui.dropdown(
        template_variants,
        value="Base",
        label="Code Template:",
    )
    return (template_variant,)
@app.cell
def _(client_callout_kind, client_instantiation_form, mo, template_variant):
    # The template dropdown sits inside a callout whose kind is client_callout_kind.
    client_callout = mo.callout(template_variant, kind=client_callout_kind)
    # Credentials form and callout side by side.
    client_stack = mo.hstack(
        [client_instantiation_form, client_callout],
        justify="space-around",
        align="center",
    )
    return (client_stack,)
@app.cell
def _(client_stack, mo):
    # Instructions for section 1, followed by the embedded form/callout stack.
    _section_text = f'''
###**Instantiate your watsonx.ai client:**
1. Select a region from the dropdown menu
2. Provide an IBM Cloud Apikey and watsonx.ai deployment space id
3. Once you submit, the area with the code template will turn green if successful
4. Select a base (provide baseline format) or example code function template
---
{client_stack}
'''
    client_section = mo.md(_section_text)
    return (client_section,)
@app.cell
def _(mo, sc_m, schema_editors):
    # Second tab: heading plus the schema editors.
    _definition_tab = mo.md(f"""
####**Edit the schema definitions you selected in the previous tab.**<br>
{schema_editors}""")
    # Tabs keep their original order: option selection first, definitions second.
    sc_tabs = mo.ui.tabs(
        {
            "Schema Option Selection": sc_m,
            "Schema Definition": _definition_tab,
        }
    )
    return (sc_tabs,)
@app.cell
def _(fm, function_editor, mo, sc_tabs):
    # Instructions for section 2, then the editor, the schema tabs and the metadata inputs.
    _section_text = f'''###**Create your function from the template:**
1. Use the code editor window to create a function to deploy
<br>
The function must:
<br>
--- Include a payload and score element
<br>
--- Have the same function name in both the score = <name>() segment and the Function Name input field below
<br>
--- Additional details can be found here -> [watsonx.ai - Writing deployable Python functions
](https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/ml-deploy-py-function-write.html?utm_medium=Exinfluencer&utm_source=ibm_developer&utm_content=in_content_link&utm_term=10006555&utm_id=blogs_awb-tekton-optimizations-for-kubeflow-pipelines-2-0&context=wx&audience=wdp)
3. Click submit, then proceed to select whether you wish to add:
<br>
--- An input schema (describing the format of the variables the function takes) **[Optional]**
<br>
--- An output schema (describing the format of the output results the function returns) **[Optional]**
<br>
--- An sample input example (showing an example of a mapping of the input and output schema to actual values.) **[Optional]**
4. Fill in the function name field **(must be exactly the same as in the function editor)**
5. Add a description and metadata tags **[Optional]**
---
{function_editor}
---
{sc_tabs}
---
{fm}
'''
    function_section = mo.md(_section_text)
    return (function_section,)
@app.cell
def _(mo, selection_table, upload_func):
    # Instructions for section 4, then the software-spec table and the upload control.
    _section_text = f'''
###**Review and Upload your function**
1. Review the function metadata specs JSON
2. Select a software specification if necessary (default for python functions is pre-selected), this is the runtime environment of python that your function will run in. Environments on watsonx.ai come pre-packaged with many different libraries, if necessary install new ones by adding them into the function as a `subprocess.check_output('pip install <package_name>', shell=True)` command.
3. Once your are satisfied, click the upload function button and wait for the response.
> If you see no table of software specs, you haven't activated your watsonx.ai client.
---
{selection_table}
---
{upload_func}
'''
    upload_section = mo.md(_section_text)
    return (upload_section,)
@app.cell
def _(deploy_fnc, deployment_definition, hw_selection_table, mo):
    # Instructions for section 5, then the hardware table, deployment settings and deploy control.
    _section_text = f'''
###**Deploy your function:**
1. Select a hardware specification (vCPUs/GB) that you want your function deployed on
<br>
--- XXS and XS cost the same (0.5 CUH per hour, so XS is the better option
<br>
--- Select larger instances for more resource intensive tasks or runnable jobs
2. Select the type of deployment:
<br>
--- Function (Online) for always-on endpoints - Always available and low latency, but consume resources continuously for every hour they are deployed.
<br>
--- Batch (Batch) for runnable jobs - Only consume resources during job runs, but aren't as flexible to deploy.
3. If you've selected Function, pick a completely unique (globally, not just your account) deployment serving name that will be in the endpoint url.
4. Once your are satisfied, click the deploy function button and wait for the response.
---
{hw_selection_table}
---
{deployment_definition}
---
{deploy_fnc}
'''
    deployment_section = mo.md(_section_text)
    return (deployment_section,)
@app.cell
def _(mo, purge_tabs):
    # Instructions for section 6, then the purge tabs.
    _section_text = f'''
###**Helper Purge Functions:**
These functions help you retrieve, select and delete deployments, data assets or repository assets (functions, models, etc.) that you have in the deployment space. This is meant to support fast cleanup.
Select the tab based on what you want to delete, then click each of the buttons one by one after the previous gives a response.
---
{purge_tabs}
'''
    purging_section = mo.md(_section_text)
    return (purging_section,)
@app.cell
def _(
    json,
    mo,
    package_analysis_stack,
    package_meta,
    ss_asset_response,
    yaml_template,
):
    # Pretty-printed response of the software-spec upload, shown at the end of the section.
    _result_json = json.dumps(ss_asset_response, indent=2)
    # Instructions for section 3, with the package checker, template selector and metadata form.
    _section_text = f'''
###**If needed - Create a custom software-spec with added python packages**
1. Check to see if the python library you want to use is already available inside watsonx.ai's runtime environment base specs for deployed functions by adding them as a comma separated list, e.g. - plotly, ibm-watsonx-ai==1.3.6, etc. into the text area.
2. If you wish to see all the packages already present, check the box to return it alognside your validation results.
---
{package_analysis_stack}
---
3. If it isn't, you can create a custom software spec that adds a package_extension add-on to it. Use the template selector to see some examples, but when you are ready add your packages in the code editor after the "pip:" section.
{yaml_template}
4. Give your package extension and software spec names and descriptions and submit.
5. You will find it in the Function Upload table afterward.
---
{package_meta}
---
Result:
```json
{_result_json}
```
'''
    packages_section = mo.md(_section_text)
    return (packages_section,)
@app.cell
def _(client_section, mo):
    # Collapsible wrapper for section 1; the trailing bare name is the cell's final expression.
    _panels = {"Section 1: **watsonx.ai Credentials**": client_section}
    ui_accordion_section_1 = mo.accordion(_panels)
    ui_accordion_section_1
    return
@app.cell
def _(function_section, mo):
    # Collapsible wrapper for section 2; the trailing bare name is the cell's final expression.
    _panels = {"Section 2: **Function Creation**": function_section}
    ui_accordion_section_2 = mo.accordion(_panels)
    ui_accordion_section_2
    return
@app.cell
def _(mo, packages_section):
    # Collapsible wrapper for section 3; the trailing bare name is the cell's final expression.
    _panels = {"Section 3: **Create a Package Extension (Optional)**": packages_section}
    ui_accordion_section_3 = mo.accordion(_panels)
    ui_accordion_section_3
    return
@app.cell
def _(mo, upload_section):
    # Collapsible wrapper for section 4; the trailing bare name is the cell's final expression.
    _panels = {"Section 4: **Function Upload**": upload_section}
    ui_accordion_section_4 = mo.accordion(_panels)
    ui_accordion_section_4
    return
@app.cell
def _(deployment_section, mo):
    # Collapsible wrapper for section 5; the trailing bare name is the cell's final expression.
    _panels = {"Section 5: **Function Deployment**": deployment_section}
    ui_accordion_section_5 = mo.accordion(_panels)
    ui_accordion_section_5
    return
@app.cell
def _(mo, purging_section):
    # Collapsible wrapper for section 6; the trailing bare name is the cell's final expression.
    _panels = {"Section 6: **Helper Functions**": purging_section}
    ui_accordion_section_6 = mo.accordion(_panels)
    ui_accordion_section_6
    return
@app.cell
def _(mo, template_variant):
    # Template for WatsonX.ai deployable function
    if template_variant.value == "Stream Files to IBM COS [Example]":
        # The example template is read from a file next to the notebook.
        with open("stream_files_to_cos.py", "r", encoding="utf-8") as file:
            template = file.read()
    else:
        # Base template. The inner scoring function takes `payload`, the name
        # its body reads from (it was declared as `input_data` before, which
        # made every call fail with a NameError). The text is indented so it is
        # valid Python when shown in the editor.
        template = '''def your_function_name():
    import subprocess
    subprocess.check_output('pip install gensim', shell=True)
    import gensim
    def score(payload):
        message_from_input_payload = payload.get("input_data")[0].get("values")[0][0]
        response_message = "Received message - {0}".format(message_from_input_payload)
        # Score using the pre-defined model
        score_response = {
            'predictions': [{'fields': ['Response_message_field', 'installed_lib_version'],
                             'values': [[response_message, gensim.__version__]]
                             }]
        }
        return score_response
    return score
score = your_function_name()
'''
    # Code editor pre-filled with the chosen template, wrapped in a submit form.
    function_editor = (
        mo.md('''
#### **Create your function by editing the template:**
{editor}
''')
        .batch(
            editor = mo.ui.code_editor(value=template, language="python", min_height=200, theme="dark")
        )
        .form(show_clear_button=True, bordered=False)
    )
    # function_editor
    return (function_editor,)
@app.cell
def _(ast, function_editor, mo, os):
    # Name of the first top-level function in the submitted code; None until
    # a form was submitted and the code parsed successfully.
    function_name = None
    if function_editor.value:
        # Get the edited code from the function editor
        _code = function_editor.value['editor']
        # Extract function name using AST without executing the code
        try:
            _tree = ast.parse(_code)
        except SyntaxError as _err:
            # NOTE(review): this markdown object is created but not used as the cell's output.
            mo.md(f"Syntax error in function code: {str(_err)}")
        else:
            function_name = next(
                (_node.name for _node in _tree.body if isinstance(_node, ast.FunctionDef)),
                None,
            )
            if function_name is not None:
                mo.md(f"Found function: '{function_name}' (using file path approach)")
                # Create the directory if it doesn't exist
                _save_dir = "/tmp/notebook_functions"
                os.makedirs(_save_dir, exist_ok=True)
                # Save the function code to a file; UTF-8 is set explicitly so
                # non-ASCII source does not depend on the platform default.
                _target = os.path.join(_save_dir, f"{function_name}.py")
                with open(_target, "w", encoding="utf-8") as _out:
                    _out.write(_code)
            else:
                mo.md("No function found in the editor code")
    return (function_name,)
@app.cell
def _():
    # Conda environment snippets offered as starting points in the YAML editor.
    # Packages installed through pip are nested under the "- pip:" entry; as a
    # flat list they would be read as separate conda dependencies.
    yaml_templates = {
        "empty": """dependencies:
  - pip
  - pip:
    - <library_name>
    - ...
""",
        "llama_index_and_scikit": """dependencies:
  - pip
  - pip:
    - scikit-learn==1.6.1
    - scikit-llm==1.4.1
    - plotly==6.0.1
    - altair==5.5.0
    - PyMuPDF==1.25.1
    - llama-index==0.12.24
    - llama-index-readers-file==0.4.6
    - llama-index-readers-web==0.3.8
""",
        "data_product_hub": """dependencies:
  - pip
  - pip:
    - PyMuPDF==1.25.1
    - llama-index==0.12.24
    - llama-index-readers-file==0.4.6
    - llama-index-readers-web==0.3.8
    - plotly==6.0.1
    - altair==5.5.0
    - dask==2025.2.0
    - dph-services==0.4.0
""",
        "watsonx_data": """dependencies:
  - pip
  - pip:
    - prestodb
    - PyMuPDF==1.25.1
    - llama-index==0.12.24
    - llama-index-readers-file==0.4.6
    - llama-index-readers-web==0.3.8
    - ibm-watsonxdata==0.4.0
"""
    }
    return (yaml_templates,)
@app.cell
def _(mo, yaml_templates):
    # Dropdown over the YAML templates, with "empty" preselected.
    yaml_template = mo.ui.dropdown(
        yaml_templates,
        value="empty",
        label="**Select a template:**",
        searchable=True,
    )
    return (yaml_template,)
@app.cell
def _(tempfile):
def create_yaml_tempfile(yaml_editor_value):
    """Write a value to a new temporary ``.yaml`` file and return the file's path.

    Args:
        yaml_editor_value: Content to store; it is converted with ``str()``.

    Returns:
        Path of the created file. The file is kept on disk (``delete=False``),
        so the caller is responsible for removing it.
    """
    # Writing through the handle returned by NamedTemporaryFile (and closing it
    # via the context manager) avoids the leaked descriptor of the previous
    # "create, then reopen by name" approach.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.yaml', delete=False, encoding='utf-8'
    ) as temp_file:
        temp_file.write(str(yaml_editor_value))
    return temp_file.name
return (create_yaml_tempfile,)
@app.cell
def _(mo, yaml_template):
    # Display label -> package type value.
    pkg_types = {"Conda Yaml": "conda_yml", "Custom user library": "custom_library"}

    # Markdown layout of the form; placeholders are filled by .batch() below.
    # NOTE(review): package_type has no placeholder in this layout — confirm that is intended.
    _layout = mo.md('''**Create your Conda YAML by editing the template:**
{yml_editor}
{package_name}
{package_description}
{software_spec_name}
{software_spec_description}
''')

    _fields = dict(
        yml_editor=mo.ui.code_editor(
            value=yaml_template.value, language="yaml", min_height=100, theme="dark"
        ),
        package_name=mo.ui.text(
            placeholder="Python Package for...",
            label="Package Extension Name:",
            kind="text",
            value="Custom Python Package",
        ),
        software_spec_name=mo.ui.text(
            placeholder="Software Spec Name",
            label="Custom Software Spec Name:",
            kind="text",
            value="Extended Python Function Software Spec",
        ),
        package_description=mo.ui.text_area(
            placeholder="Write a description for your package.",
            label="Package Description:",
            value=" ",
        ),
        software_spec_description=mo.ui.text_area(
            placeholder="Write a description for your software spec.",
            label="Software Spec Description:",
            value=" ",
        ),
        package_type=mo.ui.dropdown(
            pkg_types,
            label="Select your package type:",
            value="Conda Yaml",
        ),
    )

    package_meta = _layout.batch(**_fields).form(show_clear_button=True, bordered=False)
    return (package_meta,)
@app.cell
def _(mo):
    # Form with a free-text package list and a checkbox for the full package listing.
    _layout = mo.md('''
**Check if a package you want to use is in the base software_specification already:**
{package_list}
{return_full_list}
''')
    _fields = dict(
        package_list=mo.ui.text_area(
            placeholder="Add packages as a comma separated list (with or without versions)."
        ),
        return_full_list=mo.ui.checkbox(
            value=False,
            label="Return a full list of packages in the base software specification.",
        ),
    )
    check_packages = _layout.batch(**_fields).form(show_clear_button=True, bordered=False)
    return (check_packages,)
@app.cell
def _(check_packages):
    # Turn the submitted comma-separated text into a list of requirement strings.
    if check_packages.value is not None:
        packages = check_packages.value['package_list']
        full_list_return = check_packages.value['return_full_list']
        # Blank entries (empty input, trailing or doubled commas) are dropped;
        # they used to be passed on as a package named "".
        _entries = [item.strip() for item in packages.split(',') if item.strip()]
        # With nothing to check and no full listing requested there is nothing
        # to analyse, so the downstream analysis is skipped (None).
        verification_list = _entries if (_entries or full_list_return) else None
    else:
        packages = None
        verification_list = None
        full_list_return = None
    return full_list_return, verification_list
@app.cell
def _(
    analyze_software_spec,
    base_software_spec,
    full_list_return,
    verification_list,
    visualize_software_spec,
):
    # Run the package check only when there is a package list AND the base
    # software specification is available; without the second condition the
    # analysis was called with None and failed with a TypeError.
    if verification_list is not None and base_software_spec is not None:
        pkg_analysis = analyze_software_spec(
            base_software_spec,
            verification_list,
            return_full_sw_package_list=full_list_return,
        )
        package_df = visualize_software_spec(pkg_analysis, verification_list)
    else:
        pkg_analysis = None
        package_df = None
    return (package_df,)
@app.cell
def _(check_packages, mo, package_df):
    # Package-check form on top, analysis result (possibly None) below it.
    _stack_items = [check_packages, package_df]
    package_analysis_stack = mo.vstack(_stack_items, justify="center")
    return (package_analysis_stack,)
@app.cell
def _(client):
    # Without a client both the id and the details stay None.
    if client is None:
        base_sw_spec_id = None
        base_software_spec = None
    else:
        # Fixed id of the base software specification whose details are fetched.
        base_sw_spec_id = "45f12dfe-aa78-5b8d-9f38-0ee223c47309"
        base_software_spec = client.software_specifications.get_details(base_sw_spec_id)
    return base_software_spec, base_sw_spec_id
@app.cell
def _(client, create_yaml_tempfile, package_meta, uuid):
    # Build the package-extension metadata and write the edited YAML to disk
    # once the form was submitted and a client exists.
    if package_meta.value is not None and client is not None:
        # Short random suffix appended to the user-supplied name.
        pack_suffix = str(uuid.uuid4())[:4]
        pack_name = package_meta.value['package_name']
        _meta_names = client.package_extensions.ConfigurationMetaNames
        pe_metadata = {
            _meta_names.NAME: f"{pack_name}_{pack_suffix}",
            _meta_names.TYPE: package_meta.value['package_type'],
            # All keys now come from the package-extension meta names; the
            # description key was previously taken from the software-spec ones.
            _meta_names.DESCRIPTION: package_meta.value['package_description']
        }
        yaml_file_path = create_yaml_tempfile(package_meta.value['yml_editor'])
    else:
        pe_metadata = {
        }
        yaml_file_path = None
    return pe_metadata, yaml_file_path
@app.cell
def _(client, pe_metadata, yaml_file_path):
    # Nothing to upload until a YAML file path exists.
    if yaml_file_path is None:
        pe_asset_details = None
        package_id = None
    else:
        ### Stores the package extension
        pe_asset_details = client.package_extensions.store(
            file_path=yaml_file_path,
            meta_props=pe_metadata,
        )
        package_id = pe_asset_details["metadata"]["asset_id"]
    return (package_id,)
@app.cell
def _():
### Helper function for checking if a python library is in the standard software spec.
def analyze_software_spec(sw_spec_response, required_libraries, return_full_sw_package_list=False):
    """
    Analyzes a software specification against a list of required libraries.

    Args:
        sw_spec_response (dict): The software specification response from the API.
            Packages are read from
            ``["entity"]["software_specification"]["software_configuration"]["included_packages"]``,
            each entry providing ``"name"`` and ``"version"``.
        required_libraries (list): Required libraries in the format
            ``["package_name", "package_name==version", ...]``. Blank entries are ignored.
        return_full_sw_package_list (bool): Whether to also return the complete
            package list of the software specification.

    Returns:
        dict: A dictionary with analysis results containing:
            - not_present: Dict of libraries not found in the software spec
            - version_mismatch: Dict of libraries with version mismatches,
              with [sw_version, required_version] as values
            - present: Dict of libraries that are present with matching versions
            - sw_packages: (Optional) Complete dict of all packages in the
              software spec, preceded by the missing libraries (value None)

    Raises:
        ValueError: If the response does not have the expected structure
            (missing keys, or ``None`` / non-dict values along the path).
    """
    result = {
        "present": {},
        "not_present": {},
        "version_mismatch": {}
    }

    # Extract all packages from the software specification
    try:
        included_packages = sw_spec_response["entity"]["software_specification"]["software_configuration"]["included_packages"]
        sw_packages = {package["name"]: package["version"] for package in included_packages}
    except (KeyError, TypeError) as e:
        # TypeError covers a None / non-dict response, which used to escape
        # as an unrelated error instead of the documented ValueError.
        raise ValueError(f"Invalid software specification format: {e}") from e

    # Parse required libraries
    required_names = set()
    for lib in required_libraries:
        lib_name, separator, lib_version = lib.partition("==")
        lib_name = lib_name.strip()
        if not lib_name:
            # Blank entry (e.g. from a trailing comma): nothing to check.
            continue
        lib_version = lib_version.strip() if separator else None
        required_names.add(lib_name)

        # Check if library is present in software specification
        if lib_name not in sw_packages:
            result["not_present"][lib_name] = None
        elif lib_version is not None and lib_version != sw_packages[lib_name]:
            # Check version mismatch
            result["version_mismatch"][lib_name] = [sw_packages[lib_name], lib_version]
        else:
            # Library is present with matching version (or no specific version required)
            result["present"][lib_name] = sw_packages[lib_name]

    if return_full_sw_package_list:
        def sort_key(pkg_name):
            # Only installed packages are sorted here, so "missing" never occurs.
            if pkg_name in result["version_mismatch"]:
                return (1, pkg_name)  # Version mismatch first
            if pkg_name in required_names:
                return (2, pkg_name)  # Required packages that match second
            return (3, pkg_name)  # All other packages last

        ordered = {name: sw_packages[name] for name in sorted(sw_packages, key=sort_key)}
        # Missing packages go to the top. Reversed insertion order keeps the
        # ordering produced by the former one-by-one prepending.
        missing_first = {name: None for name in reversed(list(result["not_present"]))}
        result["sw_packages"] = {**missing_first, **ordered}
    return result
def visualize_software_spec(analysis_result, required_libraries=None):
    """
    Visualizes the results of analyze_software_spec in a DataFrame with status indicators.
    For use in Marimo notebooks.

    Args:
        analysis_result (dict): The result from analyze_software_spec function.
            When it contains ``"sw_packages"`` every listed package gets a row;
            otherwise only the packages named in ``not_present``,
            ``version_mismatch`` and ``present`` do.
        required_libraries (list, optional): The original list of required libraries
            used in analyze_software_spec. Used to tell required packages apart
            from the rest when the full package list is shown.

    Returns:
        pandas.DataFrame: Columns ``Package``, ``Version`` and ``Status``, sorted by
        status (missing, version mismatch, present, other) and then by package
        name. An empty result yields an empty DataFrame with these columns.
    """
    import pandas as pd

    missing = analysis_result.get("not_present", {})
    mismatched = analysis_result.get("version_mismatch", {})

    # Bare package names of the requested libraries (version pins removed).
    required_names = {
        lib.split("==", 1)[0].strip() for lib in (required_libraries or [])
    }

    def status_of(package, allow_other):
        """Return (status label, sort priority) for one package."""
        if package in missing:
            return "❌ Missing", 0
        if package in mismatched:
            return "⚠️ Version Mismatch", 1
        if allow_other and package not in required_names:
            return "Other", 3
        return "✅ Present", 2

    rows = []
    if "sw_packages" in analysis_result:
        # Use the full package list
        for package, version in analysis_result["sw_packages"].items():
            status, priority = status_of(package, allow_other=True)
            rows.append({
                "Package": package,
                "Version": version if version is not None else "Not Present",
                "Status": status,
                "_priority": priority,  # Temporary field for sorting
            })
    else:
        # Only use the packages mentioned in required_libraries
        present = analysis_result.get("present", {})
        for package in {*missing, *mismatched, *present}:
            status, priority = status_of(package, allow_other=False)
            if priority == 0:
                version = "Not Present"
            elif priority == 1:
                version = mismatched[package][0]  # sw_spec version
            else:
                version = present[package]
            rows.append({
                "Package": package,
                "Version": version,
                "Status": status,
                "_priority": priority,  # Temporary field for sorting
            })

    # Naming the columns explicitly keeps the sort below valid even when
    # there are no rows (it used to fail with a KeyError on empty input).
    df = pd.DataFrame(rows, columns=["Package", "Version", "Status", "_priority"])
    # Sort by priority and then package name
    df = df.sort_values(by=["_priority", "Package"]).drop("_priority", axis=1).reset_index(drop=True)
    return df
return analyze_software_spec, visualize_software_spec
@app.cell
def _(
    base_sw_spec_id,
    client,
    get_selection_table_status,
    package_id,
    package_meta,
    set_selection_table_status,
    uuid,
):
    # The state getter/setter used below are now declared as cell inputs;
    # they were referenced without being part of the signature.
    if package_id is not None:
        ### Creates a custom software specification based on the standard python function spec_id - "45f12dfe-aa78-5b8d-9f38-0ee223c47309"
        ss_suffix = str(uuid.uuid4())[:4]
        ss_name = package_meta.value['software_spec_name']
        _meta_names = client.software_specifications.ConfigurationMetaNames
        ss_metadata = {
            _meta_names.NAME: f"{ss_name}_{ss_suffix}",
            _meta_names.DESCRIPTION: package_meta.value['software_spec_description'],
            _meta_names.BASE_SOFTWARE_SPECIFICATION: {'guid': base_sw_spec_id},
            _meta_names.PACKAGE_EXTENSIONS: [{'guid': package_id}]
        }
        ss_asset_details = client.software_specifications.store(meta_props=ss_metadata)
        current_status = get_selection_table_status()
        if ss_asset_details is not None:
            # Bump the counter held in the selection-table state after a store
            # call that returned details.
            new_status = 1 if current_status is None else current_status + 1
            set_selection_table_status(new_status)
        if ss_asset_details is not None and "metadata" in ss_asset_details:
            ss_asset_response = ss_asset_details["metadata"]
        else:
            ss_asset_response = {"error": "Unsuccessful Upload"}
    else:
        ss_metadata = {}
        ss_asset_details = None
        ss_asset_response = None
    # ss_asset_details
    return (ss_asset_response,)
@app.cell
def _(mo):
    # State pair (getter, setter) with initial value None.
    _status_state = mo.state(None)
    get_selection_table_status, set_selection_table_status = _status_state
    return get_selection_table_status, set_selection_table_status
@app.cell
def _(mo, client, get_selection_table_status):
    # Referencing the state getter ties this cell to the selection-table
    # status — presumably so the list is fetched again after a new software
    # spec was stored; confirm against marimo's state semantics.
    get_selection_table_status()
    if client:
        # First, get all specs data once
        specs_df = client.software_specifications.list()
    else:
        # Without a client there is nothing to list. The former `elif` branch
        # called `client.software_specifications` exactly when `client` was
        # falsy and could only fail.
        specs_df = None
    # ss_asset_details
    return (specs_df,)
@app.cell
def _(client, mo, pd, specs_df):
    if not client:
        # Placeholder table shown while no client is available.
        sel_df = pd.DataFrame(
            columns=["ID", "VALUE"],
            data=[["ID", "Activate client."]],
        )
        selection_table = mo.ui.table(
            sel_df,
            selection="single",  # Only allow selecting one row
            label="You haven't activated the client",
            initial_selection=[0],
        )
    else:
        # Filter the specs into two groups
        _is_supported = specs_df['STATE'] == 'supported'
        _is_python_runtime = specs_df['NAME'].isin(['runtime-24.1-py3.11', 'runtime-24.1-py3.11-cuda'])
        base_specs = specs_df[_is_supported & _is_python_runtime]
        derived_specs = specs_df[specs_df['TYPE'] == 'derived']
        # Concatenate with base specs first, then derived specs
        supported_specs = pd.concat([base_specs, derived_specs]).reset_index(drop=True)

        # Create a mapping dictionary for framework names based on software specifications
        framework_mapping = {
            "tensorflow_rt24.1-py3.11": "TensorFlow",
            "pytorch-onnx_rt24.1-py3.11": "PyTorch",
            "onnxruntime_opset_19": "ONNX or ONNXRuntime",
            "runtime-24.1-py3.11": "AI Services/Python Functions/Python Scripts",
            "autoai-ts_rt24.1-py3.11": "AutoAI",
            "autoai-kb_rt24.1-py3.11": "AutoAI",
            "runtime-24.1-py3.11-cuda": "CUDA-enabled (GPU) Python Runtime",
            "runtime-24.1-r4.3": "R Runtime 4.3",
            "spark-mllib_3.4": "Apache Spark 3.4",
            "autoai-rag_rt24.1-py3.11": "AutoAI RAG",
        }

        # Define the preferred order for items to appear at the top
        preferred_order = [
            "runtime-24.1-py3.11",
            "runtime-24.1-py3.11-cuda",
            "runtime-24.1-r4.3",
            "ai-service-v5-software-specification",
            "autoai-rag_rt24.1-py3.11",
            "autoai-ts_rt24.1-py3.11",
            "autoai-kb_rt24.1-py3.11",
            "tensorflow_rt24.1-py3.11",
            "pytorch-onnx_rt24.1-py3.11",
            "onnxruntime_opset_19",
            "spark-mllib_3.4",
        ]

        # Temporary sort column: position in preferred_order, unknown names last.
        supported_specs['SORT_ORDER'] = supported_specs['NAME'].apply(
            lambda x: preferred_order.index(x) if x in preferred_order else len(preferred_order)
        )
        # Sort by that column, renumber the rows and drop the helper column again.
        supported_specs = (
            supported_specs.sort_values('SORT_ORDER')
            .reset_index(drop=True)
            .drop(columns=['SORT_ORDER'])
        )

        # Drop the REPLACEMENT column if it exists
        if 'REPLACEMENT' in supported_specs.columns:
            supported_specs = supported_specs.drop(columns=['REPLACEMENT'])
        # Add NOTES column with framework information
        supported_specs['NOTES'] = supported_specs['NAME'].map(framework_mapping).fillna("Other")

        # Create a table with single-row selection
        selection_table = mo.ui.table(
            supported_specs,
            selection="single",  # Only allow selecting one row
            label="#### **Select a supported software_spec runtime for your function asset** (For Python Functions select - *'runtime-24.1-py3.11'* ):",
            initial_selection=[0],  # Now selecting the first row, which should be runtime-24.1-py3.11
            page_size=6,
        )
    return (selection_table,)
@app.cell
def _(mo):
    # State pair (getter, setter) for the chosen software spec, initially None.
    _sw_spec_state = mo.state(None)
    get_selected_sw_spec, set_selected_sw_spec = _sw_spec_state
    return get_selected_sw_spec, set_selected_sw_spec
@app.cell
def _(selection_table, set_selected_sw_spec):
    # Store the ID of the selected software-spec row in the state.
    _selected = selection_table.value
    # The length check skips an empty selection, where `.iloc[0]` would
    # raise an IndexError.
    if _selected is not None and len(_selected) > 0:
        set_selected_sw_spec(_selected['ID'].iloc[0])
    return
@app.cell
def _(mo):
    # State pair (getter, setter) for the chosen hardware spec, initially None.
    _hw_spec_state = mo.state(None)
    get_selected_hw_spec, set_selected_hw_spec = _hw_spec_state
    return get_selected_hw_spec, set_selected_hw_spec
@app.cell
def _(hw_selection_table, set_selected_hw_spec):
    # Store the ID of the selected hardware-spec row in the state.
    _selected = hw_selection_table.value
    # The length check skips an empty selection, where `.iloc[0]` would
    # raise an IndexError.
    if _selected is not None and len(_selected) > 0:
        set_selected_hw_spec(_selected['ID'].iloc[0])
    return
@app.cell
def _(mo):
    # Opt-in switches for attaching an input and/or output schema.
    input_schema_checkbox = mo.ui.checkbox(
        label="Add input schema (optional)"
    )
    output_schema_checkbox = mo.ui.checkbox(
        label="Add output schema (optional)"
    )
    # sample_input_checkbox = mo.ui.checkbox(label="Add sample input example (optional)")
    return input_schema_checkbox, output_schema_checkbox
@app.cell
def _(
    function_name,
    get_selected_sw_spec,
    input_schema_checkbox,
    mo,
    output_schema_checkbox,
    selection_table,
    template_variant,
):
    # Build the function-metadata form (name, description, tags) and the row
    # of schema checkboxes. Everything is created only when the software-spec
    # table reports a value.
    if selection_table.value is not None:
        # Pre-fill the name field: an explicit function_name wins, otherwise
        # fall back to a default that depends on the chosen template.
        if function_name is None:
            _is_cos_template = template_variant.value == "Stream Files to IBM COS [Example]"
            _prefill_name = "stream_file_to_cos" if _is_cos_template else "your_function_name"
        else:
            _prefill_name = function_name

        uploaded_function_name = mo.ui.text(
            placeholder="<Must be the same as the name in editor>",
            label="Function Name:",
            kind="text",
            value=f"{_prefill_name}",
            full_width=False,
        )
        _tag_fields = [mo.ui.text(placeholder="Metadata Tags..."), mo.ui.text(), mo.ui.text()]
        tags_editor = mo.ui.array(_tag_fields, label="Optional Metadata Tags")

        # Snapshot of the software spec currently held in state.
        software_spec = get_selected_sw_spec()

        description_input = mo.ui.text_area(
            placeholder="Write a description for your function...)",
            label="Description",
            max_length=256,
            rows=5,
            full_width=True,
        )

        # Layout: description on the left, name + tags on the right.
        _name_and_tags = mo.hstack(
            [uploaded_function_name, tags_editor],
            justify="start",
            gap=1,
            align="start",
            wrap=True,
        )
        _metadata_row = mo.hstack(
            [description_input, _name_and_tags],
            widths=[0.6, 0.4],
            gap=2.75,
        )
        _schema_row = mo.hstack(
            [
                input_schema_checkbox,
                output_schema_checkbox,
                # sample_input_checkbox
            ],
            justify="center",
            gap=1,
            align="center",
            wrap=True,
        )

        fm = mo.vstack([_metadata_row], align="center", gap=2)
        _reset_warning = mo.md("**Make sure to select the checkbox options before filling in descriptions and tags or they will reset.**")
        sc_m = mo.vstack([_schema_row, _reset_warning], align="center", gap=2)
    return (
        description_input,
        fm,
        sc_m,
        software_spec,
        tags_editor,
        uploaded_function_name,
    )
@app.cell
def _(json, mo, template_variant):
    # Prepare the default input/output schema documents and one code editor
    # per schema, grouped in an accordion.
    if template_variant.value == "Stream Files to IBM COS [Example]":
        # The COS streaming example ships its own schemas.
        from cos_stream_schema_examples import input_schema, output_schema
    else:
        def _struct_schema(*field_names):
            """Return a one-struct schema whose fields are non-nullable strings."""
            fields = [
                {
                    'name': field_name,
                    'type': 'string',
                    'nullable': False,
                    'metadata': {}
                }
                for field_name in field_names
            ]
            return [{'id': '1', 'type': 'struct', 'fields': fields}]

        # Generic placeholders the user is expected to overwrite.
        input_schema = _struct_schema('<variable name 1>', '<variable name 2>')
        output_schema = _struct_schema('<output return name>')

    _editor_options = dict(language="python", min_height=100, theme="dark")
    input_schema_editor = mo.ui.code_editor(
        value=json.dumps(input_schema, indent=4), **_editor_options
    )
    output_schema_editor = mo.ui.code_editor(
        value=json.dumps(output_schema, indent=4), **_editor_options
    )

    _editor_sections = {
        """**Input Schema Metadata Editor**""": input_schema_editor,
        """**Output Schema Metadata Editor**""": output_schema_editor,
    }
    schema_editors = mo.accordion(_editor_sections, multiple=True)
    # schema_editors
    return input_schema_editor, output_schema_editor, schema_editors
@app.cell
def _(
    ast,
    client,
    description_input,
    function_editor,
    importlib,
    input_schema_checkbox,
    input_schema_editor,
    json,
    mo,
    os,
    output_schema_checkbox,
    output_schema_editor,
    software_spec,
    sys,
    tags_editor,
    uploaded_function_name,
):
    # Assemble the function metadata from the form widgets and expose an
    # "Upload Function" button that stores the editor's code in watsonx.ai.
    get_upload_status, set_upload_status = mo.state("No uploads yet")

    def _parse_schema(schema_text):
        """Parse schema editor text as JSON, falling back to a Python literal."""
        try:
            return json.loads(schema_text)
        except json.JSONDecodeError:
            # If JSON parsing fails, try Python literal evaluation as fallback
            return ast.literal_eval(schema_text)

    function_meta = {}
    if software_spec and client is not None:
        _meta_names = client.repository.FunctionMetaNames
        # Start with the base required fields
        function_meta = {
            _meta_names.NAME: f"{uploaded_function_name.value}" or "your_function_name",
            _meta_names.SOFTWARE_SPEC_ID: software_spec or "45f12dfe-aa78-5b8d-9f38-0ee223c47309",
        }
        # Add optional fields if they exist
        if tags_editor.value:
            # Filter out empty strings from the tags list
            filtered_tags = [tag for tag in tags_editor.value if tag and tag.strip()]
            if filtered_tags:  # Only add if there are non-empty tags
                function_meta[_meta_names.TAGS] = filtered_tags
        if description_input.value:
            function_meta[_meta_names.DESCRIPTION] = description_input.value
        # Schemas are attached only when their checkbox is ticked
        if input_schema_checkbox.value:
            function_meta[_meta_names.INPUT_DATA_SCHEMAS] = _parse_schema(input_schema_editor.value)
        if output_schema_checkbox.value:
            function_meta[_meta_names.OUTPUT_DATA_SCHEMAS] = _parse_schema(output_schema_editor.value)

    def upload_function(function_meta, use_function_object=False):
        """
        Uploads a Python function to watsonx.ai as a deployable asset.

        The editor's code is written to /tmp/notebook_functions/<name>.py and
        then stored either as an imported function object or as a gzipped
        copy of that file. The upload status state is updated on success and
        on failure.

        Parameters:
        function_meta (dict): Metadata for the function; its NAME entry is
            overwritten in place with the name from the form
        use_function_object (bool): Whether to use function object (True) or file path (False)

        Returns:
        dict: Details of the uploaded function

        Raises:
        Exception: any error raised while saving or uploading is re-raised
            after being recorded in the upload status
        """
        # Store the original working directory
        original_dir = os.getcwd()
        try:
            # Code to deploy comes from the editor widget
            code_to_deploy = function_editor.value['editor']
            func_name = uploaded_function_name.value or "your_function_name"
            # Ensure function_meta has the correct function name
            function_meta[client.repository.FunctionMetaNames.NAME] = func_name

            # Save the file locally first
            save_dir = "/tmp/notebook_functions"
            os.makedirs(save_dir, exist_ok=True)
            file_path = f"{save_dir}/{func_name}.py"
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(code_to_deploy)

            if use_function_object:
                # Make the directory importable, without adding a duplicate
                # entry on every click
                if save_dir not in sys.path:
                    sys.path.append(save_dir)
                # Import the module from the saved file
                spec = importlib.util.spec_from_file_location(func_name, file_path)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                # Get the function object
                function_object = getattr(module, func_name)
                # Change to the save directory before calling the SDK
                os.chdir(save_dir)
                func_details = client.repository.store_function(function_object, function_meta)
            else:
                # Change to the save directory before calling the SDK
                os.chdir(save_dir)
                import gzip
                import shutil
                # Gzip the saved module next to it and upload the archive path
                gz_path = f"{save_dir}/{func_name}.py.gz"
                with open(file_path, 'rb') as f_in:
                    with gzip.open(gz_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                func_details = client.repository.store_function(gz_path, function_meta)

            set_upload_status(f"Latest Upload - id - {func_details['metadata']['id']}")
            return func_details
        except Exception as e:
            # Record the failure for the status line, then let the caller see it
            set_upload_status(f"Error uploading function: {str(e)}")
            raise
        finally:
            # Always change back to the original directory, even if an exception occurs
            os.chdir(original_dir)

    upload_button = mo.ui.button(
        label="Upload Function",
        on_click=lambda _: upload_function(function_meta, use_function_object=False),
        kind="success",
        tooltip="Click to upload function to watsonx.ai"
    )
    # function_meta
    return get_upload_status, upload_button
@app.cell
def _(get_upload_status, mo, upload_button):
    # Extract the uploaded asset's ID from the button's value and build the
    # upload widget (button + status line).
    # artifact_id is bound up front so it is still defined (as None) when
    # nothing has been uploaded yet or the ID cannot be read.
    artifact_id = None
    _extraction_error = None
    if upload_button.value:
        try:
            artifact_id = upload_button.value['metadata']['id']
        except (LookupError, TypeError) as e:
            # Keep the message so it is shown instead of silently dropped
            _extraction_error = f"Error: {str(e)}"

    _upload_items = [
        upload_button,
        mo.md(f"**Status:** {get_upload_status()}"),
    ]
    if _extraction_error is not None:
        _upload_items.append(mo.md(_extraction_error))
    upload_func = mo.vstack(_upload_items, justify="space-around", align="center")
    return artifact_id, upload_func
@app.cell
def _(client, mo, pd, upload_button, uuid):
    # Build the hardware-spec selection table plus the deployment type/name
    # inputs once a client exists and a function has been uploaded; otherwise
    # show a one-row placeholder table.
    # NOTE(review): deployment_name and deployment_type are only bound in the
    # first branch - confirm dependants tolerate that.
    def reorder_hardware_specifications(df):
        """
        Reorders a hardware specifications dataframe by a preferred size order.

        Names found in the preferred list come first, in list order; all other
        names follow, sorted by name.

        Parameters:
        df (pandas.DataFrame): The hardware specifications dataframe to reorder;
            must have a 'NAME' column

        Returns:
        pandas.DataFrame: Reordered copy of the dataframe with reset index
        """
        custom_order = [
            "XXS", "XS", "S", "M", "L", "XL",
            "XS-Spark", "S-Spark", "M-Spark", "L-Spark", "XL-Spark",
            "K80", "K80x2", "K80x4",
            "V100", "V100x2",
            "WXaaS-XS", "WXaaS-S", "WXaaS-M", "WXaaS-L", "WXaaS-XL",
            "Default Spark", "Notebook Default Spark", "ML"
        ]
        # Build the name -> position lookup once instead of searching the
        # list for every row
        rank_by_name = {name: rank for rank, name in enumerate(custom_order)}

        def get_sort_key(name):
            rank = rank_by_name.get(name)
            if rank is not None:
                return (0, rank)
            # For any name not in the custom order, put it at the end
            return (1, name)

        # Work on a copy to avoid modifying the original dataframe
        result_df = df.copy()
        # Temporary column used only for sorting
        result_df['sort_key'] = result_df['NAME'].apply(get_sort_key)
        result_df = result_df.sort_values('sort_key').drop('sort_key', axis=1)
        result_df = result_df.reset_index(drop=True)
        return result_df

    if client and upload_button.value:
        hardware_specs = client.hardware_specifications.list()
        hardware_specs_df = reorder_hardware_specifications(hardware_specs)
        # Pre-select the second row (presumably 'XS', per the label) when it
        # exists; fall back to the first row, or to no selection, for shorter
        # tables so the initial selection is never out of range.
        _row_count = len(hardware_specs_df)
        _default_rows = [min(1, _row_count - 1)] if _row_count else []
        # Create a table with single-row selection
        hw_selection_table = mo.ui.table(
            hardware_specs_df,
            selection="single",  # Only allow selecting one row
            label="#### **Select a supported hardware_specification for your deployment** *(Default: 'XS' - 1vCPU_4GB Ram)*",
            initial_selection=_default_rows,
            page_size=6,
            wrapped_columns=['DESCRIPTION']
        )
        deployment_type = mo.ui.radio(
            options={"Function":"Online (Function Endpoint)","Runnable Job":"Batch (Runnable Jobs)"}, value="Function", label="Select the Type of Deployment:", inline=True
        )
        # Short random suffix for the default deployment name
        uuid_suffix = str(uuid.uuid4())[:4]
        deployment_name = mo.ui.text(value=f"deployed_func_{uuid_suffix}", label="Deployment Name:", placeholder="<Must be completely unique>")
    else:
        hw_df = pd.DataFrame(
            data=[["ID", "Activate client."]],
            columns=["ID", "VALUE"]
        )
        hw_selection_table = mo.ui.table(
            hw_df,
            selection="single",  # Only allow selecting one row
            label="You haven't activated the client",
            initial_selection=[0]
        )
    return deployment_name, deployment_type, hw_selection_table
@app.cell
def _(
    artifact_id,
    client,
    deployment_details,
    deployment_name,
    deployment_type,
    get_selected_hw_spec,
    hw_selection_table,
    mo,
    upload_button,
):
    # Define the deployment helpers, the "Deploy Function" button and the row
    # showing the deployment type/name inputs.
    # `deployment_details` stays in the signature for compatibility; the body
    # no longer reads it.

    # Bound up front so deploy_function never hits an unbound name when the
    # hardware table has no value.
    selected_hw_config = None
    if hw_selection_table.value is not None:
        selected_hw_config = get_selected_hw_spec()

    def deploy_function(artifact_id, deployment_type):
        """
        Deploys a function asset to watsonx.ai.

        Parameters:
        artifact_id (str): ID of the function artifact to deploy
        deployment_type (object): Widget whose .value is the deployment kind;
            "Online (Function Endpoint)" creates an online deployment,
            anything else a batch deployment

        Returns:
        dict: Details of the deployed function, or None when a precondition
            is missing or the deployment fails (the reason is printed)
        """
        if not artifact_id:
            print("Error: No artifact ID provided. Please upload a function first.")
            return None
        if not selected_hw_config:
            print("Error: No hardware specification selected. Please select one first.")
            return None

        if deployment_type.value == "Online (Function Endpoint)":
            deployment_props = {
                client.deployments.ConfigurationMetaNames.NAME: deployment_name.value,
                client.deployments.ConfigurationMetaNames.ONLINE: {},
                client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config},
                client.deployments.ConfigurationMetaNames.SERVING_NAME: deployment_name.value,
            }
        else:  # "Batch (Runnable Jobs)"
            deployment_props = {
                client.deployments.ConfigurationMetaNames.NAME: deployment_name.value,
                client.deployments.ConfigurationMetaNames.BATCH: {},
                client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config},
                # batch does not use serving names
            }

        try:
            print(deployment_props)
            # First, get the asset details to confirm it exists
            asset_details = client.repository.get_details(artifact_id)
            print(f"Asset found: {asset_details['metadata']['name']} with ID: {asset_details['metadata']['id']}")
            # Announce the request before it is sent, then create the deployment
            print(f"Creating deployment from Asset: {artifact_id} with deployment properties {str(deployment_props)}")
            deployed_function = client.deployments.create(artifact_id, deployment_props)
            return deployed_function
        except Exception as e:
            print(f"Deployment error: {str(e)}")
            return None

    def get_deployment_id(deployed_function):
        """Return the deployment ID extracted from the given deployment details."""
        deployment_id = client.deployments.get_uid(deployed_function)
        return deployment_id

    def get_deployment_info(deployment_id):
        """Return the details of the deployment with the given ID."""
        deployment_info = client.deployments.get_details(deployment_id)
        return deployment_info

    deploy_button = mo.ui.button(
        label="Deploy Function",
        on_click=lambda _: deploy_function(artifact_id, deployment_type),
        kind="success",
        tooltip="Click to deploy function to watsonx.ai"
    )

    if client and upload_button.value:
        deployment_definition = mo.hstack([
            deployment_type,
            deployment_name
        ], justify="space-around")
    else:
        deployment_definition = mo.hstack([
            "No Deployment Type Selected",
            "No Deployment Name Provided"
        ], justify="space-around")
    # deployment_definition
    return deploy_button, deployment_definition
@app.cell
def _(deploy_button, deployment_definition, mo):
    # Stack the deploy button above its latest result.
    # deployment_definition is referenced but not displayed here.
    _ = deployment_definition
    _deploy_items = [deploy_button, deploy_button.value]
    deploy_fnc = mo.vstack(_deploy_items, justify="space-around", align="center")
    return (deploy_fnc,)
@app.cell
def _(client, mo, pd, sys):
    ### Functions to list assets, extract their IDs as a list, and purge them
    def get_deployment_list():
        """Return the deployments as a DataFrame without STATE/REPLACEMENT columns."""
        dep_df = client.deployments.list()
        dep_df = pd.DataFrame(dep_df)
        columns_to_drop = [col for col in dep_df.columns if 'STATE' in col or 'REPLACEMENT' in col]
        if columns_to_drop:
            dep_df = dep_df.drop(columns=columns_to_drop)
        return dep_df

    def get_deployment_ids(df):
        """Return the 'ID' column of a deployments DataFrame as a list."""
        dep_list = df['ID'].tolist()
        return dep_list

    #----
    def get_data_assets_list():
        """Return the data assets as a DataFrame."""
        data_a_df = client.data_assets.list()
        data_a_df = pd.DataFrame(data_a_df)
        return data_a_df

    def get_data_asset_ids(df):
        """Return the 'ASSET_ID' column of a data-assets DataFrame as a list."""
        data_asset_list = df['ASSET_ID'].tolist()
        return data_asset_list

    #----
    def get_repository_list():
        """Return the repository items as a DataFrame without SPEC_STATE/SPEC_REPLACEMENT."""
        rep_list_df = client.repository.list()
        rep_list_df = pd.DataFrame(rep_list_df)
        columns_to_drop = [col for col in ['SPEC_STATE', 'SPEC_REPLACEMENT'] if col in rep_list_df.columns]
        if columns_to_drop:
            rep_list_df = rep_list_df.drop(columns=columns_to_drop)
        return rep_list_df

    def get_repository_ids(df):
        """Return the 'ID' column of a repository DataFrame as a list."""
        repository_list = df['ID'].tolist()
        return repository_list

    #----
    def get_pkg_ext_list():
        """Return the package extensions as a DataFrame."""
        pkg_ext_list_df = client.package_extensions.list()
        pkg_ext_list_df = pd.DataFrame(pkg_ext_list_df)
        return pkg_ext_list_df

    def get_pkg_ext_ids(df):
        """Return the 'ASSET_ID' column of a package-extensions DataFrame as a list."""
        pkg_ext_id_list = df['ASSET_ID'].tolist()
        return pkg_ext_id_list

    #----
    def get_sws_list():
        """Return the 'derived' software specifications without STATE/REPLACEMENT columns."""
        sws_list_df = client.software_specifications.list()
        # Filter to only include derived types
        derived_sws_list_df = sws_list_df[
            (sws_list_df['TYPE'] == 'derived')
        ]
        # Reset the index and prepare final dataframe
        sws_list_df = pd.DataFrame(derived_sws_list_df).reset_index(drop=True)
        # Drop STATE and REPLACEMENT columns if they exist
        columns_to_drop = [col for col in ['STATE', 'REPLACEMENT'] if col in sws_list_df.columns]
        if columns_to_drop:
            sws_list_df = sws_list_df.drop(columns=columns_to_drop)
        return sws_list_df

    def get_sws_ids(df):
        """Return the 'ID' column of a software-specifications DataFrame as a list."""
        sws_id_list = df['ID'].tolist()
        return sws_id_list

    #----
    def delete_with_progress(ids_list, delete_function, item_type="items", display_errors=True):
        """
        Delete every ID in ids_list while showing a progress bar.

        Failures do not stop the run: each error is collected, optionally
        printed, and summarised on stderr at the end.

        Parameters:
        ids_list (list | None): IDs to delete; None is treated as an empty list
        delete_function (callable): Called once per ID to perform the deletion
        item_type (str): Plural noun used in progress and result messages
        display_errors (bool): Whether to print per-item errors and a summary

        Returns:
        str: Message stating how many items were deleted successfully
        """
        # The purge buttons pass None when no ID list has been produced yet
        ids_list = list(ids_list) if ids_list is not None else []
        errors = []
        with mo.status.progress_bar(
            total=len(ids_list) or 1,
            title=f"Purging {item_type}",
            subtitle=f"Deleting {item_type}...",
            completion_title="Purge Complete",
            # Built before any deletion runs, so it must not claim a success
            # count; the accurate count is in the returned message.
            completion_subtitle=f"Finished processing {len(ids_list)} {item_type}",
            remove_on_exit=True
        ) as progress:
            for item_id in ids_list:
                try:
                    delete_function(item_id)
                except Exception as e:
                    error_msg = f"Error deleting {item_type} with ID {item_id}: {str(e)}"
                    if display_errors:
                        print(error_msg)
                    errors.append((item_id, str(e)))
                finally:
                    # Advance the bar whether or not the deletion succeeded
                    progress.update(increment=1)

        if errors and display_errors:
            with mo.redirect_stderr():
                sys.stderr.write("\nErrors encountered during deletion:\n")
                for item_id, error in errors:
                    sys.stderr.write(f"  - ID {item_id}: {error}\n")
        return f"Deleted {len(ids_list) - len(errors)} {item_type} successfully"

    # Thin wrappers binding delete_with_progress to each asset kind. The
    # lambdas resolve the client method at call time, so a failing lookup is
    # reported per item like any other deletion error.
    def delete_deployments(deployment_ids):
        """Delete the given deployments with a progress bar."""
        return delete_with_progress(
            deployment_ids,
            lambda item_id: client.deployments.delete(item_id),
            "deployments"
        )

    def delete_data_assets(data_asset_ids):
        """Delete the given data assets with a progress bar."""
        return delete_with_progress(
            data_asset_ids,
            lambda item_id: client.data_assets.delete(item_id),
            "data assets"
        )

    def delete_repository_items(repository_ids):
        """Delete the given repository items with a progress bar."""
        return delete_with_progress(
            repository_ids,
            lambda item_id: client.repository.delete(item_id),
            "repository items"
        )

    def delete_pkg_ext_items(pkg_ids):
        """Delete the given package extensions with a progress bar."""
        return delete_with_progress(
            pkg_ids,
            lambda item_id: client.package_extensions.delete(item_id),
            "package extensions"
        )

    def delete_sws_items(sws_ids):
        """Delete the given software specifications with a progress bar."""
        return delete_with_progress(
            sws_ids,
            lambda item_id: client.software_specifications.delete(item_id),
            "software specifications"
        )

    return (
        delete_data_assets,
        delete_deployments,
        delete_pkg_ext_items,
        delete_repository_items,
        delete_sws_items,
        get_data_asset_ids,
        get_data_assets_list,
        get_deployment_ids,
        get_deployment_list,
        get_pkg_ext_ids,
        get_pkg_ext_list,
        get_repository_ids,
        get_repository_list,
        get_sws_ids,
        get_sws_list,
    )
@app.cell
def _(
    get_data_assets_tab,
    get_deployments_tab,
    get_pkg_ext_tab,
    get_repository_tab,
    get_sws_tab,
    mo,
):
    # Turn each stored dataframe into a table widget, or a markdown
    # placeholder while nothing has been loaded for that asset kind.
    def _table_or_placeholder(read_frame):
        frame = read_frame()
        if frame is None:
            return mo.md("No Table Loaded")
        return mo.ui.table(frame)

    deployments_table = _table_or_placeholder(get_deployments_tab)
    repository_table = _table_or_placeholder(get_repository_tab)
    data_assets_table = _table_or_placeholder(get_data_assets_tab)
    sws_table = _table_or_placeholder(get_sws_tab)
    pkg_ext_table = _table_or_placeholder(get_pkg_ext_tab)
    return (
        data_assets_table,
        deployments_table,
        pkg_ext_table,
        repository_table,
        sws_table,
    )
@app.cell
def _(
    deployments_table,
    get_deployment_id_list,
    get_deployments_button,
    mo,
    purge_deployments,
):
    # Deployments purge tab: a row of action buttons above the table and the
    # latest results of the ID-list and purge buttons.
    deployments_purge_tab = mo.vstack([
        mo.hstack([get_deployments_button, get_deployment_id_list, purge_deployments]),
        mo.vstack([deployments_table, get_deployment_id_list.value, purge_deployments.value]),
    ])
    return (deployments_purge_tab,)
@app.cell
def _(
    get_repository_button,
    get_repository_id_list,
    mo,
    purge_repository,
    repository_table,
):
    # Repository purge tab: a row of action buttons above the table and the
    # latest results of the ID-list and purge buttons.
    repository_purge_tab = mo.vstack([
        mo.hstack([get_repository_button, get_repository_id_list, purge_repository]),
        mo.vstack([repository_table, get_repository_id_list.value, purge_repository.value]),
    ])
    return (repository_purge_tab,)
@app.cell
def _(
    data_assets_table,
    get_data_asset_id_list,
    get_data_assets_button,
    mo,
    purge_data_assets,
):
    # Data-assets purge tab: a row of action buttons above the table and the
    # latest results of the ID-list and purge buttons.
    data_assets_purge_tab = mo.vstack([
        mo.hstack([get_data_assets_button, get_data_asset_id_list, purge_data_assets]),
        mo.vstack([data_assets_table, get_data_asset_id_list.value, purge_data_assets.value]),
    ])
    return (data_assets_purge_tab,)
@app.cell
def _(get_sws_button, get_sws_id_list, mo, purge_sws, sws_table):
    # Software-spec purge tab: a row of action buttons above the table and
    # the latest results of the ID-list and purge buttons.
    sws_purge_stack_tab = mo.vstack([
        mo.hstack([get_sws_button, get_sws_id_list, purge_sws]),
        mo.vstack([sws_table, get_sws_id_list.value, purge_sws.value]),
    ])
    return (sws_purge_stack_tab,)
@app.cell
def _(
    get_pkg_ext_button,
    get_pkg_ext_id_list,
    mo,
    pkg_ext_table,
    purge_pkg_ext,
):
    # Package-extension purge tab: a row of action buttons above the table
    # and the latest results of the ID-list and purge buttons.
    pkg_ext_purge_tab = mo.vstack([
        mo.hstack([get_pkg_ext_button, get_pkg_ext_id_list, purge_pkg_ext]),
        mo.vstack([pkg_ext_table, get_pkg_ext_id_list.value, purge_pkg_ext.value]),
    ])
    return (pkg_ext_purge_tab,)
@app.cell
def _(
    data_assets_purge_tab,
    deployments_purge_tab,
    mo,
    pkg_ext_purge_tab,
    repository_purge_tab,
    sws_purge_stack_tab,
):
    # Collect the per-asset purge layouts into one tabbed view; lazy loading
    # is switched off.
    _tab_contents = {
        "Purge Deployments": deployments_purge_tab,
        "Purge Repository Assets": repository_purge_tab,
        "Purge Data Assets": data_assets_purge_tab,
        "Purge Software Specifications": sws_purge_stack_tab,
        "Purge Package Extensions": pkg_ext_purge_tab,
    }
    purge_tabs = mo.ui.tabs(_tab_contents, lazy=False)
    return (purge_tabs,)
@app.cell
def _(mo):
    # State holding the deployments dataframe; None until one is stored.
    _deployments_state = mo.state(None)
    get_deployments_tab, set_deployments_tab = _deployments_state
    return get_deployments_tab, set_deployments_tab
@app.cell
def _(mo):
    # State holding the repository dataframe; None until one is stored.
    _repository_state = mo.state(None)
    get_repository_tab, set_repository_tab = _repository_state
    return get_repository_tab, set_repository_tab
@app.cell
def _(mo):
    # State holding the data-assets dataframe; None until one is stored.
    _data_assets_state = mo.state(None)
    get_data_assets_tab, set_data_assets_tab = _data_assets_state
    return get_data_assets_tab, set_data_assets_tab
@app.cell
def _(mo):
    # State holding the package-extensions dataframe; None until one is stored.
    _pkg_ext_state = mo.state(None)
    get_pkg_ext_tab, set_pkg_ext_tab = _pkg_ext_state
    return get_pkg_ext_tab, set_pkg_ext_tab
@app.cell
def _(mo):
    # State holding the software-specifications dataframe; None until one is stored.
    _sws_state = mo.state(None)
    get_sws_tab, set_sws_tab = _sws_state
    return get_sws_tab, set_sws_tab
@app.cell
def _(
    data_assets_table,
    delete_data_assets,
    delete_deployments,
    delete_pkg_ext_items,
    delete_repository_items,
    delete_sws_items,
    deployments_table,
    get_data_asset_ids,
    get_data_assets_list,
    get_deployment_ids,
    get_deployment_list,
    get_pkg_ext_ids,
    get_pkg_ext_list,
    get_repository_ids,
    get_repository_list,
    get_sws_ids,
    get_sws_list,
    mo,
    pkg_ext_table,
    repository_table,
    set_data_assets_tab,
    set_deployments_tab,
    set_pkg_ext_tab,
    set_repository_tab,
    set_sws_tab,
    sws_table,
):
    # Create the three purge controls for every asset kind:
    #   1. a button that fetches the asset dataframe and stores it in state,
    #   2. a button that turns the table's current value into a list of IDs,
    #   3. a button that deletes the IDs produced by button 2.
    def _purge_controls(
        fetch_label,
        purge_label,
        fetch_frame,
        store_frame,
        extract_ids,
        delete_items,
        table,
    ):
        """Return the (fetch, list-IDs, purge) button triple for one asset kind."""
        def collect_ids(previous_value):
            # `table` is a markdown placeholder until a dataframe is loaded;
            # without a table value there is nothing to list.
            selected = getattr(table, "value", None)
            if selected is None:
                return []
            return extract_ids(selected)

        fetch_button = mo.ui.button(
            label=fetch_label,
            on_click=lambda previous_value: fetch_frame(),
            on_change=lambda value: store_frame(value),
            kind="neutral",
        )
        ids_button = mo.ui.button(
            label="Turn Dataframe into List of IDs",
            on_click=collect_ids,
            kind="neutral",
        )
        purge_button = mo.ui.button(
            label=purge_label,
            # The ID button's value is None until it has been clicked
            on_click=lambda previous_value: delete_items(ids_button.value or []),
            kind="danger",
        )
        return fetch_button, ids_button, purge_button

    ### Data Assets Purge
    get_data_assets_button, get_data_asset_id_list, purge_data_assets = _purge_controls(
        "Get Data Assets Dataframe",
        "Purge Data Assets",
        get_data_assets_list,
        set_data_assets_tab,
        get_data_asset_ids,
        delete_data_assets,
        data_assets_table,
    )
    ### Deployments Purge
    get_deployments_button, get_deployment_id_list, purge_deployments = _purge_controls(
        "Get Deployments Dataframe",
        "Purge Deployments",
        get_deployment_list,
        set_deployments_tab,
        get_deployment_ids,
        delete_deployments,
        deployments_table,
    )
    ### Repository Items Purge
    get_repository_button, get_repository_id_list, purge_repository = _purge_controls(
        "Get Repository Dataframe",
        "Purge Repository Items",
        get_repository_list,
        set_repository_tab,
        get_repository_ids,
        delete_repository_items,
        repository_table,
    )
    ### Software Spec Purge
    get_sws_button, get_sws_id_list, purge_sws = _purge_controls(
        "Get Software Spec Dataframe",
        "Purge Software Specifications",
        get_sws_list,
        set_sws_tab,
        get_sws_ids,
        delete_sws_items,
        sws_table,
    )
    ### Package Extensions Purge
    get_pkg_ext_button, get_pkg_ext_id_list, purge_pkg_ext = _purge_controls(
        "Get Package Extensions Dataframe",
        "Purge Package Extensions",
        get_pkg_ext_list,
        set_pkg_ext_tab,
        get_pkg_ext_ids,
        delete_pkg_ext_items,
        pkg_ext_table,
    )
    return (
        get_data_asset_id_list,
        get_data_assets_button,
        get_deployment_id_list,
        get_deployments_button,
        get_pkg_ext_button,
        get_pkg_ext_id_list,
        get_repository_button,
        get_repository_id_list,
        get_sws_button,
        get_sws_id_list,
        purge_data_assets,
        purge_deployments,
        purge_pkg_ext,
        purge_repository,
        purge_sws,
    )
# Run the marimo app when this file is executed directly as a script.
if __name__ == "__main__":
    app.run()