|
import time |
|
import streamlit as st |
|
import numpy as np |
|
from pathlib import Path |
|
from experiments.gmm_dataset import GeneralizedGaussianMixture |
|
import plotly.graph_objects as go |
|
from plotly.subplots import make_subplots |
|
from typing import List, Tuple |
|
import torch |
|
import os |
|
import sys |
|
import matplotlib.pyplot as plt |
|
|
|
|
|
torch.classes.__path__ = [os.path.join(torch.__path__[0], torch.classes.__file__ or "")] |
|
|
|
|
|
pykan_path = Path(__file__).parent.parent / 'third_party' / 'pykan' |
|
sys.path.append(str(pykan_path)) |
|
|
|
|
|
from kan import KAN |
|
from kan.utils import create_dataset, ex_round |
|
|
|
|
|
torch.set_default_dtype(torch.float64) |
|
|
|
def show_kan_prediction(model, device, samples, placeholder, phase_name): |
|
"""显示KAN的预测结果""" |
|
|
|
x = np.linspace(-5, 5, 100) |
|
y = np.linspace(-5, 5, 100) |
|
X, Y = np.meshgrid(x, y) |
|
xy = np.column_stack((X.ravel(), Y.ravel())) |
|
|
|
|
|
grid_points = torch.from_numpy(xy).to(device) |
|
with torch.no_grad(): |
|
Z_kan = model(grid_points).cpu().numpy().reshape(X.shape) |
|
|
|
|
|
fig_kan = make_subplots( |
|
rows=1, cols=2, |
|
specs=[[{'type': 'surface'}, {'type': 'contour'}]], |
|
subplot_titles=('KAN预测的3D概率密度曲面', 'KAN预测的等高线图') |
|
) |
|
|
|
|
|
surface_kan = go.Surface( |
|
x=X, y=Y, z=Z_kan, |
|
colorscale='viridis', |
|
showscale=True, |
|
colorbar=dict(x=0.45) |
|
) |
|
fig_kan.add_trace(surface_kan, row=1, col=1) |
|
|
|
|
|
contour_kan = go.Contour( |
|
x=x, y=y, z=Z_kan, |
|
colorscale='viridis', |
|
showscale=True, |
|
colorbar=dict(x=1.0), |
|
contours=dict( |
|
showlabels=True, |
|
labelfont=dict(size=12) |
|
) |
|
) |
|
fig_kan.add_trace(contour_kan, row=1, col=2) |
|
|
|
|
|
if samples is not None: |
|
samples = samples.cpu().numpy() if torch.is_tensor(samples) else samples |
|
fig_kan.add_trace( |
|
go.Scatter( |
|
x=samples[:, 0], y=samples[:, 1], |
|
mode='markers', |
|
marker=dict( |
|
size=8, |
|
color='yellow', |
|
line=dict(color='black', width=1) |
|
), |
|
name='训练点' |
|
), |
|
row=1, col=2 |
|
) |
|
|
|
|
|
fig_kan.update_layout( |
|
title='KAN预测分布', |
|
showlegend=True, |
|
width=1200, |
|
height=600, |
|
scene=dict( |
|
xaxis_title='X', |
|
yaxis_title='Y', |
|
zaxis_title='密度' |
|
) |
|
) |
|
|
|
|
|
fig_kan.update_xaxes(title_text='X', row=1, col=2) |
|
fig_kan.update_yaxes(title_text='Y', row=1, col=2) |
|
|
|
|
|
|
|
placeholder.plotly_chart(fig_kan, |
|
use_container_width=False, |
|
key=f"kan_plot_{phase_name}_{time.time()}") |
|
|
|
def create_gmm_plot(dataset, centers, K, samples=None): |
|
"""创建GMM分布的可视化图形""" |
|
|
|
x = np.linspace(-5, 5, 100) |
|
y = np.linspace(-5, 5, 100) |
|
X, Y = np.meshgrid(x, y) |
|
xy = np.column_stack((X.ravel(), Y.ravel())) |
|
|
|
|
|
Z = dataset.pdf(xy).reshape(X.shape) |
|
|
|
|
|
fig = make_subplots( |
|
rows=1, cols=2, |
|
specs=[[{'type': 'surface'}, {'type': 'contour'}]], |
|
subplot_titles=('3D概率密度曲面', '等高线图与分量中心') |
|
) |
|
|
|
|
|
surface = go.Surface( |
|
x=X, y=Y, z=Z, |
|
colorscale='viridis', |
|
showscale=True, |
|
colorbar=dict(x=0.45) |
|
) |
|
fig.add_trace(surface, row=1, col=1) |
|
|
|
|
|
contour = go.Contour( |
|
x=x, y=y, z=Z, |
|
colorscale='viridis', |
|
showscale=True, |
|
colorbar=dict(x=1.0), |
|
contours=dict( |
|
showlabels=True, |
|
labelfont=dict(size=12) |
|
) |
|
) |
|
fig.add_trace(contour, row=1, col=2) |
|
|
|
|
|
fig.add_trace( |
|
go.Scatter( |
|
x=centers[:K, 0], y=centers[:K, 1], |
|
mode='markers+text', |
|
marker=dict(size=10, color='red'), |
|
text=[f'C{i+1}' for i in range(K)], |
|
textposition="top center", |
|
name='分量中心' |
|
), |
|
row=1, col=2 |
|
) |
|
|
|
|
|
if samples is not None: |
|
fig.add_trace( |
|
go.Scatter( |
|
x=samples[:, 0], y=samples[:, 1], |
|
mode='markers+text', |
|
marker=dict( |
|
size=8, |
|
color='yellow', |
|
line=dict(color='black', width=1) |
|
), |
|
text=[f'S{i+1}' for i in range(len(samples))], |
|
textposition="bottom center", |
|
name='采样点' |
|
), |
|
row=1, col=2 |
|
) |
|
|
|
|
|
fig.update_layout( |
|
title='广义高斯混合分布', |
|
showlegend=True, |
|
width=1200, |
|
height=600, |
|
scene=dict( |
|
xaxis_title='X', |
|
yaxis_title='Y', |
|
zaxis_title='密度' |
|
) |
|
) |
|
|
|
|
|
fig.update_xaxes(title_text='X', row=1, col=2) |
|
fig.update_yaxes(title_text='Y', row=1, col=2) |
|
|
|
return fig |
|
|
|
def train_kan(samples, gmm_dataset, device='cuda'): |
|
"""训练KAN网络""" |
|
if torch.cuda.is_available() and device == 'cuda': |
|
device = torch.device('cuda') |
|
else: |
|
device = torch.device('cpu') |
|
st.info(f"使用设备: {device} 训练网络") |
|
|
|
|
|
samples = torch.from_numpy(samples).to(device) |
|
|
|
labels = torch.from_numpy(gmm_dataset.pdf(samples.cpu().numpy())).reshape(-1, 1).to(device) |
|
|
|
|
|
model = KAN(width=[2,5,1], grid=3, k=3, seed=42, device=device) |
|
|
|
train_size = int(0.8 * samples.shape[0]) |
|
train_dataset = { |
|
'train_input': samples[:train_size], |
|
'train_label': labels[:train_size], |
|
'test_input': samples[train_size:], |
|
'test_label': labels[train_size:] |
|
} |
|
|
|
|
|
|
|
|
|
|
|
st.write("网络图形结构:") |
|
kan_network_arch_placeholder = st.empty() |
|
|
|
|
|
progress_container = st.container() |
|
|
|
|
|
total_steps = 50 |
|
steps_per_update = 10 |
|
|
|
def calculate_error(model, x, y): |
|
"""计算预测误差""" |
|
with torch.no_grad(): |
|
pred = model(x) |
|
return torch.mean((pred - y) ** 2).item() |
|
|
|
def train_phase(phase_name, steps, lamb=None, show_plot=True): |
|
with progress_container: |
|
progress_bar = st.progress(0) |
|
status_text = st.empty() |
|
|
|
for step in range(0, steps, steps_per_update): |
|
|
|
if lamb is not None: |
|
model.fit(train_dataset, opt="LBFGS", steps=steps_per_update, lamb=lamb) |
|
else: |
|
model.fit(train_dataset, opt="LBFGS", steps=steps_per_update) |
|
|
|
|
|
progress = (step + steps_per_update) / steps |
|
progress_bar.progress(progress) |
|
|
|
|
|
train_error = calculate_error(model, train_dataset['train_input'], train_dataset['train_label']) |
|
test_error = calculate_error(model, train_dataset['test_input'], train_dataset['test_label']) |
|
|
|
status_text.markdown(f""" |
|
### {phase_name} |
|
| 项目 | 值 | |
|
|:---|:---| |
|
| 进度 | {progress:.0%} | |
|
| 训练误差 | {train_error:.8f} | |
|
| 测试误差 | {test_error:.8f} | |
|
""") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if show_plot: |
|
try: |
|
model.plot() |
|
kan_fig = plt.gcf() |
|
|
|
|
|
|
|
kan_network_arch_placeholder.pyplot(kan_fig, use_container_width=False) |
|
|
|
except Exception as e: |
|
if step == 0: |
|
st.warning(f"注意:网络结构图显示失败 ({str(e)})") |
|
|
|
|
|
|
|
show_kan_prediction(model, device, samples, kan_distribution_plot_placeholder, phase_name) |
|
|
|
with progress_container: |
|
st.markdown("#### 训练过程") |
|
error_text = st.empty() |
|
|
|
|
|
|
|
with st.spinner("参数调整中..."): |
|
train_phase("第一阶段: 正则化训练", total_steps, lamb=0.001, show_plot=True) |
|
|
|
|
|
with st.spinner("正在进行网络剪枝优化..."): |
|
model = model.prune() |
|
progress_container.info("网络剪枝完成") |
|
|
|
with st.spinner("参数调整中..."): |
|
train_phase("第二阶段: 剪枝适应性训练", total_steps, show_plot=True) |
|
|
|
with st.spinner("正在进行网格精细化..."): |
|
model = model.refine(10) |
|
progress_container.info("网格精细化完成") |
|
|
|
with st.spinner("参数调整中..."): |
|
train_phase("第三阶段: 网格适应性训练", total_steps, show_plot=True) |
|
|
|
with st.spinner("符号简化中..."): |
|
|
|
|
|
lib = ['x','x^2','x^3','x^4','exp','log','sqrt','tanh','sin','abs'] |
|
model.auto_symbolic(lib=lib) |
|
|
|
progress_container.info("符号简化完成") |
|
|
|
with st.spinner("参数调整中..."): |
|
train_phase("第四阶段:符号适应性训练", total_steps, show_plot=True) |
|
|
|
from kan.utils import ex_round |
|
from sympy import latex |
|
s= ex_round(model.symbolic_formula()[0][0],4) |
|
|
|
st.write("网络公式:") |
|
st.latex(latex(s)) |
|
|
|
|
|
train_error = calculate_error(model, train_dataset['train_input'], train_dataset['train_label']) |
|
test_error = calculate_error(model, train_dataset['test_input'], train_dataset['test_label']) |
|
error_text.markdown(f""" |
|
#### 训练结果 |
|
- 训练集误差: {train_error:.6f} |
|
- 测试集误差: {test_error:.6f} |
|
""") |
|
|
|
progress_container.success("🎉 训练完成!") |
|
return model |
|
|
|
def init_session_state(): |
|
"""初始化session state""" |
|
if 'prev_K' not in st.session_state: |
|
st.session_state.prev_K = 3 |
|
if 'p' not in st.session_state: |
|
st.session_state.p = 2.0 |
|
if 'centers' not in st.session_state: |
|
st.session_state.centers = np.array([[-2, -2], [0, 0], [2, 2]], dtype=np.float64) |
|
if 'scales' not in st.session_state: |
|
st.session_state.scales = np.array([[0.3, 0.3], [0.2, 0.2], [0.4, 0.4]], dtype=np.float64) |
|
if 'weights' not in st.session_state: |
|
st.session_state.weights = np.ones(3, dtype=np.float64) / 3 |
|
if 'sample_points' not in st.session_state: |
|
st.session_state.sample_points = None |
|
if 'kan_model' not in st.session_state: |
|
st.session_state.kan_model = None |
|
|
|
def create_default_parameters(K: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: |
|
"""创建默认参数""" |
|
|
|
x = np.linspace(-3, 3, K) |
|
y = np.linspace(-3, 3, K) |
|
centers = np.column_stack((x, y)) |
|
|
|
|
|
scales = np.ones((K, 2), dtype=np.float64) * 3 |
|
weights = np.random.random(size=K).astype(np.float64) |
|
weights /= weights.sum() |
|
return centers, scales, weights |
|
|
|
def generate_latex_formula(p: float, K: int, centers: np.ndarray, |
|
scales: np.ndarray, weights: np.ndarray) -> str: |
|
"""生成LaTeX公式""" |
|
formula = r"P(x) = \sum_{k=1}^{" + str(K) + r"} \pi_k P_{\theta_k}(x) \\" |
|
formula += r"P_{\theta_k}(x) = \eta_k \exp(-s_k d_k(x)) = \frac{p}{2\alpha_k \Gamma(1/p) }\exp(-\frac{|x-c_k|^p}{\alpha_k^p})= \frac{p}{2\alpha_k \Gamma(1/p) }\exp(-|\frac{x-c_k}{\alpha_k}|^p) \\" |
|
formula += r"\text{where: }" |
|
|
|
for k in range(K): |
|
c = centers[k] |
|
s = scales[k] |
|
w = weights[k] |
|
component = f"P_{{\\theta_{k+1}}}(x) = \\frac{{{p:.1f}}}{{2\\alpha_{k+1} \\Gamma(1/{p:.1f})}}\\exp(-|\\frac{{x-({c[0]:.1f}, {c[1]:.1f})}}{{{s[0]:.1f}, {s[1]:.1f}}}|^{{{p:.1f}}}) \\\\" |
|
formula += component |
|
formula += f"\\pi_{k+1} = {w:.2f} \\\\" |
|
|
|
return formula |
|
|
|
st.set_page_config(page_title="GMM Distribution Visualization", layout="wide") |
|
st.title("广义高斯混合分布可视化") |
|
|
|
|
|
init_session_state() |
|
|
|
|
|
with st.sidebar: |
|
st.header("分布参数") |
|
|
|
|
|
st.session_state.p = st.slider("形状参数 (p)", 0.1, 5.0, st.session_state.p, 0.1, |
|
help="p=1: 拉普拉斯分布, p=2: 高斯分布, p→∞: 均匀分布") |
|
K = st.slider("分量数 (K)", 1, 5, st.session_state.prev_K) |
|
|
|
|
|
if K != st.session_state.prev_K: |
|
centers, scales, weights = create_default_parameters(K) |
|
st.session_state.centers = centers |
|
st.session_state.scales = scales |
|
st.session_state.weights = weights |
|
st.session_state.prev_K = K |
|
|
|
|
|
st.subheader("高级设置") |
|
show_advanced = st.checkbox("显示分量参数", value=False) |
|
|
|
if show_advanced: |
|
|
|
centers_list: List[List[float]] = [] |
|
scales_list: List[List[float]] = [] |
|
weights_list: List[float] = [] |
|
|
|
for k in range(K): |
|
st.write(f"分量 {k+1}") |
|
col1, col2 = st.columns(2) |
|
with col1: |
|
cx = st.number_input(f"中心X_{k+1}", -5.0, 5.0, float(st.session_state.centers[k][0]), 0.1) |
|
cy = st.number_input(f"中心Y_{k+1}", -5.0, 5.0, float(st.session_state.centers[k][1]), 0.1) |
|
with col2: |
|
sx = st.number_input(f"尺度X_{k+1}", 0.1, 3.0, float(st.session_state.scales[k][0]), 0.1) |
|
sy = st.number_input(f"尺度Y_{k+1}", 0.1, 3.0, float(st.session_state.scales[k][1]), 0.1) |
|
w = st.slider(f"权重_{k+1}", 0.0, 1.0, float(st.session_state.weights[k]), 0.1) |
|
|
|
centers_list.append([cx, cy]) |
|
scales_list.append([sx, sy]) |
|
weights_list.append(w) |
|
|
|
centers = np.array(centers_list, dtype=np.float64) |
|
scales = np.array(scales_list, dtype=np.float64) |
|
weights = np.array(weights_list, dtype=np.float64) |
|
weights = weights / weights.sum() |
|
|
|
st.session_state.centers = centers |
|
st.session_state.scales = scales |
|
st.session_state.weights = weights |
|
else: |
|
centers = st.session_state.centers |
|
scales = st.session_state.scales |
|
weights = st.session_state.weights |
|
|
|
|
|
st.subheader("采样设置") |
|
n_samples = st.slider("采样点数", 5, 1000, 100) |
|
if st.button("重新采样"): |
|
|
|
gmm = GeneralizedGaussianMixture( |
|
D=2, |
|
K=K, |
|
p=st.session_state.p, |
|
centers=centers[:K], |
|
scales=scales[:K], |
|
weights=weights[:K] |
|
) |
|
|
|
samples, _ = gmm.generate_samples(n_samples) |
|
st.session_state.sample_points = samples |
|
st.session_state.kan_model = None |
|
|
|
|
|
dataset = GeneralizedGaussianMixture( |
|
D=2, |
|
K=K, |
|
p=st.session_state.p, |
|
centers=centers[:K], |
|
scales=scales[:K], |
|
weights=weights[:K] |
|
) |
|
|
|
|
|
x = np.linspace(-5, 5, 100) |
|
y = np.linspace(-5, 5, 100) |
|
X, Y = np.meshgrid(x, y) |
|
xy = np.column_stack((X.ravel(), Y.ravel())) |
|
|
|
|
|
Z = dataset.pdf(xy).reshape(X.shape) |
|
|
|
|
|
fig = make_subplots( |
|
rows=1, cols=2, |
|
specs=[[{'type': 'surface'}, {'type': 'contour'}]], |
|
subplot_titles=('3D概率密度曲面', '等高线图与分量中心') |
|
) |
|
|
|
|
|
surface = go.Surface( |
|
x=X, y=Y, z=Z, |
|
colorscale='viridis', |
|
showscale=True, |
|
colorbar=dict(x=0.45) |
|
) |
|
fig.add_trace(surface, row=1, col=1) |
|
|
|
|
|
contour = go.Contour( |
|
x=x, y=y, z=Z, |
|
colorscale='viridis', |
|
showscale=True, |
|
colorbar=dict(x=1.0), |
|
contours=dict( |
|
showlabels=True, |
|
labelfont=dict(size=12) |
|
) |
|
) |
|
fig.add_trace(contour, row=1, col=2) |
|
|
|
|
|
fig.add_trace( |
|
go.Scatter( |
|
x=centers[:K, 0], y=centers[:K, 1], |
|
mode='markers+text', |
|
marker=dict(size=10, color='red'), |
|
text=[f'C{i+1}' for i in range(K)], |
|
textposition="top center", |
|
name='分量中心' |
|
), |
|
row=1, col=2 |
|
) |
|
|
|
|
|
if st.session_state.sample_points is not None: |
|
samples = st.session_state.sample_points |
|
|
|
probs = dataset.pdf(samples) |
|
|
|
posteriors = [] |
|
for sample in samples: |
|
component_probs = [ |
|
weights[k] * np.exp(-np.sum(((sample - centers[k]) / scales[k])**st.session_state.p)) |
|
for k in range(K) |
|
] |
|
total = sum(component_probs) |
|
posteriors.append([p/total for p in component_probs]) |
|
|
|
|
|
fig.add_trace( |
|
go.Scatter( |
|
x=samples[:, 0], y=samples[:, 1], |
|
mode='markers+text', |
|
marker=dict( |
|
size=8, |
|
color='yellow', |
|
line=dict(color='black', width=1) |
|
), |
|
text=[f'S{i+1}' for i in range(len(samples))], |
|
textposition="bottom center", |
|
name='采样点' |
|
), |
|
row=1, col=2 |
|
) |
|
|
|
|
|
fig.update_layout( |
|
title='广义高斯混合分布', |
|
showlegend=True, |
|
width=1200, |
|
height=600, |
|
scene=dict( |
|
xaxis_title='X', |
|
yaxis_title='Y', |
|
zaxis_title='密度' |
|
) |
|
) |
|
|
|
|
|
fig.update_xaxes(title_text='X', row=1, col=2) |
|
fig.update_yaxes(title_text='Y', row=1, col=2) |
|
|
|
|
|
st.plotly_chart(fig, use_container_width=False) |
|
|
|
|
|
|
|
if st.session_state.sample_points is not None: |
|
st.markdown("---") |
|
st.subheader("KAN网络训练与预测") |
|
|
|
kan_distribution_plot_placeholder = st.empty() |
|
|
|
|
|
col1, col2, col3 = st.columns([1, 2, 1]) |
|
with col1: |
|
if st.button("拟合KAN", use_container_width=False): |
|
with st.spinner('训练KAN网络中...'): |
|
st.session_state.kan_model = train_kan(st.session_state.sample_points, dataset) |
|
st.balloons() |
|
|
|
with col3: |
|
if st.session_state.kan_model is not None: |
|
if st.button("清除KAN结果", use_container_width=False): |
|
st.session_state.kan_model = None |
|
st.rerun() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.markdown("---") |
|
|
|
|
|
if st.session_state.sample_points is not None: |
|
|
|
samples = st.session_state.sample_points |
|
probs = dataset.pdf(samples) |
|
posteriors = [] |
|
for sample in samples: |
|
component_probs = [ |
|
weights[k] * np.exp(-np.sum(((sample - centers[k]) / scales[k])**st.session_state.p)) |
|
for k in range(K) |
|
] |
|
total = sum(component_probs) |
|
posteriors.append([p/total for p in component_probs]) |
|
|
|
with st.expander("采样点信息"): |
|
|
|
point_data = [] |
|
for i, (sample, prob, post) in enumerate(zip(samples, probs, posteriors)): |
|
row = { |
|
'采样点': f'S{i+1}', |
|
'X坐标': f'{sample[0]:.2f}', |
|
'Y坐标': f'{sample[1]:.2f}', |
|
'概率密度': f'{prob:.4f}' |
|
} |
|
|
|
for k in range(K): |
|
row[f'分量{k+1}后验概率'] = f'{post[k]:.4f}' |
|
point_data.append(row) |
|
|
|
|
|
st.dataframe(point_data) |
|
|
|
|
|
with st.expander("分布参数说明"): |
|
st.markdown(""" |
|
- **形状参数 (p)**:控制分布的形状 |
|
- p = 1: 拉普拉斯分布 |
|
- p = 2: 高斯分布 |
|
- p → ∞: 均匀分布 |
|
- **分量参数**:每个分量由以下参数确定 |
|
- 中心 (μ): 峰值位置,通过X和Y坐标确定 |
|
- 尺度 (α): 分布的展宽程度,X和Y方向可不同 |
|
- 权重 (π): 混合系数,所有分量权重和为1 |
|
""") |
|
|
|
|
|
with st.expander("分布概率密度函数公式"): |
|
st.latex(generate_latex_formula(st.session_state.p, K, centers[:K], scales[:K], weights[:K])) |