# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# LASER Language-Agnostic SEntence Representations
# is a toolkit to calculate multilingual sentence embeddings
# and to use them for various tasks such as document classification,
# and bitext filtering
#
# --------------------------------------------------------
#
# Tool to calculate the dual approach multilingual similarity error rate (P-xSIM)
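#
# P-xSIM blends two signals: margin-based k-nn scores computed over a primary
# embedding space (x, y) and auxiliary scores computed over a second embedding
# space (x_aux, y_aux), obtained either from cosine similarity or from an
# AutoPCP comparator model. The neighbor with the highest blended score is
# taken as the prediction for each source sentence.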
import typing as tp
from pathlib import Path
import faiss
import numpy as np
import torch
from scipy.special import softmax
from sklearn.metrics.pairwise import cosine_similarity
from stopes.eval.auto_pcp.audio_comparator import Comparator, get_model_pred
from xsim import Margin, score_margin


def get_neighbors(
x: np.ndarray, y: np.ndarray, k: int, margin: str
) -> tp.Tuple[np.ndarray, np.ndarray, int]:
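    """
    Margin-based k-nn search between the source (x) and target (y) embeddings.

    Both arrays are L2-normalized and indexed with FAISS inner-product indexes,
    so raw search scores are cosine similarities. Returns, for every source row,
    the scores and indices of its k nearest target neighbors (margin-adjusted
    unless margin == "absolute"), together with the number of source examples.
    """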
x_copy = x.astype(np.float32).copy()
y_copy = y.astype(np.float32).copy()
nbex, dim = x.shape
# create index
idx_x = faiss.IndexFlatIP(dim)
idx_y = faiss.IndexFlatIP(dim)
    # L2 normalization so that inner-product search returns cosine similarity
faiss.normalize_L2(x_copy)
faiss.normalize_L2(y_copy)
idx_x.add(x_copy)
idx_y.add(y_copy)
if margin == Margin.ABSOLUTE.value:
scores, indices = idx_y.search(x_copy, k)
else:
# return cosine similarity and indices of k closest neighbors
Cos_xy, Idx_xy = idx_y.search(x_copy, k)
Cos_yx, Idx_yx = idx_x.search(y_copy, k)
# average cosines
Avg_xy = Cos_xy.mean(axis=1)
Avg_yx = Cos_yx.mean(axis=1)
scores = score_margin(Cos_xy, Idx_xy, Avg_xy, Avg_yx, margin, k)
indices = Idx_xy
return scores, indices, nbex


def get_cosine_scores(src_emb: np.ndarray, neighbor_embs: np.ndarray) -> np.ndarray:
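    """Cosine similarity between a single source embedding and each of its neighbor embeddings."""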
assert src_emb.shape[0] == neighbor_embs.shape[1]
src_embs = np.repeat(
np.expand_dims(src_emb, axis=0), neighbor_embs.shape[0], axis=0
)
cosine_scores = cosine_similarity(src_embs, neighbor_embs).diagonal()
return cosine_scores


def get_comparator_scores(
src_emb: np.ndarray,
neighbor_embs: np.ndarray,
comparator_model: tp.Any,
symmetrize_comparator: bool,
) -> np.ndarray:
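    """
    Score a single source embedding against each of its neighbors with the
    AutoPCP comparator model, then softmax-normalize the predictions.
    """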
src_embs = np.repeat(
np.expand_dims(src_emb, axis=0), neighbor_embs.shape[0], axis=0
)
a = torch.from_numpy(src_embs).unsqueeze(1) # restore depth dim
b = torch.from_numpy(neighbor_embs).unsqueeze(1)
res = get_comparator_preds(a, b, comparator_model, symmetrize_comparator)
scores_softmax = softmax(res)
return np.array(scores_softmax)


def get_comparator_preds(
    src_emb: torch.Tensor, tgt_emb: torch.Tensor, model: tp.Any, symmetrize: bool
):
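    """Comparator predictions for (src, tgt) pairs, optionally averaged with the reversed direction."""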
preds = (
get_model_pred(
model,
src=src_emb[:, 0],
mt=tgt_emb[:, 0],
use_gpu=model.use_gpu,
batch_size=1,
)[:, 0]
.cpu()
.numpy()
)
if symmetrize:
preds2 = (
get_model_pred(
model,
src=tgt_emb[:, 0],
mt=src_emb[:, 0],
use_gpu=model.use_gpu,
batch_size=1,
)[:, 0]
.cpu()
.numpy()
)
preds = (preds2 + preds) / 2
return preds


def get_blended_predictions(
alpha: float,
nbex: int,
margin_scores: np.ndarray,
x_aux: np.ndarray,
y_aux: np.ndarray,
neighbor_indices: np.ndarray,
comparator_model: tp.Optional[tp.Any] = None,
symmetrize_comparator: bool = False,
) -> list[int]:
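    """
    Blend margin scores with auxiliary scores and select the best neighbor.

    For each source example, every candidate neighbor gets an auxiliary score
    (a comparator prediction if a model is provided, otherwise cosine similarity
    in the auxiliary embedding space). The blended score is
    alpha * margin_score + (1 - alpha) * aux_score, and the highest-scoring
    neighbor becomes the prediction.
    """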
predictions = []
for src_index in range(nbex):
neighbors = neighbor_indices[src_index]
neighbor_embs = y_aux[neighbors].astype(np.float32)
src_emb = x_aux[src_index].astype(np.float32)
aux_scores = (
get_comparator_scores(
src_emb, neighbor_embs, comparator_model, symmetrize_comparator
)
if comparator_model
else get_cosine_scores(src_emb, neighbor_embs)
)
assert margin_scores[src_index].shape == aux_scores.shape
blended_scores = alpha * margin_scores[src_index] + (1 - alpha) * aux_scores
blended_neighbor_idx = blended_scores.argmax()
predictions.append(neighbors[blended_neighbor_idx])
return predictions


def PxSIM(
x: np.ndarray,
y: np.ndarray,
x_aux: np.ndarray,
y_aux: np.ndarray,
alpha: float,
margin: str = Margin.RATIO.value,
k: int = 16,
comparator_path: tp.Optional[Path] = None,
symmetrize_comparator: bool = False,
) -> tp.Tuple[int, int, list[int]]:
"""
Parameters
----------
x : np.ndarray
source-side embedding array
y : np.ndarray
target-side embedding array
x_aux : np.ndarray
source-side embedding array using auxiliary model
y_aux : np.ndarray
target-side embedding array using auxiliary model
    alpha : float
        weight of the margin score in the blended score (the auxiliary score
        is weighted by 1 - alpha)
margin : str
margin scoring function (e.g. ratio, absolute, distance)
k : int
number of neighbors in k-nn search
comparator_path : Path
path to AutoPCP model config
symmetrize_comparator : bool
whether to symmetrize the comparator predictions
Returns
-------
err : int
Number of errors
nbex : int
Number of examples
preds : list[int]
List of (index-based) predictions
"""
assert Margin.has_value(margin), f"Margin type: {margin}, is not supported."
comparator_model = Comparator.load(comparator_path) if comparator_path else None
# get margin-based nearest neighbors
margin_scores, neighbor_indices, nbex = get_neighbors(x, y, k=k, margin=margin)
preds = get_blended_predictions(
alpha,
nbex,
margin_scores,
x_aux,
y_aux,
neighbor_indices,
comparator_model,
symmetrize_comparator,
)
err = sum([idx != pred for idx, pred in enumerate(preds)])
print(f"P-xSIM error: {100 * (err / nbex):.2f}")
return err, nbex, preds


def load_embeddings(
infile: Path, dim: int, fp16: bool = False, numpy_header: bool = False
) -> np.ndarray:
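    """
    Load an embedding matrix: either a .npy file (numpy_header=True) or a raw
    binary file reshaped to (num_examples, dim). fp16 embeddings are upcast to
    fp32, since faiss only supports fp32.
    """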
assert infile.exists(), f"file: {infile} does not exist."
if numpy_header:
return np.load(infile)
emb = np.fromfile(infile, dtype=np.float16 if fp16 else np.float32)
num_examples = emb.shape[0] // dim
emb.resize(num_examples, dim)
if fp16:
emb = emb.astype(np.float32) # faiss currently only supports fp32
return emb


def run(
src_emb: Path,
tgt_emb: Path,
src_aux_emb: Path,
tgt_aux_emb: Path,
alpha: float,
margin: str = Margin.RATIO.value,
k: int = 16,
emb_fp16: bool = False,
aux_emb_fp16: bool = False,
emb_dim: int = 1024,
aux_emb_dim: int = 1024,
numpy_header: bool = False,
comparator_path: tp.Optional[Path] = None,
symmetrize_comparator: bool = False,
prediction_savepath: tp.Optional[Path] = None,
) -> None:
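    """
    Load the primary and auxiliary embeddings, run P-xSIM, and optionally
    write the index-based predictions to a file (one per line).
    """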
x = load_embeddings(src_emb, emb_dim, emb_fp16, numpy_header)
y = load_embeddings(tgt_emb, emb_dim, emb_fp16, numpy_header)
x_aux = load_embeddings(src_aux_emb, aux_emb_dim, aux_emb_fp16, numpy_header)
y_aux = load_embeddings(tgt_aux_emb, aux_emb_dim, aux_emb_fp16, numpy_header)
assert (x.shape == y.shape) and (x_aux.shape == y_aux.shape)
_, _, preds = PxSIM(
x, y, x_aux, y_aux, alpha, margin, k, comparator_path, symmetrize_comparator
)
if prediction_savepath:
with open(prediction_savepath, "w") as outf:
for pred in preds:
print(pred, file=outf)


if __name__ == "__main__":
import func_argparse
func_argparse.main()
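    # Hedged usage sketch: func_argparse exposes each public function in this
    # module as a CLI sub-command, with flags derived from its parameters
    # (run --help to confirm the exact flag names). The file name and paths
    # below are placeholders:
    #
    #   python p_xsim.py run --src_emb src.bin --tgt_emb tgt.bin \
    #       --src_aux_emb src_aux.bin --tgt_aux_emb tgt_aux.bin --alpha 0.5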