Tonic commited on
Commit
03b34b8
·
unverified ·
1 Parent(s): 7f2013e

add response parser

Browse files
Files changed (2) hide show
  1. extractcode.py +30 -0
  2. responseparser.py +266 -0
extractcode.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import urllib.parse
2
+
3
+ def extract_code_from_url(url: str) -> str:
4
+ """Extracts the 'code' parameter from a given URL."""
5
+ parsed_url = urllib.parse.urlparse(url)
6
+ query_params = urllib.parse.parse_qs(parsed_url.query)
7
+ return query_params.get("code", [""])[0] # Return the first code or empty string if not found
8
+
9
+ class CallbackManager:
10
+ def __init__(self, redirect_uri: str, client_secret: str = None):
11
+ client_id = os.getenv("APPID")
12
+ if not client_id:
13
+ raise ValueError("APPID environment variable not set.")
14
+ workspace_id = os.getenv("WORKSPACE_URL")
15
+ if not workspace_id:
16
+ raise ValueError("WORKSPACE_URL environment variable not set.")
17
+ self.api = MeldRxAPI(client_id, client_secret, workspace_id, redirect_uri)
18
+ self.auth_code = None
19
+ self.access_token = None
20
+
21
+ def handle_callback(self, callback_url: str) -> str:
22
+ """Handles the callback URL and extracts the code automatically."""
23
+ self.auth_code = extract_code_from_url(callback_url)
24
+ if not self.auth_code:
25
+ return "No authentication code found in URL."
26
+
27
+ if self.api.authenticate_with_code(self.auth_code):
28
+ self.access_token = self.api.access_token
29
+ return f"Authentication successful! Access Token: {self.access_token[:10]}... (truncated)"
30
+ return "Authentication failed. Please check the authorization code."
responseparser.py ADDED
@@ -0,0 +1,266 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from datetime import datetime
3
+ from typing import List, Dict, Optional
4
+
5
+ class PatientDataExtractor:
6
+ """Class to extract patient data from a FHIR Bundle response and map it to discharge form fields."""
7
+
8
+ def __init__(self, patient_data: str):
9
+ """Initialize with patient data in JSON string format."""
10
+ self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data
11
+ self.patients = self._extract_patients()
12
+ self.current_patient_idx = 0 # Default to first patient
13
+
14
+ def _extract_patients(self) -> List[Dict]:
15
+ """Extract all patient entries from the Bundle."""
16
+ if self.data.get("resourceType") != "Bundle" or "entry" not in self.data:
17
+ raise ValueError("Invalid FHIR Bundle format")
18
+ return [entry["resource"] for entry in self.data["entry"] if entry["resource"]["resourceType"] == "Patient"]
19
+
20
+ def set_patient_by_index(self, index: int) -> bool:
21
+ """Set the current patient by index. Returns True if successful."""
22
+ if 0 <= index < len(self.patients):
23
+ self.current_patient_idx = index
24
+ return True
25
+ return False
26
+
27
+ def set_patient_by_id(self, patient_id: str) -> bool:
28
+ """Set the current patient by FHIR Patient ID. Returns True if successful."""
29
+ for i, patient in enumerate(self.patients):
30
+ if patient["id"] == patient_id:
31
+ self.current_patient_idx = i
32
+ return True
33
+ return False
34
+
35
+ def _get_current_patient(self) -> Dict:
36
+ """Get the currently selected patient resource."""
37
+ return self.patients[self.current_patient_idx]
38
+
39
+ def get_first_name(self) -> str:
40
+ """Extract patient's first name."""
41
+ patient = self._get_current_patient()
42
+ names = patient.get("name", [])
43
+ for name in names:
44
+ if name.get("use") == "official" and "given" in name:
45
+ return name["given"][0] # First given name
46
+ return ""
47
+
48
+ def get_last_name(self) -> str:
49
+ """Extract patient's last name."""
50
+ patient = self._get_current_patient()
51
+ names = patient.get("name", [])
52
+ for name in names:
53
+ if name.get("use") == "official" and "family" in name:
54
+ return name["family"]
55
+ return ""
56
+
57
+ def get_middle_initial(self) -> str:
58
+ """Extract patient's middle initial (second given name initial if present)."""
59
+ patient = self._get_current_patient()
60
+ names = patient.get("name", [])
61
+ for name in names:
62
+ if name.get("use") == "official" and "given" in name and len(name["given"]) > 1:
63
+ return name["given"][1][0] # First letter of second given name
64
+ return ""
65
+
66
+ def get_dob(self) -> str:
67
+ """Extract patient's date of birth."""
68
+ patient = self._get_current_patient()
69
+ return patient.get("birthDate", "")
70
+
71
+ def get_age(self) -> str:
72
+ """Calculate patient's age based on birth date."""
73
+ dob = self.get_dob()
74
+ if not dob:
75
+ return ""
76
+ birth_date = datetime.strptime(dob, "%Y-%m-%d")
77
+ today = datetime.now()
78
+ age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
79
+ return str(age)
80
+
81
+ def get_sex(self) -> str:
82
+ """Extract patient's sex (gender)."""
83
+ patient = self._get_current_patient()
84
+ return patient.get("gender", "").capitalize()
85
+
86
+ def get_address(self) -> str:
87
+ """Extract patient's street address."""
88
+ patient = self._get_current_patient()
89
+ addresses = patient.get("address", [])
90
+ return addresses[0]["line"][0] if addresses and "line" in addresses[0] else ""
91
+
92
+ def get_city(self) -> str:
93
+ """Extract patient's city."""
94
+ patient = self._get_current_patient()
95
+ addresses = patient.get("address", [])
96
+ return addresses[0]["city"] if addresses and "city" in addresses[0] else ""
97
+
98
+ def get_state(self) -> str:
99
+ """Extract patient's state."""
100
+ patient = self._get_current_patient()
101
+ addresses = patient.get("address", [])
102
+ return addresses[0]["state"] if addresses and "state" in addresses[0] else ""
103
+
104
+ def get_zip_code(self) -> str:
105
+ """Extract patient's postal code."""
106
+ patient = self._get_current_patient()
107
+ addresses = patient.get("address", [])
108
+ return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else ""
109
+
110
+ def get_patient_dict(self) -> Dict[str, str]:
111
+ """Return a dictionary of patient data mapped to discharge form fields."""
112
+ return {
113
+ "first_name": self.get_first_name(),
114
+ "last_name": self.get_last_name(),
115
+ "middle_initial": self.get_middle_initial(),
116
+ "dob": self.get_dob(),
117
+ "age": self.get_age(),
118
+ "sex": self.get_sex(),
119
+ "address": self.get_address(),
120
+ "city": self.get_city(),
121
+ "state": self.get_state(),
122
+ "zip_code": self.get_zip_code(),
123
+ # Fields not directly available in Patient resource can be left blank or populated separately
124
+ "doctor_first_name": "",
125
+ "doctor_last_name": "",
126
+ "doctor_middle_initial": "",
127
+ "hospital_name": "",
128
+ "doctor_address": "",
129
+ "doctor_city": "",
130
+ "doctor_state": "",
131
+ "doctor_zip": "",
132
+ "admission_date": "",
133
+ "referral_source": "",
134
+ "admission_method": "",
135
+ "discharge_date": "",
136
+ "discharge_reason": "",
137
+ "date_of_death": "",
138
+ "diagnosis": "",
139
+ "procedures": "",
140
+ "medications": "",
141
+ "preparer_name": "",
142
+ "preparer_job_title": ""
143
+ }
144
+
145
+ def get_all_patients(self) -> List[Dict[str, str]]:
146
+ """Return a list of dictionaries for all patients."""
147
+ original_idx = self.current_patient_idx
148
+ all_patients = []
149
+ for i in range(len(self.patients)):
150
+ self.set_patient_by_index(i)
151
+ all_patients.append(self.get_patient_dict())
152
+ self.set_patient_by_index(original_idx) # Restore original selection
153
+ return all_patients
154
+
155
+ def get_patient_ids(self) -> List[str]:
156
+ """Return a list of all patient IDs in the Bundle."""
157
+ return [patient["id"] for patient in self.patients]
158
+
159
+
160
+ # # Example usage with integration into app.py
161
+ # def integrate_with_app(patient_data: str):
162
+ # """Integrate PatientDataExtractor with the Gradio app."""
163
+ # extractor = PatientDataExtractor(patient_data)
164
+
165
+ # # Function to populate form with selected patient's data
166
+ # def populate_form(patient_id: str):
167
+ # if extractor.set_patient_by_id(patient_id):
168
+ # patient_dict = extractor.get_patient_dict()
169
+ # return list(patient_dict.values()) # Return values in order expected by display_form
170
+ # return [""] * 28 # Return empty values if patient not found
171
+
172
+ # # Modify the Gradio app to include patient selection
173
+ # with gr.Blocks() as demo:
174
+ # gr.Markdown("# Patient Discharge Form with MeldRx Integration")
175
+ # with gr.Tab("Authenticate with MeldRx"):
176
+ # gr.Markdown("## SMART on FHIR Authentication")
177
+ # auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False)
178
+ # gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.")
179
+ # auth_code_input = gr.Textbox(label="Authorization Code")
180
+ # auth_submit = gr.Button("Submit Code")
181
+ # auth_result = gr.Textbox(label="Authentication Result")
182
+ # patient_data_button = gr.Button("Fetch Patient Data")
183
+ # patient_data_output = gr.Textbox(label="Patient Data")
184
+ # auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result)
185
+ # patient_data_button.click(fn=CALLBACK_MANAGER.get_patient_data, inputs=None, outputs=patient_data_output)
186
+
187
+ # with gr.Tab("Discharge Form"):
188
+ # gr.Markdown("## Select Patient")
189
+ # patient_dropdown = gr.Dropdown(choices=extractor.get_patient_ids(), label="Select Patient ID")
190
+ # populate_button = gr.Button("Populate Form with Patient Data")
191
+
192
+ # gr.Markdown("## Patient Details")
193
+ # with gr.Row():
194
+ # first_name = gr.Textbox(label="First Name")
195
+ # last_name = gr.Textbox(label="Last Name")
196
+ # middle_initial = gr.Textbox(label="Middle Initial")
197
+ # with gr.Row():
198
+ # dob = gr.Textbox(label="Date of Birth")
199
+ # age = gr.Textbox(label="Age")
200
+ # sex = gr.Textbox(label="Sex")
201
+ # address = gr.Textbox(label="Address")
202
+ # with gr.Row():
203
+ # city = gr.Textbox(label="City")
204
+ # state = gr.Textbox(label="State")
205
+ # zip_code = gr.Textbox(label="Zip Code")
206
+ # gr.Markdown("## Primary Healthcare Professional Details")
207
+ # with gr.Row():
208
+ # doctor_first_name = gr.Textbox(label="Doctor's First Name")
209
+ # doctor_last_name = gr.Textbox(label="Doctor's Last Name")
210
+ # doctor_middle_initial = gr.Textbox(label="Middle Initial")
211
+ # hospital_name = gr.Textbox(label="Hospital/Clinic Name")
212
+ # doctor_address = gr.Textbox(label="Address")
213
+ # with gr.Row():
214
+ # doctor_city = gr.Textbox(label="City")
215
+ # doctor_state = gr.Textbox(label="State")
216
+ # doctor_zip = gr.Textbox(label="Zip Code")
217
+ # gr.Markdown("## Admission and Discharge Details")
218
+ # with gr.Row():
219
+ # admission_date = gr.Textbox(label="Date of Admission")
220
+ # referral_source = gr.Textbox(label="Source of Referral")
221
+ # admission_method = gr.Textbox(label="Method of Admission")
222
+ # with gr.Row():
223
+ # discharge_date = gr.Textbox(label="Date of Discharge")
224
+ # discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason")
225
+ # date_of_death = gr.Textbox(label="Date of Death (if applicable)")
226
+ # gr.Markdown("## Diagnosis & Procedures")
227
+ # diagnosis = gr.Textbox(label="Diagnosis")
228
+ # procedures = gr.Textbox(label="Operation & Procedures")
229
+ # gr.Markdown("## Medication Details")
230
+ # medications = gr.Textbox(label="Medication on Discharge")
231
+ # gr.Markdown("## Prepared By")
232
+ # with gr.Row():
233
+ # preparer_name = gr.Textbox(label="Name")
234
+ # preparer_job_title = gr.Textbox(label="Job Title")
235
+ # submit = gr.Button("Generate Form")
236
+ # output = gr.Markdown()
237
+
238
+ # # Inputs list for populate_form and display_form
239
+ # inputs_list = [
240
+ # first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
241
+ # doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
242
+ # doctor_city, doctor_state, doctor_zip,
243
+ # admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
244
+ # diagnosis, procedures, medications, preparer_name, preparer_job_title
245
+ # ]
246
+
247
+ # # Populate form with patient data when button is clicked
248
+ # populate_button.click(
249
+ # fn=populate_form,
250
+ # inputs=patient_dropdown,
251
+ # outputs=inputs_list
252
+ # )
253
+
254
+ # # Generate the form output
255
+ # submit.click(
256
+ # display_form,
257
+ # inputs=inputs_list,
258
+ # outputs=output
259
+ # )
260
+
261
+ # return demo
262
+
263
+ # # Assuming patient_data is the JSON string from your example
264
+ # # patient_data = <your JSON string here>
265
+ # # demo = integrate_with_app(patient_data)
266
+ # # demo.launch()