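"""Combined Ads Transparency Scraper.

Scrapes the Facebook Ad Library with Selenium (anti-detection measures and
self-healing selectors) and pulls Google Ads transparency data for a
predefined list of lightsaber companies, exposed through a Gradio interface
or directly from the command line.
"""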
import time
import logging
import argparse
import os
import json
import random
import re
import uuid
from collections import defaultdict
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from urllib.parse import urlencode

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException

import gradio as gr
import pandas as pd
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Predefined advertisers list
ADVERTISERS = [
    {"id": "AR10051102910143528961", "name": "Theory Sabers"},
    {"id": "AR12645693856247971841", "name": "Artsabers"},
    {"id": "AR07257050693515608065", "name": "bmlightsabers"},
    {"id": "AR01506694249926623233", "name": "Padawan Outpost Ltd"},
    {"id": "AR10584025853845307393", "name": "GalaxySabers"},
    {"id": "AR16067963414479110145", "name": "nsabers"},
    {"id": "AR12875519274243850241", "name": "es-sabers"},
    {"id": "AR05144647067079016449", "name": "Ultra Sabers"},
    {"id": "AR15581800501283389441", "name": "SuperNeox"},
    {"id": "AR06148907109187584001", "name": "Sabertrio"}
]
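# Assumption: the AR… identifiers above are advertiser IDs in the form used by
# the Google Ads Transparency Center; they are passed straight through to the
# GoogleAds module below.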
#####################################
### FACEBOOK SCRAPER SECTION  #######
#####################################
# Constants for the Facebook scraper
FB_DEFAULT_TIMEOUT = 60  # seconds
FB_MIN_WAIT_TIME = 1  # minimum seconds for random waits
FB_MAX_WAIT_TIME = 3  # maximum seconds for random waits
FB_MAX_SCROLL_ATTEMPTS = 5  # maximum number of scroll attempts
FB_SELECTOR_HISTORY_FILE = "fb_selector_stats.json"  # file that stores selector success stats

# User agents for rotation
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/118.0"
]

# Viewport sizes for randomization
VIEWPORT_SIZES = [
    (1366, 768),
    (1920, 1080),
    (1536, 864),
    (1440, 900)
]

class SelectorStats:
    """Class to track and optimize selector performance."""

    def __init__(self, file_path=FB_SELECTOR_HISTORY_FILE):
        self.file_path = file_path
        self.stats = self._load_stats()

    def _load_stats(self) -> Dict:
        """Load stats from file, or initialize them if the file does not exist."""
        if os.path.exists(self.file_path):
            try:
                with open(self.file_path, 'r') as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Error loading selector stats: {e}, initializing new stats")
        # Initialize structure for platform stats
        return {
            "facebook": {"selectors": {}, "last_updated": datetime.now().isoformat()}
        }

    def update_selector_success(self, selector: str, count: int = 1) -> None:
        """Record a successful use of a selector."""
        platform = "facebook"  # Only Facebook is used in this version
        if platform not in self.stats:
            self.stats[platform] = {"selectors": {}, "last_updated": datetime.now().isoformat()}
        if selector not in self.stats[platform]["selectors"]:
            self.stats[platform]["selectors"][selector] = {"successes": 0, "attempts": 0}
        self.stats[platform]["selectors"][selector]["successes"] += count
        self.stats[platform]["selectors"][selector]["attempts"] += 1
        self.stats[platform]["last_updated"] = datetime.now().isoformat()
        # Save after each update
        self._save_stats()

    def update_selector_attempt(self, selector: str) -> None:
        """Record an attempt to use a selector, regardless of success."""
        platform = "facebook"  # Only Facebook is used in this version
        if platform not in self.stats:
            self.stats[platform] = {"selectors": {}, "last_updated": datetime.now().isoformat()}
        if selector not in self.stats[platform]["selectors"]:
            self.stats[platform]["selectors"][selector] = {"successes": 0, "attempts": 0}
        self.stats[platform]["selectors"][selector]["attempts"] += 1
        self.stats[platform]["last_updated"] = datetime.now().isoformat()
        # Don't save on every attempt, to reduce disk I/O

    def get_best_selectors(self, min_attempts: int = 3, max_count: int = 10) -> List[str]:
        """Get the best-performing selectors for Facebook."""
        platform = "facebook"  # Only Facebook is used in this version
        if platform not in self.stats:
            return []
        selectors = []
        for selector, data in self.stats[platform]["selectors"].items():
            if data["attempts"] >= min_attempts:
                success_rate = data["successes"] / data["attempts"] if data["attempts"] > 0 else 0
                selectors.append((selector, success_rate))
        # Sort by success rate (descending) and return the top N selectors
        selectors.sort(key=lambda x: x[1], reverse=True)
        return [s[0] for s in selectors[:max_count]]

    def _save_stats(self) -> None:
        """Save stats to file."""
        try:
            with open(self.file_path, 'w') as f:
                json.dump(self.stats, f, indent=2)
        except IOError as e:
            logger.error(f"Error saving selector stats: {e}")

class FacebookAdsScraper:
    def __init__(self, headless=True, debug_mode=False):
        """Initialize the ads scraper with browser configuration."""
        self.debug_mode = debug_mode
        self.headless = headless
        self.driver = self._setup_driver(headless)
        # Initialize selector stats tracker
        self.selector_stats = SelectorStats()
        # Track navigation history for smart retry
        self.navigation_history = []
        # Track success/failure for self-healing
        self.success_rate = defaultdict(lambda: {"success": 0, "failure": 0})
        # Generate a session ID for this scraping session
        self.session_id = str(uuid.uuid4())[:8]

    def _setup_driver(self, headless):
        """Set up and configure the Chrome WebDriver with anti-detection measures."""
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        # Select a random user agent
        user_agent = random.choice(USER_AGENTS)
        chrome_options.add_argument(f"--user-agent={user_agent}")
        logger.info(f"Using user agent: {user_agent}")
        # Select a random viewport size
        viewport_width, viewport_height = random.choice(VIEWPORT_SIZES)
        chrome_options.add_argument(f"--window-size={viewport_width},{viewport_height}")
        logger.info(f"Using viewport size: {viewport_width}x{viewport_height}")
        # Add common options to avoid detection
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument("--enable-unsafe-swiftshader")
        # Performance improvements
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-notifications")
        chrome_options.add_argument("--blink-settings=imagesEnabled=true")
        # Add experimental options to avoid detection
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option("useAutomationExtension", False)
        # Additional preferences to improve performance
        chrome_options.add_experimental_option("prefs", {
            "profile.default_content_setting_values.notifications": 2,
            "profile.managed_default_content_settings.images": 1,
            "profile.managed_default_content_settings.cookies": 1,
            # Add some randomness to the profile
            "profile.default_content_setting_values.plugins": random.randint(1, 3),
            "profile.default_content_setting_values.popups": random.randint(1, 2)
        })
        try:
            # Create the driver with an explicit Service (newer Selenium versions)
            service = Service()
            driver = webdriver.Chrome(service=service, options=chrome_options)
            # Execute CDP commands to avoid detection (works in newer Chrome versions)
            driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": """
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => undefined
                    });
                    // Overwrite the languages with random order
                    Object.defineProperty(navigator, 'languages', {
                        get: () => ['en-US', 'en', 'de'].sort(() => 0.5 - Math.random())
                    });
                    // Modify plugins length
                    Object.defineProperty(navigator, 'plugins', {
                        get: () => {
                            // Randomize plugins length between 3 and 7
                            const len = Math.floor(Math.random() * 5) + 3;
                            const plugins = { length: len };
                            for (let i = 0; i < len; i++) {
                                plugins[i] = {
                                    name: ['Flash', 'Chrome PDF Plugin', 'Native Client', 'Chrome PDF Viewer'][Math.floor(Math.random() * 4)],
                                    filename: ['internal-pdf-viewer', 'mhjfbmdgcfjbbpaeojofohoefgiehjai', 'internal-nacl-plugin'][Math.floor(Math.random() * 3)]
                                };
                            }
                            return plugins;
                        }
                    });
                """
            })
        except TypeError:
            # Fallback for older Selenium versions that don't accept a Service object
            driver = webdriver.Chrome(options=chrome_options)
        except Exception as e:
            # If the CDP command fails, discard any half-initialized driver and retry without it
            logger.warning(f"CDP command failed, continuing without it: {e}")
            try:
                driver.quit()
            except Exception:
                pass
            driver = webdriver.Chrome(options=chrome_options)
        # Set the default page-load timeout
        driver.set_page_load_timeout(FB_DEFAULT_TIMEOUT)
        return driver

    def random_wait(self, min_time=None, max_time=None):
        """Wait for a random amount of time to simulate human behavior."""
        min_time = min_time or FB_MIN_WAIT_TIME
        max_time = max_time or FB_MAX_WAIT_TIME
        wait_time = random.uniform(min_time, max_time)
        time.sleep(wait_time)
        return wait_time

    def human_like_scroll(self, scroll_attempts=None):
        """Scroll down the page in a human-like way."""
        attempts = scroll_attempts or random.randint(3, FB_MAX_SCROLL_ATTEMPTS)
        # Get page height before scrolling
        initial_height = self.driver.execute_script("return document.body.scrollHeight")
        for i in range(attempts):
            # Calculate a random scroll amount (25-90% of viewport)
            scroll_percent = random.uniform(0.25, 0.9)
            viewport_height = self.driver.execute_script("return window.innerHeight")
            scroll_amount = int(viewport_height * scroll_percent)
            # Scroll with a random speed
            scroll_steps = random.randint(5, 15)
            current_position = self.driver.execute_script("return window.pageYOffset")
            target_position = current_position + scroll_amount
            for step in range(scroll_steps):
                # Calculate the next position with smoothstep easing: factor = 3t^2 - 2t^3
                t = (step + 1) / scroll_steps
                factor = t * t * (3.0 - 2.0 * t)
                next_position = current_position + (target_position - current_position) * factor
                self.driver.execute_script(f"window.scrollTo(0, {next_position})")
                time.sleep(random.uniform(0.01, 0.05))
            # Occasionally pause longer, as if reading content
            if random.random() < 0.3:  # 30% chance to pause
                self.random_wait(1.5, 3.5)
            else:
                self.random_wait(0.5, 1.5)
            # Log progress
            logger.info(f"Human-like scroll {i + 1}/{attempts} completed")
            # Check if we've reached the bottom of the page
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == initial_height and i > 1:
                # No new content loaded after a couple of scrolls; do one big
                # scroll to the bottom to trigger any lazy loading
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
                self.random_wait()
            initial_height = new_height

    def simulate_human_behavior(self):
        """Simulate random human-like interactions with the page."""
        # Random chance to move the mouse around
        if random.random() < 0.7:  # 70% chance
            try:
                # Find a random element to hover over
                elements = self.driver.find_elements(By.CSS_SELECTOR, "a, button, input, div")
                if elements:
                    element = random.choice(elements)
                    ActionChains(self.driver).move_to_element(element).perform()
                    self.random_wait(0.2, 1.0)
            except Exception:
                # Ignore any errors; this is just for randomness
                pass
        # Random chance to click somewhere non-interactive
        if random.random() < 0.2:  # 20% chance
            try:
                # Find a safe area to click (like a paragraph or heading)
                safe_elements = self.driver.find_elements(By.CSS_SELECTOR, "p, h1, h2, h3, h4, span")
                if safe_elements:
                    safe_element = random.choice(safe_elements)
                    ActionChains(self.driver).move_to_element(safe_element).click().perform()
                    self.random_wait(0.2, 1.0)
            except Exception:
                # Ignore any errors; this is just for randomness
                pass

    def check_headless_visibility(self):
        """
        Check whether pages load at all in headless mode.
        Returns True if everything is working properly.
        """
        if not self.headless:
            # Not in headless mode, so no need to check
            return True
        logger.info("Performing headless visibility check...")
        # Use a simple page to test basic loading
        test_url = "https://www.example.com"
        try:
            self.driver.get(test_url)
            # Just check that the page loads; don't try to interact with elements
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            logger.info("Headless check passed: page loaded successfully")
            return True
        except Exception as e:
            logger.error(f"Headless check failed: {e}")
            # Try switching to non-headless mode
            logger.info("Switching to non-headless mode...")
            self.driver.quit()
            self.headless = False
            self.driver = self._setup_driver(headless=False)
            return True  # Continue without rechecking

    def fetch_facebook_ads(self, query):
        """Fetch ads from Facebook's Ad Library with anti-detection measures."""
        ads_data = []
        base_url = "https://www.facebook.com/ads/library/"
        logger.info(f"Fetching Facebook ads for {query}")
        try:
            # Add some randomness to URL parameters
            params = {
                "active_status": "all",
                "ad_type": "all",
                "country": "ALL",
                "q": query,
                # Random parameters to avoid fingerprinting
                "_": int(time.time() * 1000),
                "session_id": self.session_id
            }
            # Construct the URL, encoding parameters so queries with spaces stay valid
            url = base_url + "?" + urlencode(params)
            logger.info(f"Navigating to Facebook URL: {url}")
            # Navigate to the URL
            self.driver.get(url)
            # Wait for the page to initially load
            try:
                WebDriverWait(self.driver, FB_DEFAULT_TIMEOUT).until(
                    EC.any_of(
                        EC.presence_of_element_located((By.CSS_SELECTOR, "div[role='main']")),
                        EC.presence_of_element_located((By.TAG_NAME, "body"))
                    )
                )
            except TimeoutException:
                logger.warning("Timeout waiting for Facebook page to load initially, continuing anyway")
            # Human-like scrolling to trigger lazy loading
            self.human_like_scroll()
            # Simulate human behavior
            self.simulate_human_behavior()
            # Save debug data at this point
            if self.debug_mode:
                self._save_debug_data("facebook_after_scroll", query)
            # Find ad elements using self-healing selectors
            ad_elements = self._find_facebook_ad_elements()
            if not ad_elements:
                logger.info("No Facebook ads found")
                if self.debug_mode:
                    self._save_debug_data("facebook_no_ads", query)
                # Return placeholder data as a fallback
                return self._generate_placeholder_facebook_data(query)
            # Process the found ad elements
            for i, ad in enumerate(ad_elements[:10]):  # Limit to 10 ads for performance
                try:
                    ad_data = {
                        "platform": "Facebook",
                        "query": query,
                        "timestamp": datetime.now().isoformat(),
                        "index": i + 1,
                        "session_id": self.session_id
                    }
                    # Extract the full visible text of the ad element
                    full_text = ad.text.strip()
                    # Log the first ad's text for debugging
                    if i == 0:
                        logger.info(f"First Facebook ad full text (first 150 chars): {full_text[:150]}...")
                    # Smart data extraction
                    extracted_data = self._extract_facebook_ad_data(ad, full_text)
                    # Merge extracted data
                    ad_data.update(extracted_data)
                    # Add fallback values if needed
                    if not ad_data.get("advertiser"):
                        ad_data["advertiser"] = "Unknown Advertiser"
                    if not ad_data.get("text"):
                        ad_data["text"] = "Ad content not available"
                    ads_data.append(ad_data)
                except Exception as e:
                    logger.warning(f"Error processing Facebook ad {i + 1}: {e}")
            return ads_data if ads_data else self._generate_placeholder_facebook_data(query)
        except Exception as e:
            logger.error(f"Error fetching Facebook ads: {e}")
            if self.debug_mode:
                self._save_debug_data("facebook_error", query)
            return self._generate_placeholder_facebook_data(query)

    def _find_facebook_ad_elements(self):
        """Find Facebook ad elements using a self-healing selector strategy."""
        # Historical best performers
        historical_best = self.selector_stats.get_best_selectors()
        # Base selectors
        base_selectors = [
            "div[class*='_7jvw']",
            "div[data-testid='ad_library_card']",
            "div[class*='AdLibraryCard']",
            "div.AdLibraryCard",
            "div[class*='adCard']",
            "div[class*='ad_card']"
        ]
        # Combine selectors, prioritizing the historical best
        combined_selectors = historical_best + [s for s in base_selectors if s not in historical_best]
        # Try each selector
        for selector in combined_selectors:
            try:
                # Record the attempt
                self.selector_stats.update_selector_attempt(selector)
                elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                if elements:
                    logger.info(f"Found {len(elements)} Facebook ads using selector: {selector}")
                    # Record the success
                    self.selector_stats.update_selector_success(selector, len(elements))
                    return elements
            except Exception as e:
                logger.debug(f"Facebook selector {selector} failed: {e}")
        # No elements found with standard selectors; try a more aggressive approach
        try:
            # Look for text patterns that typically appear in ads
            patterns = [
                "//div[contains(., 'Library ID:')]",
                "//div[contains(., 'Sponsored')]",
                "//div[contains(., 'Active')][contains(., 'Library ID')]",
                "//div[contains(., 'Inactive')][contains(., 'Library ID')]"
            ]
            for pattern in patterns:
                elements = self.driver.find_elements(By.XPATH, pattern)
                if elements:
                    ad_containers = []
                    for element in elements:
                        try:
                            # Try to find the containing card by navigating up
                            container = element
                            for _ in range(5):  # Try up to 5 levels up
                                container_class = container.get_attribute("class")
                                if container_class and "card" in container_class.lower():
                                    ad_containers.append(container)
                                    break
                                container = container.find_element(By.XPATH, "..")
                        except Exception:
                            continue
                    if ad_containers:
                        logger.info(f"Found {len(ad_containers)} Facebook ads using text pattern approach")
                        # Record this special method
                        self.selector_stats.update_selector_success("text_pattern_method", len(ad_containers))
                        return ad_containers
        except Exception as e:
            logger.debug(f"Facebook text pattern approach failed: {e}")
        return []

    def _extract_facebook_ad_data(self, ad_element, full_text):
        """Extract data from a Facebook ad using multiple extraction methods."""
        extracted_data = {}
        # Process text content if available
        if full_text:
            # Split into lines
            lines = full_text.split('\n')
            # Check for status (Active/Inactive)
            if lines and lines[0] in ["Active", "Inactive"]:
                extracted_data["status"] = lines[0]
            # Look for the advertiser, typically on the line after "See ad details"
            for i, line in enumerate(lines):
                if "See ad details" in line or "See summary details" in line:
                    if i + 1 < len(lines):
                        extracted_data["advertiser"] = lines[i + 1].strip()
                    break
            else:
                # Otherwise, the first line is likely the advertiser
                if lines:
                    extracted_data["advertiser"] = lines[0].strip()
            # Extract ad content by looking for patterns that mark content boundaries
            content_start_idx = -1
            content_end_idx = len(lines)
            # Find where "Sponsored" appears
            for i, line in enumerate(lines):
                if "Sponsored" in line:
                    content_start_idx = i + 1
                    break
            # If no "Sponsored" line was found, skip known metadata lines instead
            if content_start_idx == -1:
                metadata_patterns = [
                    "Library ID:",
                    "Started running on",
                    "Platforms",
                    "Open Drop-down",
                    "See ad details",
                    "See summary details",
                    "This ad has multiple versions"
                ]
                for i, line in enumerate(lines):
                    if any(pattern in line for pattern in metadata_patterns):
                        continue
                    if i > 0:  # Skip the first line (advertiser)
                        content_start_idx = i
                        break
            # Find where UI elements start
            ui_elements = [
                "Like", "Comment", "Share", "Learn More", "Shop Now",
                "Sign Up", "Visit Instagram profile", "See More"
            ]
            for i, line in enumerate(lines):
                # Skip lines before the content start
                if i <= content_start_idx:
                    continue
                if any(ui in line for ui in ui_elements):
                    content_end_idx = i
                    break
            # Extract the content between the boundaries
            if content_start_idx != -1 and content_start_idx < content_end_idx:
                content_lines = lines[content_start_idx:content_end_idx]
                extracted_data["text"] = "\n".join(content_lines).strip()
        # If text extraction failed, try element-based approaches
        if not extracted_data.get("text"):
            facebook_text_selectors = [
                "div[data-ad-preview='message']",  # Direct message container
                "div[class*='_7jy6']",  # Known ad text container
                "div[data-testid='ad-creative-text']",  # Test ID for ad text
                "div[class*='_38ki']",  # Another text container
                "span[class*='_7oe']",  # Text span
                "div.text_exposed_root"  # Exposed text root
            ]
            for selector in facebook_text_selectors:
                try:
                    elements = ad_element.find_elements(By.CSS_SELECTOR, selector)
                    text_content = " ".join([e.text.strip() for e in elements if e.text.strip()])
                    if text_content:
                        extracted_data["text"] = text_content
                        break
                except Exception:
                    pass
        # If advertiser extraction failed, try element-based approaches
        if not extracted_data.get("advertiser"):
            facebook_advertiser_selectors = [
                "span[class*='fsl']",  # Facebook-specific large text class
                "a[aria-label*='profile']",  # Profile links often contain the advertiser name
                "h4",  # Often contains the advertiser name
                "div[class*='_8jh5']",  # Known advertiser class
                "a[role='link']",  # Links are often advertiser names
                "div[class*='_3qn7']",  # Another known advertiser container
                "div[class*='_7jvw'] a",  # Links within the ad card
            ]
            for selector in facebook_advertiser_selectors:
                try:
                    elements = ad_element.find_elements(By.CSS_SELECTOR, selector)
                    for element in elements:
                        text = element.text.strip()
                        if text and len(text) < 50:  # Advertiser names are usually short
                            extracted_data["advertiser"] = text
                            break
                    if extracted_data.get("advertiser"):
                        break
                except Exception:
                    pass
        return extracted_data

    def _generate_placeholder_facebook_data(self, query):
        """Generate placeholder Facebook ad data when real ads cannot be scraped."""
        logger.info(f"Returning placeholder Facebook ad data for query: {query}")
        return [
            {
                "platform": "Facebook",
                "query": query,
                "advertiser": "Placeholder Advertiser 1",
                "text": f"This is a placeholder ad for {query} since no actual ads could be scraped.",
                "timestamp": datetime.now().isoformat(),
                "index": 1,
                "is_placeholder": True,
                "session_id": self.session_id
            },
            {
                "platform": "Facebook",
                "query": query,
                "advertiser": "Placeholder Advertiser 2",
                "text": f"Another placeholder ad for {query}. Please check your scraping settings.",
                "timestamp": datetime.now().isoformat(),
                "index": 2,
                "is_placeholder": True,
                "session_id": self.session_id
            }
        ]

    def _save_debug_data(self, prefix, query):
        """Save debugging data for investigation."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        debug_dir = "debug_data"
        os.makedirs(debug_dir, exist_ok=True)
        # Sanitize the query so it is safe to use in file names
        safe_query = re.sub(r'[^\w-]+', '_', query)
        # Save screenshot
        screenshot_path = f"{debug_dir}/{prefix}_{safe_query}_{timestamp}.png"
        self.driver.save_screenshot(screenshot_path)
        logger.info(f"Saved debug screenshot to {screenshot_path}")
        # Save HTML
        html_path = f"{debug_dir}/{prefix}_{safe_query}_{timestamp}.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write(self.driver.page_source)
        logger.info(f"Saved debug HTML to {html_path}")
        # Save a sample of the first ad's structure, if available
        try:
            ad_elements = self.driver.find_elements(By.CSS_SELECTOR, "div[class*='_7jvw']")
            if ad_elements:
                first_ad = ad_elements[0]
                # Save the first ad's HTML structure
                first_ad_html = first_ad.get_attribute('outerHTML')
                sample_path = f"{debug_dir}/{prefix}_sample_ad_{timestamp}.html"
                with open(sample_path, "w", encoding="utf-8") as f:
                    f.write(first_ad_html)
                logger.info(f"Saved sample ad HTML to {sample_path}")
                # Log the text structure
                logger.info(f"Sample ad text structure: {first_ad.text[:300]}...")
        except Exception as e:
            logger.error(f"Error saving ad sample: {e}")

    def close(self):
        """Close the WebDriver and save stats."""
        if self.driver:
            self.driver.quit()
        # Save selector stats one last time
        self.selector_stats._save_stats()

# Facebook Gradio interface function
def fetch_facebook_ads(query):
    """Fetch Facebook ads for the Gradio interface."""
    logger.info(f"Processing Facebook ad search for: {query}")
    scraper = FacebookAdsScraper(headless=True, debug_mode=True)
    # Perform the headless check first
    visibility_ok = scraper.check_headless_visibility()
    if not visibility_ok:
        logger.warning("Headless visibility check failed, results may be affected")
    # Fetch ads from Facebook
    facebook_ads = scraper.fetch_facebook_ads(query)
    # Format for display
    formatted_results = []
    for ad in facebook_ads:
        formatted_ad = f"Platform: {ad['platform']}\n"
        # Include status if available
        if 'status' in ad:
            formatted_ad += f"Status: {ad['status']}\n"
        formatted_ad += f"Advertiser: {ad['advertiser']}\n"
        # Format the ad text with word wrapping
        text_lines = []
        if ad['text'] and ad['text'] != "Ad content not available":
            # Split long text into readable chunks (80 characters per line)
            words = ad['text'].split()
            current_line = ""
            for word in words:
                if len(current_line) + len(word) + 1 <= 80:
                    current_line += (" " + word if current_line else word)
                else:
                    text_lines.append(current_line)
                    current_line = word
            if current_line:
                text_lines.append(current_line)
            formatted_text = "\n".join(text_lines)
        else:
            formatted_text = ad['text']
        formatted_ad += f"Ad Text: {formatted_text}\n"
        formatted_ad += f"Timestamp: {ad['timestamp']}\n"
        if ad.get('is_placeholder', False):
            formatted_ad += "[THIS IS PLACEHOLDER DATA]\n"
        formatted_ad += "-" * 50
        formatted_results.append(formatted_ad)
    scraper.close()
    return "\n\n".join(formatted_results) if formatted_results else "No Facebook ads found for your query."

# Helper to save ads to JSON
def save_ads_to_json(ads, query):
    """Save ads to a JSON file."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"facebook_ads_{query.replace(' ', '_')}_{timestamp}.json"
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(ads, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved ads to {filename}")
        return filename
    except Exception as e:
        logger.error(f"Error saving ads to JSON: {e}")
        return None

#####################################
### GOOGLE ADS SCRAPER SECTION  #####
#####################################
# Constants for the Google Ads scraper
MAX_ADS_DEFAULT = 5

# Import the actual GoogleAds class and regions
try:
    from GoogleAds.main import GoogleAds, show_regions_list
    from GoogleAds.regions import Regions

    USING_ACTUAL_GOOGLE_ADS = True
    logger.info("Successfully imported GoogleAds module")
except ImportError as e:
    # Fall back to a mock implementation if the module is missing
    logger.warning(f"GoogleAds module not found: {e}. Using mock implementation.")
    USING_ACTUAL_GOOGLE_ADS = False

    # Mock Regions dictionary - only used if the real module fails to import
    Regions = {
        "GB": {"Region": "United Kingdom"}
    }

    def show_regions_list():
        """Mock function - only used if the real module fails to import."""
        return [("GB", "United Kingdom"), ("US", "United States")]

    # Mock GoogleAds class - only used if the real module fails to import
    class GoogleAds:
        def __init__(self, region="GB"):
            self.region = region
            logger.warning(f"Using MOCK GoogleAds implementation with region: {region}")
            logger.warning("Please install the GoogleAds module for actual data")

        def creative_search_by_advertiser_id(self, advertiser_id, count=5):
            # Mock implementation - only used if the real module fails to import
            logger.warning(f"MOCK: Searching for creatives from advertiser {advertiser_id}")
            return [f"creative_{i}_{advertiser_id}" for i in range(min(count, 3))]

        def get_detailed_ad(self, advertiser_id, creative_id):
            # Mock implementation - only used if the real module fails to import
            logger.warning(f"MOCK: Getting details for creative {creative_id}")
            # Find the advertiser name
            advertiser_name = "Unknown"
            for adv in ADVERTISERS:
                if adv["id"] == advertiser_id:
                    advertiser_name = adv["name"]
                    break
            # Return mock ad details
            return {
                "Ad Format": "Text",
                "Advertiser": advertiser_name,
                "Advertiser Name": advertiser_name,
                "Ad Title": "MOCK DATA - INSTALL GOOGLE ADS MODULE",
                "Ad Body": "This is MOCK data because the GoogleAds module is not installed. Please install the proper module.",
                "Last Shown": datetime.now().strftime("%Y-%m-%d"),
                "Creative Id": creative_id,
                "Ad Link": "#"
            }

def clean_ad_text(text):
    """Clean ad text by removing special characters and formatting issues."""
    if text is None or not isinstance(text, str):
        return ""
    # Strip the Unicode directional-isolate characters that often wrap
    # dynamic content in Google ads data
    cleaned = text.replace('\u2066', '')  # LEFT-TO-RIGHT ISOLATE (opening)
    cleaned = cleaned.replace('\u2069', '')  # POP DIRECTIONAL ISOLATE (closing)
    cleaned = cleaned.replace('<dynamically generated based on landing page content>', '[Dynamic Content]')
    # Remove any other non-ASCII characters that might appear
    cleaned = re.sub(r'[^\x00-\x7F]+', '', cleaned)
    return cleaned.strip()
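
# For example (hypothetical input): clean_ad_text('\u2066Up to 50% off\u2069 today')
# returns 'Up to 50% off today'.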

def get_regions_list():
    """Get a limited list of regions - only GB and anywhere."""
    regions = [
        ("anywhere", "Global (anywhere)"),
        ("GB", f"{Regions['GB']['Region']} (GB)")
    ]
    return regions

def search_by_advertiser_id(advertiser_id: str, max_ads=MAX_ADS_DEFAULT, region="GB", progress=gr.Progress(),
                            provided_name=None) -> Tuple[str, Optional[pd.DataFrame], Optional[Dict]]:
    try:
        progress(0, desc="Initializing scraper...")
        # Normalize the region value
        region_val = region
        if isinstance(region, tuple) and len(region) > 0:
            region_val = region[0]
        # Ensure 'anywhere' is handled correctly
        if region_val == "Global (anywhere)" or "anywhere" in str(region_val).lower():
            region_val = "anywhere"
        # Initialize the Google Ads scraper
        scraper = GoogleAds(region=region_val)
        progress(0.2, desc=f"Fetching ads for advertiser ID: {advertiser_id}")
        # Get creative IDs for this advertiser
        creative_ids = scraper.creative_search_by_advertiser_id(advertiser_id, count=max_ads)
        if not creative_ids:
            return f"No ads found for advertiser ID: {advertiser_id}", None, None
        progress(0.3, desc=f"Found {len(creative_ids)} ads. Fetching details...")
        # Fetch detailed information for each ad
        ads_data = []
        ad_formats = {}
        for i, creative_id in enumerate(creative_ids):
            progress_val = 0.3 + (0.7 * (i / len(creative_ids)))
            progress(progress_val, desc=f"Processing ad {i + 1}/{len(creative_ids)}")
            try:
                ad_details = scraper.get_detailed_ad(advertiser_id, creative_id)
                # Fix encoding issues in the Ad Title and Ad Body fields
                if 'Ad Title' in ad_details:
                    ad_details['Ad Title'] = clean_ad_text(ad_details['Ad Title'])
                if 'Ad Body' in ad_details:
                    ad_details['Ad Body'] = clean_ad_text(ad_details['Ad Body'])
                ads_data.append(ad_details)
                # Count ad formats
                ad_format = ad_details.get("Ad Format", "Unknown")
                ad_formats[ad_format] = ad_formats.get(ad_format, 0) + 1
                # Brief pause to avoid overwhelming the server
                time.sleep(0.2)
            except Exception as e:
                logger.warning(f"Error fetching details for ad {creative_id}: {e}")
        if not ads_data:
            return f"Retrieved creative IDs but couldn't fetch ad details for advertiser ID: {advertiser_id}", None, None
        # Create a DataFrame for display
        df = pd.DataFrame(ads_data)
        # Generate summary info
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        # Use the provided name if available; otherwise try the predefined list, then the ad data
        advertiser_name = "Unknown"
        if provided_name:
            advertiser_name = provided_name
        else:
            # Check the predefined list
            for adv in ADVERTISERS:
                if adv["id"] == advertiser_id:
                    advertiser_name = adv["name"]
                    break
        # If still unknown, try to get the name from the ad data
        if advertiser_name == "Unknown" and ads_data:
            # The field might be "Advertiser" or "Advertiser Name" depending on the version
            for field in ["Advertiser", "Advertiser Name", "advertiser_name"]:
                if field in ads_data[0]:
                    advertiser_name = ads_data[0][field]
                    break
        summary = {
            'advertiser_id': advertiser_id,
            'advertiser_name': advertiser_name,
            'ads_count': len(ads_data),
            'timestamp': timestamp,
            'region': region_val,
            'ad_formats': ad_formats
        }
        # Find the earliest and latest ad
        dates = []
        for ad in ads_data:
            # The field might be "Last Shown" or "last_shown_date" depending on the version
            for field in ["Last Shown", "last_shown_date"]:
                if field in ad and ad[field]:
                    dates.append(ad[field])
                    break
        if dates:
            summary['earliest_ad'] = min(dates)
            summary['latest_ad'] = max(dates)
        success_message = (
            f"Found {len(ads_data)} ads for advertiser '{advertiser_name}' (ID: {advertiser_id})."
        )
        progress(1.0, desc="Complete!")
        return success_message, df, summary
    except Exception as e:
        error_message = f"Error searching for advertiser ID: {str(e)}"
        return error_message, None, None

def process_advertiser_search(advertiser_selection, region, max_ads, progress=gr.Progress()):
    """Handle the advertiser selection form submission and update the UI."""
    if not advertiser_selection:
        return "Please select an advertiser to search", None, None, None
    # Split the "ID: Name" selection string into its ID and name parts
    parts = advertiser_selection.split(":", 1)
    advertiser_id = parts[0].strip()
    advertiser_name = parts[1].strip() if len(parts) > 1 else "Unknown"
    # Perform the search
    result_message, ads_df, summary_info = search_by_advertiser_id(
        advertiser_id, max_ads, region, progress, advertiser_name
    )
    # Generate the analysis if data is available
    analysis_html = analyze_ads(ads_df, summary_info) if ads_df is not None and not ads_df.empty else None
    return result_message, ads_df, analysis_html, summary_info

def analyze_ads(df: pd.DataFrame, summary: Dict) -> str:
    """
    Analyze ads data and generate insights.

    Args:
        df: DataFrame containing ad data
        summary: Dictionary with summary information

    Returns:
        HTML string with analysis results
    """
    if df is None or df.empty or summary is None:
        return "<h3>No data available for analysis</h3>"
    try:
        # Create a simple HTML report with the analysis
        html = f"""
        <div style="font-family: Arial, sans-serif;">
            <h2>{summary.get('advertiser_name', 'Unknown Advertiser')} - Ad Analysis</h2>
            <div style="background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 20px;">
                <h3>Overview</h3>
                <p><b>Advertiser ID:</b> {summary.get('advertiser_id', 'Unknown')}</p>
                <p><b>Total Ads Found:</b> {summary['ads_count']}</p>
                <p><b>Region:</b> {summary['region']}</p>
                <p><b>Data Collected:</b> {summary['timestamp'].replace('_', ' ')}</p>
                {f"<p><b>Ad Date Range:</b> {summary.get('earliest_ad')} to {summary.get('latest_ad')}</p>" if 'earliest_ad' in summary else ""}
            </div>
            <div style="display: flex; margin-bottom: 20px;">
                <div style="flex: 1; background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-right: 10px;">
                    <h3>Ad Format Distribution</h3>
                    <table style="width: 100%; border-collapse: collapse;">
                        <tr style="background-color: #eaeaea;">
                            <th style="text-align: left; padding: 8px; border-bottom: 1px solid #ddd;">Format</th>
                            <th style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">Count</th>
                            <th style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">Percentage</th>
                        </tr>
        """
        total = sum(summary['ad_formats'].values())
        for format_name, count in summary['ad_formats'].items():
            percentage = (count / total) * 100
            html += f"""
                        <tr>
                            <td style="padding: 8px; border-bottom: 1px solid #ddd;">{format_name}</td>
                            <td style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">{count}</td>
                            <td style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">{percentage:.1f}%</td>
                        </tr>
            """
        html += """
                    </table>
                </div>
        """
        # Most common words in ad titles
        if 'Ad Title' in df.columns and not df['Ad Title'].isna().all():
            from collections import Counter

            # Extract words from titles
            all_titles = ' '.join(df['Ad Title'].dropna().astype(str).tolist())
            words = re.findall(r'\b\w+\b', all_titles.lower())
            # Remove common stop words
            stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'by', 'of',
                          'is', 'are'}
            filtered_words = [word for word in words if word not in stop_words and len(word) > 2]
            # Count word frequencies
            word_counts = Counter(filtered_words).most_common(10)
            if word_counts:
                html += """
                <div style="flex: 1; background-color: #f5f5f5; padding: 15px; border-radius: 5px;">
                    <h3>Most Common Words in Ad Titles</h3>
                    <table style="width: 100%; border-collapse: collapse;">
                        <tr style="background-color: #eaeaea;">
                            <th style="text-align: left; padding: 8px; border-bottom: 1px solid #ddd;">Word</th>
                            <th style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">Frequency</th>
                        </tr>
                """
                for word, count in word_counts:
                    html += f"""
                        <tr>
                            <td style="padding: 8px; border-bottom: 1px solid #ddd;">{word}</td>
                            <td style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">{count}</td>
                        </tr>
                    """
                html += """
                    </table>
                </div>
                """
        html += """
            </div>
            <h3>SEO & Marketing Insights</h3>
            <div style="background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 20px;">
        """
        # Add general insights
        html += f"""
            <h4>Competitive Intelligence</h4>
            <ul>
                <li>The advertiser was active in advertising until {summary.get('latest_ad', 'recently')}</li>
                <li>Their ad strategy focuses primarily on {max(summary['ad_formats'].items(), key=lambda x: x[1])[0]} ads</li>
                <li>Consider monitoring changes in their ad frequency and creative strategy over time</li>
            </ul>
            <h4>UK Market Insights</h4>
            <ul>
                <li>The ads were collected for the {summary['region']} market</li>
                <li>Regular monitoring can reveal seasonal UK advertising patterns</li>
                <li>Compare with other regions to identify UK-specific marketing approaches</li>
            </ul>
        """
        html += """
            </div>
            <h3>All Ad Examples</h3>
        """
        # Add all example ads, not just the most recent
        if not df.empty:
            # Sort by Last Shown date if available
            if 'Last Shown' in df.columns:
                df = df.sort_values(by='Last Shown', ascending=False)
            for i, (_, ad) in enumerate(df.iterrows()):
                html += f"""
                <div style="background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 15px;">
                    <h4>Ad {i + 1}: {ad.get('Creative Id', '')}</h4>
                    <p><b>Format:</b> {ad.get('Ad Format', 'Unknown')}</p>
                    <p><b>Last Shown:</b> {ad.get('Last Shown', 'Unknown')}</p>
                """
                # Display title and body if available
                if 'Ad Title' in ad and pd.notna(ad['Ad Title']) and ad['Ad Title']:
                    html += f"<p><b>Title:</b> {ad['Ad Title']}</p>"
                if 'Ad Body' in ad and pd.notna(ad['Ad Body']) and ad['Ad Body']:
                    body = ad['Ad Body']
                    if len(body) > 150:
                        body = body[:150] + "..."
                    html += f"<p><b>Body:</b> {body}</p>"
                # Display image or ad links if available
                if 'Image URL' in ad and pd.notna(ad['Image URL']) and ad['Image URL']:
                    html += f"""<p><img src="{ad['Image URL']}" style="max-width: 300px; max-height: 200px;" /></p>"""
                if 'Ad Link' in ad and pd.notna(ad['Ad Link']) and ad['Ad Link'] and ad.get('Ad Format') != 'Text':
                    html += f"""<p><b>Ad Link:</b> <a href="{ad['Ad Link']}" target="_blank">View Ad</a></p>"""
                html += "</div>"
        html += """
        </div>
        """
        return html
    except Exception as e:
        return f"<h3>Error analyzing data: {str(e)}</h3>"

#####################################
### COMBINED INTERFACE SECTION  #####
#####################################
def create_combined_app():
    """Create the combined Gradio interface with the Facebook and Google Ads scrapers."""
    # Create dropdown choices for advertiser selection
    advertiser_choices = [f"{adv['id']}: {adv['name']}" for adv in ADVERTISERS]
    with gr.Blocks(title="Combined Ads Transparency Scraper") as app:
        gr.Markdown("# Combined Ads Transparency Scraper")
        gr.Markdown("## Search for ads from Facebook and Google Ads transparency tools")
        # Create tabs for the two different scrapers
        with gr.Tabs():
            # Tab 1: Facebook Ad Library scraper
            with gr.TabItem("Facebook Ad Library"):
                gr.Markdown("### Facebook Ad Library Search")
                gr.Markdown("Search for ads by brand, domain, or keyword")
                with gr.Row():
                    fb_query_input = gr.Textbox(
                        label="Search Query",
                        placeholder="Enter brand, domain or product name",
                        value=""
                    )
                    fb_search_button = gr.Button("Find Facebook Ads", variant="primary")
                fb_results_output = gr.Textbox(label="Search Results", lines=20)
                fb_save_button = gr.Button("Save Results to JSON")
                fb_save_status = gr.Textbox(label="Save Status", lines=1)

                # Define the save function for Facebook
                def save_fb_results(query, results_text):
                    if not results_text or "No Facebook ads found" in results_text:
                        return "No ads to save"
                    # Re-run the scraper to fetch fresh ads in JSON format
                    scraper = FacebookAdsScraper(headless=True, debug_mode=False)
                    ads = scraper.fetch_facebook_ads(query)
                    scraper.close()
                    # Save to JSON
                    filename = save_ads_to_json(ads, query)
                    if filename:
                        return f"Saved {len(ads)} ads to {filename}"
                    else:
                        return "Error saving ads to JSON"

                # Connect the Facebook interface components
                fb_search_button.click(
                    fn=fetch_facebook_ads,
                    inputs=[fb_query_input],
                    outputs=[fb_results_output]
                )
                fb_save_button.click(
                    fn=save_fb_results,
                    inputs=[fb_query_input, fb_results_output],
                    outputs=[fb_save_status]
                )
            # Tab 2: lightsaber companies Google Ads scraper
            with gr.TabItem("Google Ads (Lightsaber Companies)"):
                gr.Markdown("### Lightsaber Companies Ads Transparency Scraper")
                gr.Markdown("View Google Ads data for popular lightsaber companies")
                with gr.Row():
                    with gr.Column(scale=3):
                        advertiser_dropdown = gr.Dropdown(
                            choices=advertiser_choices,
                            label="Select Lightsaber Company",
                            info="Choose a company to view their Google Ads data"
                        )
                        with gr.Row():
                            region_dropdown = gr.Dropdown(
                                choices=get_regions_list(),
                                value="GB",  # UK is the default
                                label="Region",
                                info="Choose between Global or UK"
                            )
                            max_ads_slider = gr.Slider(
                                minimum=1,
                                maximum=10,
                                value=5,
                                step=1,
                                label="Max Ads to Retrieve"
                            )
                        search_button = gr.Button("Search Ads", variant="primary")
                    with gr.Column(scale=2):
                        result_message = gr.Markdown(label="Search Result")
                # Tabs for displaying Google Ads search results
                with gr.Tabs():
                    with gr.Tab("Analysis"):
                        analysis_html = gr.HTML()
                    with gr.Tab("Raw Data"):
                        ads_table = gr.DataFrame()
                # State for storing summary info
                summary_info = gr.State()
                # Connect the Google Ads inputs to the output function
                search_button.click(
                    fn=process_advertiser_search,
                    inputs=[advertiser_dropdown, region_dropdown, max_ads_slider],
                    outputs=[result_message, ads_table, analysis_html, summary_info]
                )
        # About section for the combined app
        with gr.Accordion("About This Tool", open=False):
            gr.Markdown("""
            ## About Combined Ads Transparency Scraper
            This tool combines two different ad transparency scrapers:
            1. **Facebook Ad Library Scraper**: Search for any advertiser's ads on Facebook.
            2. **Google Ads Transparency Scraper**: View ads for popular lightsaber companies.
            ### Technical Details
            - The Facebook scraper uses Selenium WebDriver with anti-detection techniques.
            - The Google Ads scraper leverages Google Ads Transparency Center data.
            - Both scrapers include adaptive error handling and fallback mechanisms.
            ### Usage Notes
            - Facebook scraping may take 30-60 seconds to complete
            - Search results are not stored permanently
            - Use the "Save Results" button to save data for later analysis
            **Note**: This tool is intended for research and educational purposes only.
            """)
    return app

# Main execution
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Combined Ads Transparency Scraper")
    # BooleanOptionalAction (Python 3.9+) provides both --headless and --no-headless;
    # a plain store_true with default=True could never be switched off
    parser.add_argument("--headless", action=argparse.BooleanOptionalAction, default=True,
                        help="Run in headless mode (disable with --no-headless)")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode with extra logging")
    parser.add_argument("--fb-query", type=str, help="Facebook search query to run directly without Gradio")
    parser.add_argument("--google-advertiser", type=str, help="Google Ads advertiser ID to run directly without Gradio")
    parser.add_argument("--save", action="store_true", help="Save results to a JSON file when using a direct query")
    args = parser.parse_args()
    if args.fb_query:
        # Run direct query mode for Facebook
        scraper = FacebookAdsScraper(headless=args.headless, debug_mode=args.debug)
        scraper.check_headless_visibility()
        facebook_ads = scraper.fetch_facebook_ads(args.fb_query)
        # Display results
        print(f"\nFound {len(facebook_ads)} Facebook ads for '{args.fb_query}'")
        if facebook_ads:
            for i, ad in enumerate(facebook_ads):
                print(f"\n--- Ad {i + 1} ---")
                print(f"Platform: {ad['platform']}")
                if 'status' in ad:
                    print(f"Status: {ad['status']}")
                print(f"Advertiser: {ad['advertiser']}")
                print(f"Text: {ad['text']}")
                if ad.get('is_placeholder', False):
                    print("[THIS IS PLACEHOLDER DATA]")
            # Save to JSON if requested
            if args.save:
                filename = save_ads_to_json(facebook_ads, args.fb_query)
                if filename:
                    print(f"\nSaved {len(facebook_ads)} ads to {filename}")
        else:
            print("No Facebook ads found.")
        scraper.close()
    elif args.google_advertiser:
        # Run direct query mode for Google Ads
        advertiser_id = args.google_advertiser
        # Find the advertiser name if it's in our list
        advertiser_name = "Unknown"
        for adv in ADVERTISERS:
            if adv["id"] == advertiser_id:
                advertiser_name = adv["name"]
                break
        print(f"\nSearching for Google Ads from advertiser '{advertiser_name}' (ID: {advertiser_id})")

        # Use a dummy progress object for the CLI
        class DummyProgress:
            def __call__(self, value, desc=None):
                if desc:
                    print(f"{desc} ({value * 100:.0f}%)")

        result_message, ads_df, summary_info = search_by_advertiser_id(
            advertiser_id,
            max_ads=5,
            region="GB",
            progress=DummyProgress(),
            provided_name=advertiser_name
        )
        print(f"\n{result_message}")
        if ads_df is not None and not ads_df.empty:
            print("\nFound ads:")
            for i, (_, ad) in enumerate(ads_df.iterrows()):
                print(f"\n--- Ad {i + 1} ---")
                print(f"Format: {ad.get('Ad Format', 'Unknown')}")
                print(f"Title: {ad.get('Ad Title', 'Unknown')}")
                body_text = str(ad.get('Ad Body', 'Unknown'))
                if len(body_text) > 100:
                    body_text = body_text[:100] + "..."
                print(f"Body: {body_text}")
                print(f"Last Shown: {ad.get('Last Shown', 'Unknown')}")
                print(f"Creative ID: {ad.get('Creative Id', 'Unknown')}")
        else:
            print("No Google ads found or an error occurred.")
    else:
        # Run the Gradio interface
        app = create_combined_app()
        print("Starting Combined Ads Transparency Scraper")
        print("Facebook: Search for any brand or company")
        print("Google Ads: Available lightsaber companies:")
        for adv in ADVERTISERS:
            print(f"  - {adv['name']}")
        app.launch()
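
# Example invocations (assuming this file is saved as app.py):
#   python app.py                                          # launch the Gradio UI
#   python app.py --fb-query "lightsaber" --debug --save   # direct Facebook search
#   python app.py --google-advertiser AR10051102910143528961  # Theory Sabers, from ADVERTISERS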