# Hugging Face Spaces page header captured with this file (status: Build error)
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from sklearn.preprocessing import LabelEncoder, StandardScaler | |
from sklearn.model_selection import train_test_split | |
import tensorflow as tf | |
from tensorflow.keras.models import Sequential, Model | |
from tensorflow.keras.layers import LSTM, Dense, Input, MultiHeadAttention, LayerNormalization, GlobalAveragePooling1D | |
from tensorflow.keras.optimizers import Adam | |
import joblib | |
import os | |
import openai | |
from stable_baselines3 import PPO | |
from stable_baselines3.common.vec_env import DummyVecEnv | |
import gym | |
from gym import spaces | |
# Set page config
st.set_page_config(page_title="Advanced Dynamic Game Pricing App", layout="wide")

# OpenAI API key
# SECURITY: the key is read from the OPENAI_API_KEY environment variable rather
# than being hard-coded, so the secret never lives in the source tree. The key
# that was previously embedded on this line must be treated as leaked and
# revoked. os.getenv returns None when the variable is not set.
openai.api_key = os.getenv("OPENAI_API_KEY")
# Function to load or create data | |
def load_data():
    """Return the game pricing dataset, generating a synthetic one on first use.

    If ``game_data.csv`` exists in the working directory it is read and
    returned. Otherwise a random sample dataset (20 games x 50 daily rows) is
    generated, written to ``game_data.csv`` and returned.

    Returns:
        pandas.DataFrame: the dataset. When a ``date`` column is present it is
        datetime-typed on both the generate and the reload path.
    """
    csv_path = 'game_data.csv'

    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)
        # The CSV round-trip turns dates into plain strings; convert them back
        # so a reloaded frame has the same dtypes as a freshly generated one.
        # Guarded so a CSV without a 'date' column still loads.
        if 'date' in df.columns:
            df['date'] = pd.to_datetime(df['date'])
        return df

    n_games = 20
    n_days = 50
    n_rows = n_games * n_days

    # Sample dataset with time series data.
    # Genre and region are drawn once per game and repeated for each of its
    # rows, so they stay constant within a game.
    data = {
        'game_id': np.repeat(np.arange(1, n_games + 1), n_days),
        'date': np.tile(pd.date_range(start='2020-01-01', periods=n_days), n_games),
        'genre': np.repeat(np.random.choice(['RPG', 'FPS', 'Strategy', 'Puzzle', 'Sports'], n_games), n_days),
        'region': np.repeat(np.random.choice(['Africa', 'NA', 'EU', 'Asia', 'SA'], n_games), n_days),
        'demand_index': np.random.uniform(0.1, 1.0, n_rows),
        'competitor_price': np.random.uniform(20, 60, n_rows),
        'past_sales': np.random.randint(100, 1000, n_rows),
        'price': np.random.uniform(25, 65, n_rows),
    }
    df = pd.DataFrame(data)
    df.to_csv(csv_path, index=False)
    return df
# Load the dataset at module level; the Data Explorer and Model Training
# pages below read it.
df = load_data()
# LSTM Model | |
def create_lstm_model(input_shape):
    """Build and compile a stacked two-layer LSTM regressor.

    Args:
        input_shape: shape of a single sample, forwarded to the first LSTM
            layer as its ``input_shape``.

    Returns:
        A compiled ``Sequential`` model (``'adam'`` optimizer, ``'mse'`` loss)
        ending in a single-unit ``Dense`` output.
    """
    network = Sequential()
    # The first LSTM returns the full sequence so the second LSTM can consume it.
    network.add(LSTM(64, return_sequences=True, input_shape=input_shape))
    network.add(LSTM(32))
    network.add(Dense(1))
    network.compile(optimizer='adam', loss='mse')
    return network
# Transformer Model | |
def create_transformer_model(input_shape):
    """Build and compile a single-block transformer regressor.

    The input passes through one ``transformer_encoder`` block, is averaged
    with ``GlobalAveragePooling1D`` and mapped to one output unit.

    Args:
        input_shape: shape of a single sample, used for the Keras ``Input``.

    Returns:
        A compiled functional ``Model`` (``'adam'`` optimizer, ``'mse'`` loss).
    """
    model_input = Input(shape=input_shape)
    encoded = transformer_encoder(
        model_input,
        head_size=256,
        num_heads=4,
        ff_dim=4,
        dropout=0.1,
    )
    pooled = GlobalAveragePooling1D()(encoded)
    prediction = Dense(1)(pooled)

    network = Model(model_input, prediction)
    # Compile the model
    network.compile(optimizer='adam', loss='mse')
    return network
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one self-attention + feed-forward encoder block to ``inputs``.

    Args:
        inputs: Keras tensor to encode; it is used as both query and value.
        head_size: ``key_dim`` of the multi-head attention layer.
        num_heads: number of attention heads.
        ff_dim: width of the hidden ReLU ``Dense`` layer.
        dropout: dropout rate passed to the attention layer.

    Returns:
        A Keras tensor whose last dimension equals ``inputs.shape[-1]``.
    """
    # Self-attention, normalised, then added back onto the block input.
    self_attention = MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )
    attended = self_attention(inputs, inputs)
    normalized = LayerNormalization(epsilon=1e-6)(attended)
    residual = normalized + inputs

    # Feed-forward part: expand to ff_dim, project back to the input width.
    hidden = Dense(ff_dim, activation="relu")(residual)
    projected = Dense(inputs.shape[-1])(hidden)
    return LayerNormalization(epsilon=1e-6)(projected + residual)
# RL Environment | |
class PricingEnv(gym.Env):
    """Gym environment that walks through a pricing dataset one row per step.

    The action is a single proposed price in ``[0, 100]``. The reward is the
    negative absolute difference between that price and the ``price`` column
    of the current row. Each observation holds five feature columns of the
    current row followed by the row index, as a float32 vector of length 6.

    Args:
        data: DataFrame providing the columns listed in ``_OBS_COLUMNS`` plus
            ``price``.
    """

    # Feature columns copied into every observation, in this order; the step
    # index is appended as the sixth element.
    _OBS_COLUMNS = ['demand_index', 'competitor_price', 'past_sales', 'genre_encoded', 'region_encoded']

    def __init__(self, data):
        super().__init__()
        self.data = data
        self.current_step = 0
        self.max_steps = len(data) - 1
        self.action_space = gym.spaces.Box(low=0, high=100, shape=(1,), dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=0, high=np.inf, shape=(6,), dtype=np.float32)

    def step(self, action):
        """Score ``action`` against the current row, then advance one row.

        Returns:
            ``(obs, reward, done, info)`` with an empty ``info`` dict.
        """
        reward = self._get_reward(action)
        self.current_step += 1
        done = self.current_step >= self.max_steps
        obs = self._get_observation()
        return obs, reward, done, {}  # Removed the 'truncated' flag for compatibility

    def reset(self):
        """Rewind to the first row and return its observation."""
        self.current_step = 0
        return self._get_observation()

    def _get_observation(self):
        """Return the float32 observation for the current step."""
        # If we've gone past the end of the data, reuse the last valid row.
        step = min(self.current_step, self.max_steps)
        row = self.data.iloc[step][self._OBS_COLUMNS]
        # A row taken from a mixed-type frame is object-typed; cast explicitly
        # so the observation matches the float32 dtype declared in
        # observation_space.
        features = row.to_numpy(dtype=np.float32)
        return np.append(features, step).astype(np.float32)

    def _get_reward(self, action):
        """Return minus the absolute error between ``action[0]`` and the row's price."""
        if self.current_step > self.max_steps:
            return 0.0  # Or some other appropriate value for going out of bounds
        proposed_price = action[0]
        actual_price = self.data.iloc[self.current_step]['price']
        return -float(abs(proposed_price - actual_price))
# Function to get LLM analysis | |
def get_llm_analysis(game_info, market_info):
    """Request a pricing-strategy analysis from the OpenAI chat API.

    Args:
        game_info: text describing the game, inserted into the prompt.
        market_info: text describing the market, inserted into the prompt.

    Returns:
        The message content of the first choice in the API response.
    """
    prompt = f"""
Analyze the following game and market information for pricing strategy:
Game Information:
{game_info}
Market Information:
{market_info}
Based on this information, suggest a pricing strategy and any factors that might influence the game's price.
Provide your analysis in a structured format with clear recommendations.
"""
    conversation = [
        {"role": "system", "content": "You are an expert in game pricing and market trends."},
        {"role": "user", "content": prompt},
    ]
    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=conversation,
        max_tokens=300,
        n=1,
        stop=None,
        temperature=0.7,
    )
    first_choice = completion['choices'][0]
    return first_choice['message']['content']
# Files written by the Model Training page; the Price Prediction page needs
# every one of them, not only the three model files.
_ARTIFACT_PATHS = (
    'lstm_model.h5',
    'transformer_model.h5',
    'rl_model.zip',
    'scaler.pkl',
    'le_genre.pkl',
    'le_region.pkl',
)


def _render_data_explorer():
    """Show the raw dataset, its summary statistics and two price scatter plots."""
    st.title("Data Explorer")
    st.write(df)
    st.subheader("Data Statistics")
    st.write(df.describe())
    st.subheader("Data Visualization")
    fig, ax = plt.subplots(1, 2, figsize=(15, 5))
    ax[0].scatter(df['competitor_price'], df['price'])
    ax[0].set_xlabel('Competitor Price')
    ax[0].set_ylabel('Price')
    ax[0].set_title('Competitor Price vs Price')
    ax[1].scatter(df['demand_index'], df['price'])
    ax[1].set_xlabel('Demand Index')
    ax[1].set_ylabel('Price')
    ax[1].set_title('Demand Index vs Price')
    st.pyplot(fig)
    # The script re-runs on every interaction; close the figure so figures do
    # not pile up in matplotlib's global registry.
    plt.close(fig)


def _render_model_training():
    """Preprocess the dataset and, on button press, train and save all models."""
    st.title("Model Training")

    # Data preprocessing
    le_genre = LabelEncoder()
    df['genre_encoded'] = le_genre.fit_transform(df['genre'])
    le_region = LabelEncoder()
    df['region_encoded'] = le_region.fit_transform(df['region'])
    features = ['genre_encoded', 'region_encoded', 'demand_index', 'competitor_price', 'past_sales']
    X = df[features]
    y = df['price']
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Reshape data for LSTM: (samples, 1 timestep, features)
    X_lstm = X_scaled.reshape((X_scaled.shape[0], 1, X_scaled.shape[1]))

    # Split the data. The hold-out part is not evaluated here; the split is
    # kept so the training subset stays the same as before.
    X_train, _, y_train, _ = train_test_split(X_lstm, y, test_size=0.2, random_state=42)

    # Model training
    if not st.button("Train Models"):
        return

    with st.spinner("Training LSTM model..."):
        lstm_model = create_lstm_model((1, X_train.shape[2]))
        lstm_history = lstm_model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, verbose=0)
    with st.spinner("Training Transformer model..."):
        transformer_model = create_transformer_model((1, X_train.shape[2]))
        transformer_history = transformer_model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, verbose=0)
    with st.spinner("Training RL model..."):
        # The RL environment reads the raw (unscaled) frame, including the
        # encoded columns added above.
        env = DummyVecEnv([lambda: PricingEnv(df)])
        rl_model = PPO("MlpPolicy", env, verbose=0)
        rl_model.learn(total_timesteps=10000)
    st.success("All models trained successfully!")

    # Plot training history
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    ax1.plot(lstm_history.history['loss'], label='LSTM Training Loss')
    ax1.plot(lstm_history.history['val_loss'], label='LSTM Validation Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax1.set_title('LSTM Training History')
    ax2.plot(transformer_history.history['loss'], label='Transformer Training Loss')
    ax2.plot(transformer_history.history['val_loss'], label='Transformer Validation Loss')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    ax2.set_title('Transformer Training History')
    st.pyplot(fig)
    plt.close(fig)

    # Save models and preprocessing objects
    lstm_model.save('lstm_model.h5')
    transformer_model.save('transformer_model.h5')
    rl_model.save('rl_model')
    joblib.dump(scaler, 'scaler.pkl')
    joblib.dump(le_genre, 'le_genre.pkl')
    joblib.dump(le_region, 'le_region.pkl')
    st.info("Models and preprocessing objects saved.")


def _render_price_prediction():
    """Load the saved artifacts and predict a price for user-selected inputs."""
    st.title("Price Prediction")

    # Every artifact loaded below must exist, including the scaler and the
    # label encoders.
    if not all(os.path.exists(path) for path in _ARTIFACT_PATHS):
        st.warning("Please train the models first!")
        return

    # Load saved models and objects. The Keras models are only used for
    # predict(), so the optimizer/loss state does not need to be restored.
    lstm_model = tf.keras.models.load_model('lstm_model.h5', compile=False)
    transformer_model = tf.keras.models.load_model('transformer_model.h5', compile=False)
    rl_model = PPO.load('rl_model')
    scaler = joblib.load('scaler.pkl')
    le_genre = joblib.load('le_genre.pkl')
    le_region = joblib.load('le_region.pkl')

    # User input
    genre = st.selectbox("Select Genre", le_genre.classes_)
    region = st.selectbox("Select Region", le_region.classes_)
    demand_index = st.slider("Demand Index", 0.1, 1.0, 0.5)
    competitor_price = st.slider("Competitor Price", 20.0, 60.0, 40.0)
    past_sales = st.slider("Past Sales", 100, 1000, 500)

    # Prepare input for prediction
    genre_code = le_genre.transform([genre])[0]
    region_code = le_region.transform([region])[0]
    input_data = np.array([
        genre_code,
        region_code,
        demand_index,
        competitor_price,
        past_sales
    ])
    # The LSTM and Transformer were trained on scaled features in this order.
    input_scaled = scaler.transform(input_data.reshape(1, -1)).flatten()
    # The RL policy was trained on PricingEnv observations: raw (unscaled)
    # values ordered demand_index, competitor_price, past_sales,
    # genre_encoded, region_encoded, step. Build the same layout here, with a
    # step value of 0 for prediction.
    rl_observation = np.array(
        [demand_index, competitor_price, past_sales, genre_code, region_code, 0],
        dtype=np.float32,
    )

    # Make predictions
    if not st.button("Predict Price"):
        return

    sequence_input = input_scaled.reshape(1, 1, -1)
    lstm_price = float(lstm_model.predict(sequence_input)[0][0])
    transformer_price = float(transformer_model.predict(sequence_input)[0][0])
    # deterministic=True so the same inputs give the same price instead of a
    # fresh sample from the policy on every click.
    rl_action, _ = rl_model.predict(rl_observation, deterministic=True)
    # Extract the single float value from the RL prediction
    rl_price_value = float(np.ravel(rl_action)[0])

    # Display results
    st.success(f"LSTM Predicted Price: ${lstm_price:.2f}")
    st.success(f"Transformer Predicted Price: ${transformer_price:.2f}")
    st.success(f"RL Predicted Price: ${rl_price_value:.2f}")

    # Get LLM analysis
    game_info = f"Genre: {genre}, Region: {region}, Past Sales: {past_sales}"
    market_info = f"Demand Index: {demand_index}, Competitor Price: {competitor_price}"
    llm_analysis = get_llm_analysis(game_info, market_info)
    st.subheader("LLM Pricing Analysis:")
    st.write(llm_analysis)

    # Visualize the predictions. Plot the scalar RL price, not the raw action
    # array, so all four bar heights are plain numbers.
    fig, ax = plt.subplots(figsize=(10, 5))
    models = ['LSTM', 'Transformer', 'RL', 'Competitor']
    prices = [lstm_price, transformer_price, rl_price_value, competitor_price]
    ax.bar(models, prices)
    ax.set_ylabel('Price ($)')
    ax.set_title('Price Comparison')
    st.pyplot(fig)
    plt.close(fig)
    st.info("Consider all model predictions and the LLM analysis to make a final pricing decision.")


# Sidebar for navigation: page label -> function that renders that page.
_PAGES = {
    "Data Explorer": _render_data_explorer,
    "Model Training": _render_model_training,
    "Price Prediction": _render_price_prediction,
}
page = st.sidebar.selectbox("Choose a page", list(_PAGES))
_PAGES[page]()

st.sidebar.info("This app demonstrates advanced dynamic pricing for game codes using LSTMs, Transformers, RL, and LLM analysis.")