awacke1 committed on
Commit
331cb9f
·
verified ·
1 Parent(s): f46375b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +114 -157
app.py CHANGED
@@ -7,147 +7,137 @@ import pandas as pd
7
  import streamlit as st
8
  from streamlit_tags import st_tags
9
  from PyPDF2 import PdfReader, PdfWriter
10
- from presidio_helpers import (
11
- analyzer_engine,
12
- get_supported_entities,
13
- analyze,
14
- anonymize,
15
- )
16
-
17
- st.set_page_config(
18
- page_title="Presidio PHI De-identification",
19
- layout="wide",
20
- initial_sidebar_state="expanded",
21
- menu_items={"About": "https://microsoft.github.io/presidio/"},
22
- )
23
 
 
24
  dotenv.load_dotenv()
25
  logger = logging.getLogger("presidio-streamlit")
26
 
27
- # Sidebar
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  st.sidebar.header("PHI De-identification with Presidio")
29
-
30
- model_help_text = "Select Named Entity Recognition (NER) model for PHI detection."
31
  model_list = [
32
  ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
33
  ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
34
  ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
35
  ]
36
-
37
- st_model = st.sidebar.selectbox(
38
- "NER model package",
39
- [model[0] for model in model_list],
40
- index=0,
41
- help=model_help_text,
42
- )
43
-
44
- # Display HuggingFace link for selected model
45
- selected_model_url = next(url for model, url in model_list if model == st_model)
46
- st.sidebar.markdown(f"[View model on HuggingFace]({selected_model_url})")
47
-
48
- # Extract model package
49
  st_model_package = st_model.split("/")[0]
50
- st_model = st_model if st_model_package.lower() not in ("huggingface") else "/".join(st_model.split("/")[1:])
51
-
52
  analyzer_params = (st_model_package, st_model)
53
- st.sidebar.warning("Note: Models might take some time to download on first run.")
54
-
55
- st_operator = st.sidebar.selectbox(
56
- "De-identification approach",
57
- ["replace", "redact", "mask"],
58
- index=0,
59
- help="Select PHI manipulation method.",
60
- )
61
-
62
- st_threshold = st.sidebar.slider(
63
- label="Acceptance threshold",
64
- min_value=0.0,
65
- max_value=1.0,
66
- value=0.35,
67
- )
68
-
69
- st_return_decision_process = st.sidebar.checkbox(
70
- "Add analysis explanations",
71
- value=False,
72
- )
73
-
74
- # Allow and deny lists
75
- with st.sidebar.expander("Allowlists and denylists", expanded=False):
76
  st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
77
  st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")
78
 
79
- # PDF processing functions
80
- def get_timestamp_prefix():
81
- central = pytz.timezone("US/Central")
82
- now = datetime.now(central)
83
- return now.strftime("%I%M%p_%d-%m-%y").upper()
84
-
85
- def save_pdf(pdf_input):
86
- """Save uploaded PDF to disk."""
87
- try:
88
- original_name = pdf_input.name
89
- with open(original_name, "wb") as f:
90
- f.write(pdf_input.read())
91
- return original_name
92
- except Exception as e:
93
- st.error(f"Failed to save PDF: {str(e)}")
94
- return None
95
-
96
- def read_pdf(pdf_path):
97
- """Read text from a PDF using PyPDF2."""
98
- try:
99
- reader = PdfReader(pdf_path)
100
- text = ""
101
- for page in reader.pages:
102
- page_text = page.extract_text() or ""
103
- text += page_text + "\n"
104
- return text
105
- except Exception as e:
106
- st.error(f"Failed to read PDF: {str(e)}")
107
- return None
108
-
109
- def create_pdf(text, input_path, output_filename):
110
- """Create a PDF with anonymized text using PyPDF2."""
111
- try:
112
- reader = PdfReader(input_path)
113
- writer = PdfWriter()
114
- for page in reader.pages:
115
- writer.add_page(page)
116
- with open(output_filename, "wb") as f:
117
- writer.write(f)
118
- return output_filename
119
- except Exception as e:
120
- st.error(f"Failed to create PDF: {str(e)}")
121
- return None
122
-
123
  # Main panel
124
  col1, col2 = st.columns(2)
125
-
126
  with col1:
127
  st.subheader("Input")
128
  uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
129
-
130
  if uploaded_file:
131
  try:
132
- # Save PDF to disk
133
  pdf_path = save_pdf(uploaded_file)
134
  if not pdf_path:
135
- raise ValueError("Failed to save PDF")
136
-
137
- # Read PDF
138
  text = read_pdf(pdf_path)
139
  if not text:
140
- raise ValueError("No text extracted from PDF")
141
-
142
- # Initialize analyzer
143
- try:
144
- analyzer = analyzer_engine(*analyzer_params)
145
- except Exception as e:
146
- st.error(f"Failed to load model: {str(e)}")
147
- st.info("Ensure models are downloaded and check network/permissions.")
148
- raise
149
-
150
- # Analyze
151
  st_analyze_results = analyze(
152
  analyzer=analyzer,
153
  text=text,
@@ -158,70 +148,37 @@ with col1:
158
  allow_list=st_allow_list,
159
  deny_list=st_deny_list,
160
  )
161
-
162
- # Process results
163
  phi_types = set(res.entity_type for res in st_analyze_results)
164
  if phi_types:
165
  st.success(f"Removed PHI types: {', '.join(phi_types)}")
166
  else:
167
  st.info("No PHI detected")
168
-
169
- # Anonymize
170
- anonymized_result = anonymize(
171
- text=text,
172
- operator=st_operator,
173
- analyze_results=st_analyze_results,
174
- )
175
-
176
- # Generate output filename with timestamp
177
  timestamp = get_timestamp_prefix()
178
  output_filename = f"{timestamp}_{uploaded_file.name}"
179
-
180
- # Create new PDF
181
  pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
182
  if not pdf_output:
183
- raise ValueError("Failed to generate PDF")
184
-
185
- # Generate base64 download link
186
- try:
187
- with open(output_filename, "rb") as f:
188
- pdf_bytes = f.read()
189
- b64 = base64.b64encode(pdf_bytes).decode()
190
- href = f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>'
191
- st.markdown(href, unsafe_allow_html=True)
192
- except Exception as e:
193
- st.error(f"Error generating download link: {str(e)}")
194
- raise
195
-
196
- # Display findings
197
  with col2:
198
  st.subheader("Findings")
199
  if st_analyze_results:
200
  df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
201
  df["text"] = [text[res.start:res.end] for res in st_analyze_results]
202
  df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
203
- {
204
- "entity_type": "Entity type",
205
- "text": "Text",
206
- "start": "Start",
207
- "end": "End",
208
- "score": "Confidence",
209
- },
210
- axis=1,
211
  )
212
  if st_return_decision_process:
213
- analysis_explanation_df = pd.DataFrame.from_records(
214
- [r.analysis_explanation.to_dict() for r in st_analyze_results]
215
- )
216
  df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
217
  st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
218
  else:
219
  st.text("No findings")
220
-
221
- # Clean up temporary file
222
  if os.path.exists(pdf_path):
223
  os.remove(pdf_path)
224
-
225
  except Exception as e:
226
- st.error(f"An error occurred: {str(e)}")
227
  logger.error(f"Processing error: {str(e)}")
 
7
  import streamlit as st
8
  from streamlit_tags import st_tags
9
  from PyPDF2 import PdfReader, PdfWriter
10
+ from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, PatternRecognizer, RecognizerResult
11
+ from presidio_anonymizer import AnonymizerEngine
12
+ from presidio_anonymizer.entities import OperatorConfig
 
 
 
 
 
 
 
 
 
 
13
 
14
+ st.set_page_config(page_title="Presidio PHI De-identification", layout="wide", initial_sidebar_state="expanded", menu_items={"About": "https://microsoft.github.io/presidio/"})
15
  dotenv.load_dotenv()
16
  logger = logging.getLogger("presidio-streamlit")
17
 
18
def get_timestamp_prefix() -> str:
    """Return the current US/Central time as an upper-cased filename prefix.

    The layout is 12-hour ``HHMM`` followed by AM/PM, an underscore, and
    ``DD-MM-YY`` (format string ``%I%M%p_%d-%m-%y``).
    """
    now_central = datetime.now(pytz.timezone("US/Central"))
    stamp = now_central.strftime("%I%M%p_%d-%m-%y")
    return stamp.upper()
22
+
23
def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple[object, RecognizerRegistry]:
    """Load the selected NER model and build a recognizer registry for it.

    Args:
        model_family: ``"flair"`` or ``"huggingface"`` (compared case-insensitively).
        model_path: Model identifier handed to the family's loader.

    Returns:
        A ``(model, registry)`` pair: the loaded flair tagger or transformers
        NER pipeline, and a registry holding the predefined recognizers plus
        one entry registered from a dict describing the model.

    Raises:
        ValueError: If ``model_family`` is neither of the supported families.
    """
    registry = RecognizerRegistry()
    family = model_family.lower()

    # Heavy ML packages are imported lazily so only the chosen family is loaded.
    if family == "flair":
        from flair.models import SequenceTagger

        model = SequenceTagger.load(model_path)
        recognizer_spec = {"name": "flair_recognizer", "supported_language": "en", "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"], "model": model_path, "package": "flair"}
    elif family == "huggingface":
        from transformers import pipeline

        model = pipeline("ner", model=model_path, tokenizer=model_path)
        recognizer_spec = {"name": "huggingface_recognizer", "supported_language": "en", "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"], "model": model_path, "package": "transformers"}
    else:
        raise ValueError(f"Model family {model_family} not supported")

    registry.load_predefined_recognizers()
    # NOTE(review): the dict carries "model"/"package"/"supported_entities" keys;
    # confirm that RecognizerRegistry.add_recognizer_from_dict accepts this
    # schema and actually wires the loaded model into the recognizer.
    registry.add_recognizer_from_dict(recognizer_spec)
    return model, registry
39
+
40
def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
    """Build an ``AnalyzerEngine`` backed by the registry for the chosen model.

    Args:
        model_family: Model family name, forwarded to ``nlp_engine_and_registry``.
        model_path: Model identifier, forwarded to ``nlp_engine_and_registry``.

    Returns:
        An ``AnalyzerEngine`` constructed with that registry.
    """
    # NOTE(review): the loaded model object is discarded here; only the
    # registry reaches the engine. Confirm this is intended.
    _, recognizer_registry = nlp_engine_and_registry(model_family, model_path)
    return AnalyzerEngine(registry=recognizer_registry)
44
+
45
def get_supported_entities(model_family: str, model_path: str) -> list[str]:
    """Return the entity labels requested for a model family.

    Args:
        model_family: Family name, compared case-insensitively.
        model_path: Unused; kept so the signature parallels ``analyzer_engine``.

    Returns:
        ``PERSON``, ``LOCATION`` and ``ORGANIZATION`` for every family, with
        ``DATE_TIME`` appended for the ``huggingface`` family.
    """
    labels = ["PERSON", "LOCATION", "ORGANIZATION"]
    if model_family.lower() == "huggingface":
        labels.append("DATE_TIME")
    return labels
52
+
53
+ # Feature Spotlight: 🕵️‍♂️ The Great PHI Hunt Begins!
54
+ # With a flick of the wrist, we summon models to sniff out sensitive data in PDFs, making privacy a breeze! 😎
55
+
56
def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list[RecognizerResult]:
    """Run PHI detection on ``text`` and apply the allow/deny word lists.

    Deny-list words are additional things to flag: they are detected through
    an ad-hoc deny-list recognizer passed to the analyzer. They never suppress
    the analyzer's other findings. Allow-list words suppress any finding whose
    matched text contains one of them (case-insensitive substring match).

    Args:
        analyzer: Engine exposing ``analyze(text=..., entities=..., ...)``.
        text: Document text to scan.
        entities: Entity labels to look for, or ``None`` to leave the
            selection to the engine.
        language: Language code forwarded to the engine.
        score_threshold: Minimum score forwarded to the engine.
        return_decision_process: Forwarded to the engine.
        allow_list: Words whose findings are dropped; ``None`` or blank
            entries are ignored.
        deny_list: Words to always flag; ``None`` or blank entries are ignored.

    Returns:
        The engine's results minus the allow-listed ones, in engine order.
    """
    # A blank entry would be a substring of every finding and silently drop
    # all of them, so blank words are discarded up front.
    allow_words = [word.lower() for word in (allow_list or []) if word and word.strip()]
    deny_words = [word for word in (deny_list or []) if word and word.strip()]

    extra_kwargs = {}
    if deny_words:
        deny_recognizer = create_ad_hoc_deny_list_recognizer(deny_words)
        if deny_recognizer is not None:
            extra_kwargs["ad_hoc_recognizers"] = [deny_recognizer]
            # The deny-list recognizer reports "GENERIC_PII"; request that
            # label too so an explicit entity selection does not exclude it.
            if entities is not None and "GENERIC_PII" not in entities:
                entities = [*entities, "GENERIC_PII"]

    results = analyzer.analyze(
        text=text,
        entities=entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
        **extra_kwargs,
    )

    if not allow_words:
        return list(results)
    return [
        result
        for result in results
        if not any(word in text[result.start:result.end].lower() for word in allow_words)
    ]
67
+
68
def anonymize(text: str, operator: str, analyze_results: list[RecognizerResult], mask_char: str = "*", number_of_chars: int = 15):
    """Apply one anonymization operator to every detected entity in ``text``.

    Args:
        text: Original document text.
        operator: Presidio operator name; the UI offers ``replace``,
            ``redact`` and ``mask``.
        analyze_results: Findings produced by the analyzer for ``text``.
        mask_char: Character used by the ``mask`` operator.
        number_of_chars: How many characters the ``mask`` operator masks.

    Returns:
        The anonymizer engine's result object. The caller reads its ``.text``
        attribute, so it is not a plain ``dict``.
    """
    operator_params = {}
    if operator == "mask":
        # "from_end" is passed explicitly so the mask operator receives its
        # full parameter set; False masks from the start of the match.
        operator_params = {
            "masking_char": mask_char,
            "chars_to_mask": number_of_chars,
            "from_end": False,
        }
    # The single "DEFAULT" entry applies the operator to every entity type.
    operators = {"DEFAULT": OperatorConfig(operator, operator_params)}
    return AnonymizerEngine().anonymize(
        text=text,
        analyzer_results=analyze_results,
        operators=operators,
    )
75
+
76
def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
    """Build a deny-list recognizer that reports matches as ``GENERIC_PII``.

    Args:
        deny_list: Words to flag.

    Returns:
        A ``PatternRecognizer`` for the given words, or ``None`` when
        ``deny_list`` is ``None`` or empty.
    """
    if deny_list:
        return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
    return None
81
+
82
def save_pdf(pdf_input) -> str:
    """Write an uploaded PDF into the current working directory.

    Args:
        pdf_input: File-like upload exposing ``.name`` and ``.read()``.

    Returns:
        The bare file name the bytes were written to, relative to the
        current working directory.

    Raises:
        ValueError: If the upload's name has no usable file-name component.
        OSError: If the file cannot be written.
    """
    # The name comes from the client. Keep only its last path component so a
    # crafted name such as "../../app.py" cannot write outside the working
    # directory; backslashes are normalized so Windows-style paths are covered.
    safe_name = os.path.basename(str(pdf_input.name).replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise ValueError("Uploaded file has no usable file name")
    with open(safe_name, "wb") as f:
        f.write(pdf_input.read())
    return safe_name
88
+
89
+ # Feature Spotlight: 📄 PDF Magic Unleashed!
90
+ # Upload a PDF, zap the PHI, and grab a shiny new file—all with a timestamp swagger! ✨
91
+
92
def read_pdf(pdf_path: str) -> str:
    """Extract the text of every page of a PDF, one newline after each page.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The concatenated page texts, each followed by ``"\\n"``. An empty
        string is returned when no page yields any non-whitespace text, so
        callers can test the result for emptiness.
    """
    reader = PdfReader(pdf_path)
    # extract_text() may return None, hence the fallback to "".
    page_texts = [page.extract_text() or "" for page in reader.pages]
    if not any(page_text.strip() for page_text in page_texts):
        return ""
    # Parenthesised on purpose: `x or "" + "\n"` parses as `x or "\n"`, which
    # drops the separator after every page that has text and glues the last
    # word of one page to the first word of the next.
    return "".join((page_text + "\n") for page_text in page_texts)
96
+
97
def create_pdf(text: str, input_path: str, output_filename: str) -> str:
    """Write ``text`` to a new, text-only PDF file.

    The previous implementation copied the pages of ``input_path`` verbatim
    and never used ``text``, so the generated file still contained everything
    from the uploaded document. This version renders only ``text`` and never
    reads the source PDF. The original layout, fonts and images are not
    reproduced.

    Args:
        text: Text to render.
        input_path: Unused; kept so existing callers keep working.
        output_filename: Path of the PDF file to create.

    Returns:
        ``output_filename``.

    Raises:
        OSError: If the output file cannot be written.
    """
    import textwrap

    # US Letter page with 36pt margins; Courier at 10pt is 6pt per character,
    # so 90 characters fill the 540pt text width and 60 lines of 12pt leading
    # fill the 720pt text height.
    chars_per_line = 90
    lines_per_page = 60

    def _encode(line: str) -> bytes:
        # Encode for the font's WinAnsi encoding and escape the characters
        # that are special inside a PDF literal string.
        raw = line.encode("cp1252", errors="replace")
        return raw.replace(b"\\", b"\\\\").replace(b"(", b"\\(").replace(b")", b"\\)")

    wrapped = []
    for source_line in (text or "").splitlines() or [""]:
        # Control characters have no glyph; turn them into spaces.
        cleaned = "".join(ch if ch.isprintable() or ch == "\t" else " " for ch in source_line)
        wrapped.extend(textwrap.wrap(cleaned, width=chars_per_line) or [""])

    page_chunks = [wrapped[i:i + lines_per_page] for i in range(0, len(wrapped), lines_per_page)]

    # Object numbers: 1 = catalog, 2 = page tree, 3 = font, then a
    # (page, content stream) pair per page starting at 4.
    kids = " ".join(f"{4 + 2 * index} 0 R" for index in range(len(page_chunks)))
    objects = [
        b"<< /Type /Catalog /Pages 2 0 R >>",
        f"<< /Type /Pages /Kids [{kids}] /Count {len(page_chunks)} >>".encode("ascii"),
        b"<< /Type /Font /Subtype /Type1 /BaseFont /Courier /Encoding /WinAnsiEncoding >>",
    ]
    for index, chunk in enumerate(page_chunks):
        stream = (
            b"BT\n/F1 10 Tf\n12 TL\n36 756 Td\n"
            + b"".join(b"(" + _encode(line) + b") Tj T*\n" for line in chunk)
            + b"ET"
        )
        objects.append(
            (
                "<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] "
                "/Resources << /Font << /F1 3 0 R >> >> "
                f"/Contents {5 + 2 * index} 0 R >>"
            ).encode("ascii")
        )
        objects.append(
            b"<< /Length " + str(len(stream)).encode("ascii") + b" >>\nstream\n" + stream + b"\nendstream"
        )

    document = bytearray(b"%PDF-1.4\n")
    offsets = []
    for number, body in enumerate(objects, start=1):
        offsets.append(len(document))
        document += f"{number} 0 obj\n".encode("ascii") + body + b"\nendobj\n"

    # Cross-reference table: every entry must be exactly 20 bytes long.
    xref_position = len(document)
    document += f"xref\n0 {len(objects) + 1}\n".encode("ascii")
    document += b"0000000000 65535 f \n"
    for offset in offsets:
        document += f"{offset:010d} 00000 n \n".encode("ascii")
    document += (
        f"trailer\n<< /Size {len(objects) + 1} /Root 1 0 R >>\n"
        f"startxref\n{xref_position}\n%%EOF\n"
    ).encode("ascii")

    with open(output_filename, "wb") as f:
        f.write(bytes(document))
    return output_filename
106
+
107
# Sidebar setup: model choice, anonymization operator, score threshold,
# explanation toggle and the allow/deny word lists.
st.sidebar.header("PHI De-identification with Presidio")
# Each entry pairs a selectbox label ("<family>/<model id>") with the URL of
# the model's HuggingFace page.
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox("NER model package", [model[0] for model in model_list], index=0, help="Pick your PHI-hunting hero!")
# Link to the page of whichever model is currently selected.
st.sidebar.markdown(f"[View model on HuggingFace]({next(url for model, url in model_list if model == st_model)})")
# The first path segment of the label is the model family.
st_model_package = st_model.split("/")[0]
# For the "HuggingFace" family the family prefix is stripped, leaving the
# model id; other labels (e.g. "flair/...") are kept whole.
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
# (family, model id) pair, later unpacked into analyzer_engine().
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models may take a sec to wake up!")
st_operator = st.sidebar.selectbox("De-identification approach", ["replace", "redact", "mask"], index=0, help="Choose how to zap PHI!")
# Slider arguments: label, minimum 0.0, maximum 1.0, initial value 0.35.
st_threshold = st.sidebar.slider("Acceptance threshold", 0.0, 1.0, 0.35)
st_return_decision_process = st.sidebar.checkbox("Add analysis explanations", False)
with st.sidebar.expander("Allowlists and denylists"):
    st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
    st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")
126
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
  # Main panel
128
  col1, col2 = st.columns(2)
 
129
  with col1:
130
  st.subheader("Input")
131
  uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
 
132
  if uploaded_file:
133
  try:
 
134
  pdf_path = save_pdf(uploaded_file)
135
  if not pdf_path:
136
+ raise ValueError("PDF save flopped!")
 
 
137
  text = read_pdf(pdf_path)
138
  if not text:
139
+ raise ValueError("No text in that PDF!")
140
+ analyzer = analyzer_engine(*analyzer_params)
 
 
 
 
 
 
 
 
 
141
  st_analyze_results = analyze(
142
  analyzer=analyzer,
143
  text=text,
 
148
  allow_list=st_allow_list,
149
  deny_list=st_deny_list,
150
  )
 
 
151
  phi_types = set(res.entity_type for res in st_analyze_results)
152
  if phi_types:
153
  st.success(f"Removed PHI types: {', '.join(phi_types)}")
154
  else:
155
  st.info("No PHI detected")
156
+ anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
 
 
 
 
 
 
 
 
157
  timestamp = get_timestamp_prefix()
158
  output_filename = f"{timestamp}_{uploaded_file.name}"
 
 
159
  pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
160
  if not pdf_output:
161
+ raise ValueError("PDF creation tanked!")
162
+ with open(output_filename, "rb") as f:
163
+ pdf_bytes = f.read()
164
+ b64 = base64.b64encode(pdf_bytes).decode()
165
+ st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)
 
 
 
 
 
 
 
 
 
166
  with col2:
167
  st.subheader("Findings")
168
  if st_analyze_results:
169
  df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
170
  df["text"] = [text[res.start:res.end] for res in st_analyze_results]
171
  df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
172
+ {"entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
 
 
 
 
 
 
 
173
  )
174
  if st_return_decision_process:
175
+ analysis_explanation_df = pd.DataFrame.from_records([r.analysis_explanation.to_dict() for r in st_analyze_results])
 
 
176
  df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
177
  st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
178
  else:
179
  st.text("No findings")
 
 
180
  if os.path.exists(pdf_path):
181
  os.remove(pdf_path)
 
182
  except Exception as e:
183
+ st.error(f"Oops, something broke: {str(e)}")
184
  logger.error(f"Processing error: {str(e)}")