import streamlit as st import requests import tempfile import shutil from kaggle.api.kaggle_api_extended import KaggleApi import os import time import json import subprocess from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from googleapiclient.errors import HttpError def setup_kaggle_api(): try: os.environ["KAGGLE_USERNAME"] = st.secrets["KAGGLE_USERNAME"] os.environ["KAGGLE_KEY"] = st.secrets["KAGGLE_KEY"] api = KaggleApi() api.authenticate() return api except KeyError as e: st.error(f"Missing Kaggle secret: {e}") st.stop() except Exception as e: st.error(f"Kaggle API setup failed: {e}") st.stop() def setup_drive_service(): try: credentials_dict = dict(st.secrets["GOOGLE_SERVICE_ACCOUNT"]) credentials = Credentials( token=credentials_dict["access_token"], refresh_token=credentials_dict["refresh_token"], client_id=credentials_dict["client_id"], client_secret=credentials_dict["client_secret"], token_uri=credentials_dict["token_uri"], scopes=credentials_dict["scopes"] ) if credentials.expired and credentials.refresh_token: credentials.refresh(requests.Request()) drive_service = build('drive', 'v3', credentials=credentials) return drive_service except KeyError as e: st.error(f"Missing key in GOOGLE_SERVICE_ACCOUNT: {e}") st.stop() except Exception as e: st.error(f"Google Drive auth failed: {e}") st.stop() def upload_to_drive(drive_service, file_path, title, folder_id="1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd"): try: file_metadata = {'name': title, 'parents': [folder_id]} media = MediaFileUpload(file_path, resumable=True) file = drive_service.files().create( body=file_metadata, media_body=media, fields='id, name' ).execute() file_id = file['id'] drive_service.permissions().create( fileId=file_id, body={'type': 'anyone', 'role': 'reader'}, fields='id' ).execute() shareable_link = f"https://drive.google.com/file/d/{file_id}/view?usp=sharing" return file_id, shareable_link except HttpError as e: st.error(f"Drive upload failed: {e}") raise except Exception as e: st.error(f"Unexpected error during upload: {e}") raise def delete_from_drive(drive_service, file_id): try: drive_service.files().delete(fileId=file_id).execute() except Exception as e: st.error(f"Failed to delete file {file_id} from Drive: {e}") def get_bvh_from_folder(drive_service, folder_id="1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd"): try: query = f"'{folder_id}' in parents and name contains '.bvh'" response = drive_service.files().list(q=query, fields="files(id, name, mimeType)", pageSize=1).execute() files = response.get('files', []) if files: bvh_file = files[0] bvh_id = bvh_file['id'] bvh_url = f"https://drive.google.com/uc?id={bvh_id}" return bvh_id, bvh_url return None, None except Exception as e: st.error(f"Error checking folder for BVH: {e}") return None, None def push_kaggle_kernel(api, temp_dir, notebook_slug): try: local_notebook_path = os.path.join(os.path.dirname(__file__), 'video-to-bvh-converter.ipynb') if not os.path.exists(local_notebook_path): st.error(f"Notebook file not found") raise FileNotFoundError kernel_file = os.path.join(temp_dir, 'kernel.ipynb') shutil.copy(local_notebook_path, kernel_file) code_file = "kernel.ipynb" kernel_type = "notebook" metadata = { "id": notebook_slug, "title": "video-to-bvh-converter", "code_file": code_file, "language": "python", "kernel_type": kernel_type, "enable_gpu": True, "enable_internet": True, "is_private": True, "competition_sources": [], "dataset_sources": [], "kernel_sources": [] } metadata_file = os.path.join(temp_dir, 'kernel-metadata.json') with open(metadata_file, 'w') as f: json.dump(metadata, f) cmd = f"kaggle kernels push -p {temp_dir}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode != 0: st.error(f"Kaggle push failed: {result.stderr}") raise Exception kernel_url = f"https://www.kaggle.com/code/{notebook_slug}" return kernel_url except Exception as e: st.error(f"Failed to push kernel: {str(e)}") raise def check_kernel_exists(api, notebook_slug): try: kernels = api.kernels_list(mine=True, search=notebook_slug) for kernel in kernels: if kernel.ref == notebook_slug: return True return False except Exception as e: st.error(f"Kernel check failed: {e}") return False def process_video(api, drive_service, video_file): video_file_id = None bvh_file_id = None try: with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file: tmp_file.write(video_file.read()) video_path = tmp_file.name video_file_id, video_shareable_link = upload_to_drive(drive_service, video_path, "input_video.mp4") st.success(f"Video uploaded to Drive: {video_shareable_link}") username = st.secrets['KAGGLE_USERNAME'] notebook_slug = f"{username}/video-to-bvh-converter" kernel_exists = check_kernel_exists(api, notebook_slug) if not kernel_exists: pass temp_dir = tempfile.mkdtemp() with st.spinner("Triggering Kaggle notebook..."): kernel_url = push_kaggle_kernel(api, temp_dir, notebook_slug) progress_bar = st.progress(0.0) progress_text = st.empty() with st.spinner("Waiting for video processing..."): start_time = time.time() execution_started = False retry_count = 0 max_retries = 3 overall_timeout = 1800 while time.time() - start_time < overall_timeout: try: status_response = api.kernels_status(notebook_slug) current_status = status_response.status if hasattr(status_response, 'status') else 'unknown' if current_status in ['queued', 'running']: execution_started = True if current_status == 'queued': progress_bar.progress(0.2) progress_text.text("Queued - Waiting for GPU...") elif current_status == 'running': progress_bar.progress(0.4) progress_text.text("Processing video...") elif current_status == 'complete' and not execution_started: push_kaggle_kernel(api, temp_dir, notebook_slug) time.sleep(10) continue elif current_status == 'error' and not execution_started: if retry_count < max_retries: time.sleep(10) push_kaggle_kernel(api, temp_dir, notebook_slug) retry_count += 1 start_time = time.time() continue else: if video_file_id: delete_from_drive(drive_service, video_file_id) return None bvh_file_id, bvh_url = get_bvh_from_folder(drive_service) if bvh_url: response = requests.get(bvh_url) if response.status_code == 200: progress_bar.progress(1.0) progress_text.text("Complete!") if video_file_id: delete_from_drive(drive_service, video_file_id) if bvh_file_id: delete_from_drive(drive_service, bvh_file_id) st.write("BVH generated") return {'bvh_data': response.content} if execution_started and current_status in ['complete', 'error']: progress_bar.progress(0.8 if current_status == 'complete' else 0.6) progress_text.text("Finalizing..." if current_status == 'complete' else "Error occurred...") if current_status == 'error': if video_file_id: delete_from_drive(drive_service, video_file_id) return None time.sleep(10) except Exception as e: st.error(f"Status check failed: {str(e)}") time.sleep(10) if video_file_id: delete_from_drive(drive_service, video_file_id) return None except Exception as e: st.error(f"Processing error: {e}") if video_file_id: delete_from_drive(drive_service, video_file_id) if bvh_file_id: delete_from_drive(drive_service, bvh_file_id) return None finally: if 'video_path' in locals(): os.unlink(video_path) if 'temp_dir' in locals() and os.path.exists(temp_dir): shutil.rmtree(temp_dir) def main(): st.set_page_config( page_title="Motion Capture Studio | Video to BVH Converter", page_icon="🎬", layout="wide", initial_sidebar_state="collapsed" ) st.markdown(""" """, unsafe_allow_html=True) st.markdown('

Motion Capture Studio

', unsafe_allow_html=True) st.markdown('

Convert videos to BVH with AI

', unsafe_allow_html=True) api = setup_kaggle_api() drive_service = setup_drive_service() st.markdown('
', unsafe_allow_html=True) st.markdown('

', unsafe_allow_html=True) status_col1, status_col2 = st.columns(2) with status_col1: try: username = st.secrets['KAGGLE_USERNAME'] notebook_slug = f"{username}/video-to-bvh-converter" kernel_exists = check_kernel_exists(api, notebook_slug) if kernel_exists: st.success(f"✅ Kaggle kernel found: {notebook_slug}") else: st.error(f"❌ Kaggle kernel not found: {notebook_slug}") except Exception as e: st.error(f"❌failed: {e}") with status_col2: try: drive_about = drive_service.about().get(fields="user,storageQuota").execute() storage_used = int(drive_about.get('storageQuota', {}).get('usage', 0)) / (1024 * 1024) # st.success(f"✅ Google Drive: {storage_used:.2f} MB used") except Exception as e: st.error(f"❌Drive check failed: {e}") col1, col2 = st.columns(2) with col1: st.markdown('

Upload Video

', unsafe_allow_html=True) uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'avi', 'mov']) if uploaded_file: st.session_state['uploaded_file'] = uploaded_file with col2: st.markdown('

Preview

', unsafe_allow_html=True) if uploaded_file := st.session_state.get('uploaded_file'): st.video(uploaded_file) if st.session_state.get('uploaded_file'): st.markdown('

Processing Options

', unsafe_allow_html=True) if st.button("Start Motion Capture"): result = process_video(api, drive_service, st.session_state['uploaded_file']) if result and 'bvh_data' in result: st.success("Motion capture complete!") st.download_button( label="Download BVH", data=result['bvh_data'], file_name="motion_capture.bvh", mime="application/octet-stream" ) else: st.error("Failed to retrieve BVH file.") st.markdown('
', unsafe_allow_html=True) if __name__ == "__main__": main()