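"""Streamlit app that de-identifies PHI in PDFs with Microsoft Presidio.

Pipeline: upload a PDF, extract its text, detect PHI with Presidio plus the
selected NER model, anonymize the findings, and offer a de-identified PDF
for download.
"""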
import base64
import logging
import os
import tempfile
from datetime import datetime

import dotenv
import pandas as pd
import pytz
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

st.set_page_config(
    page_title="Presidio PHI De-identification",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={"About": "https://microsoft.github.io/presidio/"},
)
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")
def get_timestamp_prefix() -> str:
    """Return a US-Central timestamp prefix for output filenames."""
    central = pytz.timezone("US/Central")
    return datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
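# Example output (hypothetical run): "0347PM_25-04-25", i.e.
# %I%M%p (12-hour time) _ %d-%m-%y, upper-cased.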
def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple:
    """Load the selected NER model plus Presidio's predefined recognizers."""
    registry = RecognizerRegistry()
    registry.load_predefined_recognizers()
    if model_family.lower() == "flair":
        from flair.models import SequenceTagger
        tagger = SequenceTagger.load(model_path)
        logger.info(f"Flair model loaded: {model_path}")
        return tagger, registry
    elif model_family.lower() == "huggingface":
        from transformers import pipeline
        nlp = pipeline("ner", model=model_path, tokenizer=model_path)
        logger.info(f"HuggingFace model loaded: {model_path}")
        return nlp, registry
    raise ValueError(f"Model family {model_family} unsupported")
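# The flair/transformers imports above are deliberately local, so the app starts
# without either package installed; only the selected backend is imported.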
def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
    """Build the AnalyzerEngine that performs PHI detection."""
    nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
    return AnalyzerEngine(registry=registry)
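# Caveat: the tagger/pipeline loaded above is returned but never attached to the
# engine, so detection runs on Presidio's default NLP engine plus the registry's
# pattern recognizers. Wiring the model in would take a custom EntityRecognizer
# subclass registered via registry.add_recognizer(...).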
def get_supported_entities(model_family: str, model_path: str) -> list[str]:
    """Return the entity types requested from the analyzer for this model family."""
    return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"] if model_family.lower() == "huggingface" else ["PERSON", "LOCATION", "ORGANIZATION"]
# Feature Spotlight: PHI analysis.
# The selected model scans the extracted PDF text for sensitive entities.
def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str,
            score_threshold: float, return_decision_process: bool,
            allow_list: list[str], deny_list: list[str]) -> list:
    """Detect PHI, suppressing allow-listed terms and flagging deny-listed ones."""
    ad_hoc = create_ad_hoc_deny_list_recognizer(deny_list)
    results = analyzer.analyze(
        text=text,
        entities=(entities + ["GENERIC_PII"]) if ad_hoc else entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
        ad_hoc_recognizers=[ad_hoc] if ad_hoc else None,
    )
    # Keep a finding only if its text contains no allow-listed word.
    return [
        result for result in results
        if not any(word.lower() in text[result.start:result.end].lower() for word in allow_list)
    ]
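# Example (hypothetical inputs): with allow_list=["acme"], a detected ORGANIZATION
# span "Acme Corp" is suppressed, while deny_list=["MRN-1234"] guarantees that
# literal string is flagged as GENERIC_PII even if no model detects it.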
def anonymize(text: str, operator: str, analyze_results: list, mask_char: str = "*", number_of_chars: int = 15):
    """Apply the chosen de-identification operator to every finding."""
    anonymizer = AnonymizerEngine()
    config = {"DEFAULT": OperatorConfig(operator, {})}
    if operator == "mask":
        config["DEFAULT"] = OperatorConfig(operator, {"masking_char": mask_char, "chars_to_mask": number_of_chars})
    return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=config)
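# Operator behavior (Presidio defaults): "replace" substitutes a placeholder such
# as <PERSON>, "redact" deletes the span, and "mask" overwrites up to
# number_of_chars characters with mask_char.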
def create_ad_hoc_deny_list_recognizer(deny_list: list[str] | None = None) -> PatternRecognizer | None:
    """Build a recognizer that flags every deny-listed term as GENERIC_PII."""
    return None if not deny_list else PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
def save_pdf(pdf_input) -> str:
    """Persist the uploaded PDF to a temp file and return its path."""
    if pdf_input.size > 200 * 1024 * 1024:
        logger.error(f"Upload rejected: {pdf_input.name} exceeds 200MB")
        st.error("PDF exceeds 200MB limit")
        raise ValueError("PDF too big")
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf", dir="/tmp") as tmp:
        tmp.write(pdf_input.read())
    logger.info(f"Uploaded PDF to {tmp.name}, size: {pdf_input.size} bytes")
    return tmp.name
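# The temp file is deliberately kept (delete=False) so later steps can re-read it;
# the main flow below removes it with os.remove once processing finishes.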
# Feature Spotlight: PDF round trip.
# Extract text, de-identify it, and emit a fresh, timestamped PDF for download.
def read_pdf(pdf_path: str) -> str:
    """Extract the text of every page, one page per line."""
    reader = PdfReader(pdf_path)
    # (page.extract_text() or "") guards against pages with no extractable text.
    text = "".join((page.extract_text() or "") + "\n" for page in reader.pages)
    logger.info(f"Extracted {len(text)} chars from {pdf_path}")
    return text
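# Note: PyPDF2 only reads text layers; scanned, image-only pages come back empty,
# which is why the main flow treats an empty extraction as an error.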
def create_pdf(text: str, input_path: str, output_filename: str) -> str:
    """Render the de-identified text into a new PDF (assumes reportlab is
    installed; copying the source pages via PyPDF2 would keep the raw PHI, and
    input_path is retained only for signature compatibility)."""
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas

    pdf = canvas.Canvas(output_filename, pagesize=letter)
    y = letter[1] - 40
    for line in text.splitlines():
        if y < 40:  # page full; start a new one
            pdf.showPage()
            y = letter[1] - 40
        pdf.drawString(40, y, line)
        y -= 14
    pdf.save()
    logger.info(f"Created PDF: {output_filename}")
    return output_filename
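# Design note: this trades the original layout for guaranteed PHI removal; long
# lines are not wrapped, so complex layouts may need a richer renderer (e.g.
# reportlab's Platypus flowables).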
# Sidebar
st.sidebar.header("PHI De-identification with Presidio")
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox("NER model", [m[0] for m in model_list], 0)
st.sidebar.markdown(f"[View model]({next(url for m, url in model_list if m == st_model)})")
st_model_package = st_model.split("/")[0]
# Strip the "HuggingFace/" prefix so the remainder is a valid Hub model id.
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models are downloaded on first use and may take a while to load.")
st_operator = st.sidebar.selectbox("De-id approach", ["replace", "redact", "mask"], 0)
st_threshold = st.sidebar.slider("Threshold", 0.0, 1.0, 0.35)
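# Lower thresholds surface more candidate PHI but also more false positives;
# 0.35 is the demo default, not a validated clinical setting.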
st_return_decision_process = st.sidebar.checkbox("Show analysis", False)
with st.sidebar.expander("Allow/Deny lists"):
    st_allow_list = st_tags(label="Allowlist", text="Add word, hit enter")
    st_deny_list = st_tags(label="Denylist", text="Add word, hit enter")
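# Allow-listed words suppress matches in analyze(); deny-listed words are always
# flagged as GENERIC_PII through the ad-hoc recognizer.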
# Main
col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"], help="Max 200MB")
    if uploaded_file:
        try:
            logger.info(f"Upload: {uploaded_file.name}, size: {uploaded_file.size} bytes")
            pdf_path = save_pdf(uploaded_file)
            text = read_pdf(pdf_path)
            if not text.strip():
                st.error("No text extracted")
                raise ValueError("Empty PDF")
            analyzer = analyzer_engine(*analyzer_params)
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )
            phi_types = set(res.entity_type for res in st_analyze_results)
            if phi_types:
                st.success(f"Detected PHI types: {', '.join(phi_types)}")
            else:
                st.info("No PHI found")
            anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
            timestamp = get_timestamp_prefix()
            output_filename = f"{timestamp}_{uploaded_file.name}"
            create_pdf(anonymized_result.text, pdf_path, output_filename)
            with open(output_filename, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
            st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)
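            # The PDF travels inline as a base64 data URI, so nothing is written to
            # Streamlit's static file area; very large files will bloat the page.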
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[r.start:r.end] for r in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {"entity_type": "Type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
                    )
                    if st_return_decision_process:
                        df_subset = pd.concat([df_subset, pd.DataFrame([r.analysis_explanation.to_dict() for r in st_analyze_results])], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
            os.remove(pdf_path)
        except Exception as e:
            st.error(f"Oops: {str(e)}")
            logger.error(f"Error: {str(e)}")