import os
import numpy as np
from sklearn.cluster import KMeans
from scipy.stats import norm
from matplotlib import pyplot as plt
import pickle as pkl
class NDB:
def __init__(
self,
training_data=None,
number_of_bins=100,
significance_level=0.05,
z_threshold=None,
whitening=False,
max_dims=None,
cache_folder=None,
):
"""
NDB Evaluation Class
:param training_data: Optional - the training samples - array of m x d floats (m samples of dimension d)
:param number_of_bins: Number of bins (clusters) default=100
:param significance_level: The statistical significance level for the two-sample test
        :param z_threshold: Optional - a threshold on the difference/SE (z-score) above which a bin is counted as statistically different
:param whitening: Perform data whitening - subtract mean and divide by per-dimension std
:param max_dims: Max dimensions to use in K-means. By default derived automatically from d
        :param cache_folder: Optional - folder for caching the clustering and evaluation results (to avoid re-calculation)
"""
self.number_of_bins = number_of_bins
self.significance_level = significance_level
self.z_threshold = z_threshold
self.whitening = whitening
self.ndb_eps = 1e-6
self.training_mean = 0.0
self.training_std = 1.0
self.max_dims = max_dims
self.cache_folder = cache_folder
self.bin_centers = None
self.bin_proportions = None
self.ref_sample_size = None
self.used_d_indices = None
self.results_file = None
self.test_name = "ndb_{}_bins_{}".format(
self.number_of_bins, "whiten" if self.whitening else "orig"
)
self.cached_results = {}
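        # If a cache folder is given, previously stored evaluation results are loaded
        # so that earlier models can be re-printed / re-plotted without re-evaluating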
if self.cache_folder:
self.results_file = os.path.join(
cache_folder, self.test_name + "_results.pkl"
)
if os.path.isfile(self.results_file):
# print('Loading previous results from', self.results_file, ':')
self.cached_results = pkl.load(open(self.results_file, "rb"))
# print(self.cached_results.keys())
if training_data is not None or cache_folder is not None:
bins_file = None
if cache_folder:
os.makedirs(cache_folder, exist_ok=True)
bins_file = os.path.join(cache_folder, self.test_name + ".pkl")
self.construct_bins(training_data, bins_file)
def construct_bins(self, training_samples, bins_file):
"""
Performs K-means clustering of the training samples
        :param training_samples: An array of m x d floats (m samples of dimension d)
        :param bins_file: Optional path for caching / re-loading the binning results
        """
        # Re-use a previously cached binning for this configuration, if available
        if self.__read_from_bins_file(bins_file):
            return
n, d = training_samples.shape
k = self.number_of_bins
# print("k is",k)
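        # When whitening is enabled, remember the training mean / per-dimension std
        # so that query samples in evaluate() are normalized with the same statistics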
if self.whitening:
self.training_mean = np.mean(training_samples, axis=0)
self.training_std = np.std(training_samples, axis=0) + self.ndb_eps
if self.max_dims is None and d > 1000:
            # To run faster, perform the binning on a random subset of the data dimensions
            # (i.e. don't use all channels of all pixels)
self.max_dims = d // 6
whitened_samples = (training_samples - self.training_mean) / self.training_std
d_used = d if self.max_dims is None else min(d, self.max_dims)
self.used_d_indices = np.random.choice(d, d_used, replace=False)
# print('Performing K-Means clustering of {} samples in dimension {} / {} to {} clusters ...'.format(n, d_used, d, k))
# print('Can take a couple of minutes...')
if n // k > 1000:
print(
"Training data size should be ~500 times the number of bins (for reasonable speed and accuracy)"
)
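        # Cluster the whitened training samples, using only the sampled subset of dimensions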
clusters = KMeans(n_clusters=k, max_iter=100).fit(
whitened_samples[:, self.used_d_indices]
)
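        # Bin centers are stored in the full (whitened) dimension d, even though the
        # clustering itself used only the sampled dimensions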
bin_centers = np.zeros([k, d])
for i in range(k):
bin_centers[i, :] = np.mean(
whitened_samples[clusters.labels_ == i, :], axis=0
)
        # Organize the bins by size (descending), so bin 0 is the most populated training bin
label_vals, label_counts = np.unique(clusters.labels_, return_counts=True)
bin_order = np.argsort(-label_counts)
self.bin_proportions = label_counts[bin_order] / np.sum(label_counts)
self.bin_centers = bin_centers[bin_order, :]
self.ref_sample_size = n
self.__write_to_bins_file(bins_file)
# print('Done.')
def evaluate(self, query_samples, model_label=None):
"""
        Assign each query sample to the nearest bin center (in L2), pre-whitening first if required,
        and calculate the NDB (Number of statistically Different Bins) and JS divergence scores.
:param query_samples: An array of m x d floats (m samples of dimension d)
:param model_label: optional label string for the evaluated model, allows plotting results of multiple models
:return: results dictionary containing NDB and JS scores and array of labels (assigned bin for each query sample)
"""
n = query_samples.shape[0]
query_bin_proportions, query_bin_assignments = self.__calculate_bin_proportions(
query_samples
)
# print("query",query_bin_proportions)
# print(query_bin_proportions)
# print("self",self.bin_proportions)
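        # A bin is counted as "statistically different" if a two-sample z-test rejects
        # equality of its training and query proportions at the given significance level
        # (or if |z| exceeds z_threshold, when one is provided)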
different_bins = NDB.two_proportions_z_test(
self.bin_proportions,
self.ref_sample_size,
query_bin_proportions,
n,
significance_level=self.significance_level,
z_threshold=self.z_threshold,
)
# print("different",different_bins)
ndb = np.count_nonzero(different_bins)
        # print("ndb", ndb)
js = NDB.jensen_shannon_divergence(self.bin_proportions, query_bin_proportions)
results = {
"NDB": ndb,
"JS": js,
"Proportions": query_bin_proportions,
"N": n,
"Bin-Assignment": query_bin_assignments,
"Different-Bins": different_bins,
}
if model_label:
print("Results for {} samples from {}: ".format(n, model_label), end="")
self.cached_results[model_label] = results
if self.results_file:
# print('Storing result to', self.results_file)
pkl.dump(self.cached_results, open(self.results_file, "wb"))
        print("NDB = %d, NDB/K = %.3f, JS = %.4f" % (ndb, ndb / self.number_of_bins, js))
return results
def print_results(self):
        print(
            "NDB results (K={}{}):".format(
                self.number_of_bins, ", data whitening" if self.whitening else ""
            )
        )
for model in sorted(list(self.cached_results.keys())):
res = self.cached_results[model]
print(
"%s: NDB = %d, NDB/K = %.3f, JS = %.4f"
% (model, res["NDB"], res["NDB"] / self.number_of_bins, res["JS"])
)
def plot_results(self, models_to_plot=None):
"""
Plot the binning proportions of different methods
:param models_to_plot: optional list of model labels to plot
"""
K = self.number_of_bins
w = 1.0 / (len(self.cached_results) + 1)
assert K == self.bin_proportions.size
assert self.cached_results
# Used for plotting only
def calc_se(p1, n1, p2, n2):
p = (p1 * n1 + p2 * n2) / (n1 + n2)
return np.sqrt(p * (1 - p) * (1 / n1 + 1 / n2))
if not models_to_plot:
models_to_plot = sorted(list(self.cached_results.keys()))
# Visualize the standard errors using the train proportions and size and query sample size
train_se = calc_se(
self.bin_proportions,
self.ref_sample_size,
self.bin_proportions,
self.cached_results[models_to_plot[0]]["N"],
)
plt.bar(
np.arange(0, K) + 0.5,
height=train_se * 2.0,
bottom=self.bin_proportions - train_se,
width=1.0,
            label=r"Train$\pm$SE",
color="gray",
)
ymax = 0.0
for i, model in enumerate(models_to_plot):
results = self.cached_results[model]
label = "%s (%i : %.4f)" % (model, results["NDB"], results["JS"])
ymax = max(ymax, np.max(results["Proportions"]))
if K <= 70:
plt.bar(
np.arange(0, K) + (i + 1.0) * w,
results["Proportions"],
width=w,
label=label,
)
else:
plt.plot(
np.arange(0, K) + 0.5, results["Proportions"], "--*", label=label
)
plt.legend(loc="best")
plt.ylim((0.0, min(ymax, np.max(self.bin_proportions) * 4.0)))
plt.grid(True)
plt.title(
"Binning Proportions Evaluation Results for {} bins (NDB : JS)".format(K)
)
plt.show()
def __calculate_bin_proportions(self, samples):
        if self.bin_centers is None:
            raise ValueError(
                "First run construct_bins on samples from the reference training data"
            )
# print("as1",samples.shape[1])
# print("as2",self.bin_centers.shape[1])
assert samples.shape[1] == self.bin_centers.shape[1]
n, d = samples.shape
k = self.bin_centers.shape[0]
D = np.zeros([n, k], dtype=samples.dtype)
# print('Calculating bin assignments for {} samples...'.format(n))
whitened_samples = (samples - self.training_mean) / self.training_std
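        # Compute the L2 distance from every (whitened) sample to every bin center,
        # restricted to the dimensions that were used for clustering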
for i in range(k):
print(".", end="", flush=True)
D[:, i] = np.linalg.norm(
whitened_samples[:, self.used_d_indices]
- self.bin_centers[i, self.used_d_indices],
ord=2,
axis=1,
)
print()
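        # Assign each sample to its nearest bin center and turn bin counts into proportions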
labels = np.argmin(D, axis=1)
probs = np.zeros([k])
label_vals, label_counts = np.unique(labels, return_counts=True)
probs[label_vals] = label_counts / n
return probs, labels
def __read_from_bins_file(self, bins_file):
if bins_file and os.path.isfile(bins_file):
print("Loading binning results from", bins_file)
bins_data = pkl.load(open(bins_file, "rb"))
self.bin_proportions = bins_data["proportions"]
self.bin_centers = bins_data["centers"]
self.ref_sample_size = bins_data["n"]
self.training_mean = bins_data["mean"]
self.training_std = bins_data["std"]
self.used_d_indices = bins_data["d_indices"]
return True
return False
def __write_to_bins_file(self, bins_file):
if bins_file:
print("Caching binning results to", bins_file)
bins_data = {
"proportions": self.bin_proportions,
"centers": self.bin_centers,
"n": self.ref_sample_size,
"mean": self.training_mean,
"std": self.training_std,
"d_indices": self.used_d_indices,
}
pkl.dump(bins_data, open(bins_file, "wb"))
@staticmethod
def two_proportions_z_test(p1, n1, p2, n2, significance_level, z_threshold=None):
# Per http://stattrek.com/hypothesis-test/difference-in-proportions.aspx
# See also http://www.itl.nist.gov/div898/software/dataplot/refman1/auxillar/binotest.htm
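        # Pooled proportion and its standard error under the null hypothesis p1 == p2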
p = (p1 * n1 + p2 * n2) / (n1 + n2)
se = np.sqrt(p * (1 - p) * (1 / n1 + 1 / n2))
z = (p1 - p2) / se
# print("z",abs(z))
        # Allow defining a threshold in terms of Z (difference relative to the SE) rather than in p-values.
if z_threshold is not None:
return abs(z) > z_threshold
p_values = 2.0 * norm.cdf(-1.0 * np.abs(z)) # Two-tailed test
return p_values < significance_level
@staticmethod
def jensen_shannon_divergence(p, q):
"""
Calculates the symmetric Jensen–Shannon divergence between the two PDFs
"""
m = (p + q) * 0.5
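        # JS(p, q) = 0.5 * KL(p || m) + 0.5 * KL(q || m), with m the mixture (p + q) / 2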
return 0.5 * (NDB.kl_divergence(p, m) + NDB.kl_divergence(q, m))
@staticmethod
def kl_divergence(p, q):
"""
The Kullback–Leibler divergence.
Defined only if q != 0 whenever p != 0.
"""
assert np.all(np.isfinite(p))
assert np.all(np.isfinite(q))
assert not np.any(np.logical_and(p != 0, q == 0))
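        # Sum only over the support of p; terms with p == 0 contribute nothing to the KL sum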
p_pos = p > 0
return np.sum(p[p_pos] * np.log(p[p_pos] / q[p_pos]))
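
# A minimal usage sketch (the `train_features` / `generated_features` names below are
# hypothetical placeholders for m x d arrays, e.g. flattened images or extracted features):
#
#     ndb = NDB(training_data=train_features, number_of_bins=100,
#               whitening=True, cache_folder="./ndb_cache")
#     results = ndb.evaluate(generated_features, model_label="my_model")
#     print(results["NDB"], results["JS"])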
if __name__ == "__main__":
dim = 100
k = 100
n_train = k * 100
n_test = k * 10
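    # Sanity check on synthetic data: "Test" is drawn from the same uniform distribution
    # as the training set (low NDB expected), while "Good" and "Bad" are drawn from
    # increasingly shrunken supports (higher NDB / JS expected)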
train_samples = np.random.uniform(size=[n_train, dim])
ndb = NDB(training_data=train_samples, number_of_bins=k, whitening=True)
test_samples = np.random.uniform(high=1.0, size=[n_test, dim])
ndb.evaluate(test_samples, model_label="Test")
test_samples = np.random.uniform(high=0.9, size=[n_test, dim])
ndb.evaluate(test_samples, model_label="Good")
test_samples = np.random.uniform(high=0.75, size=[n_test, dim])
ndb.evaluate(test_samples, model_label="Bad")
ndb.plot_results(models_to_plot=["Test", "Good", "Bad"])