File size: 4,554 Bytes
b9724da
5f26e9c
 
e3c4ecf
5f26e9c
 
b9724da
 
5f26e9c
 
 
 
 
 
 
 
 
b9724da
5f26e9c
 
 
 
 
b9724da
5f26e9c
b9724da
5f26e9c
 
b9724da
5f26e9c
 
 
 
b9724da
 
5f26e9c
 
b9724da
 
5f26e9c
 
b9724da
5f26e9c
 
 
 
 
 
b9724da
5f26e9c
b9724da
 
5f26e9c
b9724da
 
5f26e9c
 
 
 
 
 
 
 
 
b9724da
5f26e9c
 
b9724da
5f26e9c
b9724da
5f26e9c
 
 
b9724da
5f26e9c
 
 
b9724da
 
5f26e9c
 
 
 
 
 
 
 
0d9d5af
5f26e9c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b9724da
5f26e9c
b9724da
 
5f26e9c
b9724da
5f26e9c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b9724da
5f26e9c
b9724da
5f26e9c
 
 
b9724da
5f26e9c
 
b9724da
5f26e9c
b9724da
 
5f26e9c
 
0d9d5af
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import os
import io
import time
import random
import logging
import requests
from PIL import Image, UnidentifiedImageError

# --- Logging setup ---
# Module logger; verbosity is taken from the LOG_LEVEL environment variable.
logger = logging.getLogger("flux")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
try:
    logger.setLevel(LOG_LEVEL)
except ValueError:
    # An unrecognised level name must not make the module unimportable;
    # fall back to the documented default instead.
    LOG_LEVEL = "INFO"
    logger.setLevel(LOG_LEVEL)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
# Attach the handler only once: the "flux" logger object outlives a module
# reload, and a second handler would emit every record twice.
if not logger.handlers:
    logger.addHandler(handler)

# --- Configuration ---
# ImgBB credential read from the environment; None when the variable is unset.
IMGBB_API_KEY = os.environ.get("IMGBB_API_KEY")

# Fallbacks used by generate_image() when the caller passes no value.
DEFAULT_MODEL = "flux"
DEFAULT_WIDTH, DEFAULT_HEIGHT = 1920, 1080

# Retry policy for the image fetch.
MAX_RETRIES = 3
BACKOFF_BASE = 2  # the retry delay is multiplied by this after each failure

def upload_to_imgbb(image_path: str, file_name: str) -> str | None:
    """
    Upload the image at image_path to ImgBB.

    Args:
      image_path: Path of the local file to read and upload.
      file_name:  File name sent in the multipart form.

    Returns:
      The URL reported by ImgBB, or None when IMGBB_API_KEY is not set,
      the request fails, or the response carries no URL. Never raises;
      every failure is logged instead.
    """
    if not IMGBB_API_KEY:
        logger.warning("IMGBB_API_KEY not set, skipping upload")
        return None

    try:
        with open(image_path, 'rb') as f:
            files = {"image": (file_name, f.read())}
        resp = requests.post(
            "https://api.imgbb.com/1/upload",
            params={"key": IMGBB_API_KEY},
            files=files,
            timeout=15
        )
        resp.raise_for_status()
        # "data" may be absent or null in the payload; treat both as empty.
        data = resp.json().get("data") or {}
        url  = data.get("url")
        if url:
            logger.debug(f"Uploaded to ImgBB: {url}")
            return url
        else:
            logger.error("ImgBB response missing URL")
            return None
    except Exception as e:
        # The key is sent as a query parameter and requests includes the
        # request URL in its exception messages, so redact the key (raw and
        # percent-encoded) before the message reaches the log.
        message = str(e)
        for secret in (IMGBB_API_KEY, requests.utils.quote(IMGBB_API_KEY, safe="")):
            message = message.replace(secret, "***")
        logger.error(f"ImgBB upload failed: {message}")
        return None

def generate_image(
    prompt: str,
    request_id: str,
    current_request_id: str,
    image_dir: str,
    model: str | None = None,
    width: int | None = None,
    height: int | None = None
) -> tuple[Image.Image, str, str, str] | None:
    """
    Generate an image via Pollinations API, save locally, upload to ImgBB.

    Args:
      prompt:             Text prompt; URL-quoted into the request path.
      request_id:         Identifier of this request.
      current_request_id: Identifier the caller considers active; when it
                          differs from request_id the work is abandoned.
      image_dir:          Directory for the PNG; created if missing.
      model:              Model name; DEFAULT_MODEL when falsy.
      width:              Image width; DEFAULT_WIDTH when falsy.
      height:             Image height; DEFAULT_HEIGHT when falsy.

    Returns:
      (PIL.Image, local_path, returned_prompt, image_url) or None on failure.
      returned_prompt is the prompt found in the image's EXIF data (or the
      input prompt), truncated at the first blank line. image_url is ""
      when the ImgBB upload is skipped or fails.
    """
    import json
    import re

    model  = model or DEFAULT_MODEL
    width  = width or DEFAULT_WIDTH
    height = height or DEFAULT_HEIGHT

    # if the request has been superseded, bail early
    if request_id != current_request_id:
        logger.info("Request ID mismatch; cancelling generation")
        return None

    seed = random.randint(0, 2**31 - 1)
    url  = (
        f"https://image.pollinations.ai/prompt/{requests.utils.quote(prompt)}"
        f"?nologo=true&safe=false&private=true&model={model}"
        f"&enhance=true&width={width}&height={height}&seed={seed}"
    )
    logger.debug(f"Fetching image (seed={seed}): {url}")

    resp = None
    # time.sleep() takes seconds, so the initial delay is 1 (not 1000).
    backoff = 1
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.get(url, timeout=45)
            if resp.status_code != 200:
                raise RuntimeError(f"Status {resp.status_code}")
            break
        except Exception as e:
            logger.warning(f"Attempt {attempt}/{MAX_RETRIES} failed: {e}")
            if attempt == MAX_RETRIES:
                logger.error("Max retries reached, aborting image fetch")
                return None
            time.sleep(backoff)
            backoff *= BACKOFF_BASE

    if resp is None:
        # Only reachable when MAX_RETRIES <= 0 and no attempt was made.
        logger.error("Max retries reached, aborting image fetch")
        return None

    # verify still the active request
    # NOTE(review): both IDs are plain arguments, so this comparison cannot
    # differ from the one above within a single call — confirm how callers
    # are expected to signal cancellation during the fetch.
    if request_id != current_request_id:
        logger.info("Request ID mismatch after fetch; discarding result")
        return None

    # load image
    try:
        image = Image.open(io.BytesIO(resp.content))
        logger.debug(f"Image loaded: {image.size[0]}×{image.size[1]}")
    except (UnidentifiedImageError, OSError) as e:
        # OSError also covers unreadable data that Pillow does not report
        # as UnidentifiedImageError.
        logger.error(f"Invalid image data: {e}")
        return None

    # try to extract prompt metadata from EXIF
    returned_prompt = prompt
    exif = image.info.get("exif", b"")
    if exif:
        try:
            m = re.search(b'{"prompt":.*}', exif)
            if m:
                meta = json.loads(m.group(0).decode())
                candidate = meta.get("prompt", prompt)
                # Only accept a string: the value is split as text below.
                if isinstance(candidate, str):
                    returned_prompt = candidate
        except Exception as e:
            # Metadata is best-effort; keep the caller's prompt on any error.
            logger.debug(f"EXIF parse failed: {e}")

    # NOTE: the name has one-second resolution, so two images saved within
    # the same second into the same directory share a path.
    filename = f"flux_{int(time.time())}.png"
    path     = os.path.join(image_dir, filename)

    try:
        # ensure output directory; a failure here is reported like a failed
        # save rather than propagating to the caller
        os.makedirs(image_dir, exist_ok=True)
        image.save(path, format="PNG")
        logger.info(f"Image saved to {path}")
    except Exception as e:
        logger.error(f"Failed to save image: {e}")
        return None

    # upload
    image_url = upload_to_imgbb(path, filename) or ""
    return image, path, returned_prompt.split("\n\n", 1)[0], image_url