Spaces:
Sleeping
Sleeping
File size: 6,988 Bytes
f8aaa9d 78aee58 d38e256 f8aaa9d d38e256 f8aaa9d 78aee58 f8aaa9d c137e5c f8aaa9d 78aee58 f8aaa9d c137e5c f8aaa9d d38e256 f8aaa9d d38e256 d638712 78aee58 c137e5c d38e256 c137e5c 78aee58 d638712 78aee58 d38e256 78aee58 d38e256 78aee58 d638712 78aee58 001b623 d38e256 0f96bc2 d38e256 c137e5c 7c2c622 d38e256 f8aaa9d 78aee58 c137e5c d38e256 c137e5c d38e256 78aee58 d38e256 78aee58 d38e256 78aee58 d38e256 78aee58 d38e256 78aee58 d38e256 78aee58 d38e256 78aee58 d38e256 78aee58 d38e256 78aee58 d38e256 f8aaa9d c137e5c f8aaa9d 78aee58 0f96bc2 d38e256 78aee58 0f96bc2 d38e256 78aee58 f8aaa9d d38e256 78aee58 f8aaa9d 78aee58 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
import os
import time
import json
import gradio as gr
import cv2
from google import genai
from google.genai import types
# Gemini model used for every request in this module (supports video analysis).
MODEL_NAME = "gemini-2.5-pro-exp-03-25"

# Credentials come from the environment; without them the module refuses to load.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("Please set the GOOGLE_API_KEY environment variable with your Google Cloud API key.")

# Single Gemini API client, created once at import time.
client = genai.Client(api_key=GOOGLE_API_KEY)
def upload_and_process_video(
    video_file: str, timeout: int = 300, poll_interval: float = 10
) -> types.File:
    """
    Upload a video file to the Gemini API and wait for processing.

    Args:
        video_file (str): Path to the video file
        timeout (int): Maximum time to wait for processing in seconds (default: 5 minutes)
        poll_interval (float): Seconds to sleep between status checks (default: 10)

    Returns:
        types.File: Processed video file object

    Raises:
        Exception: If the upload fails, processing exceeds ``timeout``, or the
            service reports the file as FAILED. The underlying error
            (e.g. ``TimeoutError``, ``ValueError``) is chained as ``__cause__``.
    """
    try:
        video_file_obj = client.files.upload(file=video_file)
        # Use a monotonic clock so the timeout is immune to wall-clock adjustments.
        start_time = time.monotonic()
        while video_file_obj.state == "PROCESSING":
            elapsed_time = time.monotonic() - start_time
            if elapsed_time > timeout:
                raise TimeoutError(f"Video processing timed out after {timeout} seconds.")
            print(f"Processing {video_file}... ({int(elapsed_time)}s elapsed)")
            time.sleep(poll_interval)
            # Re-fetch the file record to pick up the latest processing state.
            video_file_obj = client.files.get(name=video_file_obj.name)
        if video_file_obj.state == "FAILED":
            # Prefer the service-reported error when the file object carries one;
            # otherwise fall back to the bare state, as before.
            # NOTE(review): assumes the SDK exposes failure details as `.error` - confirm.
            failure_detail = getattr(video_file_obj, "error", None) or video_file_obj.state
            raise ValueError(f"Video processing failed: {failure_detail}")
        print(f"Video processing complete: {video_file_obj.uri}")
        return video_file_obj
    except Exception as e:
        # Keep the generic exception type callers already catch, but chain the cause.
        raise Exception(f"Error uploading video: {e}") from e
def hhmmss_to_seconds(timestamp: str) -> float:
    """
    Convert a colon-separated timestamp to seconds.

    Accepts ``HH:MM:SS``, ``MM:SS`` or plain ``SS``; each field may be
    fractional (e.g. ``00:00:01.5``).

    Args:
        timestamp (str): Time in HH:MM:SS, MM:SS or SS format

    Returns:
        float: Time in seconds

    Raises:
        ValueError: If the timestamp has more than three fields or a field
            is not a number.
    """
    fields = timestamp.strip().split(":")
    if len(fields) > 3:
        raise ValueError(f"Invalid timestamp (expected HH:MM:SS): {timestamp!r}")
    total_seconds = 0.0
    # Fold left to right: each colon promotes the running total by a factor of 60.
    for field in fields:
        total_seconds = total_seconds * 60 + float(field)
    return total_seconds
def _parse_key_frames_payload(key_frames_json: str) -> list:
    """Decode the key-frame JSON into a list, tolerating text wrapped around the array."""
    text = (key_frames_json or "").strip()
    try:
        payload = json.loads(text)
    except ValueError:
        # The reply may be wrapped in a Markdown code fence or surrounded by prose;
        # retry on the outermost [...] span before giving up.
        start, end = text.find("["), text.rfind("]")
        if start == -1 or end <= start:
            raise
        payload = json.loads(text[start:end + 1])
    if not isinstance(payload, list):
        raise ValueError("Key frames data must be a list of objects.")
    return payload


def extract_key_frames(video_file: str, key_frames_json: str) -> list:
    """
    Extract key frames from the video based on JSON data.

    Best-effort: any failure is printed and an empty list is returned, and
    individual entries that are malformed (not an object, no timestamp,
    unparseable timestamp, or a position that cannot be read) are skipped.

    Args:
        video_file (str): Path to the video file
        key_frames_json (str): JSON string with key frames data; a JSON array
            of objects with 'timecode' (or 'timestamp') and 'title' (or 'caption')

    Returns:
        list: List of tuples (image, caption), where image is the RGB frame
    """
    try:
        key_frames = _parse_key_frames_payload(key_frames_json)
    except (ValueError, TypeError) as e:
        print(f"Error extracting frames: {str(e)}")
        return []
    if not key_frames:
        # Nothing to extract, so there is no need to open the video at all.
        return []

    try:
        extracted_frames = []
        cap = cv2.VideoCapture(video_file)
        try:
            if not cap.isOpened():
                raise ValueError("Could not open video file.")
            for frame in key_frames:
                if not isinstance(frame, dict):
                    continue
                timestamp = frame.get("timecode", frame.get("timestamp", ""))
                title = frame.get("title", frame.get("caption", "Untitled"))
                if not timestamp:
                    continue
                try:
                    seconds = hhmmss_to_seconds(str(timestamp))
                except ValueError:
                    # One bad timestamp must not discard every other frame.
                    continue
                cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000)
                ret, frame_img = cap.read()
                if ret:
                    # OpenCV decodes to BGR; convert so the image displays with correct colours.
                    frame_rgb = cv2.cvtColor(frame_img, cv2.COLOR_BGR2RGB)
                    caption = f"{timestamp}: {title}"
                    extracted_frames.append((frame_rgb, caption))
        finally:
            # Always release the capture handle, even when extraction fails midway.
            cap.release()
        return extracted_frames
    except Exception as e:
        print(f"Error extracting frames: {str(e)}")
        return []
def analyze_video(video_file: str, user_query: str) -> tuple[str, list]:
    """
    Analyze the video using the Gemini API and extract key frames.

    Two requests are made against the uploaded video: one for a timestamped
    summary and one for a JSON list of key frames, which are then pulled out
    of the local file as images. Errors never propagate; they are rendered
    into the returned Markdown instead.

    Args:
        video_file (str): Path to the video file
        user_query (str): Optional query to guide the analysis

    Returns:
        tuple: (Markdown report, list of key frames as (image, caption) tuples)
    """
    # Validate input
    if not video_file or not os.path.exists(video_file):
        return "Please upload a valid video file.", []
    if not video_file.lower().endswith('.mp4'):
        return "Please upload an MP4 video file.", []

    # The optional focus is appended to both prompts; blank input is ignored.
    query = (user_query or "").strip()
    focus_suffix = f" Focus on: {query}" if query else ""

    try:
        # Upload and process the video
        video_file_obj = upload_and_process_video(video_file)

        # Step 1: Generate detailed summary
        summary_prompt = (
            "Provide a detailed summary of this video with timestamps for key sections."
            + focus_suffix
        )
        summary_response = client.models.generate_content(
            model=MODEL_NAME,
            contents=[video_file_obj, summary_prompt],
        )
        # `.text` can be empty; avoid rendering the literal "None" in the report.
        summary = summary_response.text or "*No summary was returned.*"

        # Step 2: Ask the model for key frames as JSON (a single request)
        key_frames_prompt = (
            "Identify key frames in this video and return them as a JSON array. "
            "Each object should have 'timecode' (in HH:MM:SS format) and 'title' describing the scene."
            + focus_suffix
        )
        key_frames_response = client.models.generate_content(
            model=MODEL_NAME,
            contents=[video_file_obj, key_frames_prompt],
            # Request raw JSON so the reply parses without Markdown fences or prose.
            config=types.GenerateContentConfig(response_mime_type="application/json"),
        )
        key_frames_json = key_frames_response.text or "[]"

        # Parse and extract frames
        key_frames = extract_key_frames(video_file, key_frames_json)

        # Generate Markdown report
        report_parts = [
            "## Video Analysis Report\n\n",
            # Blank line after the summary so the URI renders as its own paragraph.
            f"**Summary:**\n{summary}\n\n",
            f"**Video URI:** {video_file_obj.uri}\n",
        ]
        if key_frames:
            report_parts.append("\n**Key Frames Identified:**\n")
            report_parts.extend(
                f"- Frame {i}: {caption}\n"
                for i, (_, caption) in enumerate(key_frames, 1)
            )
        else:
            report_parts.append("\n*No key frames extracted.*\n")
        return "".join(report_parts), key_frames
    except Exception as e:
        # UI boundary: surface the failure to the user instead of crashing the app.
        error_msg = (
            "## Video Analysis Report\n\n"
            "**Error:** Unable to analyze video.\n"
            f"Details: {str(e)}\n"
            "Please check your API key, ensure the video is valid, or try again later."
        )
        return error_msg, []
# Gradio front end: a video upload plus an optional text query go in,
# a Markdown report and a gallery of key frames come out.
def _build_interface():
    """Assemble the Gradio interface that exposes ``analyze_video``."""
    video_input = gr.Video(label="Upload Video File (MP4)")
    query_input = gr.Textbox(
        label="Analysis Query (optional)",
        placeholder="e.g., focus on main events or themes",
    )
    report_output = gr.Markdown(label="Video Analysis Report")
    frames_output = gr.Gallery(label="Key Frames", columns=2)
    return gr.Interface(
        fn=analyze_video,
        inputs=[video_input, query_input],
        outputs=[report_output, frames_output],
        title="AI Video Analysis Agent with Gemini",
        description=(
            "Upload an MP4 video to get a detailed summary and key frames using Google's Gemini API. "
            "This tool analyzes the video content directly and extracts key moments as images. "
            "Optionally, provide a query to guide the analysis."
        ),
    )


iface = _build_interface()

if __name__ == "__main__":
    # Start the app only when run as a script; share=True requests a public link.
    iface.launch(share=True)