# /// script # [tool.marimo.display] # custom_css = ["./custom_header_font.css"] # /// import marimo __generated_with = "0.13.4" app = marimo.App( width="full", app_title="watsonx-SheetProcessor-9000", ) @app.cell def _(): import marimo as mo from typing import Dict, Optional, List, Union, Any, Set from ibm_watsonx_ai import APIClient, Credentials from pathlib import Path import pandas as pd import mimetypes import requests import zipfile import tempfile import base64 import json import glob import ast import re import os import io return APIClient, Credentials, ast, io, mo, os, pd @app.cell def _(): ### Loads baked in credentials if present from baked_in_credentials.creds import credentials from base_variables import wx_regions, wx_platform_url ### Loads helper functions from helper_functions.helper_functions import ( get_cred_value, get_model_selection_table, filter_models_by_function, _enforce_model_selection, update_max_tokens_limit, get_key_by_value, markdown_spacing, load_file_dataframe, create_parameter_table, convert_table_to_json_docs, get_cell_values, wrap_with_spaces, load_templates ) ### Table Related Helper Functions from helper_functions.table_helper_functions import ( append_llm_results_to_dataframe, display_answers_as_markdown, display_answers_stacked, process_with_llm ) return ( append_llm_results_to_dataframe, convert_table_to_json_docs, create_parameter_table, credentials, get_cell_values, get_cred_value, get_key_by_value, get_model_selection_table, load_file_dataframe, load_templates, process_with_llm, wrap_with_spaces, wx_regions, ) @app.cell def _(credentials, get_cred_value, mo, wx_regions): # Create a form with multiple elements baked_in_creds = credentials client_instantiation_form = ( mo.md(''' ###**watsonx.ai credentials:** {wx_region} {wx_api_key} {project_id} {space_id} ''') .batch( wx_region = mo.ui.dropdown( wx_regions, label="Select your watsonx.ai region:", value="US", searchable=True ), wx_api_key = mo.ui.text( placeholder="Add your IBM Cloud api-key...", label="IBM Cloud Api-key:", kind="password", value=get_cred_value('api_key', creds_var_name='baked_in_creds') ), project_id = mo.ui.text( placeholder="Add your watsonx.ai project_id...", label="Project_ID:", kind="text", value=get_cred_value('project_id', creds_var_name='baked_in_creds') ), space_id = mo.ui.text( placeholder="Add your watsonx.ai space_id...", label="Space_ID:", kind="text", value=get_cred_value('space_id', creds_var_name='baked_in_creds') ) ,) .form(show_clear_button=True, bordered=False) ) return (client_instantiation_form,) @app.cell def _(client_section, mo): ui_accordion_section_1 = mo.accordion( {"Section 1: **watsonx.ai Credentials**": client_section} ) ui_accordion_section_1 return @app.cell def _(file_uploader, mo): ui_accordion_section_2 = mo.accordion( {"Section 2: **File Loading**": file_uploader} ) ui_accordion_section_2 return @app.cell def _(column_and_rows_sector, mo): ui_accordion_section_3 = mo.accordion( {"Section 3: **Column and Row Selection**": column_and_rows_sector} ) ui_accordion_section_3 return @app.cell def _(llm_setup, mo): ui_accordion_section_4 = mo.accordion( {"Section 4: **Model Setup**": llm_setup} ) ui_accordion_section_4 return @app.cell def _(mo, prompt_setup_stack): ui_accordion_section_5 = mo.accordion( {"Section 5: **Choose the Number of Prompts and Templates**": prompt_setup_stack} ) ui_accordion_section_5 return @app.cell def _(mo, prompt_stack): ui_accordion_section_6 = mo.accordion( {"Section 6: **Prompt Setup**": prompt_stack} ) ui_accordion_section_6 return @app.cell def _(client_instantiation_form, os): if client_instantiation_form.value: client_setup = client_instantiation_form.value else: client_setup = None ### Extract Credential Variables: if client_setup is not None: wx_url = client_setup["wx_region"] wx_api_key = client_setup["wx_api_key"] os.environ["WATSONX_APIKEY"] = wx_api_key project_id = client_setup.get("project_id") space_id = client_setup.get("space_id") else: os.environ["WATSONX_APIKEY"] = "" project_id = space_id = wx_api_key = wx_url = None return client_setup, project_id, space_id, wx_api_key, wx_url @app.cell def _( APIClient, Credentials, client_setup, project_id, space_id, wx_api_key, wx_url, ): if client_setup: wx_credentials = Credentials(url=wx_url, api_key=wx_api_key) project_client = APIClient(credentials=wx_credentials, project_id=project_id) if project_id else None deployment_client = APIClient(credentials=wx_credentials, space_id=space_id) if space_id else None else: wx_credentials = project_client = deployment_client = None return deployment_client, project_client @app.cell def _( client, client_key, client_options, client_selector, get_key_by_value, mo, wrap_with_spaces, ): active_client_name = get_key_by_value(client_options,client_key) client_status = mo.md(f"### Client Instantiation Status will turn Green When Ready\n\n {client_selector}\n\n**Active Client:**{wrap_with_spaces(active_client_name, prefix_spaces=5)}") client_callout_kind = "success" if client is not None else "neutral" return client_callout_kind, client_status @app.cell def _(client_callout_kind, client_instantiation_form, client_status, mo): client_callout = mo.callout(client_status, kind=client_callout_kind) client_section = mo.hstack([client_instantiation_form, client_callout], align="center", justify="space-around") return (client_section,) @app.cell def _(deployment_client, mo, project_client): if project_client is not None and deployment_client is not None: client_options = {"Project Client":project_client, "Deployment Client":deployment_client} elif project_client is not None: client_options = {"Project Client":project_client} elif deployment_client is not None: client_options = {"Deployment Client":deployment_client} else: client_options = {"No Client": "Instantiate a Client"} default_client = next(iter(client_options)) client_selector = mo.ui.dropdown(client_options, value=default_client, label="**Switch your active client:**") return client_options, client_selector @app.cell def _(client_selector): client_key = client_selector.value if client_key == "Instantiate a Client": client = None else: client = client_key return client, client_key @app.cell def _(mo): file = mo.ui.file( kind="area", filetypes=[".xlsx", ".xls", ".csv", ".json"], label="Upload a file (CSV, Excel, or JSON)" ) return (file,) @app.cell def _(file, io, mo, os, pd): def get_file_extension(filename): """Get the file extension from a filename.""" if not filename: return None return os.path.splitext(filename)[1].lower() # Initialize variables sheet_names = [] file_extension = None excel_data = None if file.contents(): file_extension = get_file_extension(file.name()) # Handle Excel files to get sheet names if file_extension in ['.xlsx', '.xls']: # For Excel files excel_data = io.BytesIO(file.contents()) # Get sheet names without loading the data yet sheet_names = pd.ExcelFile(excel_data).sheet_names # Create sheet selector for Excel files if file_extension in ['.xlsx', '.xls'] and sheet_names: sheet_selector = mo.ui.dropdown( options=sheet_names, value=sheet_names[0], label="Select Sheet:", full_width=False, searchable=True ) else: sheet_selector = None return excel_data, file_extension, sheet_selector @app.cell def _(mo, table_dataframe_raw): if not table_dataframe_raw.empty: apply_header_readjustment = mo.ui.checkbox(label="Activate Header Adjustment") else: apply_header_readjustment = None return (apply_header_readjustment,) @app.cell def _(mo): show_variable_sidebar = mo.ui.checkbox(label="Show Sidebar with Input Variables", value=False) return (show_variable_sidebar,) @app.cell def _(apply_header_readjustment, mo, sheet_selector, table_dataframe_raw): if not table_dataframe_raw.empty: if apply_header_readjustment.value: header_row = mo.ui.number( label="Header Row index:", value=0, start=0, stop=len(table_dataframe_raw)+1 ) else: header_row = mo.ui.number( label="Header Row index:", value=0, start=0, stop=0 ) else: header_row = None sheet_and_column_controls = mo.hstack([sheet_selector, apply_header_readjustment, header_row], gap=2, justify="space-around") return header_row, sheet_and_column_controls @app.cell def _(file, mo, sheet_and_column_controls): if file.name(): name_printout = mo.md(f"**{file.name()}**") else: name_printout = mo.md(f"No File Uploaded") file_uploader = mo.vstack([file,name_printout, sheet_and_column_controls], justify="space-around", align="center") return (file_uploader,) @app.function def apply_header_row(table_dataframe, header_row_value): """ Set a specific row as the header for a dataframe. Parameters: ----------- table_dataframe : pd.DataFrame The dataframe to modify header_row_value : int Row index to use as header (0-based) Returns: -------- tuple : (pd.DataFrame, list) Modified dataframe with new headers and list of column names """ if not table_dataframe.empty: # Convert header row to column names new_header = table_dataframe.iloc[header_row_value] # Create new dataframe without the header row new_df = table_dataframe.iloc[header_row_value+1:] # Set the new header new_df.columns = new_header # Get list of column names column_names = list(new_df.columns) return new_df, column_names return table_dataframe, [] @app.cell def _(excel_data, file, file_extension, load_file_dataframe, sheet_selector): table_dataframe_raw, table_column_names_raw = load_file_dataframe(file=file, file_extension=file_extension, sheet_selector=sheet_selector, excel_data=excel_data) return table_column_names_raw, table_dataframe_raw @app.cell def _( apply_header_readjustment, header_row, table_column_names_raw, table_dataframe_raw, ): if apply_header_readjustment is not None and apply_header_readjustment.value: table_dataframe, table_column_names = apply_header_row(table_dataframe_raw, header_row_value=header_row.value if header_row is not None else 0) else: table_dataframe, table_column_names = table_dataframe_raw, table_column_names_raw return table_column_names, table_dataframe @app.cell def _(mo, pd, table_column_names, table_dataframe): if not table_dataframe.empty: center_column_text = {col: "center" for col in table_column_names} table = mo.ui.table(table_dataframe, show_column_summaries=False, initial_selection=[0], wrapped_columns=table_column_names, text_justify_columns=center_column_text, label="**Select the Rows to Process**") else: table = ( mo.md('''###**No data available in the uploaded file**''') .batch(upload_a_file = mo.ui.table(data=pd.DataFrame( {"Upload File":"No File"}, index=[0]) ) ) ) return (table,) @app.cell def _(create_parameter_table, table_column_names, table_dataframe): if not table_dataframe.empty: column_selector = create_parameter_table( label="Select the Columns to Process", input_list=table_column_names, column_name="Column Options", selection_type="multi-cell", text_justify="center" ) else: column_selector = create_parameter_table( label="Select the Columns to Process", input_list=[], column_name="Column Options", selection_type="multi-cell", text_justify="center" ) return (column_selector,) @app.cell def _(column_selector, get_cell_values): columns_to_use = get_cell_values(column_selector) return (columns_to_use,) @app.cell def _(column_selector, mo, table): column_and_rows_sector = mo.vstack([column_selector, table], align="stretch", justify="start") return (column_and_rows_sector,) @app.cell def _(columns_to_use, convert_table_to_json_docs, pd, table): if table.value is not None: selected_rows = table.value fields_to_process = convert_table_to_json_docs(selected_rows, selected_columns=columns_to_use) else: selected_rows = pd.DataFrame([]) fields_to_process = [] return fields_to_process, selected_rows @app.cell def _(client, get_model_selection_table): if client is not None: model_selector, resources, model_id_list = get_model_selection_table(client=client, model_type="chat", filter_functionality=None, selection_mode="single-cell" ) else: model_selector = get_model_selection_table(client=None, selection_mode="single-cell") resources = model_id_list = None return (model_selector,) @app.cell def _(mo, model_selector): from ibm_watsonx_ai.foundation_models import ModelInference from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams # Create a form with multiple elements llm_parameters = ( mo.md(''' ###**LLM parameters:** {decoding_method} {repetition_penalty} {min_tokens} {max_tokens} {stop_sequences} ''') .batch( ### Preset Options decoding_method = mo.ui.dropdown(options=["greedy", "sample"], value="greedy",label="Decoding Method:"), min_tokens = mo.ui.number(start=1, stop=1, label="Minimum Output Tokens:"), max_tokens = mo.ui.number(start=1, stop=8096, value=500, label="Maximum Output Tokens:"), repetition_penalty = mo.ui.number(start=1.0, stop=2.0, step=0.01, label="Repetition Penalty:"), stop_sequences = mo.ui.text(label="Stopping Sequences:", value="['<|end_of_text|>','']", placeholder="List of Strings, e.g. ['<|end_of_text|>','']", full_width=False) ) ) llm_setup = mo.hstack([model_selector, llm_parameters], align="center", justify="space-around") return GenParams, ModelInference, llm_parameters, llm_setup @app.cell def _(model_selector): if model_selector.value: selected_model = model_selector.value[0]['value'] else: selected_model = "mistralai/mistral-large" return (selected_model,) @app.cell def _(GenParams, ModelInference, ast, client, llm_parameters, selected_model): if llm_parameters.value: params = { GenParams.DECODING_METHOD: llm_parameters.value['decoding_method'], GenParams.MAX_NEW_TOKENS: llm_parameters.value['max_tokens'], GenParams.MIN_NEW_TOKENS: llm_parameters.value['min_tokens'], GenParams.REPETITION_PENALTY: llm_parameters.value['repetition_penalty'], GenParams.STOP_SEQUENCES: ast.literal_eval(llm_parameters.value['stop_sequences']), GenParams.RETURN_OPTIONS: { 'input_text': False, 'generated_tokens': False, 'input_tokens': True, 'token_logprobs': False } } else: params = {} if client: inf_model = ModelInference(api_client=client, model_id=selected_model, params=params) else: inf_model = None return inf_model, params @app.cell def _(mo): prompt_template_folders = get_subfolder_paths("watsonx Sheet Processor MNB/prompt_templates", depth=1) prompt_template_model_paths = mo.ui.dropdown(options=prompt_template_folders, label="Prompt Template Variants *(Select Based on your Model)*") return (prompt_template_model_paths,) @app.cell def _(load_templates, prompt_template_model_paths): template_folder = str(prompt_template_model_paths.value) templates = load_templates(template_folder) return (templates,) @app.cell def _(mo): prompt_number_slider = mo.ui.slider(start=1, stop=5, value=1, step=1, label="Prompt Templates", show_value=True) return (prompt_number_slider,) @app.cell def _(mo, prompt_number_slider, prompt_template_model_paths): prompt_mechanics_stack = mo.vstack([prompt_number_slider, prompt_template_model_paths], align="start") return (prompt_mechanics_stack,) @app.cell def _(mo, prompt_mechanics_stack, template_selector_stack): prompt_setup_stack = mo.hstack([prompt_mechanics_stack, template_selector_stack], justify="space-around") return (prompt_setup_stack,) @app.cell def _(mo): get_pt_state, set_pt_state = mo.state(None) return get_pt_state, set_pt_state @app.function def get_subfolder_paths(base_path, depth=1): """Lists subfolder paths up to specified depth. Args: base_path: Directory to search in depth: Subdirectory levels to traverse (default: 1) Returns: Dict with folder names as keys and paths as values """ import os result = {} if depth <= 0 or not os.path.isdir(base_path): return result for entry in os.scandir(base_path): if entry.is_dir(): result[entry.name] = entry.path if depth > 1: # Recursively get subdirectories subfolders = get_subfolder_paths(entry.path, depth - 1) # Update result with subfolders result.update(subfolders) return result @app.cell def _(mo, prompt_number_slider, set_pt_state, templates): def update_state_from_templates(value): # Get current values from all template selectors template_values = [selector.value for selector in template_selectors] # Update the state with the list of values set_pt_state(template_values) # Return the list of values return template_values def create_template_dropdowns(num=1): return mo.ui.dropdown( options=templates, label=f"**Select Prompt {num} Template with Syntax:**", value="empty", on_change=update_state_from_templates ) template_selectors = [create_template_dropdowns(i) for i in range(1, prompt_number_slider.value+1)] template_selector_stack = mo.vstack(template_selectors, align="start") return (template_selector_stack,) @app.cell def _(fields_to_process): if fields_to_process: variable_names = [key for key in fields_to_process[0].keys() if key != "_marimo_row_id" and key != "upload_a_file"] else: variable_names = [] return (variable_names,) @app.cell def _(prompt_number_slider): def is_disabled(button_num): return prompt_number_slider.value < button_num return (is_disabled,) @app.cell def _(mo): def create_stats_from_variables(variable_names): """ Creates mo.stat objects for each variable name in the list. Labels are formatted as "Column Variable Tag: {index+1}" Values display the variable name itself. """ stats = [] for i, var_name in enumerate(variable_names): stat = mo.stat( value=f"{{{var_name}}}", label=f"Column Variable Tag {i+1}", bordered=True ) stats.append(stat) return mo.sidebar(stats, width="375") return (create_stats_from_variables,) @app.cell def _(create_stats_from_variables, variable_names): if variable_names: prompt_input_variables_sidebar = create_stats_from_variables(variable_names) else: prompt_input_variables_sidebar = None return (prompt_input_variables_sidebar,) @app.cell def _(get_pt_state, mo, prompt_number_slider): def create_prompt(num=1): # Wrap in a function to create reactive dependency on state def get_template_content(num=1): content = get_pt_state()[num-1] if get_pt_state() and num-1 < len(get_pt_state()) else "empty" return content prompt_column_label = mo.ui.text(label=f"**Add output column name for Prompt {num}:**", value=f"Added Column {num}") prompt_editor = mo.ui.code_editor(value=get_template_content(num), language="python", min_height=300, theme="dark") return [ prompt_column_label, prompt_editor ] prompts_raw = [create_prompt(i) for i in range(1, prompt_number_slider.value+1)] return (prompts_raw,) @app.cell def _(is_disabled, mo): run_prompt_button_1 = mo.ui.run_button(label="Run Prompt 1", disabled=is_disabled(1)) return (run_prompt_button_1,) @app.cell def _(is_disabled, mo): run_prompt_button_2 = mo.ui.run_button(label="Run Prompt 2", disabled=is_disabled(2)) return (run_prompt_button_2,) @app.cell def _(is_disabled, mo): run_prompt_button_3 = mo.ui.run_button(label="Run Prompt 3", disabled=is_disabled(3)) return (run_prompt_button_3,) @app.cell def _(is_disabled, mo): run_prompt_button_4 = mo.ui.run_button(label="Run Prompt 4", disabled=is_disabled(4)) return (run_prompt_button_4,) @app.cell def _(is_disabled, mo): run_prompt_button_5 = mo.ui.run_button(label="Run Prompt 5", disabled=is_disabled(5)) return (run_prompt_button_5,) @app.cell def _( append_llm_results_to_dataframe, fields_to_process, inf_model, params, process_with_llm, prompts_raw, results_table, run_prompt_button_1, selected_rows, ): if run_prompt_button_1.value: prompt_answers_1 = process_with_llm(fields_to_process=fields_to_process, prompt_template=prompts_raw[0][1].value, inf_model=inf_model, params=params, batch_size=10) append_llm_results_to_dataframe(target_dataframe=results_table, fields_to_process=fields_to_process, llm_results=prompt_answers_1, selection_table=selected_rows, column_name=prompts_raw[0][0].value) else: prompt_answers_1 = [] return @app.cell def _( append_llm_results_to_dataframe, fields_to_process, inf_model, params, process_with_llm, prompts_raw, results_table, run_prompt_button_2, selected_rows, ): if run_prompt_button_2.value: prompt_answers_2 = process_with_llm(fields_to_process=fields_to_process, prompt_template=prompts_raw[1][1].value, inf_model=inf_model, params=params, batch_size=10) append_llm_results_to_dataframe(target_dataframe=results_table, fields_to_process=fields_to_process, llm_results=prompt_answers_2, selection_table=selected_rows, column_name=prompts_raw[1][0].value) else: prompt_answers_2 = [] return @app.cell def _( append_llm_results_to_dataframe, fields_to_process, inf_model, params, process_with_llm, prompts_raw, results_table, run_prompt_button_3, selected_rows, ): if run_prompt_button_3.value: prompt_answers_3 = process_with_llm(fields_to_process=fields_to_process, prompt_template=prompts_raw[2][1].value, inf_model=inf_model, params=params, batch_size=10) append_llm_results_to_dataframe(target_dataframe=results_table, fields_to_process=fields_to_process, llm_results=prompt_answers_3, selection_table=selected_rows, column_name=prompts_raw[2][0].value) else: prompt_answers_3 = [] return @app.cell def _( append_llm_results_to_dataframe, fields_to_process, inf_model, params, process_with_llm, prompts_raw, results_table, run_prompt_button_4, selected_rows, ): if run_prompt_button_4.value: prompt_answers_4 = process_with_llm(fields_to_process=fields_to_process, prompt_template=prompts_raw[3][1].value, inf_model=inf_model, params=params, batch_size=10) append_llm_results_to_dataframe(target_dataframe=results_table, fields_to_process=fields_to_process, llm_results=prompt_answers_4, selection_table=selected_rows, column_name=prompts_raw[3][0].value) else: prompt_answers_4 = [] return @app.cell def _( append_llm_results_to_dataframe, fields_to_process, inf_model, params, process_with_llm, prompts_raw, results_table, run_prompt_button_5, selected_rows, ): if run_prompt_button_5.value: prompt_answers_5 = process_with_llm(fields_to_process=fields_to_process, prompt_template=prompts_raw[4][1].value, inf_model=inf_model, params=params, batch_size=10) append_llm_results_to_dataframe(target_dataframe=results_table, fields_to_process=fields_to_process, llm_results=prompt_answers_5, selection_table=selected_rows,column_name=prompts_raw[4][0].value) else: prompt_answers_5 = [] return @app.cell def _(mo, results_stack): ui_accordion_section_7 = mo.accordion( {"Section 7: **Run and View Results**": results_stack} ) ui_accordion_section_7 return @app.cell def _(show_variable_sidebar): show_variable_sidebar.right() return @app.cell def _(prompt_input_variables_sidebar, show_variable_sidebar): show_variable_sidebar.value is not False and prompt_input_variables_sidebar return @app.cell def _(table_dataframe): if not table_dataframe.empty: results_table = table_dataframe.copy() else: results_table = [] return (results_table,) @app.cell def _( mo, results_table, run_prompt_button_1, run_prompt_button_2, run_prompt_button_3, run_prompt_button_4, run_prompt_button_5, ): if run_prompt_button_1.value or run_prompt_button_2.value or run_prompt_button_3.value or run_prompt_button_4.value or run_prompt_button_5.value: results_table_view = mo.ui.table(results_table) else: results_table_view = mo.ui.table(results_table) return (results_table_view,) @app.cell def _(mo, prompts_raw): prompts = [mo.vstack([prompt[0], prompt[1]]) for prompt in prompts_raw] prompt_stack = mo.vstack(prompts, heights="equal", align="stretch", justify="space-around", gap=3) return (prompt_stack,) @app.cell def _( mo, results_table_view, run_prompt_button_1, run_prompt_button_2, run_prompt_button_3, run_prompt_button_4, run_prompt_button_5, ): run_button_stack = mo.hstack([run_prompt_button_1,run_prompt_button_2,run_prompt_button_3,run_prompt_button_4,run_prompt_button_5]) results_stack = mo.vstack([run_button_stack,results_table_view]) return (results_stack,) if __name__ == "__main__": app.run()