# Spaces: Running on Zero
import os | |
import numpy as np | |
from sklearn.cluster import KMeans | |
from scipy.stats import norm | |
from matplotlib import pyplot as plt | |
import pickle as pkl | |
class NDB:
    """
    Number of statistically Different Bins (NDB) evaluation.

    The reference (training) samples are clustered into bins with K-means. Query
    samples are then assigned to the nearest bin center, and the per-bin proportions
    of the two sets are compared with a two-proportion z-test (NDB score) and with
    the Jensen-Shannon divergence (JS score).
    """

    def __init__(
        self,
        training_data=None,
        number_of_bins=100,
        significance_level=0.05,
        z_threshold=None,
        whitening=False,
        max_dims=None,
        cache_folder=None,
    ):
        """
        NDB Evaluation Class
        :param training_data: Optional - the training samples - array of m x d floats (m samples of dimension d)
        :param number_of_bins: Number of bins (clusters) default=100
        :param significance_level: The statistical significance level for the two-sample test
        :param z_threshold: Allow defining a threshold in terms of difference/SE for defining a bin as statistically different
        :param whitening: Perform data whitening - subtract mean and divide by per-dimension std
        :param max_dims: Max dimensions to use in K-means. By default derived automatically from d
        :param cache_folder: Optional - folder used to cache the bins and the evaluation results (pickle files)
        :raises ValueError: if no training data is given and no cached bins can be loaded from cache_folder
        """
        self.number_of_bins = number_of_bins
        self.significance_level = significance_level
        self.z_threshold = z_threshold
        self.whitening = whitening
        self.ndb_eps = 1e-6  # added to the std so whitening never divides by zero
        self.training_mean = 0.0
        self.training_std = 1.0
        self.max_dims = max_dims
        self.cache_folder = cache_folder
        self.bin_centers = None
        self.bin_proportions = None
        self.ref_sample_size = None
        self.used_d_indices = None
        self.results_file = None
        self.test_name = "ndb_{}_bins_{}".format(
            self.number_of_bins, "whiten" if self.whitening else "orig"
        )
        self.cached_results = {}
        if self.cache_folder:
            self.results_file = os.path.join(
                cache_folder, self.test_name + "_results.pkl"
            )
            if os.path.isfile(self.results_file):
                # NOTE: unpickling executes arbitrary code - only point cache_folder
                # at a location whose contents are trusted.
                with open(self.results_file, "rb") as results_fh:
                    self.cached_results = pkl.load(results_fh)
        if training_data is not None or cache_folder is not None:
            bins_file = None
            if cache_folder:
                os.makedirs(cache_folder, exist_ok=True)
                bins_file = os.path.join(cache_folder, self.test_name + ".pkl")
            self.construct_bins(training_data, bins_file)

    def construct_bins(self, training_samples, bins_file):
        """
        Performs K-means clustering of the training samples
        :param training_samples: An array of m x d floats (m samples of dimension d). May be None, in which case
                                 previously cached bins are loaded from bins_file instead of being computed
        :param bins_file: Optional - pickle file the resulting bins are written to (and read from when
                          training_samples is None)
        :raises ValueError: if training_samples is None and bins_file does not hold cached bins
        """
        if training_samples is None:
            # Without samples the only possible source for the bins is the cache.
            if self.__read_from_bins_file(bins_file):
                return
            raise ValueError(
                "No training samples were given and no cached bins were found in {!r}".format(
                    bins_file
                )
            )
        # When samples are given the bins are always recomputed (the cache is overwritten).
        n, d = training_samples.shape
        k = self.number_of_bins
        if self.whitening:
            self.training_mean = np.mean(training_samples, axis=0)
            self.training_std = np.std(training_samples, axis=0) + self.ndb_eps
        if self.max_dims is None and d > 1000:
            # To run faster, perform binning on sampled data dimension (i.e. don't use all channels of all pixels)
            self.max_dims = d // 6
        whitened_samples = (training_samples - self.training_mean) / self.training_std
        d_used = d if self.max_dims is None else min(d, self.max_dims)
        self.used_d_indices = np.random.choice(d, d_used, replace=False)
        if n // k > 1000:
            print(
                "Training data size should be ~500 times the number of bins (for reasonable speed and accuracy)"
            )
        clusters = KMeans(n_clusters=k, max_iter=100).fit(
            whitened_samples[:, self.used_d_indices]
        )
        # Bin centers are kept in the full dimension d, although clustering used only d_used dimensions
        bin_centers = np.zeros([k, d])
        for i in range(k):
            bin_centers[i, :] = np.mean(
                whitened_samples[clusters.labels_ == i, :], axis=0
            )
        # Organize bins by size (largest first)
        label_vals, label_counts = np.unique(clusters.labels_, return_counts=True)
        bin_order = np.argsort(-label_counts)
        self.bin_proportions = label_counts[bin_order] / np.sum(label_counts)
        self.bin_centers = bin_centers[bin_order, :]
        self.ref_sample_size = n
        self.__write_to_bins_file(bins_file)

    def evaluate(self, query_samples, model_label=None):
        """
        Assign each sample to the nearest bin center (in L2). Pre-whiten if required. and calculate the NDB
        (Number of statistically Different Bins) and JS divergence scores.
        :param query_samples: An array of m x d floats (m samples of dimension d)
        :param model_label: optional label string for the evaluated model, allows plotting results of multiple models
        :return: results dictionary containing NDB and JS scores and array of labels (assigned bin for each query sample)
        """
        n = query_samples.shape[0]
        query_bin_proportions, query_bin_assignments = self.__calculate_bin_proportions(
            query_samples
        )
        different_bins = NDB.two_proportions_z_test(
            self.bin_proportions,
            self.ref_sample_size,
            query_bin_proportions,
            n,
            significance_level=self.significance_level,
            z_threshold=self.z_threshold,
        )
        ndb = np.count_nonzero(different_bins)
        # NOTE(review): looks like a leftover debug print - the same value is reported again below.
        print("ndb", ndb)
        js = NDB.jensen_shannon_divergence(self.bin_proportions, query_bin_proportions)
        results = {
            "NDB": ndb,
            "JS": js,
            "Proportions": query_bin_proportions,
            "N": n,
            "Bin-Assignment": query_bin_assignments,
            "Different-Bins": different_bins,
        }
        if model_label:
            print("Results for {} samples from {}: ".format(n, model_label), end="")
            self.cached_results[model_label] = results
            if self.results_file:
                with open(self.results_file, "wb") as results_fh:
                    pkl.dump(self.cached_results, results_fh)
        print("NDB =", ndb, "NDB/K =", ndb / self.number_of_bins, ", JS =", js)
        return results

    def print_results(self):
        """
        Print the NDB, NDB/K and JS scores of every cached model result, sorted by model label
        """
        print(
            "NDB results (K={}{}):".format(
                self.number_of_bins, ", data whitening" if self.whitening else ""
            )
        )
        for model in sorted(self.cached_results):
            res = self.cached_results[model]
            print(
                "%s: NDB = %d, NDB/K = %.3f, JS = %.4f"
                % (model, res["NDB"], res["NDB"] / self.number_of_bins, res["JS"])
            )

    def plot_results(self, models_to_plot=None):
        """
        Plot the binning proportions of different methods
        :param models_to_plot: optional list of model labels to plot
        """
        K = self.number_of_bins
        w = 1.0 / (len(self.cached_results) + 1)
        assert K == self.bin_proportions.size
        assert self.cached_results

        # Used for plotting only
        def calc_se(p1, n1, p2, n2):
            p = (p1 * n1 + p2 * n2) / (n1 + n2)
            return np.sqrt(p * (1 - p) * (1 / n1 + 1 / n2))

        if not models_to_plot:
            models_to_plot = sorted(self.cached_results)
        # Visualize the standard errors using the train proportions and size and query sample size
        train_se = calc_se(
            self.bin_proportions,
            self.ref_sample_size,
            self.bin_proportions,
            self.cached_results[models_to_plot[0]]["N"],
        )
        plt.bar(
            np.arange(0, K) + 0.5,
            height=train_se * 2.0,
            bottom=self.bin_proportions - train_se,
            width=1.0,
            # Raw string: "\p" is not a valid escape sequence in a normal string literal
            label=r"Train$\pm$SE",
            color="gray",
        )
        ymax = 0.0
        for i, model in enumerate(models_to_plot):
            results = self.cached_results[model]
            label = "%s (%i : %.4f)" % (model, results["NDB"], results["JS"])
            ymax = max(ymax, np.max(results["Proportions"]))
            if K <= 70:
                # Few bins: side-by-side bars, one slot of width w per model
                plt.bar(
                    np.arange(0, K) + (i + 1.0) * w,
                    results["Proportions"],
                    width=w,
                    label=label,
                )
            else:
                # Many bins: a line plot instead of bars
                plt.plot(
                    np.arange(0, K) + 0.5, results["Proportions"], "--*", label=label
                )
        plt.legend(loc="best")
        plt.ylim((0.0, min(ymax, np.max(self.bin_proportions) * 4.0)))
        plt.grid(True)
        plt.title(
            "Binning Proportions Evaluation Results for {} bins (NDB : JS)".format(K)
        )
        plt.show()

    def __calculate_bin_proportions(self, samples):
        """
        Assign every sample to its nearest bin center and compute the per-bin proportions
        :param samples: An array of n x d values, d must match the dimension of the bin centers
        :return: tuple (proportions array of length k, array of n assigned bin indices)
        :raises RuntimeError: if the bins were not constructed yet
        """
        if self.bin_centers is None:
            raise RuntimeError(
                "First run construct_bins on samples from the reference training data"
            )
        assert samples.shape[1] == self.bin_centers.shape[1]
        n, d = samples.shape
        k = self.bin_centers.shape[0]
        # Distances are fractional: keep the input precision for float data, but never
        # store them in an integer buffer (that would truncate them and mis-assign bins).
        dist_dtype = (
            samples.dtype if np.issubdtype(samples.dtype, np.floating) else np.float64
        )
        D = np.zeros([n, k], dtype=dist_dtype)
        whitened_samples = (samples - self.training_mean) / self.training_std
        for i in range(k):
            print(".", end="", flush=True)  # progress indicator, one dot per bin
            D[:, i] = np.linalg.norm(
                whitened_samples[:, self.used_d_indices]
                - self.bin_centers[i, self.used_d_indices],
                ord=2,
                axis=1,
            )
        print()
        labels = np.argmin(D, axis=1)
        probs = np.zeros([k])  # bins that received no sample keep proportion 0
        label_vals, label_counts = np.unique(labels, return_counts=True)
        probs[label_vals] = label_counts / n
        return probs, labels

    def __read_from_bins_file(self, bins_file):
        """
        Load previously cached bins
        :param bins_file: path of the pickle file written by a previous run (may be None)
        :return: True if the bins were loaded, False if bins_file is empty or does not exist
        """
        if bins_file and os.path.isfile(bins_file):
            print("Loading binning results from", bins_file)
            # NOTE: unpickling executes arbitrary code - bins_file must come from a trusted source.
            with open(bins_file, "rb") as bins_fh:
                bins_data = pkl.load(bins_fh)
            self.bin_proportions = bins_data["proportions"]
            self.bin_centers = bins_data["centers"]
            self.ref_sample_size = bins_data["n"]
            self.training_mean = bins_data["mean"]
            self.training_std = bins_data["std"]
            self.used_d_indices = bins_data["d_indices"]
            return True
        return False

    def __write_to_bins_file(self, bins_file):
        """
        Cache the current bins
        :param bins_file: path of the pickle file to write; nothing is written when empty or None
        """
        if bins_file:
            print("Caching binning results to", bins_file)
            bins_data = {
                "proportions": self.bin_proportions,
                "centers": self.bin_centers,
                "n": self.ref_sample_size,
                "mean": self.training_mean,
                "std": self.training_std,
                "d_indices": self.used_d_indices,
            }
            with open(bins_file, "wb") as bins_fh:
                pkl.dump(bins_data, bins_fh)

    @staticmethod
    def two_proportions_z_test(p1, n1, p2, n2, significance_level, z_threshold=None):
        """
        Two-proportion z-test, applied element-wise
        :param p1: proportions in the first sample
        :param n1: size of the first sample
        :param p2: proportions in the second sample
        :param n2: size of the second sample
        :param significance_level: p-value below which the proportions are considered different
        :param z_threshold: Optional - if given, |z| > z_threshold is used instead of the p-value test
        :return: boolean (array) - True where the two proportions are statistically different
        """
        # Per http://stattrek.com/hypothesis-test/difference-in-proportions.aspx
        # See also http://www.itl.nist.gov/div898/software/dataplot/refman1/auxillar/binotest.htm
        p = (p1 * n1 + p2 * n2) / (n1 + n2)  # pooled proportion
        se = np.sqrt(p * (1 - p) * (1 / n1 + 1 / n2))
        z = (p1 - p2) / se
        # Allow defining a threshold in terms as Z (difference relative to the SE) rather than in p-values.
        if z_threshold is not None:
            return abs(z) > z_threshold
        p_values = 2.0 * norm.cdf(-1.0 * np.abs(z))  # Two-tailed test
        return p_values < significance_level

    @staticmethod
    def jensen_shannon_divergence(p, q):
        """
        Calculates the symmetric Jensen–Shannon divergence between the two PDFs
        """
        m = (p + q) * 0.5
        return 0.5 * (NDB.kl_divergence(p, m) + NDB.kl_divergence(q, m))

    @staticmethod
    def kl_divergence(p, q):
        """
        The Kullback–Leibler divergence.
        Defined only if q != 0 whenever p != 0.
        """
        assert np.all(np.isfinite(p))
        assert np.all(np.isfinite(q))
        assert not np.any(np.logical_and(p != 0, q == 0))
        # Entries with p == 0 contribute nothing and are skipped to avoid log(0)
        p_pos = p > 0
        return np.sum(p[p_pos] * np.log(p[p_pos] / q[p_pos]))
if __name__ == "__main__":
    # Demo: bin uniform training data, then score three query sets whose
    # upper bound shrinks from the training range (1.0) down to 0.75.
    num_bins = 100
    sample_dim = 100
    train_count = num_bins * 100
    test_count = num_bins * 10

    reference = np.random.uniform(size=[train_count, sample_dim])
    ndb = NDB(training_data=reference, number_of_bins=num_bins, whitening=True)

    # (model label, upper bound of the uniform query distribution)
    scenarios = [("Test", 1.0), ("Good", 0.9), ("Bad", 0.75)]
    for scenario_label, upper_bound in scenarios:
        queries = np.random.uniform(high=upper_bound, size=[test_count, sample_dim])
        ndb.evaluate(queries, model_label=scenario_label)

    ndb.plot_results(models_to_plot=[name for name, _ in scenarios])