from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union

import yaml

from langchain_experimental.data_anonymizer.base import (
    DEFAULT_DEANONYMIZER_MATCHING_STRATEGY,
    AnonymizerBase,
    ReversibleAnonymizerBase,
)
from langchain_experimental.data_anonymizer.deanonymizer_mapping import (
    DeanonymizerMapping,
    MappingDataType,
    create_anonymizer_mapping,
)
from langchain_experimental.data_anonymizer.deanonymizer_matching_strategies import (
    exact_matching_strategy,
)
from langchain_experimental.data_anonymizer.faker_presidio_mapping import (
    get_pseudoanonymizer_mapping,
)

if TYPE_CHECKING:
    from presidio_analyzer import AnalyzerEngine, EntityRecognizer
    from presidio_analyzer.nlp_engine import NlpEngineProvider
    from presidio_anonymizer import AnonymizerEngine
    from presidio_anonymizer.entities import (
        ConflictResolutionStrategy,
        OperatorConfig,
    )

def _import_analyzer_engine() -> "AnalyzerEngine":
    try:
        from presidio_analyzer import AnalyzerEngine
    except ImportError as e:
        raise ImportError(
            "Could not import presidio_analyzer, please install with "
            "`pip install presidio-analyzer`. You will also need to download a "
            "spaCy model to use the analyzer, e.g. "
            "`python -m spacy download en_core_web_lg`."
        ) from e
    return AnalyzerEngine

def _import_nlp_engine_provider() -> "NlpEngineProvider":
    try:
        from presidio_analyzer.nlp_engine import NlpEngineProvider
    except ImportError as e:
        raise ImportError(
            "Could not import presidio_analyzer, please install with "
            "`pip install presidio-analyzer`. You will also need to download a "
            "spaCy model to use the analyzer, e.g. "
            "`python -m spacy download en_core_web_lg`."
        ) from e
    return NlpEngineProvider

def _import_anonymizer_engine() -> "AnonymizerEngine":
    try:
        from presidio_anonymizer import AnonymizerEngine
    except ImportError as e:
        raise ImportError(
            "Could not import presidio_anonymizer, please install with "
            "`pip install presidio-anonymizer`."
        ) from e
    return AnonymizerEngine

def _import_operator_config() -> "OperatorConfig":
    try:
        from presidio_anonymizer.entities import OperatorConfig
    except ImportError as e:
        raise ImportError(
            "Could not import presidio_anonymizer, please install with "
            "`pip install presidio-anonymizer`."
        ) from e
    return OperatorConfig

# Configuring Anonymizer for multiple languages
# Detailed description and examples can be found here:
# langchain/docs/extras/guides/privacy/multi_language_anonymization.ipynb
DEFAULT_LANGUAGES_CONFIG = {
    # You can also use Stanza or transformers library.
    # See https://microsoft.github.io/presidio/analyzer/customizing_nlp_models/
    "nlp_engine_name": "spacy",
    "models": [
        {"lang_code": "en", "model_name": "en_core_web_lg"},
        # {"lang_code": "de", "model_name": "de_core_news_md"},
        # {"lang_code": "es", "model_name": "es_core_news_md"},
        # ...
        # List of available models: https://spacy.io/usage/models
    ],
}
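
# A minimal sketch of passing a custom multi-language configuration (assumes
# the spaCy models named below have been downloaded first, e.g. via
# `python -m spacy download de_core_news_md`; model choices are illustrative):
#
#     anonymizer = PresidioAnonymizer(
#         languages_config={
#             "nlp_engine_name": "spacy",
#             "models": [
#                 {"lang_code": "en", "model_name": "en_core_web_lg"},
#                 {"lang_code": "de", "model_name": "de_core_news_md"},
#             ],
#         },
#     )
#     anonymizer.anonymize("Mein Name ist John Doe.", language="de")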


class PresidioAnonymizerBase(AnonymizerBase):
    """Base Anonymizer using Microsoft Presidio.

    See more: https://microsoft.github.io/presidio/
    """

    def __init__(
        self,
        analyzed_fields: Optional[List[str]] = None,
        operators: Optional[Dict[str, OperatorConfig]] = None,
        languages_config: Optional[Dict] = None,
        add_default_faker_operators: bool = True,
        faker_seed: Optional[int] = None,
    ):
        """
        Args:
            analyzed_fields: List of fields to detect and then anonymize.
                Defaults to all entities supported by Microsoft Presidio.
            operators: Operators to use for anonymization.
                Operators allow for custom anonymization of detected PII.
                Learn more:
                https://microsoft.github.io/presidio/tutorial/10_simple_anonymization/
            languages_config: Configuration for the NLP engine.
                First language in the list will be used as the main language
                in self.anonymize(...) when no language is specified.
                Learn more:
                https://microsoft.github.io/presidio/analyzer/customizing_nlp_models/
            add_default_faker_operators: Whether to register the default set of
                faker-based operators that replace each detected entity type
                with a fake value.
            faker_seed: Seed used to initialize faker.
                Defaults to None, in which case faker will be seeded randomly
                and provide random values.
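
        Example (a minimal sketch; the operator parameters shown are
        illustrative):
            .. code-block:: python

                from presidio_anonymizer.entities import OperatorConfig

                anonymizer = PresidioAnonymizer(
                    analyzed_fields=["PERSON"],
                    operators={
                        "PERSON": OperatorConfig("replace", {"new_value": "<PERSON>"})
                    },
                )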
""" | |
        if languages_config is None:
            languages_config = DEFAULT_LANGUAGES_CONFIG

        OperatorConfig = _import_operator_config()
        AnalyzerEngine = _import_analyzer_engine()
        NlpEngineProvider = _import_nlp_engine_provider()
        AnonymizerEngine = _import_anonymizer_engine()

        self.analyzed_fields = (
            analyzed_fields
            if analyzed_fields is not None
            else list(get_pseudoanonymizer_mapping().keys())
        )

        if add_default_faker_operators:
            self.operators = {
                field: OperatorConfig(
                    operator_name="custom", params={"lambda": faker_function}
                )
                for field, faker_function in get_pseudoanonymizer_mapping(
                    faker_seed
                ).items()
            }
        else:
            self.operators = {}

        if operators:
            self.add_operators(operators)

        provider = NlpEngineProvider(nlp_configuration=languages_config)
        nlp_engine = provider.create_engine()

        self.supported_languages = list(nlp_engine.nlp.keys())

        self._analyzer = AnalyzerEngine(
            supported_languages=self.supported_languages, nlp_engine=nlp_engine
        )
        self._anonymizer = AnonymizerEngine()

    def add_recognizer(self, recognizer: EntityRecognizer) -> None:
        """Add a recognizer to the analyzer.

        Args:
            recognizer: Recognizer to add to the analyzer.
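
        Example (a minimal sketch; the entity name and regex below are
        illustrative, not part of Presidio's defaults):
            .. code-block:: python

                from presidio_analyzer import Pattern, PatternRecognizer

                ticket_pattern = Pattern(
                    name="ticket_number", regex="TICKET-\\d{4}", score=0.8
                )
                anonymizer.add_recognizer(
                    PatternRecognizer(
                        supported_entity="TICKET_NUMBER", patterns=[ticket_pattern]
                    )
                )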
""" | |
self._analyzer.registry.add_recognizer(recognizer) | |
self.analyzed_fields.extend(recognizer.supported_entities) | |

    def add_operators(self, operators: Dict[str, OperatorConfig]) -> None:
        """Add operators to the anonymizer.

        Args:
            operators: Operators to add to the anonymizer.
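
        Example (a minimal sketch; the masking parameters are illustrative):
            .. code-block:: python

                from presidio_anonymizer.entities import OperatorConfig

                anonymizer.add_operators(
                    {
                        "PHONE_NUMBER": OperatorConfig(
                            "mask",
                            {"masking_char": "*", "chars_to_mask": 4, "from_end": True},
                        )
                    }
                )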
""" | |
self.operators.update(operators) | |


class PresidioAnonymizer(PresidioAnonymizerBase):
    """Anonymizer using Microsoft Presidio."""

    def _anonymize(
        self,
        text: str,
        language: Optional[str] = None,
        allow_list: Optional[List[str]] = None,
        conflict_resolution: Optional[ConflictResolutionStrategy] = None,
    ) -> str:
"""Anonymize text. | |
Each PII entity is replaced with a fake value. | |
Each time fake values will be different, as they are generated randomly. | |
PresidioAnonymizer has no built-in memory - | |
so it will not remember the effects of anonymizing previous texts. | |
>>> anonymizer = PresidioAnonymizer() | |
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!") | |
'My name is Noah Rhodes. Hi Noah Rhodes!' | |
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!") | |
'My name is Brett Russell. Hi Brett Russell!' | |
Args: | |
text: text to anonymize | |
language: language to use for analysis of PII | |
If None, the first (main) language in the list | |
of languages specified in the configuration will be used. | |
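
        Example (a minimal sketch; the entity list and allow list are
        illustrative):
            .. code-block:: python

                anonymizer = PresidioAnonymizer(analyzed_fields=["PERSON"])
                # "John Doe" is kept as-is because it is on the allow list
                anonymizer.anonymize(
                    "My name is John Doe.", allow_list=["John Doe"]
                )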
""" | |
        if language is None:
            language = self.supported_languages[0]
        elif language not in self.supported_languages:
            raise ValueError(
                f"Language '{language}' is not supported. "
                f"Supported languages are: {self.supported_languages}. "
                "Change your language configuration file to add more languages."
            )

        # Check supported entities for given language
        # e.g. IT_FISCAL_CODE is not supported for English in Presidio by default
        # If you want to use it, you need to add a recognizer manually
        supported_entities = []
        for recognizer in self._analyzer.get_recognizers(language):
            recognizer_dict = recognizer.to_dict()
            supported_entities.extend(
                [recognizer_dict["supported_entity"]]
                if "supported_entity" in recognizer_dict
                else recognizer_dict["supported_entities"]
            )

        entities_to_analyze = list(
            set(supported_entities).intersection(set(self.analyzed_fields))
        )

        analyzer_results = self._analyzer.analyze(
            text,
            entities=entities_to_analyze,
            language=language,
            allow_list=allow_list,
        )

        # Resolve overlapping detections so the mapping built below matches
        # the text manipulations actually applied by the anonymizer
        filtered_analyzer_results = (
            self._anonymizer._remove_conflicts_and_get_text_manipulation_data(
                analyzer_results, conflict_resolution
            )
        )

        anonymizer_results = self._anonymizer.anonymize(
            text,
            analyzer_results=analyzer_results,
            operators=self.operators,
        )

        anonymizer_mapping = create_anonymizer_mapping(
            text,
            filtered_analyzer_results,
            anonymizer_results,
        )
        return exact_matching_strategy(text, anonymizer_mapping)


class PresidioReversibleAnonymizer(PresidioAnonymizerBase, ReversibleAnonymizerBase):
    """Reversible Anonymizer using Microsoft Presidio."""

    def __init__(
        self,
        analyzed_fields: Optional[List[str]] = None,
        operators: Optional[Dict[str, OperatorConfig]] = None,
        languages_config: Optional[Dict] = None,
        add_default_faker_operators: bool = True,
        faker_seed: Optional[int] = None,
    ):
        if languages_config is None:
            languages_config = DEFAULT_LANGUAGES_CONFIG
        super().__init__(
            analyzed_fields,
            operators,
            languages_config,
            add_default_faker_operators,
            faker_seed,
        )
        self._deanonymizer_mapping = DeanonymizerMapping()

    @property
    def deanonymizer_mapping(self) -> MappingDataType:
        """Return the deanonymizer mapping.
        """
        return self._deanonymizer_mapping.data

    @property
    def anonymizer_mapping(self) -> MappingDataType:
        """Return the anonymizer mapping.

        This is just the reverse version of the deanonymizer mapping."""
        return {
            key: {v: k for k, v in inner_dict.items()}
            for key, inner_dict in self.deanonymizer_mapping.items()
        }

    def _anonymize(
        self,
        text: str,
        language: Optional[str] = None,
        allow_list: Optional[List[str]] = None,
        conflict_resolution: Optional[ConflictResolutionStrategy] = None,
    ) -> str:
"""Anonymize text. | |
Each PII entity is replaced with a fake value. | |
Each time fake values will be different, as they are generated randomly. | |
At the same time, we will create a mapping from each anonymized entity | |
back to its original text value. | |
Thanks to the built-in memory, all previously anonymised entities | |
will be remembered and replaced by the same fake values: | |
>>> anonymizer = PresidioReversibleAnonymizer() | |
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!") | |
'My name is Noah Rhodes. Hi Noah Rhodes!' | |
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!") | |
'My name is Noah Rhodes. Hi Noah Rhodes!' | |
Args: | |
text: text to anonymize | |
language: language to use for analysis of PII | |
If None, the first (main) language in the list | |
of languages specified in the configuration will be used. | |
""" | |
        if language is None:
            language = self.supported_languages[0]
        elif language not in self.supported_languages:
            raise ValueError(
                f"Language '{language}' is not supported. "
                f"Supported languages are: {self.supported_languages}. "
                "Change your language configuration file to add more languages."
            )

        # Check supported entities for given language
        # e.g. IT_FISCAL_CODE is not supported for English in Presidio by default
        # If you want to use it, you need to add a recognizer manually
        supported_entities = []
        for recognizer in self._analyzer.get_recognizers(language):
            recognizer_dict = recognizer.to_dict()
            supported_entities.extend(
                [recognizer_dict["supported_entity"]]
                if "supported_entity" in recognizer_dict
                else recognizer_dict["supported_entities"]
            )

        entities_to_analyze = list(
            set(supported_entities).intersection(set(self.analyzed_fields))
        )

        analyzer_results = self._analyzer.analyze(
            text,
            entities=entities_to_analyze,
            language=language,
            allow_list=allow_list,
        )

        # Resolve overlapping detections so the mapping built below matches
        # the text manipulations actually applied by the anonymizer
        filtered_analyzer_results = (
            self._anonymizer._remove_conflicts_and_get_text_manipulation_data(
                analyzer_results, conflict_resolution
            )
        )

        anonymizer_results = self._anonymizer.anonymize(
            text,
            analyzer_results=analyzer_results,
            operators=self.operators,
        )

        # Accumulate the new fake-value -> original-value pairs in the built-in
        # memory, so repeated entities keep the same fake values across calls
        new_deanonymizer_mapping = create_anonymizer_mapping(
            text,
            filtered_analyzer_results,
            anonymizer_results,
            is_reversed=True,
        )
        self._deanonymizer_mapping.update(new_deanonymizer_mapping)

        return exact_matching_strategy(text, self.anonymizer_mapping)

    def _deanonymize(
        self,
        text_to_deanonymize: str,
        deanonymizer_matching_strategy: Callable[
            [str, MappingDataType], str
        ] = DEFAULT_DEANONYMIZER_MATCHING_STRATEGY,
    ) -> str:
"""Deanonymize text. | |
Each anonymized entity is replaced with its original value. | |
This method exploits the mapping created during the anonymization process. | |
Args: | |
text_to_deanonymize: text to deanonymize | |
deanonymizer_matching_strategy: function to use to match | |
anonymized entities with their original values and replace them. | |
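
        Example (a minimal round-trip sketch, assuming the default exact
        matching strategy):
            .. code-block:: python

                anonymizer = PresidioReversibleAnonymizer()
                anonymized = anonymizer.anonymize("My name is John Doe.")
                anonymizer.deanonymize(anonymized)  # original text is restored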
""" | |
        if not self._deanonymizer_mapping.data:
            raise ValueError(
                "Deanonymizer mapping is empty. "
                "Please call anonymize() and anonymize some text first."
            )

        text_to_deanonymize = deanonymizer_matching_strategy(
            text_to_deanonymize, self.deanonymizer_mapping
        )

        return text_to_deanonymize

    def reset_deanonymizer_mapping(self) -> None:
        """Reset the deanonymizer mapping."""
        self._deanonymizer_mapping = DeanonymizerMapping()

    def save_deanonymizer_mapping(self, file_path: Union[Path, str]) -> None:
        """Save the deanonymizer mapping to a JSON or YAML file.

        Args:
            file_path: Path to file to save the mapping to.

        Example:
            .. code-block:: python

                anonymizer.save_deanonymizer_mapping(file_path="path/mapping.json")
        """
        save_path = Path(file_path)

        if save_path.suffix not in [".json", ".yaml", ".yml"]:
            raise ValueError(
                f"{save_path} must have an extension of .json, .yaml or .yml"
            )

        # Make sure parent directories exist
        save_path.parent.mkdir(parents=True, exist_ok=True)

        if save_path.suffix == ".json":
            with open(save_path, "w") as f:
                json.dump(self.deanonymizer_mapping, f, indent=2)
        elif save_path.suffix in (".yaml", ".yml"):
            with open(save_path, "w") as f:
                yaml.dump(self.deanonymizer_mapping, f, default_flow_style=False)

    def load_deanonymizer_mapping(self, file_path: Union[Path, str]) -> None:
        """Load the deanonymizer mapping from a JSON or YAML file.

        Args:
            file_path: Path to file to load the mapping from.

        Example:
            .. code-block:: python

                anonymizer.load_deanonymizer_mapping(file_path="path/mapping.json")
        """
        load_path = Path(file_path)

        if load_path.suffix not in [".json", ".yaml", ".yml"]:
            raise ValueError(
                f"{load_path} must have an extension of .json, .yaml or .yml"
            )

        if load_path.suffix == ".json":
            with open(load_path, "r") as f:
                loaded_mapping = json.load(f)
        elif load_path.suffix in (".yaml", ".yml"):
            with open(load_path, "r") as f:
                loaded_mapping = yaml.load(f, Loader=yaml.FullLoader)

        self._deanonymizer_mapping.update(loaded_mapping)