import h5py import numpy as np # If STRICT_WARNING = True, the program exits when negative values are detected in ibf, obf, or rho # This is important to check because negative values are unphysical. STRICT_WARNING = True def verify_nonnegative(fname, ibf, obf, rho): """ Check ibf, obf, and rho for negative values. """ found_warning = False if np.any(ibf < 0): print(f'Warning: negative values detected in array "ibf" in {fname}; min val: {ibf.min()}') found_warning = True elif np.any(obf < 0): print(f'Warning: negative values detected in array "obf" in {fname}') found_warning = True elif np.any(rho < 0): print(f'Warning: negative values detected in array "rho" in {fname}') found_warning = True if found_warning and STRICT_WARNING: print(f'Exiting program. To avoid exiting on this warning, set STRICT_WARNING to False in {__file__.name}') exit() def get_data_from_file(fname, nx2, nx3, Nrange = None): """ Retrieves training X from the given HDF5 file. Assumes that the PDE parameters are stored in datasets with their respective names, i.e., 'ell', 'a1', 'a2'. Likewise, the density rho(x1,x2) and boundary X ibf(x2,x3) / obf(x2,x3) are stored in datasets 'rho', 'ibf', and 'obf'. Args: nx2 (int): Second grid dimension nx3 (int): Third grid dimension fname (str): Path to the HDF5 file containing the X. Nrange (tuple, optional): A tuple of two integers specifying the range of X to extract (start, end). Defaults to None. Returns: tuple: Tuple of extracted X Raises: ValueError: If the file does not contain the required datasets. """ if not isinstance(fname, str): raise TypeError('Filename must be a string.') type_check1 = not (Nrange is None or isinstance(Nrange, (tuple, list))) type_check2 = False if isinstance(Nrange, (tuple, list)): type_check2 = len(Nrange) != 2 if not type_check2: type_check2 = not all((isinstance(i, int) or i is None) for i in Nrange) if type_check1 or type_check2: raise TypeError('Nrange must be a length-2 tuple or list of integers.') if Nrange is None: N1, N2 = None, None else: N1, N2 = Nrange # Check that all datasets are present dset_names = ['ell', 'a1', 'a2', 'rho', 'ibf', 'obf'] with h5py.File(fname, 'r') as input_file: missing_keys = [key for key in dset_names if key not in input_file.keys()] if missing_keys: raise ValueError(f"Missing / incorrectly labeled datasets in file {fname}.'" f"Could not find datasets: {', '.join(missing_keys)}") ell = input_file['ell'][N1:N2] a2 = input_file['a2'][N1:N2] # minor axis of outer boundary a1 = input_file['a1'][N1:N2] # minor axis of inner boundary eccentricity = np.ones_like(a1) - a1 # eccentricity of inner boundary rho = input_file['rho'][N1:N2] ibf = input_file['ibf'][N1:N2] # boundary X on inner boundary obf = input_file['obf'][N1:N2] # boundary X on outer boundary verify_nonnegative(fname, ibf, obf, rho) # Combine 'ibf' and 'obf' into single array N = rho.shape[0] bf = np.zeros((N, 2 * nx2, nx3 // 2), dtype = np.float32) bf[:, :nx2, :] = ibf bf[:, nx2:, :] = obf return a2, ell, eccentricity, bf, rho def reshape_and_stack(a2, ell, ecc): a2 = a2.reshape((-1, 1)) ell = ell.reshape((-1, 1)) ecc = ecc.reshape((-1, 1)) return np.hstack([a2, ell, ecc]) def apply_normalization(bf, rho): fac = np.average(np.abs(rho), axis = (1, 2)) fac = fac.reshape((-1, 1, 1)) bf /= fac rho /= fac return bf, rho def load_data(files, nx2, nx3, ell_min, ell_max, a2_min, a2_max, Nrange_list = None, params_slice = None, normalize_data = False): """ Loads X from the specified files and processes it for use with the STNN. Args: nx2 (int): Second grid dimension nx3 (int): Third grid dimension ell_min / ell_max (float): Minimum / maximum value of 'ell' over parameter space a2_min / a2_max (float): Minimum / maximum value of 'a2' over parameter space files (str or list of str): List of file paths containing the X Nrange_list (list of tuples, optional): Slice indices for the extracting X from the corresponding file. If given, must have the same number of elements as 'file_list'. Defaults to None. params_slice (slice, optional): Boolean array for selecting X over a subset of parameter space (ell, a1, a2). Defaults to None. normalize_data (bool, optional): Flag to normalize 'bf' and 'rho'. Defaults to False. Returns: tuple: A tuple containing the values of ell, a1, a2, bf, and rho. The parameters ell, a1, a2 are combined into a single array 'params'. """ if isinstance(files, (list, tuple)) and len(files) == 0: raise ValueError(f'List of files provided to "load_data" is empty.') if not isinstance(files, (list, tuple)): files = [files] if Nrange_list is None or len(Nrange_list) == 0: # Default Nrange_list = [None for _ in range(len(files))] else: # User-specified; check shapes if not isinstance(Nrange_list, (list, tuple)): Nrange_list = [Nrange_list] if len(files) != len(Nrange_list): raise ValueError('List of input files must have same length as list of Nrange tuples') a2_list = [] ell_list = [] ecc_list = [] bf_list = [] rho_list = [] # Get X from each file and add to the lists for file, Nrange in zip(files, Nrange_list): a2, ell, ecc, bf, rho = get_data_from_file(file, nx2, nx3, Nrange = Nrange) a2_list.append(a2) ell_list.append(ell) ecc_list.append(ecc) bf_list.append(bf) rho_list.append(rho) a2 = np.concatenate(a2_list) ell = np.concatenate(ell_list) ecc = np.concatenate(ecc_list) bf = np.vstack(bf_list) rho = np.vstack(rho_list) # Map ell and a2 values onto [0, 1] ell = (ell - ell_min) / (ell_max - ell_min) a2 = (a2 - a2_min) / (a2_max - a2_min) params = reshape_and_stack(a2, ell, ecc) if not params_slice is None: # Extract subset of X, if params_slice is given params = params[params_slice, ...] bf = bf[params_slice, ...] rho = rho[params_slice, ...] if normalize_data: bf, rho = apply_normalization(bf, rho) return params, bf, rho def load_training_data(file_list, nx2, nx3, ell_min, ell_max, a2_min, a2_max, Nrange_list = None, params_slice = None, test_size = 0.1, random_state = 23, normalize_data = True): """ Loads training X from specified files and preprocesses it for use with training the STNN. This function wraps the 'load_data' function, adding additional steps specific to preparing training X. Args: nx2 (int): Second grid dimension nx3 (int): Third grid dimension ell_min / ell_max (float): Minimum / maximum value of 'ell' over parameter space a2_min / a2_max (float): Minimum / maximum value of 'a2' over parameter space file_list (list of str): List of file paths containing the X Nrange_list (list of tuples, optional): Slice indices for the extracting X from the corresponding file. If given, must have the same number of elements as 'file_list'. Defaults to None. params_slice (slice, optional): Boolean array for selecting X over a subset of parameter space (ell, a1, a2). Defaults to None. test_size (float, optional): Size of the test/validation dataset as a fraction of the total dataset size. Defaults to 0.1. random_state (int, optional): Random seed used to select the train-test split. Defaults to 23. normalize_data (bool, optional): Flag to normalize 'bf' and 'rho'. Defaults to False. Returns: tuple: A tuple containing the values of ell, a1, a2, bf, and rho. The parameters ell, a1, a2 are combined into a single array 'params'. """ params, bf, rho = load_data(file_list, nx2, nx3, ell_min, ell_max, a2_min, a2_max, Nrange_list = Nrange_list, params_slice = params_slice, normalize_data = normalize_data) (rho_train, rho_test, Y_train, Y_test) = train_test_split(rho, [params, bf], test_size = test_size, random_state = random_state) params_train = Y_train[0] params_test = Y_test[0] bf_train = Y_train[1] bf_test = Y_test[1] print('Finished loading training X:') print(f' params_train.shape:\t{params_train.shape}') print(f' bf_train.shape:\t{bf_train.shape}') print(f' rho_train.shape:\t{rho_train.shape}') print(f' params_test.shape:\t{params_test.shape}') print(f' bf_test.shape:\t{bf_test.shape}') print(f' rho_test.shape:\t{rho_test.shape}') # Compute min/max extent of training X in parameter space. # Note that 'params' is denormalized before computing the max/min. min_a2 = np.min(a2_min + (a2_max - a2_min) * params[:, 0]) min_ell = np.min(ell_min + (ell_max - ell_min) * params[:, 1]) min_ecc = np.min(params[:, 2]) max_a2 = np.max(a2_min + (a2_max - a2_min) * params[:, 0]) max_ell = np.max(ell_min + (ell_max - ell_min) * params[:, 1]) max_ecc = np.max(params[:, 2]) print('') print(f' Number of circle samples (train):\t{np.sum(params[:, 2] < 1e-7)}') print(f' Number of ellipse samples (train):\t{np.sum(params[:, 2] > 0)}') print(f' Min .. Max in training X:') print(f' ell:\t{min_ell:.2f} .. {max_ell:.2f}') print(f' a2:\t{min_a2:.2f} .. {max_a2:.2f}') print(f' ecc:\t{min_ecc:.2f} .. {max_ecc:.2f}') print('-------------------------------------------') return params_train, bf_train, rho_train, params_test, bf_test, rho_test def train_test_split(X, Y, test_size = 0.1, random_state = None): """ Split (X, Y) pairs into random train and test subsets. Args: X (np.ndarray or list of arrays): Input dataset Y (np.ndarray or list of arrays): Labels for the dataset test_size (float): Proportion of the dataset to include in the test split random_state (int): Controls the shuffling applied to the X and Y before applying the split Returns: X_train, X_test, Y_train, Y_test: Lists containing train-test split of the dataset. The format is the same as the input X. For example, if 'X' is an array and 'Y' is a list of arrays, then X_train and X_test will be arrays, and Y_train and Y_test will be lists of arrays. Note: This function is included primarilyto reduce module dependency requirements, and it may not be memory-efficient for large datasets. sklearn.model_selection.train_test_split has similar functionality and may be preferred for performance-critical applications. """ if len(X) == 0 or len(Y) == 0: raise ValueError("Input arrays/lists X and Y cannot be empty.") input_X_is_array = isinstance(X, np.ndarray) input_Y_is_array = isinstance(Y, np.ndarray) if input_X_is_array: X = [X] if input_Y_is_array: Y = [Y] total_samples = X[0].shape[0] # Check for consistent number of samples across all datasets if any(x.shape[0] != total_samples for x in X) or any(y.shape[0] != total_samples for y in Y): raise ValueError('Inconsistent number of samples.') Ntest = int(test_size * total_samples) if Ntest < 1 or Ntest > total_samples: raise ValueError('Size of test dataset cannot be less than 1 or greater than the total number of samples.') if random_state is not None: np.random.seed(random_state) # Shuffle indices indices = np.arange(total_samples) np.random.shuffle(indices) # Apply shuffled indices to all datasets shuffled_X = [x[indices] for x in X] shuffled_Y = [y[indices] for y in Y] # Split X and Y X_train = [x[:-Ntest] for x in shuffled_X] X_test = [x[-Ntest:] for x in shuffled_X] Y_train = [y[:-Ntest] for y in shuffled_Y] Y_test = [y[-Ntest:] for y in shuffled_Y] # Convert back to arrays if original input was array if input_X_is_array: X_train, X_test = X_train[0], X_test[0] if input_Y_is_array: Y_train, Y_test = Y_train[0], Y_test[0] return X_train, X_test, Y_train, Y_test