import json
import random
import time
from datetime import datetime
from typing import List, Dict, Optional, Tuple

import gradio as gr
import requests
from openai import OpenAI
|
|
|
class GifChatBot:
    """Chat bot that replies through OpenAI and illustrates replies with GIPHY GIFs.

    The system prompt asks the model to emit ``[GIF: <search terms>]`` markers.
    ``chat`` replaces every marker with a Markdown image that points at a GIF
    which passed the metadata and availability checks implemented below.  API
    keys are supplied at runtime through ``setup_keys``.
    """

    # Single model used for both the key probe and the conversation, so a key
    # is validated against the model that ``chat`` actually calls.
    CHAT_MODEL = "gpt-4o-mini"

    # Opening of the marker the system prompt asks the model to produce.
    GIF_MARKER = "[GIF:"

    SYSTEM_PROMPT = (
        "You are a supportive, empathetic friend who uses GIFs naturally and "
        "proactively in conversation regardless of whether the user request "
        "it or not.\n"
        "When using GIFs, keep search terms simple and contextual:\n"
        "\n"
        "Examples:\n"
        "- User feeling hungry -> [GIF: hungry]\n"
        "- User feeling sad -> [GIF: comforting hug]\n"
        "- User celebrating -> [GIF: celebration]\n"
        "- User confused -> [GIF: confused]\n"
        "\n"
        "Keep your responses:\n"
        "1. Empathetic and natural\n"
        "2. Context-aware (reference previous messages when relevant)\n"
        "3. Use GIFs that match the emotional context\n"
        "\n"
        "Use 0-1 GIFs per message unless the moment really calls for more.\n"
        "Keep search terms simple and universal for better GIF matching."
    )

    def __init__(self):
        """Create an unconfigured bot; call ``setup_keys`` before chatting."""
        self.openai_client = None
        self.giphy_key = None
        # Cleared by ``reset_chat``; the conversation itself travels through
        # the ``history`` argument of ``chat``.
        self.chat_history = []
        self.is_initialized = False

        # One HTTP session for every GIPHY call, sent with a browser-like
        # User-Agent.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # Acceptance limits applied to the "original" rendition's metadata.
        # ``size`` is compared against the value GIPHY reports (presumably
        # bytes - confirm against the GIPHY API docs).
        self.minimum_gif_size = 10000
        self.maximum_gif_size = 2000000
        self.minimum_dimension = 200
        self.maximum_dimension = 1000
        self.maximum_frames = 50

    def verify_gif_url(self, gif_url: str) -> bool:
        """Return True when ``gif_url`` looks usable.

        URLs that contain ``media.giphy.com`` and a known GIPHY path fragment
        are accepted without a network call.  Anything else must answer a HEAD
        request with status 200 and a GIF/image content type.  If the check
        itself raises, the URL is accepted only when it contains
        ``media.giphy.com``.

        Args:
            gif_url: Candidate URL; empty or non-string values are rejected.
        """
        # Reject missing URLs up front: the substring tests below would raise
        # TypeError on None, including inside the exception handler.
        if not gif_url or not isinstance(gif_url, str):
            return False

        on_giphy_media_host = 'media.giphy.com' in gif_url
        try:
            if on_giphy_media_host and (
                '/media/giphy.com/media/' in gif_url or 'giphy.gif' in gif_url
            ):
                return True

            head_response = self.session.head(gif_url, timeout=3)
            if head_response.status_code != 200:
                print(f"HEAD request failed with status {head_response.status_code}")
                return False

            content_type = head_response.headers.get('content-type', '').lower()
            if 'gif' not in content_type and 'image' not in content_type:
                print("Invalid content type:", content_type)
                return False

            return True

        except Exception as error:
            # Best effort: a failed probe should not discard a GIPHY-hosted URL.
            print(f"GIF validation error: {error}")
            return on_giphy_media_host

    @staticmethod
    def _trended_recently(trending_datetime, max_age_days: int = 90) -> bool:
        """Return True if ``trending_datetime`` parses to less than ``max_age_days`` ago."""
        if not trending_datetime or trending_datetime == "0000-00-00 00:00:00":
            return False
        try:
            trending_date = datetime.strptime(trending_datetime, "%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            # Unparseable timestamps are treated as "not recently trending".
            return False
        return (datetime.now() - trending_date).days < max_age_days

    def _is_good_quality_gif(self, gif: Dict) -> bool:
        """Check a GIPHY result's metadata against the configured limits.

        A GIF is rejected when its ``images.original`` entry is missing, when
        width/height, frame count or size fall outside the limits set in
        ``__init__`` (or cannot be parsed as integers), or when its ID is
        shorter than 10 characters (treated as an old upload).  Trending state
        is only logged; it does not change the verdict.

        Args:
            gif: One element of the ``data`` list returned by GIPHY.

        Returns:
            True if the GIF passes every check, False otherwise (including when
            ``gif`` is malformed).
        """
        try:
            images = gif.get("images")
            original = images.get("original") if images else None
            if not original:
                print("Missing image data")
                return False

            try:
                width = int(original.get("width", 0))
                height = int(original.get("height", 0))
            except (ValueError, TypeError):
                print("Invalid dimensions")
                return False
            if width < self.minimum_dimension or height < self.minimum_dimension:
                print("GIF too small:", width, height)
                return False
            if width > self.maximum_dimension or height > self.maximum_dimension:
                print("GIF too large:", width, height)
                return False

            try:
                # A missing frame count defaults to the limit, i.e. passes.
                frames = int(original.get("frames", self.maximum_frames))
            except (ValueError, TypeError):
                print("Invalid frame count")
                return False
            if frames > self.maximum_frames:
                print("Too many frames:", frames)
                return False

            try:
                size = int(original.get("size", 0))
            except (ValueError, TypeError):
                print("Invalid size data")
                return False
            if size < self.minimum_gif_size or size > self.maximum_gif_size:
                print("Invalid size:", size)
                return False

            gif_id = gif.get("id", "")
            if len(gif_id) < 10:
                print("Old GIF (short ID)")
                return False

            # Informational only: both outcomes accept the GIF.
            if gif.get("is_trending"):
                print("Currently trending")
            elif self._trended_recently(gif.get("trending_datetime")):
                print("Recently trending")

            return True

        except Exception as error:
            # Malformed entries (wrong types, missing containers) are rejected
            # rather than aborting the caller's search loop.
            print(f"Quality check error: {error}")
            return False

    def get_gif(self, search_query: str) -> Optional[str]:
        """Search GIPHY and return the URL of the first acceptable GIF.

        Results are tried in order of descending ID length.  If the search
        returns results but none passes the checks, a trending GIF is tried
        instead.

        Args:
            search_query: Search terms; blank queries return None without a
                network call.

        Returns:
            A GIF URL, or None when nothing suitable was found or the request
            failed.
        """
        if not search_query or not search_query.strip():
            print("Empty GIF search query")
            return None

        try:
            print(f"Searching for GIF: {search_query}")

            parameters = {
                'api_key': self.giphy_key,
                'q': search_query,
                'limit': 25,
                'rating': 'pg-13',
                'bundle': 'messaging_non_clips',
                'sort': 'relevant',
            }
            response = self.session.get(
                "https://api.giphy.com/v1/gifs/search",
                params=parameters,
                timeout=5,
            )

            if response.status_code != 200:
                print(f"Search failed with status {response.status_code}")
                return None

            results = response.json().get("data")
            if not results:
                print("No GIFs found in search")
                return None

            # Longer IDs first; short IDs are rejected by the quality check
            # anyway.
            gifs = sorted(
                results,
                key=lambda item: len(item.get("id", "")),
                reverse=True,
            )
            print(f"Found {len(gifs)} GIFs, testing quality...")

            for gif in gifs:
                if not self._is_good_quality_gif(gif):
                    continue

                # .get: a result without a URL is skipped instead of raising
                # and aborting the whole search.
                gif_url = gif["images"]["original"].get("url")
                print(f"Testing GIF URL: {gif_url}")

                if self.verify_gif_url(gif_url):
                    print("Found good GIF!")
                    return gif_url
                print("GIF validation failed")

            print("No good GIFs found, trying trending...")
            return self._get_trending_gif()

        except Exception as error:
            print(f"GIPHY search error: {error}")
            return None

    def _get_trending_gif(self) -> Optional[str]:
        """Return the URL of a randomly chosen acceptable trending GIF, or None."""
        try:
            print("Fetching trending GIF...")
            response = self.session.get(
                "https://api.giphy.com/v1/gifs/trending",
                params={
                    'api_key': self.giphy_key,
                    'limit': 25,
                    'rating': 'pg-13',
                    'bundle': 'messaging_non_clips',
                },
                timeout=5,
            )

            if response.status_code != 200:
                print(f"Trending request failed with status {response.status_code}")
                return None

            results = response.json().get("data")
            if not results:
                print("No good trending GIFs found")
                return None

            # Shuffle so repeated fallbacks do not always pick the same GIF.
            gifs = list(results)
            random.shuffle(gifs)

            print(f"Testing {len(gifs)} trending GIFs...")
            for gif in gifs:
                if not self._is_good_quality_gif(gif):
                    continue

                gif_url = gif["images"]["original"].get("url")
                print(f"Testing trending GIF: {gif_url}")

                if self.verify_gif_url(gif_url):
                    print("Found good trending GIF!")
                    return gif_url
                print("Trending GIF validation failed")

            print("No good trending GIFs found")
            return None

        except Exception as error:
            print(f"Error getting trending GIF: {error}")
            return None

    def setup_keys(self, openai_key: str, giphy_key: str) -> str:
        """Store both API keys, probe them, and report the outcome.

        Args:
            openai_key: OpenAI API key.
            giphy_key: GIPHY API key.

        Returns:
            A status string for display; ``is_initialized`` is True only when
            both probes succeeded.
        """
        try:
            # Strip whitespace that commonly sneaks in when keys are pasted.
            if isinstance(openai_key, str):
                openai_key = openai_key.strip()
            if isinstance(giphy_key, str):
                giphy_key = giphy_key.strip()

            self.openai_client = OpenAI(api_key=openai_key)
            self.giphy_key = giphy_key
            self._test_giphy_key()
            self._test_openai_key()
            self.is_initialized = True
            return "✅ Setup successful! Let's chat!"
        except Exception as error:
            self.is_initialized = False
            return f"❌ Error setting up: {str(error)}"

    def _test_giphy_key(self):
        """Raise ``Exception`` if GIPHY does not answer a trending request with 200."""
        response = self.session.get(
            "https://api.giphy.com/v1/gifs/trending",
            params={"api_key": self.giphy_key, "limit": 1},
            # Without a timeout a stalled connection would hang setup forever.
            timeout=5,
        )
        if response.status_code != 200:
            raise Exception("Invalid GIPHY API key")

    def _test_openai_key(self):
        """Raise ``Exception`` if a minimal chat completion request fails."""
        try:
            self.openai_client.chat.completions.create(
                model=self.CHAT_MODEL,
                messages=[{"role": "user", "content": "test"}],
                max_tokens=5,
            )
        except Exception as error:
            # Keep the user-facing message, but chain the cause for debugging.
            raise Exception("Invalid OpenAI API key") from error

    def reset_chat(self) -> Tuple[List[Dict[str, str]], str]:
        """Clear stored history and return empty values for the chat and error widgets."""
        self.chat_history = []
        return [], ""

    def format_message(self, role: str, content: str) -> Dict[str, str]:
        """Build a ``{"role", "content"}`` dict as used by the messages-style chat history."""
        return {"role": role, "content": content}

    def _embed_gifs(self, ai_message: str) -> str:
        """Replace each ``[GIF: terms]`` marker with a Markdown image.

        Markers for which no GIF is found are removed.  A marker without a
        closing ``]`` (for example a reply cut off by the token limit) is
        dropped together with its dangling search terms.
        """
        leading_text, *tagged_chunks = ai_message.split(self.GIF_MARKER)
        pieces = [leading_text]

        for chunk in tagged_chunks:
            query, closing_bracket, trailing_text = chunk.partition("]")
            if not closing_bracket:
                continue

            query = query.strip()
            print(f"Looking for GIF: {query}")
            gif_url = self.get_gif(query)
            if gif_url:
                # Markdown image syntax so the chat widget renders the GIF.
                pieces.append(f"\n![GIF]({gif_url})\n")
                print(f"Added GIF: {gif_url}")
            else:
                print("No suitable GIF found")
            pieces.append(trailing_text)

        return "".join(pieces)

    def chat(self, message: str, history: List[Dict[str, str]]) -> Tuple[str, List[Dict[str, str]], str]:
        """Answer ``message`` and append the exchange to ``history``.

        Args:
            message: The user's text.
            history: Prior messages as ``{"role", "content"}`` dicts; it is
                extended in place on success.

        Returns:
            ``(textbox_value, history, error_text)``.  On success the textbox
            value and error text are empty; on failure the original message is
            returned so the user does not lose their input.
        """
        if history is None:
            history = []

        if not self.is_initialized:
            return message, history, "Please set up your API keys first!"

        if not message or not message.strip():
            return message, history, ""

        try:
            messages = [{"role": "system", "content": self.SYSTEM_PROMPT}]
            messages.extend(
                {"role": entry["role"], "content": entry["content"]}
                for entry in history
            )
            messages.append({"role": "user", "content": message})

            response = self.openai_client.chat.completions.create(
                model=self.CHAT_MODEL,
                messages=messages,
                temperature=0.9,
                max_tokens=150,
            )

            # ``content`` can be None; treat that as an empty reply.
            ai_message = response.choices[0].message.content or ""
            final_response = self._embed_gifs(ai_message)

            history.append(self.format_message("user", message))
            history.append(self.format_message("assistant", final_response))
            return "", history, ""

        except Exception as error:
            error_message = f"Oops! Something went wrong: {str(error)}"
            print(f"Chat error: {error}")
            return message, history, error_message
|
|
|
def create_interface():
    """Build the Gradio Blocks UI and wire it to a ``GifChatBot``.

    Layout: intro text, two password fields for the API keys with a setup
    button and status box, the chat window, a message box with a clear button,
    an error box, and a short tips section.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    # NOTE(review): this single bot instance (and the API keys stored on it)
    # is shared by every browser session connected to the app. Confirm that is
    # acceptable before deploying publicly.
    bot = GifChatBot()

    intro_markdown = (
        "# Friendly Chat Bot with GIFs\n"
        "Chat with an empathetic AI friend who expresses themselves through GIFs!\n"
        "Enter your API keys below to start.\n"
    )
    tips_markdown = (
        "### Tips:\n"
        "- Share how you're feeling - the AI responds empathetically\n"
        "- The conversation is context-aware\n"
        "- GIFs are chosen to match the emotion\n"
        "- Use 'Clear Chat' to start fresh\n"
        "\n"
        "Note: All GIFs are validated for quality and availability before being used!\n"
    )

    with gr.Blocks(theme=gr.themes.Soft()) as interface:
        gr.Markdown(intro_markdown)

        # API key entry, side by side.
        with gr.Row():
            with gr.Column(scale=1):
                openai_key = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                    scale=2,
                )
            with gr.Column(scale=1):
                giphy_key = gr.Textbox(
                    label="GIPHY API Key",
                    placeholder="Enter your GIPHY API key",
                    type="password",
                    scale=2,
                )

        setup_button = gr.Button("Set up Keys", variant="primary")
        setup_status = gr.Textbox(label="Setup Status")

        # type="messages": history is exchanged as role/content dicts, which
        # is the format GifChatBot.chat reads and appends.
        chatbot = gr.Chatbot(
            label="Chat",
            bubble_full_width=False,
            height=450,
            type="messages",
        )

        with gr.Row():
            with gr.Column(scale=4):
                message_box = gr.Textbox(
                    label="Type your message",
                    placeholder="Say something...",
                    show_label=False,
                    container=False,
                )
            with gr.Column(scale=1):
                clear_button = gr.Button("Clear Chat", variant="secondary")

        error_box = gr.Textbox(label="Error Messages", visible=True)

        # Event wiring.
        setup_button.click(
            bot.setup_keys,
            inputs=[openai_key, giphy_key],
            outputs=setup_status,
        )

        # chat returns (textbox value, history, error text) in this order.
        message_box.submit(
            bot.chat,
            inputs=[message_box, chatbot],
            outputs=[message_box, chatbot, error_box],
        )

        clear_button.click(
            bot.reset_chat,
            outputs=[chatbot, error_box],
        )

        gr.Markdown(tips_markdown)

    return interface
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI and serve it with Gradio's default
    # launch settings.
    create_interface().launch()