"""Stock Photo Downloader.

Fetches stock photos from Pexels, Unsplash, or Pixabay, shows them in a
Gradio gallery with per-image checkboxes, and zips the selected subset.

NOTE(review): this file was recovered from a whitespace-mangled paste; the
gr.Markdown HTML wrappers were lost in extraction and are reconstructed here
as plain Markdown with the same visible text.
"""

import logging
import os
import random
import shutil
import time
import zipfile

import gradio as gr
import requests
from PIL import Image

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
OUTPUT_DIR = "downloaded_images"
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
ZIP_FILE = os.path.join(OUTPUT_DIR, "images.zip")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Constants
ITEMS_PER_PAGE = 40          # results requested per API page
DAILY_IMAGE_LIMIT = 2000     # self-imposed cap across a day of requests
MAX_PAGES = min(DAILY_IMAGE_LIMIT // ITEMS_PER_PAGE, 10)
IMAGES_PER_ROW = 4
MAX_ROWS = 6
TOTAL_IMAGES = IMAGES_PER_ROW * MAX_ROWS  # max images shown in the gallery

# API Configurations.
# SECURITY NOTE(review): API keys are hardcoded in source. They should be
# moved to environment variables before this file is shared or deployed.
# "image_key" is the nested key path to the image URL inside one result item.
API_CONFIGS = {
    "pexels": {
        "base_url": "https://api.pexels.com/v1/search",
        "headers": {"Authorization": "klHADHclpse2e2xSP9h747AgfE1Rx0wioemGhXYtedjZzvJ1WBUKwz7g"},
        "image_key": ["src", "medium"],
        "result_key": "photos",
        "delay": 2,
    },
    "unsplash": {
        "base_url": "https://api.unsplash.com/search/photos",
        "headers": {},
        "image_key": ["urls", "small"],
        "result_key": "results",
        "delay": 2,
        "client_id": "UKkhpD_Rs5-s1gIlVX28iNs_8E4ysPhQniyIpDpKUnU",
    },
    "pixabay": {
        "base_url": "https://pixabay.com/api/",
        "headers": {},
        "image_key": ["webformatURL"],
        "result_key": "hits",
        "delay": 1,
        "api_key": "45122300-cd3621e1539e8e95430ee3efc",
    },
}


def fetch_image_urls(api_name, category, num_images):
    """Fetch up to ``num_images`` image URLs for ``category`` from ``api_name``.

    Randomly samples the pages to request (no persistent page tracking),
    honours the per-API politeness delay, and returns a list of direct image
    URLs (possibly shorter than requested if the API returns few results).
    """
    config = API_CONFIGS[api_name]
    num_pages_needed = (num_images + ITEMS_PER_PAGE - 1) // ITEMS_PER_PAGE

    # Select random pages to fetch from (no tracking)
    all_pages = list(range(1, MAX_PAGES + 1))
    if len(all_pages) < num_pages_needed:
        logger.warning(f"Insufficient pages available: {len(all_pages)} < {num_pages_needed}")
        return []
    selected_pages = random.sample(all_pages, num_pages_needed)
    logger.info(f"Selected pages for {api_name}: {selected_pages}")

    image_urls = []
    for page in selected_pages:
        # FIX: build the Pixabay request with a params dict (requests
        # URL-encodes values), instead of interpolating the raw category
        # into the query string, which broke multi-word categories.
        if api_name == "pixabay":
            url = config["base_url"]
            params = {
                "key": config["api_key"],
                "q": category.lower(),
                "per_page": ITEMS_PER_PAGE,
                "page": page,
            }
        elif api_name == "unsplash":
            url = config["base_url"]
            params = {
                "query": category.lower(),
                "per_page": ITEMS_PER_PAGE,
                "page": page,
                "client_id": config["client_id"],
            }
        else:  # pexels
            url = config["base_url"]
            params = {
                "query": category.lower(),
                "per_page": ITEMS_PER_PAGE,
                "page": page,
            }

        response = None  # FIX: keep defined for the except handler below
        try:
            logger.info(f"Requesting {url} with params {params} for {api_name}")
            time.sleep(config.get("delay", 0))  # politeness / rate-limit delay
            response = requests.get(url, headers=config["headers"], params=params, timeout=10)
            response.raise_for_status()
            data_response = response.json()
            results = data_response.get(config["result_key"], [])
            if not results:
                logger.warning(
                    f"No {config['result_key']} in response for {api_name}, page {page}: {data_response}"
                )
                continue

            page_urls = []
            for item in results:
                if len(image_urls) + len(page_urls) >= num_images:
                    break
                # Walk the nested key path to the image URL.
                image_url = item
                for key in config["image_key"]:
                    image_url = image_url.get(key) if isinstance(image_url, dict) else None
                    if image_url is None:
                        break
                if image_url:
                    page_urls.append(image_url)
                else:
                    logger.warning(f"No image URL found in item for {api_name}: {item}")

            if page_urls:
                image_urls.extend(page_urls)
                logger.info(f"Successfully fetched {len(page_urls)} images from page {page} for {api_name}")
            else:
                logger.warning(f"No valid URLs extracted from page {page} for {api_name}")

            # FIX: stop requesting further pages once we have enough URLs
            # (previously only the inner loop broke, wasting API quota).
            if len(image_urls) >= num_images:
                break
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching page {page} from {api_name}: {e}")
            # FIX: response is None when requests.get itself failed
            # (timeout / connection error) — previously a NameError here.
            if response is not None:
                logger.error(f"Response: {response.text}")
                if response.status_code == 401:
                    logger.error(f"401 Unauthorized for {api_name}. Replace API key.")
                elif response.status_code == 429:
                    logger.error(f"429 Rate Limit Exceeded for {api_name}. Increase delay or wait.")
            break

    logger.info(f"Total URLs fetched for {api_name}: {len(image_urls)}")
    return image_urls[:num_images]


def download_images(image_urls):
    """Download ``image_urls`` into a fresh IMAGES_DIR.

    Each file is streamed to disk, then validated with PIL; invalid files are
    removed. Returns ``(downloaded_count, list_of_local_paths)``.
    """
    if not image_urls:
        logger.warning("No image URLs provided to download")
        return 0, []

    # Start from a clean directory so stale images never leak into the gallery.
    if os.path.exists(IMAGES_DIR):
        shutil.rmtree(IMAGES_DIR)
    os.makedirs(IMAGES_DIR, exist_ok=True)

    downloaded_count = 0
    image_paths = []
    for idx, url in enumerate(image_urls, 1):
        image_path = os.path.join(IMAGES_DIR, f"img{idx}.jpg")
        try:
            response = requests.get(url, stream=True, timeout=10)
            response.raise_for_status()
            with open(image_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            # FIX: close the file handle PIL opens (the original leaked it),
            # and drop the corrupt file if verification fails.
            with Image.open(image_path) as img:
                img.verify()
            downloaded_count += 1
            image_paths.append(image_path)
            logger.info(f"Downloaded {idx}/{len(image_urls)}: {url}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Error downloading {url}: {e}")
        except Exception as e:
            logger.error(f"Invalid image or error saving {url}: {e}")
            if os.path.exists(image_path):
                os.remove(image_path)  # don't leave corrupt files behind

    logger.info(f"Total images downloaded: {downloaded_count}/{len(image_urls)}")
    return downloaded_count, image_paths


def create_zip_file(selected_image_paths):
    """Zip ``selected_image_paths`` into ZIP_FILE (overwriting) and return its path."""
    if os.path.exists(ZIP_FILE):
        os.remove(ZIP_FILE)
    with zipfile.ZipFile(ZIP_FILE, "w", zipfile.ZIP_DEFLATED) as zipf:
        for image_path in selected_image_paths:
            # Store paths relative to OUTPUT_DIR so the archive has no
            # machine-specific prefixes.
            arcname = os.path.relpath(image_path, OUTPUT_DIR)
            zipf.write(image_path, arcname)
    return ZIP_FILE


def process_and_display(api_name, category, num_images):
    """Fetch + download images and build the gallery state.

    Returns ``(status_text, zip_path, image_paths, image_outputs,
    checkbox_outputs)`` where the last two are TOTAL_IMAGES-long lists padded
    with ``None`` / ``False`` for empty slots.
    """
    num_images = int(num_images)
    if num_images > TOTAL_IMAGES:
        num_images = TOTAL_IMAGES  # gallery can only display TOTAL_IMAGES slots
    logger.info(f"Starting process for {api_name} with category '{category}' and {num_images} images")

    image_urls = fetch_image_urls(api_name, category, num_images)
    if not image_urls:
        logger.warning(f"No images fetched from {api_name}")
        return (
            "No images available or API limit reached. Check logs for details.",
            None, [], [None] * TOTAL_IMAGES, [False] * TOTAL_IMAGES,
        )

    logger.info(f"Proceeding to download {len(image_urls)} images from {api_name}")
    downloaded_count, image_paths = download_images(image_urls)
    if downloaded_count == 0:
        logger.warning(f"No images downloaded from {api_name}")
        return (
            "No images were successfully downloaded. Check logs for details.",
            None, [], [None] * TOTAL_IMAGES, [False] * TOTAL_IMAGES,
        )

    status = (
        f"Successfully downloaded {downloaded_count}/{num_images} images from {api_name}. "
        "Select images to include in ZIP below."
    )
    image_outputs = [image_paths[i] if i < len(image_paths) else None for i in range(TOTAL_IMAGES)]
    # Pre-select every downloaded image; empty slots stay unchecked.
    checkbox_outputs = [i < len(image_paths) for i in range(TOTAL_IMAGES)]
    logger.info(f"Process completed for {api_name}: {downloaded_count} images prepared for display")
    return status, None, image_paths, image_outputs, checkbox_outputs


def process_zip_submission(image_paths, *checkbox_states):
    """Build a ZIP from the checked images; return ``(status_text, zip_path_or_None)``."""
    if not image_paths:
        return "No images available to process.", None
    selected_image_paths = [image_paths[i] for i, state in enumerate(checkbox_states) if state]
    if not selected_image_paths:
        return "No images selected for ZIP.", None
    zip_path = create_zip_file(selected_image_paths)
    logger.info(f"ZIP file created with {len(selected_image_paths)} images at {zip_path}")
    return f"ZIP file created with {len(selected_image_paths)} images at {zip_path}", zip_path


# Gradio Interface
css = """
.fetch-button { background-color: #4CAF50; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
.fetch-button:hover { background-color: #45a049; }
.zip-button { background-color: #2196F3; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
.zip-button:hover { background-color: #1e88e5; }
.status-box { border: 1px solid #ddd; background-color: #f9f9f9; padding: 10px; border-radius: 5px; }
.input-group { border: 1px solid #ddd; padding: 15px; border-radius: 5px; background-color: #f0f0f0; }
.image-container { position: relative; width: 100%; height: 150px; overflow: hidden; border-radius: 5px; }
.image-container img { width: 100%; height: 100%; object-fit: cover; }
.overlay { position: absolute; bottom: 5px; right: 5px; background-color: rgba(0, 0, 0, 0.6); padding: 5px; border-radius: 5px; display: flex; align-items: center; gap: 5px; color: white; font-size: 12px; }
.overlay label { margin: 0; color: white; }
.overlay input[type="checkbox"] { margin: 0; }
"""

with gr.Blocks(title="Stock Photo Downloader", css=css) as demo:
    gr.Markdown("""
# 📸 Stock Photo Downloader
""")
    gr.Markdown("""
Fetch high-quality stock photos from Pexels, Unsplash, and Pixabay.
""")

    with gr.Group(elem_classes=["input-group"]):
        gr.Markdown("### 🔍 Choose Your Parameters")
        with gr.Row():
            api_input = gr.Dropdown(
                label="API Source",
                choices=["pexels", "unsplash", "pixabay"],
                value="pexels",
                info="Select the stock photo provider.",
            )
            category_input = gr.Dropdown(
                label="Category",
                choices=["nature", "business", "people", "technology", "food", "travel", "animals", "fashion"],
                value="nature",
                allow_custom_value=True,
                info="Choose a category or enter a custom keyword.",
            )
            num_images_input = gr.Dropdown(
                label="Number of Images (Max 24)",
                choices=["4", "8", "12", "16", "20", "24"],
                value="4",
                info="How many images to fetch (up to 24).",
            )
        download_button = gr.Button("Fetch and Display Images", elem_classes=["fetch-button"])

    # Combine Status and Download sections in a single row with two columns
    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📊 Status")
            status_output = gr.Textbox(
                label="Status",
                interactive=False,
                placeholder="Status updates will appear here...",
                elem_classes=["status-box"],
                show_label=False,
            )
        with gr.Column():
            gr.Markdown("### 💾 Download Your Images")
            zip_output = gr.File(label="Download ZIP", visible=False)

    gr.Markdown("### 🖼️ Image Gallery")
    gr.Markdown("""
Select images to include in your ZIP file.
""")
    image_paths_state = gr.State()

    # Fixed grid layout with dynamically visible rows
    image_outputs = []
    checkbox_outputs = []
    gallery_rows = []
    for row in range(MAX_ROWS):
        with gr.Row(visible=False) as row_component:
            gallery_rows.append(row_component)
            for col in range(IMAGES_PER_ROW):
                idx = row * IMAGES_PER_ROW + col
                with gr.Column(min_width=150):
                    with gr.Group(elem_classes=["image-container"]):
                        image_output = gr.Image(
                            label=f"Image {idx+1}",
                            show_label=False,
                            visible=False,
                            height=150,
                            width=150,
                        )
                        with gr.Row(elem_classes=["overlay"]):
                            checkbox_output = gr.Checkbox(
                                label=f"Image {idx+1}",
                                value=False,
                                visible=False,
                                scale=0,
                            )
                        image_outputs.append(image_output)
                        checkbox_outputs.append(checkbox_output)

    gr.Markdown("### 📦 Create ZIP File")
    submit_button = gr.Button("Create ZIP of Selected Images", elem_classes=["zip-button"])

    def on_download(api_name, category, num_images):
        """Click handler: fetch/download images, then update gallery widgets."""
        status, zip_path, image_paths, image_outs, checkbox_outs = process_and_display(
            api_name, category, num_images
        )
        num_downloaded = len(image_paths)
        # Update visibility for images and checkboxes
        image_updates = [
            gr.Image(value=img, visible=img is not None, label=f"Image {i+1}", height=150, width=150)
            for i, img in enumerate(image_outs)
        ]
        checkbox_updates = [
            gr.Checkbox(value=chk, visible=i < num_downloaded, label=f"Image {i+1}", scale=0)
            for i, chk in enumerate(checkbox_outs)
        ]
        # Update row visibility: show a row only if it contains at least one visible image
        row_updates = []
        for row_idx in range(MAX_ROWS):
            start_idx = row_idx * IMAGES_PER_ROW
            end_idx = start_idx + IMAGES_PER_ROW
            row_has_images = any(
                image_outs[i] is not None for i in range(start_idx, min(end_idx, len(image_outs)))
            )
            row_updates.append(gr.Row(visible=row_has_images))
        return (status, zip_path, image_paths, *image_updates, *checkbox_updates, *row_updates)

    def on_submit(image_paths, *checkbox_states):
        """Click handler: zip the selected images and reveal the download widget."""
        status, zip_path = process_zip_submission(image_paths, *checkbox_states)
        return status, gr.File(value=zip_path, visible=True) if zip_path else gr.File(visible=False)

    download_button.click(
        fn=on_download,
        inputs=[api_input, category_input, num_images_input],
        outputs=[status_output, zip_output, image_paths_state] + image_outputs + checkbox_outputs + gallery_rows,
    )
    submit_button.click(
        fn=on_submit,
        inputs=[image_paths_state] + checkbox_outputs,
        outputs=[status_output, zip_output],
    )

demo.launch()