import glob
import io
import json
import os
import re
import sys
from typing import Any, Dict, List, Optional, Set, Union

import marimo as mo
import pandas as pd
from ibm_watsonx_ai import APIClient, Credentials
|
def get_cred_value(key, creds_var_name="baked_in_creds", default=""):
    """
    Safely look up a value in a credentials dictionary.

    The credentials dictionary is searched for in:
    1. This module's global variables, under the name ``creds_var_name``.
    2. Every currently imported module that exposes an attribute of that name.

    Args:
        key: The key to look up in the credentials dictionary.
        creds_var_name: The variable name of the credentials dictionary.
        default: The value to return if no dictionary containing the key is found.

    Returns:
        The value stored under ``key`` in the first matching dictionary,
        otherwise ``default``.
    """
    local_creds = globals().get(creds_var_name)
    if isinstance(local_creds, dict) and key in local_creds:
        return local_creds[key]

    # Iterate over a snapshot: attribute access on lazily-loading modules can
    # import further modules, and mutating sys.modules while iterating it
    # raises "RuntimeError: dictionary changed size during iteration".
    for module_obj in list(sys.modules.values()):
        try:
            candidate = getattr(module_obj, creds_var_name, None)
        except Exception:
            # A module-level __getattr__ hook may raise anything; a module
            # that cannot be inspected simply does not provide credentials.
            continue
        if isinstance(candidate, dict) and key in candidate:
            return candidate[key]

    return default
|
def get_key_by_value(dictionary, value):
    """Return the first key of ``dictionary`` whose value equals ``value``, or None if there is none."""
    matching_keys = (name for name, stored in dictionary.items() if stored == value)
    return next(matching_keys, None)
|
def markdown_spacing(number):
    """
    Return ``number`` non-breaking space characters (U+00A0).

    The character is written as an explicit escape so it cannot be confused
    with, or silently replaced by, an ordinary space, which Markdown/HTML
    rendering collapses. A ``number`` of zero or less yields an empty string.
    """
    return "\u00a0" * number
|
def wrap_with_spaces(text_to_wrap, prefix_spaces=2, suffix_spaces=2):
    """
    Surround text with padding produced by ``markdown_spacing``.

    Args:
        text_to_wrap: The value to place between the padding.
        prefix_spaces: Number of spacing characters before the text; values
            of zero or less add nothing.
        suffix_spaces: Number of spacing characters after the text; values
            of zero or less add nothing.

    Returns:
        The padded string.
    """
    leading = ""
    if prefix_spaces > 0:
        leading = markdown_spacing(prefix_spaces)

    trailing = ""
    if suffix_spaces > 0:
        trailing = markdown_spacing(suffix_spaces)

    return f"{leading}{text_to_wrap}{trailing}"
|
def load_file_dataframe(file, file_extension, sheet_selector=None, excel_data=None, header_row=0):
    """
    Load a dataframe from an uploaded file with a customizable header row.

    Parameters:
    -----------
    file : marimo.ui.file object
        The file upload component; only its ``contents()`` method is used.
    file_extension : str
        The extension of the uploaded file (.xlsx, .xls, .csv, .json).
        Matching is case-insensitive.
    sheet_selector : marimo.ui.dropdown, optional
        Dropdown component for selecting Excel sheets. Excel files are only
        read once this has a non-empty ``value``.
    excel_data : BytesIO, optional
        Seekable buffer containing the Excel data. When omitted, a buffer is
        built from ``file.contents()``.
    header_row : int, optional
        Row index to use as column headers (0-based). Default is 0 (first row).
        Use None to have pandas generate default column names.
        Not applied to JSON input.

    Returns:
    --------
    tuple
        (pandas.DataFrame, list) - The loaded dataframe and list of column names.
        An empty dataframe and an empty list are returned when the upload is
        empty, the extension is unsupported, an Excel sheet has not been
        chosen yet, or the JSON payload cannot be parsed.
    """
    dataframe = pd.DataFrame([])

    raw_bytes = file.contents()
    if not raw_bytes:
        return dataframe, []

    extension = (file_extension or "").lower()

    if extension in (".xlsx", ".xls"):
        if sheet_selector is None or not sheet_selector.value:
            return dataframe, []
        # Fall back to the uploaded bytes so a missing buffer is not a crash.
        workbook = excel_data if excel_data is not None else io.BytesIO(raw_bytes)
        workbook.seek(0)
        dataframe = pd.read_excel(
            workbook,
            sheet_name=sheet_selector.value,
            header=header_row,
            engine="openpyxl" if extension == ".xlsx" else "xlrd",
        )
    elif extension == ".csv":
        # "utf-8-sig" drops a leading byte-order mark (common in spreadsheet
        # exports) that would otherwise be glued onto the first column name;
        # input without a BOM decodes exactly as with plain "utf-8".
        dataframe = pd.read_csv(io.StringIO(raw_bytes.decode("utf-8-sig")), header=header_row)
    elif extension == ".json":
        try:
            # json.loads rejects text that still starts with a BOM.
            json_data = json.loads(raw_bytes.decode("utf-8-sig"))
            if isinstance(json_data, list):
                dataframe = pd.DataFrame(json_data)
            elif isinstance(json_data, dict):
                has_nested_values = any(isinstance(v, (dict, list)) for v in json_data.values())
                if has_nested_values:
                    dataframe = pd.json_normalize(json_data)
                else:
                    dataframe = pd.DataFrame([json_data])
        except Exception as e:
            # Best effort: report the problem and return the empty dataframe.
            print(f"Error parsing JSON: {e}")

    return dataframe, list(dataframe.columns)
|
def create_parameter_table(input_list, column_name="Active Options", label="Select the Parameters to set to Active",
                           selection_type="multi-cell", text_justify="center"):
    """
    Build a single-column marimo table used to pick parameters.

    Args:
        input_list: Values shown in the table's only column.
        column_name: Header of that column (default: "Active Options").
        label: Table label; it is wrapped in ``**`` so it renders in bold
            (default: "Select the Parameters to set to Active").
        selection_type: Either "single-cell" or "multi-cell" (default: "multi-cell").
        text_justify: Justification of the column text, one of "left",
            "center" or "right" (default: "center").

    Returns:
        The configured ``mo.ui.table`` element.

    Raises:
        ValueError: If ``selection_type`` or ``text_justify`` is not one of
            the accepted values.
    """
    import marimo as mo

    accepted_selection_types = ("single-cell", "multi-cell")
    accepted_justifications = ("left", "center", "right")

    if selection_type not in accepted_selection_types:
        raise ValueError("selection_type must be either 'single-cell' or 'multi-cell'")

    if text_justify not in accepted_justifications:
        raise ValueError("text_justify must be one of: 'left', 'center', 'right'")

    return mo.ui.table(
        label=f"**{label}**",
        data={column_name: input_list},
        selection=selection_type,
        text_justify_columns={column_name: text_justify},
    )
|
def get_cell_values(parameter_options):
    """
    Report which parameter names are currently selected in a table.

    Parameter names are the non-empty strings found in the first column of
    ``parameter_options.data`` (a DataFrame-like object or a dict of
    columns). A name is marked active when it appears in
    ``parameter_options.value`` as an object with a ``value`` attribute,
    as a dict with a ``"value"`` key, or as a plain string.

    Args:
        parameter_options: A mo.ui.table with cell selection enabled.

    Returns:
        Dictionary mapping every parameter name to True (selected) or
        False (not selected).
    """
    def _is_param_name(candidate):
        return candidate and isinstance(candidate, str)

    names = set()

    if hasattr(parameter_options, "data"):
        table_data = parameter_options.data
        frame_like = hasattr(table_data, "shape") and hasattr(table_data, "iloc")

        if frame_like:
            for row_idx in range(table_data.shape[0]):
                if table_data.shape[1] <= 0:
                    continue
                entry = table_data.iloc[row_idx, 0]
                if _is_param_name(entry):
                    names.add(entry)
        elif isinstance(table_data, dict) and len(table_data) > 0:
            first_column = next(iter(table_data))
            names.update(item for item in table_data[first_column] if _is_param_name(item))

    flags = dict.fromkeys(names, False)

    if not hasattr(parameter_options, "value") or parameter_options.value is None:
        return flags

    for cell in parameter_options.value:
        if hasattr(cell, "value") and cell.value in flags:
            chosen = cell.value
        elif isinstance(cell, dict) and "value" in cell and cell["value"] in flags:
            chosen = cell["value"]
        elif isinstance(cell, str) and cell in flags:
            chosen = cell
        else:
            continue
        flags[chosen] = True

    return flags
|
def convert_table_to_json_docs(df, selected_columns=None):
    """
    Convert a pandas DataFrame or dictionary to a list of JSON documents.

    Column names are standardized to lowercase with underscores instead of
    spaces and with non-word characters removed. Two columns that standardize
    to the same key overwrite each other in the output document.

    Args:
        df: The DataFrame or dictionary to process. Other inputs are passed
            to ``pd.DataFrame``; None or anything that cannot be converted
            yields an empty list. Duplicate column labels are not supported.
        selected_columns: Columns to include. May be a list/tuple/set of
            names, a single name, or a mapping of name -> bool where only
            truthy entries are kept. Names are matched exactly first, then
            case-insensitively. When empty or None, all columns are included.

    Returns:
        list: One dictionary per row (a single dictionary for dict input).
        Missing scalar values (NaN/NaT/None) in DataFrame rows become None;
        list- or dict-valued cells are passed through unchanged. Values are
        taken column by column, so integer columns stay integers even when
        other columns hold floats.
    """
    def standardize_key(key):
        """Convert a column name to lowercase with underscores instead of spaces and no special characters"""
        if not isinstance(key, str):
            return str(key).lower()
        return re.sub(r'[^\w]', '', key.lower().replace(' ', '_'))

    def normalize_selection(selection):
        """Return the requested column labels as a list; empty means "no restriction"."""
        if selection is None:
            return []
        if isinstance(selection, dict):
            return [col for col, include in selection.items() if include]
        if isinstance(selection, str):
            return [selection]
        try:
            return list(selection)
        except TypeError:
            return []

    def clean_value(value):
        """Map missing scalars to None; leave containers untouched."""
        # pd.isna on a list/array returns an array whose truth value is
        # ambiguous, so only scalars are tested for missingness.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    requested = normalize_selection(selected_columns)

    if isinstance(df, dict):
        if requested:
            return [{standardize_key(k): df.get(k, None) for k in requested}]
        return [{standardize_key(k): v for k, v in df.items()}]

    if df is None:
        return []

    if not isinstance(df, pd.DataFrame):
        try:
            df = pd.DataFrame(df)
        except Exception:
            # Input that pandas cannot interpret produces no documents.
            return []

    if df.empty:
        return []

    if not requested:
        requested = list(df.columns)

    columns_lower = {col.lower(): col for col in df.columns if isinstance(col, str)}
    available_columns = []
    for col in requested:
        if col in df.columns:
            available_columns.append(col)
        elif isinstance(col, str) and col.lower() in columns_lower:
            available_columns.append(columns_lower[col.lower()])

    if not available_columns:
        return []

    # Read whole columns instead of using iterrows(): iterrows builds one
    # Series per row and upcasts mixed numeric columns (ints become floats).
    keys = [standardize_key(col) for col in available_columns]
    column_values = [df[col].tolist() for col in available_columns]

    return [
        {key: clean_value(value) for key, value in zip(keys, row)}
        for row in zip(*column_values)
    ]
|
def filter_models_by_function(resources, function_type="prompt_chat"):
    """
    Filter model IDs from a resources list by supported function type.

    Args:
        resources (list): List of model resource dictionaries. Entries that
            are not dictionaries, lack a "model_id", or whose "functions"
            value is not a list are skipped.
        function_type (str, optional): Function id to filter by. Defaults to "prompt_chat".

    Returns:
        list: Model IDs, in input order, whose "functions" list contains a
        dictionary with ``"id" == function_type``. Empty when ``resources``
        is empty or not a list.
    """
    if not resources or not isinstance(resources, list):
        return []

    filtered_model_ids = []
    for model in resources:
        # A malformed entry (e.g. None) must not abort the whole filter.
        if not isinstance(model, dict):
            continue

        functions = model.get("functions")
        if not isinstance(functions, list) or "model_id" not in model:
            continue

        if any(isinstance(func, dict) and func.get("id") == function_type for func in functions):
            filtered_model_ids.append(model["model_id"])

    return filtered_model_ids
|
def get_model_selection_table(client=None, model_type="all", filter_functionality=None, selection_mode="single-cell"):
    """
    Build a marimo table from which a foundation model can be selected.

    Args:
        client: The client object for API calls; its ``foundation_models``
            attribute is queried for model specs. If None, a table holding
            only the default model is returned.
        model_type (str): "chat" queries the chat model specs, "embedding"
            the embeddings model specs; any other value queries all model specs.
        filter_functionality (str, optional): When given (and resources were
            returned), only models passing ``filter_models_by_function`` for
            this function id are listed.
        selection_mode (str): Selection mode passed to the table when a
            client is given. Defaults to "single-cell".

    Returns:
        Without a client: the ``mo.ui.table`` element alone (always created
        with ``selection="single"``).
        With a client: a tuple ``(model_selector, resources, model_id_list)``
        holding the table, the raw "resources" list from the specs response
        and the model IDs shown in the table. If no model ID is found, the
        default model list is shown instead.
        NOTE(review): the two return shapes differ; callers must handle both.
    """
    fallback_models = ['mistralai/mistral-large']

    if client is None:
        return mo.ui.table(
            fallback_models,
            selection="single",
            label="Select a model to use.",
            page_size=30,
        )

    models_api = client.foundation_models
    if model_type == "chat":
        fetch_specs = models_api.get_chat_model_specs
    elif model_type == "embedding":
        fetch_specs = models_api.get_embeddings_model_specs
    else:
        fetch_specs = models_api.get_model_specs
    model_specs = fetch_specs()

    resources = model_specs.get("resources", [])

    if filter_functionality and resources:
        candidate_ids = filter_models_by_function(resources, filter_functionality)
    else:
        candidate_ids = [entry["model_id"] for entry in resources]

    # Never show an empty table: fall back to the default model list.
    model_id_list = candidate_ids if candidate_ids else fallback_models

    if selection_mode == "single-cell":
        preselected = [("0", "value")]
    else:
        preselected = [0]

    model_selector = mo.ui.table(
        model_id_list,
        selection=selection_mode,
        label="Select a model to use.",
        page_size=30,
        initial_selection=preselected,
    )

    return model_selector, resources, model_id_list
|
def _enforce_model_selection(model_selection, model_id_list):
    """
    Make sure a model selection element holds a value.

    When ``model_selection.value`` is empty and ``model_id_list`` has at
    least one entry, the first model ID is written to the element and the
    resulting value is printed. With an empty ``model_id_list`` there is
    nothing to fall back to, so the element is left untouched.

    Args:
        model_selection: Selection element exposing ``value``.
        model_id_list: Model IDs to fall back to; the first entry is used.

    Returns:
        The element's ``value`` after the optional fallback.
    """
    if not model_selection.value and model_id_list:
        # Writes the private ``_value`` attribute directly.
        # NOTE(review): presumably because the element offers no public
        # setter -- confirm this still works after marimo upgrades.
        model_selection._value = model_id_list[0]
        print(model_selection.value)

    return model_selection.value
|
def _selected_model_id(selected_value, model_id_list):
    """Reduce a table selection (row index, ID string, cell object or row dict) to a model-ID string."""
    chosen = selected_value

    if isinstance(selected_value, list) and len(selected_value) > 0:
        chosen = selected_value[0]
        # Row selections may arrive as indexes into the displayed list.
        if isinstance(chosen, int) and 0 <= chosen < len(model_id_list):
            return model_id_list[chosen]

    # Unwrap the cell shapes that get_cell_values also accepts: a dict with
    # a "value" key or an object exposing a ``value`` attribute.
    if isinstance(chosen, dict) and "value" in chosen:
        chosen = chosen["value"]
    elif hasattr(chosen, "value"):
        chosen = chosen.value

    return str(chosen)


def update_max_tokens_limit(model_selection, resources, model_id_list):
    """
    Look up the maximum output tokens of the currently selected model.

    Args:
        model_selection: Selection element exposing ``value``. The value may
            be a model-ID string, or a list whose first item is a row index
            into ``model_id_list``, a model-ID string, a dict with a
            "value" key, or an object with a ``value`` attribute.
        resources: List of model resource dictionaries; the entry whose
            "model_id" matches the selection supplies
            ``["model_limits"]["max_output_tokens"]``.
        model_id_list: Model IDs in table order, used to resolve row indexes.

    Returns:
        The selected model's "max_output_tokens", or 4096 when there is no
        selection, ``resources`` is empty or not a list, the model or its
        limit is not found, or any error occurs (the error is printed).
    """
    default_max_tokens = 4096

    try:
        # getattr with a default covers both "no value attribute" and
        # "value is None" without touching the attribute before checking.
        selected_value = getattr(model_selection, "value", None)
        if selected_value is None:
            print("No model selection or selection has no value")
            return default_max_tokens

        if not resources or not isinstance(resources, list):
            print("Resources is empty or not a list")
            return default_max_tokens

        print(f"Raw selection value: {selected_value}")

        selected_model_id = _selected_model_id(selected_value, model_id_list)
        print(f"Selected model ID: {selected_model_id}")

        for model in resources:
            if model.get("model_id") != selected_model_id:
                continue
            limits = model.get("model_limits")
            if isinstance(limits, dict) and "max_output_tokens" in limits:
                return limits["max_output_tokens"]
            # Matching model without a limit: stop and use the default.
            break

    except Exception as e:
        # Best effort: a malformed selection or resource entry must not
        # break the caller, so report it and use the default.
        print(f"Error: {e}")

    return default_max_tokens
|
def load_templates(
    folder_path: str,
    file_extensions: Optional[List[str]] = None,
    strip_whitespace: bool = True
) -> Dict[str, str]:
    """
    Load template files from a specified folder into a dictionary.

    Args:
        folder_path: Path to the folder containing template files. It is
            treated literally, so glob metacharacters in it are not expanded.
        file_extensions: List of file extensions to include, with or without
            the leading dot (default: ['.txt', '.md'])
        strip_whitespace: Whether to strip leading/trailing whitespace from templates (default: True)

    Returns:
        Dictionary with filename (without extension) as key and file content
        as value. It always starts with an "empty" entry holding a single
        space. When several files share a name, the one matched by the later
        extension in ``file_extensions`` wins. Files that cannot be read are
        reported on stdout and skipped.
    """
    if file_extensions is None:
        file_extensions = ['.txt', '.md']

    dotted_extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in file_extensions]

    templates = {"empty": " "}

    # Escape the folder so characters such as "[" or "*" in its name are
    # matched literally instead of being interpreted as glob syntax.
    literal_folder = glob.escape(os.fspath(folder_path))

    for ext in dotted_extensions:
        pattern = os.path.join(literal_folder, f'*{ext}')
        # sorted() makes the load order independent of directory listing order.
        for file_path in sorted(glob.glob(pattern)):
            template_name = os.path.splitext(os.path.basename(file_path))[0]
            try:
                with open(file_path, 'r', encoding='utf-8') as handle:
                    content = handle.read()
            except (OSError, UnicodeDecodeError) as e:
                print(f"Error loading template from {file_path}: {str(e)}")
                continue

            templates[template_name] = content.strip() if strip_whitespace else content

    return templates