codelion committed on
Commit
d38e256
·
verified ·
1 Parent(s): b3e97a9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +111 -25
app.py CHANGED
@@ -1,6 +1,8 @@
1
  import os
2
  import time
 
3
  import gradio as gr
 
4
  from google import genai
5
  from google.genai import types
6
 
@@ -11,22 +13,28 @@ if not GOOGLE_API_KEY:
11
 
12
  # Initialize the Gemini API client
13
  client = genai.Client(api_key=GOOGLE_API_KEY)
14
- MODEL_NAME = "gemini-2.5-pro-exp-03-25" # Model from the notebook that supports video analysis
15
 
16
- def upload_and_process_video(video_file: str) -> types.File:
17
  """
18
  Upload a video file to the Gemini API and wait for processing.
19
 
20
  Args:
21
  video_file (str): Path to the video file
 
22
 
23
  Returns:
24
  types.File: Processed video file object
25
  """
26
  try:
27
  video_file_obj = client.files.upload(file=video_file)
 
 
28
  while video_file_obj.state == "PROCESSING":
29
- print(f"Processing {video_file}...")
 
 
 
30
  time.sleep(10)
31
  video_file_obj = client.files.get(name=video_file_obj.name)
32
 
@@ -38,74 +46,152 @@ def upload_and_process_video(video_file: str) -> types.File:
38
  except Exception as e:
39
  raise Exception(f"Error uploading video: {str(e)}")
40
 
41
- def analyze_video(video_file: str, user_query: str) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  """
43
- Analyze the video using the Gemini API and return a summary.
44
 
45
  Args:
46
  video_file (str): Path to the video file
47
  user_query (str): Optional query to guide the analysis
48
 
49
  Returns:
50
- str: Markdown-formatted report
51
  """
52
  # Validate input
53
  if not video_file or not os.path.exists(video_file):
54
- return "Please upload a valid video file."
55
  if not video_file.lower().endswith('.mp4'):
56
- return "Please upload an MP4 video file."
57
 
58
  try:
59
  # Upload and process the video
60
  video_file_obj = upload_and_process_video(video_file)
61
 
62
- # Prepare prompt
63
- prompt = "Provide a detailed summary of this video."
64
  if user_query:
65
- prompt += f" Focus on: {user_query}"
 
 
 
 
 
 
66
 
67
- # Analyze video with Gemini API
68
- response = client.models.generate_content(
 
 
 
 
 
 
 
69
  model=MODEL_NAME,
70
- contents=[
71
- video_file_obj, # Pass the processed video file object
72
- prompt
73
- ]
74
  )
75
- summary = response.text
 
 
 
76
 
77
  # Generate Markdown report
78
  markdown_report = (
79
  "## Video Analysis Report\n\n"
80
  f"**Summary:**\n{summary}\n"
 
81
  )
82
- return markdown_report
 
 
 
 
 
 
 
83
 
84
  except Exception as e:
85
  error_msg = (
86
  "## Video Analysis Report\n\n"
87
  f"**Error:** Unable to analyze video.\n"
88
  f"Details: {str(e)}\n"
 
89
  )
90
- return error_msg
91
 
92
  # Define the Gradio interface
93
  iface = gr.Interface(
94
  fn=analyze_video,
95
  inputs=[
96
- gr.Video(label="Upload Video File (MP4)"), # Removed type="filepath"
97
  gr.Textbox(label="Analysis Query (optional)",
98
  placeholder="e.g., focus on main events or themes")
99
  ],
100
- outputs=gr.Markdown(label="Video Analysis Report"),
 
 
 
101
  title="AI Video Analysis Agent with Gemini",
102
  description=(
103
- "Upload an MP4 video to get a summary using Google's Gemini API. "
104
- "This tool analyzes the video content directly without audio or frame extraction. "
105
  "Optionally, provide a query to guide the analysis."
106
  )
107
  )
108
 
109
  if __name__ == "__main__":
110
- # Launch with share=True to create a public link
111
  iface.launch(share=True)
 
1
  import os
2
  import time
3
+ import json
4
  import gradio as gr
5
+ import cv2
6
  from google import genai
7
  from google.genai import types
8
 
 
13
 
14
  # Initialize the Gemini API client
15
  client = genai.Client(api_key=GOOGLE_API_KEY)
16
+ MODEL_NAME = "gemini-2.5-pro-exp-03-25" # Model supporting video analysis
17
 
18
+ def upload_and_process_video(video_file: str, timeout: int = 300) -> types.File:
19
  """
20
  Upload a video file to the Gemini API and wait for processing.
21
 
22
  Args:
23
  video_file (str): Path to the video file
24
+ timeout (int): Maximum time to wait for processing in seconds (default: 5 minutes)
25
 
26
  Returns:
27
  types.File: Processed video file object
28
  """
29
  try:
30
  video_file_obj = client.files.upload(file=video_file)
31
+ start_time = time.time()
32
+
33
  while video_file_obj.state == "PROCESSING":
34
+ elapsed_time = time.time() - start_time
35
+ if elapsed_time > timeout:
36
+ raise TimeoutError(f"Video processing timed out after {timeout} seconds.")
37
+ print(f"Processing {video_file}... ({int(elapsed_time)}s elapsed)")
38
  time.sleep(10)
39
  video_file_obj = client.files.get(name=video_file_obj.name)
40
 
 
46
  except Exception as e:
47
  raise Exception(f"Error uploading video: {str(e)}")
48
 
49
def hhmmss_to_seconds(timestamp: str) -> float:
    """
    Convert a colon-separated timestamp to seconds.

    Accepts "HH:MM:SS", "MM:SS" or plain "SS"; each field may carry a
    fractional part (e.g. "00:01:02.5"). Surrounding whitespace is ignored.

    Args:
        timestamp (str): Time in HH:MM:SS (or MM:SS / SS) format

    Returns:
        float: Time in seconds

    Raises:
        ValueError: If the timestamp has more than three fields or a field
            is not a number.
    """
    fields = timestamp.strip().split(":")
    if len(fields) > 3:
        raise ValueError(f"Invalid timestamp: {timestamp!r}")

    # Fold left to right so that the same loop handles 1, 2 or 3 fields:
    # each step promotes the running total by one unit (x60) and adds the
    # next, smaller unit.
    total = 0.0
    for field in fields:
        total = total * 60 + float(field)
    return total
61
+
62
def _strip_code_fence(text: str) -> str:
    """Return *text* without a surrounding Markdown code fence, if it has one."""
    body = text.strip()
    if not body.startswith("```"):
        return body
    # The opening fence line may carry a language tag ("```json"); drop the
    # whole line. If everything is on one line, drop just the backticks.
    first_line, newline, rest = body.partition("\n")
    body = rest if newline else first_line[3:]
    body = body.rstrip()
    if body.endswith("```"):
        body = body[:-3]
    return body.strip()


def extract_key_frames(video_file: str, key_frames_json: str) -> list:
    """
    Extract key frames from the video based on JSON data.

    The JSON must be an array of objects. Each object supplies its time under
    "timecode" (or "timestamp") and its description under "title" (or
    "caption"). Entries that are not objects, have no timestamp, have an
    unparseable timestamp, or whose frame cannot be read are skipped
    individually instead of aborting the whole extraction.

    Args:
        video_file (str): Path to the video file
        key_frames_json (str): JSON string with key frames data; a
            surrounding Markdown code fence is tolerated

    Returns:
        list: List of tuples (RGB image array, caption). Empty when the JSON
        is invalid or the video cannot be opened (the error is printed, not
        raised).
    """
    try:
        key_frames = json.loads(_strip_code_fence(key_frames_json))
        if not isinstance(key_frames, list):
            raise ValueError("Key frames data must be a list of objects.")

        cap = cv2.VideoCapture(video_file)
        try:
            if not cap.isOpened():
                raise ValueError("Could not open video file.")

            extracted_frames = []
            for frame in key_frames:
                if not isinstance(frame, dict):
                    continue
                timestamp = frame.get("timecode", frame.get("timestamp", ""))
                title = frame.get("title", frame.get("caption", "Untitled"))
                if not timestamp:
                    continue

                try:
                    seconds = hhmmss_to_seconds(timestamp)
                except (ValueError, TypeError, AttributeError):
                    # One bad timestamp should not cost us the other frames.
                    print(f"Skipping key frame with invalid timestamp: {timestamp!r}")
                    continue

                # Seek by time rather than frame index so no FPS lookup is needed.
                cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000)
                ret, frame_img = cap.read()
                if ret:
                    # OpenCV decodes to BGR; convert so the image displays with
                    # correct colors downstream.
                    frame_rgb = cv2.cvtColor(frame_img, cv2.COLOR_BGR2RGB)
                    caption = f"{timestamp}: {title}"
                    extracted_frames.append((frame_rgb, caption))

            return extracted_frames
        finally:
            # Always free the capture handle, including on the error paths.
            cap.release()
    except Exception as e:
        print(f"Error extracting frames: {str(e)}")
        return []
102
+
103
def analyze_video(video_file: str, user_query: str) -> tuple[str, list]:
    """
    Analyze the video using the Gemini API and extract key frames.

    Two requests are made against the uploaded video: one for a prose
    summary, one for a JSON list of key-frame timecodes. The timecodes are
    then used to grab the frames from the local file.

    Args:
        video_file (str): Path to the video file
        user_query (str): Optional query to guide the analysis

    Returns:
        tuple: (Markdown report, list of key frames as (image, caption) tuples).
        On invalid input or any failure the report carries the message and
        the list is empty; no exception is raised to the caller.
    """
    # Validate input
    if not video_file or not os.path.exists(video_file):
        return "Please upload a valid video file.", []
    if not video_file.lower().endswith('.mp4'):
        return "Please upload an MP4 video file.", []

    # The query is optional: treat None and whitespace-only input as absent
    # so the prompts do not end in a dangling "Focus on:".
    query = user_query.strip() if user_query else ""

    try:
        # Upload and process the video
        video_file_obj = upload_and_process_video(video_file)

        # Step 1: Generate detailed summary
        summary_prompt = "Provide a detailed summary of this video with timestamps for key sections."
        if query:
            summary_prompt += f" Focus on: {query}"

        summary_response = client.models.generate_content(
            model=MODEL_NAME,
            contents=[video_file_obj, summary_prompt]
        )
        # NOTE(review): the SDK's `.text` can be None when the reply has no
        # text part; avoid rendering the literal "None" in the report.
        summary = summary_response.text or "*No summary was returned by the model.*"

        # Step 2: Ask the model for key-frame timecodes (a single request)
        key_frames_prompt = (
            "Identify key frames in this video and return them as a JSON array. "
            "Each object should have 'timecode' (in HH:MM:SS format) and 'title' describing the scene."
        )
        if query:
            key_frames_prompt += f" Focus on: {query}"

        key_frames_response = client.models.generate_content(
            model=MODEL_NAME,
            contents=[video_file_obj, key_frames_prompt],
            # Request raw JSON so the reply is not wrapped in Markdown fences
            # or surrounded by prose, either of which breaks json.loads.
            config=types.GenerateContentConfig(response_mime_type="application/json"),
        )
        key_frames_json = key_frames_response.text or "[]"

        # Parse and extract frames
        key_frames = extract_key_frames(video_file, key_frames_json)

        # Generate Markdown report
        markdown_report = (
            "## Video Analysis Report\n\n"
            f"**Summary:**\n{summary}\n"
            f"**Video URI:** {video_file_obj.uri}\n"
        )
        if key_frames:
            markdown_report += "\n**Key Frames Identified:**\n"
            for i, (_, caption) in enumerate(key_frames, 1):
                markdown_report += f"- Frame {i}: {caption}\n"
        else:
            markdown_report += "\n*No key frames extracted.*\n"

        return markdown_report, key_frames

    except Exception as e:
        # UI boundary: surface the failure in the report instead of raising.
        error_msg = (
            "## Video Analysis Report\n\n"
            f"**Error:** Unable to analyze video.\n"
            f"Details: {str(e)}\n"
            "Please check your API key, ensure the video is valid, or try again later."
        )
        return error_msg, []
175
 
176
  # Define the Gradio interface
177
  iface = gr.Interface(
178
  fn=analyze_video,
179
  inputs=[
180
+ gr.Video(label="Upload Video File (MP4)"),
181
  gr.Textbox(label="Analysis Query (optional)",
182
  placeholder="e.g., focus on main events or themes")
183
  ],
184
+ outputs=[
185
+ gr.Markdown(label="Video Analysis Report"),
186
+ gr.Gallery(label="Key Frames", columns=2)
187
+ ],
188
  title="AI Video Analysis Agent with Gemini",
189
  description=(
190
+ "Upload an MP4 video to get a detailed summary and key frames using Google's Gemini API. "
191
+ "This tool analyzes the video content directly and extracts key moments as images. "
192
  "Optionally, provide a query to guide the analysis."
193
  )
194
  )
195
 
196
  if __name__ == "__main__":
 
197
  iface.launch(share=True)