fhir-r5 / src /app.py
notsahil's picture
FHIR Demo
c808da1
raw
history blame
14.5 kB
import streamlit as st
import requests
import json
import pandas as pd
import uuid
import os
# FHIR Server Base URL from environment variable
FHIR_BASE_URL = os.getenv("FHIR_BASE_URL")
st.title("FHIR API Explorer")
st.write("Explore the HAPI FHIR Server API")
st.sidebar.title("Navigation")
page = st.sidebar.radio("Select a page", ["Server Info", "Patient Search", "Resource Explorer", "CRUD Operations"])
if page == "Server Info":
st.header("Server Capabilities")
if st.button("Get Server Metadata"):
try:
response = requests.get(f"{FHIR_BASE_URL}/metadata")
if response.status_code == 200:
data = response.json()
st.json(data)
else:
st.error(f"Error: {response.status_code}")
except Exception as e:
st.error(f"Error: {str(e)}")
elif page == "Patient Search":
st.header("Patient Search")
col1, col2 = st.columns(2)
with col1:
family_name = st.text_input("Family Name")
with col2:
identifier = st.text_input("Identifier")
if st.button("Search Patients"):
params = {}
if family_name:
params["family"] = family_name
if identifier:
params["identifier"] = identifier
try:
response = requests.get(f"{FHIR_BASE_URL}/Patient", params=params)
if response.status_code == 200:
data = response.json()
if "entry" in data and len(data["entry"]) > 0:
patients = []
for entry in data["entry"]:
resource = entry["resource"]
patient = {
"id": resource.get("id", ""),
"name": ", ".join([name.get("family", "") for name in resource.get("name", [])]),
"gender": resource.get("gender", ""),
"birthDate": resource.get("birthDate", "")
}
patients.append(patient)
st.dataframe(pd.DataFrame(patients))
with st.expander("Raw Response"):
st.json(data)
else:
st.info("No patients found")
else:
st.error(f"Error: {response.status_code}")
except Exception as e:
st.error(f"Error: {str(e)}")
elif page == "Resource Explorer":
st.header("Resource Explorer")
resource_types = ["Patient", "Observation", "Medication", "Immunization", "Condition", "Procedure"]
resource_type = st.selectbox("Select Resource Type", resource_types)
resource_id = st.text_input("Resource ID (optional)")
if st.button("Fetch Resources"):
try:
if resource_id:
url = f"{FHIR_BASE_URL}/{resource_type}/{resource_id}"
else:
url = f"{FHIR_BASE_URL}/{resource_type}?_count=10"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if "resourceType" in data and data["resourceType"] != "Bundle":
st.write(f"Resource Type: {data['resourceType']}")
st.write(f"ID: {data.get('id', 'N/A')}")
with st.expander("View Full Resource"):
st.json(data)
else:
if "entry" in data and len(data["entry"]) > 0:
st.write(f"Found {len(data['entry'])} resources")
resources = []
for entry in data["entry"]:
resource = entry["resource"]
resources.append({
"id": resource.get("id", ""),
"resourceType": resource.get("resourceType", ""),
"lastUpdated": resource.get("meta", {}).get("lastUpdated", "")
})
st.dataframe(pd.DataFrame(resources))
selected_id = st.selectbox("Select a resource to view details",
[r["id"] for r in resources])
if selected_id:
for entry in data["entry"]:
if entry["resource"].get("id") == selected_id:
st.json(entry["resource"])
else:
st.info("No resources found")
else:
st.error(f"Error: {response.status_code}")
except Exception as e:
st.error(f"Error: {str(e)}")
elif page == "CRUD Operations":
st.header("CRUD Operations")
resource_types = ["Patient", "Observation", "Medication", "Immunization", "Condition", "Procedure"]
resource_type = st.selectbox("Select Resource Type", resource_types)
crud_tab = st.tabs(["Create", "Read", "Update", "Delete"])
# CREATE
with crud_tab[0]:
st.subheader(f"Create New {resource_type}")
if resource_type == "Patient":
with st.form("create_patient_form"):
family_name = st.text_input("Family Name")
given_name = st.text_input("Given Name")
gender = st.selectbox("Gender", ["male", "female", "other", "unknown"])
birth_date = st.date_input("Birth Date")
submit_button = st.form_submit_button("Create Patient")
if submit_button:
patient_data = {
"resourceType": "Patient",
"name": [
{
"family": family_name,
"given": [given_name]
}
],
"gender": gender,
"birthDate": str(birth_date)
}
try:
headers = {"Content-Type": "application/fhir+json"}
response = requests.post(
f"{FHIR_BASE_URL}/Patient",
json=patient_data,
headers=headers
)
if response.status_code in [200, 201]:
st.success("Patient created successfully!")
st.json(response.json())
else:
st.error(f"Error: {response.status_code}")
st.write(response.text)
except Exception as e:
st.error(f"Error: {str(e)}")
elif resource_type == "Observation":
with st.form("create_observation_form"):
patient_id = st.text_input("Patient ID")
code_display = st.text_input("Observation Name", "Heart rate")
code_system = st.text_input("Code System", "http://loinc.org")
code_code = st.text_input("Code", "8867-4")
value = st.number_input("Value", value=80)
unit = st.text_input("Unit", "beats/minute")
submit_button = st.form_submit_button("Create Observation")
if submit_button:
observation_data = {
"resourceType": "Observation",
"status": "final",
"code": {
"coding": [
{
"system": code_system,
"code": code_code,
"display": code_display
}
]
},
"subject": {
"reference": f"Patient/{patient_id}"
},
"valueQuantity": {
"value": value,
"unit": unit,
"system": "http://unitsofmeasure.org"
}
}
try:
headers = {"Content-Type": "application/fhir+json"}
response = requests.post(
f"{FHIR_BASE_URL}/Observation",
json=observation_data,
headers=headers
)
if response.status_code in [200, 201]:
st.success("Observation created successfully!")
st.json(response.json())
else:
st.error(f"Error: {response.status_code}")
st.write(response.text)
except Exception as e:
st.error(f"Error: {str(e)}")
else:
st.write(f"Create a new {resource_type} resource:")
template_json = {
"resourceType": resource_type,
}
json_str = st.text_area("Edit JSON", json.dumps(template_json, indent=2), height=300)
if st.button("Create Resource"):
try:
resource_data = json.loads(json_str)
headers = {"Content-Type": "application/fhir+json"}
response = requests.post(
f"{FHIR_BASE_URL}/{resource_type}",
json=resource_data,
headers=headers
)
if response.status_code in [200, 201]:
st.success(f"{resource_type} created successfully!")
st.json(response.json())
else:
st.error(f"Error: {response.status_code}")
st.write(response.text)
except json.JSONDecodeError:
st.error("Invalid JSON format")
except Exception as e:
st.error(f"Error: {str(e)}")
# READ
with crud_tab[1]:
st.subheader(f"Read {resource_type}")
resource_id = st.text_input(f"{resource_type} ID")
if st.button("Read Resource"):
if not resource_id:
st.warning("Please enter a resource ID")
else:
try:
response = requests.get(f"{FHIR_BASE_URL}/{resource_type}/{resource_id}")
if response.status_code == 200:
st.success(f"{resource_type} retrieved successfully!")
st.json(response.json())
else:
st.error(f"Error: {response.status_code}")
st.write(response.text)
except Exception as e:
st.error(f"Error: {str(e)}")
# UPDATE
with crud_tab[2]:
st.subheader(f"Update {resource_type}")
update_id = st.text_input(f"{resource_type} ID to update")
if update_id:
try:
response = requests.get(f"{FHIR_BASE_URL}/{resource_type}/{update_id}")
if response.status_code == 200:
resource_data = response.json()
json_str = st.text_area("Edit JSON", json.dumps(resource_data, indent=2), height=300)
if st.button("Update Resource"):
try:
updated_data = json.loads(json_str)
headers = {"Content-Type": "application/fhir+json"}
update_response = requests.put(
f"{FHIR_BASE_URL}/{resource_type}/{update_id}",
json=updated_data,
headers=headers
)
if update_response.status_code in [200, 201]:
st.success(f"{resource_type} updated successfully!")
st.json(update_response.json())
else:
st.error(f"Error: {update_response.status_code}")
st.write(update_response.text)
except json.JSONDecodeError:
st.error("Invalid JSON format")
except Exception as e:
st.error(f"Error: {str(e)}")
else:
st.error(f"Error fetching resource: {response.status_code}")
except Exception as e:
st.error(f"Error: {str(e)}")
else:
st.info(f"Enter a {resource_type} ID to update")
# DELETE
with crud_tab[3]:
st.subheader(f"Delete {resource_type}")
delete_id = st.text_input(f"{resource_type} ID to delete")
if delete_id:
st.warning(f"Are you sure you want to delete {resource_type}/{delete_id}?")
if st.button("Confirm Delete"):
try:
response = requests.delete(f"{FHIR_BASE_URL}/{resource_type}/{delete_id}")
if response.status_code in [200, 204]:
st.success(f"{resource_type} deleted successfully!")
else:
st.error(f"Error: {response.status_code}")
st.write(response.text)
except Exception as e:
st.error(f"Error: {str(e)}")
else:
st.info(f"Enter a {resource_type} ID to delete")
# Footer
st.markdown("---")
st.markdown("FHIR API Explorer - Using HAPI FHIR Server")