"""
Core module for data visualization components.

Provides the Streamlit/Plotly building blocks of the benchmark dashboard:
dataframe filtering, the grouped leaderboard table, and the Glicko-2 device
ranking views (ranking ladder, radar chart, ranking table).
"""

from typing import Optional, Dict, List, Set

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st

from ..core.glicko2_ranking import analyze_glicko2_rankings

# Platform prefix carried by normalized iOS device IDs.
_IOS_PREFIX = "iOS/"

# Display names for the known quantization factors (exact-match lookup).
_QUANT_NAMES = {
    0.8: "[i]Q8_x",
    0.6: "[i]Q6_x",
    0.5: "[i]Q5_x",
    0.4: "[i]Q4_x",
    0.3: "[i]Q3_x",
    0.2: "[i]Q2_x",
    0.1: "[i]Q1_x",
}

# (filter key, dataframe column) pairs compared for equality; "All" disables.
_EQUALITY_FILTERS = (
    ("model", "Model ID"),
    ("platform", "Platform"),
    ("device", "Device"),
    ("flash_attn", "flash_attn"),
    ("cache_type_k", "cache_type_k"),
    ("cache_type_v", "cache_type_v"),
)

# (filter key, dataframe column) pairs filtered by an inclusive (min, max).
_RANGE_FILTERS = (
    ("pp_range", "PP Config"),
    ("tg_range", "TG Config"),
    ("n_threads", "n_threads"),
    ("n_gpu_layers", "n_gpu_layers"),
)

# Preferred left-to-right order of leaderboard columns (grouped logically).
_LEADERBOARD_COLUMN_ORDER = [
    # Device Info
    "Device",
    "Platform",
    "CPU Cores",
    "Total Memory (GB)",
    "Peak Memory (GB)",
    "Memory Usage (%)",
    # Benchmark Results
    "PP Config",
    "PP Avg (t/s)",
    "PP Std (t/s)",
    "TG Config",
    "TG Avg (t/s)",
    "TG Std (t/s)",
    # Model Config
    "Model ID",
    "Model Size",
    "n_threads",
    "flash_attn",
    "cache_type_k",
    "cache_type_v",
    "n_context",
    "n_batch",
    "n_ubatch",
    "Version",
]

# Columns shown in the ranking table, in display order (post-rename names).
_RANKING_TABLE_COLUMNS = [
    "Device",
    "Platform",
    "Rating",
    "Rating Deviation",
    "Token Rating",
    "Prompt Rating",
    "Model Size Range",
]


def clean_device_id(device_id: str) -> str:
    """Extract clean device name from normalized ID by removing platform prefix.

    Only the ``"iOS/"`` prefix is stripped; any other ID is returned unchanged.
    """
    if device_id.startswith(_IOS_PREFIX):
        return device_id[len(_IOS_PREFIX):]
    return device_id


def get_quant_name(factor: float) -> str:
    """Get human-readable name for quantization factor.

    Returns ``"Unknown"`` for NaN/None, the "No Quantization" label for any
    factor >= 1.0, the mapped name for an exactly matching known factor, and
    otherwise a generic ``Q<n>_x`` name where ``n`` is ``int(factor * 10)``.
    """
    if pd.isna(factor):
        return "Unknown"
    if factor >= 1.0:
        return "No Quantization (F16/F32)"
    # NOTE: the lookup is an exact float match; factors that are not exactly
    # one of the keys fall through to the generic name.
    return _QUANT_NAMES.get(factor, f"Q{int(factor*10)}_x")


def create_performance_plot(
    df: pd.DataFrame,
    metric: str,
    title: str,
    hover_data: Optional[List[str]] = None,
):
    """Create a performance comparison plot.

    Args:
        df: Benchmark rows; must contain "Device", "Platform", ``metric`` and
            every column listed in ``hover_data``.
        metric: Column plotted on the y-axis.
        title: Figure title.
        hover_data: Extra columns shown on hover. Defaults to
            ``["CPU Cores", "Peak Memory (GB)"]``.

    Returns:
        A grouped bar figure, or ``None`` when ``df`` is empty.
    """
    if df.empty:
        return None

    if hover_data is None:
        hover_data = [
            "CPU Cores",
            "Peak Memory (GB)",
        ]

    fig = px.bar(
        df,
        x="Device",
        y=metric,
        color="Platform",
        title=title,
        template="plotly_white",
        barmode="group",
        hover_data=hover_data,
    )
    fig.update_layout(
        xaxis_title="Device",
        yaxis_title="Token/sec" if "Token" in metric else metric,
        legend_title="Platform",
        plot_bgcolor="white",
        height=400,
    )
    return fig


def filter_dataframe(df: pd.DataFrame, filters: Dict) -> pd.DataFrame:
    """Apply all filters to the dataframe.

    Args:
        df: Benchmark rows.
        filters: Filter selections. Equality filters ("model", "platform",
            "device", "flash_attn", "cache_type_k", "cache_type_v") are
            skipped when their value is "All"; range filters ("pp_range",
            "tg_range", "n_threads", "n_gpu_layers") are inclusive
            ``(min, max)`` pairs and are skipped when either bound is
            ``None``; "Version" is skipped when falsy or "All". A key that is
            absent from ``filters`` is treated as "no filter".

    Returns:
        The filtered copy of ``df`` (``df`` itself when it is empty).
    """
    if df.empty:
        return df

    filtered_df = df.copy()

    # Equality filters (basic, flash attention and cache type selections)
    for key, column in _EQUALITY_FILTERS:
        selected = filters.get(key, "All")
        if selected != "All":
            filtered_df = filtered_df[filtered_df[column] == selected]

    # Range filters
    for key, column in _RANGE_FILTERS:
        bounds = filters.get(key)
        if bounds is None:
            continue
        lower, upper = bounds
        if lower is not None and upper is not None:
            values = filtered_df[column]
            filtered_df = filtered_df[(values >= lower) & (values <= upper)]

    # Version filter
    version = filters.get("Version")
    if version and version != "All":
        filtered_df = filtered_df[filtered_df["Version"] == version]

    return filtered_df


def _join_unique(values: pd.Series) -> str:
    """Join the sorted, unique, non-null values of a group with ", "."""
    return ", ".join(sorted({str(value) for value in values.dropna()}))


def _flatten_column(col) -> str:
    """Turn a ``(name, stat)`` column tuple into ``"name (stat)"``.

    Grouping columns carry an empty stat and keep their plain name; columns
    that are already plain strings are returned unchanged.
    """
    if not isinstance(col, tuple):
        return col
    name, stat = col[0], col[1]
    return name if stat == "" else f"{name} ({stat})"


def render_leaderboard_table(df: pd.DataFrame, filters: Dict) -> None:
    """Render the leaderboard table with grouped and formatted data.

    The dataframe is filtered with :func:`filter_dataframe`, grouped by
    ``filters["grouping"]`` (default: Model ID, Device, Platform), aggregated,
    and shown with ``st.dataframe``. A warning is shown instead when there is
    no data before or after filtering.

    Args:
        df: Benchmark rows.
        filters: Filter selections; additionally reads "grouping" (list of
            grouping columns) and "visible_columns" (user-selected columns).
    """
    if df.empty:
        st.warning("No data available for the selected filters.")
        return

    # Apply filters
    filtered_df = filter_dataframe(df, filters)
    if filtered_df.empty:
        st.warning("No data matches the selected filters.")
        return

    column_order = _LEADERBOARD_COLUMN_ORDER

    # Group by selected columns
    grouping_cols = filters.get("grouping")
    if not grouping_cols:
        grouping_cols = ["Model ID", "Device", "Platform"]  # Default grouping

    # Create aggregations (excluding grouping columns and absent columns)
    agg_candidates = {
        "Prompt Processing": ["mean", "std"],
        "Token Generation": ["mean", "std"],
        "Peak Memory (GB)": "mean",
        "Total Memory (GB)": "first",
        "CPU Cores": "first",
        "Model Size": "first",
        # A named function (not a lambda) so that the flattened column name
        # is predictable and can be renamed below.
        "Version": _join_unique,
        "n_gpu_layers": _join_unique,
    }
    agg_dict = {
        col: agg
        for col, agg in agg_candidates.items()
        if col not in grouping_cols and col in filtered_df.columns
    }

    # Group and aggregate
    grouped_df = filtered_df.groupby(grouping_cols).agg(agg_dict).reset_index()

    # Flatten column names
    grouped_df.columns = [_flatten_column(col) for col in grouped_df.columns]

    # Rename columns for display
    joined = _join_unique.__name__
    column_mapping = {
        "Prompt Processing (mean)": "PP Avg (t/s)",
        "Prompt Processing (std)": "PP Std (t/s)",
        "Token Generation (mean)": "TG Avg (t/s)",
        "Token Generation (std)": "TG Std (t/s)",
        "Memory Usage (%) (mean)": "Memory Usage (%)",
        "Peak Memory (GB) (mean)": "Peak Memory (GB)",
        "PP Config (first)": "PP Config",
        "TG Config (first)": "TG Config",
        "Model Size (first)": "Model Size",
        "CPU Cores (first)": "CPU Cores",
        "Total Memory (GB) (first)": "Total Memory (GB)",
        "n_threads (first)": "n_threads",
        "flash_attn (first)": "flash_attn",
        "cache_type_k (first)": "cache_type_k",
        "cache_type_v (first)": "cache_type_v",
        "n_context (first)": "n_context",
        "n_batch (first)": "n_batch",
        "n_ubatch (first)": "n_ubatch",
        f"Version ({joined})": "Version",
        f"n_gpu_layers ({joined})": "n_gpu_layers",
    }
    grouped_df = grouped_df.rename(columns=column_mapping)

    # Filter visible columns
    visible_cols = filters.get("visible_columns")
    if visible_cols:
        # User-friendly names that differ from the displayed column names;
        # every other name maps to itself.
        column_name_mapping = {
            "Prompt Processing (mean)": "PP Avg (t/s)",
            "Token Generation (mean)": "TG Avg (t/s)",
            "Prompt Processing (std)": "PP Std (t/s)",
            "Token Generation (std)": "TG Std (t/s)",
            "Model": "Model ID",
        }

        # Visible columns plus the grouping columns actually in effect
        requested = {column_name_mapping.get(col, col) for col in visible_cols}
        requested |= {column_name_mapping.get(col, col) for col in grouping_cols}

        # Predefined order first, then any remaining columns alphabetically
        display_cols = [col for col in column_order if col in requested]
        display_cols.extend(sorted(requested - set(display_cols)))
    else:
        # Default columns if none selected
        display_cols = column_order[:8]

    # Only columns that exist after aggregation can be displayed; selecting
    # a missing one would raise KeyError.
    display_cols = [col for col in display_cols if col in grouped_df.columns]
    if not display_cols:
        display_cols = list(grouped_df.columns)

    # Display the filtered and grouped table
    st.markdown("#### 📊 Benchmark Results")
    st.dataframe(
        grouped_df[display_cols],
        use_container_width=True,
        # Dynamic height based on content
        height=min(600, (len(grouped_df) + 1) * 35 + 40),
        hide_index=False,
        column_config={
            "Rank": st.column_config.NumberColumn(
                "Rank",
                help="Device ranking based on performance score",
            ),
            "Device": st.column_config.TextColumn(
                "Device",
                help="Device brand and model",
            ),
            "Best Score": st.column_config.NumberColumn(
                "Score", help="Overall performance score (0-100)", format="%.2f"
            ),
            "Best TG Speed": st.column_config.NumberColumn(
                "Best TG Speed (t/s)",
                help="Best token generation speed",
                format="%.2f",
            ),
            "Best PP Speed": st.column_config.NumberColumn(
                "Best PP Speed (t/s)",
                help="Best prompt processing speed",
                format="%.2f",
            ),
        },
    )


def create_device_radar_chart(g2_confident_display: pd.DataFrame, top_n: int = 10):
    """Create a radar chart comparing the top N devices across different performance metrics.

    Args:
        g2_confident_display: Ranking frame with "Device", "Platform",
            "Rating", "Token Rating" and "Prompt Rating" columns.
        top_n: Number of highest-rated devices to include.

    Returns:
        A Plotly figure with one polar trace per device. Each metric is
        min-max normalized to 0-100 across the selected devices; when all
        selected devices share the same value the metric is set to 100.
    """
    # Select top N devices; copy so that adding columns does not touch the
    # caller's frame.
    top_devices = g2_confident_display.nlargest(top_n, "Rating").copy()

    # Normalize metrics to 0-100 scale for better visualization
    metrics = ["Rating", "Token Rating", "Prompt Rating"]
    for metric in metrics:
        min_val = top_devices[metric].min()
        span = top_devices[metric].max() - min_val
        if span == 0:
            # A single device (or identical values) would otherwise divide
            # by zero and produce NaN.
            top_devices[f"{metric}_normalized"] = 100.0
        else:
            top_devices[f"{metric}_normalized"] = (
                (top_devices[metric] - min_val) / span
            ) * 100

    # Create radar chart
    fig = go.Figure()
    palette = px.colors.qualitative.Set1

    # Add a trace for each device; colors cycle by position so that any
    # index type works.
    for position, (_, row) in enumerate(top_devices.iterrows()):
        fig.add_trace(
            go.Scatterpolar(
                r=[
                    row["Rating_normalized"],
                    row["Token Rating_normalized"],
                    row["Prompt Rating_normalized"],
                    row["Rating_normalized"],  # Close the shape
                ],
                theta=["Overall", "Token Gen", "Prompt Proc", "Overall"],
                fill="toself",
                name=f"{row['Device']} ({row['Platform']})",
                line=dict(color=palette[position % len(palette)]),
                # Plotly hover text breaks lines on <br>, not on newlines.
                hovertemplate=(
                    "%{name}<br>"
                    "Overall: %{r[0]:.1f}%<br>"
                    "Token Gen: %{r[1]:.1f}%<br>"
                    "Prompt Proc: %{r[2]:.1f}%<br>"
                ),
            )
        )

    # Update layout
    fig.update_layout(
        polar=dict(
            radialaxis=dict(visible=True, range=[0, 100], tickfont=dict(size=10)),
            angularaxis=dict(tickfont=dict(size=12)),
        ),
        showlegend=True,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        title=dict(
            text=f"Performance Comparison of Top {top_n} Devices",
            x=0.5,
            y=0.95,
            font=dict(size=16),
        ),
        margin=dict(t=100, l=50, r=50, b=50),
        height=600,
    )

    return fig


def create_ranking_ladder(g2_confident_display: pd.DataFrame, top_n: int = 30):
    """Create a ranking ladder visualization showing device positions and confidence intervals.

    Args:
        g2_confident_display: Ranking frame with "Device", "Platform",
            "Rating" and "Rating Deviation" columns.
        top_n: Number of highest-rated devices to include.

    Returns:
        A Plotly figure with one horizontal bar per device spanning
        rating ± deviation, plus one marker trace per platform. Rank 1 is at
        the top and the highest rating is on the left.
    """
    # Select top N devices
    top_devices = g2_confident_display.nlargest(top_n, "Rating").copy()

    # Create y-axis positions (rank 1 at top)
    top_devices["rank_position"] = np.arange(1, len(top_devices) + 1)

    fig = go.Figure()

    # Add confidence interval bars (rating ± deviation), one trace per device
    for rating, deviation, position in zip(
        top_devices["Rating"],
        top_devices["Rating Deviation"],
        top_devices["rank_position"],
    ):
        fig.add_trace(
            go.Scatter(
                x=[rating - deviation, rating + deviation],
                y=[position, position],
                mode="lines",
                line=dict(color="rgba(0,0,0,0.3)", width=8),
                showlegend=False,
                hoverinfo="skip",
            )
        )

    # Add rating points, one trace (and legend entry) per platform
    palette = px.colors.qualitative.Set1
    for color_idx, platform in enumerate(top_devices["Platform"].unique()):
        platform_devices = top_devices[top_devices["Platform"] == platform]
        fig.add_trace(
            go.Scatter(
                x=platform_devices["Rating"],
                y=platform_devices["rank_position"],
                mode="markers+text",
                marker=dict(
                    size=12,
                    color=palette[color_idx % len(palette)],
                ),
                text=platform_devices["Device"],
                textposition="middle right",
                textfont=dict(
                    color="rgba(0,0,0,1.0)",  # Full black for maximum contrast
                    size=12,
                    family="Arial Black, sans-serif",  # Bold font
                ),
                name=platform,
                # Plotly hover text breaks lines on <br>, not on newlines.
                hovertemplate=(
                    "%{text}<br>"
                    "Rank: #%{y:.0f}<br>"
                    "Rating: %{x:.0f}<br>"
                    "Deviation: ±%{customdata[0]:.0f}<br>"
                ),
                customdata=platform_devices[["Rating Deviation"]].values,
            )
        )

    # Update layout
    axis_title_font = dict(
        size=14, family="Arial, sans-serif", color="rgba(0,0,0,1.0)"
    )
    fig.update_layout(
        xaxis=dict(
            title="Rating",
            showgrid=True,
            gridwidth=1,
            gridcolor="rgba(0,0,0,0.1)",
            autorange="reversed",  # Reverse x-axis to show highest values on left
            title_font=axis_title_font,
        ),
        yaxis=dict(
            title="Rank",
            showgrid=True,
            gridwidth=1,
            gridcolor="rgba(0,0,0,0.1)",
            tickmode="array",
            tickvals=top_devices["rank_position"],
            ticktext=[f"#{i}" for i in range(1, len(top_devices) + 1)],
            autorange="reversed",  # This will put rank 1 at the top
            title_font=axis_title_font,
        ),
        showlegend=True,
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1,
            font=dict(size=12, family="Arial, sans-serif", color="rgba(0,0,0,1.0)"),
        ),
        margin=dict(t=100, l=50, r=100, b=50),
        height=800,
        hovermode="closest",
        paper_bgcolor="rgba(255,255,255,1)",  # Pure white background
        plot_bgcolor="rgba(255,255,255,1)",  # Pure white plot area
        autosize=True,  # Enable responsive sizing
    )

    return fig


def _build_rankings_display(df: pd.DataFrame, g2_confident: pd.DataFrame) -> pd.DataFrame:
    """Build the display frame (Rank-indexed, renamed columns) for the rankings.

    Adds "Platform", "Model Size Range" and "Device" columns looked up from
    ``df`` by normalized device ID, rounds the rating columns to integers,
    renames them for display, and sorts by rating (highest first).
    """
    # Name the index before resetting it: an unnamed index would otherwise
    # become a column called "index" rather than the name used below.
    device_id_col = g2_confident.index.name or "device"
    display = g2_confident.rename_axis(device_id_col).reset_index()

    by_device = df.groupby("Normalized Device ID")

    # Get platform information from the original dataframe
    platform_map = by_device["Platform"].first().to_dict()
    display["Platform"] = display[device_id_col].map(platform_map)

    # Get model size range from the original dataframe; devices that do not
    # appear in df get a missing value.
    size_bounds = by_device["Model Size"].agg(["min", "max"])
    size_ranges = {
        device: f"{low:.1f}B - {high:.1f}B"
        for device, low, high in zip(
            size_bounds.index, size_bounds["min"], size_bounds["max"]
        )
    }
    display["Model Size Range"] = display[device_id_col].map(size_ranges)

    # Add clean device name
    display["Device"] = display[device_id_col].apply(clean_device_id)

    # Round numeric columns to whole numbers
    for col in ("combined_rating", "combined_rd", "token_rating", "prompt_rating"):
        if col in display.columns:
            display[col] = display[col].round(0).astype(int)

    # Rename columns for better display
    display = display.rename(
        columns={
            "combined_rating": "Rating",
            "combined_rd": "Rating Deviation",
            "token_rating": "Token Rating",
            "prompt_rating": "Prompt Rating",
        }
    )

    # Sort by rating and expose the 1-based position as the "Rank" index
    display = display.sort_values("Rating", ascending=False).reset_index(drop=True)
    display.index = display.index + 1
    return display.rename_axis("Rank")


def render_device_rankings(df: pd.DataFrame) -> None:
    """Render device rankings using Glicko-2 algorithm.

    Shows summary metrics (top device, device count, model count) and, when
    confident rankings exist, a ranking ladder and a ranking table in two
    tabs. Any error raised while computing or rendering is reported with
    ``st.error`` instead of propagating.

    Args:
        df: Benchmark rows; must contain "Normalized Device ID", "Model ID",
            "Platform" and "Model Size".
    """
    if df.empty:
        st.warning("No data available for device rankings.")
        return

    # Calculate Glicko-2 rankings automatically
    with st.spinner("Calculating Glicko-2 rankings..."):
        try:
            # NOTE(review): assumes the second result is indexed by
            # normalized device ID, best device first — confirm against
            # analyze_glicko2_rankings.
            _g2_all, g2_confident = analyze_glicko2_rankings(
                df,
                min_matches=5,  # Default minimum matches
                min_gpu_layers=20,  # Default minimum GPU layers
            )

            # Get top device from Glicko-2 rankings
            if g2_confident.empty:
                top_device_clean = "N/A"
            else:
                top_device_clean = clean_device_id(g2_confident.index[0])

            # Calculate total unique devices and models
            total_devices = df["Normalized Device ID"].nunique()
            total_models = df["Model ID"].nunique()

            # Display metrics in columns
            col1, col2, col3 = st.columns([3, 1, 1])
            with col1:
                st.metric("🏆 Top Device", top_device_clean)
            with col2:
                st.metric("Total Devices", total_devices)
            with col3:
                st.metric("Total Models", total_models)

            # Display confident rankings
            if g2_confident.empty:
                st.warning(
                    "No confident rankings available. Try adjusting the minimum matches threshold."
                )
                return

            g2_confident_display = _build_rankings_display(df, g2_confident)

            tab1, tab2 = st.tabs(
                [
                    "Ranking Ladder",
                    "Ranking Table",
                ]
            )

            with tab1:
                # Display the ranking ladder
                st.plotly_chart(
                    create_ranking_ladder(g2_confident_display, top_n=30),
                    use_container_width=True,
                )

            with tab2:
                # Display the table
                st.dataframe(
                    g2_confident_display[_RANKING_TABLE_COLUMNS],
                    use_container_width=True,
                    height=min(600, (len(g2_confident_display) + 1) * 35 + 40),
                    hide_index=False,
                )

        except Exception as e:
            # UI boundary: surface the failure to the user rather than
            # crashing the page.
            st.error(f"Error calculating Glicko-2 rankings: {str(e)}")