File size: 5,232 Bytes
b9724da
 
 
 
 
 
e3c4ecf
b9724da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e371810
e3c4ecf
b9724da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65616bc
b9724da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0ffceca
 
f0fa946
0ffceca
 
 
 
 
 
b9724da
0ffceca
b9724da
0ffceca
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import requests
import time
import io
import os
import re
import json
import random
from PIL import Image, UnidentifiedImageError

# Load the ImgBB API key from the environment variables.
IMGBB_API_KEY = os.getenv("IMGBB_API_KEY")

def upload_to_imgbb(image_path, file_name):
    """
    Uploads the image located at image_path to ImgBB.
    Returns:
        str: URL of the uploaded image on ImgBB or None if failed.
    """
    try:
        with open(image_path, 'rb') as f:
            image_data = f.read()
        response = requests.post(
            "https://api.imgbb.com/1/upload",
            params={"key": IMGBB_API_KEY},
            files={"image": (file_name, image_data)}
        )
        response.raise_for_status()
        result = response.json()
        if result.get("data") and "url" in result["data"]:
            return result["data"]["url"]
        else:
            print("Failed to upload image to ImgBB.")
            return None
    except requests.RequestException as e:
        print(f"Error uploading image to ImgBB: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error uploading image to ImgBB: {e}")
        return None

def generate_image(prompt, request_id, current_request_id, image_dir, attempt=0):
    """
    Generate an image using the Pollinations API.
    
    Parameters:
        prompt (str): The prompt for image generation.
        width (int): Desired image width.
        height (int): Desired image height.
        request_id (int): The request id for the current operation.
        current_request_id (int): The current active request id.
        image_dir (str): Directory where image will be saved.
        attempt (int): Current attempt count (zero-indexed).
    
    Returns:
        tuple: (PIL.Image object, image_path (str), returned_prompt (str), image_url (str))
        or None if image fetch fails or request id mismatches.
    """
    model = "flux"
    width = 1920
    height = 1080
    randomSeed = random.randint(0, 9999999)
    url = f"https://image.pollinations.ai/prompt/{prompt}?nologo=true&safe=false&private=true&model={model}&enhance=true&width={width}&height={height}&seed={randomSeed}"
    print(f"Attempt {attempt + 1}: Fetching image with URL: {url}")
    
    try:
        response = requests.get(url, timeout=45)
    except Exception as e:
        print(f"Error fetching image: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch image. Status code: {response.status_code}")
        return None

    if request_id != current_request_id:
        print("Request ID mismatch. Operation cancelled.")
        return None

    print("Image fetched successfully.")
    image_data = response.content

    try:
        image = Image.open(io.BytesIO(image_data))
        actual_width, actual_height = image.size
        print(f"Actual image dimensions: {actual_width}x{actual_height}")
        
        # Extract metadata from EXIF if available
        exif_data = image.info.get('exif', b'')
        returned_prompt = prompt
        if exif_data:
            json_match = re.search(b'{"prompt":.*}', exif_data)
            if json_match:
                json_str = json_match.group(0).decode('utf-8')
                try:
                    metadata_dict = json.loads(json_str)
                    returned_prompt = metadata_dict.get('prompt', prompt)
                except json.JSONDecodeError as e:
                    print(f"Failed to parse JSON in metadata: {e}")
            else:
                print("No JSON data found in EXIF")
        
        if (actual_width, actual_height) != (width, height):
            print(f"Warning: Received image dimensions ({actual_width}x{actual_height}) do not match requested dimensions ({width}x{height})")
    except UnidentifiedImageError:
        print("Error: Received data is not a valid image.")
        raise

    timestamp = int(time.time())
    image_filename = f"flux_{timestamp}.png"
    image_path = os.path.join(image_dir, image_filename)
    
    # Ensure the image directory exists
    os.makedirs(image_dir, exist_ok=True)
    
    try:
        image.save(image_path, 'PNG')
        print(f"Image saved to {image_path}")
        # Upload image to ImgBB
        image_url = upload_to_imgbb(image_path, image_filename)
        if image_url:
            print(f"Image uploaded to ImgBB: {image_url}")
        else:
            print("Failed to upload image to ImgBB.")
    except Exception as e:
        print(f"Error saving image: {e}")
        return None

    return image, image_path, returned_prompt, image_url

# if __name__ == "__main__":
#     from dotenv import load_dotenv

#     load_dotenv()
#     # Example usage
#     prompt = "Beach party, anime style, vibrant colors"
#     request_id = 1
#     current_request_id = 1
#     image_dir = "./images"
    
#     image, image_path, returned_prompt, image_url = generate_image(prompt, request_id, current_request_id, image_dir)
    
#     if image:
#         print(f"Image generated and saved at {image_path}")
#         print(f"Returned prompt: {returned_prompt}")
#         print(f"Image URL: {image_url}")
#     else:
#         print("Failed to generate image.")