Plan2Align-NV / laser /utils /src /cleaner_splitter.py
KuangDW
Add laser2.spm using Git LFS
05d3571
raw
history blame
4.99 kB
import argparse
import sys
import typing as tp
import unicodedata
import xxhash
from sacremoses import MosesPunctNormalizer
from .demojizer import Demojizer, legacy_demojizer
from .remove_non_printing_char import \
get_replacer as non_printing_char_replacer
from .sentence_split import get_split_algo
demojizer = Demojizer()
class SentenceSplitClean:
def __init__(self, splitter_lang: str, split_algo: str):
# setup sentence splitter
self.splitter = get_split_algo(splitter_lang, split_algo=split_algo)
# setup "moses" normalization
self.mpn = MosesPunctNormalizer(lang="en", perl_parity=True) # TODO
self.replace_nonprint = non_printing_char_replacer(" ")
def __call__(self, line):
sentence_splits = self.splitter(line)
line_hash = xxhash.xxh3_64_intdigest(line)
for sent in sentence_splits:
# normalize -- moses equivalent
clean = self.mpn.normalize(sent)
clean = self.replace_nonprint(clean)
# replace π“•π”―π”žπ”«π” π”’π”°π” π”ž by Francesca
clean = unicodedata.normalize("NFKC", clean)
yield (line_hash, sent, clean)
def remove_on_unicode_category(x: str) -> str:
return "".join(filter(lambda ch: not unicodedata.category(ch) in {"So"}, x))
def get_replacer_unicode_category(
skip_min: int, max_num: int, replace_by: str = " "
) -> str:
def replace_by_unicode_category(x: str) -> str:
total_counter = 0
skip_counter = 0
def flt(ch):
nonlocal total_counter
nonlocal skip_counter
if max_num == 0 or total_counter < max_num:
if unicodedata.category(ch) in {"So"}:
if skip_counter < skip_min:
skip_counter += 1
return ch
total_counter += 1
return replace_by
return ch
return "".join(map(flt, x))
return replace_by_unicode_category
# to map with previous versions of the pipeline
def get_sentence_candidate_modifiers() -> tp.List[tp.Callable]:
return [
lambda x: x,
lambda x: x + " ",
lambda x: " " + x,
lambda x: " " + x + " ",
lambda x: " " + x,
lambda x: x.rstrip(),
lambda x: x.lstrip(),
lambda x: " " + x.rstrip(),
lambda x: x.strip(),
lambda x: demojizer(x, ""),
lambda x: demojizer(x, "").strip(),
lambda x: " " + demojizer(x, ""),
legacy_demojizer,
remove_on_unicode_category,
get_replacer_unicode_category(1, 1),
get_replacer_unicode_category(0, 0),
]
def reach_sentence_from_paragraph(
paragraph: str,
expected_paragraph_digest: int,
expected_sentence_digest: int,
lang: str,
sentence_splitters: tp.Dict[str, "SentenceSplitClean"],
debug_candidates: bool,
):
if lang not in sentence_splitters:
sentence_splitters[lang] = SentenceSplitClean(lang, "default")
def no_splitter(paragraph):
line_h = xxhash.xxh3_64_intdigest(paragraph)
return [(line_h, paragraph, paragraph)]
sentence_splitter = sentence_splitters[lang]
splitter_candidates = [sentence_splitter, no_splitter]
for duct_candidate in get_sentence_candidate_modifiers():
for split_cand in splitter_candidates:
for line_hash, sent, clean in split_cand(paragraph):
assert line_hash == expected_paragraph_digest
clean_cand = duct_candidate(clean)
reached_sentence_digest = xxhash.xxh3_64_intdigest(clean_cand)
if debug_candidates:
print(f"{reached_sentence_digest}::\t::{clean_cand}::")
if reached_sentence_digest == expected_sentence_digest:
return clean_cand
return None
def split_clean():
split_algo = "default"
sentence_splitters = {}
for line in sys.stdin:
line_stripped = line.rstrip("\n")
metadata, paragraph = line_stripped.split("\t")
(
_,
_,
_,
_,
paragraph_digest,
sentence_digest,
_,
_,
_,
lang,
_,
) = metadata.split()
paragraph_digest = int(paragraph_digest)
sentence_digest = int(sentence_digest)
sentence = reach_sentence_from_paragraph(
paragraph,
paragraph_digest,
sentence_digest,
lang,
sentence_splitters,
False,
)
if sentence is not None:
print(f"{line_stripped}\t{sentence}")
else:
print(
f"Couldn't match sentence for paragraph: {paragraph_digest} sentence: {sentence_digest} lang: {lang}",
file=sys.stderr,
)
def main():
split_clean()
if __name__ == "__main__":
main()