import io
from itertools import product

import gradio as gr
import joblib
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image

###############################################################################
# 1. MODEL DEFINITION
###############################################################################

class VirusClassifier(nn.Module):
    """Feed-forward classifier producing two logits from a feature vector.

    The layer layout (and therefore the ``state_dict`` key names under
    ``network.*``) must stay exactly as is so saved weights keep loading.
    """

    def __init__(self, input_shape: int):
        """Build the network.

        Args:
            input_shape: Number of input features per sample.
        """
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_shape, 64),
            nn.GELU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.GELU(),
            nn.BatchNorm1d(32),
            nn.Dropout(0.3),
            nn.Linear(32, 32),
            nn.GELU(),
            nn.Linear(32, 2),
        )

    def forward(self, x):
        """Return the raw (pre-softmax) two-class logits for ``x``."""
        return self.network(x)


###############################################################################
# 2. FASTA PARSING & K-MER FEATURE ENGINEERING
###############################################################################

def parse_fasta(text):
    """Parse FASTA formatted text into a list of (header, sequence).

    Blank lines are ignored, sequence lines are upper-cased and concatenated,
    and the leading ``>`` is removed from headers. Sequence lines that appear
    before the first header are discarded.

    Args:
        text: FASTA content as a single string.

    Returns:
        A list of ``(header, sequence)`` tuples in input order.
    """
    sequences = []
    current_header = None
    current_sequence = []

    for raw_line in text.strip().splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith('>'):
            # Compare against None, not truthiness: a bare ">" yields an
            # empty header, and that record must not be silently dropped.
            if current_header is not None:
                sequences.append((current_header, ''.join(current_sequence)))
            current_header = line[1:]
            current_sequence = []
        else:
            current_sequence.append(line.upper())

    if current_header is not None:
        sequences.append((current_header, ''.join(current_sequence)))
    return sequences


def sequence_to_kmer_vector(sequence: str, k: int = 4) -> np.ndarray:
    """Convert a sequence to a k-mer frequency vector.

    Args:
        sequence: Nucleotide string; only k-mers made purely of ``ACGT``
            are counted.
        k: k-mer length.

    Returns:
        A float32 vector of length ``4**k`` in lexicographic k-mer order.
        Counts are divided by the total number of windows
        (``len(sequence) - k + 1``), including windows that contain
        non-ACGT characters; the vector is all zeros when the sequence is
        shorter than ``k``.
    """
    kmers = [''.join(p) for p in product("ACGT", repeat=k)]
    kmer_dict = {km: i for i, km in enumerate(kmers)}
    vec = np.zeros(len(kmers), dtype=np.float32)

    total_kmers = len(sequence) - k + 1
    for i in range(total_kmers):
        index = kmer_dict.get(sequence[i:i + k])
        if index is not None:
            vec[index] += 1

    if total_kmers > 0:
        vec = vec / total_kmers
    return vec
###############################################################################
# 3. SHAP-VALUE (ABLATION) CALCULATION
###############################################################################

def calculate_shap_values(model, x_tensor):
    """Calculate SHAP-like values using a simple ablation approach.

    Each feature of the first row of ``x_tensor`` is set to zero in turn and
    the drop in the class-1 ('human') probability is recorded. These are
    single-feature ablation effects, not exact Shapley values.

    Args:
        model: Module mapping a ``(batch, n_features)`` tensor to two-class
            logits. It is switched to eval mode as a side effect.
        x_tensor: Input of shape ``(1, n_features)``; only row 0 is used.

    Returns:
        ``(shap_values, baseline_prob)``: a float64 array of length
        ``n_features`` holding ``baseline_prob - prob_with_feature_zeroed``,
        and the unablated class-1 probability as a Python float.
    """
    model.eval()
    with torch.no_grad():
        # Baseline prediction; index 1 is the 'human' class.
        baseline_probs = torch.softmax(model(x_tensor), dim=1)
        baseline_prob = baseline_probs[0, 1].item()

        # Row i of `ablated` is the input with feature i zeroed. In eval mode
        # every row is scored independently, so one batched forward pass
        # replaces one pass per feature.
        row = x_tensor[0]
        n_features = row.shape[0]
        ablated = row.repeat(n_features, 1)
        ablated.fill_diagonal_(0.0)
        ablated_probs = torch.softmax(model(ablated), dim=1)[:, 1]

    shap_values = baseline_prob - ablated_probs.double().cpu().numpy()
    return shap_values, baseline_prob


###############################################################################
# 4. PER-BASE SHAP AGGREGATION
###############################################################################

def compute_positionwise_scores(sequence, shap_values, k=4):
    """Return per-base SHAP contributions for ``sequence``.

    Each base receives the average of the k-mer SHAP values of all k-mers
    covering that base. K-mers containing characters outside ``ACGT`` are
    skipped; bases covered by no valid k-mer get 0.

    Args:
        sequence: Nucleotide string.
        shap_values: Per-k-mer values indexed in lexicographic k-mer order.
        k: k-mer length.

    Returns:
        A float32 array with one entry per base of ``sequence``.
    """
    # k-mers in lexicographic order, matching the feature-vector layout.
    kmers = [''.join(p) for p in product("ACGT", repeat=k)]
    kmer_dict = {km: i for i, km in enumerate(kmers)}

    seq_len = len(sequence)
    shap_sums = np.zeros(seq_len, dtype=np.float32)
    coverage = np.zeros(seq_len, dtype=np.float32)

    for i in range(seq_len - k + 1):
        index = kmer_dict.get(sequence[i:i + k])
        if index is not None:
            shap_sums[i:i + k] += shap_values[index]
            coverage[i:i + k] += 1

    # Divide only where coverage is non-zero; elsewhere keep the 0 default.
    return np.divide(
        shap_sums, coverage, out=np.zeros_like(shap_sums), where=coverage > 0
    )


###############################################################################
# 5. HEATMAP PLOTS
###############################################################################

def _draw_heatmap(values, title, xlabel, x_start=0):
    """Draw a one-row heatmap of ``values`` and return the figure.

    The colour limits are symmetric around zero so the diverging colormap's
    neutral colour always means "no contribution". ``x_start`` is the
    sequence position of the first value and offsets the x-axis ticks.
    """
    heatmap_data = np.asarray(values, dtype=np.float32).reshape(1, -1)

    limit = float(np.max(np.abs(heatmap_data))) if heatmap_data.size else 0.0
    if not np.isfinite(limit) or limit == 0.0:
        limit = 1.0  # avoid a degenerate colour range for all-zero data

    fig, ax = plt.subplots(figsize=(12, 2))
    cax = ax.imshow(
        heatmap_data,
        aspect='auto',
        cmap='RdBu_r',
        vmin=-limit,
        vmax=limit,
        extent=(x_start, x_start + heatmap_data.shape[1], 0, 1),
    )
    cbar = fig.colorbar(cax, ax=ax, orientation='horizontal', pad=0.2)
    cbar.set_label('SHAP Contribution')
    ax.set_yticks([])
    ax.set_xlabel(xlabel)
    ax.set_title(title)
    fig.tight_layout()
    return fig


def plot_linear_heatmap(shap_means, title="Per-base SHAP Heatmap"):
    """Plot a 1D heatmap of per-base SHAP contributions.

    Negative = push toward Non-Human, Positive = push toward Human.

    Args:
        shap_means: 1D array of per-base contributions.
        title: Figure title.

    Returns:
        The Matplotlib figure (the caller is responsible for closing it).
    """
    return _draw_heatmap(shap_means, title, 'Position in Sequence')


def get_top_signal_region(shap_means, window_size=500):
    """Find the window with the highest sum of absolute SHAP values.

    Args:
        shap_means: 1D array of per-base contributions.
        window_size: Window length in bases.

    Returns:
        ``(start_index, end_index)`` with ``end_index`` exclusive. The whole
        sequence is returned when the window is at least as long as the
        sequence; ``(0, 0)`` is returned for a non-positive window. Ties go
        to the left-most window.
    """
    seq_len = len(shap_means)
    if window_size >= seq_len:
        return 0, seq_len  # entire sequence if window too large
    if window_size <= 0:
        return 0, 0

    # Prefix sums give every window total in one vectorised subtraction.
    abs_values = np.abs(np.asarray(shap_means, dtype=np.float64))
    prefix = np.concatenate(([0.0], np.cumsum(abs_values)))
    window_sums = prefix[window_size:] - prefix[:-window_size]

    max_start = int(np.argmax(window_sums))  # argmax keeps the first maximum
    return max_start, max_start + window_size


def plot_zoomed_heatmap(shap_means, window_size=500, title="Zoomed SHAP Region"):
    """Plot a 1D heatmap of the strongest-signal sub-region.

    The region is the fixed-size window with the largest absolute SHAP sum,
    as chosen by ``get_top_signal_region``. The x axis shows positions in
    the full sequence.

    Args:
        shap_means: 1D array of per-base contributions.
        window_size: Window length in bases.
        title: Figure title.

    Returns:
        The Matplotlib figure (the caller is responsible for closing it).
    """
    start, end = get_top_signal_region(shap_means, window_size)
    return _draw_heatmap(
        shap_means[start:end],
        title,
        f'Position in Sequence (zoomed in {start} - {end})',
        x_start=start,
    )


###############################################################################
# 6.
# OTHER PLOT: TOP-K K-MER BAR PLOT
###############################################################################

def create_importance_bar_plot(shap_values, kmers, top_k=10):
    """Create a bar plot of the most important k-mers.

    Bars are ordered by absolute SHAP value with the most influential k-mer
    at the top; positive values are drawn red, the rest blue.

    Args:
        shap_values: Per-k-mer SHAP values.
        kmers: k-mer labels, indexed like ``shap_values``.
        top_k: Number of k-mers to show. Cast to int because UI sliders may
            deliver floats; values <= 0 produce an empty plot.

    Returns:
        The Matplotlib figure (the caller is responsible for closing it).
    """
    shap_values = np.asarray(shap_values)
    top_k = max(int(top_k), 0)

    # Descending by absolute importance; together with the inverted y axis
    # below this places the strongest k-mer at the top of the chart.
    order = np.argsort(np.abs(shap_values))[::-1][:top_k]
    values = shap_values[order]
    features = [kmers[i] for i in order]
    colors = ['#ff9999' if v > 0 else '#99ccff' for v in values]

    plt.rcParams.update({'font.size': 10})
    fig, ax = plt.subplots(figsize=(10, 5))
    positions = list(range(len(values)))
    ax.barh(positions, values, color=colors)
    ax.set_yticks(positions)
    ax.set_yticklabels(features)
    ax.set_xlabel('SHAP value (impact on model output)')
    ax.set_title(f'Top {len(values)} Most Influential k-mers')
    ax.invert_yaxis()
    return fig


###############################################################################
# 7. HELPER FUNCTION: FIG TO IMAGE
###############################################################################

def fig_to_image(fig):
    """Convert a Matplotlib figure to a PIL Image.

    The figure is rendered to an in-memory PNG (150 dpi, tight bounding box)
    and is always closed afterwards, even if rendering fails.

    Args:
        fig: Matplotlib figure to render.

    Returns:
        A fully loaded ``PIL.Image.Image``.
    """
    buf = io.BytesIO()
    try:
        fig.savefig(buf, format='png', bbox_inches='tight', dpi=150)
    finally:
        plt.close(fig)  # release the figure even when savefig raises
    buf.seek(0)
    img = Image.open(buf)
    img.load()  # Image.open is lazy; decode now while the buffer is in hand
    return img


###############################################################################
# 8.
# MAIN PREDICTION FUNCTION
###############################################################################

# Loaded (model, scaler) pairs keyed by device string, so the files are read
# from disk once per process instead of on every request.
_MODEL_CACHE = {}


def _load_model_and_scaler(device):
    """Return the cached ``(model, scaler)`` pair for ``device``, loading on first use."""
    key = str(device)
    if key not in _MODEL_CACHE:
        model = VirusClassifier(256).to(device)
        # NOTE(security): torch.load and joblib.load unpickle data; only use
        # model.pt / scaler.pkl files that come from a trusted source.
        model.load_state_dict(torch.load('model.pt', map_location=device))
        scaler = joblib.load('scaler.pkl')
        _MODEL_CACHE[key] = (model, scaler)
    return _MODEL_CACHE[key]


def predict(file_obj, top_kmers=10, fasta_text="", zoom_window=500):
    """Main prediction function for Gradio interface.

    Pasted text takes precedence over an uploaded file. Only the first
    record of a multi-record FASTA input is analysed.

    Args:
        file_obj: Path of the uploaded FASTA file, or None.
        top_kmers: Number of k-mers to show in the bar plot.
        fasta_text: FASTA text pasted by the user.
        zoom_window: Width of the zoomed heatmap window; 0 disables it.

    Returns:
        ``(result_text, kmer_bar_image, full_heatmap_image, zoom_image)``.
        On any input or loading error the text carries the message and all
        three images are None; ``zoom_image`` is also None when zoom is off.
    """
    k = 4
    # Slider values may arrive as floats; slicing and indexing need ints.
    top_kmers = int(top_kmers)
    zoom_window = int(zoom_window)

    # Handle input
    if fasta_text and fasta_text.strip():
        text = fasta_text.strip()
    elif file_obj is not None:
        try:
            with open(file_obj, 'r', encoding='utf-8') as f:
                text = f.read()
        except Exception as e:  # UI boundary: report instead of crashing
            return f"Error reading file: {str(e)}", None, None, None
    else:
        return "Please provide a FASTA sequence.", None, None, None

    # Parse FASTA
    sequences = parse_fasta(text)
    if not sequences:
        return "No valid FASTA sequences found.", None, None, None

    header, seq = sequences[0]
    if len(seq) < k:
        # Without at least one k-mer there is nothing to score or plot.
        return (
            f"Sequence is too short: at least {k} bases are required.",
            None,
            None,
            None,
        )

    # Load model and scaler
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    try:
        model, scaler = _load_model_and_scaler(device)
    except Exception as e:  # UI boundary: report instead of crashing
        return f"Error loading model: {str(e)}", None, None, None

    # Generate features
    freq_vector = sequence_to_kmer_vector(seq, k=k)
    scaled_vector = scaler.transform(freq_vector.reshape(1, -1))
    x_tensor = torch.tensor(scaled_vector, dtype=torch.float32, device=device)

    # Calculate SHAP values and get prediction
    shap_values, prob_human = calculate_shap_values(model, x_tensor)

    # Prediction text
    results = [
        f"Sequence: {header}",
        f"Prediction: {'Human' if prob_human > 0.5 else 'Non-human'} Origin",
        f"Confidence: {max(prob_human, 1 - prob_human):.3f}",
        f"Human Probability: {prob_human:.3f}",
    ]

    # Create k-mer list (4-mers in lexicographic order)
    kmers = [''.join(p) for p in product("ACGT", repeat=k)]

    # 1) Top-k k-mer bar plot
    importance_fig = create_importance_bar_plot(shap_values, kmers, top_kmers)
    importance_img = fig_to_image(importance_fig)

    # 2) Full-genome per-base SHAP heatmap
    shap_means = compute_positionwise_scores(seq, shap_values, k=k)
    heatmap_fig = plot_linear_heatmap(shap_means, title="Genome-wide Per-base SHAP")
    heatmap_img = fig_to_image(heatmap_fig)

    # 3) Zoomed region (optional, using the largest absolute SHAP region)
    if zoom_window > 0:
        zoom_fig = plot_zoomed_heatmap(
            shap_means,
            window_size=zoom_window,
            title=f"Top SHAP Region (window={zoom_window})",
        )
        zoom_img = fig_to_image(zoom_fig)
    else:
        zoom_img = None

    return "\n".join(results), importance_img, heatmap_img, zoom_img


###############################################################################
# 9. BUILD GRADIO INTERFACE
###############################################################################

css = """
.gradio-container {
    font-family: 'IBM Plex Sans', sans-serif;
}
"""

with gr.Blocks(css=css) as iface:
    gr.Markdown("""
    # Virus Host Classifier
    Predicts whether a viral sequence is of human or non-human origin using k-mer analysis.
    """)

    with gr.Row():
        # Left column: inputs and controls.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload FASTA file",
                file_types=[".fasta", ".fa", ".txt"],
                type="filepath"
            )
            text_input = gr.Textbox(
                label="Or paste FASTA sequence",
                placeholder=">sequence_name\nACGTACGT...",
                lines=5
            )
            top_k = gr.Slider(
                minimum=5,
                maximum=30,
                value=10,
                step=1,
                label="Number of top k-mers to display"
            )
            zoom_window = gr.Slider(
                minimum=0,
                maximum=5000,
                value=500,
                step=100,
                label="Zoom Window Size (0 to disable zoom plot)"
            )
            submit_btn = gr.Button("Analyze Sequence", variant="primary")

        # Right column: text result and the three plots.
        with gr.Column(scale=2):
            results_box = gr.Textbox(label="Analysis Results", lines=5)
            kmer_plot = gr.Image(label="Top k-mer SHAP")
            full_heatmap = gr.Image(label="Genome-wide SHAP Heatmap")
            zoomed_heatmap = gr.Image(label="Zoomed SHAP Region (largest signal)")

    # Input order must match predict's positional parameters.
    submit_btn.click(
        predict,
        inputs=[file_input, top_k, text_input, zoom_window],
        outputs=[results_box, kmer_plot, full_heatmap, zoomed_heatmap]
    )

    gr.Markdown("""
    ### Visualization Guide
    - **Top k-mer SHAP**: Shows the most influential k-mers and their SHAP values.
    - **Genome-wide SHAP Heatmap**: Per-base SHAP values across the entire sequence.
      - Red = push toward human
      - Blue = push toward non-human
    - **Zoomed SHAP Region**: Shows the subregion of length 'Zoom Window Size' that has the highest absolute SHAP sum.
    """)

if __name__ == "__main__":
    iface.launch()