File size: 9,232 Bytes
35c70df
 
 
 
be8431a
35c70df
 
 
 
77c02fb
331cb9f
 
 
e8c756f
9f00f1c
35c70df
331cb9f
35c70df
 
 
be8431a
b90cc86
331cb9f
 
be8431a
 
 
 
 
b90cc86
 
331cb9f
7124f43
331cb9f
 
 
b90cc86
331cb9f
 
 
 
b90cc86
331cb9f
b90cc86
331cb9f
 
b90cc86
331cb9f
 
 
 
b90cc86
e8c756f
331cb9f
b90cc86
 
331cb9f
b90cc86
 
331cb9f
b90cc86
331cb9f
b90cc86
 
331cb9f
b90cc86
 
 
331cb9f
b90cc86
 
331cb9f
b90cc86
331cb9f
b90cc86
 
331cb9f
 
b90cc86
 
331cb9f
 
b90cc86
e8c756f
b90cc86
 
 
7124f43
 
 
 
331cb9f
b90cc86
 
331cb9f
 
b90cc86
7124f43
 
 
 
331cb9f
 
b90cc86
7124f43
 
 
 
 
 
 
 
331cb9f
b90cc86
35c70df
 
 
545e6f3
35c70df
 
b90cc86
 
35c70df
331cb9f
55290a8
b90cc86
 
 
 
 
 
 
35c70df
b90cc86
35c70df
 
 
b90cc86
35c70df
bbda733
b90cc86
77c02fb
 
cabea79
b90cc86
 
331cb9f
bbda733
 
 
 
 
 
 
 
 
 
 
 
b90cc86
35c70df
b90cc86
331cb9f
cabea79
bbda733
b90cc86
331cb9f
b90cc86
331cb9f
bbda733
 
 
b90cc86
 
bbda733
b90cc86
bbda733
 
b90cc86
bbda733
 
 
b90cc86
bbda733
b90cc86
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import logging
import os
import base64
import datetime
from datetime import datetime
import dotenv
import pandas as pd
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
import tempfile
import pytz

st.set_page_config(page_title="Presidio PHI De-identification", layout="wide", initial_sidebar_state="expanded", menu_items={"About": "https://microsoft.github.io/presidio/"})
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")

def get_timestamp_prefix_old() -> str:
    """Return the current US/Central time as an upper-cased filename prefix.

    The value is formatted with ``%I%M%p_%d-%m-%y`` (12-hour clock, minutes,
    AM/PM marker, underscore, then day-month-two-digit-year) and upper-cased.

    Relies on the module-level name ``datetime`` being bound to the
    ``datetime.datetime`` class (``from datetime import datetime``).
    """
    fmt = "%I%M%p_%d-%m-%y"
    now_central = datetime.now(pytz.timezone("US/Central"))
    return now_central.strftime(fmt).upper()
    
def get_timestamp_prefix() -> str:
    """Return the current US/Central time as an upper-cased filename prefix.

    The value is formatted with ``%I%M%p_%d-%m-%y`` (12-hour clock, minutes,
    AM/PM marker, underscore, then day-month-two-digit-year) and upper-cased.

    Returns:
        The formatted timestamp string.
    """
    # Import the class under a private alias. At module level ``datetime`` is
    # rebound to the class by ``from datetime import datetime`` (which follows
    # ``import datetime``), so the previous ``datetime.datetime.now(...)``
    # raised AttributeError. The local alias works whichever binding wins.
    from datetime import datetime as _datetime

    central = pytz.timezone("US/Central")
    return _datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
    
def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple:
    """Load the requested NER model and a registry of predefined recognizers.

    Args:
        model_family: ``"flair"`` or ``"huggingface"`` (case-insensitive).
        model_path: Model identifier handed to the family's loader.

    Returns:
        A ``(model, registry)`` tuple: the Flair ``SequenceTagger`` or the
        transformers ``"ner"`` pipeline, plus a ``RecognizerRegistry`` with
        the predefined recognizers loaded.

    Raises:
        ValueError: If ``model_family`` is neither of the supported families.
    """
    recognizers = RecognizerRegistry()
    recognizers.load_predefined_recognizers()

    family = model_family.lower()
    if family == "flair":
        # Imported lazily so the dependency is only needed when selected.
        from flair.models import SequenceTagger
        model = SequenceTagger.load(model_path)
        logger.info(f"Flair model loaded: {model_path}")
    elif family == "huggingface":
        # Imported lazily so the dependency is only needed when selected.
        from transformers import pipeline
        model = pipeline("ner", model=model_path, tokenizer=model_path)
        logger.info(f"HuggingFace model loaded: {model_path}")
    else:
        raise ValueError(f"Model family {model_family} unsupported")
    return model, recognizers

def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
    """Build an ``AnalyzerEngine`` backed by the predefined recognizer registry.

    Args:
        model_family: Passed through to ``nlp_engine_and_registry``.
        model_path: Passed through to ``nlp_engine_and_registry``.

    Returns:
        An ``AnalyzerEngine`` constructed with the returned registry.
    """
    # NOTE(review): the model returned by nlp_engine_and_registry is loaded
    # but never attached to the engine or the registry here, so the selected
    # Flair/HuggingFace model does not appear to take part in analysis.
    # Confirm whether a model-backed recognizer should be registered.
    _unused_model, recognizers = nlp_engine_and_registry(model_family, model_path)
    engine = AnalyzerEngine(registry=recognizers)
    return engine

def get_supported_entities(model_family: str, model_path: str) -> list[str]:
    """Return the entity types requested from the analyzer for a model family.

    Args:
        model_family: Model family name; compared case-insensitively.
        model_path: Unused; accepted so the function can be called with the
            same argument tuple as the other model helpers.

    Returns:
        ``PERSON``, ``LOCATION`` and ``ORGANIZATION`` for every family, with
        ``DATE_TIME`` appended for the ``huggingface`` family.
    """
    entities = ["PERSON", "LOCATION", "ORGANIZATION"]
    if model_family.lower() == "huggingface":
        entities.append("DATE_TIME")
    return entities

# Feature Spotlight: 🕵️‍♂️ PHI Hunt Kicks Off!
# Models dive into PDFs, sniffing out sensitive bits with ninja vibes! 😎

def analyze(
    analyzer: AnalyzerEngine,
    text: str,
    entities: list[str],
    language: str,
    score_threshold: float,
    return_decision_process: bool,
    allow_list: list[str],
    deny_list: list[str],
) -> list:
    """Run the analyzer and filter its results by the allow and deny lists.

    Matching is a case-insensitive substring test against the detected span
    ``text[result.start:result.end]``.

    Args:
        analyzer: Object whose ``analyze`` method is called with the keyword
            arguments below.
        text: Text to analyze.
        entities: Entity types to look for.
        language: Language code forwarded to the analyzer.
        score_threshold: Minimum score forwarded to the analyzer.
        return_decision_process: Forwarded to the analyzer.
        allow_list: Words that suppress a result when found in its span.
            ``None`` and blank entries are ignored.
        deny_list: When non-empty, only results whose span contains one of
            these words are kept. ``None`` and blank entries are ignored.

    Returns:
        The analyzer results that pass both filters, in their original order.
    """
    results = analyzer.analyze(
        text=text,
        entities=entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
    )

    # Normalise once, outside the loop. Blank entries are dropped because
    # ``"" in snippet`` is always true: a single empty allow-list entry would
    # otherwise suppress every finding.
    allowed = [word.lower() for word in (allow_list or []) if word and word.strip()]
    denied = [word.lower() for word in (deny_list or []) if word and word.strip()]

    kept = []
    for result in results:
        snippet = text[result.start:result.end].lower()
        if any(word in snippet for word in allowed):
            continue
        if not denied or any(word in snippet for word in denied):
            kept.append(result)
    return kept

def anonymize(text: str, operator: str, analyze_results: list, mask_char: str = "*", number_of_chars: int = 15) -> dict:
    """Anonymize ``text`` by applying one operator to every analyzer result.

    Args:
        text: The original text.
        operator: Operator name used as the ``DEFAULT`` operator
            (the UI offers ``replace``, ``redact`` and ``mask``).
        analyze_results: Analyzer results marking the spans to anonymize.
        mask_char: Masking character, used only when ``operator == "mask"``.
        number_of_chars: Number of characters to mask, used only when
            ``operator == "mask"``.

    Returns:
        Whatever ``AnonymizerEngine.anonymize`` returns.
        NOTE(review): the ``dict`` annotation is kept for interface
        compatibility, but the caller in this file reads ``.text`` from the
        result, so it is presumably an engine result object. Confirm.
    """
    anonymizer = AnonymizerEngine()
    if operator == "mask":
        # ``from_end`` is supplied explicitly (masking from the start) so the
        # mask operator's parameter validation does not fail on a missing key.
        params = {"masking_char": mask_char, "chars_to_mask": number_of_chars, "from_end": False}
    else:
        params = {}
    config = {"DEFAULT": OperatorConfig(operator, params)}
    return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=config)

def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
    """Build a ``GENERIC_PII`` pattern recognizer from a deny list.

    Args:
        deny_list: Words to recognize. ``None`` or an empty list means no
            recognizer is needed.

    Returns:
        A ``PatternRecognizer`` for the ``GENERIC_PII`` entity, or ``None``
        when ``deny_list`` is empty or ``None``.
    """
    if deny_list:
        return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
    return None

def save_pdf(pdf_input, max_size_bytes: int = 200 * 1024 * 1024) -> str:
    """Persist an uploaded PDF to a temporary file and return its path.

    Args:
        pdf_input: Uploaded file object exposing ``size``, ``name`` and
            ``read()``.
        max_size_bytes: Largest accepted upload in bytes (default 200 MB).

    Returns:
        Path of the temporary ``.pdf`` file. The file is not deleted
        automatically; the caller is responsible for removing it.

    Raises:
        ValueError: If the upload is larger than ``max_size_bytes``.
    """
    if pdf_input.size > max_size_bytes:
        limit_mb = f"{max_size_bytes / (1024 * 1024):g}"
        logger.error(f"Upload rejected: {pdf_input.name} exceeds {limit_mb}MB")
        st.error(f"PDF exceeds {limit_mb}MB limit")
        raise ValueError("PDF too big")
    # No explicit ``dir``: the platform's default temp directory is used, so
    # this also works where "/tmp" does not exist.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(pdf_input.read())
        logger.info(f"Uploaded PDF to {tmp.name}, size: {pdf_input.size} bytes")
        return tmp.name

# Feature Spotlight: 📄 PDF Wizardry Unleashed!
# Uploads zip through, PHI vanishes, and out pops a safe PDF with timestamp pizzazz! ✨

def read_pdf(pdf_path: str) -> str:
    """Extract the text of every page of a PDF, one page per line group.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The extracted text of all pages, each page followed by a newline.
        Pages with no extractable text contribute only the newline.
    """
    reader = PdfReader(pdf_path)
    # Parenthesised on purpose: ``a or "" + "\n"`` parses as
    # ``a or ("" + "\n")``, which dropped the separator after every page that
    # actually had text and ran consecutive pages together.
    text = "".join((page.extract_text() or "") + "\n" for page in reader.pages)
    logger.info(f"Extracted {len(text)} chars from {pdf_path}")
    return text

def _pdf_literal(line: str) -> bytes:
    """Encode one line as the body of a PDF literal string."""
    # Characters outside Windows-1252 cannot be shown by the base font and
    # are replaced with "?".
    raw = line.encode("cp1252", errors="replace")
    # Backslash first, so the escapes added for parentheses are not doubled.
    return raw.replace(b"\\", b"\\\\").replace(b"(", b"\\(").replace(b")", b"\\)")


def _render_text_pdf(pages: list, width: int, height: int, margin: int, font_size: int, leading: int) -> bytes:
    """Serialise pages of text lines into a minimal single-font PDF document."""
    # Object layout: 1 catalog, 2 page tree, 3 font, then for page i the page
    # dictionary is object 4 + 2*i and its content stream is 5 + 2*i.
    kids = " ".join(f"{4 + 2 * i} 0 R" for i in range(len(pages)))
    objects = [
        b"<< /Type /Catalog /Pages 2 0 R >>",
        f"<< /Type /Pages /Kids [{kids}] /Count {len(pages)} >>".encode("ascii"),
        b"<< /Type /Font /Subtype /Type1 /BaseFont /Courier /Encoding /WinAnsiEncoding >>",
    ]
    for index, page_lines in enumerate(pages):
        objects.append(
            (
                f"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 {width} {height}] "
                f"/Resources << /Font << /F1 3 0 R >> >> /Contents {5 + 2 * index} 0 R >>"
            ).encode("ascii")
        )
        ops = [
            b"BT",
            f"/F1 {font_size} Tf".encode("ascii"),
            f"{leading} TL".encode("ascii"),
            f"{margin} {height - margin - font_size} Td".encode("ascii"),
        ]
        # "T*" moves to the next line using the leading set by "TL".
        ops.extend(b"(" + _pdf_literal(line) + b") Tj T*" for line in page_lines)
        ops.append(b"ET")
        stream = b"\n".join(ops)
        objects.append(
            b"<< /Length " + str(len(stream)).encode("ascii") + b" >>\nstream\n" + stream + b"\nendstream"
        )

    out = bytearray(b"%PDF-1.4\n")
    offsets = []
    for number, body in enumerate(objects, start=1):
        offsets.append(len(out))  # byte offset of "N 0 obj", needed by the xref table
        out += f"{number} 0 obj\n".encode("ascii") + body + b"\nendobj\n"

    xref_offset = len(out)
    out += f"xref\n0 {len(objects) + 1}\n".encode("ascii")
    out += b"0000000000 65535 f \n"  # each xref entry is exactly 20 bytes
    for offset in offsets:
        out += f"{offset:010d} 00000 n \n".encode("ascii")
    out += (
        f"trailer\n<< /Size {len(objects) + 1} /Root 1 0 R >>\n"
        f"startxref\n{xref_offset}\n%%EOF\n"
    ).encode("ascii")
    return bytes(out)


def create_pdf(text: str, input_path: str, output_filename: str) -> str:
    """Write ``text`` to ``output_filename`` as a plain-text PDF.

    Earlier this function ignored ``text`` and copied the pages of
    ``input_path`` unchanged, so the "de-identified" output still contained
    every original value. The output is now built from ``text`` only; nothing
    from the source PDF (layout, images, metadata) is carried over.

    Args:
        text: The text to render (the anonymized text at the call site).
        input_path: Unused. Kept so existing callers keep working.
        output_filename: Path of the PDF file to write.

    Returns:
        ``output_filename``.
    """
    import textwrap  # stdlib; local import so the block needs no new file-level import

    page_width, page_height = 612, 792  # US Letter, in points
    margin, font_size, leading = 50, 10, 12
    # Courier is monospaced at 0.6 em per glyph, which fixes the wrap column.
    max_cols = int((page_width - 2 * margin) / (font_size * 0.6))
    lines_per_page = (page_height - 2 * margin) // leading

    lines = []
    for raw_line in (text or "").splitlines() or [""]:
        # wrap() returns [] for a blank line; keep it as an empty output line.
        lines.extend(textwrap.wrap(raw_line, width=max_cols) or [""])
    pages = [lines[i:i + lines_per_page] for i in range(0, len(lines), lines_per_page)]

    document = _render_text_pdf(pages, page_width, page_height, margin, font_size, leading)
    with open(output_filename, "wb") as f:
        f.write(document)
    # Same logger object as the module-level ``logger`` (looked up by name).
    logging.getLogger("presidio-streamlit").info(f"Created PDF: {output_filename}")
    return output_filename

# Sidebar
import html  # stdlib; only this script section needs it (escaping the download filename)

st.sidebar.header("PHI De-identification with Presidio")
# (display label, model card URL). The label's first path segment is used as
# the model family below.
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox("NER model", [m[0] for m in model_list], 0)
st.sidebar.markdown(f"[View model]({next(url for m, url in model_list if m == st_model)})")
st_model_package = st_model.split("/")[0]
# "HuggingFace/..." labels carry a family prefix that is stripped to get the
# model path; other labels are used unchanged.
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models may snooze briefly!")
st_operator = st.sidebar.selectbox("De-id approach", ["replace", "redact", "mask"], 0)
st_threshold = st.sidebar.slider("Threshold", 0.0, 1.0, 0.35)
st_return_decision_process = st.sidebar.checkbox("Show analysis", False)
with st.sidebar.expander("Allow/Deny lists"):
    st_allow_list = st_tags(label="Allowlist", text="Add word, hit enter")
    st_deny_list = st_tags(label="Denylist", text="Add word, hit enter")

# Main
col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"], help="Max 200MB")
    if uploaded_file:
        # Tracked outside the try so the finally clause can remove whatever
        # was created, even when a later step fails.
        pdf_path = None
        output_path = None
        try:
            logger.info(f"Upload: {uploaded_file.name}, size: {uploaded_file.size} bytes")
            pdf_path = save_pdf(uploaded_file)
            text = read_pdf(pdf_path)
            # strip(): extraction can yield only newlines for image-only PDFs.
            if not text.strip():
                st.error("No text extracted")
                raise ValueError("Empty PDF")
            analyzer = analyzer_engine(*analyzer_params)
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )
            # Sorted so the message is stable across reruns.
            phi_types = sorted({res.entity_type for res in st_analyze_results})
            if phi_types:
                st.success(f"Zapped PHI: {', '.join(phi_types)}")
            else:
                st.info("No PHI found")
            anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
            timestamp = get_timestamp_prefix()
            # The download name is only a label for the browser; the bytes are
            # written to a temp file, not to a path built from the upload name.
            download_name = f"{timestamp}_{os.path.basename(uploaded_file.name)}"
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as out_tmp:
                output_path = out_tmp.name
            create_pdf(anonymized_result.text, pdf_path, output_path)
            with open(output_path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
            # The filename comes from the client; escape it before placing it
            # in an HTML attribute rendered with unsafe_allow_html.
            safe_name = html.escape(download_name, quote=True)
            st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{safe_name}">Download de-identified PDF</a>', unsafe_allow_html=True)
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[r.start:r.end] for r in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {"entity_type": "Type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
                    )
                    if st_return_decision_process:
                        df_subset = pd.concat([df_subset, pd.DataFrame([r.analysis_explanation.to_dict() for r in st_analyze_results])], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
        except Exception as e:
            # Top-level UI boundary: report to the user and log with traceback.
            st.error(f"Oops: {str(e)}")
            logger.exception(f"Error: {str(e)}")
        finally:
            # Both files may hold PHI; never leave them behind.
            for leftover in (pdf_path, output_path):
                if leftover and os.path.exists(leftover):
                    os.remove(leftover)