import json
import os
from typing import Optional

import cv2
import gradio as gr
from google import genai
from google.genai.types import GenerateContentConfig, Part
from tenacity import retry, stop_after_attempt, wait_random_exponential

# Retrieve API key from environment variables; fail fast if it is missing so
# the app does not start in a half-configured state.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("Please set the GOOGLE_API_KEY environment variable.")

# Initialize the Gemini API client via AI Studio using the API key.
client = genai.Client(api_key=GOOGLE_API_KEY)

# Use the Gemini 2.0 Flash model.
MODEL_NAME = "gemini-2.0-flash-001"


@retry(wait=wait_random_exponential(multiplier=1, max=60), stop=stop_after_attempt(3))
def call_gemini(video_file: str, prompt: str,
                config: Optional[GenerateContentConfig] = None) -> str:
    """
    Call the Gemini model with the provided video file and prompt.

    The video file is read as bytes and sent inline with MIME type
    "video/mp4". Optionally accepts a config (e.g. response_schema) for
    structured output. Retries up to 3 times with exponential backoff on
    transient failures (via tenacity).

    Args:
        video_file: Path to a local video file (assumed MP4 — TODO confirm
            Gradio always hands back .mp4 here).
        prompt: Text prompt accompanying the video.
        config: Optional generation config (temperature, response schema, ...).

    Returns:
        The model's text response, or "" if the response carried no text.
    """
    with open(video_file, "rb") as f:
        file_bytes = f.read()
    # BUG FIX: Part() has no `mime_type` keyword and `file_data` expects a
    # FileData (file URI) object, not raw bytes. Inline media must be built
    # with Part.from_bytes(), which wraps the bytes in an inline_data Blob.
    response = client.models.generate_content(
        model=MODEL_NAME,
        contents=[
            Part.from_bytes(data=file_bytes, mime_type="video/mp4"),
            Part(text=prompt),
        ],
        config=config,
    )
    # response.text may be None (e.g. a blocked or empty candidate); callers
    # expect a str (json.loads, f-strings), so normalize to "".
    return response.text or ""


def hhmmss_to_seconds(time_str: str) -> float:
    """
    Convert a HH:MM:SS, MM:SS, or SS formatted string into seconds.

    Raises ValueError (from float()) on non-numeric components; callers are
    expected to catch and skip malformed timestamps.
    """
    parts = [float(p) for p in time_str.strip().split(":")]
    if len(parts) == 3:
        return parts[0] * 3600 + parts[1] * 60 + parts[2]
    elif len(parts) == 2:
        return parts[0] * 60 + parts[1]
    else:
        return parts[0]


def get_key_frames(video_file: str, analysis: str, user_query: str) -> list:
    """
    Prompt Gemini to return key frame timestamps (in HH:MM:SS) with
    descriptions, then extract those frames from the uploaded video file
    using OpenCV.

    Args:
        video_file: Path to the local video file.
        analysis: Prior textual analysis of the video, used as context.
        user_query: Optional extra focus for the key-frame selection.

    Returns:
        A list of tuples (image_array, caption), where image_array is an
        RGB numpy array and caption is "HH:MM:SS: description". Empty on
        any model / parsing / video-open failure (best-effort by design).
    """
    # Response schema forcing a JSON array of {timestamp, description}.
    response_schema = {
        "type": "ARRAY",
        "items": {
            "type": "OBJECT",
            "properties": {
                "timestamp": {"type": "string"},
                "description": {"type": "string"},
            },
            "required": ["timestamp", "description"],
        },
    }
    config = GenerateContentConfig(
        temperature=0.0,
        max_output_tokens=1024,
        response_mime_type="application/json",
        response_schema=response_schema,
    )
    prompt = (
        "From the following video analysis, list key frames with their timestamps (in HH:MM:SS format) "
        "and a brief description of the important event at that timestamp. "
        "Return the result as a JSON array of objects with keys 'timestamp' and 'description'."
    )
    prompt += f" Video Analysis: {analysis}"
    if user_query:
        prompt += f" Additional focus: {user_query}"

    try:
        key_frames_response = call_gemini(video_file, prompt, config=config)
        key_frames = json.loads(key_frames_response)
        if not isinstance(key_frames, list):
            key_frames = []
    except Exception as e:
        # Best-effort: a failed key-frame pass should not sink the whole report.
        print("Error in key frame extraction:", e)
        key_frames = []

    extracted_frames = []
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Error: Could not open the uploaded video file.")
        return extracted_frames
    try:
        for frame_obj in key_frames:
            ts = frame_obj.get("timestamp")
            description = frame_obj.get("description", "")
            try:
                seconds = hhmmss_to_seconds(ts)
            except Exception:
                # Skip entries whose timestamp the model formatted badly.
                continue
            # Seek by wall-clock position (OpenCV expects milliseconds).
            cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000)
            ret, frame = cap.read()
            if ret:
                # OpenCV decodes BGR; Gradio galleries expect RGB.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                extracted_frames.append((frame_rgb, f"{ts}: {description}"))
    finally:
        # Always release the capture handle, even if extraction raises.
        cap.release()
    return extracted_frames


def analyze_video(video_file: str, user_query: str) -> tuple:
    """
    Perform iterative, agentic video analysis on the uploaded file.

    First refines the video analysis over several iterations, then prompts
    the model to identify key frames.

    Returns:
        tuple[str, list]:
            - A Markdown report string.
            - A gallery list of key frames, each a tuple (image, caption).
    """
    analysis = ""
    num_iterations = 3
    for i in range(num_iterations):
        base_prompt = (
            "You are a video analysis agent focusing on security and surveillance. "
            "Provide a detailed summary of the video, highlighting key events, suspicious activities, or anomalies."
        )
        if user_query:
            base_prompt += f" Also, focus on the following query: {user_query}"
        if i == 0:
            prompt = base_prompt
        else:
            # Later passes refine the previous answer instead of starting over.
            prompt = (
                f"Based on the previous analysis: \"{analysis}\". "
                "Provide further elaboration and refined insights, focusing on potential security threats, anomalous events, "
                "and details that would help a security team understand the situation better."
            )
            if user_query:
                prompt += f" Remember to focus on: {user_query}"
        try:
            analysis = call_gemini(video_file, prompt)
        except Exception as e:
            # Keep whatever analysis we have and surface the failure in the report.
            analysis += f"\n[Error during iteration {i+1}: {e}]"
            break

    markdown_report = f"## Video Analysis Report\n\n**Summary:**\n\n{analysis}\n"
    key_frames_gallery = get_key_frames(video_file, analysis, user_query)
    if not key_frames_gallery:
        markdown_report += "\n*No key frames were extracted.*\n"
    else:
        markdown_report += "\n**Key Frames Extracted:**\n"
        for idx, (img, caption) in enumerate(key_frames_gallery, start=1):
            markdown_report += f"- **Frame {idx}:** {caption}\n"
    return markdown_report, key_frames_gallery


def gradio_interface(video_file, user_query: str) -> tuple:
    """
    Gradio entry point: accepts an uploaded video file and an optional query,
    then returns a Markdown report and a gallery of key frame images with
    captions. Returns a friendly message (and empty gallery) when no file
    was provided.
    """
    if not video_file:
        return "Please upload a valid video file.", []
    return analyze_video(video_file, user_query)


iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Video(label="Upload Video File"),
        gr.Textbox(
            label="Analysis Query (optional): guide the focus of the analysis",
            placeholder="e.g., focus on unusual movements near the entrance",
        ),
    ],
    outputs=[
        gr.Markdown(label="Security & Surveillance Analysis Report"),
        gr.Gallery(label="Extracted Key Frames", columns=2),
    ],
    title="AI Video Analysis and Summariser Agent",
    description=(
        "This agentic video analysis tool uses Google's Gemini 2.0 Flash model via AI Studio "
        "to iteratively analyze an uploaded video for security and surveillance insights. "
        "Provide a video file and, optionally, a query to guide the analysis. The tool returns a detailed "
        "Markdown report along with a gallery of key frame images."
    ),
)

if __name__ == "__main__":
    iface.launch()