# Page-scrape residue (not part of the program): Spaces status "Runtime error";
# file size: 5,683 bytes; revisions b7d100a / 795e3eb; source spans lines 1-161.
import os
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import load_model
import gradio as gr
import biosppy.signals.ecg as ecg
from PIL import Image
import traceback
# Scratch directory for uploaded files and the CSVs derived from them.
UPLOAD_FOLDER = "/tmp/uploads"
# exist_ok=True avoids the check-then-create race of exists() + makedirs().
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Load the pre-trained model. The path is relative, so ecgScratchEpoch2.hdf5
# must be present in the process's working directory.
try:
    model = load_model("ecgScratchEpoch2.hdf5")
except Exception as e:
    # Fail at import time with a clear message; chain the cause so the
    # underlying loader error stays visible in the traceback.
    raise Exception(f"Failed to load model: {str(e)}") from e
def image_to_signal(image):
    """Convert an ECG image to a 1D signal and save it as a CSV file.

    The image is converted to grayscale, resized to 1000x500, and thresholded
    so that pixels darker than 200 become foreground. The largest external
    contour is taken as the waveform. For every pixel column, the mean
    y-coordinate of the contour points in that column becomes one sample;
    columns without contour points repeat the previous sample (0 if there is
    none yet). Samples are min-max scaled to the range 0..1000 and written to
    ``converted_signal.csv`` inside ``UPLOAD_FOLDER`` under the column name
    ``" Sample Value"``.

    Args:
        image: A PIL image or a pixel array (grayscale, RGB, or RGBA).

    Returns:
        Path of the CSV file that was written.

    Raises:
        Exception: Wraps any failure during processing (for example, no
            contour found), prefixed with "Image processing error".
    """
    try:
        # PIL inputs may be palette, grayscale, or RGBA; normalise them to
        # RGB so the grayscale conversion below always sees three channels.
        if hasattr(image, "convert"):
            image = image.convert("RGB")
        pixels = np.array(image)

        if pixels.ndim == 2:
            gray = pixels  # already single-channel
        elif pixels.shape[2] == 4:
            gray = cv2.cvtColor(pixels, cv2.COLOR_RGBA2GRAY)
        else:
            gray = cv2.cvtColor(pixels, cv2.COLOR_RGB2GRAY)

        # Resize to a standard size (width 1000, height 500).
        gray = cv2.resize(gray, (1000, 500))

        # Inverse threshold: dark trace pixels become white foreground.
        _, binary = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)

        contours, _ = cv2.findContours(
            binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        if not contours:
            raise ValueError("No waveform detected in the image")

        # Use the largest contour as the waveform.
        contour = max(contours, key=cv2.contourArea)

        # Contours are indexed as (point, 1, xy); flatten to rows of (x, y)
        # so the per-column selection below stays two-dimensional.
        points = contour.reshape(-1, 2)
        xs = points[:, 0]
        ys = points[:, 1]

        # NOTE(review): image y grows downward, so the extracted trace is
        # vertically mirrored relative to the drawing — confirm that the
        # downstream consumer expects this orientation.
        samples = []
        for x in range(gray.shape[1]):
            column_ys = ys[xs == x]
            if column_ys.size > 0:
                samples.append(np.mean(column_ys))
            else:
                # No contour point in this column: hold the previous value.
                samples.append(samples[-1] if samples else 0)

        # Min-max normalise to 0..1000; a flat trace has zero span and would
        # otherwise divide by zero and produce NaNs.
        signal = np.array(samples, dtype=float)
        lowest = np.min(signal)
        span = np.max(signal) - lowest
        if span == 0:
            signal = np.zeros_like(signal)
        else:
            signal = (signal - lowest) / span * 1000

        # Save to CSV using the column name the prediction step reads.
        csv_path = os.path.join(UPLOAD_FOLDER, "converted_signal.csv")
        df = pd.DataFrame(signal, columns=[" Sample Value"])
        df.to_csv(csv_path, index=False)
        return csv_path
    except Exception as e:
        raise Exception(f"Image processing error: {str(e)}") from e
def model_predict(csv_path, sampling_rate=200):
    """Predict ECG arrhythmia classes from a CSV file.

    Reads the ``" Sample Value"`` column, locates peaks with the Christov
    segmenter, and cuts one segment per interior peak, running from the
    midpoint towards the previous peak to the midpoint towards the next one.
    Each segment longer than 10 samples is rasterised onto a 128x128 canvas
    (row = sample index, column = value / 10), dilated with a 4x4 kernel, and
    classified by the module-level ``model``.

    Args:
        csv_path: Path of the CSV file holding the signal.
        sampling_rate: Sampling rate passed to the peak detector; defaults to
            the previously hard-coded value of 200.

    Returns:
        A single-element list ``[{"file": csv_path, "results": {...}}]`` where
        ``results`` maps each class name to a list of ``(start, end)`` sample
        index pairs of the segments assigned to that class.

    Raises:
        Exception: Wraps any failure, prefixed with "Prediction error".
    """
    # Order must match the model's output indices.
    class_names = ["Normal", "APC", "LBB", "PAB", "PVC", "RBB", "VEB"]
    image_size = 128
    try:
        # Key order here determines the order classes are reported in.
        result = {
            "APC": [],
            "Normal": [],
            "LBB": [],
            "PAB": [],
            "PVC": [],
            "RBB": [],
            "VEB": [],
        }
        kernel = np.ones((4, 4), np.uint8)

        frame = pd.read_csv(csv_path)
        if " Sample Value" not in frame.columns:
            raise ValueError("CSV must contain a ' Sample Value' column")
        data = np.array(frame[" Sample Value"])

        peaks = ecg.christov_segmenter(signal=data, sampling_rate=sampling_rate)[0]

        # Slide a (previous, current, next) window over the peaks so every
        # interior peak gets a segment bounded by the neighbouring midpoints.
        for prev_peak, peak, next_peak in zip(peaks, peaks[1:], peaks[2:]):
            start = prev_peak + abs(prev_peak - peak) // 2
            end = next_peak - abs(next_peak - peak) // 2
            segment = data[start:end]
            if len(segment) <= 10:
                continue

            img = np.zeros((image_size, image_size))
            # Bound both indices to the canvas: samples past row 127 are
            # dropped and columns are clamped, instead of raising IndexError
            # (long segments / large values) or wrapping (negative values).
            # NOTE(review): confirm this matches how the model's training
            # images were produced.
            for row, value in enumerate(segment[:image_size]):
                col = min(max(int(value / 10), 0), image_size - 1)
                img[row, col] = 255

            img = cv2.dilate(img, kernel, iterations=1)
            img = img.reshape(image_size, image_size, 1)
            prediction = model.predict(np.array([img]), verbose=0).argmax()
            result[class_names[prediction]].append((start, end))

        return [{"file": csv_path, "results": result}]
    except Exception as e:
        raise Exception(f"Prediction error: {str(e)}") from e
def classify_ecg(file):
    """Classify an uploaded ECG file (CSV or image) and format the results.

    The upload may arrive as a filesystem path, as an object exposing the path
    through a ``name`` attribute, or as an already-decoded PIL image. For
    path-based uploads the type is decided by the file extension: PNG/JPG/JPEG
    images are opened and converted to a signal CSV, CSV files are copied into
    ``UPLOAD_FOLDER``, and anything else is rejected.

    Args:
        file: The uploaded file, or ``None`` when nothing was uploaded.

    Returns:
        A text report with one ``File:`` line followed by one line per class
        that has at least one detected segment; or a short message when the
        input is missing or unsupported; or ``"Error: ..."`` with a traceback
        if processing fails.
    """
    try:
        if file is None:
            return "No file uploaded."

        # Resolve where the upload lives on disk, if it is path-based.
        if isinstance(file, (str, os.PathLike)):
            source_path = os.fspath(file)
        elif hasattr(file, "name"):
            source_path = file.name
        else:
            source_path = None

        if source_path is not None:
            # Decide by the upload's own extension rather than its Python
            # type, so image uploads delivered as paths are not parsed as CSV.
            ext = os.path.splitext(source_path)[1].lower().lstrip(".")
            if ext in ("png", "jpg", "jpeg"):
                with Image.open(source_path) as uploaded:
                    csv_path = image_to_signal(uploaded.convert("RGB"))
            elif ext == "csv":
                csv_path = os.path.join(UPLOAD_FOLDER, "uploaded_file.csv")
                with open(source_path, "rb") as src, open(csv_path, "wb") as dst:
                    dst.write(src.read())
            else:
                return "Unsupported file type. Use CSV, PNG, or JPG."
        elif hasattr(file, "save"):
            # Already-decoded image object: keep a PNG copy, then convert.
            file.save(os.path.join(UPLOAD_FOLDER, "uploaded_file.png"))
            csv_path = image_to_signal(file)
        else:
            return "Unsupported file type. Use CSV, PNG, or JPG."

        # Run prediction
        results = model_predict(csv_path)

        # Format output: only classes with at least one segment are listed.
        lines = []
        for result in results:
            lines.append(f"File: {result['file']}\n")
            lines.extend(
                f"{key}: {value}\n"
                for key, value in result["results"].items()
                if value
            )
        return "".join(lines)
    except Exception as e:
        # UI boundary: report the failure as text instead of crashing.
        return f"Error: {str(e)}\n{traceback.format_exc()}"
# Gradio interface: a single file upload in, a text report out.
iface = gr.Interface(
    fn=classify_ecg,
    inputs=gr.File(label="Upload ECG Image (PNG/JPG) or CSV"),
    outputs=gr.Textbox(label="Classification Results"),
    title="ECG Arrhythmia Classification",
    description="Upload an ECG image (PNG/JPG) or CSV file to classify arrhythmias. Images will be converted to CSV before processing.",
)

if __name__ == "__main__":
    # Listen on all network interfaces, port 7860.
    iface.launch(server_name="0.0.0.0", server_port=7860)