from ibm_watsonx_ai import APIClient, Credentials from typing import Dict, Optional, List, Union, Any, Set import pandas as pd import marimo as mo import json import glob import io import os def get_cred_value(key, creds_var_name="baked_in_creds", default=""): """ Helper function to safely get a value from a credentials dictionary. Searches for credentials in: 1. Global variables with the specified variable name 2. Imported modules containing the specified variable name Args: key: The key to look up in the credentials dictionary. creds_var_name: The variable name of the credentials dictionary. default: The default value to return if the key is not found. Returns: The value from the credentials dictionary if it exists and contains the key, otherwise returns the default value. """ # Check if the credentials variable exists in globals if creds_var_name in globals(): creds_dict = globals()[creds_var_name] if isinstance(creds_dict, dict) and key in creds_dict: return creds_dict[key] # Check if credentials are in an imported module import sys for module_name, module_obj in sys.modules.items(): if hasattr(module_obj, creds_var_name): creds_dict = getattr(module_obj, creds_var_name) if isinstance(creds_dict, dict) and key in creds_dict: return creds_dict[key] return default def get_key_by_value(dictionary, value): for key, val in dictionary.items(): if val == value: return key return None def markdown_spacing(number): """Convert a number to that many ' ' characters.""" return ' ' * number def wrap_with_spaces(text_to_wrap, prefix_spaces=2, suffix_spaces=2): """Wrap text with non-breaking spaces on either side.""" prefix = markdown_spacing(prefix_spaces) if prefix_spaces > 0 else "" suffix = markdown_spacing(suffix_spaces) if suffix_spaces > 0 else "" return f"{prefix}{text_to_wrap}{suffix}" def load_file_dataframe(file, file_extension, sheet_selector=None, excel_data=None, header_row=0): """ Load a dataframe from an uploaded file with customizable header and row skipping. Parameters: ----------- file : marimo.ui.file object The file upload component containing the file data file_extension : str The extension of the uploaded file (.xlsx, .xls, .csv, .json) sheet_selector : marimo.ui.dropdown, optional Dropdown component for selecting Excel sheets excel_data : BytesIO, optional BytesIO object containing Excel data header_row : int, optional Row index to use as column headers (0-based). Default is 0 (first row). Use None to have pandas generate default column names. Returns: -------- tuple (pandas.DataFrame, list) - The loaded dataframe and list of column names """ dataframe = pd.DataFrame([]) column_names = [] if file.contents(): # Handle different file types if file_extension in ['.xlsx', '.xls'] and sheet_selector is not None and sheet_selector.value: # For Excel files - now we can safely access sheet_selector.value excel_data.seek(0) # Reset buffer position dataframe = pd.read_excel( excel_data, sheet_name=sheet_selector.value, header=header_row, engine="openpyxl" if file_extension == '.xlsx' else "xlrd" ) column_names = list(dataframe.columns) elif file_extension == '.csv': # For CSV files csv_data = io.StringIO(file.contents().decode('utf-8')) dataframe = pd.read_csv(csv_data, header=header_row) column_names = list(dataframe.columns) elif file_extension == '.json': # For JSON files try: json_data = json.loads(file.contents().decode('utf-8')) # Handle different JSON structures if isinstance(json_data, list): dataframe = pd.DataFrame(json_data) elif isinstance(json_data, dict): # If it's a dictionary with nested structures, try to normalize it if any(isinstance(v, (dict, list)) for v in json_data.values()): # For nested JSON with consistent structure dataframe = pd.json_normalize(json_data) else: # For flat JSON dataframe = pd.DataFrame([json_data]) column_names = list(dataframe.columns) except Exception as e: print(f"Error parsing JSON: {e}") return dataframe, column_names def create_parameter_table(input_list, column_name="Active Options", label="Select the Parameters to set to Active", selection_type="multi-cell", text_justify="center"): """ Creates a marimo table for parameter selection. Args: input_list: List of parameter names to display in the table column_name: Name of the column (default: "Active Options") label: Label for the table (default: "Select the Parameters to set to Active:") selection_type: Selection type, either "single-cell" or "multi-cell" (default: "multi-cell") text_justify: Text justification for the column (default: "center") Returns: A marimo table configured for parameter selection """ import marimo as mo # Validate selection type if selection_type not in ["single-cell", "multi-cell"]: raise ValueError("selection_type must be either 'single-cell' or 'multi-cell'") # Validate text justification if text_justify not in ["left", "center", "right"]: raise ValueError("text_justify must be one of: 'left', 'center', 'right'") # Create the table parameter_table = mo.ui.table( label=f"**{label}**", data={column_name: input_list}, selection=selection_type, text_justify_columns={column_name: text_justify} ) return parameter_table def get_cell_values(parameter_options): """ Extract active parameter values from a mo.ui.table. Args: parameter_options: A mo.ui.table with cell selection enabled Returns: Dictionary mapping parameter names to boolean values (True/False) """ # Get all parameter names from the table data all_params = set() # Use the data property to get all options from the table if hasattr(parameter_options, 'data'): table_data = parameter_options.data # Handle DataFrame-like structure if hasattr(table_data, 'shape') and hasattr(table_data, 'iloc'): for i in range(table_data.shape[0]): # Get value from first column if table_data.shape[1] > 0: param = table_data.iloc[i, 0] if param and isinstance(param, str): all_params.add(param) # Handle dict structure (common in marimo tables) elif isinstance(table_data, dict): # Get the first column's values if len(table_data) > 0: col_name = next(iter(table_data)) for param in table_data[col_name]: if param and isinstance(param, str): all_params.add(param) # Create result dictionary with all parameters set to False by default result = {param: False for param in all_params} # Get the selected cells if hasattr(parameter_options, 'value') and parameter_options.value is not None: selected_cells = parameter_options.value # Process selected cells for cell in selected_cells: if hasattr(cell, 'value') and cell.value in result: result[cell.value] = True elif isinstance(cell, dict) and 'value' in cell and cell['value'] in result: result[cell['value']] = True elif isinstance(cell, str) and cell in result: result[cell] = True return result def convert_table_to_json_docs(df, selected_columns=None): """ Convert a pandas DataFrame or dictionary to a list of JSON documents. Dynamically includes columns based on user selection. Column names are standardized to lowercase with underscores instead of spaces and special characters removed. Args: df: The DataFrame or dictionary to process selected_columns: List of column names to include in the output documents Returns: list: A list of dictionaries, each representing a row as a JSON document """ import pandas as pd import re def standardize_key(key): """Convert a column name to lowercase with underscores instead of spaces and no special characters""" if not isinstance(key, str): return str(key).lower() # Replace spaces with underscores and convert to lowercase key = key.lower().replace(' ', '_') # Remove special characters (keeping alphanumeric and underscores) return re.sub(r'[^\w]', '', key) # Handle case when input is a dictionary if isinstance(df, dict): # Filter the dictionary to include only selected columns if selected_columns: return [{standardize_key(k): df.get(k, None) for k in selected_columns}] else: # If no columns selected, return all key-value pairs with standardized keys return [{standardize_key(k): v for k, v in df.items()}] # Handle case when df is None if df is None: return [] # Ensure df is a DataFrame if not isinstance(df, pd.DataFrame): try: df = pd.DataFrame(df) except: return [] # Return empty list if conversion fails # Now check if DataFrame is empty if df.empty: return [] # Process selected_columns if it's a dictionary of true/false values if isinstance(selected_columns, dict): # Extract keys where value is True selected_columns = [col for col, include in selected_columns.items() if include] # If no columns are specifically selected, use all available columns if not selected_columns or not isinstance(selected_columns, list) or len(selected_columns) == 0: selected_columns = list(df.columns) # Determine which columns exist in the DataFrame available_columns = [] columns_lower = {col.lower(): col for col in df.columns if isinstance(col, str)} for col in selected_columns: if col in df.columns: available_columns.append(col) elif isinstance(col, str) and col.lower() in columns_lower: available_columns.append(columns_lower[col.lower()]) # If no valid columns found, return empty list if not available_columns: return [] # Process rows json_docs = [] for _, row in df.iterrows(): doc = {} for col in available_columns: value = row[col] # Standardize the column name when adding to document std_col = standardize_key(col) doc[std_col] = None if pd.isna(value) else value json_docs.append(doc) return json_docs def filter_models_by_function(resources, function_type="prompt_chat"): """ Filter model IDs from resources list that have a specific function type Args: resources (list): List of model resource objects function_type (str, optional): Function type to filter by. Defaults to "prompt_chat". Returns: list: List of model IDs that have the specified function """ filtered_model_ids = [] if not resources or not isinstance(resources, list): return filtered_model_ids for model in resources: # Check if the model has a functions attribute if "functions" in model and isinstance(model["functions"], list): # Check if any function has the matching id has_function = any( func.get("id") == function_type for func in model["functions"] if isinstance(func, dict) ) if has_function and "model_id" in model: filtered_model_ids.append(model["model_id"]) return filtered_model_ids def get_model_selection_table(client=None, model_type="all", filter_functionality=None, selection_mode="single-cell"): """ Creates and displays a table for model selection based on specified parameters. Args: client: The client object for API calls. If None, returns default models. model_type (str): Type of models to display. Options: "all", "chat", "embedding". filter_functionality (str, optional): Filter models by functionality type. Options include: "image_chat", "text_chat", "autoai_rag", "text_generation", "multilingual", etc. selection_mode (str): Mode for selecting table entries. Options: "single", "single-cell". Defaults to "single-cell". Returns: The selected model ID from the displayed table. """ # Default model list if client is None default_models = ['mistralai/mistral-large'] if client is None: # If no client, use default models available_models = default_models selection = mo.ui.table( available_models, selection="single", label="Select a model to use.", page_size=30, ) return selection # Get appropriate model specs based on model_type if model_type == "chat": model_specs = client.foundation_models.get_chat_model_specs() elif model_type == "embedding": model_specs = client.foundation_models.get_embeddings_model_specs() else: model_specs = client.foundation_models.get_model_specs() # Extract resources from model specs resources = model_specs.get("resources", []) # Filter by functionality if specified if filter_functionality and resources: model_id_list = filter_models_by_function(resources, filter_functionality) else: # Create list of model IDs if no filtering model_id_list = [resource["model_id"] for resource in resources] # If no models available after filtering, use defaults if not model_id_list: model_id_list = default_models # Create and display selection table model_selector = mo.ui.table( model_id_list, selection=selection_mode, label="Select a model to use.", page_size=30, initial_selection = [("0", "value")] if selection_mode == "single-cell" else [0] ### For single-cell it must have [("","column_name string")] to work as initial value ) return model_selector, resources, model_id_list def _enforce_model_selection(model_selection, model_id_list): # If nothing is selected (empty list) or value is None if not model_selection.value: # Reset to first item model = 0 model_selection._value = model_id_list[model] print(model_selection.value) return model_selection.value def update_max_tokens_limit(model_selection, resources, model_id_list): # Default value default_max_tokens = 4096 try: # Check if we have a selection and resources if model_selection.value is None or not hasattr(model_selection, 'value'): print("No model selection or selection has no value") return default_max_tokens if not resources or not isinstance(resources, list) or len(resources) == 0: print("Resources is empty or not a list") return default_max_tokens # Get the model ID - handle both index selection and direct string selection selected_value = model_selection.value print(f"Raw selection value: {selected_value}") # If it's an array with indices if isinstance(selected_value, list) and len(selected_value) > 0: if isinstance(selected_value[0], int) and 0 <= selected_value[0] < len(model_id_list): selected_model_id = model_id_list[selected_value[0]] else: selected_model_id = str(selected_value[0]) # Convert to string if needed else: selected_model_id = str(selected_value) # Direct value print(f"Selected model ID: {selected_model_id}") # Find the model for model in resources: model_id = model.get("model_id") if model_id == selected_model_id: if "model_limits" in model and "max_output_tokens" in model["model_limits"]: return model["model_limits"]["max_output_tokens"] break except Exception as e: print(f"Error: {e}") return default_max_tokens def load_templates( folder_path: str, file_extensions: Optional[List[str]] = None, strip_whitespace: bool = True ) -> Dict[str, str]: """ Load template files from a specified folder into a dictionary. Args: folder_path: Path to the folder containing template files file_extensions: List of file extensions to include (default: ['.txt', '.md']) strip_whitespace: Whether to strip leading/trailing whitespace from templates (default: True) Returns: Dictionary with filename (without extension) as key and file content as value """ # Default extensions if none provided if file_extensions is None: file_extensions = ['.txt', '.md'] # Ensure extensions start with a dot file_extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in file_extensions] templates = {"empty": " "} # Default empty template # Create glob patterns for each extension patterns = [os.path.join(folder_path, f'*{ext}') for ext in file_extensions] # Find all matching files for pattern in patterns: for file_path in glob.glob(pattern): try: # Extract filename without extension to use as key filename = os.path.basename(file_path) template_name = os.path.splitext(filename)[0] # Read file content with open(file_path, 'r', encoding='utf-8') as file: content = file.read() # Strip whitespace if specified if strip_whitespace: content = content.strip() templates[template_name] = content except Exception as e: print(f"Error loading template from {file_path}: {str(e)}") return templates