File size: 10,792 Bytes
c79086f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
import tempfile
import requests
import os

from time import sleep
from urllib.parse import urlparse
from typing import Optional, List
import yt_dlp
from google.genai import types

from PIL import Image
from smolagents import CodeAgent, tool, OpenAIServerModel, LiteLLMModel
from google import genai
from dotenv import load_dotenv
from model_provider import create_react_model, create_vision_model
import imageio

load_dotenv(override=True)

@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """
    Ask a vision (image-to-text) model a question about one or more images.
    Always use this tool to ask questions about a set of images you have been provided.
    The same question is asked of the whole list, so if you have multiple images
    that you want to ask the same question of, pass the entire list in one call.
    Ensure your question is specific enough to retrieve the exact information you are looking for.

    Args:
        question: The question to ask about the images.  Type: str
        images: The list of images to ask the question about.  Type: List[PIL.Image.Image]
    """
    vision_model = create_vision_model()

    print(f"Asking model a question about {len(images)} images")

    # A single user message: the question as a text part, followed by one image
    # part per picture. Each PIL image is passed through as-is, with no wrapping.
    text_part = {"type": "text", "text": question}
    image_parts = [{"type": "image", "image": picture} for picture in images]
    conversation = [{"role": "user", "content": [text_part, *image_parts]}]

    output = vision_model(conversation).content
    print(f'Model returned: {output}')
    return output

@tool
def youtube_frames_to_images(url: str, sample_interval_frames: int = 24) -> List[Image.Image]:
    """
    Reviews a YouTube video and returns a List of PIL Images (List[PIL.Image.Image]), which can then be reviewed by a vision model.
    Only use this tool if you have been given a YouTube video that you need to analyze.
    This will generate a list of images, and you can use the use_vision_model tool to analyze those images.

    Args:
        url: The Youtube URL
        sample_interval_frames: The sampling interval, counted in frames (not seconds): frame 0 and every Nth frame after it are kept (default is 24 frames). Must be at least 1.

    Returns:
        The sampled frames as PIL images, in the order the video reader yields them.

    Raises:
        ValueError: If sample_interval_frames is less than 1.
        RuntimeError: If no .mp4 file is found after the download.
    """
    # Reject a bad interval before downloading anything; an interval of 0 would
    # otherwise only fail (ZeroDivisionError) after the whole video was fetched.
    if sample_interval_frames < 1:
        raise ValueError("sample_interval_frames must be at least 1")

    with tempfile.TemporaryDirectory() as tmpdir:
        # Download the video into the temporary directory, merged to mp4.
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,  # Avoid IPv6 issues
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        # The output template leaves the extension open, so look the file up.
        video_path = next(
            (
                os.path.join(tmpdir, name)
                for name in os.listdir(tmpdir)
                if name.endswith('.mp4')
            ),
            None,
        )
        if not video_path:
            raise RuntimeError("Failed to download video as mp4")

        images: List[Image.Image] = []
        reader = imageio.get_reader(video_path)
        try:
            for idx, frame in enumerate(reader):
                print(f"Processing frame {idx}")
                if idx % sample_interval_frames == 0:
                    images.append(Image.fromarray(frame))
        finally:
            # Always release the reader, even if decoding a frame fails.
            reader.close()
        return images

@tool
def review_youtube_video(url: str, question: str) -> str:
    """
    Reviews a YouTube video and answers a specific question about that video.
    The video URL and the question are sent to a Gemini model; any failure is
    returned as an error message instead of being raised.

    Args:
        url (str): the URL to the YouTube video.  Should be like this format: https://www.youtube.com/watch?v=9hE5-98ZeCg
        question (str): The question you are asking about the video

    Returns:
        str: The model's text answer, or a message starting with "Error asking" if the request failed.
    """
    # Bound before the try block so the except handler can always name the
    # model, even when constructing the client is the step that failed.
    model = 'gemini-2.0-flash-lite'
    try:
        client = genai.Client(api_key=os.getenv('GEMINI_KEY'))
        response = client.models.generate_content(
            model=model,
            contents=types.Content(
                parts=[
                    types.Part(
                        file_data=types.FileData(file_uri=url)
                    ),
                    types.Part(text=question)
                ]
            )
        )
        return response.text
    except Exception as e:
        return f"Error asking {model} about video: {str(e)}"


@tool
def read_file(filepath: str) -> str:
    """
    Used to read the content of a file.  Returns the content as a string.
    Will only work for text-based files, such as .txt files or code files.
    Do not use for audio or visual files.
    The file is decoded as UTF-8.

    Args:
        filepath (str): The path to the file to be read.

    Returns:
        str: Content of the file as a string.  If the file is missing, cannot be
            opened, or is not valid UTF-8 text, an error message describing the
            problem is returned instead.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            content = file.read()
    except FileNotFoundError:
        message = f"File not found: {filepath}"
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError, so binary or non-UTF-8 files
        # need their own entry here to be reported rather than raised.
        message = f"Error reading file: {str(e)}"
    else:
        print(content)
        return content

    # Return the message as well as printing it, so the caller always receives
    # a string instead of None.
    print(message)
    return message


@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using pytesseract (if available).

    Args:
        image_path: Path to the image file

    Returns:
        Extracted text or error message
    """
    try:
        # Imported here so a missing OCR dependency becomes an error message
        # instead of breaking the whole module at import time.
        import pytesseract
        from PIL import Image

        # Open the image in a context manager so the underlying file handle
        # is released even if OCR fails.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text}"
    except ImportError:
        return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"

@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file with pandas and return a summary of its contents.
    To use this tool the file must already be saved somewhere; pass that location to the function.
    The download_file_from_url tool will save it by name to tempfile.gettempdir()

    Args:
        file_path: Path to the CSV file
        query: Question about the data (the summary returned does not vary with it)

    Returns:
        Row/column counts, column names and summary statistics, or an error message
    """
    try:
        import pandas as pd

        frame = pd.read_csv(file_path)
        row_count, column_count = frame.shape

        # Assemble the report in one pass: size line, column listing, then the
        # output of DataFrame.describe().
        report_parts = [
            f"CSV file loaded with {row_count} rows and {column_count} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(report_parts)
    except ImportError:
        return "Error: pandas is not installed. Please install it with 'pip install pandas'."
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"

@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Load an Excel file with pandas and return a summary of its contents.
    To use this tool the file must already be saved somewhere; pass that location to the function.
    The download_file_from_url tool will save it by name to tempfile.gettempdir()

    Args:
        file_path: Path to the Excel file
        query: Question about the data (the summary returned does not vary with it)

    Returns:
        Row/column counts, column names and summary statistics, or an error message
    """
    try:
        import pandas as pd

        # Read the Excel file
        df = pd.read_excel(file_path)

        # Spreadsheet headers are often numbers or dates rather than text, and
        # str.join rejects non-string items, so convert every label first.
        column_names = ', '.join(str(label) for label in df.columns)

        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"

        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())

        return result
    except ImportError:
        return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"

import whisper

@tool
def youtube_transcribe(url: str) -> str:
    """
    Transcribes a YouTube video.  Use when you need to process the audio from a YouTube video into Text.
    The audio is downloaded to a temporary directory, extracted to WAV and
    transcribed with the Whisper "base" model.

    Args:
        url: Url of the YouTube video
    """
    # The model is loaded first, before any download work starts.
    whisper_model = whisper.load_model("base")

    with tempfile.TemporaryDirectory() as workdir:
        # Post-processing step that extracts the downloaded audio as WAV.
        wav_extraction = {
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }
        download_options = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(workdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [wav_extraction],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(download_options) as downloader:
            downloader.extract_info(url, download=True)

        # Take the first .wav file present in the working directory.
        wav_path = None
        for entry in os.listdir(workdir):
            if entry.endswith('.wav'):
                wav_path = os.path.join(workdir, entry)
                break
        if wav_path is None:
            raise RuntimeError("Failed to find audio")

        transcription = whisper_model.transcribe(wav_path)
        return transcription['text']

@tool
def transcribe_audio(audio_file_path: str) -> str:
    """
    Transcribes an audio file with the Whisper "small" model.  Use when you need to process audio data.
    DO NOT use this tool for YouTube video; use the youtube_transcribe tool to process audio data from YouTube.
    Use this tool when you have an audio file in .mp3, .wav, .aac, .ogg, .flac, .m4a, .alac or .wma

    Args:
        audio_file_path: Filepath to the audio file (str)
    """
    # The model is loaded on every call; the 'text' entry of the result is returned.
    speech_model = whisper.load_model("small")
    transcription = speech_model.transcribe(audio_file_path)
    return transcription['text']

# global driver
# driver = initialize_driver()