import gradio as gr
from openai import OpenAI
import requests
import json
from typing import List, Dict, Optional, Tuple
import random
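

# GifChatBot pairs an OpenAI chat model with the GIPHY search API: the model is
# prompted to drop [GIF: <search term>] tags into its replies, and each tag is
# swapped for a real GIF before the message reaches the Gradio UI. Candidate GIFs
# are kept between 500 KB and 5 MB (presumably to skip thumbnails and oversized files).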
class GifChatBot:
    def __init__(self):
        self.openai_client = None
        self.giphy_key = None
        self.chat_history = []
        self.is_initialized = False
        self.session = requests.Session()

        self.MINIMUM_FILE_SIZE = 500 * 1024
        self.MAXIMUM_FILE_SIZE = 5 * 1024 * 1024
        self.MAX_RETRIES = 10
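
    # A HEAD request is enough to read Content-Length, so candidate GIFs can be
    # checked against the size limits without downloading them.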
    def verify_gif_size(self, gif_url: str) -> bool:
        """Verify if GIF meets size requirements"""
        try:
            response = self.session.head(gif_url, timeout=3)
            if response.status_code != 200:
                return False

            content_length = response.headers.get('content-length')
            if not content_length:
                return False

            file_size = int(content_length)
            return self.MINIMUM_FILE_SIZE <= file_size <= self.MAXIMUM_FILE_SIZE

        except Exception as error:
            print(f"Size verification error: {error}")
            return False
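
    # Each retry pages 10 results deeper into the search (offset = attempt * 10)
    # and shuffles that page so repeated queries don't always pick the same GIF;
    # if nothing passes the size check, fall back to the trending feed.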
    def get_gif(self, search_query: str) -> Optional[str]:
        """Get a GIF meeting size requirements with retries"""
        for attempt in range(self.MAX_RETRIES):
            try:
                offset = attempt * 10
                params = {
                    'api_key': self.giphy_key,
                    'q': search_query,
                    'limit': 10,
                    'offset': offset,
                    'rating': 'pg-13'
                }

                response = self.session.get(
                    "https://api.giphy.com/v1/gifs/search",
                    params=params,
                    timeout=5
                )

                if response.status_code == 200:
                    data = response.json()
                    if data["data"]:
                        gifs = list(data["data"])
                        random.shuffle(gifs)

                        for gif in gifs:
                            gif_url = gif["images"]["original"]["url"]
                            if self.verify_gif_size(gif_url):
                                print(f"Found valid GIF on attempt {attempt + 1}")
                                return gif_url

                        print(f"No valid GIFs found in attempt {attempt + 1}, retrying...")
                    else:
                        print("No GIFs found in search")
                        break

            except Exception as error:
                print(f"Error in attempt {attempt + 1}: {error}")
                continue

        return self._get_trending_gif()
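
    # Fallback used when the search above finds nothing suitable.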
    def _get_trending_gif(self) -> Optional[str]:
        """Get a trending GIF meeting size requirements"""
        try:
            params = {
                'api_key': self.giphy_key,
                'limit': 25,
                'rating': 'pg-13'
            }

            response = self.session.get(
                "https://api.giphy.com/v1/gifs/trending",
                params=params,
                timeout=5
            )

            if response.status_code == 200:
                data = response.json()
                if data["data"]:
                    gifs = list(data["data"])
                    random.shuffle(gifs)

                    for gif in gifs:
                        gif_url = gif["images"]["original"]["url"]
                        if self.verify_gif_size(gif_url):
                            print("Found valid trending GIF")
                            return gif_url

        except Exception as error:
            print(f"Trending GIF error: {error}")
        return None
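
    # Keys are entered through the UI at runtime rather than read from the
    # environment; each one is validated with a minimal request before chatting.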
    def setup_keys(self, openai_key: str, giphy_key: str) -> str:
        """Initialize API clients with user's keys"""
        try:
            self.openai_client = OpenAI(api_key=openai_key)
            self.giphy_key = giphy_key
            self._test_giphy_key()
            self._test_openai_key()
            self.is_initialized = True
            return "✅ Setup successful! Let's chat!"
        except Exception as error:
            self.is_initialized = False
            return f"❌ Error setting up: {str(error)}"

    def _test_giphy_key(self):
        """Test if GIPHY key is valid"""
        response = self.session.get(
            "https://api.giphy.com/v1/gifs/trending",
            params={"api_key": self.giphy_key, "limit": 1}
        )
        if response.status_code != 200:
            raise Exception("Invalid GIPHY API key")

    def _test_openai_key(self):
        """Test if OpenAI key is valid"""
        try:
            self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": "test"}],
                max_tokens=5
            )
        except Exception:
            raise Exception("Invalid OpenAI API key")

    def reset_chat(self) -> Tuple[List[Dict[str, str]], str]:
        """Reset the chat history"""
        self.chat_history = []
        return [], ""

    def format_message(self, role: str, content: str) -> Dict[str, str]:
        """Format message in the new Gradio chat format"""
        return {"role": role, "content": content}
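
    # The system prompt asks the model to emit [GIF: <term>] tags; chat() parses
    # those tags out of the reply and replaces each one with a fetched GIF.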
    def chat(self, message: str, history: List[Dict[str, str]]) -> Tuple[str, List[Dict[str, str]], str]:
        """Main chat function with natural GIF integration"""
        if not self.is_initialized:
            return message, history, "Please set up your API keys first!"

        if not message.strip():
            return message, history, ""

        try:
            system_message = """You are a supportive, empathetic friend who uses GIFs naturally in conversation.
When using GIFs, keep search terms simple and contextual:

Examples:
- User feeling hungry -> [GIF: hungry]
- User feeling sad -> [GIF: comforting hug]
- User celebrating -> [GIF: celebration]
- User confused -> [GIF: confused]

Keep your responses:
1. Empathetic and natural
2. Context-aware (reference previous messages)
3. Use GIFs that match the emotion

Use 0-1 GIFs per message unless the moment really calls for more."""

            messages = [{"role": "system", "content": system_message}]
            for chat in history:
                messages.append({"role": chat["role"], "content": chat["content"]})
            messages.append({"role": "user", "content": message})

            response = self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                temperature=0.9,
                max_tokens=150
            )

            ai_message = response.choices[0].message.content
            final_response = ""
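
            # Split the reply on "[GIF:"; the text before the first tag is plain,
            # and each later part starts with a search term terminated by "]".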
            parts = ai_message.split("[GIF:")
            final_response += parts[0]

            for part in parts[1:]:
                gif_desc_end = part.find("]")
                if gif_desc_end != -1:
                    gif_desc = part[:gif_desc_end].strip()
                    print(f"Searching for GIF: {gif_desc}")
                    gif_url = self.get_gif(gif_desc)
                    if gif_url:
                        # Embed the GIF as a Markdown image so the Chatbot renders it inline
                        final_response += f"\n![GIF]({gif_url})\n"
                        print(f"Added GIF: {gif_url}")
                    final_response += part[gif_desc_end + 1:]

            history.append(self.format_message("user", message))
            history.append(self.format_message("assistant", final_response))
            return "", history, ""

        except Exception as error:
            error_message = f"Oops! Something went wrong: {str(error)}"
            return message, history, error_message


def create_interface():
    """Create the Gradio interface"""
    bot = GifChatBot()

    with gr.Blocks(theme=gr.themes.Soft()) as interface:
        gr.Markdown("""
        # 🎬 Friendly Chat Bot with GIFs
        Chat with an empathetic AI friend who expresses themselves through GIFs!
        Enter your API keys below to start.
        """)

        with gr.Row():
            with gr.Column(scale=1):
                openai_key = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                    scale=2
                )
            with gr.Column(scale=1):
                giphy_key = gr.Textbox(
                    label="GIPHY API Key",
                    placeholder="Enter your GIPHY API key",
                    type="password",
                    scale=2
                )

        setup_button = gr.Button("Set up Keys", variant="primary")
        setup_status = gr.Textbox(label="Setup Status")

        chatbot = gr.Chatbot(
            label="Chat",
            bubble_full_width=False,
            height=450,
            type="messages"
        )

        with gr.Row():
            with gr.Column(scale=4):
                message_box = gr.Textbox(
                    label="Type your message",
                    placeholder="Say something...",
                    show_label=False,
                    container=False
                )
            with gr.Column(scale=1):
                clear_button = gr.Button("Clear Chat", variant="secondary")

        error_box = gr.Textbox(label="Error Messages", visible=True)
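
        # Wire the controls to the bot: key setup, message submit, and chat reset.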
        setup_button.click(
            bot.setup_keys,
            inputs=[openai_key, giphy_key],
            outputs=setup_status
        )

        message_box.submit(
            bot.chat,
            inputs=[message_box, chatbot],
            outputs=[message_box, chatbot, error_box]
        )

        clear_button.click(
            bot.reset_chat,
            outputs=[chatbot, error_box]
        )

        gr.Markdown("""
        ### Tips:
        - 💭 Share how you're feeling - the AI responds empathetically
        - 💬 The conversation is context-aware
        - 🎯 GIFs are chosen to match the emotion
        - 🔄 Use 'Clear Chat' to start fresh
        """)

    return interface
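

# Running this file starts a local Gradio server (http://127.0.0.1:7860 by default);
# pass share=True to demo.launch() if a temporary public link is needed.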
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()