|
import base64
import os
from typing import Optional

from anthropic import Anthropic
from llama_index.core.tools import FunctionTool
from llama_index.readers.whisper import WhisperReader
from openai import OpenAI


class WhisperTranscriber:
    """Class for transcribing audio using OpenAI's Whisper model."""

    def __init__(self, model: str = "whisper-1", api_key: Optional[str] = None):
        """Initialize the WhisperTranscriber with the specified model and API key."""
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.model = model
        self.reader = WhisperReader(model=self.model, api_key=self.api_key)

    def transcribe(self, audio_file_path: str) -> str:
        """
        Transcribe an audio file to text.

        Args:
            audio_file_path: Path to the audio file (.mp3, .wav, etc.)

        Returns:
            Transcribed text from the audio file
        """
        try:
            documents = self.reader.load_data(audio_file_path)
            # The reader may split a long recording into several documents;
            # join their text into a single transcript.
            if documents:
                return " ".join(doc.text for doc in documents if hasattr(doc, "text"))
            return "No transcription was generated from the audio file."
        except Exception as e:
            return f"Error transcribing audio file: {str(e)}"


whisper_transcriber = WhisperTranscriber()

transcribe_audio_tool = FunctionTool.from_defaults(
    name="transcribe_audio",
    description=(
        "Transcribes speech from an audio file to text using OpenAI's Whisper "
        "model. Provide the full path to the audio file."
    ),
    fn=whisper_transcriber.transcribe,
)
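# Example usage, as a sketch ("interview.mp3" is a hypothetical file;
# OPENAI_API_KEY must be set in the environment):
#
#     text = whisper_transcriber.transcribe("interview.mp3")
#     print(text)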
|
|
|
|
|
def encode_image_to_base64(file_path: str) -> str:
    """
    Read an image file and encode it to a base64 string.

    This function focuses exclusively on generating a base64 encoded string
    from an image file.

    Args:
        file_path (str): Path to the image file to be encoded

    Returns:
        str: The base64 encoded string of the image

    Raises:
        FileNotFoundError: If the specified file doesn't exist
        ValueError: If the file has an unsupported extension

    Examples:
        >>> base64_data = encode_image_to_base64("data/photo.jpg")
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found at {file_path}")

    file_ext = os.path.splitext(file_path)[1].lower()
    supported_formats = [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"]
    if file_ext not in supported_formats:
        raise ValueError(
            f"Unsupported file extension: {file_ext}. "
            f"Supported extensions are: {', '.join(supported_formats)}"
        )

    with open(file_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


encode_image_tool = FunctionTool.from_defaults(
    name="encode_image_to_base64",
    description=(
        "Reads an image file and converts it to a base64 encoded string. "
        "Use this tool to prepare images for vision analysis."
    ),
    fn=encode_image_to_base64,
)
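# Example usage, as a sketch ("data/photo.jpg" is a hypothetical path):
#
#     b64 = encode_image_to_base64("data/photo.jpg")
#     # JPEG bytes base64-encode to a string starting with "/9j/", and the
#     # PNG signature to "iVBORw0KGgo"; VisionAnalyzerAgent below uses these
#     # prefixes to guess the media type.
#     print(b64[:12])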
|
|
|
class VisionAnalyzerAgent:
    """
    A specialized agent for analyzing images using vision models.

    This agent can process images, analyze their content, and provide detailed
    descriptions or answer questions about the visual elements.
    """

    def __init__(
        self,
        model_provider: str = "openai",
        model_name: str = "gpt-4o",
        api_key: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize a VisionAnalyzerAgent.

        Args:
            model_provider: The LLM provider to use ("anthropic" or "openai")
            model_name: The specific model name to use
            api_key: API key for the provider (defaults to environment variable)
            **kwargs: Additional parameters for the model
        """
        self.model_provider = model_provider.lower()
        self.model_name = model_name
        self.api_key = api_key

        if self.model_provider == "anthropic":
            self.client = Anthropic(api_key=api_key or os.getenv("ANTHROPIC_API_KEY"))
        elif self.model_provider == "openai":
            self.client = OpenAI(api_key=api_key or os.getenv("OPENAI_API_KEY"))
        else:
            raise ValueError(
                f"Unsupported model provider: {model_provider}. "
                f"Supported providers are: anthropic, openai"
            )

    def analyze_image(self, image_base64: str, query: str = "Describe this image in detail.") -> str:
        """
        Analyze an image using the vision model.

        Args:
            image_base64: Base64 encoded image data
            query: The question or instruction for image analysis

        Returns:
            str: The analysis result from the vision model
        """
        # Guess the media type from the base64 payload itself: JPEG bytes
        # (FF D8 FF) encode to "/9j/", and the PNG signature encodes to
        # "iVBORw0KGgo". Default to JPEG when neither prefix matches.
        mime_type = "image/jpeg"
        if image_base64.startswith("iVBORw0KGgo"):
            mime_type = "image/png"

        if self.model_provider == "anthropic":
            try:
                response = self.client.messages.create(
                    model=self.model_name,
                    max_tokens=1024,
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": query},
                                {
                                    "type": "image",
                                    "source": {
                                        "type": "base64",
                                        "media_type": mime_type,
                                        "data": image_base64,
                                    },
                                },
                            ],
                        }
                    ],
                )
                return response.content[0].text
            except Exception as e:
                return f"Error analyzing image with Anthropic: {str(e)}"

        elif self.model_provider == "openai":
            try:
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    # gpt-4o rejects requests whose max_tokens exceeds its
                    # output-token limit, so keep this conservative.
                    max_tokens=4096,
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": query},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:{mime_type};base64,{image_base64}"
                                    },
                                },
                            ],
                        }
                    ],
                )
                return response.choices[0].message.content
            except Exception as e:
                return f"Error analyzing image with OpenAI: {str(e)}"

        else:
            return "Unsupported model provider"
|
|
|
|
|
def analyze_image_with_vision(image_path: str, query: str = "Describe this image in detail.") -> str:
    """
    Analyze an image using a vision-enabled model.

    Args:
        image_path: Path to the image file
        query: The question or instruction for image analysis

    Returns:
        str: The analysis result from the vision model
    """
    try:
        base64_image = encode_image_to_base64(image_path)
        vision_agent = VisionAnalyzerAgent()
        return vision_agent.analyze_image(base64_image, query)
    except Exception as e:
        return f"Error analyzing image: {str(e)}"


vision_analyzer_tool = FunctionTool.from_defaults(
    name="analyze_image_with_vision",
    description=(
        "Analyzes images using a vision-enabled model. Provide the image path "
        "and an optional query/instruction."
    ),
    fn=analyze_image_with_vision,
)
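# Minimal end-to-end sketch: the sample paths are hypothetical, and
# OPENAI_API_KEY must be set (plus ANTHROPIC_API_KEY if you switch the
# VisionAnalyzerAgent provider).
if __name__ == "__main__":
    print(whisper_transcriber.transcribe("samples/interview.mp3"))
    print(analyze_image_with_vision("samples/photo.jpg", "Summarize this image."))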