File size: 6,133 Bytes
f8aaa9d
7c2c622
f8aaa9d
7c2c622
f8aaa9d
c43728b
f8aaa9d
 
0f96bc2
f8aaa9d
 
 
 
0425992
f8aaa9d
 
0f96bc2
830c9fb
f8aaa9d
 
c43728b
f8aaa9d
001b623
d638712
0425992
f8aaa9d
001b623
 
f8aaa9d
 
 
001b623
0425992
c43728b
f8aaa9d
 
 
d638712
 
 
 
 
 
 
 
 
 
7c2c622
f8aaa9d
7c2c622
0f96bc2
7c2c622
 
 
cba459f
7c2c622
cba459f
7c2c622
 
 
63595a8
7c2c622
d638712
63595a8
4938676
0425992
7c2c622
 
 
 
0425992
4938676
7c2c622
63595a8
7c2c622
63595a8
7c2c622
d638712
 
 
 
 
 
 
 
 
 
0f96bc2
7c2c622
001b623
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c622
 
001b623
0f96bc2
d638712
0425992
d638712
7c2c622
 
0425992
7c2c622
f8aaa9d
0425992
63595a8
 
d638712
0425992
d638712
63595a8
7c2c622
 
 
 
 
 
 
 
001b623
f8aaa9d
001b623
d638712
f8aaa9d
001b623
 
 
f8aaa9d
 
 
0f96bc2
03c6357
0f96bc2
 
 
 
3f2c22a
0f96bc2
f8aaa9d
 
63595a8
 
 
f8aaa9d
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import os
import json
import gradio as gr
import cv2
from google import genai
from google.genai.types import Part
from tenacity import retry, stop_after_attempt, wait_random_exponential

# The Gemini API key is read from the environment; refuse to start when it is
# missing or empty so the problem surfaces at import time.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("Please set the GOOGLE_API_KEY environment variable.")

# Identifier of the Gemini 2.0 Flash model used for every request.
MODEL_NAME = "gemini-2.0-flash-001"

# Shared Gemini API client (AI Studio), authenticated with the key above.
client = genai.Client(api_key=GOOGLE_API_KEY)

@retry(wait=wait_random_exponential(multiplier=1, max=60), stop=stop_after_attempt(3))
def call_gemini(video_file: str, prompt: str) -> str:
    """Send a video file and a text prompt to Gemini and return the reply text.

    The whole file is read into memory and attached inline with MIME type
    "video/mp4"; the prompt follows as a separate text part. Any exception
    raised inside this function (including a failure to open the file) is
    retried with randomized exponential backoff, stopping after three attempts.

    Args:
        video_file: Path of the video file to analyze.
        prompt: Instruction text sent alongside the video.

    Returns:
        The text of the model response, or an empty string when the response
        carries no text.

    Raises:
        tenacity.RetryError: Once all attempts have failed (tenacity's default
            behavior, since ``reraise`` is not set on the decorator).
    """
    with open(video_file, "rb") as f:
        file_bytes = f.read()

    # Raw bytes must be wrapped as an inline-data part. ``Part.file_data`` is
    # meant for references to already-uploaded files, and ``Part`` itself has
    # no ``mime_type`` field, so ``Part(file_data=bytes, mime_type=...)`` is
    # rejected before any request is sent.
    # NOTE(review): inline payloads are size-limited by the API; large videos
    # probably need the Files API instead - confirm against expected uploads.
    video_part = Part.from_bytes(data=file_bytes, mime_type="video/mp4")

    response = client.models.generate_content(
        model=MODEL_NAME,
        contents=[
            video_part,
            Part(text=prompt),
        ],
    )
    # ``response.text`` can be None when the response has no text parts;
    # normalize so callers always receive a string, as annotated.
    return response.text or ""

def safe_call_gemini(video_file: str, prompt: str) -> str:
    """
    Best-effort variant of call_gemini.

    Forwards both arguments to call_gemini. If that call raises any Exception,
    the error is printed and a fixed placeholder string is returned instead,
    so callers never have to handle a failure themselves.
    """
    try:
        result = call_gemini(video_file, prompt)
    except Exception as err:
        # Report the failure and fall back to the placeholder text.
        print("Gemini call failed:", err)
        result = "No summary available."
    return result

def hhmmss_to_seconds(time_str: str) -> float:
    """
    Convert a ``HH:MM:SS``, ``MM:SS`` or ``SS`` timestamp into seconds.

    Surrounding whitespace is ignored and every field may be fractional
    (e.g. ``"00:00:01.5"``).

    Args:
        time_str: Timestamp made of one to three ``:``-separated numbers.

    Returns:
        The total number of seconds as a float.

    Raises:
        ValueError: If a field is not a number, if there are more than three
            fields, or if any field is negative, NaN or infinite.
    """
    fields = [float(field) for field in time_str.strip().split(":")]

    # More than three fields has no defined meaning here; previously the first
    # field was silently returned as seconds, yielding a bogus seek position.
    if len(fields) > 3:
        raise ValueError(f"Too many ':'-separated fields in timestamp: {time_str!r}")

    # NaN fails every comparison and infinity fails the upper bound, so this
    # single check rejects negative, NaN and infinite fields alike.
    if not all(0 <= field < float("inf") for field in fields):
        raise ValueError(f"Timestamp fields must be finite and non-negative: {time_str!r}")

    if len(fields) == 3:
        return fields[0] * 3600 + fields[1] * 60 + fields[2]
    if len(fields) == 2:
        return fields[0] * 60 + fields[1]
    return fields[0]

def get_key_frames(video_file: str, summary: str, user_query: str) -> list:
    """
    Ask Gemini for key timestamps and extract the matching frames with OpenCV.

    The prompt instructs the model to output one line per event in the format:
    HH:MM:SS - description
    Lines without the " - " separator, or whose timestamp cannot be converted
    to seconds, are ignored. For each remaining entry the video is positioned
    at that timestamp and one frame is read; entries for which no frame can be
    read are skipped.

    Args:
        video_file: Path of the video file to query and to read frames from.
        summary: Previously generated summary, appended to the prompt.
        user_query: Optional focus text; appended to the prompt when non-empty.

    Returns:
        A list of tuples: (image_array, caption), where the image has been
        converted from BGR to RGB and the caption is "<timestamp>: <description>".
        The list is empty when nothing could be parsed or the video cannot be
        opened.
    """
    prompt = (
        "List the key timestamps in the video and a brief description of the event at that time. "
        "Output one line per event in the following format: HH:MM:SS - description. Do not include any extra text."
    )
    prompt += f" Video Summary: {summary}"
    if user_query:
        prompt += f" Focus on: {user_query}"

    # Use the safe call to get a response or fallback text; guard against a
    # missing (None) response so parsing below never crashes.
    key_frames_response = safe_call_gemini(video_file, prompt) or ""

    # Parse and validate every line up front so the video is only opened when
    # there is at least one usable timestamp.
    key_frames = []
    for line in key_frames_response.strip().splitlines():
        timestamp, separator, description = line.partition(" - ")
        if not separator:
            continue
        timestamp = timestamp.strip()
        try:
            seconds = hhmmss_to_seconds(timestamp)
        except (ValueError, TypeError):
            # Not a usable timestamp (e.g. stray prose from the model).
            continue
        key_frames.append((timestamp, seconds, description.strip()))

    extracted_frames = []
    if not key_frames:
        return extracted_frames

    cap = cv2.VideoCapture(video_file)
    try:
        if not cap.isOpened():
            print("Error: Could not open the uploaded video file.")
            return extracted_frames

        for timestamp, seconds, description in key_frames:
            # Seek by time (milliseconds) and grab the frame at that position.
            cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000)
            ret, frame = cap.read()
            if ret:
                # OpenCV delivers BGR; convert to RGB for display.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                extracted_frames.append((frame_rgb, f"{timestamp}: {description}"))
    finally:
        # Always release the capture handle, even if a frame conversion fails.
        cap.release()
    return extracted_frames

def analyze_video(video_file: str, user_query: str) -> (str, list):
    """
    Run the full analysis pipeline for one uploaded video.

    Step 1 asks Gemini for a summary (optionally steered by the user query).
    Step 2 asks for key timestamps and pulls the corresponding frames.

    Returns:
      - A Markdown report containing the summary and one bullet per key frame
        (or a note that no key frames were extracted).
      - The gallery list of key frames, each a tuple of (image, caption).
    """
    focus_suffix = f" Also focus on: {user_query}" if user_query else ""
    summary = safe_call_gemini(video_file, "Summarize this video." + focus_suffix)
    key_frames_gallery = get_key_frames(video_file, summary, user_query)

    # Collect the report pieces and join them once at the end.
    sections = [f"## Video Analysis Report\n\n**Summary:**\n\n{summary}\n"]
    if key_frames_gallery:
        sections.append("\n**Key Frames Extracted:**\n")
        sections.extend(
            f"- **Frame {number}:** {caption}\n"
            for number, (_image, caption) in enumerate(key_frames_gallery, start=1)
        )
    else:
        sections.append("\n*No key frames were extracted.*\n")
    return "".join(sections), key_frames_gallery

def gradio_interface(video_file, user_query: str) -> (str, list):
    """
    Entry point wired into the Gradio UI.

    Takes the uploaded video and an optional query. When a video is present the
    work is delegated to analyze_video, whose Markdown report and key-frame
    gallery are returned unchanged; otherwise a prompt to upload a file and an
    empty gallery are returned.
    """
    if video_file:
        return analyze_video(video_file, user_query)
    return "Please upload a valid video file.", []

# Gradio UI definition: one video upload plus an optional free-text query go
# into gradio_interface; its two return values (Markdown report, list of
# (image, caption) tuples) feed the Markdown and Gallery outputs in that order.
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        # Uploaded video; gradio_interface forwards it to analyze_video, which
        # treats the value as a file path.
        gr.Video(label="Upload Video File"),
        # Optional text used to steer both the summary and key-frame prompts.
        gr.Textbox(label="Analysis Query (optional): guide the focus of the analysis", placeholder="e.g., focus on unusual movements near the entrance")
    ],
    outputs=[
        # Receives the Markdown report string.
        gr.Markdown(label="Security & Surveillance Analysis Report"),
        # Receives the extracted key frames, laid out in two columns.
        gr.Gallery(label="Extracted Key Frames", columns=2)
    ],
    title="AI Video Analysis and Summariser Agent",
    description=(
        "This tool uses Google's Gemini 2.0 Flash model via AI Studio to analyze an uploaded video. "
        "It returns a brief summary and extracts key frames based on that summary. "
        "Provide a video file and, optionally, a query to guide the analysis."
    )
)

# Start the Gradio app only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    iface.launch()