AzureCosmosDBUI / app.py
awacke1's picture
Update app.py
3cf3fce verified
raw
history blame
10.7 kB
import streamlit as st
from azure.cosmos import CosmosClient, PartitionKey, exceptions
import os
import pandas as pd
import traceback
import requests
import shutil
import zipfile
from github import Github
from git import Repo
from datetime import datetime
import base64
import json
# Ask Streamlit for the "wide" page layout before anything else is rendered.
st.set_page_config(layout="wide")

# Cosmos DB configuration
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
SUBSCRIPTION_ID = "003fba60-5b3f-48f4-ab36-3ed11bc40816"
# The values below are read from the process environment; each one is None
# when its variable is not set.
DATABASE_NAME = os.getenv("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.getenv("COSMOS_CONTAINER_NAME")
Key = os.getenv("Key")
# GitHub configuration
def download_github_repo(url, local_path):
    """Clone the repository at *url* into *local_path*.

    If something already exists at ``local_path`` it is removed with
    ``shutil.rmtree`` first, so the clone lands in a clean location.
    """
    target_exists = os.path.exists(local_path)
    if target_exists:
        # Drop the previous checkout before cloning again.
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Zip the contents of *source_dir*.

    ``output_filename`` is used as the archive's base name;
    ``shutil.make_archive`` appends the ``.zip`` extension itself.
    """
    shutil.make_archive(
        base_name=output_filename,
        format='zip',
        root_dir=source_dir,
    )
def create_repo(g, repo_name):
    """Create a repository called *repo_name* for the user behind *g*.

    Looks up the user via ``g.get_user()`` and returns whatever that user
    object's ``create_repo`` call returns.
    """
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Stage, commit and push the working tree at *local_path* to *repo*.

    The push URL embeds *github_token* and is built from ``repo.full_name``.
    The ``origin`` remote is re-pointed at that URL (or created when it does
    not exist yet). When the local repository has no heads, a ``main`` branch
    is checked out; otherwise the currently active branch is pushed.
    """
    authenticated_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    working_copy = Repo(local_path)

    # Reuse an existing 'origin' remote, otherwise add one.
    if any(remote.name == 'origin' for remote in working_copy.remotes):
        origin = working_copy.remote('origin')
        origin.set_url(authenticated_url)
    else:
        origin = working_copy.create_remote('origin', authenticated_url)

    if working_copy.heads:
        current_branch = working_copy.active_branch.name
    else:
        # No branch exists yet: start one named 'main'.
        working_copy.git.checkout('-b', 'main')
        current_branch = 'main'

    # Stage everything, and commit only when there is something to commit.
    working_copy.git.add(A=True)
    if working_copy.is_dirty():
        working_copy.git.commit('-m', 'Initial commit')

    origin.push(refspec=f'{current_branch}:{current_branch}')
def get_base64_download_link(file_path, file_name):
    """Return an HTML ``<a>`` tag that embeds *file_path* as a data URI.

    The file is read in binary mode, base64-encoded, and placed in an
    ``application/zip`` data URI. *file_name* is used both as the
    ``download`` attribute and in the visible link text.
    """
    with open(file_path, "rb") as source:
        encoded_bytes = base64.b64encode(source.read())
    encoded_text = encoded_bytes.decode()
    return f'<a href="data:application/zip;base64,{encoded_text}" download="{file_name}">Download {file_name}</a>'
# New functions for dynamic sidebar
def get_databases(client):
    """Return the ``'id'`` of each entry yielded by ``client.list_databases()``."""
    listing = client.list_databases()
    return [entry['id'] for entry in listing]
def get_containers(database):
    """Return the ``'id'`` of each entry yielded by ``database.list_containers()``."""
    listing = database.list_containers()
    return [entry['id'] for entry in listing]
def get_documents(container, limit=1000):
    """Return at most *limit* documents from *container* as a list.

    Args:
        container: Object exposing ``query_items`` (the caller in this file
            passes a Cosmos container client).
        limit: Maximum number of documents to return. ``None`` (or a
            negative value) means "no cap" and returns every document.

    Returns:
        A list of the items produced by ``SELECT * FROM c``.

    ``limit`` is still forwarded as ``max_item_count``, but that option only
    sizes each result page; iterating the result keeps fetching further
    pages. The cap is therefore enforced here on the iterator itself.
    """
    # Function-scope import so this block does not depend on file-level imports.
    from itertools import islice

    query = "SELECT * FROM c"
    results = container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=limit,
    )
    # islice(..., None) yields everything; a negative stop would raise.
    cap = limit if limit is not None and limit >= 0 else None
    return list(islice(results, cap))
# Cosmos DB functions
def insert_record(record):
    """Create *record* as a new item and report the outcome.

    Returns:
        ``(True, response)`` when ``create_item`` succeeds, otherwise
        ``(False, message)`` describing the failure.

    NOTE(review): ``container`` is looked up as a global, not passed in.
    No module-level assignment is visible in this file - confirm where it
    is expected to come from.
    """
    try:
        created = container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception as err:
        return False, f"An unexpected error occurred: {str(err)}"
    else:
        return True, created
def call_stored_procedure(record):
    """Run the ``processPrompt`` stored procedure with *record* as its argument.

    ``record['id']`` is supplied as the partition key.

    Returns:
        ``(True, response)`` on success, otherwise ``(False, message)``.

    NOTE(review): ``container`` is looked up as a global, not passed in.
    No module-level assignment is visible in this file - confirm where it
    is expected to come from.
    """
    try:
        # record['id'] is read inside the try so a missing key is reported
        # through the generic handler below rather than raised.
        result = container.scripts.execute_stored_procedure(
            sproc="processPrompt",
            params=[record],
            partition_key=record['id'],
        )
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception as err:
        return False, f"An unexpected error occurred: {str(err)}"
    else:
        return True, result
def fetch_all_records():
    """Load every item returned by ``SELECT * FROM c`` into a DataFrame.

    Failures are reported with ``st.error`` and an empty DataFrame is
    returned instead of raising.

    NOTE(review): ``container`` is looked up as a global, not passed in.
    No module-level assignment is visible in this file - confirm where it
    is expected to come from.
    """
    try:
        rows = container.query_items(
            query="SELECT * FROM c",
            enable_cross_partition_query=True,
        )
        return pd.DataFrame(list(rows))
    except exceptions.CosmosHttpResponseError as http_err:
        st.error(f"HTTP error occurred while fetching records: {str(http_err)}. Status code: {http_err.status_code}")
    except Exception as err:
        st.error(f"An unexpected error occurred while fetching records: {str(err)}")
    # Reached only after one of the handlers above has reported an error.
    return pd.DataFrame()
def update_record(updated_record):
    """Upsert *updated_record* and report the outcome.

    Returns:
        ``(True, message)`` naming ``updated_record['id']`` on success,
        otherwise ``(False, message)``; unexpected errors include the full
        traceback text.

    NOTE(review): ``container`` is looked up as a global, not passed in.
    No module-level assignment is visible in this file - confirm where it
    is expected to come from.
    """
    try:
        container.upsert_item(body=updated_record)
        # Built inside the try so a missing 'id' key is caught below.
        success_message = f"Record with id {updated_record['id']} successfully updated."
        return True, success_message
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()}"
def delete_record(name, id):
    """Delete the item identified by *id* and report the outcome.

    *name* is only used in the success message. *id* is passed both as the
    item id and as the partition key.

    Returns:
        ``(True, message)`` on success, otherwise ``(False, message)``;
        unexpected errors include the full traceback text.

    NOTE(review): passing *id* as the partition key presumably means the
    container is partitioned on ``/id`` - confirm. ``container`` is looked
    up as a global; no module-level assignment is visible in this file.
    """
    try:
        container.delete_item(item=id, partition_key=id)
        success_message = f"Successfully deleted record with name: {name} and id: {id}"
        return True, success_message
    except exceptions.CosmosResourceNotFoundError:
        # Listed before the general HTTP handler so it is matched first.
        return False, f"Record with id {id} not found. It may have been already deleted."
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()}"
# New function to archive all databases and containers
def archive_all_data(client):
    """Dump every container of every database to JSON and zip the result.

    Layout written under ``./cosmos_archive`` (recreated on each call):
    ``<database>/<container>/<container>.json``. The tree is then zipped
    into ``cosmos_archive_<timestamp>.zip`` in the current directory.

    Returns:
        The HTML download link produced by ``get_base64_download_link`` on
        success, or an error message string if anything raises.
    """
    try:
        archive_root = "./cosmos_archive"
        # Start from an empty directory on every run.
        if os.path.exists(archive_root):
            shutil.rmtree(archive_root)
        os.makedirs(archive_root)

        for db_entry in client.list_databases():
            db_name = db_entry['id']
            db_dir = os.path.join(archive_root, db_name)
            os.makedirs(db_dir)
            db_client = client.get_database_client(db_name)

            for container_entry in db_client.list_containers():
                container_name = container_entry['id']
                container_dir = os.path.join(db_dir, container_name)
                os.makedirs(container_dir)
                container_client = db_client.get_container_client(container_name)

                items = list(container_client.read_all_items())
                dump_path = os.path.join(container_dir, f"{container_name}.json")
                with open(dump_path, 'w') as dump_file:
                    json.dump(items, dump_file, indent=2)

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        archive_name = f"cosmos_archive_{stamp}"
        shutil.make_archive(archive_name, 'zip', archive_root)

        zip_name = f"{archive_name}.zip"
        return get_base64_download_link(zip_name, zip_name)
    except Exception as err:
        return f"An error occurred while archiving data: {str(err)}"
# Streamlit app
# Page heading, rendered when the module is executed.
st.title(body="🌟 Cosmos DB and GitHub Integration")
# Modify the main app
def _init_session_state():
    """Seed ``st.session_state`` with the keys main() reads, if absent."""
    defaults = {
        'logged_in': False,
        'selected_records': [],
        'client': None,
        'selected_database': None,
        'selected_container': None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value


def main():
    """Render the page: login form, Cosmos DB navigator sidebar, document view.

    Flow:
      * Not logged in: ask for a Cosmos DB key; any non-empty value is
        stored in the session and the script is rerun.
      * Logged in: create the ``CosmosClient`` once per session, let the
        user pick database / container / document in the sidebar, show the
        selected document as JSON and the container's documents as a table.
      * A sidebar Logout button resets the session.

    Cosmos HTTP errors and any other exception raised while rendering the
    logged-in view are shown with ``st.error`` instead of propagating.

    NOTE(review): several widget labels look like mis-decoded emoji. They
    are left untouched here because the intended glyphs cannot be recovered
    with certainty.
    """
    # The page title is already rendered once at module level; repeating
    # st.title() here made the heading appear twice on every run.

    _init_session_state()

    if not st.session_state.logged_in:
        # Login section
        st.subheader("πŸ” Login")
        input_key = st.text_input("Enter your Cosmos DB key", type="password")
        if st.button("πŸš€ Login"):
            if input_key:
                st.session_state.primary_key = input_key
                st.session_state.logged_in = True
                st.rerun()
            else:
                st.error("Invalid key. Please check your input.")
    else:
        try:
            # Initialize the Cosmos DB client once per session.
            if st.session_state.client is None:
                st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)

            # Sidebar for database, container, and document selection
            st.sidebar.title("πŸ—„οΈ Cosmos DB Navigator")

            databases = get_databases(st.session_state.client)
            selected_db = st.sidebar.selectbox("πŸ—ƒοΈ Select Database", databases)

            if selected_db != st.session_state.selected_database:
                # A different database invalidates the container choice.
                st.session_state.selected_database = selected_db
                st.session_state.selected_container = None
                st.rerun()

            if st.session_state.selected_database:
                database = st.session_state.client.get_database_client(st.session_state.selected_database)
                containers = get_containers(database)
                selected_container = st.sidebar.selectbox("πŸ“ Select Container", containers)

                if selected_container != st.session_state.selected_container:
                    st.session_state.selected_container = selected_container
                    st.rerun()

                if st.session_state.selected_container:
                    container = database.get_container_client(st.session_state.selected_container)

                    limit_to_1000 = st.sidebar.checkbox("πŸ”’ Limit to top 1000 documents", value=True)
                    documents = get_documents(container, limit=1000 if limit_to_1000 else None)

                    if documents:
                        document_ids = [doc.get('id', 'Unknown') for doc in documents]
                        selected_document = st.sidebar.selectbox("πŸ“„ Select Document", document_ids)

                        if selected_document:
                            st.subheader(f"πŸ“„ Document Details: {selected_document}")
                            selected_doc = next((doc for doc in documents if doc.get('id') == selected_document), None)
                            if selected_doc:
                                st.json(selected_doc)
                    else:
                        st.sidebar.info("No documents found in this container.")

            # Main content area
            st.subheader(f"πŸ“Š Container: {st.session_state.selected_container}")
            if st.session_state.selected_container:
                # `documents` was bound above whenever a container is selected.
                df = pd.DataFrame(documents)
                st.dataframe(df)

            # ... (keep the rest of your existing code for GitHub operations, etc.)

        except exceptions.CosmosHttpResponseError as e:
            st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)}. Status code: {e.status_code}")
        except Exception as e:
            st.error(f"An unexpected error occurred: {str(e)}")

    # Logout button
    if st.session_state.logged_in and st.sidebar.button("πŸšͺ Logout"):
        st.session_state.logged_in = False
        st.session_state.selected_records.clear()
        st.session_state.client = None
        st.session_state.selected_database = None
        st.session_state.selected_container = None
        # Drop the stored account key too, so the credential does not stay
        # in the session after logout.
        if 'primary_key' in st.session_state:
            del st.session_state['primary_key']
        st.rerun()
# Script entry point: run the app only when this file is executed directly.
if __name__ == "__main__":
    main()