import marimo __generated_with = "0.13.0" app = marimo.App(width="medium") @app.cell def _(): import marimo as mo import os return mo, os @app.function def get_markdown_content(file_path): with open(file_path, 'r', encoding='utf-8') as file: content = file.read() return content @app.cell def _(mo): intro_text = get_markdown_content('intro_markdown/intro.md') intro_marimo = get_markdown_content('intro_markdown/intro_marimo.md') intro_notebook = get_markdown_content('intro_markdown/intro_notebook.md') intro_comparison = get_markdown_content('intro_markdown/intro_comparison.md') intro = mo.carousel([ mo.md(f"{intro_text}"), mo.md(f"{intro_marimo}"), mo.md(f"{intro_notebook}"), mo.md(f"{intro_comparison}"), ]) mo.accordion({"## Notebook Introduction":intro}) return @app.cell def _(os): ### Imports from typing import Any, Dict, List, Optional, Pattern, Set, Union, Tuple from ibm_watsonx_ai import APIClient, Credentials from pathlib import Path import pandas as pd import mimetypes import requests import zipfile import polars import urllib3 import tempfile import base64 import uuid import ssl import time import json import ast import io import re # Set explicit temporary directory os.environ['TMPDIR'] = '/tmp/notebook_functions' # Make sure Python's tempfile module also uses this directory tempfile.tempdir = '/tmp/notebook_functions' def get_iam_token(api_key): return requests.post( 'https://iam.cloud.ibm.com/identity/token', headers={'Content-Type': 'application/x-www-form-urlencoded'}, data={'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': api_key} ).json()['access_token'] def setup_task_credentials(client): # Get existing task credentials existing_credentials = client.task_credentials.get_details() # Delete existing credentials if any if "resources" in existing_credentials and existing_credentials["resources"]: for cred in existing_credentials["resources"]: cred_id = client.task_credentials.get_id(cred) client.task_credentials.delete(cred_id) # Store new credentials return client.task_credentials.store() def get_cred_value(key, creds_var_name="baked_in_creds", default=""): ### Helper for working with preset credentials """ Helper function to safely get a value from a credentials dictionary. Args: key: The key to look up in the credentials dictionary. creds_var_name: The variable name of the credentials dictionary. default: The default value to return if the key is not found. Returns: The value from the credentials dictionary if it exists and contains the key, otherwise returns the default value. """ # Check if the credentials variable exists in globals if creds_var_name in globals(): creds_dict = globals()[creds_var_name] if isinstance(creds_dict, dict) and key in creds_dict: return creds_dict[key] return default return ( APIClient, Credentials, ast, get_iam_token, json, pd, setup_task_credentials, tempfile, uuid, ) @app.cell def _(client_instantiation_form, os): if client_instantiation_form.value: client_setup = client_instantiation_form.value else: client_setup = None ### Extract Credential Variables: if client_setup is not None: wx_url = client_setup["wx_region"] wx_api_key = client_setup["wx_api_key"] os.environ["WATSONX_APIKEY"] = wx_api_key if client_setup["project_id"] is not None: project_id = client_setup["project_id"] else: project_id = None if client_setup["space_id"] is not None: space_id = client_setup["space_id"] else: space_id = None else: os.environ["WATSONX_APIKEY"] = "" project_id = None space_id = None wx_api_key = None wx_url = None return client_setup, project_id, space_id, wx_api_key, wx_url @app.cell def _(client_setup, get_iam_token, wx_api_key): if client_setup: token = get_iam_token(wx_api_key) else: token = None return @app.cell def _(mo): ### Credentials for the watsonx.ai SDK client # Endpoints wx_platform_url = "https://api.dataplatform.cloud.ibm.com" regions = { "US": "https://us-south.ml.cloud.ibm.com", "EU": "https://eu-de.ml.cloud.ibm.com", "GB": "https://eu-gb.ml.cloud.ibm.com", "JP": "https://jp-tok.ml.cloud.ibm.com", "AU": "https://au-syd.ml.cloud.ibm.com", "CA": "https://ca-tor.ml.cloud.ibm.com" } # Create a form with multiple elements client_instantiation_form = ( mo.md(''' ###**watsonx.ai credentials:** {wx_region} {wx_api_key} {space_id} ''').style(max_height="300px", overflow="auto", border_color="blue") .batch( wx_region = mo.ui.dropdown(regions, label="Select your watsonx.ai region:", value="US", searchable=True), wx_api_key = mo.ui.text(placeholder="Add your IBM Cloud api-key...", label="IBM Cloud Api-key:", kind="password"), project_id = mo.ui.text(placeholder="Add your watsonx.ai project_id...", label="Project_ID:", kind="text"), space_id = mo.ui.text(placeholder="Add your watsonx.ai space_id...", label="Space_ID:", kind="text") ,) .form(show_clear_button=True, bordered=False) ) return (client_instantiation_form,) @app.cell def _( APIClient, Credentials, client_setup, project_id, setup_task_credentials, space_id, wx_api_key, wx_url, ): ### Instantiate the watsonx.ai client if client_setup: wx_credentials = Credentials( url=wx_url, api_key=wx_api_key ) if project_id: project_client = APIClient(credentials=wx_credentials, project_id=project_id) else: project_client = None if space_id: deployment_client = APIClient(credentials=wx_credentials, space_id=space_id) else: deployment_client = None if project_client is not None: task_credentials_details = setup_task_credentials(project_client) else: task_credentials_details = setup_task_credentials(deployment_client) else: wx_credentials = None project_client = None deployment_client = None task_credentials_details = None if project_client is not None or deployment_client is not None: client_callout_kind = "success" else: client_callout_kind = "neutral" return client_callout_kind, deployment_client @app.cell def _(mo): template_variants = [ "Base", "Stream Files to IBM COS [Example]", ] template_variant = mo.ui.dropdown(template_variants, label="Code Template:", value="Base") return (template_variant,) @app.cell def _(client_callout_kind, client_instantiation_form, mo, template_variant): client_callout = mo.callout(template_variant, kind=client_callout_kind) client_stack = mo.hstack([client_instantiation_form, client_callout], align="center", justify="space-around") return (client_stack,) @app.cell def _( client_stack, deploy_fnc, deployment_definition, fm, function_editor, hw_selection_table, mo, package_analysis_stack, package_meta, purge_tabs, sc_m, schema_editors, selection_table, ss_asset_details, upload_func, yaml_template, ): client_section = mo.md(f''' ###**Instantiate your watsonx.ai client:** 1. Select a region from the dropdown menu 2. Provide an IBM Cloud Apikey and watsonx.ai deployment space id 3. Once you submit, the area with the code template will turn green if successful 4. Select a base (provide baseline format) or example code function template --- {client_stack} ''') sc_tabs = mo.ui.tabs( { "Schema Option Selection": sc_m, "Schema Definition": mo.md(f""" ####**Edit the schema definitions you selected in the previous tab.**
{schema_editors}"""), } ) function_section = mo.md(f'''###**Create your function from the template:** 1. Use the code editor window to create a function to deploy
The function must:
--- Include a payload and score element
--- Have the same function name in both the score = () segment and the Function Name input field below
--- Additional details can be found here -> [watsonx.ai - Writing deployable Python functions ](https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/ml-deploy-py-function-write.html?utm_medium=Exinfluencer&utm_source=ibm_developer&utm_content=in_content_link&utm_term=10006555&utm_id=blogs_awb-tekton-optimizations-for-kubeflow-pipelines-2-0&context=wx&audience=wdp) 3. Click submit, then proceed to select whether you wish to add:
--- An input schema (describing the format of the variables the function takes) **[Optional]**
--- An output schema (describing the format of the output results the function returns) **[Optional]**
--- An sample input example (showing an example of a mapping of the input and output schema to actual values.) **[Optional]** 4. Fill in the function name field **(must be exactly the same as in the function editor)** 5. Add a description and metadata tags **[Optional]** --- {function_editor} --- {sc_tabs} --- {fm} ''') upload_section = mo.md(f''' ###**Review and Upload your function** 1. Review the function metadata specs JSON 2. Select a software specification if necessary (default for python functions is pre-selected), this is the runtime environment of python that your function will run in. Environments on watsonx.ai come pre-packaged with many different libraries, if necessary install new ones by adding them into the function as a `subprocess.check_output('pip install ', shell=True)` command. 3. Once your are satisfied, click the upload function button and wait for the response. > If you see no table of software specs, you haven't activated your watsonx.ai client. --- {selection_table} --- {upload_func} ''') deployment_section = mo.md(f''' ###**Deploy your function:** 1. Select a hardware specification (vCPUs/GB) that you want your function deployed on
--- XXS and XS cost the same (0.5 CUH per hour, so XS is the better option
--- Select larger instances for more resource intensive tasks or runnable jobs 2. Select the type of deployment:
--- Function (Online) for always-on endpoints - Always available and low latency, but consume resources continuously for every hour they are deployed.
--- Batch (Batch) for runnable jobs - Only consume resources during job runs, but aren't as flexible to deploy. 3. If you've selected Function, pick a completely unique (globally, not just your account) deployment serving name that will be in the endpoint url. 4. Once your are satisfied, click the deploy function button and wait for the response. --- {hw_selection_table} --- {deployment_definition} --- {deploy_fnc} ''') purging_section = mo.md(f''' ###**Helper Purge Functions:** These functions help you retrieve, select and delete deployments, data assets or repository assets (functions, models, etc.) that you have in the deployment space. This is meant to support fast cleanup. Select the tab based on what you want to delete, then click each of the buttons one by one after the previous gives a response. --- {purge_tabs} ''') packages_section = mo.md(f''' ###**If needed - Create a custom software-spec with added python packages** 1. Check to see if the python library you want to use is already available inside watsonx.ai's runtime environment base specs for deployed functions by adding them as a comma separated list, e.g. - plotly, ibm-watsonx-ai==1.3.6, etc. into the text area. 2. If you wish to see all the packages already present, check the box to return it alognside your validation results. --- {package_analysis_stack} --- 3. If it isn't, you can create a custom software spec that adds a package_extension add-on to it. Use the template selector to see some examples, but when you are ready add your packages in the code editor after the "pip:" section. {yaml_template} 4. Give your package extension and software spec names and descriptions and submit. 5. You will find it in the Function Upload table afterward. --- {package_meta} --- Results: {ss_asset_details} ''') return ( client_section, deployment_section, function_section, packages_section, upload_section, ) @app.cell def _(client_section, mo): ui_accordion_section_1 = mo.accordion( {"Section 1: **watsonx.ai Credentials**": client_section} ) ui_accordion_section_1 return @app.cell def _(function_section, mo): ui_accordion_section_2 = mo.accordion( {"Section 2: **Function Creation**": function_section} ) ui_accordion_section_2 return @app.cell def _(mo, packages_section): ui_accordion_section_3 = mo.accordion( {"Section 3: **Create a Package Extension (Optional)**": packages_section} ) ui_accordion_section_3 return @app.cell def _(mo, upload_section): ui_accordion_section_4 = mo.accordion( {"Section 4: **Function Upload**": upload_section} ) ui_accordion_section_4 return @app.cell def _(deployment_section, mo): ui_accordion_section_5 = mo.accordion( {"Section 5: **Function Deployment**": deployment_section} ) ui_accordion_section_5 return @app.cell def _(mo, packages_section): ui_accordion_section_6 = mo.accordion( {"Section 6: **Helper Functions**": packages_section} ) ui_accordion_section_6 return @app.cell def _(mo, template_variant): # Template for WatsonX.ai deployable function if template_variant.value == "Stream Files to IBM COS [Example]": with open("stream_files_to_cos.py", "r") as file: template = file.read() else: template = '''def your_function_name(): import subprocess subprocess.check_output('pip install gensim', shell=True) import gensim def score(input_data): message_from_input_payload = payload.get("input_data")[0].get("values")[0][0] response_message = "Received message - {0}".format(message_from_input_payload) # Score using the pre-defined model score_response = { 'predictions': [{'fields': ['Response_message_field', 'installed_lib_version'], 'values': [[response_message, gensim.__version__]] }] } return score_response return score score = your_function_name() ''' function_editor = ( mo.md(''' #### **Create your function by editing the template:** {editor} ''') .batch( editor = mo.ui.code_editor(value=template, language="python", min_height=200, theme="dark") ) .form(show_clear_button=True, bordered=False) ) # function_editor return (function_editor,) @app.cell def _(ast, function_editor, mo, os): function_name = None if function_editor.value: # Get the edited code from the function editor code = function_editor.value['editor'] # Extract function name using AST without executing the code try: # Parse the code to find function definitions parsed_ast = ast.parse(code) for node in parsed_ast.body: if isinstance(node, ast.FunctionDef): function_name = node.name break if function_name is not None: # Set deployable_function to None since we're not executing the code deployable_function = None mo.md(f"Found function: '{function_name}' (using file path approach)") # Create the directory if it doesn't exist save_dir = "/tmp/notebook_functions" os.makedirs(save_dir, exist_ok=True) # Save the function code to a file file_path = os.path.join(save_dir, f"{function_name}.py") with open(file_path, "w") as f: f.write(code) else: mo.md("No function found in the editor code") except SyntaxError as e: mo.md(f"Syntax error in function code: {str(e)}") return (function_name,) @app.cell def _(): yaml_templates = { "empty": """dependencies: - pip - pip: - - ... """, "llama_index_and_scikit": """dependencies: - pip - pip: - scikit-learn==1.6.1 - scikit-llm==1.4.1 - plotly==6.0.1 - altair==5.5.0 - PyMuPDF==1.25.1 - llama-index==0.12.24 - llama-index-readers-file==0.4.6 - llama-index-readers-web==0.3.8 """, "data_product_hub": """dependencies: - pip - pip: - PyMuPDF==1.25.1 - llama-index==0.12.24 - llama-index-readers-file==0.4.6 - llama-index-readers-web==0.3.8 - plotly==6.0.1 - altair==5.5.0 - dask==2025.2.0 - dph-services==0.4.0 """, "watsonx_data": """dependencies: - pip - pip: - prestodb - PyMuPDF==1.25.1 - llama-index==0.12.24 - llama-index-readers-file==0.4.6 - llama-index-readers-web==0.3.8 - ibm-watsonxdata==0.4.0 """ } return (yaml_templates,) @app.cell def _(mo, yaml_templates): yaml_template = mo.ui.dropdown(yaml_templates, searchable=True, label="**Select a template:**", value="empty") return (yaml_template,) @app.cell def _(tempfile): def create_yaml_tempfile(yaml_editor_value): """Creates temporary YAML file and returns its path""" temp_file = tempfile.NamedTemporaryFile(suffix='.yaml', delete=False) # Access the actual YAML content within the dictionary structure if isinstance(yaml_editor_value, dict) and 'yml_editor' in yaml_editor_value: # Extract the YAML content string from the yml_editor key yaml_content = yaml_editor_value['yml_editor'] else: # Use as is if it's already a string or has unexpected structure yaml_content = yaml_editor_value # Write the content to the file with open(temp_file.name, 'w') as f: f.write(str(yaml_content)) return temp_file.name return (create_yaml_tempfile,) @app.cell def _(mo, yaml_template): pkg_types = {"Conda Yaml":"conda_yml","Custom user library":"custom_library"} package_meta = ( mo.md('''**Create your Conda YAML by editing the template:** {yml_editor} {package_name} {package_description} {software_spec_name} {software_spec_description} ''') .batch( yml_editor = mo.ui.code_editor(value=yaml_template.value, language="yaml", min_height=100, theme="dark"), package_name = mo.ui.text(placeholder="Python Package for...", label="Package Extension Name:", kind="text", value="Custom Python Package"), software_spec_name = mo.ui.text(placeholder="Software Spec Name", label="Custom Software Spec Name:", kind="text", value="Extended Python Function Software Spec"), package_description = mo.ui.text_area(placeholder="Write a description for your package.", label="Package Description:", value=" "), software_spec_description = mo.ui.text_area(placeholder="Write a description for your software spec.", label="Software Spec Description:", value=" "), package_type = mo.ui.dropdown(pkg_types, label="Select your package type:", value="Conda Yaml") ) .form(show_clear_button=True, bordered=False) ) return (package_meta,) @app.cell def _(mo): check_packages =(mo.md(''' **Check if a package you want to use is in the base software_specification already:** {package_list} {return_full_list} ''') .batch( package_list = mo.ui.text_area(placeholder="Add packages as a comma separated list (with or without versions)."), return_full_list = mo.ui.checkbox(value=False, label="Return a full list of packages in the base software specification.") ) .form(show_clear_button=True, bordered=False) ) return (check_packages,) @app.cell def _(check_packages): if check_packages.value is not None: packages = check_packages.value['package_list'] verification_list = [item.strip() for item in packages.split(',')] full_list_return = check_packages.value['return_full_list'] else: packages = None verification_list = None full_list_return = None return full_list_return, verification_list @app.cell def _( analyze_software_spec, base_software_spec, full_list_return, verification_list, visualize_software_spec, ): if verification_list is not None: pkg_analysis = analyze_software_spec(base_software_spec, verification_list, return_full_sw_package_list=full_list_return) package_df = visualize_software_spec(pkg_analysis, verification_list) else: pkg_analysis = None package_df = None return (package_df,) @app.cell def _(check_packages, mo, package_df): package_analysis_stack = mo.vstack([check_packages, package_df], justify="center") return (package_analysis_stack,) @app.cell def _(deployment_client): if deployment_client is not None: base_sw_spec_id = "45f12dfe-aa78-5b8d-9f38-0ee223c47309" base_software_spec = deployment_client.software_specifications.get_details(base_sw_spec_id) else: base_sw_spec_id = None base_software_spec = None return base_software_spec, base_sw_spec_id @app.cell def _(create_yaml_tempfile, deployment_client, package_meta): if package_meta.value is not None and deployment_client is not None: pe_metadata = { deployment_client.package_extensions.ConfigurationMetaNames.NAME: package_meta.value['package_name'], deployment_client.package_extensions.ConfigurationMetaNames.TYPE: package_meta.value['package_type'], deployment_client.software_specifications.ConfigurationMetaNames.DESCRIPTION:package_meta.value['package_description'] } yaml_file_path = create_yaml_tempfile(package_meta.value['yml_editor']) else: pe_metadata = { } yaml_file_path = None return pe_metadata, yaml_file_path @app.cell def _( base_sw_spec_id, deployment_client, package_meta, pe_metadata, yaml_file_path, ): if yaml_file_path is not None: ### Stores the package extension pe_asset_details = deployment_client.package_extensions.store( meta_props=pe_metadata, file_path=yaml_file_path ) package_id = pe_asset_details["metadata"]["asset_id"] ### Creates a custom software specification based on the standard python function spec_id - "45f12dfe-aa78-5b8d-9f38-0ee223c47309" ss_metadata = { deployment_client.software_specifications.ConfigurationMetaNames.NAME: package_meta.value['software_spec_name'], deployment_client.software_specifications.ConfigurationMetaNames.DESCRIPTION: package_meta.value['software_spec_description'], deployment_client.software_specifications.ConfigurationMetaNames.BASE_SOFTWARE_SPECIFICATION: {'guid': base_sw_spec_id}, deployment_client.software_specifications.ConfigurationMetaNames.PACKAGE_EXTENSIONS: [{'guid': package_id}] } ss_asset_details = deployment_client.software_specifications.store(meta_props=ss_metadata) else: pe_asset_details = None package_id = None ss_metadata = {} ss_asset_details = None # ss_asset_details return (ss_asset_details,) @app.cell def _(deployment_client, mo, pd): if deployment_client: supported_specs = deployment_client.software_specifications.list()[ deployment_client.software_specifications.list()['STATE'] == 'supported' ] # Reset the index to start from 0 supported_specs = supported_specs.reset_index(drop=True) # Create a mapping dictionary for framework names based on software specifications framework_mapping = { "tensorflow_rt24.1-py3.11": "TensorFlow", "pytorch-onnx_rt24.1-py3.11": "PyTorch", "onnxruntime_opset_19": "ONNX or ONNXRuntime", "runtime-24.1-py3.11": "AI Services/Python Functions/Python Scripts", "autoai-ts_rt24.1-py3.11": "AutoAI", "autoai-kb_rt24.1-py3.11": "AutoAI", "runtime-24.1-py3.11-cuda": "CUDA-enabled (GPU) Python Runtime", "runtime-24.1-r4.3": "R Runtime 4.3", "spark-mllib_3.4": "Apache Spark 3.4", "autoai-rag_rt24.1-py3.11": "AutoAI RAG" } # Define the preferred order for items to appear at the top preferred_order = [ "runtime-24.1-py3.11", "runtime-24.1-py3.11-cuda", "runtime-24.1-r4.3", "ai-service-v5-software-specification", "autoai-rag_rt24.1-py3.11", "autoai-ts_rt24.1-py3.11", "autoai-kb_rt24.1-py3.11", "tensorflow_rt24.1-py3.11", "pytorch-onnx_rt24.1-py3.11", "onnxruntime_opset_19", "spark-mllib_3.4", ] # Create a new column for sorting supported_specs['SORT_ORDER'] = supported_specs['NAME'].apply( lambda x: preferred_order.index(x) if x in preferred_order else len(preferred_order) ) # Sort the DataFrame by the new column supported_specs = supported_specs.sort_values('SORT_ORDER').reset_index(drop=True) # Drop the sorting column as it's no longer needed supported_specs = supported_specs.drop(columns=['SORT_ORDER']) # Drop the REPLACEMENT column if it exists and add NOTES column if 'REPLACEMENT' in supported_specs.columns: supported_specs = supported_specs.drop(columns=['REPLACEMENT']) # Add NOTES column with framework information supported_specs['NOTES'] = supported_specs['NAME'].map(framework_mapping).fillna("Other") # Create a table with single-row selection selection_table = mo.ui.table( supported_specs, selection="single", # Only allow selecting one row label="#### **Select a supported software_spec runtime for your function asset** (For Python Functions select - *'runtime-24.1-py3.11'* ):", initial_selection=[0], # Now selecting the first row, which should be runtime-24.1-py3.11 page_size=6 ) else: sel_df = pd.DataFrame( data=[["ID", "Activate deployment_client."]], columns=["ID", "VALUE"] ) selection_table = mo.ui.table( sel_df, selection="single", # Only allow selecting one row label="You haven't activated the Deployment_Client", initial_selection=[0] ) return (selection_table,) @app.cell def _(mo): input_schema_checkbox = mo.ui.checkbox(label="Add input schema (optional)") output_schema_checkbox = mo.ui.checkbox(label="Add output schema (optional)") # sample_input_checkbox = mo.ui.checkbox(label="Add sample input example (optional)") return input_schema_checkbox, output_schema_checkbox @app.cell def _( function_name, input_schema_checkbox, mo, output_schema_checkbox, selection_table, template_variant, ): if selection_table.value['ID'].iloc[0]: # Create the input fields if function_name is not None: fnc_nm = function_name else: if template_variant.value == "Stream Files to IBM COS [Example]": fnc_nm = "stream_file_to_cos" else: fnc_nm = "your_function_name" uploaded_function_name = mo.ui.text(placeholder="", label="Function Name:", kind="text", value=f"{fnc_nm}", full_width=False) tags_editor = mo.ui.array( [mo.ui.text(placeholder="Metadata Tags..."), mo.ui.text(), mo.ui.text()], label="Optional Metadata Tags" ) software_spec = selection_table.value['ID'].iloc[0] description_input = mo.ui.text_area( placeholder="Write a description for your function...)", label="Description", max_length=256, rows=5, full_width=True ) func_metadata=mo.hstack([ description_input, mo.hstack([ uploaded_function_name, tags_editor, ], justify="start", gap=1, align="start", wrap=True) ], widths=[0.6,0.4], gap=2.75 ) schema_metadata=mo.hstack([ input_schema_checkbox, output_schema_checkbox, # sample_input_checkbox ], justify="center", gap=1, align="center", wrap=True ) fm = mo.vstack([ func_metadata, ], align="center", gap=2 ) sc_m = mo.vstack([ schema_metadata, mo.md("**Make sure to select the checkbox options before filling in descriptions and tags or they will reset.**") ], align="center", gap=2 ) return ( description_input, fm, sc_m, software_spec, tags_editor, uploaded_function_name, ) @app.cell def _(json, mo, template_variant): if template_variant.value == "Stream Files to IBM COS [Example]": from cos_stream_schema_examples import input_schema, output_schema else: input_schema = [ { 'id': '1', 'type': 'struct', 'fields': [ { 'name': '', 'type': 'string', 'nullable': False, 'metadata': {} }, { 'name': '', 'type': 'string', 'nullable': False, 'metadata': {} } ] } ] output_schema = [ { 'id': '1', 'type': 'struct', 'fields': [ { 'name': '', 'type': 'string', 'nullable': False, 'metadata': {} } ] } ] input_schema_editor = mo.ui.code_editor(value=json.dumps(input_schema, indent=4), language="python", min_height=100,theme="dark") output_schema_editor = mo.ui.code_editor(value=json.dumps(output_schema, indent=4), language="python", min_height=100,theme="dark") schema_editors = mo.accordion( { """**Input Schema Metadata Editor**""": input_schema_editor, """**Output Schema Metadata Editor**""": output_schema_editor, }, multiple=True ) # schema_editors return input_schema_editor, output_schema_editor, schema_editors @app.cell def _( ast, deployment_client, description_input, function_editor, input_schema_checkbox, input_schema_editor, json, mo, os, output_schema_checkbox, output_schema_editor, selection_table, software_spec, tags_editor, uploaded_function_name, ): get_upload_status, set_upload_status = mo.state("No uploads yet") function_meta = {} if selection_table.value['ID'].iloc[0] and deployment_client is not None: # Start with the base required fields function_meta = { deployment_client.repository.FunctionMetaNames.NAME: f"{uploaded_function_name.value}" or "your_function_name", deployment_client.repository.FunctionMetaNames.SOFTWARE_SPEC_ID: software_spec or "45f12dfe-aa78-5b8d-9f38-0ee223c47309" } # Add optional fields if they exist if tags_editor.value: # Filter out empty strings from the tags list filtered_tags = [tag for tag in tags_editor.value if tag and tag.strip()] if filtered_tags: # Only add if there are non-empty tags function_meta[deployment_client.repository.FunctionMetaNames.TAGS] = filtered_tags if description_input.value: function_meta[deployment_client.repository.FunctionMetaNames.DESCRIPTION] = description_input.value # Add input schema if checkbox is checked if input_schema_checkbox.value: try: function_meta[deployment_client.repository.FunctionMetaNames.INPUT_DATA_SCHEMAS] = json.loads(input_schema_editor.value) except json.JSONDecodeError: # If JSON parsing fails, try Python literal evaluation as fallback function_meta[deployment_client.repository.FunctionMetaNames.INPUT_DATA_SCHEMAS] = ast.literal_eval(input_schema_editor.value) # Add output schema if checkbox is checked if output_schema_checkbox.value: try: function_meta[deployment_client.repository.FunctionMetaNames.OUTPUT_DATA_SCHEMAS] = json.loads(output_schema_editor.value) except json.JSONDecodeError: # If JSON parsing fails, try Python literal evaluation as fallback function_meta[deployment_client.repository.FunctionMetaNames.OUTPUT_DATA_SCHEMAS] = ast.literal_eval(output_schema_editor.value) # Add sample input if checkbox is checked # if sample_input_checkbox.value: # try: # function_meta[deployment_client.repository.FunctionMetaNames.SAMPLE_SCORING_INPUT] = json.loads(sample_input_editor.value) # except json.JSONDecodeError: # # If JSON parsing fails, try Python literal evaluation as fallback # function_meta[deployment_client.repository.FunctionMetaNames.SAMPLE_SCORING_INPUT] = ast.literal_eval(sample_input_editor.value) def upload_function(function_meta, use_function_object=False): """ Uploads a Python function to watsonx.ai as a deployable asset. Parameters: function_meta (dict): Metadata for the function use_function_object (bool): Whether to use function object (True) or file path (False) Returns: dict: Details of the uploaded function """ # Store the original working directory original_dir = os.getcwd() try: # Create temp file from the code in the editor code_to_deploy = function_editor.value['editor'] # This function is defined elsewhere in the notebook func_name = uploaded_function_name.value or "your_function_name" # Ensure function_meta has the correct function name function_meta[deployment_client.repository.FunctionMetaNames.NAME] = func_name # Save the file locally first save_dir = "/tmp/notebook_functions" os.makedirs(save_dir, exist_ok=True) file_path = f"{save_dir}/{func_name}.py" with open(file_path, "w", encoding="utf-8") as f: f.write(code_to_deploy) if use_function_object: # Import the function from the file import sys import importlib.util # Add the directory to Python's path sys.path.append(save_dir) # Import the module spec = importlib.util.spec_from_file_location(func_name, file_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Get the function object function_object = getattr(module, func_name) # Change to /tmp directory before calling IBM Watson SDK functions os.chdir('/tmp/notebook_functions') # Upload the function object mo.md(f"Uploading function object: {func_name}") func_details = deployment_client.repository.store_function(function_object, function_meta) else: # Change to /tmp directory before calling IBM Watson SDK functions os.chdir('/tmp/notebook_functions') # # Upload using the absolute file path # abs_file_path = os.path.abspath(file_path) # mo.md(f"Uploading function from file: {abs_file_path}") # # Using the absolute file path might help in some environments # func_details = deployment_client.repository.store_function(abs_file_path, function_meta) # Upload using the file path approach # Create a zip file of the Python module import gzip import shutil # Path for the gzipped file gz_path = f"{save_dir}/{func_name}.py.gz" # Create gzip file with open(file_path, 'rb') as f_in: with gzip.open(gz_path, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) # Upload using the gzipped file path mo.md(f"Uploading function from gzip: {gz_path}") func_details = deployment_client.repository.store_function(gz_path, function_meta) # mo.md(f"Uploading function from file: {file_path}") # func_details = deployment_client.repository.store_function(file_path, function_meta) set_upload_status(f"Latest Upload - id - {func_details['metadata']['id']}") return func_details except Exception as e: set_upload_status(f"Error uploading function: {str(e)}") mo.md(f"Detailed error: {str(e)}") raise finally: # Always change back to the original directory, even if an exception occurs os.chdir(original_dir) upload_status = mo.state("No uploads yet") upload_button = mo.ui.button( label="Upload Function", on_click=lambda _: upload_function(function_meta, use_function_object=False), kind="success", tooltip="Click to upload function to watsonx.ai" ) # function_meta return get_upload_status, upload_button @app.cell def _(get_upload_status, mo, upload_button): # Upload your function if upload_button.value: try: upload_result = upload_button.value artifact_id = upload_result['metadata']['id'] except Exception as e: mo.md(f"Error: {str(e)}") upload_func = mo.vstack([ upload_button, mo.md(f"**Status:** {get_upload_status()}") ], justify="space-around", align="center") return artifact_id, upload_func @app.cell def _(deployment_client, mo, pd, upload_button, uuid): def reorder_hardware_specifications(df): """ Reorders a hardware specifications dataframe by type and size of environment without hardcoding specific hardware types. Parameters: df (pandas.DataFrame): The hardware specifications dataframe to reorder Returns: pandas.DataFrame: Reordered dataframe with reset index """ # Create a copy to avoid modifying the original dataframe result_df = df.copy() # Define a function to extract the base type and size def get_sort_key(name): # Create a custom ordering list custom_order = [ "XXS", "XS", "S", "M", "L", "XL", "XS-Spark", "S-Spark", "M-Spark", "L-Spark", "XL-Spark", "K80", "K80x2", "K80x4", "V100", "V100x2", "WXaaS-XS", "WXaaS-S", "WXaaS-M", "WXaaS-L", "WXaaS-XL", "Default Spark", "Notebook Default Spark", "ML" ] # If name is in the custom order list, use its index if name in custom_order: return (0, custom_order.index(name)) # For any name not in the custom order, put it at the end return (1, name) # Add a temporary column for sorting result_df['sort_key'] = result_df['NAME'].apply(get_sort_key) # Sort the dataframe and drop the temporary column result_df = result_df.sort_values('sort_key').drop('sort_key', axis=1) # Reset the index result_df = result_df.reset_index(drop=True) return result_df if deployment_client and upload_button.value: hardware_specs = deployment_client.hardware_specifications.list() hardware_specs_df = reorder_hardware_specifications(hardware_specs) # Create a table with single-row selection hw_selection_table = mo.ui.table( hardware_specs_df, selection="single", # Only allow selecting one row label="#### **Select a supported hardware_specification for your deployment** *(Default: 'XS' - 1vCPU_4GB Ram)*", initial_selection=[1], page_size=6, wrapped_columns=['DESCRIPTION'] ) deployment_type = mo.ui.radio( options={"Function":"Online (Function Endpoint)","Runnable Job":"Batch (Runnable Jobs)"}, value="Function", label="Select the Type of Deployment:", inline=True ) uuid_suffix = str(uuid.uuid4())[:4] deployment_name = mo.ui.text(value=f"deployed_func_{uuid_suffix}", label="Deployment Name:", placeholder="") else: hw_df = pd.DataFrame( data=[["ID", "Activate deployment_client."]], columns=["ID", "VALUE"] ) hw_selection_table = mo.ui.table( hw_df, selection="single", # Only allow selecting one row label="You haven't activated the Deployment_Client", initial_selection=[0] ) return deployment_name, deployment_type, hw_selection_table @app.cell def _( artifact_id, deployment_client, deployment_details, deployment_name, deployment_type, hw_selection_table, mo, upload_button, ): def deploy_function(artifact_id, deployment_type): """ Deploys a function asset to watsonx.ai. Parameters: artifact_id (str): ID of the function artifact to deploy deployment_type (object): Type of deployment (online or batch) Returns: dict: Details of the deployed function """ if not artifact_id: print("Error: No artifact ID provided. Please upload a function first.") return None if deployment_type.value == "Online (Function Endpoint)": # Changed from "Online (Function Endpoint)" deployment_props = { deployment_client.deployments.ConfigurationMetaNames.NAME: deployment_name.value, deployment_client.deployments.ConfigurationMetaNames.ONLINE: {}, deployment_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config}, deployment_client.deployments.ConfigurationMetaNames.SERVING_NAME: deployment_name.value, } else: # "Runnable Job" instead of "Batch (Runnable Jobs)" deployment_props = { deployment_client.deployments.ConfigurationMetaNames.NAME: deployment_name.value, deployment_client.deployments.ConfigurationMetaNames.BATCH: {}, deployment_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": selected_hw_config}, # batch does not use serving names } try: print(deployment_props) # First, get the asset details to confirm it exists asset_details = deployment_client.repository.get_details(artifact_id) print(f"Asset found: {asset_details['metadata']['name']} with ID: {asset_details['metadata']['id']}") # Create the deployment deployed_function = deployment_client.deployments.create(artifact_id, deployment_props) print(f"Creating deployment from Asset: {artifact_id} with deployment properties {str(deployment_props)}") return deployed_function except Exception as e: print(f"Deployment error: {str(e)}") return None def get_deployment_id(deployed_function): deployment_id = deployment_client.deployments.get_uid(deployment_details) return deployment_id def get_deployment_info(deployment_id): deployment_info = deployment_client.deployments.get_details(deployment_id) return deployment_info deployment_status = mo.state("No deployments yet") if hw_selection_table.value['ID'].iloc[0]: selected_hw_config = hw_selection_table.value['ID'].iloc[0] deploy_button = mo.ui.button( label="Deploy Function", on_click=lambda _: deploy_function(artifact_id, deployment_type), kind="success", tooltip="Click to deploy function to watsonx.ai" ) if deployment_client and upload_button.value: deployment_definition = mo.hstack([ deployment_type, deployment_name ], justify="space-around") else: deployment_definition = mo.hstack([ "No Deployment Type Selected", "No Deployment Name Provided" ], justify="space-around") # deployment_definition return deploy_button, deployment_definition @app.cell def _(deploy_button, deployment_definition, mo): _ = deployment_definition deploy_fnc = mo.vstack([ deploy_button, deploy_button.value ], justify="space-around", align="center") return (deploy_fnc,) @app.cell(hide_code=True) def _(deployment_client, mo, pd): ### Functions to List , Get ID's as a list and Purge of Assets def get_deployment_list(): dep_df = deployment_client.deployments.list() dep_df = pd.DataFrame(dep_df) return dep_df def get_deployment_ids(df): dep_list = df['ID'].tolist() return dep_list #---- def get_data_assets_list(): data_a_df = deployment_client.data_assets.list() data_a_df = pd.DataFrame(data_a_df) return data_a_df def get_data_asset_ids(df): data_asset_list = df['ASSET_ID'].tolist() return data_asset_list #---- def get_repository_list(): rep_list_df = deployment_client.repository.list() rep_list_df = pd.DataFrame(rep_list_df) return rep_list_df def get_repository_ids(df): repository_list = df['ID'].tolist() return repository_list #---- def delete_with_progress(ids_list, delete_function, item_type="items"): """ Generic wrapper that adds a progress bar to any deletion function Parameters: ids_list: List of IDs to delete delete_function: Function that deletes a single ID item_type: String describing what's being deleted (for display) """ with mo.status.progress_bar( total=len(ids_list) or 1, title=f"Purging {item_type}", subtitle=f"Deleting {item_type}...", completion_title="Purge Complete", completion_subtitle=f"Successfully deleted {len(ids_list)} {item_type}" ) as progress: for item_id in ids_list: delete_function(item_id) progress.update(increment=1) return f"Deleted {len(ids_list)} {item_type} successfully" # Use with existing deletion functions def delete_deployments(deployment_ids): return delete_with_progress( deployment_ids, lambda id: deployment_client.deployments.delete(id), "deployments" ) def delete_data_assets(data_asset_ids): return delete_with_progress( data_asset_ids, lambda id: deployment_client.data_assets.delete(id), "data assets" ) def delete_repository_items(repository_ids): return delete_with_progress( repository_ids, lambda id: deployment_client.repository.delete(id), "repository items" ) return ( delete_data_assets, delete_deployments, delete_repository_items, get_data_asset_ids, get_data_assets_list, get_deployment_ids, get_deployment_list, get_repository_ids, get_repository_list, ) @app.cell def _(get_data_assets_tab, get_deployments_tab, get_repository_tab, mo): if get_deployments_tab() is not None: deployments_table = mo.ui.table(get_deployments_tab()) else: deployments_table = mo.md("No Table Loaded") if get_repository_tab() is not None: repository_table = mo.ui.table(get_repository_tab()) else: repository_table = mo.md("No Table Loaded") if get_data_assets_tab() is not None: data_assets_table = mo.ui.table(get_data_assets_tab()) else: data_assets_table = mo.md("No Table Loaded") return data_assets_table, deployments_table, repository_table @app.cell def _( deployments_table, get_deployment_id_list, get_deployments_button, mo, purge_deployments, ): deployments_purge_stack = mo.hstack([get_deployments_button, get_deployment_id_list, purge_deployments]) deployments_purge_stack_results = mo.vstack([deployments_table, get_deployment_id_list.value, purge_deployments.value]) deployments_purge_tab = mo.vstack([deployments_purge_stack, deployments_purge_stack_results]) return (deployments_purge_tab,) @app.cell def _( get_repository_button, get_repository_id_list, mo, purge_repository, repository_table, ): repository_purge_stack = mo.hstack([get_repository_button, get_repository_id_list, purge_repository]) repository_purge_stack_results = mo.vstack([repository_table, get_repository_id_list.value, purge_repository.value]) repository_purge_tab = mo.vstack([repository_purge_stack, repository_purge_stack_results]) return (repository_purge_tab,) @app.cell def _( data_assets_table, get_data_asset_id_list, get_data_assets_button, mo, purge_data_assets, ): data_assets_purge_stack = mo.hstack([get_data_assets_button, get_data_asset_id_list, purge_data_assets]) data_assets_purge_stack_results = mo.vstack([data_assets_table, get_data_asset_id_list.value, purge_data_assets.value]) data_assets_purge_tab = mo.vstack([data_assets_purge_stack, data_assets_purge_stack_results]) return (data_assets_purge_tab,) @app.cell def _(data_assets_purge_tab, deployments_purge_tab, mo, repository_purge_tab): purge_tabs = mo.ui.tabs( {"Purge Deployments": deployments_purge_tab, "Purge Repository Assets": repository_purge_tab,"Purge Data Assets": data_assets_purge_tab }, lazy=False ) return (purge_tabs,) @app.cell def _(mo): get_deployments_tab, set_deployments_tab = mo.state(None) get_repository_tab, set_repository_tab = mo.state(None) get_data_assets_tab, set_data_assets_tab = mo.state(None) return ( get_data_assets_tab, get_deployments_tab, get_repository_tab, set_data_assets_tab, set_deployments_tab, set_repository_tab, ) @app.cell(hide_code=True) def _( data_assets_table, delete_data_assets, delete_deployments, delete_repository_items, deployments_table, get_data_asset_ids, get_data_assets_list, get_deployment_ids, get_deployment_list, get_repository_ids, get_repository_list, mo, repository_table, set_data_assets_tab, set_deployments_tab, set_repository_tab, ): ### Temporary Function Purge - Assets get_data_assets_button = mo.ui.button( label="Get Data Assets Dataframe", on_click=lambda _: get_data_assets_list(), on_change=lambda value: set_data_assets_tab(value), kind="neutral", ) get_data_asset_id_list = mo.ui.button( label="Turn Dataframe into List of IDs", on_click=lambda _: get_data_asset_ids(data_assets_table.value), kind="neutral", ) purge_data_assets = mo.ui.button( label="Purge Data Assets", on_click=lambda _: delete_data_assets(get_data_asset_id_list.value), kind="danger", ) ### Temporary Function Purge - Deployments get_deployments_button = mo.ui.button( label="Get Deployments Dataframe", on_click=lambda _: get_deployment_list(), on_change=lambda value: set_deployments_tab(value), kind="neutral", ) get_deployment_id_list = mo.ui.button( label="Turn Dataframe into List of IDs", on_click=lambda _: get_deployment_ids(deployments_table.value), kind="neutral", ) purge_deployments = mo.ui.button( label="Purge Deployments", on_click=lambda _: delete_deployments(get_deployment_id_list.value), kind="danger", ) ### Repository Items Purge get_repository_button = mo.ui.button( label="Get Repository Dataframe", on_click=lambda _: get_repository_list(), on_change=lambda value: set_repository_tab(value), kind="neutral", ) get_repository_id_list = mo.ui.button( label="Turn Dataframe into List of IDs", on_click=lambda _: get_repository_ids(repository_table.value), kind="neutral", ) purge_repository = mo.ui.button( label="Purge Repository Items", on_click=lambda _: delete_repository_items(get_repository_id_list.value), kind="danger", ) return ( get_data_asset_id_list, get_data_assets_button, get_deployment_id_list, get_deployments_button, get_repository_button, get_repository_id_list, purge_data_assets, purge_deployments, purge_repository, ) if __name__ == "__main__": app.run()