"""Streamlit page "Saved Scenarios".

Lets the user pick one of the scenarios stored in the session's project
dictionary, shows a per-channel summary table for it, offers the table and the
raw scenario parameters as a ZIP download, and allows deleting the scenario or
persisting the project dictionary to the database.
"""

# Importing necessary libraries
import streamlit as st

# NOTE(review): the page configuration is deliberately issued before the
# remaining imports, presumably so that it runs before any Streamlit command a
# project module might issue at import time -- keep this ordering.
st.set_page_config(
    page_title="Saved Scenarios",
    page_icon="⚖️",
    layout="wide",
    initial_sidebar_state="collapsed",
)

import io
import sys
import json
import math
import pickle
import zipfile
import traceback
import numpy as np
import pandas as pd
from scenario import numerize
from openpyxl import Workbook
from post_gres_cred import db_cred
from log_application import log_message
from utilities import (
    project_selection,
    update_db,
    set_header,
    load_local_css,
    name_formating,
)

schema = db_cred["schema"]
load_local_css("styles.css")
set_header()

# Initialize project name session state
if "project_name" not in st.session_state:
    st.session_state["project_name"] = None

# Fetch project dictionary
if "project_dct" not in st.session_state:
    project_selection()
    st.stop()

# Display Username and Project Name
if "username" in st.session_state and st.session_state["username"] is not None:
    cols1 = st.columns([2, 1])
    with cols1[0]:
        st.markdown(f"**Welcome {st.session_state['username']}**")
    with cols1[1]:
        st.markdown(f"**Current Project: {st.session_state['project_name']}**")


# Types that ``json.dumps`` can emit without any conversion
_JSON_SCALARS = (str, int, float, bool, type(None))


def get_saved_scenarios_dict():
    """Return the saved-scenarios mapping held in the session's project dictionary."""
    return st.session_state["project_dct"]["saved_scenarios"][
        "saved_scenarios_dict"
    ]


def format_value(value):
    """Round ``value`` for display based on its size.

    Values whose magnitude is below 1 keep four decimals; everything else is
    rounded to one decimal. The magnitude (not the signed value) is compared so
    that large negative numbers are rounded like large positive ones.
    """
    return round(value, 4) if abs(value) < 1 else round(value, 1)


def _to_json_key(key):
    """Return ``key`` in a form ``json.dumps`` accepts as an object key."""
    converted = convert_to_serializable(key)
    # Containers (e.g. tuple keys) are not valid JSON keys: fall back to text
    return converted if isinstance(converted, _JSON_SCALARS) else str(key)


def convert_to_serializable(obj):
    """Recursively convert ``obj`` into something ``json.dumps`` can encode.

    - plain scalars (``str``, ``int``, ``float``, ``bool``, ``None``) are
      returned unchanged;
    - NumPy arrays become (nested) lists and NumPy scalars become the
      corresponding Python scalar, so numbers stay numbers in the output;
    - dicts are converted value by value, with keys coerced to valid JSON keys;
    - lists, tuples and sets become lists;
    - anything else falls back to its ``str()`` representation.
    """
    if isinstance(obj, _JSON_SCALARS):
        return obj
    if isinstance(obj, np.ndarray):
        # Recurse so object arrays holding non-primitive items are handled too
        return convert_to_serializable(obj.tolist())
    if isinstance(obj, np.generic):
        native = obj.item()
        # Guard against scalar types whose ``item()`` is not a plain scalar
        return native if isinstance(native, _JSON_SCALARS) else str(native)
    if isinstance(obj, dict):
        return {
            _to_json_key(key): convert_to_serializable(value)
            for key, value in obj.items()
        }
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [convert_to_serializable(element) for element in obj]
    # Fallback: convert the object to a string
    return str(obj)


def _excel_cell_value(value):
    """Return ``value`` for an Excel cell, leaving non-finite floats blank."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value


# Function to generate zip file of current scenario
@st.cache_data(show_spinner=False)
def download_as_zip(
    df,
    scenario_data,
    excel_name="optimization_results.xlsx",
    json_name="scenario_params.json",
):
    """Build an in-memory ZIP holding the results workbook and scenario JSON.

    Args:
        df: DataFrame written to a sheet named "Results" (header row first,
            then one row per DataFrame row).
        scenario_data: Scenario dictionary; it is passed through
            ``convert_to_serializable`` and dumped as indented JSON.
        excel_name: Name of the workbook entry inside the archive.
        json_name: Name of the JSON entry inside the archive.

    Returns:
        io.BytesIO: Buffer containing the ZIP archive, rewound to position 0.
    """
    # Write the DataFrame to a workbook using openpyxl
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Results"

    # Header row
    for col_num, column_title in enumerate(df.columns, 1):
        sheet.cell(row=1, column=col_num, value=column_title)

    # Data rows start on the second sheet row
    for row_num, row_data in enumerate(df.values, 2):
        for col_num, cell_value in enumerate(row_data, 1):
            sheet.cell(
                row=row_num, column=col_num, value=_excel_cell_value(cell_value)
            )

    excel_buffer = io.BytesIO()
    workbook.save(excel_buffer)

    scenario_json = json.dumps(convert_to_serializable(scenario_data), indent=4)

    # Assemble the archive in memory
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.writestr(excel_name, excel_buffer.getvalue())
        zip_file.writestr(json_name, scenario_json.encode("utf-8"))

    buffer.seek(0)  # Rewind the buffer to the beginning
    return buffer


def delete_selected_scenarios(selected_scenario):
    """Remove ``selected_scenario`` from the saved scenarios, if it is present."""
    get_saved_scenarios_dict().pop(selected_scenario, None)


def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or NaN when the denominator is zero."""
    if denominator == 0:
        return float("nan")
    return numerator / denominator


def _collect_channel_rows(scenario_data):
    """Return one summary-table row (a dict) per channel of ``scenario_data``."""
    metric_label = name_formating(scenario_data["metrics_selected"])
    multiplier = scenario_data["multiplier"]

    rows = []
    for channel, channel_data in scenario_data["channels"].items():
        conversion_rate = channel_data["conversion_rate"]
        actual_spends = channel_data["actual_total_spends"] * conversion_rate
        optimized_spends = channel_data["modified_total_spends"] * conversion_rate
        actual_metrics = channel_data["actual_total_sales"]
        optimized_metrics = channel_data["modified_total_sales"]

        # ROI and MROI figures are stored per channel in a separate mapping
        roi_mroi = scenario_data["channel_roi_mroi"][channel]

        # A channel without sales must not break the whole page
        spends_per_metric_actual = _safe_ratio(actual_spends, actual_metrics)
        spends_per_metric_optimized = _safe_ratio(
            optimized_spends, optimized_metrics
        )

        rows.append(
            {
                "Channel Name": channel,
                "Spends Actual": numerize(actual_spends / multiplier),
                "Spends Optimized": numerize(optimized_spends / multiplier),
                f"{metric_label} Actual": numerize(actual_metrics / multiplier),
                f"{metric_label} Optimized": numerize(
                    optimized_metrics / multiplier
                ),
                "ROI Actual": format_value(roi_mroi["actual_roi"]),
                "ROI Optimized": format_value(roi_mroi["optimized_roi"]),
                "MROI Actual": format_value(roi_mroi["actual_mroi"]),
                "MROI Optimized": format_value(roi_mroi["optimized_mroi"]),
                f"Spends per {metric_label} Actual": round(
                    spends_per_metric_actual, 2
                ),
                f"Spends per {metric_label} Optimized": round(
                    spends_per_metric_optimized, 2
                ),
            }
        )
    return rows


try:
    # Page Title
    st.title("Saved Scenarios")

    # Placeholder to display scenarios name
    scenarios_name_placeholder = st.empty()

    # Get saved scenarios dictionary and scenario name list
    saved_scenarios_dict = get_saved_scenarios_dict()
    scenarios_list = list(saved_scenarios_dict.keys())

    # Nothing to show when no scenario has been saved yet
    if not scenarios_list:
        st.warning("No scenarios saved. Please save a scenario to load.", icon="⚠️")

        # Log message
        log_message(
            "warning",
            "No scenarios saved. Please save a scenario to load.",
            "Saved Scenarios",
        )

        st.stop()

    # Columns for scenario selection and save progress
    select_scenario_col, save_progress_col = st.columns(2)
    save_message_display_placeholder = st.container()

    # Display a dropdown saved scenario list
    selected_scenario = select_scenario_col.selectbox(
        "Pick a Scenario", sorted(scenarios_list), key="selected_scenario"
    )

    # Save page progress
    with save_progress_col:
        st.write("###")
        with save_message_display_placeholder, st.spinner("Saving Progress ..."):
            if save_progress_col.button("Save Progress", use_container_width=True):
                # Update DB
                update_db(
                    prj_id=st.session_state["project_number"],
                    page_nam="Saved Scenarios",
                    file_nam="project_dct",
                    pkl_obj=pickle.dumps(st.session_state["project_dct"]),
                    schema=schema,
                )

                # Display success message
                st.success("Progress saved successfully!", icon="💾")
                st.toast("Progress saved successfully!", icon="💾")

                # Log message
                log_message("info", "Progress saved successfully!", "Saved Scenarios")

    selected_scenario_data = saved_scenarios_dict[selected_scenario]

    # Scenarios Name
    metrics_name = selected_scenario_data["metrics_selected"]
    panel_name = selected_scenario_data["panel_selected"]
    optimization_name = selected_scenario_data["optimization"]
    timeframe = selected_scenario_data["timeframe"]

    # Display the scenario details with bold labels
    scenarios_name_placeholder.markdown(
        f"**Metric**: {name_formating(metrics_name)}; **Panel**: {name_formating(panel_name)}; **Fix**: {name_formating(optimization_name)}; **Timeframe**: {name_formating(timeframe)}"
    )

    # Create columns for download and delete buttons
    download_col, delete_col = st.columns(2)
    save_message_display_placeholder = st.container()

    # Create a DataFrame with one row per channel and display it
    df = pd.DataFrame(_collect_channel_rows(selected_scenario_data))
    st.dataframe(df, hide_index=True)

    # Generate downloadable data for selected scenario
    buffer = download_as_zip(
        df,
        selected_scenario_data,
        excel_name="optimization_results.xlsx",
        json_name="scenario_params.json",
    )

    # Provide the buffer as a downloadable ZIP file
    file_name = f"{selected_scenario}_scenario_data.zip"
    if download_col.download_button(
        label="Download",
        data=buffer,
        file_name=file_name,
        mime="application/zip",
        use_container_width=True,
    ):
        # Log message
        log_message(
            "info",
            f"FILE_NAME: {file_name} has been successfully downloaded.",
            "Saved Scenarios",
        )

    # Button to trigger the deletion of the selected scenario; the deletion
    # itself happens in the ``on_click`` callback
    if delete_col.button(
        "Delete",
        use_container_width=True,
        on_click=delete_selected_scenarios,
        args=(selected_scenario,),
    ):
        # Display success message
        with save_message_display_placeholder:
            st.success(
                "Selected scenario successfully deleted. Click the 'Save Progress' button to ensure your changes are updated!",
                icon="🗑️",
            )
            st.toast(
                "Selected scenario successfully deleted. Click the 'Save Progress' button to ensure your changes are updated!",
                icon="🗑️",
            )

        # Log message
        log_message(
            "info", "Selected scenario successfully deleted.", "Saved Scenarios"
        )

except Exception:
    # Top-level boundary of the page: log the full traceback, show a generic hint
    error_message = "".join(traceback.format_exception(*sys.exc_info()))

    # Log message
    log_message("error", f"An error occurred: {error_message}.", "Saved Scenarios")

    # Display a warning message
    st.warning(
        "Oops! Something went wrong. Please try refreshing the tool or creating a new project.",
        icon="⚠️",
    )