import os import time import json import gradio as gr import cv2 from google import genai from google.genai import types # Retrieve API key from environment variables GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") if not GOOGLE_API_KEY: raise ValueError("Please set the GOOGLE_API_KEY environment variable with your Google Cloud API key.") # Initialize the Gemini API client client = genai.Client(api_key=GOOGLE_API_KEY) MODEL_NAME = "gemini-2.5-pro-exp-03-25" # Model supporting video analysis def upload_and_process_video(video_file: str, timeout: int = 300) -> types.File: """ Upload a video file to the Gemini API and wait for processing. Args: video_file (str): Path to the video file timeout (int): Maximum time to wait for processing in seconds (default: 5 minutes) Returns: types.File: Processed video file object """ try: video_file_obj = client.files.upload(file=video_file) start_time = time.time() while video_file_obj.state == "PROCESSING": elapsed_time = time.time() - start_time if elapsed_time > timeout: raise TimeoutError(f"Video processing timed out after {timeout} seconds.") print(f"Processing {video_file}... ({int(elapsed_time)}s elapsed)") time.sleep(10) video_file_obj = client.files.get(name=video_file_obj.name) if video_file_obj.state == "FAILED": raise ValueError(f"Video processing failed: {video_file_obj.state}") print(f"Video processing complete: {video_file_obj.uri}") return video_file_obj except Exception as e: raise Exception(f"Error uploading video: {str(e)}") def hhmmss_to_seconds(timestamp: str) -> float: """ Convert HH:MM:SS timestamp to seconds. Args: timestamp (str): Time in HH:MM:SS format Returns: float: Time in seconds """ try: h, m, s = map(float, timestamp.split(":")) return h * 3600 + m * 60 + s except ValueError: return 0.0 # Default to 0 if parsing fails def extract_key_frames(video_file: str, key_frames_response: str) -> list: """ Extract key frames from the video based on Gemini API response. Args: video_file (str): Path to the video file key_frames_response (str): Raw response from Gemini API Returns: list: List of tuples (image, caption) """ extracted_frames = [] cap = cv2.VideoCapture(video_file) if not cap.isOpened(): print("Error: Could not open video file.") return extracted_frames # Strip Markdown code block if present cleaned_response = key_frames_response.strip() if cleaned_response.startswith("```json") and cleaned_response.endswith("```"): cleaned_response = cleaned_response[7:-3].strip() elif cleaned_response.startswith("```") and cleaned_response.endswith("```"): cleaned_response = cleaned_response[3:-3].strip() print(f"Cleaned key frames response: {cleaned_response}") # Debug output try: # Try parsing as JSON key_frames = json.loads(cleaned_response) if not isinstance(key_frames, list): raise ValueError("Response is not a list.") except json.JSONDecodeError as e: print(f"JSON parsing failed: {str(e)}. Falling back to text parsing.") # Fallback: Parse plain text with timecodes (e.g., "00:00:03 - Scene" or "00:00:03: Scene") key_frames = [] lines = cleaned_response.strip().split("\n") for line in lines: line = line.strip() if not line: continue if " - " in line: timestamp, title = line.split(" - ", 1) key_frames.append({"timecode": timestamp.strip(), "title": title.strip()}) elif ": " in line and len(line.split(":")[0]) == 2: # Check for HH:MM:SS format timestamp, title = line.split(": ", 1) key_frames.append({"timecode": timestamp.strip(), "title": title.strip()}) elif len(line.split(":")) == 3: # Rough check for standalone HH:MM:SS key_frames.append({"timecode": line.strip(), "title": "Untitled"}) for frame in key_frames: timestamp = frame.get("timecode", frame.get("timestamp", "")) title = frame.get("title", frame.get("caption", "Untitled")) if not timestamp: continue seconds = hhmmss_to_seconds(timestamp) if seconds == 0.0: # Skip invalid timestamps continue cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000) ret, frame_img = cap.read() if ret: frame_rgb = cv2.cvtColor(frame_img, cv2.COLOR_BGR2RGB) caption = f"{timestamp}: {title}" extracted_frames.append((frame_rgb, caption)) cap.release() return extracted_frames def analyze_video(video_file: str, user_query: str) -> tuple[str, list]: """ Analyze the video using the Gemini API and extract key frames. Args: video_file (str): Path to the video file user_query (str): Optional query to guide the analysis Returns: tuple: (Markdown report, list of key frames as (image, caption) tuples) """ # Validate input if not video_file or not os.path.exists(video_file): return "Please upload a valid video file.", [] if not video_file.lower().endswith('.mp4'): return "Please upload an MP4 video file.", [] try: # Upload and process the video video_file_obj = upload_and_process_video(video_file) # Step 1: Generate detailed summary summary_prompt = "Provide a detailed summary of this video with timestamps for key sections." if user_query: summary_prompt += f" Focus on: {user_query}" summary_response = client.models.generate_content( model=MODEL_NAME, contents=[video_file_obj, summary_prompt] ) summary = summary_response.text # Step 2: Extract key frames with few-shot examples key_frames_prompt = ( "Identify key frames in this video and return them as a JSON array. " "Each object must have 'timecode' (in HH:MM:SS format) and 'title' describing the scene. " "Ensure the response is valid JSON. Here are examples of the expected format:\n" "Example 1: For a video of a car chase:\n" "```json\n" "[\n" " {\"timecode\": \"00:00:00\", \"title\": \"Car chase begins on highway\"},\n" " {\"timecode\": \"00:00:10\", \"title\": \"Police car joins pursuit\"}\n" "]\n" "```\n" "Example 2: For a nature video:\n" "```json\n" "[\n" " {\"timecode\": \"00:00:05\", \"title\": \"Bird flies across screen\"},\n" " {\"timecode\": \"00:00:15\", \"title\": \"Deer appears in forest\"}\n" "]\n" "```\n" "Now, provide the key frames for this video in the same JSON format." ) if user_query: key_frames_prompt += f" Focus on: {user_query}" key_frames_response = client.models.generate_content( model=MODEL_NAME, contents=[video_file_obj, key_frames_prompt] ) key_frames = extract_key_frames(video_file, key_frames_response.text) # Generate Markdown report markdown_report = ( "## Video Analysis Report\n\n" f"**Summary:**\n{summary}\n" ) if key_frames: markdown_report += "\n**Key Frames Identified:**\n" for i, (_, caption) in enumerate(key_frames, 1): markdown_report += f"- Frame {i}: {caption}\n" else: markdown_report += "\n*No key frames extracted. Check the console for the raw response.*\n" return markdown_report, key_frames except Exception as e: error_msg = ( "## Video Analysis Report\n\n" f"**Error:** Unable to analyze video.\n" f"Details: {str(e)}\n" "Please check your API key, ensure the video is valid, or try again later." ) return error_msg, [] # Define the Gradio interface iface = gr.Interface( fn=analyze_video, inputs=[ gr.Video(label="Upload Video File (MP4)"), gr.Textbox(label="Analysis Query (optional)", placeholder="e.g., focus on main events or themes") ], outputs=[ gr.Markdown(label="Video Analysis Report"), gr.Gallery(label="Key Frames", columns=2) ], title="AI Video Analysis Agent with Gemini", description=( "Upload an MP4 video to get a detailed summary and key frames using Google's Gemini API. " "This tool analyzes the video content directly and extracts key moments as images. " "Optionally, provide a query to guide the analysis." ) ) if __name__ == "__main__": iface.launch(share=True)