# Provenance: Hugging Face upload by MilanM ("Upload 3 files", commit 87b6e34, verified)
from ibm_watsonx_ai import APIClient, Credentials
from typing import Dict, Optional, List, Union, Any, Set
import pandas as pd
import marimo as mo
import json
import glob
import io
import os
def get_cred_value(key, creds_var_name="baked_in_creds", default=""):
    """
    Safely look up a value in a credentials dictionary.

    Searches for the credentials dictionary in:
      1. This module's global variables, under ``creds_var_name``
      2. Any imported module exposing an attribute named ``creds_var_name``

    Args:
        key: The key to look up in the credentials dictionary.
        creds_var_name: The variable name of the credentials dictionary.
        default: Value returned when no credentials dict contains the key.

    Returns:
        The value from the first matching credentials dictionary that
        contains ``key``, otherwise ``default``.
    """
    import sys

    # 1) Check this module's globals first.
    if creds_var_name in globals():
        creds_dict = globals()[creds_var_name]
        if isinstance(creds_dict, dict) and key in creds_dict:
            return creds_dict[key]
    # 2) Scan imported modules. Snapshot the values first: sys.modules can
    # be mutated by a concurrent import, and iterating it live would raise
    # RuntimeError ("dictionary changed size during iteration").
    for module_obj in list(sys.modules.values()):
        if hasattr(module_obj, creds_var_name):
            creds_dict = getattr(module_obj, creds_var_name)
            if isinstance(creds_dict, dict) and key in creds_dict:
                return creds_dict[key]
    return default
def get_key_by_value(dictionary, value):
    """Return the first key in ``dictionary`` mapped to ``value``, or None."""
    return next((k for k, v in dictionary.items() if v == value), None)
def markdown_spacing(number):
    """Return a run of ``number`` ' ' characters for markdown spacing."""
    return number * ' '
def wrap_with_spaces(text_to_wrap, prefix_spaces=2, suffix_spaces=2):
    """Pad ``text_to_wrap`` with non-breaking spaces on either side."""
    def _pad(count):
        # Zero or negative counts produce no padding, matching the guard
        # in the original implementation.
        return markdown_spacing(count) if count > 0 else ""
    return f"{_pad(prefix_spaces)}{text_to_wrap}{_pad(suffix_spaces)}"
def load_file_dataframe(file, file_extension, sheet_selector=None, excel_data=None, header_row=0):
    """
    Load a dataframe from an uploaded file with a customizable header row.

    Parameters:
    -----------
    file : marimo.ui.file object
        The file upload component containing the file data
    file_extension : str
        The extension of the uploaded file (.xlsx, .xls, .csv, .json)
    sheet_selector : marimo.ui.dropdown, optional
        Dropdown component for selecting Excel sheets. Excel files are only
        read once this has a truthy ``.value``.
    excel_data : BytesIO, optional
        BytesIO object containing Excel data (required for the Excel path;
        not validated here — TODO confirm callers always pass it for Excel)
    header_row : int, optional
        Row index to use as column headers (0-based). Default is 0 (first row).
        Use None to have pandas generate default column names.

    Returns:
    --------
    tuple
        (pandas.DataFrame, list) - The loaded dataframe and list of column
        names. Both are empty when the upload has no content, when an Excel
        file has no sheet selected yet, or when JSON parsing fails.
    """
    # Fall-through defaults: returned unchanged for unknown extensions or
    # any branch that cannot produce data.
    dataframe = pd.DataFrame([])
    column_names = []
    if file.contents():
        # Handle different file types
        if file_extension in ['.xlsx', '.xls'] and sheet_selector is not None and sheet_selector.value:
            # For Excel files - now we can safely access sheet_selector.value
            excel_data.seek(0)  # Reset buffer position (may have been read before)
            dataframe = pd.read_excel(
                excel_data,
                sheet_name=sheet_selector.value,
                header=header_row,
                # openpyxl handles .xlsx; legacy .xls needs xlrd
                engine="openpyxl" if file_extension == '.xlsx' else "xlrd"
            )
            column_names = list(dataframe.columns)
        elif file_extension == '.csv':
            # For CSV files; assumes UTF-8 encoded content
            csv_data = io.StringIO(file.contents().decode('utf-8'))
            dataframe = pd.read_csv(csv_data, header=header_row)
            column_names = list(dataframe.columns)
        elif file_extension == '.json':
            # For JSON files; best-effort — parse failures are reported to
            # stdout and yield the empty defaults instead of raising.
            try:
                json_data = json.loads(file.contents().decode('utf-8'))
                # Handle different JSON structures
                if isinstance(json_data, list):
                    # Top-level array: one row per element
                    dataframe = pd.DataFrame(json_data)
                elif isinstance(json_data, dict):
                    # If it's a dictionary with nested structures, try to normalize it
                    if any(isinstance(v, (dict, list)) for v in json_data.values()):
                        # For nested JSON with consistent structure
                        dataframe = pd.json_normalize(json_data)
                    else:
                        # For flat JSON: single-row dataframe
                        dataframe = pd.DataFrame([json_data])
                column_names = list(dataframe.columns)
            except Exception as e:
                print(f"Error parsing JSON: {e}")
    return dataframe, column_names
def create_parameter_table(input_list, column_name="Active Options", label="Select the Parameters to set to Active",
                           selection_type="multi-cell", text_justify="center"):
    """
    Build a marimo table for parameter selection.

    Args:
        input_list: Parameter names to display in the table's single column.
        column_name: Column heading (default: "Active Options").
        label: Bold label rendered above the table.
        selection_type: "single-cell" or "multi-cell" (default: "multi-cell").
        text_justify: Column justification: "left", "center", or "right".

    Returns:
        A mo.ui.table configured for parameter selection.

    Raises:
        ValueError: If ``selection_type`` or ``text_justify`` is invalid.
    """
    import marimo as mo

    # Reject invalid arguments up front so misuse fails loudly.
    if selection_type not in ("single-cell", "multi-cell"):
        raise ValueError("selection_type must be either 'single-cell' or 'multi-cell'")
    if text_justify not in ("left", "center", "right"):
        raise ValueError("text_justify must be one of: 'left', 'center', 'right'")

    return mo.ui.table(
        label=f"**{label}**",
        data={column_name: input_list},
        selection=selection_type,
        text_justify_columns={column_name: text_justify},
    )
def get_cell_values(parameter_options):
    """
    Build a {parameter_name: bool} map from a mo.ui.table with cell selection.

    Every parameter found in the table's backing data starts as False; any
    parameter present in the current selection is flipped to True.

    Args:
        parameter_options: A mo.ui.table (or duck-typed equivalent) exposing
            ``data`` and ``value`` attributes.

    Returns:
        Dictionary mapping parameter names to True (selected) or False.
    """
    names = set()

    # Collect candidate parameter names from the table's data.
    if hasattr(parameter_options, 'data'):
        data = parameter_options.data
        if hasattr(data, 'shape') and hasattr(data, 'iloc'):
            # DataFrame-like: read the first column row by row.
            if data.shape[1] > 0:
                for idx in range(data.shape[0]):
                    entry = data.iloc[idx, 0]
                    if entry and isinstance(entry, str):
                        names.add(entry)
        elif isinstance(data, dict) and data:
            # Dict-of-columns (marimo's usual shape): take the first column.
            first_col = next(iter(data))
            for entry in data[first_col]:
                if entry and isinstance(entry, str):
                    names.add(entry)

    # Default every known parameter to inactive.
    flags = {name: False for name in names}

    # Mark selected parameters active. Selections may arrive as objects
    # with a .value, as dicts with a 'value' key, or as raw strings.
    selected = getattr(parameter_options, 'value', None)
    if selected is not None:
        for cell in selected:
            if hasattr(cell, 'value') and cell.value in flags:
                flags[cell.value] = True
            elif isinstance(cell, dict) and 'value' in cell and cell['value'] in flags:
                flags[cell['value']] = True
            elif isinstance(cell, str) and cell in flags:
                flags[cell] = True
    return flags
def convert_table_to_json_docs(df, selected_columns=None):
    """
    Convert a pandas DataFrame or dictionary to a list of JSON documents.

    Column names are standardized to lowercase with underscores instead of
    spaces and special characters removed. (Two columns that standardize to
    the same key will collide; the last one wins.)

    Args:
        df: The DataFrame, dict, or DataFrame-convertible object to process.
        selected_columns: Columns to include. May be a list of names or a
            {name: bool} dict (only True entries are kept). Matching against
            DataFrame columns is case-insensitive. Falsy/None selects all.

    Returns:
        list: One dict per row with NaN replaced by None. Returns [] for
        None input, failed conversion, empty frames, or no valid columns.
    """
    import pandas as pd
    import re

    def standardize_key(key):
        """Lowercase, spaces -> underscores, drop non-word characters."""
        if not isinstance(key, str):
            return str(key).lower()
        key = key.lower().replace(' ', '_')
        return re.sub(r'[^\w]', '', key)

    # Dict input: emit a single document.
    if isinstance(df, dict):
        if selected_columns:
            return [{standardize_key(k): df.get(k, None) for k in selected_columns}]
        else:
            # No selection: keep all key-value pairs with standardized keys.
            return [{standardize_key(k): v for k, v in df.items()}]

    if df is None:
        return []

    # Coerce anything else to a DataFrame; treat failure as "no rows".
    if not isinstance(df, pd.DataFrame):
        try:
            df = pd.DataFrame(df)
        except Exception:
            # Was a bare `except:` — narrowed so SystemExit/KeyboardInterrupt
            # are no longer swallowed.
            return []

    if df.empty:
        return []

    # A {column: bool} dict means "keep the True entries".
    if isinstance(selected_columns, dict):
        selected_columns = [col for col, include in selected_columns.items() if include]

    # No usable selection -> take every column.
    if not selected_columns or not isinstance(selected_columns, list) or len(selected_columns) == 0:
        selected_columns = list(df.columns)

    # Resolve requested names against the frame, case-insensitively.
    available_columns = []
    columns_lower = {col.lower(): col for col in df.columns if isinstance(col, str)}
    for col in selected_columns:
        if col in df.columns:
            available_columns.append(col)
        elif isinstance(col, str) and col.lower() in columns_lower:
            available_columns.append(columns_lower[col.lower()])

    if not available_columns:
        return []

    # Build one document per row, mapping NaN -> None.
    json_docs = []
    for _, row in df.iterrows():
        doc = {}
        for col in available_columns:
            value = row[col]
            doc[standardize_key(col)] = None if pd.isna(value) else value
        json_docs.append(doc)
    return json_docs
def filter_models_by_function(resources, function_type="prompt_chat"):
    """
    Collect model IDs whose resource entry advertises a given function type.

    Args:
        resources (list): Model resource dicts, each possibly carrying a
            "functions" list of {"id": ...} dicts and a "model_id".
        function_type (str, optional): Function id to match. Defaults to
            "prompt_chat".

    Returns:
        list: model_id values for every resource exposing the function.
    """
    matches = []
    if not resources or not isinstance(resources, list):
        return matches

    for entry in resources:
        # Only consider entries that carry a proper functions list.
        funcs = entry["functions"] if "functions" in entry else None
        if not isinstance(funcs, list):
            continue
        # Does any (dict-shaped) function entry match the requested id?
        wanted = False
        for func in funcs:
            if isinstance(func, dict) and func.get("id") == function_type:
                wanted = True
                break
        if wanted and "model_id" in entry:
            matches.append(entry["model_id"])
    return matches
def get_model_selection_table(client=None, model_type="all", filter_functionality=None, selection_mode="single-cell"):
    """
    Build a marimo table for model selection.

    Args:
        client: API client used to fetch model specs. If None, a built-in
            default model list is shown instead.
        model_type (str): Which spec list to query: "all", "chat", or "embedding".
        filter_functionality (str, optional): Keep only models whose resource
            entry advertises this function id (e.g. "image_chat", "text_chat",
            "autoai_rag", "text_generation", "multilingual", ...).
        selection_mode (str): Table selection mode, "single" or "single-cell".
            Defaults to "single-cell".

    Returns:
        tuple: (model_selector_table, resources, model_id_list). With no
        client, ``resources`` is empty and ``model_id_list`` holds the
        defaults.
    """
    # Fallback shown when no client is available or filtering removes everything.
    default_models = ['mistralai/mistral-large']

    if client is None:
        # No API client: offer the defaults. Return the same 3-tuple shape
        # as the client path so callers can unpack uniformly (the original
        # returned a bare table here, which broke tuple-unpacking callers).
        selection = mo.ui.table(
            default_models,
            selection="single",
            label="Select a model to use.",
            page_size=30,
        )
        return selection, [], default_models

    # Pick the spec endpoint that matches the requested model family.
    if model_type == "chat":
        model_specs = client.foundation_models.get_chat_model_specs()
    elif model_type == "embedding":
        model_specs = client.foundation_models.get_embeddings_model_specs()
    else:
        model_specs = client.foundation_models.get_model_specs()

    resources = model_specs.get("resources", [])

    # Narrow by functionality when requested, otherwise list everything.
    if filter_functionality and resources:
        model_id_list = filter_models_by_function(resources, filter_functionality)
    else:
        model_id_list = [resource["model_id"] for resource in resources]

    # Never present an empty table.
    if not model_id_list:
        model_id_list = default_models

    model_selector = mo.ui.table(
        model_id_list,
        selection=selection_mode,
        label="Select a model to use.",
        page_size=30,
        # single-cell mode requires [("<row_nr as a string>", "<column name>")]
        # as its initial selection; row-selection modes take plain indices.
        initial_selection=[("0", "value")] if selection_mode == "single-cell" else [0],
    )
    return model_selector, resources, model_id_list
def _enforce_model_selection(model_selection, model_id_list):
# If nothing is selected (empty list) or value is None
if not model_selection.value:
# Reset to first item
model = 0
model_selection._value = model_id_list[model]
print(model_selection.value)
return model_selection.value
def update_max_tokens_limit(model_selection, resources, model_id_list):
    """
    Look up the max_output_tokens limit for the currently selected model.

    Args:
        model_selection: Selection object whose ``value`` is either a model
            id string or a list holding an index or id.
        resources (list): Model resource dicts from the model specs call.
        model_id_list (list): Model ids in table order, used to resolve an
            integer index selection.

    Returns:
        int: The model's max_output_tokens limit, or 4096 when the selection
        or limits cannot be resolved.
    """
    default_max_tokens = 4096
    try:
        # Guard order matters: test hasattr BEFORE reading .value. (The
        # original read .value first, so its hasattr guard could never fire.)
        if not hasattr(model_selection, 'value') or model_selection.value is None:
            return default_max_tokens
        if not resources or not isinstance(resources, list) or len(resources) == 0:
            return default_max_tokens

        selected_value = model_selection.value

        # The table may hand back a list (index or id) or a direct value.
        if isinstance(selected_value, list) and len(selected_value) > 0:
            first = selected_value[0]
            if isinstance(first, int) and 0 <= first < len(model_id_list):
                selected_model_id = model_id_list[first]
            else:
                selected_model_id = str(first)
        else:
            selected_model_id = str(selected_value)

        # Find the matching resource entry and read its output-token limit.
        for model in resources:
            if model.get("model_id") == selected_model_id:
                if "model_limits" in model and "max_output_tokens" in model["model_limits"]:
                    return model["model_limits"]["max_output_tokens"]
                break
    except Exception as e:
        print(f"Error: {e}")
    return default_max_tokens
def load_templates(
    folder_path: str,
    file_extensions: Optional[List[str]] = None,
    strip_whitespace: bool = True
) -> Dict[str, str]:
    """
    Read template files from a folder into a name -> content mapping.

    Args:
        folder_path: Folder to scan for template files.
        file_extensions: Extensions to include; a missing leading dot is
            added automatically. Defaults to ['.txt', '.md'].
        strip_whitespace: Strip leading/trailing whitespace from each
            template's content. Defaults to True.

    Returns:
        Dict keyed by filename without extension; always contains an
        "empty" entry mapped to a single space.
    """
    extensions = file_extensions if file_extensions is not None else ['.txt', '.md']
    # Normalize: every extension must carry a leading dot for globbing.
    extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in extensions]

    templates: Dict[str, str] = {"empty": " "}  # Default empty template

    for ext in extensions:
        for file_path in glob.glob(os.path.join(folder_path, f'*{ext}')):
            try:
                # Filename without extension becomes the template's key.
                name = os.path.splitext(os.path.basename(file_path))[0]
                with open(file_path, 'r', encoding='utf-8') as handle:
                    text = handle.read()
                templates[name] = text.strip() if strip_whitespace else text
            except Exception as e:
                # Best-effort loading: report and keep going.
                print(f"Error loading template from {file_path}: {str(e)}")
    return templates