# MMO_Demo / classes.py
# Samkeet-Blend360
# Email
# e8d72c9
import numpy as np
from scipy.optimize import minimize, LinearConstraint, NonlinearConstraint
from collections import OrderedDict
import pandas as pd
from numerize.numerize import numerize
# from gekko import GEKKO
def class_to_dict(class_instance):
    """Serialise a ``Channel`` or ``Scenario`` instance into a plain dict.

    The returned dictionary carries a ``"type"`` tag (``"Channel"`` or
    ``"Scenario"``) so that ``class_from_dict`` can pick the matching
    constructor.  A ``Scenario`` is serialised recursively: its ``"channels"``
    entry is a list with one dictionary per channel.

    Args:
        class_instance: The object to serialise.

    Returns:
        dict: The attribute dictionary.  It is empty when ``class_instance``
        is neither a ``Channel`` nor a ``Scenario``.
    """
    attr_dict = {}

    if isinstance(class_instance, Channel):
        # (dictionary key, attribute name) pairs; the first block keeps the
        # historical key order so existing consumers see the same layout.
        channel_fields = (
            ("name", "name"),
            ("dates", "dates"),
            ("spends", "actual_spends"),
            ("conversion_rate", "conversion_rate"),
            ("modified_spends", "modified_spends"),
            ("modified_sales", "modified_sales"),
            ("response_curve_type", "response_curve_type"),
            ("response_curve_params", "response_curve_params"),
            ("penalty", "penalty"),
            ("bounds", "bounds"),
            ("actual_total_spends", "actual_total_spends"),
            ("actual_total_sales", "actual_total_sales"),
            ("modified_total_spends", "modified_total_spends"),
            ("modified_total_sales", "modified_total_sales"),
            # The constructor of Channel requires these three values, so they
            # must be part of the payload for a round trip to be possible.
            ("sales", "actual_sales"),
            ("channel_bounds_min", "channel_bounds_min"),
            ("channel_bounds_max", "channel_bounds_max"),
        )
        attr_dict["type"] = "Channel"
        for key, attribute in channel_fields:
            attr_dict[key] = getattr(class_instance, attribute)

    elif isinstance(class_instance, Scenario):
        attr_dict["type"] = "Scenario"
        attr_dict["name"] = class_instance.name
        attr_dict["channels"] = [
            class_to_dict(channel) for channel in class_instance.channels.values()
        ]
        attr_dict["constant"] = class_instance.constant
        attr_dict["correction"] = class_instance.correction
        attr_dict["actual_total_spends"] = class_instance.actual_total_spends
        attr_dict["actual_total_sales"] = class_instance.actual_total_sales
        attr_dict["modified_total_spends"] = class_instance.modified_total_spends
        attr_dict["modified_total_sales"] = class_instance.modified_total_sales

    return attr_dict
def class_from_dict(attr_dict):
    """Rebuild a ``Channel`` or ``Scenario`` from a tagged attribute dict.

    Args:
        attr_dict: Dictionary whose ``"type"`` entry names the class to build.

    Returns:
        The object produced by the matching ``from_dict`` constructor, or
        ``None`` when the ``"type"`` value is not one of the two known tags.
    """
    kind = attr_dict["type"]
    if kind == "Channel":
        return Channel.from_dict(attr_dict)
    if kind == "Scenario":
        return Scenario.from_dict(attr_dict)
    return None
class Channel:
    """One marketing channel: its spend/sales series and its response curve.

    The instance keeps the *actual* series given to the constructor and a
    *modified* series that ``update`` rewrites, together with the totals and
    deltas derived from both.
    """

    def __init__(
        self,
        name,
        dates,
        spends,
        sales,
        response_curve_type,
        response_curve_params,
        bounds,
        channel_bounds_min,
        channel_bounds_max,
        conversion_rate=1,
        modified_spends=None,
        modified_sales=None,
        penalty=True,
    ):
        """Store the channel data and derive totals.

        Args:
            name: Channel identifier.
            dates: Date labels for the series (stored as given).
            spends: Actual spends; must expose ``copy``, ``max``, ``std`` and
                ``sum`` (presumably a NumPy array or pandas Series - TODO
                confirm against callers).
            sales: Actual sales; must expose ``copy`` and ``sum``.
            response_curve_type: ``"hill-eq"``, ``"s-curve"`` or ``"linear"``.
            response_curve_params: Mapping with the parameters of the curve.
            bounds: Stored as given; not read by this class.
            channel_bounds_min: Lower bound setting, stored as given.
            channel_bounds_max: Upper bound setting, stored as given.
            conversion_rate: Stored as given.
            modified_spends: Optional modified spends; defaults to a copy of
                the actual spends.  When supplied it is stored without
                copying.
            modified_sales: Optional modified sales; defaults to a copy of the
                actual sales.
            penalty: Flag stored on the instance.
        """
        self.name = name
        self.dates = dates
        self.conversion_rate = conversion_rate

        # The curve configuration is assigned first so that every method that
        # evaluates the response curve can be used from here on.
        self.response_curve_type = response_curve_type
        self.response_curve_params = response_curve_params
        self.bounds = bounds
        self.channel_bounds_min = channel_bounds_min
        self.channel_bounds_max = channel_bounds_max
        self.penalty = penalty

        self.actual_spends = spends.copy()
        self.actual_sales = sales.copy()

        if modified_spends is None:
            self.modified_spends = self.actual_spends.copy()
        else:
            self.modified_spends = modified_spends

        # An explicitly supplied series is honoured.  Previously this branch
        # evaluated the response curve before its configuration had been
        # assigned, which raised AttributeError and ignored the argument.
        if modified_sales is None:
            self.modified_sales = self.actual_sales.copy()
        else:
            self.modified_sales = modified_sales

        self.upper_limit = self.actual_spends.max() + self.actual_spends.std()
        # Order of magnitude of the largest spend, shifted down by three;
        # used to rescale the input of the "s-curve" response.
        self.power = np.ceil(np.log(self.actual_spends.max()) / np.log(10)) - 3

        self.actual_total_spends = self.actual_spends.sum()
        self.actual_total_sales = self.actual_sales.sum()
        self.modified_total_spends = self.modified_spends.sum()
        self.modified_total_sales = self.modified_sales.sum()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_penalty(self, penalty):
        """Replace the penalty flag."""
        self.penalty = penalty

    def _modify_spends(self, spends_array, total_spends):
        """Return ``spends_array`` rescaled so that it sums to ``total_spends``."""
        return spends_array * total_spends / spends_array.sum()

    def modify_spends(self, total_spends):
        """Write ``total_spends`` into the first element of the modified spends.

        Only index 0 is overwritten; the remaining elements are left as is.
        """
        self.modified_spends[0] = total_spends

    def calculate_sales(self):
        """Evaluate the response curve on the current modified spends."""
        return self.response_curve(self.modified_spends)

    @staticmethod
    def hill_equation(x, Kd, n):
        """Return the Hill function ``x**n / (Kd**n + x**n)``.

        Declared static because the function takes no instance; this keeps
        ``Channel.hill_equation(x, Kd, n)`` working and also makes the call
        valid on an instance.
        """
        return x**n / (Kd**n + x**n)

    def response_curve(self, x):
        """Map spends ``x`` to sales with the configured response curve.

        Args:
            x: Spends.  The "hill-eq" branch assigns into the result by boolean
                mask and the "s-curve" branch calls ``x.astype``, so an
                array-like input is expected.

        Returns:
            The sales computed by the curve selected through
            ``response_curve_type``.

        Raises:
            ValueError: If ``response_curve_type`` is not one of
                ``"hill-eq"``, ``"s-curve"`` or ``"linear"``.
            KeyError: If a parameter required by the selected curve is missing
                from ``response_curve_params``.
        """
        params = self.response_curve_params
        curve_type = self.response_curve_type

        if curve_type == "hill-eq":
            # NOTE(review): an earlier ``len(x) == 1`` special case chose a
            # rate of 1, but its result was overwritten by this assignment
            # before being used - confirm the special case is not wanted.
            dividing_rate = params["num_pos_obsv"]
            Kd = params["Kd"]
            n = params["n"]
            x_min = params["x_min"]
            x_max = params["x_max"]
            y_min = params["y_min"]
            y_max = params["y_max"]

            # Min-max scale the per-observation spend, apply the Hill
            # function, then map the result back to the [x_min, x_max] range.
            x_inp = ((x / dividing_rate) - x_min) / (x_max - x_min)
            x_out = x_inp**n / (Kd**n + x_inp**n)
            x_val_inv = x_out * x_max + (1 - x_out) * x_min
            sales = (x_val_inv * y_min / y_max) * dividing_rate
            # Undefined results (NaN) are reported as zero sales.
            sales[np.isnan(sales)] = 0
            return sales

        if curve_type == "s-curve":
            if self.power >= 0:
                x = x / 10**self.power
            x = x.astype("float64")
            K = params["Kd"]
            b = params["b"]
            a = params["a"]
            x0 = params["x0"]
            return K / (1 + b * np.exp(-a * (x - x0)))

        if curve_type == "linear":
            return params["beta"] * x

        raise ValueError(f"Unknown response curve type: {curve_type!r}")

    def get_marginal_roi(self, flag):
        """Return the mean of ``a * y * (1 - y / K)`` over the spend series.

        Args:
            flag: ``"actual"`` evaluates the curve on the actual spends; any
                other value uses the modified spends.

        Returns:
            The summed expression divided by the number of modified spends.

        Raises:
            KeyError: If ``response_curve_params`` has no ``"K"`` or ``"a"``.
        """
        # NOTE(review): this reads the key "K" while the "s-curve" branch of
        # response_curve reads "Kd" - confirm which key the parameters use.
        K = self.response_curve_params["K"]
        a = self.response_curve_params["a"]
        spends = self.actual_spends if flag == "actual" else self.modified_spends
        y = self.response_curve(spends)
        mroi = a * y * (1 - y / K)
        return mroi.sum() / len(self.modified_spends)

    def update(self, total_spends):
        """Apply a new spend and refresh modified sales, totals and deltas."""
        self.modify_spends(total_spends)
        self.modified_sales = self.calculate_sales()
        self.modified_total_spends = self.modified_spends.sum()
        self.modified_total_sales = self.modified_sales.sum()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_bounds_min(self, modified_bound):
        """Replace the lower bound setting."""
        self.channel_bounds_min = modified_bound

    def update_bounds_max(self, modified_bound):
        """Replace the upper bound setting."""
        self.channel_bounds_max = modified_bound

    def intialize(self):
        """Copy ``old_spends`` to ``new_spends``.

        NOTE(review): ``old_spends`` is never assigned by this class, so this
        raises AttributeError unless a caller sets it first - confirm whether
        the method is still needed.
        """
        self.new_spends = self.old_spends

    def __str__(self):
        return f"{self.name},{self.actual_total_sales}, {self.modified_total_spends}"

    @classmethod
    def from_dict(cls, attr_dict):
        """Build a channel from a dictionary such as ``class_to_dict`` produces.

        Dictionaries without the ``"sales"``, ``"channel_bounds_min"``,
        ``"channel_bounds_max"`` or ``"conversion_rate"`` keys are accepted:
        the actual sales then fall back to ``"modified_sales"`` (the only
        sales series such a dictionary carries), the two bound settings to
        ``None`` and the conversion rate to the constructor default of 1.

        Raises:
            KeyError: If a mandatory key is missing, including the case where
                neither ``"sales"`` nor ``"modified_sales"`` is present.
        """
        modified_sales = attr_dict.get("modified_sales")
        sales = attr_dict.get("sales")
        if sales is None:
            sales = modified_sales
        if sales is None:
            raise KeyError("sales")

        return cls(
            name=attr_dict["name"],
            dates=attr_dict["dates"],
            spends=attr_dict["spends"],
            sales=sales,
            response_curve_type=attr_dict["response_curve_type"],
            response_curve_params=attr_dict["response_curve_params"],
            bounds=attr_dict["bounds"],
            channel_bounds_min=attr_dict.get("channel_bounds_min"),
            channel_bounds_max=attr_dict.get("channel_bounds_max"),
            conversion_rate=attr_dict.get("conversion_rate", 1),
            modified_spends=attr_dict["modified_spends"],
            modified_sales=modified_sales,
            penalty=attr_dict["penalty"],
        )

    def update_response_curves(self, response_curve_params):
        """Replace the response curve parameters."""
        self.response_curve_params = response_curve_params
class Scenario:
    """A named collection of channels with aggregated spend and sales totals.

    ``constant`` and ``correction`` are stored on the instance but are not
    added to any of the totals computed here.
    """

    def __init__(self, name, channels, constant, correction):
        """Store the inputs and compute scenario-level totals and deltas.

        Args:
            name: Scenario name.
            channels: Mapping of channel name to channel object.
            constant: Stored as given.
            correction: Stored as given.
        """
        self.name = name
        self.channels = channels
        self.constant = constant
        self.correction = correction
        # The actual total is built from the channels' *actual* spends; it
        # used to be computed from the modified spends by mistake.
        self.actual_total_spends = self.calculate_actual_total_spends()
        self.actual_total_sales = self.calculate_actual_total_sales()
        self.modified_total_sales = self.calculate_modified_total_sales()
        self.modified_total_spends = self.calculate_modified_total_spends()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_penalty(self, value):
        """Forward the penalty flag to every channel."""
        for channel in self.channels.values():
            channel.update_penalty(value)

    def calculate_actual_total_spends(self):
        """Return the sum of ``actual_total_spends`` over all channels."""
        total_actual_spends = 0.0
        for channel in self.channels.values():
            total_actual_spends += channel.actual_total_spends * 1.0
        return total_actual_spends

    def calculate_modified_total_spends(self):
        """Return the sum of ``modified_total_spends`` over all channels."""
        total_modified_spends = 0.0
        for channel in self.channels.values():
            total_modified_spends += channel.modified_total_spends * 1.0
        return total_modified_spends

    def calculate_actual_total_sales(self):
        """Return the sum of ``actual_total_sales`` over all channels."""
        total_actual_sales = 0
        for channel in self.channels.values():
            total_actual_sales += channel.actual_total_sales
        return total_actual_sales

    def calculate_modified_total_sales(self):
        """Return the sum of ``modified_total_sales`` over all channels."""
        total_modified_sales = 0
        for channel in self.channels.values():
            total_modified_sales += channel.modified_total_sales
        return total_modified_sales

    def update(self, channel_name, modified_spends):
        """Update one channel's spend and refresh the modified totals/deltas."""
        self.channels[channel_name].update(modified_spends)
        self.modified_total_sales = self.calculate_modified_total_sales()
        self.modified_total_spends = self.calculate_modified_total_spends()
        self.delta_spends = self.modified_total_spends - self.actual_total_spends
        self.delta_sales = self.modified_total_sales - self.actual_total_sales

    def update_bounds_min(self, channel_name, modified_bound):
        """Set the lower bound setting of the named channel."""
        self.channels[channel_name].update_bounds_min(modified_bound)

    def update_bounds_max(self, channel_name, modified_bound):
        """Set the upper bound setting of the named channel."""
        self.channels[channel_name].update_bounds_max(modified_bound)

    def optimize_spends(self, sales_percent, channels_list, algo="trust-constr"):
        """Minimise total spend subject to reaching a sales target.

        The target is ``actual_total_sales * (1 + sales_percent / 100)`` and
        the modified total sales must end within +/-1 of it.  Each channel's
        spend is bounded by its ``channel_bounds_min`` / ``channel_bounds_max``
        percentages (truncated with ``int``) around its actual total spend.
        The channels are updated in place with the result.

        Args:
            sales_percent: Desired change in sales, in percent.
            channels_list: Names of the channels to optimise.
            algo: ``method`` passed to ``scipy.optimize.minimize``.

        Returns:
            A single-use ``zip`` iterator of ``(channel_name, spend)`` pairs.
        """
        desired_sales = self.actual_total_sales * (1 + sales_percent / 100.0)

        def constraint(x):
            # Evaluating the constraint pushes the candidate spends into the
            # channels so that the scenario totals reflect them.
            for ch, spends in zip(channels_list, x):
                self.update(ch, spends)
            return self.modified_total_sales - desired_sales

        bounds = []
        for ch in channels_list:
            channel = self.channels[ch]
            lb = (1 - int(channel.channel_bounds_min) / 100) * channel.actual_total_spends
            ub = (1 + int(channel.channel_bounds_max) / 100) * channel.actual_total_spends
            bounds.append((lb, ub))

        # The search starts from the lower bound of every channel.
        initial_point = [bound[0] for bound in bounds]
        # Scale the objective by the order of magnitude of the starting spend.
        power = np.ceil(np.log(sum(initial_point)) / np.log(10))
        constraints = [NonlinearConstraint(constraint, -1.0, 1.0)]

        res = minimize(
            lambda x: sum(x) / 10 ** (power),
            bounds=bounds,
            x0=initial_point,
            constraints=constraints,
            method=algo,
            options={"maxiter": int(1e5), "xtol": 1000},
        )

        for channel_name, modified_spends in zip(channels_list, res.x):
            self.update(channel_name, modified_spends)
        return zip(channels_list, res.x)

    def optimize(self, spends_percent, channels_list):
        """Maximise modified sales while keeping the total spend fixed.

        The total spend over ``channels_list`` is held equal to the sum of
        each channel's ``actual_total_spends + delta_spends``.  Per-channel
        bounds are the Streamlit session values ``"overall_lower_bound"`` and
        ``"overall_upper_bound"`` (percentages around the actual total spend);
        30.0 is used when a value is missing or cannot be parsed as a float.
        The channels are updated in place with the result.

        Args:
            spends_percent: Not used by this method.
            channels_list: Names of the channels to optimise.

        Returns:
            A single-use ``zip`` iterator of ``(channel_name, spend)`` pairs.
        """
        import streamlit as st

        default_percent = 30.0

        def session_percent(key):
            # A missing key previously left the bound undefined and crashed
            # later; it now falls back like an unparsable value does.
            if key in st.session_state:
                try:
                    return float(st.session_state[key])
                except (ValueError, TypeError):
                    pass
            return default_percent

        lower_val = session_percent("overall_lower_bound")
        upper_val = session_percent("overall_upper_bound")

        num_channels = len(channels_list)
        spends_constraint = 0.0
        bounds = []
        old_spends = []
        for channel_name in channels_list:
            channel = self.channels[channel_name]
            current_spends = channel.actual_total_spends + channel.delta_spends
            spends_constraint += current_spends
            old_spends.append(current_spends)
            lb = (1 - lower_val / 100) * channel.actual_total_spends
            ub = (1 + upper_val / 100) * channel.actual_total_spends
            bounds.append((lb, ub))

        constraint = LinearConstraint(
            np.ones((num_channels,)), lb=spends_constraint, ub=spends_constraint
        )

        def objective_function(x):
            # Pushing the candidate spends into the channels refreshes the
            # scenario totals that the objective is read from.
            for channel_name, modified_spends in zip(channels_list, x):
                self.update(channel_name, modified_spends)
            return -1 * self.modified_total_sales

        res = minimize(
            lambda x: objective_function(x) / 1e3,
            method="trust-constr",
            x0=old_spends,
            constraints=constraint,
            bounds=bounds,
            options={"maxiter": int(1e7), "xtol": 100},
        )

        for channel_name, modified_spends in zip(channels_list, res.x):
            self.update(channel_name, modified_spends)
        return zip(channels_list, res.x)

    def hill_equation(self, x, Kd, n):
        """Return the Hill function ``x**n / (Kd**n + x**n)``."""
        return x**n / (Kd**n + x**n)

    def save(self):
        """Collect the scenario results into a dictionary of report tables.

        Returns:
            dict: With the keys ``"Actual"`` and ``"Modified"`` (lists of
            per-channel dictionaries preceded by a ``"Total"`` entry),
            ``"Summary"`` (a DataFrame with two-level columns) and
            ``"download"`` (DataFrames plus the scenario totals).
        """
        details = {}
        data = {}
        channel_data = []
        summary_rows = []
        actual_list = [
            {
                "name": "Total",
                "Spends": self.actual_total_spends,
                "Sales": self.actual_total_sales,
            }
        ]
        modified_list = [
            {
                "name": "Total",
                "Spends": self.modified_total_spends,
                "Sales": self.modified_total_sales,
            }
        ]

        for channel in self.channels.values():
            name_mod = channel.name.replace("_", " ")
            if name_mod.lower().endswith(" imp"):
                name_mod = name_mod.replace("Imp", " Impressions")

            # Each figure is computed once and reused in every table below.
            roi_actual = round(
                channel.actual_total_sales / channel.actual_total_spends, 2
            )
            roi_modified = round(
                channel.modified_total_sales / channel.modified_total_spends, 2
            )
            mroi_actual = channel.get_marginal_roi("actual")
            mroi_modified = channel.get_marginal_roi("modified")

            summary_rows.append(
                [
                    name_mod,
                    channel.actual_total_spends,
                    channel.modified_total_spends,
                    channel.actual_total_sales,
                    channel.modified_total_sales,
                    roi_actual,
                    roi_modified,
                    mroi_actual,
                    mroi_modified,
                ]
            )

            data[channel.name] = channel.modified_spends
            data["Date"] = channel.dates
            # Running element-wise sum of the modified sales of all channels.
            data["Sales"] = (
                data.get("Sales", np.zeros((len(channel.dates),)))
                + channel.modified_sales
            )

            actual_list.append(
                {
                    "name": channel.name,
                    "Spends": channel.actual_total_spends,
                    "Sales": channel.actual_total_sales,
                    "ROI": roi_actual,
                }
            )
            modified_list.append(
                {
                    "name": channel.name,
                    "Spends": channel.modified_total_spends,
                    "Sales": channel.modified_total_sales,
                    "ROI": roi_modified,
                    "Marginal ROI": mroi_modified,
                }
            )
            channel_data.append(
                {
                    "channel": channel.name,
                    "spends_act": channel.actual_total_spends,
                    "spends_mod": channel.modified_total_spends,
                    "sales_act": channel.actual_total_sales,
                    "sales_mod": channel.modified_total_sales,
                }
            )

        summary_rows.append(
            [
                "Total",
                self.actual_total_spends,
                self.modified_total_spends,
                self.actual_total_sales,
                self.modified_total_sales,
                round(self.actual_total_sales / self.actual_total_spends, 2),
                round(self.modified_total_sales / self.modified_total_spends, 2),
                0.0,
                0.0,
            ]
        )

        details["Actual"] = actual_list
        details["Modified"] = modified_list

        # One ("", "Channel") column followed by Actual/Simulated pairs.
        columns_index = pd.MultiIndex.from_product(
            [[""], ["Channel"]], names=["first", "second"]
        )
        columns_index = columns_index.append(
            pd.MultiIndex.from_product(
                [["Spends", "NRPU", "ROI", "MROI"], ["Actual", "Simulated"]],
                names=["first", "second"],
            )
        )
        details["Summary"] = pd.DataFrame(summary_rows, columns=columns_index)

        data_df = pd.DataFrame(data)
        channel_list = list(self.channels.keys())
        data_df = data_df[["Date", *channel_list, "Sales"]]

        details["download"] = {
            "data_df": data_df,
            "channels_df": pd.DataFrame(channel_data),
            "total_spends_act": self.actual_total_spends,
            "total_sales_act": self.actual_total_sales,
            "total_spends_mod": self.modified_total_spends,
            "total_sales_mod": self.modified_total_sales,
        }
        return details

    @classmethod
    def from_dict(cls, attr_dict):
        """Build a scenario from a dictionary such as ``class_to_dict`` produces.

        Every entry of ``attr_dict["channels"]`` is rebuilt through
        ``class_from_dict`` and keyed by its ``"name"``.
        """
        channels = {
            channel["name"]: class_from_dict(channel)
            for channel in attr_dict["channels"]
        }
        return cls(
            name=attr_dict["name"],
            channels=channels,
            constant=attr_dict["constant"],
            correction=attr_dict["correction"],
        )