File size: 7,635 Bytes
e29924d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
import os
import warnings
from shutil import copyfile
from typing import Any, Dict, List, Optional, Tuple
import sentencepiece as spm
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.utils import logging
VOCAB_FILES_NAMES = {"vocab_file": "tokenizer.model"}
logger = logging.get_logger(__name__)
def _get_tokenizer_threads(default: int = -1) -> int:
env_names = [
"PLAMO_TOKENIZER_NUM_THREADS",
"RAYON_NUM_THREADS",
]
for name in env_names:
v = os.environ.get(name, None)
if v:
try:
return int(v)
except ValueError:
warnings.warn(
f"Value assigned to env `{name}` is not an integer. Current value is {v}",
category=RuntimeWarning,
stacklevel=2,
)
return default
class PlamoTokenizer(PreTrainedTokenizer): # type: ignore
vocab_files_names = VOCAB_FILES_NAMES
model_input_names = ["input_ids", "attention_mask"]
def __init__(
self,
vocab_file: str,
unk_token: str = "<unk>",
bos_token: str = "<s>",
eos_token: str = "</s>",
pad_token: str = "<pad>",
cls_token: str = "<cls>",
sep_token: str = "<sep>",
mask_token: str = "<mask>",
sp_model_kwargs: Optional[Dict[str, Any]] = None,
clean_up_tokenization_spaces: bool = False,
num_threads: int = -1,
**kwargs: Any,
) -> None:
"""Tokenizer for PLaMo.
Args:
vocab_file (str): Vocabrary file path.
unk_token (str): Unknown token.
bos_token (str): Beginning of sentence token.
eos_token (str): End of sentence token.
pad_token (str): Padding token.
cls_token (str):
Classification token, to extract a summary of an input sequence leveraging self-attention along the
full depth of the model.
sep_token (str): Separation token, to separate context and query in an input sequence.
mask_token (str): Mask token, to use when training a model with masked-language modeling.
sp_model_kwargs (Dict[atr, Any] or None): kwargs for sentencepiece model.
clean_up_tokenization_spaces (bool): Whether or not to clean up the tokenization spaces.
num_threads (int):
Number of threads. This value will be ignored if one of `PLAMO_TOKENIZER_NUM_THREADS` or
`RAYON_NUM_THREADS` is set as an environment variable.
"""
if "add_bos_token" not in kwargs:
kwargs["add_bos_token"] = False
if "add_eos_token" not in kwargs:
kwargs["add_eos_token"] = False
self.sp_model_kwargs = {} if sp_model_kwargs is None else sp_model_kwargs
self.sp_model = spm.SentencePieceProcessor(**self.sp_model_kwargs)
self.sp_model.Init(model_file=vocab_file, num_threads=_get_tokenizer_threads(num_threads))
self.vocab_file = vocab_file
self.add_bos_token = kwargs["add_bos_token"]
self.add_eos_token = kwargs["add_eos_token"]
super().__init__(
vocab_file=vocab_file,
unk_token=unk_token,
bos_token=bos_token,
eos_token=eos_token,
pad_token=pad_token,
cls_token=cls_token,
sep_token=sep_token,
mask_token=mask_token,
sp_model_kwargs=sp_model_kwargs,
clean_up_tokenization_spaces=clean_up_tokenization_spaces,
**kwargs,
)
# the functions below are copied from hf transformers LlamaTokenizer's implementation to fix the behaviour of the tokenizer
# https://github.com/huggingface/transformers/blob/v4.30.2/src/transformers/models/llama/tokenization_llama.py
def __getstate__(self) -> dict[str, Any]:
state = self.__dict__.copy()
state["sp_model"] = None
state["sp_model_proto"] = self.sp_model.serialized_model_proto()
return state
def __setstate__(self, d: dict[str, Any]) -> None:
self.__dict__ = d
self.sp_model = spm.SentencePieceProcessor(**self.sp_model_kwargs)
self.sp_model.LoadFromSerializedProto(self.sp_model_proto)
@property
def vocab_size(self) -> Any:
"""Returns vocab size"""
return self.sp_model.get_piece_size()
def get_vocab(self) -> dict[str, int]:
"""Returns vocab as a dict"""
vocab = {self.convert_ids_to_tokens(i): i for i in range(self.vocab_size)}
vocab.update(self.added_tokens_encoder)
return vocab
def convert_tokens_to_string(self, tokens: List[int]) -> str:
"""Converts a sequence of tokens (string) in a single string."""
current_sub_tokens: List[int] = []
out_string = ""
prev_is_special = False
for i, token in enumerate(tokens):
# make sure that special tokens are not decoded using sentencepiece model
if token in self.all_special_tokens:
if not prev_is_special and i != 0:
out_string += " "
out_string += self.sp_model.decode(current_sub_tokens) + token
prev_is_special = True
current_sub_tokens = []
else:
current_sub_tokens.append(token)
prev_is_special = False
out_string += self.sp_model.decode(current_sub_tokens)
return out_string
def _tokenize(self, text: str) -> Any:
"""Returns a tokenized string."""
return self.sp_model.encode(text, out_type=str)
def _convert_token_to_id(self, token: str) -> Any:
"""Converts a token (str) in an id using the vocab."""
return self.sp_model.piece_to_id(token)
def _convert_id_to_token(self, index: int) -> Any:
"""Converts an index (integer) in a token (str) using the vocab."""
token = self.sp_model.IdToPiece(index)
return token
def build_inputs_with_special_tokens(
self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
) -> List[int]:
bos_token_id = [self.bos_token_id] if self.add_bos_token else []
eos_token_id = [self.eos_token_id] if self.add_eos_token else []
output = bos_token_id + token_ids_0 + eos_token_id
if token_ids_1 is not None:
output = output + bos_token_id + token_ids_1 + eos_token_id
return output
def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
"""
Save the vocabulary and special tokens file to a directory.
Args:
save_directory (`str`):
The directory in which to save the vocabulary.
Returns:
`Tuple(str)`: Paths to the files saved.
"""
if not os.path.isdir(save_directory):
logger.error(f"Vocabulary path ({save_directory}) should be a directory")
return ("",)
out_vocab_file = os.path.join(
save_directory, (filename_prefix + "-" if filename_prefix else "") + VOCAB_FILES_NAMES["vocab_file"]
)
if os.path.abspath(self.vocab_file) != os.path.abspath(out_vocab_file) and os.path.isfile(self.vocab_file):
copyfile(self.vocab_file, out_vocab_file)
elif not os.path.isfile(self.vocab_file):
with open(out_vocab_file, "wb") as fi:
content_spiece_model = self.sp_model.serialized_model_proto()
fi.write(content_spiece_model)
return (out_vocab_file,)
|