import numpy as np import torch import torch.utils.data from torch import nn from pathlib import Path from scipy.stats import iqr import os #Note: due to issues with the numpy rng for multiprocessing #(https://github.com/pytorch/pytorch/issues/5059) that could be #fixed by a custom worker_init_fn we use random throughout for convenience import random from skimage import transform import warnings warnings.filterwarnings("ignore", category=UserWarning) from scipy.signal import butter, sosfilt, sosfiltfilt, sosfreqz #https://stackoverflow.com/questions/12093594/how-to-implement-band-pass-butterworth-filter-with-scipy-signal-butter def butter_filter(lowcut=10, highcut=20, fs=50, order=5, btype='band'): # image processing Butterworth filter """returns butterworth filter with given specifications""" nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq sos = butter(order, [low, high] if btype == "band" else (low if btype == "low" else high), analog=False, btype=btype, output='sos') return sos def butter_filter_frequency_response(filter): """returns frequency response of a given filter (result of call of butter_filter)""" w, h = sosfreqz(filter) # gain vs. freq(Hz) # plt.plot((fs * 0.5 / np.pi) * w, abs(h)) return w, h def apply_butter_filter(data, filter, forwardbackward=True): # The function provides options for handling the edges of the signal. """pass filter from call of butter_filter to data (assuming time axis at dimension 0)""" if forwardbackward: return sosfiltfilt(filter, data, axis=0) else: data = sosfilt(filter, data, axis=0) def dataset_add_chunk_col(df, col="data"): """add a chunk column to the dataset df""" df["chunk"] = df.groupby(col).cumcount() def dataset_add_length_col(df, col="data", data_folder=None): """add a length column to the dataset df""" df[col + "_length"] = df[col].apply(lambda x: len(np.load(x if data_folder is None else data_folder / x))) def dataset_add_labels_col(df, col="label", data_folder=None): """add a column with unique labels in column col""" df[col + "_labels"] = df[col].apply( lambda x: list(np.unique(np.load(x if data_folder is None else data_folder / x)))) def dataset_add_mean_col(df, col="data", axis=(0), data_folder=None): """adds a column with mean""" df[col + "_mean"] = df[col].apply( lambda x: np.mean(np.load(x if data_folder is None else data_folder / x), axis=axis)) def dataset_add_median_col(df, col="data", axis=(0), data_folder=None): """adds a column with median""" df[col + "_median"] = df[col].apply( lambda x: np.median(np.load(x if data_folder is None else data_folder / x), axis=axis)) def dataset_add_std_col(df, col="data", axis=(0), data_folder=None): """adds a column with mean""" df[col + "_std"] = df[col].apply( lambda x: np.std(np.load(x if data_folder is None else data_folder / x), axis=axis)) def dataset_add_iqr_col(df, col="data", axis=(0), data_folder=None): """adds a column with mean""" df[col + "_iqr"] = df[col].apply(lambda x: iqr(np.load(x if data_folder is None else data_folder / x), axis=axis)) def dataset_get_stats(df, col="data", median=False): """creates weighted means and stds from mean, std and length cols of the df""" mean = np.average(np.stack(df[col + ("_median" if median is True else "_mean")], axis=0), axis=0, weights=np.array(df[col + "_length"])) std = np.average(np.stack(df[col + ("_iqr" if median is True else "_std")], axis=0), axis=0, weights=np.array(df[col + "_length"])) return mean, std def npys_to_memmap(npys, target_filename, delete_npys=False): memmap = None start = [] length = [] files = [] ids = [] for idx, npy in enumerate(npys): data = np.load(npy) if memmap is None: memmap = np.memmap(target_filename, dtype=data.dtype, mode='w+', shape=data.shape) start.append(0) length.append(data.shape[0]) else: start.append(start[-1] + length[-1]) length.append(data.shape[0]) memmap = np.memmap(target_filename, dtype=data.dtype, mode='r+', shape=tuple([start[-1] + length[-1]] + [l for l in data.shape[1:]])) ids.append(idx) memmap[start[-1]:start[-1] + length[-1]] = data[:] memmap.flush() if delete_npys is True: npy.unlink() del memmap np.savez(target_filename.parent / (target_filename.stem + "_meta.npz"), start=start, length=length, shape=[start[-1] + length[-1]] + [l for l in data.shape[1:]], dtype=data.dtype) def reformat_as_memmap(df, target_filename, data_folder=None, annotation=False, delete_npys=False): npys_data = [] npys_label = [] for id, row in df.iterrows(): npys_data.append(data_folder / row["data"] if data_folder is not None else row["data"]) if annotation: npys_label.append(data_folder / row["label"] if data_folder is not None else row["label"]) npys_to_memmap(npys_data, target_filename, delete_npys=delete_npys) if annotation: npys_to_memmap(npys_label, target_filename.parent / (target_filename.stem + "_label.npy"), delete_npys=delete_npys) # replace data(filename) by integer df_mapped = df.copy() df_mapped["data_original"] = df_mapped.data df_mapped["data"] = np.arange(len(df_mapped)) df_mapped.to_pickle(target_filename.parent / ("df_" + target_filename.stem + ".pkl")) return df_mapped # TimeseriesDatasetCrops class TimeseriesDatasetCrops(torch.utils.data.Dataset): """timeseries dataset with partial crops.""" def __init__(self, df, output_size, chunk_length, min_chunk_length, memmap_filename=None, npy_data=None, random_crop=True, data_folder=None, num_classes=2, copies=0, col_lbl="label", stride=None, start_idx=0, annotation=False, transforms=None): """ accepts three kinds of input: 1) filenames pointing to aligned numpy arrays [timesteps,channels,...] for data and either integer labels or filename pointing to numpy arrays[timesteps,...] e.g. for annotations 2) memmap_filename to memmap for data [concatenated,...] and labels- label column in df corresponds to index in this memmap 3) npy_data [samples,ts,...] (either path or np.array directly- also supporting variable length input) - label column in df corresponds to sampleid transforms: list of callables (transformations) (applied in the specified order i.e. leftmost element first) """ if transforms is None: transforms = [] assert not ((memmap_filename is not None) and (npy_data is not None)) # require integer entries if using memmap or npy assert (memmap_filename is None and npy_data is None) or df.data.dtype == np.int64 self.timeseries_df = df self.output_size = output_size self.data_folder = data_folder self.transforms = transforms self.annotation = annotation self.col_lbl = col_lbl self.c = num_classes self.mode = "files" self.memmap_filename = memmap_filename if memmap_filename is not None: self.mode = "memmap" memmap_meta = np.load(memmap_filename.parent / (memmap_filename.stem + "_meta.npz")) self.memmap_start = memmap_meta["start"] self.memmap_shape = tuple(memmap_meta["shape"]) self.memmap_length = memmap_meta["length"] self.memmap_dtype = np.dtype(str(memmap_meta["dtype"])) self.memmap_file_process_dict = {} if annotation: memmap_meta_label = np.load(memmap_filename.parent / (memmap_filename.stem + "_label_meta.npz")) self.memmap_filename_label = memmap_filename.parent / (memmap_filename.stem + "_label.npy") self.memmap_shape_label = tuple(memmap_meta_label["shape"]) self.memmap_file_process_dict_label = {} self.memmap_dtype_label = np.dtype(str(memmap_meta_label["dtype"])) elif npy_data is not None: self.mode = "npy" if isinstance(npy_data, np.ndarray) or isinstance(npy_data, list): self.npy_data = np.array(npy_data) assert (annotation is False) else: self.npy_data = np.load(npy_data) if annotation: self.npy_data_label = np.load(npy_data.parent / (npy_data.stem + "_label.npy")) self.random_crop = random_crop self.df_idx_mapping = [] self.start_idx_mapping = [] self.end_idx_mapping = [] for df_idx, (id, row) in enumerate(df.iterrows()): if self.mode == "files": data_length = row["data_length"] elif self.mode == "memmap": data_length = self.memmap_length[row["data"]] else: # npy data_length = len(self.npy_data[row["data"]]) if chunk_length == 0: # do not split idx_start = [start_idx] idx_end = [data_length] else: idx_start = list(range(start_idx, data_length, chunk_length if stride is None else stride)) idx_end = [min(l + chunk_length, data_length) for l in idx_start] # remove final chunk(s) if too short for i in range(len(idx_start)): if idx_end[i] - idx_start[i] < min_chunk_length: del idx_start[i:] del idx_end[i:] break # append to lists for _ in range(copies + 1): for i_s, i_e in zip(idx_start, idx_end): self.df_idx_mapping.append(df_idx) self.start_idx_mapping.append(i_s) self.end_idx_mapping.append(i_e) def __len__(self): return len(self.df_idx_mapping) def __getitem__(self, idx): df_idx = self.df_idx_mapping[idx] start_idx = self.start_idx_mapping[idx] end_idx = self.end_idx_mapping[idx] # determine crop idxs timesteps = end_idx - start_idx assert (timesteps >= self.output_size) if self.random_crop: # random crop if timesteps == self.output_size: start_idx_crop = start_idx else: start_idx_crop = start_idx + random.randint(0, timesteps - self.output_size - 1) # np.random.randint(0, timesteps - self.output_size) else: start_idx_crop = start_idx + (timesteps - self.output_size) // 2 end_idx_crop = start_idx_crop + self.output_size # print(idx,start_idx,end_idx,start_idx_crop,end_idx_crop) # load the actual data if self.mode == "files": # from separate files data_filename = self.timeseries_df.iloc[df_idx]["data"] if self.data_folder is not None: data_filename = self.data_folder / data_filename data = np.load(data_filename)[ start_idx_crop:end_idx_crop] # data type has to be adjusted when saving to npy ID = data_filename.stem if self.annotation is True: label_filename = self.timeseries_df.iloc[df_idx][self.col_lbl] if self.data_folder is not None: label_filename = self.data_folder / label_filename label = np.load(label_filename)[ start_idx_crop:end_idx_crop] # data type has to be adjusted when saving to npy else: label = self.timeseries_df.iloc[df_idx][self.col_lbl] # input type has to be adjusted in the dataframe elif self.mode == "memmap": # from one memmap file ID = self.timeseries_df.iloc[df_idx]["data_original"].stem memmap_idx = self.timeseries_df.iloc[df_idx][ "data"] # grab the actual index (Note the df to create the ds might be a subset of the original df used to create the memmap) idx_offset = self.memmap_start[memmap_idx] pid = os.getpid() # print("idx",idx,"ID",ID,"idx_offset",idx_offset,"start_idx_crop",start_idx_crop,"df_idx", self.df_idx_mapping[idx],"pid",pid) mem_file = self.memmap_file_process_dict.get(pid, None) # each process owns its handler. if mem_file is None: # print("memmap_shape", self.memmap_shape) mem_file = np.memmap(self.memmap_filename, self.memmap_dtype, mode='r', shape=self.memmap_shape) self.memmap_file_process_dict[pid] = mem_file data = np.copy(mem_file[idx_offset + start_idx_crop: idx_offset + end_idx_crop]) # print(mem_file[idx_offset + start_idx_crop: idx_offset + end_idx_crop]) if self.annotation: mem_file_label = self.memmap_file_process_dict_label.get(pid, None) # each process owns its handler. if mem_file_label is None: mem_file_label = np.memmap(self.memmap_filename_label, self.memmap_dtype, mode='r', shape=self.memmap_shape_label) self.memmap_file_process_dict_label[pid] = mem_file_label label = np.copy(mem_file_label[idx_offset + start_idx_crop: idx_offset + end_idx_crop]) else: label = self.timeseries_df.iloc[df_idx][self.col_lbl] else: # single npy array ID = self.timeseries_df.iloc[df_idx]["data"] data = self.npy_data[ID][start_idx_crop:end_idx_crop] if self.annotation: label = self.npy_data_label[ID][start_idx_crop:end_idx_crop] else: label = self.timeseries_df.iloc[df_idx][self.col_lbl] sample = {'data': data, 'label': label, 'ID': ID} for t in self.transforms: sample = t(sample) return sample def get_sampling_weights(self, class_weight_dict, length_weighting=False, group_by_col=None): assert (self.annotation is False) assert (length_weighting is False or group_by_col is None) weights = np.zeros(len(self.df_idx_mapping), dtype=np.float32) length_per_class = {} length_per_group = {} for iw, (i, s, e) in enumerate(zip(self.df_idx_mapping, self.start_idx_mapping, self.end_idx_mapping)): label = self.timeseries_df.iloc[i][self.col_lbl] weight = class_weight_dict[label] if length_weighting: if label in length_per_class.keys(): length_per_class[label] += e - s else: length_per_class[label] = e - s if group_by_col is not None: group = self.timeseries_df.iloc[i][group_by_col] if group in length_per_group.keys(): length_per_group[group] += e - s else: length_per_group[group] = e - s weights[iw] = weight if length_weighting: # need second pass to properly take into account the total length per class for iw, (i, s, e) in enumerate(zip(self.df_idx_mapping, self.start_idx_mapping, self.end_idx_mapping)): label = self.timeseries_df.iloc[i][self.col_lbl] weights[iw] = (e - s) / length_per_class[label] * weights[iw] if group_by_col is not None: for iw, (i, s, e) in enumerate(zip(self.df_idx_mapping, self.start_idx_mapping, self.end_idx_mapping)): group = self.timeseries_df.iloc[i][group_by_col] weights[iw] = (e - s) / length_per_group[group] * weights[iw] weights = weights / np.min(weights) # normalize smallest weight to 1 return weights def get_id_mapping(self): return self.df_idx_mapping class RandomCrop(object): """ Crop randomly the image in a sample (deprecated). """ def __init__(self, output_size, annotation=False): self.output_size = output_size self.annotation = annotation def __call__(self, sample): data, label, ID = sample['data'], sample['label'], sample['ID'] timesteps = len(data) assert (timesteps >= self.output_size) if timesteps == self.output_size: start = 0 else: start = random.randint(0, timesteps - self.output_size - 1) # np.random.randint(0, timesteps - self.output_size) data = data[start: start + self.output_size] if self.annotation: label = label[start: start + self.output_size] return {'data': data, 'label': label, "ID": ID} class CenterCrop(object): """ Center crop the image in a sample (deprecated). """ def __init__(self, output_size, annotation=False): self.output_size = output_size self.annotation = annotation def __call__(self, sample): data, label, ID = sample['data'], sample['label'], sample['ID'] timesteps = len(data) start = (timesteps - self.output_size) // 2 data = data[start: start + self.output_size] if self.annotation: label = label[start: start + self.output_size] return {'data': data, 'label': label, "ID": ID} class GaussianNoise(object): """ Add gaussian noise to sample. """ def __init__(self, scale=0.1): self.scale = scale def __call__(self, sample): if self.scale == 0: return sample else: data, label, ID = sample['data'], sample['label'], sample['ID'] data = data + np.reshape(np.array([random.gauss(0, self.scale) for _ in range(np.prod(data.shape))]), data.shape) # np.random.normal(scale=self.scale,size=data.shape).astype(np.float32) return {'data': data, 'label': label, "ID": ID} class Rescale(object): """Rescale by factor. """ def __init__(self, scale=0.5, interpolation_order=3): self.scale = scale self.interpolation_order = interpolation_order def __call__(self, sample): if self.scale == 1: return sample else: data, label, ID = sample['data'], sample['label'], sample['ID'] timesteps_new = int(self.scale * len(data)) data = transform.resize(data, (timesteps_new, data.shape[1]), order=self.interpolation_order).astype( np.float32) return {'data': data, 'label': label, "ID": ID} class ToTensor(object): """Convert ndarrays in sample to Tensors.""" def __init__(self, transpose_data1d=True): self.transpose_data1d = transpose_data1d def __call__(self, sample): def _to_tensor(data, transpose_data1d=False): if ( len(data.shape) == 2 and transpose_data1d is True): # swap channel and time axis for direct application of pytorch's 1d convs data = data.transpose((1, 0)) if isinstance(data, np.ndarray): return torch.from_numpy(data) else: # default_collate will take care of it return data data, label, ID = sample['data'], sample['label'], sample['ID'] if not isinstance(data, tuple): data = _to_tensor(data, self.transpose_data1d) else: data = tuple(_to_tensor(x, self.transpose_data1d) for x in data) if not isinstance(label, tuple): label = _to_tensor(label) else: label = tuple(_to_tensor(x) for x in label) return data, label # returning as a tuple (potentially of lists) class Normalize(object): """ Normalize using given stats. """ def __init__(self, stats_mean, stats_std, input=True, channels=None): if channels is None: channels = [] self.stats_mean = np.expand_dims(stats_mean.astype(np.float32), axis=0) if stats_mean is not None else None self.stats_std = np.expand_dims(stats_std.astype(np.float32), axis=0) + 1e-8 if stats_std is not None else None self.input = input if len(channels) > 0: for i in range(len(stats_mean)): if not (i in channels): self.stats_mean[:, i] = 0 self.stats_std[:, i] = 1 def __call__(self, sample): if self.input: data = sample['data'] else: data = sample['label'] if self.stats_mean is not None: data = data - self.stats_mean if self.stats_std is not None: data = data / self.stats_std if self.input: return {'data': data, 'label': sample['label'], "ID": sample['ID']} else: return {'data': sample['data'], 'label': data, "ID": sample['ID']} class ButterFilter(object): """ Normalize using given stats. """ def __init__(self, lowcut=50, highcut=50, fs=100, order=5, btype='band', forwardbackward=True, input=True): self.filter = butter_filter(lowcut, highcut, fs, order, btype) self.input = input self.forwardbackward = forwardbackward def __call__(self, sample): if self.input: data = sample['data'] else: data = sample['label'] # check multiple axis if self.forwardbackward: data = sosfiltfilt(self.filter, data, axis=0) else: data = sosfilt(self.filter, data, axis=0) if self.input: return {'data': data, 'label': sample['label'], "ID": sample['ID']} else: return {'data': sample['data'], 'label': data, "ID": sample['ID']} class ChannelFilter(object): """ Select certain channels. """ def __init__(self, channels=None, input=True): if channels is None: channels = [0] self.channels = channels self.input = input def __call__(self, sample): if self.input: return {'data': sample['data'][:, self.channels], 'label': sample['label'], "ID": sample['ID']} else: return {'data': sample['data'], 'label': sample['label'][:, self.channels], "ID": sample['ID']} class Transform(object): """ Transforms data using a given function i.e. data_new = func(data) for input is True else label_new = func(label) """ def __init__(self, func, input=False): self.func = func self.input = input def __call__(self, sample): if self.input: return {'data': self.func(sample['data']), 'label': sample['label'], "ID": sample['ID']} else: return {'data': sample['data'], 'label': self.func(sample['label']), "ID": sample['ID']} class TupleTransform(object): """ Transforms data using a given function (operating on both data and label and return a tuple) i.e. data_new, label_new = func(data_old, label_old) """ def __init__(self, func, input=False): self.func = func def __call__(self, sample): data_new, label_new = self.func(sample['data'], sample['label']) return {'data': data_new, 'label': label_new, "ID": sample['ID']} # MIL and ensemble models def aggregate_predictions(preds, targs=None, idmap=None, aggregate_fn=np.mean, verbose=True): """ aggregates potentially multiple predictions per sample (can also pass targs for convenience) idmap: idmap as returned by TimeSeriesCropsDataset's get_id_mapping preds: ordered predictions as returned by learn.get_preds() aggregate_fn: function that is used to aggregate multiple predictions per sample (most commonly np.amax or np.mean) """ if idmap is not None and len(idmap) != len(np.unique(idmap)): if verbose: print("aggregating predictions...") preds_aggregated = [] targs_aggregated = [] for i in np.unique(idmap): preds_local = preds[np.where(idmap == i)[0]] preds_aggregated.append(aggregate_fn(preds_local, axis=0)) if targs is not None: targs_local = targs[np.where(idmap == i)[0]] assert (np.all(targs_local == targs_local[0])) # all labels have to agree targs_aggregated.append(targs_local[0]) if targs is None: return np.array(preds_aggregated) else: return np.array(preds_aggregated), np.array(targs_aggregated) else: if targs is None: return preds else: return preds, targs class milwrapper(nn.Module): def __init__(self, model, input_size, n, stride=None, softmax=True): super().__init__() self.n = n self.input_size = input_size self.model = model self.softmax = softmax self.stride = input_size if stride is None else stride def forward(self, x): # bs,ch,seq for i in range(self.n): pred_single = self.model(x[:, :, i * self.stride:i * self.stride + self.input_size]) pred_single = nn.functional.softmax(pred_single, dim=1) if i == 0: pred = pred_single else: pred += pred_single return pred / self.n class ensemblewrapper(nn.Module): def __init__(self, model, checkpts): super().__init__() self.model = model self.checkpts = checkpts def forward(self, x): # bs,ch,seq for i, c in enumerate(self.checkpts): state = torch.load(Path("./models/") / f'{c}.pth', map_location=x.device) self.model.load_state_dict(state['model'], strict=True) pred_single = self.model(x) pred_single = nn.functional.softmax(pred_single, dim=1) if (i == 0): pred = pred_single else: pred += pred_single return pred / len(self.checkpts)