agh123's picture
chore: minor tab title correction
edb74b9
"""
Core module for data visualization components.
"""
import streamlit as st
import plotly.express as px
import pandas as pd
from typing import Optional, Dict, List, Set
from ..core.glicko2_ranking import analyze_glicko2_rankings
import plotly.graph_objects as go
import numpy as np
def clean_device_id(device_id: str) -> str:
    """Return a display name for a normalized device ID.

    IDs that begin with ``"iOS/"`` have that prefix stripped; any other ID is
    returned unchanged.
    """
    ios_prefix = "iOS/"
    if not device_id.startswith(ios_prefix):
        return device_id
    return device_id[len(ios_prefix):]
def get_quant_name(factor: float) -> str:
    """Return a human-readable label for a quantization factor.

    Args:
        factor: Quantization factor. Missing values (anything ``pd.isna``
            reports as NA, including ``None``) are accepted.

    Returns:
        ``"Unknown"`` for missing values, ``"No Quantization (F16/F32)"`` for
        factors of 1.0 or more, the mapped ``"[i]Q<n>_x"`` label for the known
        factors 0.1-0.6 and 0.8, and ``"Q<n>_x"`` (``n`` = factor * 10,
        truncated) for anything else.
    """
    if pd.isna(factor):
        return "Unknown"
    if factor >= 1.0:
        return "No Quantization (F16/F32)"
    quant_map = {
        0.8: "[i]Q8_x",
        0.6: "[i]Q6_x",
        0.5: "[i]Q5_x",
        0.4: "[i]Q4_x",
        0.3: "[i]Q3_x",
        0.2: "[i]Q2_x",
        0.1: "[i]Q1_x",
    }
    # Dict lookup on floats needs exact equality, so a factor carrying
    # floating-point noise (e.g. 0.1 + 0.2, or a float32 value) would miss its
    # entry. Rounding to 6 decimals snaps such values back onto the keys while
    # leaving ordinary inputs untouched.
    lookup_key = round(float(factor), 6)
    return quant_map.get(lookup_key, f"Q{int(lookup_key * 10)}_x")
def create_performance_plot(
    df: pd.DataFrame, metric: str, title: str, hover_data: List[str] = None
):
    """Build a grouped bar chart of ``metric`` per device, coloured by platform.

    Args:
        df: Data to plot; the chart reads its "Device" and "Platform" columns
            plus ``metric`` and the hover columns.
        metric: Column plotted on the y-axis.
        title: Chart title.
        hover_data: Extra columns shown on hover. When omitted, "CPU Cores"
            and "Peak Memory (GB)" are used.

    Returns:
        The Plotly figure, or ``None`` when ``df`` is empty.
    """
    if df.empty:
        return None

    hover_columns = (
        hover_data if hover_data is not None else ["CPU Cores", "Peak Memory (GB)"]
    )
    # Any metric whose name mentions "Token" is labelled as a throughput axis.
    y_axis_label = "Token/sec" if "Token" in metric else metric

    bar_options = dict(
        x="Device",
        y=metric,
        color="Platform",
        title=title,
        template="plotly_white",
        barmode="group",
        hover_data=hover_columns,
    )
    figure = px.bar(df, **bar_options)

    layout_options = dict(
        xaxis_title="Device",
        yaxis_title=y_axis_label,
        legend_title="Platform",
        plot_bgcolor="white",
        height=400,
    )
    figure.update_layout(**layout_options)
    return figure
def filter_dataframe(df: pd.DataFrame, filters: Dict) -> pd.DataFrame:
    """Return the rows of ``df`` that satisfy every active filter.

    ``filters`` must contain the exact-match keys ("model", "platform",
    "device", "flash_attn", "cache_type_k", "cache_type_v"), where the value
    ``"All"`` disables the filter, and the range keys ("pp_range", "tg_range",
    "n_threads", "n_gpu_layers"), each an inclusive ``(min, max)`` pair that is
    skipped when either bound is ``None``. An optional "Version" key is applied
    when it is truthy and not ``"All"``.

    An empty ``df`` is returned as-is; otherwise filtering works on a copy.
    """
    if df.empty:
        return df

    # (filter key, dataframe column) pairs compared for equality.
    exact_match_filters = (
        ("model", "Model ID"),
        ("platform", "Platform"),
        ("device", "Device"),
        ("flash_attn", "flash_attn"),
        ("cache_type_k", "cache_type_k"),
        ("cache_type_v", "cache_type_v"),
    )
    # (filter key, dataframe column) pairs compared against inclusive bounds.
    range_filters = (
        ("pp_range", "PP Config"),
        ("tg_range", "TG Config"),
        ("n_threads", "n_threads"),
        ("n_gpu_layers", "n_gpu_layers"),
    )

    result = df.copy()

    for filter_key, column in exact_match_filters:
        wanted = filters[filter_key]
        if wanted != "All":
            result = result[result[column] == wanted]

    for filter_key, column in range_filters:
        lower, upper = filters[filter_key]
        if lower is None or upper is None:
            continue
        values = result[column]
        result = result[(values >= lower) & (values <= upper)]

    version = filters.get("Version")
    if version != "All" and version:
        result = result[result["Version"] == version]

    return result
def render_leaderboard_table(df: pd.DataFrame, filters: Dict):
    """Render the grouped, aggregated benchmark table in Streamlit.

    The data is filtered with ``filter_dataframe``, grouped by the requested
    columns, aggregated (mean/std of the throughput columns, mean peak memory,
    first value of the static device/model columns, joined unique versions and
    GPU-layer counts) and shown with ``st.dataframe``.

    Args:
        df: Raw benchmark rows.
        filters: Filter settings. In addition to the keys read by
            ``filter_dataframe`` it must provide ``"grouping"`` (columns to
            group by; empty falls back to Model ID / Device / Platform) and
            ``"visible_columns"`` (columns to show; empty falls back to the
            first eight entries of the default column order).

    Returns:
        None. A Streamlit warning is shown instead of the table when there is
        no data before or after filtering.
    """
    if df.empty:
        st.warning("No data available for the selected filters.")
        return

    # Apply filters
    filtered_df = filter_dataframe(df, filters)
    if filtered_df.empty:
        st.warning("No data matches the selected filters.")
        return

    # Preferred column order (grouped logically)
    column_order = [
        # Device Info
        "Device",
        "Platform",
        "CPU Cores",
        "Total Memory (GB)",
        "Peak Memory (GB)",
        "Memory Usage (%)",
        # Benchmark Results
        "PP Config",
        "PP Avg (t/s)",
        "PP Std (t/s)",
        "TG Config",
        "TG Avg (t/s)",
        "TG Std (t/s)",
        # Model Config
        "Model ID",
        "Model Size",
        "n_threads",
        "flash_attn",
        "cache_type_k",
        "cache_type_v",
        "n_context",
        "n_batch",
        "n_ubatch",
        "Version",
    ]

    # Group by selected columns
    grouping_cols = filters["grouping"]
    if not grouping_cols:
        grouping_cols = ["Model ID", "Device", "Platform"]  # Default grouping

    # Aggregations, skipping grouping columns and columns the data lacks.
    # The two joiners stay lambdas on purpose: pandas labels their output
    # "<lambda>", which the rename mapping below relies on.
    agg_dict = {
        col: agg
        for col, agg in {
            "Prompt Processing": ["mean", "std"],
            "Token Generation": ["mean", "std"],
            "Peak Memory (GB)": "mean",
            "Total Memory (GB)": "first",
            "CPU Cores": "first",
            "Model Size": "first",
            # astype(str)/dropna keep the join from failing on missing values.
            "Version": lambda x: ", ".join(sorted(set(x.dropna().astype(str)))),
            # Join the unique *values*; str(series) would yield the characters
            # of the Series repr instead.
            "n_gpu_layers": lambda x: ", ".join(
                str(value) for value in sorted(set(x.dropna()))
            ),
        }.items()
        if col not in grouping_cols and col in filtered_df.columns
    }

    # Group and aggregate
    grouped_df = filtered_df.groupby(grouping_cols).agg(agg_dict).reset_index()

    # Flatten column names. Columns are (name, aggregation) tuples when any
    # aggregation is a list, but plain labels otherwise.
    grouped_df.columns = [
        col
        if not isinstance(col, tuple)
        else (col[0] if col[1] == "" else f"{col[0]} ({col[1]})")
        for col in grouped_df.columns
    ]

    # Rename columns for display
    column_mapping = {
        "Prompt Processing (mean)": "PP Avg (t/s)",
        "Prompt Processing (std)": "PP Std (t/s)",
        "Token Generation (mean)": "TG Avg (t/s)",
        "Token Generation (std)": "TG Std (t/s)",
        "Memory Usage (%) (mean)": "Memory Usage (%)",
        "Peak Memory (GB) (mean)": "Peak Memory (GB)",
        "PP Config (first)": "PP Config",
        "TG Config (first)": "TG Config",
        "Model Size (first)": "Model Size",
        "CPU Cores (first)": "CPU Cores",
        "Total Memory (GB) (first)": "Total Memory (GB)",
        "n_threads (first)": "n_threads",
        "flash_attn (first)": "flash_attn",
        "cache_type_k (first)": "cache_type_k",
        "cache_type_v (first)": "cache_type_v",
        "n_context (first)": "n_context",
        "n_batch (first)": "n_batch",
        "n_ubatch (first)": "n_ubatch",
        "Version (<lambda>)": "Version",
        "n_gpu_layers (<lambda>)": "n_gpu_layers",
    }
    grouped_df = grouped_df.rename(columns=column_mapping)

    # Filter visible columns
    visible_cols = filters["visible_columns"]
    if visible_cols:
        # Map the user-friendly names to actual column names
        column_name_mapping = {
            "Device": "Device",
            "Platform": "Platform",
            "CPU Cores": "CPU Cores",
            "Total Memory (GB)": "Total Memory (GB)",
            "Peak Memory (GB)": "Peak Memory (GB)",
            "Memory Usage (%)": "Memory Usage (%)",
            "PP Config": "PP Config",
            "TG Config": "TG Config",
            "Prompt Processing (mean)": "PP Avg (t/s)",
            "Token Generation (mean)": "TG Avg (t/s)",
            "Prompt Processing (std)": "PP Std (t/s)",
            "Token Generation (std)": "TG Std (t/s)",
            "Model": "Model ID",
            "Model Size": "Model Size",
            "Model ID": "Model ID",
            "n_threads": "n_threads",
            "flash_attn": "flash_attn",
            "cache_type_k": "cache_type_k",
            "cache_type_v": "cache_type_v",
            "n_context": "n_context",
            "n_batch": "n_batch",
            "n_ubatch": "n_ubatch",
            "Version": "Version",
        }
        mapped_visible = {column_name_mapping.get(col, col) for col in visible_cols}
        # Use the effective grouping (including the default fallback) so the
        # columns that define each row are always shown.
        mapped_grouping = {column_name_mapping.get(col, col) for col in grouping_cols}
        available_cols = mapped_visible | mapped_grouping

        # Predefined order first, then any leftovers alphabetically.
        display_cols = [col for col in column_order if col in available_cols]
        display_cols.extend(sorted(available_cols - set(display_cols)))
    else:
        # Default columns if none selected
        display_cols = column_order[:8]

    # Only some columns are aggregated, so a requested column may not exist in
    # the grouped frame; drop those rather than raising KeyError on selection.
    display_cols = [col for col in display_cols if col in grouped_df.columns]

    # Display the filtered and grouped table
    st.markdown("#### 📊 Benchmark Results")
    st.dataframe(
        grouped_df[display_cols],
        use_container_width=True,
        height=min(
            600, (len(grouped_df) + 1) * 35 + 40
        ),  # Dynamic height based on content
        hide_index=False,
        column_config={
            "Rank": st.column_config.NumberColumn(
                "Rank",
                help="Device ranking based on performance score",
            ),
            "Device": st.column_config.TextColumn(
                "Device",
                help="Device brand and model",
            ),
            "Best Score": st.column_config.NumberColumn(
                "Score", help="Overall performance score (0-100)", format="%.2f"
            ),
            "Best TG Speed": st.column_config.NumberColumn(
                "Best TG Speed (t/s)",
                help="Best token generation speed",
                format="%.2f",
            ),
            "Best PP Speed": st.column_config.NumberColumn(
                "Best PP Speed (t/s)",
                help="Best prompt processing speed",
                format="%.2f",
            ),
        },
    )
def create_device_radar_chart(g2_confident_display: pd.DataFrame, top_n: int = 10):
    """Create a radar chart comparing the top N devices by rating.

    Each of "Rating", "Token Rating" and "Prompt Rating" is min-max scaled to
    0-100 *within the selected devices*, so the chart shows relative standing
    among the top N rather than absolute ratings.

    Args:
        g2_confident_display: Rankings frame with "Device", "Platform",
            "Rating", "Token Rating" and "Prompt Rating" columns.
        top_n: Number of highest-rated devices to include.

    Returns:
        A Plotly figure with one polar trace per device.
    """
    # Copy so the normalized helper columns never touch the caller's frame.
    top_devices = g2_confident_display.nlargest(top_n, "Rating").copy()

    # Normalize metrics to 0-100 scale for better visualization
    metrics = ["Rating", "Token Rating", "Prompt Rating"]
    for metric in metrics:
        min_val = top_devices[metric].min()
        max_val = top_devices[metric].max()
        span = max_val - min_val
        if span == 0:
            # Single device or identical values: 0/0 would produce NaN and an
            # empty trace, so place every device on the outer ring instead.
            top_devices[f"{metric}_normalized"] = 100.0
        else:
            top_devices[f"{metric}_normalized"] = (
                (top_devices[metric] - min_val) / span
            ) * 100

    fig = go.Figure()
    palette = px.colors.qualitative.Set1

    # One trace per device. Colours are picked by position, not index label,
    # so non-integer or non-contiguous indexes work too.
    for position, (_, row) in enumerate(top_devices.iterrows()):
        normalized = [row[f"{metric}_normalized"] for metric in metrics]
        fig.add_trace(
            go.Scatterpolar(
                r=normalized + normalized[:1],  # Repeat first point to close the shape
                theta=["Overall", "Token Gen", "Prompt Proc", "Overall"],
                fill="toself",
                name=f"{row['Device']} ({row['Platform']})",
                line=dict(color=palette[position % len(palette)]),
                # hovertemplate only sees the hovered point's own r value, so
                # all three scores are attached to every point via customdata.
                customdata=[normalized] * 4,
                hovertemplate="<b>%{fullData.name}</b><br>"
                + "Overall: %{customdata[0]:.1f}%<br>"
                + "Token Gen: %{customdata[1]:.1f}%<br>"
                + "Prompt Proc: %{customdata[2]:.1f}%<br>"
                + "<extra></extra>",
            )
        )

    # Update layout
    fig.update_layout(
        polar=dict(
            radialaxis=dict(visible=True, range=[0, 100], tickfont=dict(size=10)),
            angularaxis=dict(tickfont=dict(size=12)),
        ),
        showlegend=True,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        title=dict(
            text=f"Performance Comparison of Top {top_n} Devices",
            x=0.5,
            y=0.95,
            font=dict(size=16),
        ),
        margin=dict(t=100, l=50, r=50, b=50),
        height=600,
    )
    return fig
def create_ranking_ladder(g2_confident_display: pd.DataFrame, top_n: int = 30):
    """Draw the top N devices as a ranking ladder with rating-deviation bars.

    Each device occupies one row (rank 1 at the top). A grey bar spans
    ``Rating ± Rating Deviation`` and a labelled marker, coloured by platform,
    sits at the rating itself. The x-axis is reversed so higher ratings appear
    on the left.

    Args:
        g2_confident_display: Rankings frame with "Device", "Platform",
            "Rating" and "Rating Deviation" columns.
        top_n: Number of highest-rated devices to show.

    Returns:
        The Plotly figure.
    """
    ladder = g2_confident_display.nlargest(top_n, "Rating").copy()
    device_count = len(ladder)
    ladder["rank_position"] = np.arange(1, device_count + 1)

    black = "rgba(0,0,0,1.0)"
    faint_grid = "rgba(0,0,0,0.1)"
    white = "rgba(255,255,255,1)"
    palette = px.colors.qualitative.Set1

    fig = go.Figure()

    # Deviation bars: one horizontal segment per device.
    for _, device in ladder.iterrows():
        rating = device["Rating"]
        deviation = device["Rating Deviation"]
        rank = device["rank_position"]
        bar = go.Scatter(
            x=[rating - deviation, rating + deviation],
            y=[rank, rank],
            mode="lines",
            line=dict(color="rgba(0,0,0,0.3)", width=8),
            showlegend=False,
            hoverinfo="skip",
        )
        fig.add_trace(bar)

    # Rating markers: one trace per platform so the legend lists platforms.
    hover_lines = (
        "<b>%{text}</b><br>",
        "Rank: #%{y:.0f}<br>",
        "Rating: %{x:.0f}<br>",
        "Deviation: ±%{customdata[0]:.0f}<br>",
        "<extra></extra>",
    )
    for color_index, platform in enumerate(ladder["Platform"].unique()):
        on_platform = ladder[ladder["Platform"] == platform]
        markers = go.Scatter(
            x=on_platform["Rating"],
            y=on_platform["rank_position"],
            mode="markers+text",
            marker=dict(size=12, color=palette[color_index % len(palette)]),
            text=on_platform["Device"],
            textposition="middle right",
            # Black, slightly larger bold face for maximum label contrast.
            textfont=dict(color=black, size=12, family="Arial Black, sans-serif"),
            name=platform,
            hovertemplate="".join(hover_lines),
            customdata=on_platform[["Rating Deviation"]].values,
        )
        fig.add_trace(markers)

    x_axis = dict(
        title="Rating",
        showgrid=True,
        gridwidth=1,
        gridcolor=faint_grid,
        autorange="reversed",  # Highest ratings on the left
        title_font=dict(size=14, family="Arial, sans-serif", color=black),
    )
    y_axis = dict(
        title="Rank",
        showgrid=True,
        gridwidth=1,
        gridcolor=faint_grid,
        tickmode="array",
        tickvals=ladder["rank_position"],
        ticktext=[f"#{rank}" for rank in range(1, device_count + 1)],
        autorange="reversed",  # Rank 1 at the top
        title_font=dict(size=14, family="Arial, sans-serif", color=black),
    )
    legend = dict(
        orientation="h",
        yanchor="bottom",
        y=1.02,
        xanchor="right",
        x=1,
        font=dict(size=12, family="Arial, sans-serif", color=black),
    )

    # No figure title is set.
    fig.update_layout(
        xaxis=x_axis,
        yaxis=y_axis,
        showlegend=True,
        legend=legend,
        margin=dict(t=100, l=50, r=100, b=50),
        height=800,
        hovermode="closest",
        paper_bgcolor=white,  # Pure white background
        plot_bgcolor=white,  # Pure white plot area
        autosize=True,  # Responsive sizing
    )
    return fig
def render_device_rankings(df: pd.DataFrame):
    """Render device rankings using the Glicko-2 algorithm.

    Calls ``analyze_glicko2_rankings`` with ``min_matches=5`` and
    ``min_gpu_layers=20``, shows summary metrics (top device, number of
    devices, number of models) and then presents the confident rankings as a
    ranking ladder and as a table, each in its own tab.

    Args:
        df: Benchmark rows; must include "Normalized Device ID", "Model ID",
            "Platform" and "Model Size".

    Returns:
        None. Any exception raised while computing or rendering is reported
        through ``st.error`` instead of propagating.
    """
    if df.empty:
        st.warning("No data available for device rankings.")
        return

    with st.spinner("Calculating Glicko-2 rankings..."):
        try:
            # The first element (all rankings) is not used here.
            _, g2_confident = analyze_glicko2_rankings(
                df,
                min_matches=5,  # Default minimum matches
                min_gpu_layers=20,  # Default minimum GPU layers
            )

            # NOTE(review): takes the first index entry as the top device,
            # which assumes analyze_glicko2_rankings returns rows sorted
            # best-first — confirm against that function.
            top_device = g2_confident.index[0] if not g2_confident.empty else "N/A"
            top_device_clean = (
                clean_device_id(top_device) if top_device != "N/A" else "N/A"
            )

            # Calculate total unique devices and models
            total_devices = df["Normalized Device ID"].nunique()
            total_models = df["Model ID"].nunique()

            # Display metrics in columns
            col1, col2, col3 = st.columns([3, 1, 1])
            with col1:
                st.metric("🏆 Top Device", top_device_clean)
            with col2:
                st.metric("Total Devices", total_devices)
            with col3:
                st.metric("Total Models", total_models)

            if g2_confident.empty:
                st.warning(
                    "No confident rankings available. Try adjusting the minimum matches threshold."
                )
                return

            # Turn the device-ID index into a column. Name the index first:
            # reset_index() on an unnamed index creates a column called
            # "index", which would not match the fallback name used below.
            device_id_col = g2_confident.index.name or "device"
            g2_confident_display = g2_confident.rename_axis(
                device_id_col
            ).reset_index()

            # Get platform information from the original dataframe
            platform_map = (
                df.groupby("Normalized Device ID")["Platform"].first().to_dict()
            )
            g2_confident_display["Platform"] = g2_confident_display[
                device_id_col
            ].map(platform_map)

            # Get model size range from the original dataframe
            model_sizes = df.groupby("Normalized Device ID")["Model Size"].agg(
                ["min", "max"]
            )
            g2_confident_display["Model Size Range"] = g2_confident_display[
                device_id_col
            ].apply(
                lambda x: f"{model_sizes.loc[x, 'min']:.1f}B - {model_sizes.loc[x, 'max']:.1f}B"
            )

            # Add clean device name
            g2_confident_display["Device"] = g2_confident_display[
                device_id_col
            ].apply(clean_device_id)

            # Round numeric columns to whole numbers
            numeric_cols = [
                "combined_rating",
                "combined_rd",
                "token_rating",
                "prompt_rating",
            ]
            for col in numeric_cols:
                if col in g2_confident_display.columns:
                    g2_confident_display[col] = (
                        g2_confident_display[col].round(0).astype(int)
                    )

            # Rename columns for better display
            rename_map = {
                "combined_rating": "Rating",
                "combined_rd": "Rating Deviation",
                "token_rating": "Token Rating",
                "prompt_rating": "Prompt Rating",
            }
            g2_confident_display = g2_confident_display.rename(columns=rename_map)

            # Sort by Rating and expose a 1-based "Rank" index
            g2_confident_display = g2_confident_display.sort_values(
                "Rating", ascending=False
            )
            g2_confident_display = g2_confident_display.reset_index(drop=True)
            g2_confident_display.index = g2_confident_display.index + 1
            g2_confident_display = g2_confident_display.rename_axis("Rank")

            # Columns shown in the table, in display order
            display_cols = [
                "Device",
                "Platform",
                "Rating",
                "Rating Deviation",
                "Token Rating",
                "Prompt Rating",
                "Model Size Range",
            ]

            tab1, tab2 = st.tabs(
                [
                    "Ranking Ladder",
                    "Ranking Table",
                ]
            )
            with tab1:
                # Display the ranking ladder
                st.plotly_chart(
                    create_ranking_ladder(g2_confident_display, top_n=30),
                    use_container_width=True,
                )
            with tab2:
                # Display the table
                st.dataframe(
                    g2_confident_display[display_cols],
                    use_container_width=True,
                    height=min(600, (len(g2_confident_display) + 1) * 35 + 40),
                    hide_index=False,
                )
        except Exception as e:
            # UI boundary: surface the failure to the user instead of crashing
            # the whole Streamlit page.
            st.error(f"Error calculating Glicko-2 rankings: {str(e)}")