# Spaces: Running
# Chandima Prabhath
# Increase backoff duration for image fetching and limit returned prompt to first paragraph
# 0d9d5af
import os | |
import io | |
import time | |
import random | |
import logging | |
import requests | |
from PIL import Image, UnidentifiedImageError | |
# --- Logging setup ---
# Dedicated "flux" logger whose verbosity is controlled by the LOG_LEVEL
# environment variable (defaults to INFO).
logger = logging.getLogger("flux")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").strip().upper()
try:
    logger.setLevel(LOG_LEVEL)
except ValueError:
    # An unrecognised level name must not prevent the module from importing;
    # fall back to INFO and keep LOG_LEVEL in sync with the effective level.
    LOG_LEVEL = "INFO"
    logger.setLevel(LOG_LEVEL)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
# logging.getLogger returns the same object on every call, so attaching a
# handler unconditionally would duplicate each log line if this module is
# executed more than once (e.g. importlib.reload).
if not logger.handlers:
    logger.addHandler(handler)

# --- Configuration ---
IMGBB_API_KEY = os.getenv("IMGBB_API_KEY")  # uploads are skipped when unset
DEFAULT_MODEL = "flux"
DEFAULT_WIDTH = 1920
DEFAULT_HEIGHT = 1080
MAX_RETRIES = 3  # total number of fetch attempts
BACKOFF_BASE = 2  # exponential backoff multiplier applied after each failed attempt
def upload_to_imgbb(image_path: str, file_name: str) -> str | None:
    """Upload the image at *image_path* to ImgBB.

    Args:
        image_path: Path of the local file to upload.
        file_name: File name sent in the multipart ``image`` field.

    Returns:
        The value of ``data.url`` from the ImgBB JSON response, or ``None``
        when no API key is configured, the request fails, or the response
        carries no URL.
    """
    if not IMGBB_API_KEY:
        logger.warning("IMGBB_API_KEY not set, skipping upload")
        return None

    try:
        with open(image_path, "rb") as f:
            payload = f.read()
        resp = requests.post(
            "https://api.imgbb.com/1/upload",
            params={"key": IMGBB_API_KEY},
            files={"image": (file_name, payload)},
            timeout=15,
        )
        resp.raise_for_status()
        # `or {}` also covers an explicit `"data": null` in the response.
        data = resp.json().get("data") or {}
        url = data.get("url")
    except Exception as e:
        # Best-effort upload: any failure is logged and reported as None.
        # The key is sent in the query string and request errors can embed
        # the request URL in their message, so redact the key before logging.
        reason = str(e).replace(IMGBB_API_KEY, "***")
        logger.error(f"ImgBB upload failed: {reason}")
        return None

    if not url:
        logger.error("ImgBB response missing URL")
        return None

    logger.debug(f"Uploaded to ImgBB: {url}")
    return url
def _fetch_with_retries(url: str) -> "requests.Response | None":
    """GET *url* up to MAX_RETRIES times with exponential backoff.

    Returns the first response with status 200, or ``None`` once every
    attempt has failed (or when MAX_RETRIES is not positive).
    """
    # time.sleep() takes SECONDS: start at one second and multiply by
    # BACKOFF_BASE after each failed attempt.
    backoff = 1.0
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.get(url, timeout=45)
            if resp.status_code != 200:
                raise RuntimeError(f"Status {resp.status_code}")
            return resp
        except Exception as e:
            logger.warning(f"Attempt {attempt}/{MAX_RETRIES} failed: {e}")
            if attempt == MAX_RETRIES:
                logger.error("Max retries reached, aborting image fetch")
                return None
            time.sleep(backoff)
            backoff *= BACKOFF_BASE
    return None


def _extract_returned_prompt(image: Image.Image, fallback: str) -> str:
    """Return the ``prompt`` string found in the image's EXIF bytes, else *fallback*.

    Searches the raw ``exif`` entry of ``image.info`` for a JSON object that
    starts with ``{"prompt":``. Parsing is best-effort: any failure is logged
    at debug level and *fallback* is returned.
    """
    import json
    import re

    exif = image.info.get("exif", b"")
    if not exif:
        return fallback
    try:
        match = re.search(rb'{"prompt":.*}', exif)
        if match:
            candidate = json.loads(match.group(0).decode()).get("prompt", fallback)
            # The caller calls .split() on the result, so only accept strings.
            if isinstance(candidate, str):
                return candidate
    except Exception as e:
        logger.debug(f"EXIF parse failed: {e}")
    return fallback


def generate_image(
    prompt: str,
    request_id: str,
    current_request_id: str,
    image_dir: str,
    model: str | None = None,
    width: int | None = None,
    height: int | None = None,
) -> tuple[Image.Image, str, str, str] | None:
    """Generate an image via the Pollinations API, save it locally, upload to ImgBB.

    Args:
        prompt: Text prompt; URL-quoted into the request path.
        request_id: Identifier of this request.
        current_request_id: Identifier the caller considers active; when it
            differs from *request_id* the generation is abandoned.
        image_dir: Directory the PNG is written to (created if missing).
        model: Model name; falls back to DEFAULT_MODEL when falsy.
        width: Image width; falls back to DEFAULT_WIDTH when falsy.
        height: Image height; falls back to DEFAULT_HEIGHT when falsy.

    Returns:
        ``(image, local_path, returned_prompt, image_url)`` or ``None`` on
        failure/cancellation. ``returned_prompt`` is cut at the first blank
        line; ``image_url`` is ``""`` when the ImgBB upload did not succeed.
    """
    model = model or DEFAULT_MODEL
    width = width or DEFAULT_WIDTH
    height = height or DEFAULT_HEIGHT

    # If the request has been superseded, bail early.
    if request_id != current_request_id:
        logger.info("Request ID mismatch; cancelling generation")
        return None

    seed = random.randint(0, 2**31 - 1)
    url = (
        f"https://image.pollinations.ai/prompt/{requests.utils.quote(prompt)}"
        f"?nologo=true&safe=false&private=true&model={model}"
        f"&enhance=true&width={width}&height={height}&seed={seed}"
    )
    logger.debug(f"Fetching image (seed={seed}): {url}")

    resp = _fetch_with_retries(url)
    if resp is None:
        return None

    # Verify this is still the active request.
    # NOTE(review): both IDs are plain arguments fixed at call time, so this
    # comparison gives the same result as the one above; detecting a request
    # superseded *during* the fetch would need a live lookup from the caller.
    if request_id != current_request_id:
        logger.info("Request ID mismatch after fetch; discarding result")
        return None

    # Load the image from the response body.
    try:
        image = Image.open(io.BytesIO(resp.content))
        logger.debug(f"Image loaded: {image.size[0]}×{image.size[1]}")
    except UnidentifiedImageError as e:
        logger.error(f"Invalid image data: {e}")
        return None

    # Prefer the prompt recorded in the image metadata, if any.
    returned_prompt = _extract_returned_prompt(image, prompt)

    # Second-resolution timestamp: two images saved to the same directory
    # within the same second share a file name.
    filename = f"flux_{int(time.time())}.png"
    path = os.path.join(image_dir, filename)
    try:
        # Creating the directory is part of saving; a failure here is
        # reported as None like any other save failure.
        os.makedirs(image_dir, exist_ok=True)
        image.save(path, format="PNG")
        logger.info(f"Image saved to {path}")
    except Exception as e:
        logger.error(f"Failed to save image: {e}")
        return None

    # Upload is optional; an empty string signals "no public URL".
    image_url = upload_to_imgbb(path, filename) or ""
    return image, path, returned_prompt.split("\n\n", 1)[0], image_url