Spaces:
Sleeping
Sleeping
import streamlit as st | |
import requests | |
import tempfile | |
import shutil | |
from kaggle.api.kaggle_api_extended import KaggleApi | |
import os | |
import time | |
import json | |
import subprocess | |
from google.oauth2.credentials import Credentials | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaFileUpload | |
from googleapiclient.errors import HttpError | |
def setup_kaggle_api(): | |
try: | |
os.environ["KAGGLE_USERNAME"] = os.environ.get("KAGGLE_USERNAME") | |
os.environ["KAGGLE_KEY"] = os.environ.get("KAGGLE_KEY") | |
api = KaggleApi() | |
api.authenticate() | |
return api | |
except KeyError as e: | |
st.error(f"Missing environment variable: {e}") | |
st.stop() | |
except Exception as e: | |
st.error(f" API setup failed: {e}") | |
st.stop() | |
def setup_drive_service(): | |
try: | |
credentials_json = os.environ.get("GOOGLE_SERVICE_ACCOUNT") | |
credentials_dict = json.loads(credentials_json) | |
credentials = Credentials( | |
token=credentials_dict["access_token"], | |
refresh_token=credentials_dict["refresh_token"], | |
client_id=credentials_dict["client_id"], | |
client_secret=credentials_dict["client_secret"], | |
token_uri=credentials_dict["token_uri"], | |
scopes=credentials_dict["scopes"] | |
) | |
if credentials.expired and credentials.refresh_token: | |
credentials.refresh(requests.Request()) | |
drive_service = build('drive', 'v3', credentials=credentials) | |
return drive_service | |
except KeyError as e: | |
st.error(f"Missing GOOGLE_SERVICE_ACCOUNT environment variable: {e}") | |
st.stop() | |
except json.JSONDecodeError as e: | |
st.error(f"Invalid JSON in GOOGLE_SERVICE_ACCOUNT: {e}") | |
st.stop() | |
except Exception as e: | |
st.error(f"Google Drive auth failed: {e}") | |
st.stop() | |
def upload_to_drive(drive_service, file_path, title, folder_id=None): | |
try: | |
folder_id = os.environ.get("DRIVE_FOLDER_ID", "1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd") | |
file_metadata = {'name': title, 'parents': [folder_id]} | |
media = MediaFileUpload(file_path, resumable=True) | |
file = drive_service.files().create( | |
body=file_metadata, media_body=media, fields='id, name' | |
).execute() | |
file_id = file['id'] | |
drive_service.permissions().create( | |
fileId=file_id, | |
body={'type': 'anyone', 'role': 'reader'}, | |
fields='id' | |
).execute() | |
shareable_link = f"https://drive.google.com/file/d/{file_id}/view?usp=sharing" | |
return file_id, shareable_link | |
except HttpError as e: | |
st.error(f"Drive upload failed: {e}") | |
raise | |
except Exception as e: | |
st.error(f"Unexpected error during upload: {e}") | |
raise | |
def delete_from_drive(drive_service, file_id): | |
try: | |
drive_service.files().delete(fileId=file_id).execute() | |
except Exception as e: | |
st.error(f"Failed to delete file {file_id} from Drive: {e}") | |
def get_bvh_from_folder(drive_service, folder_id=None): | |
try: | |
folder_id = os.environ.get("DRIVE_FOLDER_ID", "1T6v7Iqc90-NA-F3I-HeHDSvEaIyFibKd") | |
query = f"'{folder_id}' in parents and name contains '.bvh'" | |
response = drive_service.files().list(q=query, fields="files(id, name, mimeType)", pageSize=1).execute() | |
files = response.get('files', []) | |
if files: | |
bvh_file = files[0] | |
bvh_id = bvh_file['id'] | |
bvh_url = f"https://drive.google.com/uc?id={bvh_id}" | |
return bvh_id, bvh_url, bvh_file['name'] | |
return None, None, None | |
except Exception as e: | |
st.error(f"Error checking folder for BVH: {e}") | |
return None, None, None | |
def download_notebook_from_drive(drive_service, temp_dir): | |
try: | |
notebook_file_id = os.environ.get("NOTEBOOK_FILE_ID") | |
if not notebook_file_id: | |
st.error("NOTEBOOK_FILE_ID environment variable not set") | |
raise KeyError("NOTEBOOK_FILE_ID not set") | |
request = drive_service.files().get_media(fileId=notebook_file_id) | |
notebook_path = os.path.join(temp_dir, 'video-to-bvh-converter.ipynb') | |
with open(notebook_path, 'wb') as f: | |
f.write(request.execute()) # Directly write the downloaded content | |
# Add kernel specification to notebook | |
with open(notebook_path, 'r') as f: | |
notebook_content = json.load(f) | |
notebook_content['metadata']['kernelspec'] = { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
} | |
notebook_content['metadata']['language_info'] = { | |
"name": "python", | |
"version": "3.10.12", | |
"mimetype": "text/x-python", | |
"codemirror_mode": {"name": "ipython", "version": 3}, | |
"pygments_lexer": "ipython3", | |
"nbconvert_exporter": "python", | |
"file_extension": ".py" | |
} | |
with open(notebook_path, 'w') as f: | |
json.dump(notebook_content, f) | |
return notebook_path | |
except Exception as e: | |
st.error(f"Failed to download notebook from Drive: {e}") | |
raise | |
def push_kaggle_kernel(api, temp_dir, notebook_slug): | |
try: | |
drive_service = setup_drive_service() | |
local_notebook_path = download_notebook_from_drive(drive_service, temp_dir) | |
kernel_file = os.path.join(temp_dir, 'kernel.ipynb') | |
shutil.copy(local_notebook_path, kernel_file) | |
# Verify kernel spec (for debugging) | |
with open(kernel_file, 'r') as f: | |
metadata_content = json.load(f)['metadata'] | |
code_file = "kernel.ipynb" | |
kernel_type = "notebook" | |
metadata = { | |
"id": notebook_slug, | |
"title": "video-to-bvh-converter", | |
"code_file": code_file, | |
"language": "python", | |
"kernel_type": kernel_type, | |
"enable_gpu": True, | |
"enable_internet": True, | |
"is_private": True, | |
"accelerator": "gpu", | |
"gpu_product": "T4x2", | |
"competition_sources": [], | |
"dataset_sources": ["amanu1234/pipeline"], # Add your dataset here | |
"kernel_sources": [] | |
} | |
metadata_file = os.path.join(temp_dir, 'kernel-metadata.json') | |
with open(metadata_file, 'w') as f: | |
json.dump(metadata, f) | |
cmd = f"kaggle kernels push -p {temp_dir}" | |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
if result.returncode != 0: | |
st.error(f"Kernel push failed: {result.stderr}") | |
raise Exception(f"Push failed: {result.stderr}") | |
os.remove(local_notebook_path) | |
os.remove(kernel_file) | |
kernel_url = f"https://www.kaggle.com/code/{notebook_slug}" | |
return kernel_url | |
except Exception as e: | |
st.error(f"Failed to push kernel: {str(e)}") | |
raise | |
def check_kernel_exists(api, notebook_slug): | |
try: | |
kernels = api.kernels_list(mine=True, search=notebook_slug) | |
for kernel in kernels: | |
if kernel.ref == notebook_slug: | |
return True | |
return False | |
except Exception as e: | |
st.error(f"Kernel check failed: {e}") | |
return False | |
def download_and_save_bvh(bvh_url, filename): | |
try: | |
response = requests.get(bvh_url) | |
if response.status_code == 200: | |
temp_dir = tempfile.mkdtemp() | |
bvh_path = os.path.join(temp_dir, filename) | |
with open(bvh_path, 'wb') as f: | |
f.write(response.content) | |
return bvh_path, response.content | |
else: | |
st.error(f"Failed to download BVH: Status code {response.status_code}") | |
return None, None | |
except Exception as e: | |
st.error(f"Error downloading BVH: {e}") | |
return None, None | |
def process_video(api, drive_service, video_file): | |
video_file_id = None | |
bvh_file_id = None | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file: | |
tmp_file.write(video_file.read()) | |
video_path = tmp_file.name | |
video_file_id, video_shareable_link = upload_to_drive(drive_service, video_path, "input_video.mp4") | |
st.success(f"Video uploaded to Drive: {video_shareable_link}") | |
username = os.environ.get("KAGGLE_USERNAME") | |
notebook_slug = f"{username}/video-to-bvh-converter" | |
kernel_exists = check_kernel_exists(api, notebook_slug) | |
if not kernel_exists: | |
pass | |
temp_dir = tempfile.mkdtemp() | |
with st.spinner("Triggering..."): | |
kernel_url = push_kaggle_kernel(api, temp_dir, notebook_slug) | |
progress_bar = st.progress(0.0) | |
progress_text = st.empty() | |
with st.spinner("Waiting for video processing..."): | |
start_time = time.time() | |
execution_started = False | |
retry_count = 0 | |
max_retries = 3 | |
overall_timeout = 1800 | |
while time.time() - start_time < overall_timeout: | |
try: | |
status_response = api.kernels_status(notebook_slug) | |
current_status = status_response.status if hasattr(status_response, 'status') else 'unknown' | |
if current_status in ['queued', 'running']: | |
execution_started = True | |
if current_status == 'queued': | |
progress_bar.progress(0.2) | |
progress_text.text("Queued - Waiting for GPU...") | |
elif current_status == 'running': | |
progress_bar.progress(0.4) | |
progress_text.text("Processing video...") | |
elif current_status == 'complete' and not execution_started: | |
push_kaggle_kernel(api, temp_dir, notebook_slug) | |
time.sleep(10) | |
continue | |
elif current_status == 'error' and not execution_started: | |
if retry_count < max_retries: | |
time.sleep(10) | |
push_kaggle_kernel(api, temp_dir, notebook_slug) | |
retry_count += 1 | |
start_time = time.time() | |
continue | |
else: | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
return None | |
bvh_file_id, bvh_url, bvh_filename = get_bvh_from_folder(drive_service) | |
if bvh_url and bvh_filename: | |
# Immediately download the BVH file | |
bvh_path, bvh_data = download_and_save_bvh(bvh_url, bvh_filename or "motion_capture.bvh") | |
if bvh_path and bvh_data: | |
# Generate a timestamp for unique filenames | |
timestamp_str = time.strftime("%Y%m%d_%H%M%S") | |
# Create a filename with timestamp | |
if bvh_filename: | |
# Extract the base name without extension | |
base_name, ext = os.path.splitext(bvh_filename) | |
timestamped_filename = f"{base_name}_{timestamp_str}{ext}" | |
else: | |
timestamped_filename = f"motion_capture_{timestamp_str}.bvh" | |
progress_bar.progress(1.0) | |
progress_text.text("Complete!") | |
# Immediately delete files from Google Drive | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
video_file_id = None # Set to None to prevent double deletion in cleanup | |
if bvh_file_id: | |
delete_from_drive(drive_service, bvh_file_id) | |
bvh_file_id = None # Set to None to prevent double deletion in cleanup | |
st.success("Motion capture complete! BVH file ready for download.") | |
# Save the BVH data to session state for download | |
st.session_state['bvh_data'] = bvh_data | |
st.session_state['bvh_filename'] = timestamped_filename | |
st.session_state['bvh_path'] = bvh_path # Save path for deletion later | |
# Generate a unique timestamp for this result | |
st.session_state['bvh_timestamp'] = int(time.time()) | |
return { | |
'bvh_data': bvh_data, | |
'bvh_path': bvh_path, | |
'bvh_filename': timestamped_filename, | |
'timestamp': st.session_state['bvh_timestamp'] | |
} | |
if execution_started and current_status in ['complete', 'error']: | |
progress_bar.progress(0.8 if current_status == 'complete' else 0.6) | |
progress_text.text("Finalizing..." if current_status == 'complete' else "Error occurred...") | |
if current_status == 'error': | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
return None | |
time.sleep(10) | |
except Exception as e: | |
st.error(f"Status check failed: {str(e)}") | |
time.sleep(10) | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
return None | |
except Exception as e: | |
st.error(f"Processing error: {e}") | |
if video_file_id: | |
delete_from_drive(drive_service, video_file_id) | |
if bvh_file_id: | |
delete_from_drive(drive_service, bvh_file_id) | |
return None | |
finally: | |
if 'video_path' in locals(): | |
os.unlink(video_path) | |
if 'temp_dir' in locals() and os.path.exists(temp_dir): | |
shutil.rmtree(temp_dir) | |
def main(): | |
st.set_page_config( | |
page_title="Motion Capture Studio | Video to BVH Converter", | |
page_icon="🎬", | |
layout="wide", | |
initial_sidebar_state="collapsed" | |
) | |
st.markdown(""" | |
<style> | |
:root { | |
--bg-color: #1a1a1a; | |
--card-bg: #252525; | |
--primary-color: #bb86fc; | |
--secondary-color: #03dac6; | |
--error-color: #cf6679; | |
--text-color: #e0e0e0; | |
--text-secondary: #a0a0a0; | |
} | |
.stApp { background-color: var(--bg-color); } | |
h1, h2, h3, p, div { color: var(--text-color) !important; } | |
.card { background-color: var(--card-bg); border-radius: 20px; padding: 2rem; margin: 1rem auto; max-width: 1200px; } | |
.main-title { font-size: 3.5rem; font-weight: 900; background: linear-gradient(135deg, var(--primary-color), var(--secondary-color)); -webkit-background-clip: text; -webkit-text-fill-color: transparent; text-align: center; } | |
.subtitle { font-size: 1.3rem; color: var(--text-secondary); text-align: center; } | |
.section-title { font-size: 1.5rem; font-weight: 700; color: var(--primary-color) !important; } | |
.stButton > button { background: linear-gradient(135deg, var(--primary-color), #9b59f5); color: #fff !important; border-radius: 12px; padding: 0.8rem 2.5rem; font-weight: 600; font-size: 1.2rem; border: none; width: 100%; } | |
.stDownloadButton > button { background: linear-gradient(135deg, var(--secondary-color), #02b3a3); color: #fff !important; border-radius: 12px; padding: 0.8rem 2.5rem; font-weight: 600; font-size: 1.2rem; border: none; width: 100%; } | |
</style> | |
""", unsafe_allow_html=True) | |
st.markdown('<h1 class="main-title">Motion Capture Studio</h1>', unsafe_allow_html=True) | |
st.markdown('<p class="subtitle">Convert videos to BVH with AI</p>', unsafe_allow_html=True) | |
st.markdown('<p class="section-title">(Note: Every Body part should be visible correctly and face should not be covered)</p>', unsafe_allow_html=True) | |
api = setup_kaggle_api() | |
drive_service = setup_drive_service() | |
st.markdown('<div class="card">', unsafe_allow_html=True) | |
st.markdown('<h3 class="section-title"></h3>', unsafe_allow_html=True) | |
status_col1, status_col2 = st.columns(2) | |
with status_col1: | |
try: | |
username = os.environ.get("KAGGLE_USERNAME") | |
notebook_slug = f"{username}/video-to-bvh-converter" | |
kernel_exists = check_kernel_exists(api, notebook_slug) | |
if kernel_exists: | |
st.success(f"✅kernel found") | |
else: | |
st.error(f"❌kernel not found") | |
except Exception as e: | |
st.error(f"❌failed: {e}") | |
with status_col2: | |
try: | |
drive_about = drive_service.about().get(fields="user,storageQuota").execute() | |
storage_used = int(drive_about.get('storageQuota', {}).get('usage', 0)) / (1024 * 1024) | |
# st.success(f"✅ Google Drive: {storage_used:.2f} MB used") | |
except Exception as e: | |
st.error(f"❌Drive check failed: {e}") | |
col1, col2 = st.columns(2) | |
with col1: | |
st.markdown('<h3 class="section-title">Upload Video</h3>', unsafe_allow_html=True) | |
uploaded_file = st.file_uploader("Upload a video", type=['mp4', 'avi', 'mov']) | |
if uploaded_file: | |
st.session_state['uploaded_file'] = uploaded_file | |
with col2: | |
st.markdown('<h3 class="section-title">Preview</h3>', unsafe_allow_html=True) | |
if uploaded_file := st.session_state.get('uploaded_file'): | |
st.video(uploaded_file) | |
if st.session_state.get('uploaded_file'): | |
# Create containers for progress indicators and buttons | |
progress_container = st.container() | |
button_container = st.container() | |
download_container = st.container() | |
# Place these progress indicators above the button but only show when processing | |
with progress_container: | |
# This section will be populated during processing | |
pass | |
# Place the button below the progress indicators | |
with button_container: | |
st.markdown('<h3 class="section-title">Processing Options</h3>', unsafe_allow_html=True) | |
if st.button("Start Motion Capture", key="start_capture_button"): | |
# Clear previous BVH data when starting a new process | |
if 'bvh_data' in st.session_state: | |
del st.session_state['bvh_data'] | |
if 'bvh_filename' in st.session_state: | |
del st.session_state['bvh_filename'] | |
if 'bvh_timestamp' in st.session_state: | |
del st.session_state['bvh_timestamp'] | |
if 'bvh_path' in st.session_state and os.path.exists(st.session_state['bvh_path']): | |
try: | |
os.remove(st.session_state['bvh_path']) | |
except Exception as e: | |
st.warning(f"Could not delete previous local file: {e}") | |
# Use the progress container for progress indicators | |
with progress_container: | |
result = process_video(api, drive_service, st.session_state['uploaded_file']) | |
if not result or 'bvh_data' not in result: | |
st.error("Failed to generate BVH file.") | |
# If BVH data is in session state (from a previous run), offer it for download | |
with download_container: | |
if 'bvh_data' in st.session_state and 'bvh_filename' in st.session_state: | |
timestamp = st.session_state.get('bvh_timestamp', int(time.time())) | |
# Create a callback for when download completes | |
def on_download_complete(): | |
if 'bvh_path' in st.session_state and os.path.exists(st.session_state['bvh_path']): | |
try: | |
os.remove(st.session_state['bvh_path']) | |
st.session_state['bvh_path_deleted'] = True | |
except Exception as e: | |
st.warning(f"Failed to delete local BVH file: {e}") | |
# This doesn't actually work as Streamlit doesn't support download callbacks | |
# Instead, we'll clean up at the start of a new process | |
download_button = st.download_button( | |
label="Download BVH", | |
data=st.session_state['bvh_data'], | |
file_name=st.session_state['bvh_filename'], | |
mime="application/octet-stream", | |
key=f"download_saved_{timestamp}", | |
on_click=on_download_complete | |
) | |
st.markdown('</div>', unsafe_allow_html=True) | |
if __name__ == "__main__": | |
main() |