# Copyright (c) Facebook, Inc. and its affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. # # LASER Language-Agnostic SEntence Representations # is a toolkit to calculate multilingual sentence embeddings # and to use them for various tasks such as document classification, # and bitext filtering # # -------------------------------------------------------- # # Tool to calculate the dual approach multilingual similarity error rate (P-xSIM) import typing as tp from pathlib import Path import faiss import numpy as np import torch from scipy.special import softmax from sklearn.metrics.pairwise import cosine_similarity from stopes.eval.auto_pcp.audio_comparator import Comparator, get_model_pred from xsim import Margin, score_margin def get_neighbors( x: np.ndarray, y: np.ndarray, k: int, margin: str ) -> tp.Tuple[np.ndarray, np.ndarray, int]: x_copy = x.astype(np.float32).copy() y_copy = y.astype(np.float32).copy() nbex, dim = x.shape # create index idx_x = faiss.IndexFlatIP(dim) idx_y = faiss.IndexFlatIP(dim) # L2 normalization needed for cosine distance faiss.normalize_L2(x_copy) faiss.normalize_L2(y_copy) idx_x.add(x_copy) idx_y.add(y_copy) if margin == Margin.ABSOLUTE.value: scores, indices = idx_y.search(x_copy, k) else: # return cosine similarity and indices of k closest neighbors Cos_xy, Idx_xy = idx_y.search(x_copy, k) Cos_yx, Idx_yx = idx_x.search(y_copy, k) # average cosines Avg_xy = Cos_xy.mean(axis=1) Avg_yx = Cos_yx.mean(axis=1) scores = score_margin(Cos_xy, Idx_xy, Avg_xy, Avg_yx, margin, k) indices = Idx_xy return scores, indices, nbex def get_cosine_scores(src_emb: np.ndarray, neighbor_embs: np.ndarray) -> np.ndarray: assert src_emb.shape[0] == neighbor_embs.shape[1] src_embs = np.repeat( np.expand_dims(src_emb, axis=0), neighbor_embs.shape[0], axis=0 ) cosine_scores = cosine_similarity(src_embs, neighbor_embs).diagonal() return cosine_scores def get_comparator_scores( src_emb: np.ndarray, neighbor_embs: np.ndarray, comparator_model: tp.Any, symmetrize_comparator: bool, ) -> np.ndarray: src_embs = np.repeat( np.expand_dims(src_emb, axis=0), neighbor_embs.shape[0], axis=0 ) a = torch.from_numpy(src_embs).unsqueeze(1) # restore depth dim b = torch.from_numpy(neighbor_embs).unsqueeze(1) res = get_comparator_preds(a, b, comparator_model, symmetrize_comparator) scores_softmax = softmax(res) return np.array(scores_softmax) def get_comparator_preds( src_emb: np.ndarray, tgt_emb: np.ndarray, model: tp.Any, symmetrize: bool ): preds = ( get_model_pred( model, src=src_emb[:, 0], mt=tgt_emb[:, 0], use_gpu=model.use_gpu, batch_size=1, )[:, 0] .cpu() .numpy() ) if symmetrize: preds2 = ( get_model_pred( model, src=tgt_emb[:, 0], mt=src_emb[:, 0], use_gpu=model.use_gpu, batch_size=1, )[:, 0] .cpu() .numpy() ) preds = (preds2 + preds) / 2 return preds def get_blended_predictions( alpha: float, nbex: int, margin_scores: np.ndarray, x_aux: np.ndarray, y_aux: np.ndarray, neighbor_indices: np.ndarray, comparator_model: tp.Optional[tp.Any] = None, symmetrize_comparator: bool = False, ) -> list[int]: predictions = [] for src_index in range(nbex): neighbors = neighbor_indices[src_index] neighbor_embs = y_aux[neighbors].astype(np.float32) src_emb = x_aux[src_index].astype(np.float32) aux_scores = ( get_comparator_scores( src_emb, neighbor_embs, comparator_model, symmetrize_comparator ) if comparator_model else get_cosine_scores(src_emb, neighbor_embs) ) assert margin_scores[src_index].shape == aux_scores.shape blended_scores = alpha * margin_scores[src_index] + (1 - alpha) * aux_scores blended_neighbor_idx = blended_scores.argmax() predictions.append(neighbors[blended_neighbor_idx]) return predictions def PxSIM( x: np.ndarray, y: np.ndarray, x_aux: np.ndarray, y_aux: np.ndarray, alpha: float, margin: str = Margin.RATIO.value, k: int = 16, comparator_path: tp.Optional[Path] = None, symmetrize_comparator: bool = False, ) -> tp.Tuple[int, int, list[int]]: """ Parameters ---------- x : np.ndarray source-side embedding array y : np.ndarray target-side embedding array x_aux : np.ndarray source-side embedding array using auxiliary model y_aux : np.ndarray target-side embedding array using auxiliary model alpha : int parameter to weight blended score margin : str margin scoring function (e.g. ratio, absolute, distance) k : int number of neighbors in k-nn search comparator_path : Path path to AutoPCP model config symmetrize_comparator : bool whether to symmetrize the comparator predictions Returns ------- err : int Number of errors nbex : int Number of examples preds : list[int] List of (index-based) predictions """ assert Margin.has_value(margin), f"Margin type: {margin}, is not supported." comparator_model = Comparator.load(comparator_path) if comparator_path else None # get margin-based nearest neighbors margin_scores, neighbor_indices, nbex = get_neighbors(x, y, k=k, margin=margin) preds = get_blended_predictions( alpha, nbex, margin_scores, x_aux, y_aux, neighbor_indices, comparator_model, symmetrize_comparator, ) err = sum([idx != pred for idx, pred in enumerate(preds)]) print(f"P-xSIM error: {100 * (err / nbex):.2f}") return err, nbex, preds def load_embeddings( infile: Path, dim: int, fp16: bool = False, numpy_header: bool = False ) -> np.ndarray: assert infile.exists(), f"file: {infile} does not exist." if numpy_header: return np.load(infile) emb = np.fromfile(infile, dtype=np.float16 if fp16 else np.float32) num_examples = emb.shape[0] // dim emb.resize(num_examples, dim) if fp16: emb = emb.astype(np.float32) # faiss currently only supports fp32 return emb def run( src_emb: Path, tgt_emb: Path, src_aux_emb: Path, tgt_aux_emb: Path, alpha: float, margin: str = Margin.RATIO.value, k: int = 16, emb_fp16: bool = False, aux_emb_fp16: bool = False, emb_dim: int = 1024, aux_emb_dim: int = 1024, numpy_header: bool = False, comparator_path: tp.Optional[Path] = None, symmetrize_comparator: bool = False, prediction_savepath: tp.Optional[Path] = None, ) -> None: x = load_embeddings(src_emb, emb_dim, emb_fp16, numpy_header) y = load_embeddings(tgt_emb, emb_dim, emb_fp16, numpy_header) x_aux = load_embeddings(src_aux_emb, aux_emb_dim, aux_emb_fp16, numpy_header) y_aux = load_embeddings(tgt_aux_emb, aux_emb_dim, aux_emb_fp16, numpy_header) assert (x.shape == y.shape) and (x_aux.shape == y_aux.shape) _, _, preds = PxSIM( x, y, x_aux, y_aux, alpha, margin, k, comparator_path, symmetrize_comparator ) if prediction_savepath: with open(prediction_savepath, "w") as outf: for pred in preds: print(pred, file=outf) if __name__ == "__main__": import func_argparse func_argparse.main()