import gradio as gr from google import genai from google.genai import types from PIL import Image from io import BytesIO import base64 import os import json import random import urllib.parse # Initialize the Google Generative AI client with the API key from environment variables try: api_key = os.environ['GEMINI_API_KEY'] except KeyError: raise ValueError("Please set the GEMINI_API_KEY environment variable.") client = genai.Client(api_key=api_key) def clean_response_text(response_text): """ Clean the API response by removing Markdown code block markers. Args: response_text (str): The raw response text from the API. Returns: str: The cleaned response text. """ cleaned_text = response_text.strip() if cleaned_text.startswith("```json"): cleaned_text = cleaned_text[len("```json"):].strip() if cleaned_text.endswith("```"): cleaned_text = cleaned_text[:-len("```")].strip() return cleaned_text def generate_ideas(tag): """ Generate a diverse set of ideas related to the tag using the LLM. Args: tag (str): The tag to base the ideas on. Returns: list: A list of ideas as strings. """ prompt = f""" Generate a list of 5 diverse and creative ideas related to {tag} that can be used for a TikTok video. Each idea should be a short sentence describing a specific scene or concept. Return the response as a JSON object with a single key 'ideas' containing a list of 5 ideas. Ensure the response is strictly in JSON format. Example: {{"ideas": ["A neon-lit gaming setup with RGB lights flashing", "A futuristic robot assembling a gadget"]}} """ try: response = client.models.generate_content( model='gemini-2.0-flash', contents=[prompt], config=types.GenerateContentConfig(temperature=1.2) ) print(f"Raw response for ideas: {response.text}") # Debugging if not response.text or response.text.isspace(): raise ValueError("Empty response from API") cleaned_text = clean_response_text(response.text) response_json = json.loads(cleaned_text) if 'ideas' not in response_json or not isinstance(response_json['ideas'], list): raise ValueError("Invalid JSON format: 'ideas' key missing or not a list") return response_json['ideas'] except Exception as e: print(f"Error generating ideas: {e}") return [ f"A vibrant {tag} scene at sunset", f"A close-up of {tag} with neon lights", f"A futuristic take on {tag} with holograms", f"A cozy {tag} moment with warm lighting", f"An action-packed {tag} scene with dynamic colors" ] def generate_item(tag, ideas, max_retries=3): """ Generate a single feed item using one of the ideas, retrying if image generation fails. Args: tag (str): The tag to base the content on. ideas (list): List of ideas to choose from. max_retries (int): Maximum number of retries if image generation fails. Returns: dict: A dictionary with 'text' (str), 'image_base64' (str), and 'ideas' (list). """ for attempt in range(max_retries): selected_idea = random.choice(ideas) prompt = f""" Based on the idea "{selected_idea}", create content for a TikTok video about {tag}. Return a JSON object with two keys: - 'caption': A short, viral TikTok-style caption with hashtags. - 'image_prompt': A detailed image prompt for generating a high-quality visual scene. The image prompt should describe the scene vividly, specify a perspective and style, and ensure no text or letters are included. Ensure the response is strictly in JSON format. Example: {{"caption": "Neon vibes only! 🌌 #tech", "image_prompt": "A close-up view of a neon-lit gaming setup with RGB lights flashing, in a futuristic style, no text or letters"}} """ try: response = client.models.generate_content( model='gemini-2.0-flash', contents=[prompt], config=types.GenerateContentConfig(temperature=1.2) ) print(f"Raw response for item (attempt {attempt + 1}): {response.text}") # Debugging if not response.text or response.text.isspace(): raise ValueError("Empty response from API") cleaned_text = clean_response_text(response.text) response_json = json.loads(cleaned_text) if 'caption' not in response_json or 'image_prompt' not in response_json: raise ValueError("Invalid JSON format: 'caption' or 'image_prompt' key missing") text = response_json['caption'] image_prompt = response_json['image_prompt'] except Exception as e: print(f"Error generating item (attempt {attempt + 1}): {e}") text = f"Obsessed with {tag}! 🔥 #{tag}" image_prompt = f"A vivid scene of {selected_idea}, in a vibrant pop art style, no text or letters" # Attempt to generate the image try: image_response = client.models.generate_images( model='imagen-3.0-generate-002', prompt=image_prompt, config=types.GenerateContentConfig( number_of_images=1, aspect_ratio="9:16", person_generation="DONT_ALLOW" ) ) if image_response.generated_images and len(image_response.generated_images) > 0: generated_image = image_response.generated_images[0] image = Image.open(BytesIO(generated_image.image.image_bytes)) # Successfully generated an image, proceed buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() return {'text': text, 'image_base64': img_str, 'ideas': ideas} else: print(f"Image generation failed (attempt {attempt + 1}): No images returned") if attempt == max_retries - 1: # Last attempt, use a gray placeholder image = Image.new('RGB', (360, 640), color='gray') buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() return {'text': text, 'image_base64': img_str, 'ideas': ideas} # Retry with new ideas ideas = generate_ideas(tag) continue except Exception as e: print(f"Error generating image (attempt {attempt + 1}): {e}") if attempt == max_retries - 1: # Last attempt, use a gray placeholder image = Image.new('RGB', (360, 640), color='gray') buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() return {'text': text, 'image_base64': img_str, 'ideas': ideas} # Retry with new ideas ideas = generate_ideas(tag) continue def start_feed(tag, current_index, feed_items): """ Start or update the feed based on the tag. Args: tag (str): The tag to generate content for. current_index (int): The current item index. feed_items (list): The current list of feed items. Returns: tuple: (current_tag, current_index, feed_items, html_content, share_links, is_loading) """ if not tag.strip(): tag = "trending" # Set loading state is_loading = True html_content = generate_html([], False, 0, tag, is_loading) share_links = "" try: ideas = generate_ideas(tag) item = generate_item(tag, ideas) feed_items = [item] current_index = 0 share_links = generate_share_links(item['image_base64'], item['text']) except Exception as e: print(f"Error in start_feed: {e}") feed_items = [] current_index = 0 html_content = """

Error generating content. Please try again!

""" is_loading = False return tag, current_index, feed_items, html_content, share_links, is_loading # Set loading state to False and update UI is_loading = False html_content = generate_html(feed_items, False, current_index, tag, is_loading) return tag, current_index, feed_items, html_content, share_links, is_loading def load_next(tag, current_index, feed_items): """ Load the next item in the feed. Args: tag (str): The tag to generate content for. current_index (int): The current item index. feed_items (list): The current list of feed items. Returns: tuple: (current_tag, current_index, feed_items, html_content, share_links, is_loading) """ is_loading = True html_content = generate_html(feed_items, False, current_index, tag, is_loading) share_links = "" try: if current_index + 1 < len(feed_items): current_index += 1 else: ideas = feed_items[-1]['ideas'] if feed_items else generate_ideas(tag) new_item = generate_item(tag, ideas) feed_items.append(new_item) current_index = len(feed_items) - 1 share_links = generate_share_links(feed_items[current_index]['image_base64'], feed_items[current_index]['text']) except Exception as e: print(f"Error in load_next: {e}") html_content = """

Error generating content. Please try again!

""" is_loading = False return tag, current_index, feed_items, html_content, share_links, is_loading is_loading = False html_content = generate_html(feed_items, False, current_index, tag, is_loading) return tag, current_index, feed_items, html_content, share_links, is_loading def load_previous(tag, current_index, feed_items): """ Load the previous item in the feed. Args: tag (str): The tag to generate content for. current_index (int): The current item index. feed_items (list): The current list of feed items. Returns: tuple: (current_tag, current_index, feed_items, html_content, share_links, is_loading) """ if current_index > 0: current_index -= 1 html_content = generate_html(feed_items, False, current_index, tag, False) share_links = generate_share_links(feed_items[current_index]['image_base64'], feed_items[current_index]['text']) return tag, current_index, feed_items, html_content, share_links, False def generate_share_links(image_base64, caption): """ Generate share links for social media platforms with a download link. Args: image_base64 (str): The base64-encoded image data. caption (str): The caption to share with the image. Returns: str: HTML string with share links and download instructions. """ image_data_url = f"data:image/png;base64,{image_base64}" encoded_caption = urllib.parse.quote(caption) # Generate share links with instructions to download the image share_links = """

To share, download the image and upload it to your platform:

Download Image
Share on TikTok Share on Instagram Share on Facebook Share on X Share on Pinterest
""".format(image_url=image_data_url, caption=encoded_caption) return share_links def generate_html(feed_items, scroll_to_latest=False, current_index=0, tag="", is_loading=False): """ Generate an HTML string to display the current feed item with click navigation. Args: feed_items (list): List of dictionaries containing 'text' and 'image_base64'. scroll_to_latest (bool): Whether to auto-scroll to the latest item (not used here). current_index (int): The index of the item to display. tag (str): The current tag for loading messages. is_loading (bool): Whether the feed is currently loading. Returns: str: HTML string representing the feed. """ loading_messages = [ f"Cooking up a {tag} masterpiece... 🍳", f"Snapping a vibrant {tag} moment... 📸", f"Creating a {tag} vibe that pops... ✨", f"Getting that perfect {tag} shot... 🎥", f"Bringing {tag} to life... 🌟" ] if is_loading: return f"""
{loading_messages[0]}
""" if not feed_items or current_index >= len(feed_items): return """

Enter a concept or idea to start your feed!

""" item = feed_items[current_index] html_str = """
{text}
""".format(image_base64=item['image_base64'], text=item['text']) return html_str # Define the Gradio interface with gr.Blocks( css=""" body { background-color: #000; color: #fff; font-family: Arial, sans-serif; } .gradio-container { max-width: 400px; margin: 0 auto; padding: 10px; } input, select, button { border-radius: 5px; background-color: #222; color: #fff; border: 1px solid #444; } button { background-color: #ff2d55; border: none; } button:hover { background-color: #e0264b; } .gr-button { width: 100%; margin-top: 10px; } .gr-form { background-color: #111; padding: 15px; border-radius: 10px; } """, title="Create Your Feed" ) as demo: # State variables current_tag = gr.State(value="") current_index = gr.State(value=0) feed_items = gr.State(value=[]) is_loading = gr.State(value=False) share_links = gr.State(value="") # Input section with gr.Column(elem_classes="gr-form"): gr.Markdown("### Create Your Feed") tag_input = gr.Textbox( label="Enter Concept or Ideas", # Renamed label value="", placeholder="e.g., sushi adventure, neon tech", # Updated placeholder submit_btn=False ) magic_button = gr.Button("✨ Generate Next Item", elem_classes="gr-button") # Output display feed_html = gr.HTML() share_html = gr.HTML(label="Share this item:") # Event handlers # Handle Enter keypress in the concept input tag_input.submit( fn=start_feed, inputs=[tag_input, current_index, feed_items], outputs=[current_tag, current_index, feed_items, feed_html, share_html, is_loading] ) # Handle magic button click to generate next item magic_button.click( fn=load_next, inputs=[current_tag, current_index, feed_items], outputs=[current_tag, current_index, feed_items, feed_html, share_html, is_loading] ) # Hidden button for previous item navigation previous_button = gr.Button("Previous", elem_id="previous-button", visible=False) # Handle click to go to previous item previous_button.click( fn=load_previous, inputs=[current_tag, current_index, feed_items], outputs=[current_tag, current_index, feed_items, feed_html, share_html, is_loading] ) # Launch the app demo.launch()