import streamlit as st import requests import tempfile import shutil from kaggle.api.kaggle_api_extended import KaggleApi import os import time import json import subprocess from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from googleapiclient.errors import HttpError def setup_kaggle_api(): try: os.environ["KAGGLE_USERNAME"] = os.environ.get("KAGGLE_USERNAME") os.environ["KAGGLE_KEY"] = os.environ.get("KAGGLE_KEY") api = KaggleApi() api.authenticate() return api except KeyError as e: st.error(f"Missing environment variable: {e}") st.stop() except Exception as e: st.error(f" API setup failed: {e}") st.stop() def setup_drive_service(): try: credentials_json = os.environ.get("GOOGLE_SERVICE_ACCOUNT") credentials_dict = json.loads(credentials_json) credentials = Credentials( token=credentials_dict["access_token"], refresh_token=credentials_dict["refresh_token"], client_id=credentials_dict["client_id"], client_secret=credentials_dict["client_secret"], token_uri=credentials_dict["token_uri"], scopes=credentials_dict["scopes"] ) if credentials.expired and credentials.refresh_token: credentials.refresh(requests.Request()) drive_service = build('drive', 'v3', credentials=credentials) return drive_service except KeyError as e: st.error(f"Missing GOOGLE_SERVICE_ACCOUNT environment variable: {e}") st.stop() except json.JSONDecodeError as e: st.error(f"Invalid JSON in GOOGLE_SERVICE_ACCOUNT: {e}") st.stop() except Exception as e: st.error(f"Google Drive auth failed: {e}") st.stop() def upload_to_drive(drive_service, file_path, title, folder_id=None): try: folder_id = os.environ.get("DRIVE_FOLDER_ID", "1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd") file_metadata = {'name': title, 'parents': [folder_id]} media = MediaFileUpload(file_path, resumable=True) file = drive_service.files().create( body=file_metadata, media_body=media, fields='id, name' ).execute() file_id = file['id'] drive_service.permissions().create( fileId=file_id, body={'type': 'anyone', 'role': 'reader'}, fields='id' ).execute() shareable_link = f"https://drive.google.com/file/d/{file_id}/view?usp=sharing" return file_id, shareable_link except HttpError as e: st.error(f"Drive upload failed: {e}") raise except Exception as e: st.error(f"Unexpected error during upload: {e}") raise def delete_from_drive(drive_service, file_id): try: drive_service.files().delete(fileId=file_id).execute() except Exception as e: st.error(f"Failed to delete file {file_id} from Drive: {e}") def get_bvh_from_folder(drive_service, folder_id=None): try: folder_id = os.environ.get("DRIVE_FOLDER_ID", "1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd") query = f"'{folder_id}' in parents and name contains '.bvh'" response = drive_service.files().list(q=query, fields="files(id, name, mimeType)", pageSize=1).execute() files = response.get('files', []) if files: bvh_file = files[0] bvh_id = bvh_file['id'] bvh_url = f"https://drive.google.com/uc?id={bvh_id}" return bvh_id, bvh_url, bvh_file['name'] return None, None, None except Exception as e: st.error(f"Error checking folder for BVH: {e}") return None, None, None def download_notebook_from_drive(drive_service, temp_dir): try: notebook_file_id = os.environ.get("NOTEBOOK_FILE_ID") if not notebook_file_id: st.error("NOTEBOOK_FILE_ID environment variable not set") raise KeyError("NOTEBOOK_FILE_ID not set") request = drive_service.files().get_media(fileId=notebook_file_id) notebook_path = os.path.join(temp_dir, 'video-to-bvh-converter.ipynb') with open(notebook_path, 'wb') as f: f.write(request.execute()) # Directly write the downloaded content # Add kernel specification to notebook with open(notebook_path, 'r') as f: notebook_content = json.load(f) notebook_content['metadata']['kernelspec'] = { "display_name": "Python 3", "language": "python", "name": "python3" } notebook_content['metadata']['language_info'] = { "name": "python", "version": "3.10.12", "mimetype": "text/x-python", "codemirror_mode": {"name": "ipython", "version": 3}, "pygments_lexer": "ipython3", "nbconvert_exporter": "python", "file_extension": ".py" } with open(notebook_path, 'w') as f: json.dump(notebook_content, f) return notebook_path except Exception as e: st.error(f"Failed to download notebook from Drive: {e}") raise def push_kaggle_kernel(api, temp_dir, notebook_slug): try: drive_service = setup_drive_service() local_notebook_path = download_notebook_from_drive(drive_service, temp_dir) kernel_file = os.path.join(temp_dir, 'kernel.ipynb') shutil.copy(local_notebook_path, kernel_file) # Verify kernel spec (for debugging) with open(kernel_file, 'r') as f: metadata_content = json.load(f)['metadata'] code_file = "kernel.ipynb" kernel_type = "notebook" metadata = { "id": notebook_slug, "title": "video-to-bvh-converter", "code_file": code_file, "language": "python", "kernel_type": kernel_type, "enable_gpu": True, "enable_internet": True, "is_private": True, "accelerator": "gpu", "gpu_product": "T4x2", "competition_sources": [], "dataset_sources": ["amanu1234/pipeline"], # Add your dataset here "kernel_sources": [] } metadata_file = os.path.join(temp_dir, 'kernel-metadata.json') with open(metadata_file, 'w') as f: json.dump(metadata, f) cmd = f"kaggle kernels push -p {temp_dir}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.returncode != 0: st.error(f"Kernel push failed: {result.stderr}") raise Exception(f"Push failed: {result.stderr}") os.remove(local_notebook_path) os.remove(kernel_file) kernel_url = f"https://www.kaggle.com/code/{notebook_slug}" return kernel_url except Exception as e: st.error(f"Failed to push kernel: {str(e)}") raise def check_kernel_exists(api, notebook_slug): try: kernels = api.kernels_list(mine=True, search=notebook_slug) for kernel in kernels: if kernel.ref == notebook_slug: return True return False except Exception as e: st.error(f"Kernel check failed: {e}") return False def download_and_save_bvh(bvh_url, filename): try: response = requests.get(bvh_url) if response.status_code == 200: temp_dir = tempfile.mkdtemp() bvh_path = os.path.join(temp_dir, filename) with open(bvh_path, 'wb') as f: f.write(response.content) return bvh_path, response.content else: st.error(f"Failed to download BVH: Status code {response.status_code}") return None, None except Exception as e: st.error(f"Error downloading BVH: {e}") return None, None def process_video(api, drive_service, video_file): video_file_id = None bvh_file_id = None try: with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file: tmp_file.write(video_file.read()) video_path = tmp_file.name video_file_id, video_shareable_link = upload_to_drive(drive_service, video_path, "input_video.mp4") st.success(f"Video uploaded to Drive: {video_shareable_link}") username = os.environ.get("KAGGLE_USERNAME") notebook_slug = f"{username}/video-to-bvh-converter" kernel_exists = check_kernel_exists(api, notebook_slug) if not kernel_exists: pass temp_dir = tempfile.mkdtemp() with st.spinner("Triggering..."): kernel_url = push_kaggle_kernel(api, temp_dir, notebook_slug) progress_bar = st.progress(0.0) progress_text = st.empty() with st.spinner("Waiting for video processing..."): start_time = time.time() execution_started = False retry_count = 0 max_retries = 3 overall_timeout = 1800 while time.time() - start_time < overall_timeout: try: status_response = api.kernels_status(notebook_slug) current_status = status_response.status if hasattr(status_response, 'status') else 'unknown' if current_status in ['queued', 'running']: execution_started = True if current_status == 'queued': progress_bar.progress(0.2) progress_text.text("Queued - Waiting for GPU...") elif current_status == 'running': progress_bar.progress(0.4) progress_text.text("Processing video...") elif current_status == 'complete' and not execution_started: push_kaggle_kernel(api, temp_dir, notebook_slug) time.sleep(10) continue elif current_status == 'error' and not execution_started: if retry_count < max_retries: time.sleep(10) push_kaggle_kernel(api, temp_dir, notebook_slug) retry_count += 1 start_time = time.time() continue else: if video_file_id: delete_from_drive(drive_service, video_file_id) return None bvh_file_id, bvh_url, bvh_filename = get_bvh_from_folder(drive_service) if bvh_url and bvh_filename: # Immediately download the BVH file bvh_path, bvh_data = download_and_save_bvh(bvh_url, bvh_filename or "motion_capture.bvh") if bvh_path and bvh_data: # Generate a timestamp for unique filenames timestamp_str = time.strftime("%Y%m%d_%H%M%S") # Create a filename with timestamp if bvh_filename: # Extract the base name without extension base_name, ext = os.path.splitext(bvh_filename) timestamped_filename = f"{base_name}_{timestamp_str}{ext}" else: timestamped_filename = f"motion_capture_{timestamp_str}.bvh" progress_bar.progress(1.0) progress_text.text("Complete!") # Immediately delete files from Google Drive if video_file_id: delete_from_drive(drive_service, video_file_id) video_file_id = None # Set to None to prevent double deletion in cleanup if bvh_file_id: delete_from_drive(drive_service, bvh_file_id) bvh_file_id = None # Set to None to prevent double deletion in cleanup st.success("Motion capture complete! BVH file ready for download.") # Save the BVH data to session state for download st.session_state['bvh_data'] = bvh_data st.session_state['bvh_filename'] = timestamped_filename st.session_state['bvh_path'] = bvh_path # Save path for deletion later # Generate a unique timestamp for this result st.session_state['bvh_timestamp'] = int(time.time()) return { 'bvh_data': bvh_data, 'bvh_path': bvh_path, 'bvh_filename': timestamped_filename, 'timestamp': st.session_state['bvh_timestamp'] } if execution_started and current_status in ['complete', 'error']: progress_bar.progress(0.8 if current_status == 'complete' else 0.6) progress_text.text("Finalizing..." if current_status == 'complete' else "Error occurred...") if current_status == 'error': if video_file_id: delete_from_drive(drive_service, video_file_id) return None time.sleep(10) except Exception as e: st.error(f"Status check failed: {str(e)}") time.sleep(10) if video_file_id: delete_from_drive(drive_service, video_file_id) return None except Exception as e: st.error(f"Processing error: {e}") if video_file_id: delete_from_drive(drive_service, video_file_id) if bvh_file_id: delete_from_drive(drive_service, bvh_file_id) return None finally: if 'video_path' in locals(): os.unlink(video_path) if 'temp_dir' in locals() and os.path.exists(temp_dir): shutil.rmtree(temp_dir) def main(): st.set_page_config( page_title="Motion Capture Studio | Video to BVH Converter", page_icon="🎬", layout="wide", initial_sidebar_state="collapsed" ) st.markdown(""" """, unsafe_allow_html=True) st.markdown('
Convert videos to BVH with AI
', unsafe_allow_html=True) st.markdown('(Note: Every Body part should be visible correctly and face should not be covered)
', unsafe_allow_html=True) api = setup_kaggle_api() drive_service = setup_drive_service() st.markdown('