def process_with_llm(fields_to_process, prompt_template, inf_model, params, batch_size=10):
    """
    Process documents with an LLM using a prompt template with dynamic field mapping.

    Template fields are used to extract values from pre-standardized document fields.

    Args:
        fields_to_process (list): List of document dictionaries to process
        prompt_template (str): Template with {field_name} placeholders matching keys in documents
        inf_model: The inference model instance to use for generation
        params: Parameters to pass to the inference model
        batch_size (int): Number of documents to process per batch

    Returns:
        list: Processed results from the LLM
    """
    import marimo as mo
    import time
    import re

    # Safety check for inputs
    if not fields_to_process or not inf_model:
        print("Missing required inputs")
        return []

    # Handle the case where prompt_template is a dictionary (from UI components)
    if isinstance(prompt_template, dict) and 'value' in prompt_template:
        prompt_template = prompt_template['value']
    elif not isinstance(prompt_template, str):
        print(f"Invalid prompt template type: {type(prompt_template)}, expected string")
        return []

    # Extract field names from the prompt template: every string between curly braces
    field_pattern = r'\{([^{}]+)\}'
    template_fields = re.findall(field_pattern, prompt_template)

    if not template_fields:
        print("No field placeholders found in template")
        return []

    # Build a formatted prompt for each document
    formatted_prompts = []
    for doc in fields_to_process:
        # Collect the value to substitute for each template field
        field_values = {}
        try:
            for field in template_fields:
                # Try a direct key match first
                if field in doc:
                    field_values[field] = doc[field] if doc[field] is not None else ""
                # If the field contains periods (e.g., "data.title"), walk the nested dicts
                elif '.' in field:
                    value = doc
                    for part in field.split('.'):
                        if isinstance(value, dict) and part in value:
                            value = value[part]
                        else:
                            value = None
                            break
                    field_values[field] = value if value is not None else ""
                else:
                    # Default to an empty string if the field is not found
                    field_values[field] = ""

            # Substitute each placeholder literally. str.format() would parse a
            # dotted name such as {data.title} as attribute access and raise a
            # KeyError, so replace the {field} tokens directly instead.
            prompt = prompt_template
            for field, value in field_values.items():
                prompt = prompt.replace("{" + field + "}", str(value))
            formatted_prompts.append(prompt)
        except Exception as e:
            print(f"Error formatting prompt: {str(e)}")
            print(f"Field values: {field_values}")
            continue

    # Bail out if no valid prompts were generated
    if not formatted_prompts:
        print("No valid prompts generated")
        return []

    # Print a sample of the formatted prompts for debugging
    print(f"Sample formatted prompt: {formatted_prompts[0][:200]}...")

    # Split the prompts into batches
    batches = [formatted_prompts[i:i + batch_size]
               for i in range(0, len(formatted_prompts), batch_size)]
    results = []

    # Process each batch, reporting progress in the notebook UI
    with mo.status.progress_bar(
        total=len(batches),
        title="Processing Batches",
        subtitle=f"Processing {len(formatted_prompts)} prompts in {len(batches)} batches",
        completion_title="Processing Complete",
        completion_subtitle=f"Processed {len(formatted_prompts)} prompts successfully",
        show_rate=True,
        show_eta=True,
        remove_on_exit=True
    ) as progress:
        for i, batch in enumerate(batches):
            start_time = time.time()
            try:
                # Call the inference model with the batch of prompts and params
                print(f"Sending batch {i+1} of {len(batches)} to model")
                batch_results = inf_model.generate_text(prompt=batch, params=params)
                results.extend(batch_results)
            except Exception as e:
                print(f"Error in batch {i+1}: {str(e)}")
                continue

            inference_time = time.time() - start_time
            print(f"Inference time for batch {i+1}: {inference_time:.2f} seconds")

            # Update the progress bar
            progress.update(increment=1)

        # Hold for a second on completion so the finished bar is visible
        # before remove_on_exit clears it
        time.sleep(1)

    return results
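# --- Usage sketch for process_with_llm (illustrative only) ---
# The only contract assumed of `inf_model` is the call used above:
# generate_text(prompt=<list of str>, params=...) returning one string per
# prompt (the shape exposed by batch inference clients such as watsonx.ai's
# ModelInference). The stub model below is hypothetical so the sketch runs
# without credentials; the progress bar expects a marimo notebook context.

def _example_process_with_llm():
    class _EchoModel:
        """Hypothetical stand-in for a real batch inference client."""
        def generate_text(self, prompt, params=None):
            # One canned answer per prompt in the batch
            return [f"(answer for: {p[:40]}...)" for p in prompt]

    docs = [
        {"title": "Q3 report", "data": {"summary": "Revenue grew 12%."}},
        {"title": "Q4 report", "data": {"summary": "Margins improved."}},
    ]
    # Placeholders may be flat keys ({title}) or dotted paths ({data.summary})
    template = "Summarize '{title}' given this context: {data.summary}"
    return process_with_llm(docs, template, _EchoModel(), params={}, batch_size=2)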
def append_llm_results_to_dataframe(target_dataframe, fields_to_process, llm_results, selection_table, column_name=None):
    """
    Add LLM processing results directly to the target DataFrame using selection indices.

    Args:
        target_dataframe (pandas.DataFrame): DataFrame to modify in place
        fields_to_process (list): List of document dictionaries that were processed
        llm_results (list): Results from the process_with_llm function
        selection_table: Table selection containing the indices of rows to update
        column_name (str, optional): Custom name for the new column
    """
    column_name = column_name or f"Added Column {len(target_dataframe.columns)}"

    # Initialize the new column with empty strings if it doesn't exist
    if column_name not in target_dataframe.columns:
        target_dataframe[column_name] = ""

    # Safety checks
    if not isinstance(llm_results, list) or not llm_results:
        print("No LLM results to add")
        return

    # Get the row indices from the selection table
    if selection_table is not None and not selection_table.empty:
        selected_indices = selection_table.index.tolist()

        # Warn when the number of results doesn't match the number of selected rows
        if len(selected_indices) != len(llm_results):
            print(f"Warning: Number of results ({len(llm_results)}) doesn't match selected rows ({len(selected_indices)})")

        # Write the results into the DataFrame at the selected index labels
        for idx, result in zip(selected_indices, llm_results):
            try:
                if idx in target_dataframe.index:
                    target_dataframe.at[idx, column_name] = result
                else:
                    print(f"Warning: Selected index {idx} is not in the DataFrame index")
            except Exception as e:
                print(f"Error adding result to DataFrame: {str(e)}")
    else:
        print("No selection table provided or selection is empty")
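# --- Usage sketch for append_llm_results_to_dataframe (illustrative only) ---
# `selection_table` is expected to be a DataFrame-like view of the selected
# rows (for example, the value of a marimo ui.table selection) whose index
# labels match the target DataFrame; the data below is made up.

def _example_append_llm_results():
    import pandas as pd

    df = pd.DataFrame({"title": ["Q3 report", "Q4 report", "Annual report"]})
    selection = df.iloc[[0, 2]]  # pretend the user selected rows 0 and 2
    answers = ["Revenue grew 12%.", "Strong year overall."]

    # Writes the answers into a new "Summary" column at index labels 0 and 2
    append_llm_results_to_dataframe(
        df,
        fields_to_process=df.to_dict("records"),  # unused by the function, kept for signature parity
        llm_results=answers,
        selection_table=selection,
        column_name="Summary",
    )
    return df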
def add_llm_results_to_dataframe(original_df, fields_to_process, llm_results, column_name=None):
    """
    Add LLM processing results to a copy of the original DataFrame.

    Args:
        original_df (pandas.DataFrame): Original DataFrame
        fields_to_process (list): List of document dictionaries that were processed
        llm_results (list): Results from the process_with_llm function
        column_name (str, optional): Custom name for the new column; defaults to
            "Added Column {number of existing columns}"

    Returns:
        pandas.DataFrame: Copy of the original DataFrame with the new column added
    """
    column_name = column_name or f"Added Column {len(original_df.columns)}"

    # Work on a copy so the original DataFrame is left untouched
    result_df = original_df.copy()

    # Initialize the new column with empty strings
    result_df[column_name] = ""

    # Safety checks
    if not isinstance(llm_results, list) or not llm_results:
        print("No LLM results to add")
        return result_df

    # Write the results into the copy. This assumes the order of
    # fields_to_process matches the row order of the original DataFrame.
    for i, (_doc, result) in enumerate(zip(fields_to_process, llm_results)):
        try:
            if i < len(result_df):
                result_df.at[i, column_name] = result
            else:
                print(f"Warning: Result index {i} exceeds DataFrame length")
        except Exception as e:
            print(f"Error adding result to DataFrame: {str(e)}")
            continue

    return result_df


def display_answers_as_markdown(answers, mo):
    """
    Display each answer in a list as markdown using mo.md().

    Args:
        answers (list): List of text answers from the LLM
        mo: The marimo module from the calling environment

    Returns:
        list: List of markdown elements
    """
    # Handle the case where answers is None or empty
    if not answers:
        return [mo.md("No answers available")]

    # Create a formatted markdown element for each answer, with a numbered heading
    markdown_elements = []
    for i, answer in enumerate(answers):
        md_element = mo.md(f"""\n\n---\n\n# Answer {i+1}\n\n{answer}""")
        markdown_elements.append(md_element)

    return markdown_elements


def display_answers_stacked(answers, mo):
    """
    Display a list of answers stacked vertically using mo.vstack().

    Args:
        answers (list): List of text answers from the LLM
        mo: The marimo module from the calling environment

    Returns:
        element: A vertically stacked collection of markdown elements
    """
    # Get the individual markdown elements
    md_elements = display_answers_as_markdown(answers, mo)

    # Interleave a separator between consecutive answers
    separator = mo.md("---")
    elements_with_separators = []
    for i, elem in enumerate(md_elements):
        elements_with_separators.append(elem)
        if i < len(md_elements) - 1:  # No separator after the last element
            elements_with_separators.append(separator)

    # Return the vertically stacked collection
    return mo.vstack(elements_with_separators, align="start", gap=2)
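# --- Usage sketch for the copy-based variant and the display helpers ---
# Intended to run inside a marimo cell, where `mo` is already available;
# the DataFrame content and answers below are made up for illustration.

def _example_display_results(mo):
    import pandas as pd

    df = pd.DataFrame({"title": ["Q3 report", "Q4 report"]})
    answers = ["Revenue grew 12%.", "Margins improved."]

    # Returns a copy of df with the answers in a new column (positional match)
    enriched = add_llm_results_to_dataframe(
        df, df.to_dict("records"), answers, column_name="Summary"
    )
    print(enriched)

    # Render the raw answers as stacked markdown in the notebook output
    return display_answers_stacked(answers, mo)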