File size: 9,763 Bytes
87b6e34 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
def _lookup_template_field(doc, field):
    """Return the value for one template placeholder, or "" when absent or None.

    A key of ``doc`` equal to ``field`` wins; otherwise a dotted name such as
    "data.title" is followed through nested dictionaries.
    """
    if field in doc:
        value = doc[field]
    elif '.' in field:
        value = doc
        for part in field.split('.'):
            if isinstance(value, dict) and part in value:
                value = value[part]
            else:
                value = None
                break
    else:
        value = None
    return "" if value is None else value


def process_with_llm(fields_to_process, prompt_template, inf_model, params, batch_size=10):
    """
    Process documents with LLM using a prompt template with dynamic field mapping.

    Placeholders of the form {field_name} are extracted from the template and
    filled from each document. A placeholder is matched against a document key
    of the same name first; a dotted placeholder such as {data.title} is
    otherwise followed through nested dictionaries. Missing or None values are
    rendered as empty strings. Doubled braces ({{ and }}) are still treated as
    literal braces.

    Documents whose prompt cannot be formatted, and batches whose inference
    call raises, are reported with print() and skipped, so the returned list
    can be shorter than fields_to_process.

    Args:
        fields_to_process (list): List of document dictionaries to process
        prompt_template (str): Template with {field_name} placeholders matching keys in documents.
            A dictionary with a 'value' entry holding the template string is also accepted.
        inf_model: The inference model instance to use for generation; must provide
            generate_text(prompt=<list of str>, params=...)
        params: Parameters to pass to the inference model
        batch_size (int): Number of documents to process per batch (must be at least 1)

    Returns:
        list: Processed results from the LLM, or an empty list when the inputs are
        unusable or no prompt could be generated
    """
    import re
    import string
    import time

    class _WholeKeyFormatter(string.Formatter):
        """Resolve a placeholder by its complete text before normal parsing.

        str.format treats "data.title" as attribute access on an argument
        named "data", so a value stored under the key "data.title" would
        never be found without this lookup.
        """

        def get_field(self, field_name, args, kwargs):
            if field_name in kwargs:
                return kwargs[field_name], field_name
            return super().get_field(field_name, args, kwargs)

    # Safety check for inputs
    if not fields_to_process or not inf_model:
        print("Missing required inputs")
        return []

    # Handle case where prompt_template is a dictionary (from UI components)
    if isinstance(prompt_template, dict) and 'value' in prompt_template:
        prompt_template = prompt_template['value']
    # Checked after unwrapping so a non-string 'value' is rejected as well
    if not isinstance(prompt_template, str):
        print(f"Invalid prompt template type: {type(prompt_template)}, expected string")
        return []

    # A step of 0 makes range() raise and a negative step yields no batches at all
    if batch_size < 1:
        print(f"Invalid batch size: {batch_size}, expected a positive integer")
        return []

    # Extract the unique field names found between curly braces, in template order
    field_pattern = r'\{([^{}]+)\}'
    template_fields = list(dict.fromkeys(re.findall(field_pattern, prompt_template)))
    if not template_fields:
        print("No field placeholders found in template")
        return []

    # Create formatted prompts from the documents
    formatter = _WholeKeyFormatter()
    formatted_prompts = []
    for doc in fields_to_process:
        field_values = {}
        try:
            for field in template_fields:
                field_values[field] = _lookup_template_field(doc, field)
            formatted_prompts.append(formatter.vformat(prompt_template, (), field_values))
        except Exception as e:
            # Best effort: one malformed document must not abort the whole run
            print(f"Error formatting prompt: {str(e)}")
            print(f"Field values: {field_values}")

    # Return empty list if no valid prompts
    if not formatted_prompts:
        print("No valid prompts generated")
        return []

    # Print a sample of the formatted prompts for debugging
    print(f"Sample formatted prompt: {formatted_prompts[0][:200]}...")

    # Split into batches
    batches = [formatted_prompts[i:i + batch_size] for i in range(0, len(formatted_prompts), batch_size)]
    results = []

    # Process each batch
    for i, batch in enumerate(batches):
        start_time = time.perf_counter()
        try:
            print(f"Sending batch {i+1} of {len(batches)} to model")
            # Call the inference model with the batch of prompts and params
            batch_results = inf_model.generate_text(prompt=batch, params=params)
            results.extend(batch_results)
        except Exception as e:
            # A failed batch is skipped; its prompts contribute no results
            print(f"Error in batch {i+1}: {str(e)}")
            continue
        inference_time = time.perf_counter() - start_time
        print(f"Inference time for Batch {i+1}: {inference_time:.2f} seconds")
    return results
def append_llm_results_to_dataframe(target_dataframe, fields_to_process, llm_results, selection_table, column_name=None):
    """
    Add LLM processing results directly to the target DataFrame using selection indices.

    Results are paired in order with the index labels of selection_table and
    written to the rows of target_dataframe that carry the same labels. Labels
    that do not exist in target_dataframe are reported and skipped, so the
    DataFrame never gains rows.

    Args:
        target_dataframe (pandas.DataFrame): DataFrame to modify in-place
        fields_to_process (list): List of document dictionaries that were processed
            (not used by this function)
        llm_results (list): Results from the process_with_llm function
        selection_table: Table selection containing indices of rows to update
        column_name (str, optional): Custom name for the new column; defaults to
            "Added Column <current number of columns>"

    Returns:
        None
    """
    column_name = column_name or f"Added Column {len(list(target_dataframe))}"

    # Initialize the new column with empty strings if it doesn't exist
    if column_name not in target_dataframe.columns:
        target_dataframe[column_name] = ""

    # Safety checks
    if not isinstance(llm_results, list) or not llm_results:
        print("No LLM results to add")
        return

    if selection_table is None or selection_table.empty:
        print("No selection table provided or empty selection")
        return

    # Get indices from selection table
    selected_indices = selection_table.index.tolist()

    # Make sure we have the right number of results for the selected rows
    if len(selected_indices) != len(llm_results):
        print(f"Warning: Number of results ({len(llm_results)}) doesn't match selected rows ({len(selected_indices)})")

    # The selection carries index labels, so test membership in the index
    # rather than comparing a label with the row count: that comparison rejects
    # valid labels on a non-contiguous index and lets .at append a new row for
    # a label that is absent.
    row_labels = target_dataframe.index
    for idx, result in zip(selected_indices, llm_results):
        try:
            if idx in row_labels:
                target_dataframe.at[idx, column_name] = result
            else:
                print(f"Warning: Selected index {idx} not found in DataFrame index")
        except Exception as e:
            # Best effort: one bad cell must not stop the remaining updates
            print(f"Error adding result to DataFrame: {str(e)}")
def add_llm_results_to_dataframe(original_df, fields_to_process, llm_results, column_name=None):
    """
    Add LLM processing results to a copy of the original DataFrame.

    Results are matched to rows by position: the first result goes to the
    first row, and so on, regardless of the DataFrame's index labels. Pairing
    stops at the shorter of fields_to_process and llm_results; results beyond
    the last row are reported and dropped.

    Args:
        original_df (pandas.DataFrame): Original DataFrame (left unmodified)
        fields_to_process (list): List of document dictionaries that were processed;
            only its length is used
        llm_results (list): Results from the process_with_llm function
        column_name (str, optional): Custom name for the new column; defaults to
            "Added Column <current number of columns>"

    Returns:
        pandas.DataFrame: Copy of original DataFrame with the added column, which
        holds empty strings for rows that received no result
    """
    column_name = column_name or f"Added Column {len(list(original_df))}"

    # Create a copy of the original DataFrame
    result_df = original_df.copy()

    # Initialize the new column with empty strings
    result_df[column_name] = ""

    # Safety checks
    if not isinstance(llm_results, list) or not llm_results:
        print("No LLM results to add")
        return result_df

    # The loop counter is a row position, so write with the positional .iat;
    # the label-based .at would hit the wrong row, or append a new one,
    # whenever the index is not 0..n-1.
    column_position = result_df.columns.get_loc(column_name)
    row_count = len(result_df)
    for i, (_doc, result) in enumerate(zip(fields_to_process, llm_results)):
        try:
            if i < row_count:
                result_df.iat[i, column_position] = result
            else:
                print(f"Warning: Result index {i} exceeds DataFrame length")
        except Exception as e:
            # Best effort: one bad cell must not stop the remaining updates
            print(f"Error adding result to DataFrame: {str(e)}")
    return result_df
def display_answers_as_markdown(answers, mo):
    """
    Render each answer as its own markdown element via mo.md().

    Every element starts with a horizontal rule and a numbered
    "Answer N" heading (numbering starts at 1), followed by the answer text.

    Args:
        answers (list): List of text answers from the LLM
        mo: The existing marimo module from the environment

    Returns:
        list: One markdown element per answer, or a single "No answers available"
        element when answers is None or empty
    """
    # Nothing to render: hand back a single placeholder element
    if not answers:
        return [mo.md("No answers available")]

    return [
        mo.md(f"""\n\n---\n\n# Answer {number}\n\n{text}""")
        for number, text in enumerate(answers, start=1)
    ]
def display_answers_stacked(answers, mo):
    """
    Build a vertical stack of the answers using mo.vstack().

    The answers are rendered with display_answers_as_markdown() and a "---"
    markdown separator is placed between consecutive elements (never after
    the last one).

    Args:
        answers (list): List of text answers from the LLM
        mo: The existing marimo module from the environment

    Returns:
        element: A vertically stacked collection of markdown elements
    """
    rendered = display_answers_as_markdown(answers, mo)
    divider = mo.md("---")

    # Interleave: a divider goes in front of every element except the first
    stacked = []
    for element in rendered:
        if stacked:
            stacked.append(divider)
        stacked.append(element)

    return mo.vstack(stacked, align="start", gap="2")