File size: 18,971 Bytes
87b6e34 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 |
from ibm_watsonx_ai import APIClient, Credentials
from typing import Dict, Optional, List, Union, Any, Set
import pandas as pd
import marimo as mo
import json
import glob
import io
import os
def get_cred_value(key, creds_var_name="baked_in_creds", default=""):
    """
    Safely fetch a value from a credentials dictionary located by variable name.

    The dictionary is searched for in this order:
    1. This module's globals, under the name ``creds_var_name``.
    2. Every module currently loaded in ``sys.modules`` that exposes an
       attribute named ``creds_var_name``.

    Args:
        key: The key to look up in the credentials dictionary.
        creds_var_name: The variable name of the credentials dictionary.
        default: The value to return if no dictionary containing ``key`` is found.

    Returns:
        The value stored under ``key`` in the first matching dictionary,
        otherwise ``default``.
    """
    import sys

    # Sentinel so that falsy or None credential values are still returned as hits.
    not_found = object()

    def lookup(candidate):
        """Return candidate[key] when candidate is a dict holding key, else the sentinel."""
        if isinstance(candidate, dict) and key in candidate:
            return candidate[key]
        return not_found

    found = lookup(globals().get(creds_var_name))
    if found is not not_found:
        return found

    # Iterate over a snapshot: attribute access on lazily-loading modules can
    # import further modules, and mutating sys.modules while iterating it
    # raises "dictionary changed size during iteration".
    for module_obj in list(sys.modules.values()):
        try:
            candidate = getattr(module_obj, creds_var_name, None)
        except Exception:
            # Proxy/lazy modules may raise arbitrary errors on attribute
            # access; such a module cannot hold the credentials, so skip it.
            continue
        found = lookup(candidate)
        if found is not not_found:
            return found

    return default
def get_key_by_value(dictionary, value):
    """Return the first key of ``dictionary`` whose value equals ``value``, or None if there is none."""
    matching_keys = (
        candidate
        for candidate, stored in dictionary.items()
        if stored == value
    )
    return next(matching_keys, None)
def markdown_spacing(number):
    """
    Return ``number`` non-breaking space characters (U+00A0) as one string.

    Args:
        number: How many spacing characters to produce; zero or a negative
            value yields an empty string.

    Returns:
        A string made of ``number`` non-breaking spaces.
    """
    # Written as an escape sequence on purpose: a literal non-breaking space
    # looks exactly like an ordinary space in an editor and is easily replaced
    # by one, which would make the padding collapse when rendered.
    return "\u00a0" * number
def wrap_with_spaces(text_to_wrap, prefix_spaces=2, suffix_spaces=2):
    """
    Surround text with padding produced by ``markdown_spacing``.

    Args:
        text_to_wrap: The value to place between the padding.
        prefix_spaces: Number of spacing characters before the text; no
            padding is added when this is not positive.
        suffix_spaces: Number of spacing characters after the text; no
            padding is added when this is not positive.

    Returns:
        The formatted text with leading and trailing padding.
    """
    leading = ""
    if prefix_spaces > 0:
        leading = markdown_spacing(prefix_spaces)

    trailing = ""
    if suffix_spaces > 0:
        trailing = markdown_spacing(suffix_spaces)

    return leading + f"{text_to_wrap}" + trailing
def load_file_dataframe(file, file_extension, sheet_selector=None, excel_data=None, header_row=0):
    """
    Load a dataframe from an uploaded file with a customizable header row.

    Parameters:
    -----------
    file : marimo.ui.file object
        The file upload component; only its ``contents()`` method is used.
    file_extension : str
        The extension of the uploaded file (.xlsx, .xls, .csv, .json)
    sheet_selector : marimo.ui.dropdown, optional
        Dropdown component whose ``value`` names the Excel sheet to read
    excel_data : BytesIO, optional
        Seekable buffer containing the Excel data
    header_row : int, optional
        Row index to use as column headers (0-based). Default is 0 (first row).
        Use None to have pandas generate default column names.
        Applies to Excel and CSV input only.

    Returns:
    --------
    tuple
        (pandas.DataFrame, list) - The loaded dataframe and list of column names.
        An empty dataframe and an empty list are returned when the upload is
        empty, the extension is not handled, an Excel file is requested
        without a selected sheet or data buffer, or the JSON cannot be parsed.
    """
    dataframe = pd.DataFrame([])
    column_names = []

    # Read the upload once instead of once per use.
    raw = file.contents()
    if not raw:
        return dataframe, column_names

    if file_extension in ('.xlsx', '.xls'):
        sheet_chosen = sheet_selector is not None and sheet_selector.value
        # Both a selected sheet and the data buffer are needed; without the
        # buffer there is nothing to read, so fall through to the empty result.
        if sheet_chosen and excel_data is not None:
            excel_data.seek(0)  # Reset buffer position
            dataframe = pd.read_excel(
                excel_data,
                sheet_name=sheet_selector.value,
                header=header_row,
                engine="openpyxl" if file_extension == '.xlsx' else "xlrd"
            )
            column_names = list(dataframe.columns)
    elif file_extension == '.csv':
        # 'utf-8-sig' decodes plain UTF-8 and also drops a leading byte-order mark.
        csv_data = io.StringIO(raw.decode('utf-8-sig'))
        dataframe = pd.read_csv(csv_data, header=header_row)
        column_names = list(dataframe.columns)
    elif file_extension == '.json':
        try:
            # json.loads rejects text that still carries a BOM, hence 'utf-8-sig'.
            json_data = json.loads(raw.decode('utf-8-sig'))
            if isinstance(json_data, list):
                dataframe = pd.DataFrame(json_data)
            elif isinstance(json_data, dict):
                if any(isinstance(v, (dict, list)) for v in json_data.values()):
                    # Nested values: flatten into dotted column names
                    dataframe = pd.json_normalize(json_data)
                else:
                    # Flat mapping: a single-row frame
                    dataframe = pd.DataFrame([json_data])
            column_names = list(dataframe.columns)
        except Exception as e:
            # Best effort: report the problem and return whatever was built.
            print(f"Error parsing JSON: {e}")

    return dataframe, column_names
def create_parameter_table(input_list, column_name="Active Options", label="Select the Parameters to set to Active",
                           selection_type="multi-cell", text_justify="center"):
    """
    Build a single-column marimo table used to pick parameters.

    Args:
        input_list: Parameter names shown as the rows of the table
        column_name: Header of the only column (default: "Active Options")
        label: Table label, rendered in bold (default: "Select the Parameters to set to Active")
        selection_type: Either "single-cell" or "multi-cell" (default: "multi-cell")
        text_justify: One of "left", "center", "right" (default: "center")

    Returns:
        The configured ``mo.ui.table``

    Raises:
        ValueError: If ``selection_type`` or ``text_justify`` is not one of the accepted values.
    """
    import marimo as mo

    allowed_selection_types = ("single-cell", "multi-cell")
    allowed_justifications = ("left", "center", "right")

    # Reject unsupported options before building any UI element
    if selection_type not in allowed_selection_types:
        raise ValueError("selection_type must be either 'single-cell' or 'multi-cell'")
    if text_justify not in allowed_justifications:
        raise ValueError("text_justify must be one of: 'left', 'center', 'right'")

    return mo.ui.table(
        label=f"**{label}**",
        data={column_name: input_list},
        selection=selection_type,
        text_justify_columns={column_name: text_justify}
    )
def get_cell_values(parameter_options):
    """
    Extract active parameter values from a mo.ui.table.

    Parameter names are read from the first column of the table's ``data``
    (a DataFrame-like object or a dict of columns); only non-empty strings
    count as names. A parameter is marked active when it appears among the
    selected cells in the table's ``value``.

    Args:
        parameter_options: A mo.ui.table with cell selection enabled. Any
            object exposing ``data`` and ``value`` attributes is accepted.

    Returns:
        Dictionary mapping parameter names to boolean values (True/False),
        with keys in the order the names appear in the table.
    """
    names = []
    table_data = getattr(parameter_options, 'data', None)

    if hasattr(table_data, 'shape') and hasattr(table_data, 'iloc'):
        # DataFrame-like structure: take the whole first column at once
        if table_data.shape[1] > 0:
            names = list(table_data.iloc[:, 0])
    elif isinstance(table_data, dict) and table_data:
        # Dict structure: the first key holds the parameter column
        first_column = next(iter(table_data))
        names = list(table_data[first_column])

    # The type check comes first so that missing-value markers such as pd.NA,
    # whose truth value is undefined, are never evaluated as booleans.
    # A dict (rather than a set) keeps the table's row order.
    result = {name: False for name in names if isinstance(name, str) and name}

    selected_cells = getattr(parameter_options, 'value', None)
    if selected_cells is None:
        return result

    # A selected cell may be an object with a .value attribute, a dict with a
    # 'value' entry, or the bare string itself.
    for cell in selected_cells:
        if hasattr(cell, 'value') and cell.value in result:
            result[cell.value] = True
        elif isinstance(cell, dict) and 'value' in cell and cell['value'] in result:
            result[cell['value']] = True
        elif isinstance(cell, str) and cell in result:
            result[cell] = True

    return result
def convert_table_to_json_docs(df, selected_columns=None):
    """
    Convert a pandas DataFrame or dictionary to a list of JSON documents.

    Only the selected columns are included. Output keys are standardized:
    lowercase, spaces replaced by underscores, and every character that is
    not a word character removed.

    Args:
        df: The DataFrame or dictionary to process. Anything else is passed
            to ``pd.DataFrame``; if that fails an empty list is returned.
        selected_columns: Column names to include, given either as a list or
            as a ``{column: bool}`` mapping (columns mapped to a true value
            are included). For DataFrame input, names are matched exactly
            first and then case-insensitively; when nothing is selected, all
            columns are used.

    Returns:
        list: A list of dictionaries, each representing a row as a JSON
        document. Missing scalar values (NaN, None, NaT, NA) become None.
        A dictionary input yields a single document.
    """
    import re  # local import: the file's top-level imports do not include `re`

    def standardize_key(key):
        """Convert a column name to lowercase with underscores instead of spaces and no special characters"""
        if not isinstance(key, str):
            return str(key).lower()
        return re.sub(r'[^\w]', '', key.lower().replace(' ', '_'))

    def is_missing(value):
        """True only for scalar missing markers; containers are never 'missing'."""
        # pd.isna on a list/array returns an array, whose truth value is
        # ambiguous, so restrict the check to scalars.
        return pd.api.types.is_scalar(value) and bool(pd.isna(value))

    # Resolve a {column: bool} mapping first so that dictionary and DataFrame
    # input treat it the same way.
    if isinstance(selected_columns, dict):
        selected_columns = [col for col, include in selected_columns.items() if include]

    # Dictionary input: one document built straight from the mapping
    if isinstance(df, dict):
        if selected_columns:
            return [{standardize_key(k): df.get(k) for k in selected_columns}]
        return [{standardize_key(k): v for k, v in df.items()}]

    if df is None:
        return []

    if not isinstance(df, pd.DataFrame):
        try:
            df = pd.DataFrame(df)
        except Exception:
            # Input that cannot be tabulated yields no documents
            return []

    if df.empty:
        return []

    # No usable selection means "all columns"
    if not isinstance(selected_columns, list) or not selected_columns:
        selected_columns = list(df.columns)

    # Resolve requested names against the frame, falling back to a
    # case-insensitive match for string labels
    columns_lower = {col.lower(): col for col in df.columns if isinstance(col, str)}
    available_columns = []
    for col in selected_columns:
        if col in df.columns:
            available_columns.append(col)
        elif isinstance(col, str) and col.lower() in columns_lower:
            available_columns.append(columns_lower[col.lower()])

    if not available_columns:
        return []

    subset = df.loc[:, available_columns]
    keys = [standardize_key(col) for col in subset.columns]

    # itertuples keeps each column's own dtype; iterrows would coerce a whole
    # row to one dtype and turn integers into floats in numeric frames.
    return [
        {key: (None if is_missing(value) else value) for key, value in zip(keys, record)}
        for record in subset.itertuples(index=False, name=None)
    ]
def filter_models_by_function(resources, function_type="prompt_chat"):
    """
    Collect the IDs of models that advertise a given function.

    Args:
        resources (list): Model resource mappings; each may carry a
            "functions" list of ``{"id": ...}`` entries and a "model_id".
        function_type (str, optional): Function id to look for. Defaults to "prompt_chat".

    Returns:
        list: The "model_id" of every resource whose functions include
        ``function_type``, in input order. Empty when ``resources`` is empty
        or not a list.
    """
    matching_ids = []

    if not (resources and isinstance(resources, list)):
        return matching_ids

    for spec in resources:
        # Skip resources without a usable list of functions
        if "functions" not in spec:
            continue
        declared = spec["functions"]
        if not isinstance(declared, list):
            continue

        # Look for the first dict entry whose id matches
        supported = False
        for entry in declared:
            if isinstance(entry, dict) and entry.get("id") == function_type:
                supported = True
                break

        if supported and "model_id" in spec:
            matching_ids.append(spec["model_id"])

    return matching_ids
def get_model_selection_table(client=None, model_type="all", filter_functionality=None, selection_mode="single-cell"):
    """
    Build a marimo table from which a model ID can be selected.

    Args:
        client: The client object for API calls; its ``foundation_models``
            attribute is queried for model specs. If None, a table holding
            only the built-in default model is returned.
        model_type (str): Which specs to fetch: "chat", "embedding", or
            anything else (e.g. "all") for the general model specs.
        filter_functionality (str, optional): When given, keep only models
            whose functions include this id (see ``filter_models_by_function``).
        selection_mode (str): Selection mode passed to ``mo.ui.table``.
            Defaults to "single-cell".

    Returns:
        When ``client`` is None: the ``mo.ui.table`` on its own.
        Otherwise: a tuple ``(table, resources, model_id_list)`` where
        ``resources`` is the "resources" list of the fetched specs and
        ``model_id_list`` holds the IDs shown in the table.
    """
    fallback_models = ['mistralai/mistral-large']

    # Without a client nothing can be queried: offer just the fallback list
    if client is None:
        return mo.ui.table(
            fallback_models,
            selection="single",
            label="Select a model to use.",
            page_size=30,
        )

    # Fetch the spec listing that matches the requested model type
    foundation_models = client.foundation_models
    if model_type == "chat":
        specs = foundation_models.get_chat_model_specs()
    elif model_type == "embedding":
        specs = foundation_models.get_embeddings_model_specs()
    else:
        specs = foundation_models.get_model_specs()

    resources = specs.get("resources", [])

    # Narrow down by functionality when requested, otherwise list every model
    if filter_functionality and resources:
        model_id_list = filter_models_by_function(resources, filter_functionality)
    else:
        model_id_list = []
        for resource in resources:
            model_id_list.append(resource["model_id"])

    # Never show an empty table
    if not model_id_list:
        model_id_list = fallback_models

    # For single-cell it must have [("<row_nr as a string>","column_name string")] to work as initial value
    if selection_mode == "single-cell":
        preselected = [("0", "value")]
    else:
        preselected = [0]

    table = mo.ui.table(
        model_id_list,
        selection=selection_mode,
        label="Select a model to use.",
        page_size=30,
        initial_selection=preselected,
    )
    return table, resources, model_id_list
def _enforce_model_selection(model_selection, model_id_list):
    """
    Make sure a model selection holds a value, defaulting to the first model.

    When ``model_selection.value`` is empty or None, the selection's private
    ``_value`` is overwritten with the first entry of ``model_id_list`` and
    the resulting value is printed.

    Args:
        model_selection: Selection element exposing ``value`` (and ``_value``).
        model_id_list: Model IDs to fall back on; the first one is used.

    Returns:
        The selection's ``value`` after any reset.
    """
    nothing_selected = not model_selection.value
    if nothing_selected:
        first_index = 0
        model_selection._value = model_id_list[first_index]
        print(model_selection.value)
    return model_selection.value
def _selected_model_id(selected_value, model_id_list):
    """Translate a table selection value into a model ID string (None for an empty selection)."""
    if isinstance(selected_value, list):
        if not selected_value:
            return None
        first = selected_value[0]
        # Row selection: the entry is an index into model_id_list
        if isinstance(first, int) and 0 <= first < len(model_id_list):
            return model_id_list[first]
    else:
        first = selected_value

    # Cell selection: the entry wraps the cell content under "value", either
    # as a dict item or as an attribute (the same shapes get_cell_values handles).
    if isinstance(first, dict) and "value" in first:
        first = first["value"]
    elif hasattr(first, "value"):
        first = first.value

    return str(first)


def update_max_tokens_limit(model_selection, resources, model_id_list):
    """
    Look up the maximum output token count for the currently selected model.

    Args:
        model_selection: Selection element whose ``value`` identifies the
            model: a model ID string, or a list whose first entry is a row
            index into ``model_id_list``, a model ID, or a cell (object or
            dict) carrying the model ID under ``value``.
        resources: List of model resource dicts; the matching entry's
            ``model_limits["max_output_tokens"]`` is returned.
        model_id_list: Model IDs in table order, used to resolve row indices.

    Returns:
        The model's ``max_output_tokens`` when it can be determined, otherwise
        the default of 4096. Progress and errors are printed; the function
        does not raise.
    """
    default_max_tokens = 4096

    try:
        # getattr covers both "no value attribute" and "value is None"
        # without touching a missing attribute first.
        selected_value = getattr(model_selection, 'value', None)
        if selected_value is None:
            print("No model selection or selection has no value")
            return default_max_tokens

        if not resources or not isinstance(resources, list):
            print("Resources is empty or not a list")
            return default_max_tokens

        print(f"Raw selection value: {selected_value}")
        selected_model_id = _selected_model_id(selected_value, model_id_list)
        print(f"Selected model ID: {selected_model_id}")

        for model in resources:
            if model.get("model_id") == selected_model_id:
                limits = model.get("model_limits")
                if isinstance(limits, dict) and "max_output_tokens" in limits:
                    return limits["max_output_tokens"]
                # Model found but it declares no limit: use the default
                break
    except Exception as e:
        # UI helper: any unexpected shape falls back to the default limit
        print(f"Error: {e}")

    return default_max_tokens
def load_templates(
    folder_path: str,
    file_extensions: Optional[List[str]] = None,
    strip_whitespace: bool = True
) -> Dict[str, str]:
    """
    Load template files from a specified folder into a dictionary.

    Only files directly inside ``folder_path`` are considered. Files are read
    as UTF-8; a file that cannot be read is reported on stdout and skipped.
    When two files share a base name, the one whose extension comes later in
    ``file_extensions`` wins.

    Args:
        folder_path: Path to the folder containing template files
        file_extensions: List of file extensions to include, with or without
            the leading dot (default: ['.txt', '.md'])
        strip_whitespace: Whether to strip leading/trailing whitespace from templates (default: True)

    Returns:
        Dictionary with filename (without extension) as key and file content
        as value. It always starts with an "empty" entry.
    """
    if file_extensions is None:
        file_extensions = ['.txt', '.md']

    # Ensure extensions start with a dot
    file_extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in file_extensions]

    templates = {"empty": " "}  # Default empty template

    # The folder is a literal path, not a pattern: escape glob metacharacters
    # so that names containing '[', ']', '*' or '?' are still found.
    literal_folder = glob.escape(os.fspath(folder_path))

    for ext in file_extensions:
        pattern = os.path.join(literal_folder, f'*{ext}')
        # Sorted so the load order does not depend on the filesystem
        for file_path in sorted(glob.glob(pattern)):
            template_name = os.path.splitext(os.path.basename(file_path))[0]
            try:
                with open(file_path, 'r', encoding='utf-8') as handle:
                    content = handle.read()
            except (OSError, UnicodeDecodeError) as e:
                print(f"Error loading template from {file_path}: {str(e)}")
                continue
            templates[template_name] = content.strip() if strip_whitespace else content

    return templates
|