awacke1 committed on
Commit
b90cc86
·
verified ·
1 Parent(s): 2d3fa71

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +92 -72
app.py CHANGED
@@ -17,122 +17,145 @@ dotenv.load_dotenv()
17
  logger = logging.getLogger("presidio-streamlit")
18
 
19
  def get_timestamp_prefix() -> str:
20
- """🕒 Stamps time like a boss with Central flair!"""
21
  central = pytz.timezone("US/Central")
22
  return datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
23
 
24
- def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple[object, RecognizerRegistry]:
25
- """🤖 Fires up NLP engines with a spark of genius!"""
26
  registry = RecognizerRegistry()
27
  if model_family.lower() == "flair":
28
  from flair.models import SequenceTagger
29
  tagger = SequenceTagger.load(model_path)
30
  registry.load_predefined_recognizers()
31
- registry.add_recognizer_from_dict({"name": "flair_recognizer", "supported_language": "en", "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"], "model": model_path, "package": "flair"})
 
 
32
  return tagger, registry
33
  elif model_family.lower() == "huggingface":
34
  from transformers import pipeline
35
  nlp = pipeline("ner", model=model_path, tokenizer=model_path)
36
  registry.load_predefined_recognizers()
37
- registry.add_recognizer_from_dict({"name": "huggingface_recognizer", "supported_language": "en", "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"], "model": model_path, "package": "transformers"})
 
 
38
  return nlp, registry
39
- raise ValueError(f"Model family {model_family} not supported")
40
 
41
  def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
42
- """🔍 Unleashes the PHI-sniffing bloodhound!"""
43
  nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
44
  return AnalyzerEngine(registry=registry)
45
 
46
  def get_supported_entities(model_family: str, model_path: str) -> list[str]:
47
- """📋 Lists what secrets we’re hunting—PHI beware!"""
48
  return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"] if model_family.lower() == "huggingface" else ["PERSON", "LOCATION", "ORGANIZATION"]
49
 
50
- # Feature Spotlight: 🕵️‍♂️ The Great PHI Hunt Begins!
51
- # Summon models to sniff out sensitive data in PDFs with ninja stealth! 😎
52
 
53
- def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list[RecognizerResult]:
54
- """🦸 Swoops in to spot PHI with laser precision!"""
55
  results = analyzer.analyze(text=text, entities=entities, language=language, score_threshold=score_threshold, return_decision_process=return_decision_process)
56
- filtered_results = []
57
  for result in results:
58
- text_snippet = text[result.start:result.end].lower()
59
- if any(word.lower() in text_snippet for word in allow_list):
60
  continue
61
- if any(word.lower() in text_snippet for word in deny_list) or not deny_list:
62
- filtered_results.append(result)
63
- return filtered_results
64
 
65
- def anonymize(text: str, operator: str, analyze_results: list[RecognizerResult], mask_char: str = "*", number_of_chars: int = 15) -> dict:
66
- """🕵️‍♀️ Cloaks PHI in a disguise—poof, it’s gone!"""
67
  anonymizer = AnonymizerEngine()
68
- operator_config = {"DEFAULT": OperatorConfig(operator, {})}
69
  if operator == "mask":
70
- operator_config["DEFAULT"] = OperatorConfig(operator, {"masking_char": mask_char, "chars_to_mask": number_of_chars})
71
- return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=operator_config)
72
 
73
  def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
74
- """🚨 Builds a naughty list to catch sneaky PHI!"""
75
- if not deny_list:
76
- return None
77
- return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
78
 
79
  def save_pdf(pdf_input) -> str:
80
- """💾 Drops PDFs into a cozy temp hideout!"""
81
  if pdf_input.size > 200 * 1024 * 1024:
82
- raise ValueError("PDF exceeds 200MB limit")
83
- with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf", dir="/tmp") as tmp:
84
- tmp.write(pdf_input.read())
85
- return tmp.name
 
 
 
 
 
 
 
 
86
 
87
- # Feature Spotlight: 📄 PDF Magic Unleashed!
88
- # Zap PHI from PDFs and sling back a shiny, safe file with timestamp swagger! ✨
89
 
90
  def read_pdf(pdf_path: str) -> str:
91
- """📖 Slurps up PDF text like a thirsty camel!"""
92
- reader = PdfReader(pdf_path)
93
- return "".join(page.extract_text() or "" + "\n" for page in reader.pages)
 
 
 
 
 
 
94
 
95
  def create_pdf(text: str, input_path: str, output_filename: str) -> str:
96
- """🖨️ Crafts a fresh PDF with PHI-proof swagger!"""
97
- reader = PdfReader(input_path)
98
- writer = PdfWriter()
99
- for page in reader.pages:
100
- writer.add_page(page)
101
- with open(output_filename, "wb") as f:
102
- writer.write(f)
103
- return output_filename
 
 
 
 
 
104
 
105
- # Sidebar setup
106
  st.sidebar.header("PHI De-identification with Presidio")
107
  model_list = [
108
  ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
109
  ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
110
  ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
111
  ]
112
- st_model = st.sidebar.selectbox("NER model package", [m[0] for m in model_list], 0, help="Pick your PHI-hunting hero!")
113
- st.sidebar.markdown(f"[View model on HuggingFace]({next(url for m, url in model_list if m == st_model)})")
114
  st_model_package = st_model.split("/")[0]
115
  st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
116
  analyzer_params = (st_model_package, st_model)
117
- st.sidebar.warning("Models may take a sec to wake up!")
118
- st_operator = st.sidebar.selectbox("De-identification approach", ["replace", "redact", "mask"], 0, help="Choose how to zap PHI!")
119
- st_threshold = st.sidebar.slider("Acceptance threshold", 0.0, 1.0, 0.35)
120
- st_return_decision_process = st.sidebar.checkbox("Add analysis explanations", False)
121
- with st.sidebar.expander("Allowlists and denylists"):
122
- st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
123
- st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")
124
 
125
- # Main panel
126
  col1, col2 = st.columns(2)
127
  with col1:
128
  st.subheader("Input")
129
- uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
130
  if uploaded_file:
131
  try:
 
132
  pdf_path = save_pdf(uploaded_file)
133
  text = read_pdf(pdf_path)
134
  if not text:
135
- raise ValueError("No text in that PDF!")
 
136
  analyzer = analyzer_engine(*analyzer_params)
137
  st_analyze_results = analyze(
138
  analyzer=analyzer,
@@ -146,33 +169,30 @@ with col1:
146
  )
147
  phi_types = set(res.entity_type for res in st_analyze_results)
148
  if phi_types:
149
- st.success(f"Removed PHI types: {', '.join(phi_types)}")
150
  else:
151
- st.info("No PHI detected")
152
  anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
153
  timestamp = get_timestamp_prefix()
154
  output_filename = f"{timestamp}_{uploaded_file.name}"
155
- pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
156
  with open(output_filename, "rb") as f:
157
- pdf_bytes = f.read()
158
- b64 = base64.b64encode(pdf_bytes).decode()
159
  st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)
160
  with col2:
161
  st.subheader("Findings")
162
  if st_analyze_results:
163
- df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
164
- df["text"] = [text[res.start:res.end] for res in st_analyze_results]
165
  df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
166
- {"entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
167
  )
168
  if st_return_decision_process:
169
- analysis_explanation_df = pd.DataFrame.from_records([r.analysis_explanation.to_dict() for r in st_analyze_results])
170
- df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
171
  st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
172
  else:
173
  st.text("No findings")
174
- if os.path.exists(pdf_path):
175
- os.remove(pdf_path)
176
  except Exception as e:
177
- st.error(f"Oops, something broke: {str(e)}")
178
- logger.error(f"Processing error: {str(e)}")
 
17
  logger = logging.getLogger("presidio-streamlit")
18
 
19
  def get_timestamp_prefix() -> str:
20
+ """🕒 Stamps time with Central swagger!"""
21
  central = pytz.timezone("US/Central")
22
  return datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
23
 
24
+ def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple:
25
+ """🤖 Sparks NLP models with a wink!"""
26
  registry = RecognizerRegistry()
27
  if model_family.lower() == "flair":
28
  from flair.models import SequenceTagger
29
  tagger = SequenceTagger.load(model_path)
30
  registry.load_predefined_recognizers()
31
+ recognizer = PatternRecognizer(supported_entity="CUSTOM", supported_language="en")
32
+ registry.add_recognizer(recognizer)
33
+ logger.info(f"Flair model loaded: {model_path}")
34
  return tagger, registry
35
  elif model_family.lower() == "huggingface":
36
  from transformers import pipeline
37
  nlp = pipeline("ner", model=model_path, tokenizer=model_path)
38
  registry.load_predefined_recognizers()
39
+ recognizer = PatternRecognizer(supported_entity="CUSTOM", supported_language="en")
40
+ registry.add_recognizer(recognizer)
41
+ logger.info(f"HuggingFace model loaded: {model_path}")
42
  return nlp, registry
43
+ raise ValueError(f"Model family {model_family} unsupported")
44
 
45
  def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
46
+ """🔍 Unleashes the PHI-hunting beast!"""
47
  nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
48
  return AnalyzerEngine(registry=registry)
49
 
50
  def get_supported_entities(model_family: str, model_path: str) -> list[str]:
51
+ """📋 Spills the beans on PHI targets!"""
52
  return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"] if model_family.lower() == "huggingface" else ["PERSON", "LOCATION", "ORGANIZATION"]
53
 
54
+ # Feature Spotlight: 🕵️‍♂️ PHI Hunt Kicks Off!
55
+ # Models dive into PDFs, sniffing out sensitive bits with ninja vibes! 😎
56
 
57
+ def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list:
58
+ """🦸 Zaps PHI with eagle-eye precision!"""
59
  results = analyzer.analyze(text=text, entities=entities, language=language, score_threshold=score_threshold, return_decision_process=return_decision_process)
60
+ filtered = []
61
  for result in results:
62
+ snippet = text[result.start:result.end].lower()
63
+ if any(word.lower() in snippet for word in allow_list):
64
  continue
65
+ if any(word.lower() in snippet for word in deny_list) or not deny_list:
66
+ filtered.append(result)
67
+ return filtered
68
 
69
+ def anonymize(text: str, operator: str, analyze_results: list, mask_char: str = "*", number_of_chars: int = 15) -> dict:
70
+ """🕵️‍♀️ Hides PHI with a magician’s flair!"""
71
  anonymizer = AnonymizerEngine()
72
+ config = {"DEFAULT": OperatorConfig(operator, {})}
73
  if operator == "mask":
74
+ config["DEFAULT"] = OperatorConfig(operator, {"masking_char": mask_char, "chars_to_mask": number_of_chars})
75
+ return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=config)
76
 
77
  def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
78
+ """🚨 Sets traps for sneaky PHI rogues!"""
79
+ return None if not deny_list else PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
 
 
80
 
81
  def save_pdf(pdf_input) -> str:
82
+ """💾 Stashes PDFs in a temp vault!"""
83
  if pdf_input.size > 200 * 1024 * 1024:
84
+ logger.error(f"Upload rejected: {pdf_input.name} exceeds 200MB")
85
+ st.error("PDF exceeds 200MB limit")
86
+ raise ValueError("PDF too big")
87
+ try:
88
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf", dir="/tmp") as tmp:
89
+ tmp.write(pdf_input.read())
90
+ logger.info(f"Uploaded PDF to {tmp.name}, size: {pdf_input.size} bytes")
91
+ return tmp.name
92
+ except Exception as e:
93
+ logger.error(f"Upload failed: {str(e)}")
94
+ st.error(f"Upload error: {str(e)}")
95
+ raise
96
 
97
+ # Feature Spotlight: 📄 PDF Wizardry Unleashed!
98
+ # Uploads zip through, PHI vanishes, and out pops a safe PDF with timestamp pizzazz! ✨
99
 
100
  def read_pdf(pdf_path: str) -> str:
101
+ """📖 Gobbles PDF text like candy!"""
102
+ try:
103
+ reader = PdfReader(pdf_path)
104
+ text = "".join(page.extract_text() or "" + "\n" for page in reader.pages)
105
+ logger.info(f"Extracted {len(text)} chars from {pdf_path}")
106
+ return text
107
+ except Exception as e:
108
+ logger.error(f"Read failed: {str(e)}")
109
+ raise
110
 
111
  def create_pdf(text: str, input_path: str, output_filename: str) -> str:
112
+ """🖨️ Spins a new PDF with PHI-proof charm!"""
113
+ try:
114
+ reader = PdfReader(input_path)
115
+ writer = PdfWriter()
116
+ for page in reader.pages:
117
+ writer.add_page(page)
118
+ with open(output_filename, "wb") as f:
119
+ writer.write(f)
120
+ logger.info(f"Created PDF: {output_filename}")
121
+ return output_filename
122
+ except Exception as e:
123
+ logger.error(f"Create failed: {str(e)}")
124
+ raise
125
 
126
+ # Sidebar
127
  st.sidebar.header("PHI De-identification with Presidio")
128
  model_list = [
129
  ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
130
  ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
131
  ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
132
  ]
133
+ st_model = st.sidebar.selectbox("NER model", [m[0] for m in model_list], 0)
134
+ st.sidebar.markdown(f"[View model]({next(url for m, url in model_list if m == st_model)})")
135
  st_model_package = st_model.split("/")[0]
136
  st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
137
  analyzer_params = (st_model_package, st_model)
138
+ st.sidebar.warning("Models may snooze briefly!")
139
+ st_operator = st.sidebar.selectbox("De-id approach", ["replace", "redact", "mask"], 0)
140
+ st_threshold = st.sidebar.slider("Threshold", 0.0, 1.0, 0.35)
141
+ st_return_decision_process = st.sidebar.checkbox("Show analysis", False)
142
+ with st.sidebar.expander("Allow/Deny lists"):
143
+ st_allow_list = st_tags(label="Allowlist", text="Add word, hit enter")
144
+ st_deny_list = st_tags(label="Denylist", text="Add word, hit enter")
145
 
146
+ # Main
147
  col1, col2 = st.columns(2)
148
  with col1:
149
  st.subheader("Input")
150
+ uploaded_file = st.file_uploader("Upload PDF", type=["pdf"], help="Max 200MB")
151
  if uploaded_file:
152
  try:
153
+ logger.info(f"Upload: {uploaded_file.name}, size: {uploaded_file.size} bytes")
154
  pdf_path = save_pdf(uploaded_file)
155
  text = read_pdf(pdf_path)
156
  if not text:
157
+ st.error("No text extracted")
158
+ raise ValueError("Empty PDF")
159
  analyzer = analyzer_engine(*analyzer_params)
160
  st_analyze_results = analyze(
161
  analyzer=analyzer,
 
169
  )
170
  phi_types = set(res.entity_type for res in st_analyze_results)
171
  if phi_types:
172
+ st.success(f"Zapped PHI: {', '.join(phi_types)}")
173
  else:
174
+ st.info("No PHI found")
175
  anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
176
  timestamp = get_timestamp_prefix()
177
  output_filename = f"{timestamp}_{uploaded_file.name}"
178
+ create_pdf(anonymized_result.text, pdf_path, output_filename)
179
  with open(output_filename, "rb") as f:
180
+ b64 = base64.b64encode(f.read()).decode()
 
181
  st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)
182
  with col2:
183
  st.subheader("Findings")
184
  if st_analyze_results:
185
+ df = pd.DataFrame([r.to_dict() for r in st_analyze_results])
186
+ df["text"] = [text[r.start:r.end] for r in st_analyze_results]
187
  df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
188
+ {"entity_type": "Type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
189
  )
190
  if st_return_decision_process:
191
+ df_subset = pd.concat([df_subset, pd.DataFrame([r.analysis_explanation.to_dict() for r in st_analyze_results])], axis=1)
 
192
  st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
193
  else:
194
  st.text("No findings")
195
+ os.remove(pdf_path)
 
196
  except Exception as e:
197
+ st.error(f"Oops: {str(e)}")
198
+ logger.error(f"Error: {str(e)}")