Spaces:
Sleeping
Sleeping
import streamlit as st | |
import requests | |
import tempfile | |
import shutil | |
from kaggle.api.kaggle_api_extended import KaggleApi | |
import os | |
import time | |
import json | |
import subprocess | |
from google.oauth2.credentials import Credentials | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaFileUpload | |
from googleapiclient.errors import HttpError | |
def setup_kaggle_api(): | |
try: | |
os.environ["KAGGLE_USERNAME"] = st.secrets["KAGGLE_USERNAME"] | |
os.environ["KAGGLE_KEY"] = st.secrets["KAGGLE_KEY"] | |
api = KaggleApi() | |
api.authenticate() | |
return api | |
except KeyError as e: | |
st.error(f"Missing Kaggle secret: {e}") | |
st.stop() | |
except Exception as e: | |
st.error(f"Kaggle API setup failed: {e}") | |
st.stop() | |
def setup_drive_service(): | |
try: | |
credentials_dict = dict(st.secrets["GOOGLE_SERVICE_ACCOUNT"]) | |
credentials = Credentials( | |
token=credentials_dict["access_token"], | |
refresh_token=credentials_dict["refresh_token"], | |
client_id=credentials_dict["client_id"], | |
client_secret=credentials_dict["client_secret"], | |
token_uri=credentials_dict["token_uri"], | |
scopes=credentials_dict["scopes"] | |
) | |
if credentials.expired and credentials.refresh_token: | |
credentials.refresh(requests.Request()) | |
drive_service = build('drive', 'v3', credentials=credentials) | |
return drive_service | |
except KeyError as e: | |
st.error(f"Missing key in GOOGLE_SERVICE_ACCOUNT: {e}") | |
st.stop() | |
except Exception as e: | |
st.error(f"Google Drive auth failed: {e}") | |
st.stop() | |
def upload_to_drive(drive_service, file_path, title, folder_id="1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd"): | |
try: | |
file_metadata = {'name': title, 'parents': [folder_id]} | |
media = MediaFileUpload(file_path, resumable=True) | |
file = drive_service.files().create( | |
body=file_metadata, media_body=media, fields='id, name' | |
).execute() | |
file_id = file['id'] | |
drive_service.permissions().create( | |
fileId=file_id, | |
body={'type': 'anyone', 'role': 'reader'}, | |
fields='id' | |
).execute() | |
shareable_link = f"https://drive.google.com/file/d/{file_id}/view?usp=sharing" | |
return file_id, shareable_link | |
except HttpError as e: | |
st.error(f"Drive upload failed: {e}") | |
raise | |
except Exception as e: | |
st.error(f"Unexpected error during upload: {e}") | |
raise | |
def delete_from_drive(drive_service, file_id): | |
try: | |
drive_service.files().delete(fileId=file_id).execute() | |
except Exception as e: | |
st.error(f"Failed to delete file {file_id} from Drive: {e}") | |
def get_bvh_from_folder(drive_service, folder_id="1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd"): | |
try: | |
query = f"'{folder_id}' in parents and name contains '.bvh'" | |
response = drive_service.files().list(q=query, fields="files(id, name, mimeType)", pageSize=1).execute() | |
files = response.get('files', []) | |
if files: | |
bvh_file = files[0] | |
bvh_id = bvh_file['id'] | |
bvh_url = f"https://drive.google.com/uc?id={bvh_id}" | |
return bvh_id, bvh_url | |
return None, None | |
except Exception as e: | |
st.error(f"Error checking folder for BVH: {e}") | |
return None, None | |
def push_kaggle_kernel(api, temp_dir, notebook_slug): | |
try: | |
local_notebook_path = os.path.join(os.path.dirname(__file__), 'video-to-bvh-converter.ipynb') | |
if not os.path.exists(local_notebook_path): | |
st.error(f"Notebook file not found") | |
raise FileNotFoundError | |
kernel_file = os.path.join(temp_dir, 'kernel.ipynb') | |
shutil.copy(local_notebook_path, kernel_file) | |
code_file = "kernel.ipynb" | |
kernel_type = "notebook" | |
metadata = { | |
"id": notebook_slug, | |
"title": "video-to-bvh-converter", | |
"code_file": code_file, | |
"language": "python", | |
"kernel_type": kernel_type, | |
"enable_gpu": True, | |
"enable_internet": True, | |
"is_private": True, | |
"competition_sources": [], | |
"dataset_sources": [], | |
"kernel_sources": [] | |
} | |
metadata_file = os.path.join(temp_dir, 'kernel-metadata.json') | |
with open(metadata_file, 'w') as f: | |
json.dump(metadata, f) | |
cmd = f"kaggle kernels push -p {temp_dir}" | |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
if result.returncode != 0: | |
st.error(f"Kaggle push failed: {result.stderr}") | |
raise Exception | |
kernel_url = f"https://www.kaggle.com/code/{notebook_slug}" | |
return kernel_url | |
except Exception as e: | |
st.error(f"Failed to push kernel: {str(e)}") | |
raise | |
def check_kernel_exists(api, notebook_slug): | |
try: | |
kernels = api.kernels_list(mine=True, search=notebook_slug) | |
for kernel in kernels: | |
if kernel.ref == notebook_slug: | |
return True | |
return False | |
except Exception as e: | |
st.error(f"Kernel check failed: {e}") | |
return False | |
def process_video(api, drive_service, video_file): | |
video_file_id = None | |
bvh_file_id = None | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file: | |
tmp_file.write(video_file.read()) | |
video_path = tmp_file.name | |
video_file_id, video_shareable_link = upload_to_drive(drive_service, video_path, "input_video.mp4") | |
st.success(f"Video uploaded to Drive: {video_shareable_link}") | |
username = st.secrets['KAGGLE_USERNAME'] | |
notebook_slug = f"{username}/video-to-bvh-converter" | |
kernel_exists = check_kernel_exists(api, notebook_slug) | |
if not kernel_exists: | |
pass | |
temp_dir = tempfile.mkdtemp() | |
with st.spinner("Triggering Kaggle notebook..."): | |
kernel_url = push_kaggle_kernel(api, temp_dir, notebook_slug) | |
progress_bar = st.progress(0.0) | |
progress_text = st.empty() | |
with st.spinner("Waiting for video processing..."): | |
start_time = time.time() | |
execution_started = False | |
retry_count = 0 | |
max_retries = 3 | |
overall_timeout = 1800 | |
while time.time() - start_time < overall_timeout: | |
try: | |
status_response = api.kernels_status(notebook_slug) | |
current_status = status_response.status if hasattr(status_response, 'status') else 'unknown' | |
if current_status in ['queued', 'running']: | |
execution_started = True | |
if current_status == 'queued': | |
progress_bar.progress(0.2) | |
progress_text.text("Queued - Waiting for GPU...") | |
elif current_status == 'running': | |
progress_bar.progress(0.4) | |
progress_text.text("Processing video...") | |
elif current_status == 'complete' and not execution_started: | |
push_kaggle_kernel(api, temp_dir, notebook_slug) | |
time.sleep(10) | |
continue | |
elif current_status == 'error' and not execution_started: | |
if retry_count < max_retries: | |
time.sleep(10) | |
push_kaggle_kernel(api, temp_dir, notebook_slug) | |
retry_count += 1 | |
start_time = time.time() | |
continue | |
else: | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
return None | |
bvh_file_id, bvh_url = get_bvh_from_folder(drive_service) | |
if bvh_url: | |
response = requests.get(bvh_url) | |
if response.status_code == 200: | |
progress_bar.progress(1.0) | |
progress_text.text("Complete!") | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
if bvh_file_id: | |
delete_from_drive(drive_service, bvh_file_id) | |
st.write("BVH generated") | |
return {'bvh_data': response.content} | |
if execution_started and current_status in ['complete', 'error']: | |
progress_bar.progress(0.8 if current_status == 'complete' else 0.6) | |
progress_text.text("Finalizing..." if current_status == 'complete' else "Error occurred...") | |
if current_status == 'error': | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
return None | |
time.sleep(10) | |
except Exception as e: | |
st.error(f"Status check failed: {str(e)}") | |
time.sleep(10) | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
return None | |
except Exception as e: | |
st.error(f"Processing error: {e}") | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
if bvh_file_id: | |
delete_from_drive(drive_service, bvh_file_id) | |
return None | |
finally: | |
if 'video_path' in locals(): | |
os.unlink(video_path) | |
if 'temp_dir' in locals() and os.path.exists(temp_dir): | |
shutil.rmtree(temp_dir) | |
def main(): | |
st.set_page_config( | |
page_title="Motion Capture Studio | Video to BVH Converter", | |
page_icon="🎬", | |
layout="wide", | |
initial_sidebar_state="collapsed" | |
) | |
st.markdown(""" | |
<style> | |
:root { | |
--bg-color: #1a1a1a; | |
--card-bg: #252525; | |
--primary-color: #bb86fc; | |
--secondary-color: #03dac6; | |
--error-color: #cf6679; | |
--text-color: #e0e0e0; | |
--text-secondary: #a0a0a0; | |
} | |
.stApp { background-color: var(--bg-color); } | |
h1, h2, h3, p, div { color: var(--text-color) !important; } | |
.card { background-color: var(--card-bg); border-radius: 20px; padding: 2rem; margin: 1rem auto; max-width: 1200px; } | |
.main-title { font-size: 3.5rem; font-weight: 900; background: linear-gradient(135deg, var(--primary-color), var(--secondary-color)); -webkit-background-clip: text; -webkit-text-fill-color: transparent; text-align: center; } | |
.subtitle { font-size: 1.3rem; color: var(--text-secondary); text-align: center; } | |
.section-title { font-size: 1.5rem; font-weight: 700; color: var(--primary-color) !important; } | |
.stButton > button { background: linear-gradient(135deg, var(--primary-color), #9b59f5); color: #fff !important; border-radius: 12px; padding: 0.8rem 2.5rem; font-weight: 600; font-size: 1.2rem; border: none; width: 100%; } | |
.stDownloadButton > button { background: linear-gradient(135deg, var(--secondary-color), #02b3a3); color: #fff !important; border-radius: 12px; padding: 0.8rem 2.5rem; font-weight: 600; font-size: 1.2rem; border: none; width: 100%; } | |
</style> | |
""", unsafe_allow_html=True) | |
st.markdown('<h1 class="main-title">Motion Capture Studio</h1>', unsafe_allow_html=True) | |
st.markdown('<p class="subtitle">Convert videos to BVH with AI</p>', unsafe_allow_html=True) | |
api = setup_kaggle_api() | |
drive_service = setup_drive_service() | |
st.markdown('<div class="card">', unsafe_allow_html=True) | |
st.markdown('<h3 class="section-title"></h3>', unsafe_allow_html=True) | |
status_col1, status_col2 = st.columns(2) | |
with status_col1: | |
try: | |
username = st.secrets['KAGGLE_USERNAME'] | |
notebook_slug = f"{username}/video-to-bvh-converter" | |
kernel_exists = check_kernel_exists(api, notebook_slug) | |
if kernel_exists: | |
st.success(f"✅ Kaggle kernel found: {notebook_slug}") | |
else: | |
st.error(f"❌ Kaggle kernel not found: {notebook_slug}") | |
except Exception as e: | |
st.error(f"❌failed: {e}") | |
with status_col2: | |
try: | |
drive_about = drive_service.about().get(fields="user,storageQuota").execute() | |
storage_used = int(drive_about.get('storageQuota', {}).get('usage', 0)) / (1024 * 1024) | |
# st.success(f"✅ Google Drive: {storage_used:.2f} MB used") | |
except Exception as e: | |
st.error(f"❌Drive check failed: {e}") | |
col1, col2 = st.columns(2) | |
with col1: | |
st.markdown('<h3 class="section-title">Upload Video</h3>', unsafe_allow_html=True) | |
uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'avi', 'mov']) | |
if uploaded_file: | |
st.session_state['uploaded_file'] = uploaded_file | |
with col2: | |
st.markdown('<h3 class="section-title">Preview</h3>', unsafe_allow_html=True) | |
if uploaded_file := st.session_state.get('uploaded_file'): | |
st.video(uploaded_file) | |
if st.session_state.get('uploaded_file'): | |
st.markdown('<h3 class="section-title">Processing Options</h3>', unsafe_allow_html=True) | |
if st.button("Start Motion Capture"): | |
result = process_video(api, drive_service, st.session_state['uploaded_file']) | |
if result and 'bvh_data' in result: | |
st.success("Motion capture complete!") | |
st.download_button( | |
label="Download BVH", | |
data=result['bvh_data'], | |
file_name="motion_capture.bvh", | |
mime="application/octet-stream" | |
) | |
else: | |
st.error("Failed to retrieve BVH file.") | |
st.markdown('</div>', unsafe_allow_html=True) | |
if __name__ == "__main__": | |
main() |