# WASP / app.py
# Author: Amanpreet Singh - revision d24b4dd ("wasp"), 14.3 kB
# (header recovered from the repository file viewer: raw / history / blame)
import streamlit as st
import requests
import tempfile
import shutil
from kaggle.api.kaggle_api_extended import KaggleApi
import os
import time
import json
import subprocess
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.errors import HttpError
def setup_kaggle_api():
    """Authenticate against Kaggle using credentials held in Streamlit secrets.

    The ``KAGGLE_USERNAME`` and ``KAGGLE_KEY`` secrets are exported as
    environment variables before a ``KaggleApi`` client is created and
    authenticated.

    Returns:
        The authenticated ``KaggleApi`` instance.

    If a secret is missing or authentication fails, an error is rendered and
    the Streamlit script run is halted with ``st.stop()``.
    """
    try:
        # Export both credentials under the same names they have in secrets.
        for secret_name in ("KAGGLE_USERNAME", "KAGGLE_KEY"):
            os.environ[secret_name] = st.secrets[secret_name]
        client = KaggleApi()
        client.authenticate()
    except KeyError as missing:
        st.error(f"Missing Kaggle secret: {missing}")
        st.stop()
    except Exception as failure:
        st.error(f"Kaggle API setup failed: {failure}")
        st.stop()
    else:
        return client
def setup_drive_service():
    """Build a Google Drive v3 client from OAuth credentials in Streamlit secrets.

    Despite its name, the ``GOOGLE_SERVICE_ACCOUNT`` secret is read as a set of
    OAuth user-credential fields: ``access_token``, ``refresh_token``,
    ``client_id``, ``client_secret``, ``token_uri`` and ``scopes``.

    Returns:
        The Drive service object returned by ``build('drive', 'v3', ...)``.

    If a field is missing or authentication fails, an error is rendered and
    the Streamlit script run is halted with ``st.stop()``.
    """
    try:
        # Function-scope import: the google-auth transport adapter is only
        # needed here, and an import failure is reported like any auth error.
        from google.auth.transport.requests import Request as GoogleAuthRequest

        credentials_dict = dict(st.secrets["GOOGLE_SERVICE_ACCOUNT"])
        credentials = Credentials(
            token=credentials_dict["access_token"],
            refresh_token=credentials_dict["refresh_token"],
            client_id=credentials_dict["client_id"],
            client_secret=credentials_dict["client_secret"],
            token_uri=credentials_dict["token_uri"],
            scopes=credentials_dict["scopes"]
        )
        # NOTE(review): no expiry is passed to Credentials, so `expired` may
        # never be true on this path - confirm whether a forced refresh is wanted.
        if credentials.expired and credentials.refresh_token:
            # refresh() expects a google-auth transport request object; the
            # `requests.Request` model from the requests library is not one.
            credentials.refresh(GoogleAuthRequest())
        drive_service = build('drive', 'v3', credentials=credentials)
        return drive_service
    except KeyError as e:
        st.error(f"Missing key in GOOGLE_SERVICE_ACCOUNT: {e}")
        st.stop()
    except Exception as e:
        st.error(f"Google Drive auth failed: {e}")
        st.stop()
def upload_to_drive(drive_service, file_path, title, folder_id="1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd"):
    """Upload a local file to a Drive folder and make it readable by anyone.

    Args:
        drive_service: Drive v3 service object used for all API calls.
        file_path: Path of the local file to upload.
        title: Name given to the file on Drive.
        folder_id: Id of the parent Drive folder.

    Returns:
        A ``(file_id, shareable_link)`` tuple for the uploaded file.

    Raises:
        HttpError: If a Drive API call fails (reported with ``st.error`` first).
        Exception: Any other failure, also reported and then re-raised.
    """
    file_id = None
    try:
        file_metadata = {'name': title, 'parents': [folder_id]}
        media = MediaFileUpload(file_path, resumable=True)
        created = drive_service.files().create(
            body=file_metadata, media_body=media, fields='id, name'
        ).execute()
        file_id = created['id']
        # Grant read access to anyone holding the link.
        drive_service.permissions().create(
            fileId=file_id,
            body={'type': 'anyone', 'role': 'reader'},
            fields='id'
        ).execute()
        shareable_link = f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
        return file_id, shareable_link
    except Exception as e:
        if file_id is not None:
            # The file exists but sharing failed. The caller never learns the
            # id when we raise, so remove the file here instead of orphaning it.
            try:
                drive_service.files().delete(fileId=file_id).execute()
            except Exception as cleanup_error:
                st.error(f"Could not remove partially uploaded file {file_id}: {cleanup_error}")
        if isinstance(e, HttpError):
            st.error(f"Drive upload failed: {e}")
        else:
            st.error(f"Unexpected error during upload: {e}")
        raise
def delete_from_drive(drive_service, file_id):
    """Delete a Drive file by id, reporting (not raising) any failure.

    Args:
        drive_service: Drive v3 service object.
        file_id: Id of the file to delete.
    """
    try:
        removal_request = drive_service.files().delete(fileId=file_id)
        removal_request.execute()
    except Exception as failure:
        # Best-effort cleanup: show the problem and let the caller carry on.
        st.error(f"Failed to delete file {file_id} from Drive: {failure}")
def get_bvh_from_folder(drive_service, folder_id="1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd"):
    """Look for a ``.bvh`` file in a Drive folder.

    Args:
        drive_service: Drive v3 service object.
        folder_id: Id of the Drive folder to search.

    Returns:
        ``(file_id, download_url)`` for the first matching file, or
        ``(None, None)`` when nothing matches or the lookup fails (the failure
        is reported with ``st.error``).
    """
    try:
        # `trashed = false` keeps files sitting in the Drive trash from being
        # picked up as a fresh conversion result.
        query = f"'{folder_id}' in parents and name contains '.bvh' and trashed = false"
        response = drive_service.files().list(
            q=query, fields="files(id, name, mimeType)", pageSize=1
        ).execute()
        files = response.get('files', [])
        if not files:
            return None, None
        bvh_id = files[0]['id']
        bvh_url = f"https://drive.google.com/uc?id={bvh_id}"
        return bvh_id, bvh_url
    except Exception as e:
        st.error(f"Error checking folder for BVH: {e}")
        return None, None
def push_kaggle_kernel(api, temp_dir, notebook_slug):
    """Stage the bundled notebook in ``temp_dir`` and push it with the Kaggle CLI.

    The notebook ``video-to-bvh-converter.ipynb`` located next to this file is
    copied to ``temp_dir/kernel.ipynb``, a ``kernel-metadata.json`` is written
    beside it, and ``kaggle kernels push -p temp_dir`` is executed.

    Args:
        api: Kaggle API client; accepted for interface compatibility, the push
            itself goes through the ``kaggle`` command-line tool.
        temp_dir: Existing directory used as the push staging area.
        notebook_slug: Kernel id in ``"<username>/<kernel-name>"`` form.

    Returns:
        The kernel URL ``https://www.kaggle.com/code/<notebook_slug>``.

    Raises:
        FileNotFoundError: If the bundled notebook file is missing.
        RuntimeError: If the ``kaggle`` command exits with a non-zero status.
        Exception: Any other failure. Every failure is reported once with
            ``st.error`` before being re-raised.
    """
    try:
        local_notebook_path = os.path.join(os.path.dirname(__file__), 'video-to-bvh-converter.ipynb')
        if not os.path.exists(local_notebook_path):
            raise FileNotFoundError(f"Notebook file not found: {local_notebook_path}")
        code_file = "kernel.ipynb"
        shutil.copy(local_notebook_path, os.path.join(temp_dir, code_file))
        metadata = {
            "id": notebook_slug,
            "title": "video-to-bvh-converter",
            "code_file": code_file,
            "language": "python",
            "kernel_type": "notebook",
            "enable_gpu": True,
            "enable_internet": True,
            "is_private": True,
            "competition_sources": [],
            "dataset_sources": [],
            "kernel_sources": [],
        }
        metadata_file = os.path.join(temp_dir, 'kernel-metadata.json')
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(metadata, f)
        # Argument list without a shell: a temp path containing spaces or
        # shell metacharacters is passed through verbatim.
        result = subprocess.run(
            ["kaggle", "kernels", "push", "-p", temp_dir],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            raise RuntimeError(f"Kaggle push failed: {result.stderr}")
        return f"https://www.kaggle.com/code/{notebook_slug}"
    except Exception as e:
        # Single reporting point; the exception message carries the reason.
        st.error(f"Failed to push kernel: {str(e)}")
        raise
def check_kernel_exists(api, notebook_slug):
    """Tell whether the authenticated user owns a kernel with this exact ref.

    Args:
        api: Kaggle API client exposing ``kernels_list``.
        notebook_slug: Kernel ref to look for.

    Returns:
        ``True`` if one of the listed kernels has ``ref == notebook_slug``,
        otherwise ``False``. A failed lookup is reported with ``st.error`` and
        also yields ``False``.
    """
    try:
        owned_kernels = api.kernels_list(mine=True, search=notebook_slug)
        # The search is only a filter; require an exact ref match.
        return any(candidate.ref == notebook_slug for candidate in owned_kernels)
    except Exception as failure:
        st.error(f"Kernel check failed: {failure}")
        return False
def _normalize_kernel_status(status_response):
    """Return the kernel status from a status response as a lowercase string.

    NOTE(review): depending on the installed kaggle client, the status may
    arrive as a plain string attribute, a dict entry, or an enum member -
    confirm against the pinned version. All three are folded to e.g.
    ``"running"``; anything without a status becomes ``"unknown"``.
    """
    if isinstance(status_response, dict):
        raw_status = status_response.get('status')
    else:
        raw_status = getattr(status_response, 'status', None)
    if raw_status is None:
        return 'unknown'
    # Enum-like values expose their symbolic name via `.name`; strings do not.
    return str(getattr(raw_status, 'name', raw_status)).lower()


def process_video(api, drive_service, video_file):
    """Run one video through the Kaggle notebook and fetch the resulting BVH.

    Steps: write the upload to a temp file, upload it to Drive as
    ``input_video.mp4``, push the Kaggle kernel, then poll the kernel status
    and the Drive folder every 10 seconds until a ``.bvh`` file can be
    downloaded, the kernel reports an error, or 1800 seconds pass.

    Args:
        api: Authenticated Kaggle API client (needs ``kernels_status``).
        drive_service: Drive v3 service object.
        video_file: File-like object whose ``read()`` returns the video bytes.

    Returns:
        ``{'bvh_data': <bytes>}`` on success, otherwise ``None``. Failures are
        reported through ``st.error`` rather than raised.

    The uploaded video, the local temp file and the staging directory are
    removed on every exit path.
    """
    video_file_id = None
    bvh_file_id = None
    video_path = None
    temp_dir = None
    poll_interval = 10  # seconds between status checks
    download_timeout = 60  # seconds; keeps a stalled download from hanging the run
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file:
            # Record the path first so cleanup still happens if the write fails.
            video_path = tmp_file.name
            tmp_file.write(video_file.read())
        video_file_id, video_shareable_link = upload_to_drive(drive_service, video_path, "input_video.mp4")
        st.success(f"Video uploaded to Drive: {video_shareable_link}")
        username = st.secrets['KAGGLE_USERNAME']
        notebook_slug = f"{username}/video-to-bvh-converter"
        temp_dir = tempfile.mkdtemp()
        with st.spinner("Triggering Kaggle notebook..."):
            push_kaggle_kernel(api, temp_dir, notebook_slug)
        progress_bar = st.progress(0.0)
        progress_text = st.empty()
        with st.spinner("Waiting for video processing..."):
            start_time = time.time()
            execution_started = False
            retry_count = 0
            max_retries = 3
            overall_timeout = 1800
            while time.time() - start_time < overall_timeout:
                try:
                    current_status = _normalize_kernel_status(api.kernels_status(notebook_slug))
                    if current_status in ('queued', 'running'):
                        execution_started = True
                        if current_status == 'queued':
                            progress_bar.progress(0.2)
                            progress_text.text("Queued - Waiting for GPU...")
                        else:
                            progress_bar.progress(0.4)
                            progress_text.text("Processing video...")
                    elif current_status == 'complete' and not execution_started:
                        # NOTE(review): presumably a status left over from an
                        # earlier run; the kernel is pushed again - confirm this
                        # cannot pile up versions while the new run is pending.
                        push_kaggle_kernel(api, temp_dir, notebook_slug)
                        time.sleep(poll_interval)
                        continue
                    elif current_status == 'error' and not execution_started:
                        if retry_count < max_retries:
                            time.sleep(poll_interval)
                            push_kaggle_kernel(api, temp_dir, notebook_slug)
                            retry_count += 1
                            # Each retry gets a fresh overall time budget.
                            start_time = time.time()
                            continue
                        return None
                    bvh_file_id, bvh_url = get_bvh_from_folder(drive_service)
                    if bvh_url:
                        response = requests.get(bvh_url, timeout=download_timeout)
                        if response.status_code == 200:
                            progress_bar.progress(1.0)
                            progress_text.text("Complete!")
                            if bvh_file_id:
                                delete_from_drive(drive_service, bvh_file_id)
                                bvh_file_id = None
                            st.write("BVH generated")
                            return {'bvh_data': response.content}
                    if execution_started and current_status in ('complete', 'error'):
                        progress_bar.progress(0.8 if current_status == 'complete' else 0.6)
                        progress_text.text("Finalizing..." if current_status == 'complete' else "Error occurred...")
                        if current_status == 'error':
                            return None
                    time.sleep(poll_interval)
                except Exception as e:
                    # A failed poll is not fatal; report it and try again.
                    st.error(f"Status check failed: {str(e)}")
                    time.sleep(poll_interval)
        # Timed out without obtaining a BVH file.
        return None
    except Exception as e:
        st.error(f"Processing error: {e}")
        if bvh_file_id:
            delete_from_drive(drive_service, bvh_file_id)
        return None
    finally:
        # The input video is never needed after this call, whatever the outcome.
        if video_file_id:
            delete_from_drive(drive_service, video_file_id)
        if video_path and os.path.exists(video_path):
            os.unlink(video_path)
        if temp_dir and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
def main():
    """Render the Streamlit page: styling, service checks, upload and conversion.

    The page configures itself, injects the dark theme CSS, connects to Kaggle
    and Google Drive, shows whether the Kaggle kernel exists, lets the user
    upload and preview a video, and runs ``process_video`` when the
    "Start Motion Capture" button is pressed. The resulting BVH bytes are kept
    in ``st.session_state['bvh_data']`` so the download button is still there
    after the script reruns (a button press is only true for a single run).
    """
    st.set_page_config(
        page_title="Motion Capture Studio | Video to BVH Converter",
        page_icon="🎬",
        layout="wide",
        initial_sidebar_state="collapsed"
    )
    # The CSS is kept flush-left inside the string: it is emitted through
    # st.markdown, so its text is reproduced exactly as written.
    st.markdown("""
<style>
:root {
--bg-color: #1a1a1a;
--card-bg: #252525;
--primary-color: #bb86fc;
--secondary-color: #03dac6;
--error-color: #cf6679;
--text-color: #e0e0e0;
--text-secondary: #a0a0a0;
}
.stApp { background-color: var(--bg-color); }
h1, h2, h3, p, div { color: var(--text-color) !important; }
.card { background-color: var(--card-bg); border-radius: 20px; padding: 2rem; margin: 1rem auto; max-width: 1200px; }
.main-title { font-size: 3.5rem; font-weight: 900; background: linear-gradient(135deg, var(--primary-color), var(--secondary-color)); -webkit-background-clip: text; -webkit-text-fill-color: transparent; text-align: center; }
.subtitle { font-size: 1.3rem; color: var(--text-secondary); text-align: center; }
.section-title { font-size: 1.5rem; font-weight: 700; color: var(--primary-color) !important; }
.stButton > button { background: linear-gradient(135deg, var(--primary-color), #9b59f5); color: #fff !important; border-radius: 12px; padding: 0.8rem 2.5rem; font-weight: 600; font-size: 1.2rem; border: none; width: 100%; }
.stDownloadButton > button { background: linear-gradient(135deg, var(--secondary-color), #02b3a3); color: #fff !important; border-radius: 12px; padding: 0.8rem 2.5rem; font-weight: 600; font-size: 1.2rem; border: none; width: 100%; }
</style>
""", unsafe_allow_html=True)
    st.markdown('<h1 class="main-title">Motion Capture Studio</h1>', unsafe_allow_html=True)
    st.markdown('<p class="subtitle">Convert videos to BVH with AI</p>', unsafe_allow_html=True)
    api = setup_kaggle_api()
    drive_service = setup_drive_service()
    st.markdown('<div class="card">', unsafe_allow_html=True)
    st.markdown('<h3 class="section-title"></h3>', unsafe_allow_html=True)

    # Service status row: Kaggle kernel on the left, Drive on the right.
    status_col1, status_col2 = st.columns(2)
    with status_col1:
        try:
            username = st.secrets['KAGGLE_USERNAME']
            notebook_slug = f"{username}/video-to-bvh-converter"
            if check_kernel_exists(api, notebook_slug):
                st.success(f"✅ Kaggle kernel found: {notebook_slug}")
            else:
                st.error(f"❌ Kaggle kernel not found: {notebook_slug}")
        except Exception as e:
            st.error(f"❌failed: {e}")
    with status_col2:
        try:
            # Connectivity probe only: the response is not displayed.
            drive_service.about().get(fields="user,storageQuota").execute()
        except Exception as e:
            st.error(f"❌Drive check failed: {e}")

    # Upload on the left, preview on the right.
    col1, col2 = st.columns(2)
    with col1:
        st.markdown('<h3 class="section-title">Upload Video</h3>', unsafe_allow_html=True)
        uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'avi', 'mov'])
        if uploaded_file:
            st.session_state['uploaded_file'] = uploaded_file
    with col2:
        st.markdown('<h3 class="section-title">Preview</h3>', unsafe_allow_html=True)
        if uploaded_file := st.session_state.get('uploaded_file'):
            st.video(uploaded_file)

    if st.session_state.get('uploaded_file'):
        st.markdown('<h3 class="section-title">Processing Options</h3>', unsafe_allow_html=True)
        if st.button("Start Motion Capture"):
            # Forget any earlier result so a failed run cannot offer stale data.
            st.session_state.pop('bvh_data', None)
            result = process_video(api, drive_service, st.session_state['uploaded_file'])
            if result and 'bvh_data' in result:
                st.session_state['bvh_data'] = result['bvh_data']
            else:
                st.error("Failed to retrieve BVH file.")
        # Rendered from session state so the download survives the rerun that
        # follows any widget interaction.
        bvh_data = st.session_state.get('bvh_data')
        if bvh_data is not None:
            st.success("Motion capture complete!")
            st.download_button(
                label="Download BVH",
                data=bvh_data,
                file_name="motion_capture.bvh",
                mime="application/octet-stream"
            )
    st.markdown('</div>', unsafe_allow_html=True)
# Render the app only when this file is run as the entry script.
if __name__ == "__main__":
    main()