# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
# LASER Language-Agnostic SEntence Representations
# is a toolkit to calculate multilingual sentence embeddings
# and to use them for document classification, bitext filtering
# and mining
#
# --------------------------------------------------------
#
# Tool to calculate multilingual similarity error rate (xSIM)
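#
# xSIM retrieves, for each source sentence embedding, its margin-scored nearest
# neighbour among the target sentence embeddings and counts how often this
# neighbour is not the expected translation (matched by line index, or by text
# when a reference file is given); the error count and the number of examples
# are returned so that an error rate err / nbex can be reported.
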
import json
import os
import typing as tp
from enum import Enum

import faiss
import numpy as np


class Margin(Enum):
RATIO = "ratio"
DISTANCE = "distance"
ABSOLUTE = "absolute"

    @classmethod
def has_value(cls, value):
return value in cls._value2member_map_


def xSIM(
    x: tp.Union[str, np.ndarray],
    y: tp.Union[str, np.ndarray],
    margin: str = Margin.RATIO.value,
    k: int = 4,
    dim: int = 1024,
    fp16: bool = False,
    eval_text: tp.Optional[str] = None,
    augmented_json: tp.Optional[str] = None,
) -> tp.Tuple[int, int, tp.Dict[str, int]]:
    """
    Compute the xSIM error between source embeddings x and target embeddings y.

    x and y may be numpy arrays or paths to raw binary embedding files.
    Returns (number of errors, number of examples, augmented error report).
    """
    assert Margin.has_value(margin), f"Margin type '{margin}' is not supported."
if not isinstance(x, np.ndarray):
x = _load_embeddings(x, dim, fp16)
if not isinstance(y, np.ndarray):
y = _load_embeddings(y, dim, fp16)
# calculate xSIM error
return calculate_error(x, y, margin, k, eval_text, augmented_json)


def _load_embeddings(infile: str, dim: int, fp16: bool = False) -> np.ndarray:
assert os.path.isfile(infile), f"file: {infile} does not exist."
emb = np.fromfile(infile, dtype=np.float16 if fp16 else np.float32)
num_examples = emb.shape[0] // dim
emb.resize(num_examples, dim)
if fp16:
emb = emb.astype(np.float32) # faiss currently only supports fp32
return emb
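

# Margin scoring for the "ratio" and "distance" margins (the "absolute" margin
# is handled directly in _score_knn as plain nearest-neighbour search): given
# the cosine similarity a = cos(x, y), the average cosine Ax of x to its k
# nearest target neighbours, and the average cosine Ay of the candidate y to
# its k nearest source neighbours, the score is a / ((Ax + Ay) / 2) for the
# ratio margin and a - (Ax + Ay) / 2 for the distance margin.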
def score_margin(
Dxy: np.ndarray,
Ixy: np.ndarray,
Ax: np.ndarray,
Ay: np.ndarray,
margin: str,
k: int,
) -> np.ndarray:
nbex = Dxy.shape[0]
scores = np.zeros((nbex, k))
for i in range(nbex):
for j in range(k):
jj = Ixy[i, j]
a = Dxy[i, j]
b = (Ax[i] + Ay[jj]) / 2
if margin == Margin.RATIO.value:
scores[i, j] = a / b
else: # distance margin
scores[i, j] = a - b
return scores


def _score_knn(x: np.ndarray, y: np.ndarray, k: int, margin: str) -> np.ndarray:
nbex, dim = x.shape
# create index
idx_x = faiss.IndexFlatIP(dim)
idx_y = faiss.IndexFlatIP(dim)
# L2 normalization needed for cosine distance
faiss.normalize_L2(x)
faiss.normalize_L2(y)
idx_x.add(x)
idx_y.add(y)
if margin == Margin.ABSOLUTE.value:
scores, indices = idx_y.search(x, 1)
else:
# return cosine similarity and indices of k closest neighbors
Cos_xy, Idx_xy = idx_y.search(x, k)
Cos_yx, Idx_yx = idx_x.search(y, k)
# average cosines
Avg_xy = Cos_xy.mean(axis=1)
Avg_yx = Cos_yx.mean(axis=1)
scores = score_margin(Cos_xy, Idx_xy, Avg_xy, Avg_yx, margin, k)
# find best
best = scores.argmax(axis=1)
indices = np.zeros((nbex, 1), dtype=np.int32)
for i in range(nbex):
indices[i] = Idx_xy[i, best[i]]
return indices


def get_transform(augmented_json, closest_neighbor, src):
    # look up the retrieved (closest) sentence in the augmented-data json: if it
    # is a registered transformation of this source sentence, return its error
    # type, otherwise report the pair as misaligned
    if (
        closest_neighbor in augmented_json
        and augmented_json[closest_neighbor]["src"] == src
    ):
        return augmented_json[closest_neighbor]["errtype"]
    return "Misaligned"


def calculate_error(
    x: np.ndarray,
    y: np.ndarray,
    margin: tp.Optional[str] = None,
    k: int = 4,
    eval_text: tp.Optional[str] = None,
    augmented_json: tp.Optional[str] = None,
) -> tp.Tuple[int, int, tp.Dict[str, int]]:
    if augmented_json:
        with open(augmented_json) as f:
            augmented_json = json.load(f)
        # with augmented data the target set must contain more sentences than the source
        assert (
            x.shape[0] < y.shape[0]
        ), f"Shape mismatch: source {x.shape[0]} >= target {y.shape[0]}"
    else:
        assert (
            x.shape == y.shape
        ), f"Shape mismatch: source {x.shape} and target {y.shape} must be equal"
nbex = x.shape[0]
augmented_report = {}
# for each x calculate the highest scoring neighbor from y
closest_neighbor = _score_knn(x, y, k, margin)
    if eval_text:  # calc textual error
        with open(eval_text, encoding="utf-8", errors="surrogateescape") as f:
            lines = f.readlines()
err = 0
for ex in range(nbex):
if lines[ex] != lines[closest_neighbor[ex, 0]]:
err += 1
if augmented_json:
transform = get_transform(
augmented_json,
lines[closest_neighbor[ex, 0]].strip(),
lines[ex].strip(),
)
augmented_report[transform] = augmented_report.get(transform, 0) + 1
else: # calc index error
        ref = np.arange(nbex)  # expected alignment is the identity: [0, nbex)
        err = nbex - np.equal(closest_neighbor.reshape(nbex), ref).astype(int).sum()
return err, nbex, augmented_report
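

# Minimal usage sketch (illustrative only): the embedding file names below are
# hypothetical placeholders, not files shipped with LASER; any raw float32
# embedding files with the expected dimensionality work the same way.
if __name__ == "__main__":
    err, nbex, _ = xSIM("source.emb", "target.emb", margin=Margin.RATIO.value, k=4)
    print(f"xSIM error rate: {100 * err / nbex:.2f}% ({err}/{nbex})")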