import time import logging import argparse import os import json import random import re import uuid from collections import defaultdict from datetime import datetime from typing import List, Dict, Any, Optional, Union, Tuple from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import ( TimeoutException, NoSuchElementException, WebDriverException ) import gradio as gr import pandas as pd # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Predefined advertisers list ADVERTISERS = [ {"id": "AR10051102910143528961", "name": "Theory Sabers"}, {"id": "AR12645693856247971841", "name": "Artsabers"}, {"id": "AR07257050693515608065", "name": "bmlightsabers"}, {"id": "AR01506694249926623233", "name": "Padawan Outpost Ltd"}, {"id": "AR10584025853845307393", "name": "GalaxySabers"}, {"id": "AR16067963414479110145", "name": "nsabers"}, {"id": "AR12875519274243850241", "name": "es-sabers"}, {"id": "AR05144647067079016449", "name": "Ultra Sabers"}, {"id": "AR15581800501283389441", "name": "SuperNeox"}, {"id": "AR06148907109187584001", "name": "Sabertrio"} ] ##################################### ### FACEBOOK SCRAPER SECTION ####### ##################################### # Constants for Facebook Scraper FB_DEFAULT_TIMEOUT = 60 # seconds FB_MIN_WAIT_TIME = 1 # minimum seconds for random waits FB_MAX_WAIT_TIME = 3 # maximum seconds for random waits FB_MAX_SCROLL_ATTEMPTS = 5 # maximum number of scroll attempts FB_SELECTOR_HISTORY_FILE = "fb_selector_stats.json" # File to store selector success stats # User agents for rotation USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/118.0" ] # Viewport sizes for randomization VIEWPORT_SIZES = [ (1366, 768), (1920, 1080), (1536, 864), (1440, 900) ] class SelectorStats: """Class to track and optimize selector performance""" def __init__(self, file_path=FB_SELECTOR_HISTORY_FILE): self.file_path = file_path self.stats = self._load_stats() def _load_stats(self) -> Dict: """Load stats from file or initialize if not exists""" if os.path.exists(self.file_path): try: with open(self.file_path, 'r') as f: return json.load(f) except (json.JSONDecodeError, IOError) as e: logger.warning(f"Error loading selector stats: {e}, initializing new stats") # Initialize structure for platform stats return { "facebook": {"selectors": {}, "last_updated": datetime.now().isoformat()} } def update_selector_success(self, selector: str, count: int = 1) -> None: """Record successful use of a selector""" platform = "facebook" # Only using Facebook for this version if platform not in self.stats: self.stats[platform] = {"selectors": {}, "last_updated": datetime.now().isoformat()} if selector not in self.stats[platform]["selectors"]: self.stats[platform]["selectors"][selector] = {"successes": 0, "attempts": 0} self.stats[platform]["selectors"][selector]["successes"] += count self.stats[platform]["selectors"][selector]["attempts"] += 1 self.stats[platform]["last_updated"] = datetime.now().isoformat() # Save after each update self._save_stats() def update_selector_attempt(self, selector: str) -> None: """Record attempt to use a selector regardless of success""" platform = "facebook" # Only using Facebook for this version if platform not in self.stats: self.stats[platform] = {"selectors": {}, "last_updated": datetime.now().isoformat()} if selector not in self.stats[platform]["selectors"]: self.stats[platform]["selectors"][selector] = {"successes": 0, "attempts": 0} self.stats[platform]["selectors"][selector]["attempts"] += 1 self.stats[platform]["last_updated"] = datetime.now().isoformat() # Don't save on every attempt to reduce disk I/O def get_best_selectors(self, min_attempts: int = 3, max_count: int = 10) -> List[str]: """Get the best performing selectors for Facebook""" platform = "facebook" # Only using Facebook for this version if platform not in self.stats: return [] selectors = [] for selector, data in self.stats[platform]["selectors"].items(): if data["attempts"] >= min_attempts: success_rate = data["successes"] / data["attempts"] if data["attempts"] > 0 else 0 selectors.append((selector, success_rate)) # Sort by success rate (descending) selectors.sort(key=lambda x: x[1], reverse=True) # Return top N selectors return [s[0] for s in selectors[:max_count]] def _save_stats(self) -> None: """Save stats to file""" try: with open(self.file_path, 'w') as f: json.dump(self.stats, f, indent=2) except IOError as e: logger.error(f"Error saving selector stats: {e}") class FacebookAdsScraper: def __init__(self, headless=True, debug_mode=False): """Initialize the ads scraper with browser configuration""" self.debug_mode = debug_mode self.headless = headless self.driver = self._setup_driver(headless) # Initialize selector stats tracker self.selector_stats = SelectorStats() # Track navigation history for smart retry self.navigation_history = [] # Track success/failure for self-healing self.success_rate = defaultdict(lambda: {"success": 0, "failure": 0}) # Generate a session ID for this scraping session self.session_id = str(uuid.uuid4())[:8] def _setup_driver(self, headless): """Set up and configure the Chrome WebDriver with anti-detection measures""" chrome_options = Options() if headless: chrome_options.add_argument("--headless") # Select a random user agent user_agent = random.choice(USER_AGENTS) chrome_options.add_argument(f"--user-agent={user_agent}") logger.info(f"Using user agent: {user_agent}") # Select a random viewport size viewport_width, viewport_height = random.choice(VIEWPORT_SIZES) chrome_options.add_argument(f"--window-size={viewport_width},{viewport_height}") logger.info(f"Using viewport size: {viewport_width}x{viewport_height}") # Add common options to avoid detection chrome_options.add_argument("--disable-blink-features=AutomationControlled") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--start-maximized") chrome_options.add_argument("--enable-unsafe-swiftshader") # Performance improvements chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-notifications") chrome_options.add_argument("--blink-settings=imagesEnabled=true") # Add experimental options to avoid detection chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) chrome_options.add_experimental_option("useAutomationExtension", False) # Additional preferences to improve performance chrome_options.add_experimental_option("prefs", { "profile.default_content_setting_values.notifications": 2, "profile.managed_default_content_settings.images": 1, "profile.managed_default_content_settings.cookies": 1, # Add some randomness to the profile "profile.default_content_setting_values.plugins": random.randint(1, 3), "profile.default_content_setting_values.popups": random.randint(1, 2) }) try: # Try to create driver with service for newer Selenium versions service = Service() driver = webdriver.Chrome(service=service, options=chrome_options) # Execute CDP commands to avoid detection (works in newer Chrome versions) driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { "source": """ Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); // Overwrite the languages with random order Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en', 'de'].sort(() => 0.5 - Math.random()) }); // Modify plugins length Object.defineProperty(navigator, 'plugins', { get: () => { // Randomize plugins length between 3 and 7 const len = Math.floor(Math.random() * 5) + 3; const plugins = { length: len }; for (let i = 0; i < len; i++) { plugins[i] = { name: ['Flash', 'Chrome PDF Plugin', 'Native Client', 'Chrome PDF Viewer'][Math.floor(Math.random() * 4)], filename: ['internal-pdf-viewer', 'mhjfbmdgcfjbbpaeojofohoefgiehjai', 'internal-nacl-plugin'][Math.floor(Math.random() * 3)] }; } return plugins; } }); """ }) except TypeError: # Fallback for older Selenium versions driver = webdriver.Chrome(options=chrome_options) except Exception as e: # If there's an issue with CDP, continue anyway logger.warning(f"CDP command failed, continuing: {e}") driver = webdriver.Chrome(options=chrome_options) # Set default timeout driver.set_page_load_timeout(FB_DEFAULT_TIMEOUT) return driver def random_wait(self, min_time=None, max_time=None): """Wait for a random amount of time to simulate human behavior""" min_time = min_time or FB_MIN_WAIT_TIME max_time = max_time or FB_MAX_WAIT_TIME wait_time = random.uniform(min_time, max_time) time.sleep(wait_time) return wait_time def human_like_scroll(self, scroll_attempts=None): """Scroll down the page in a human-like way""" attempts = scroll_attempts or random.randint(3, FB_MAX_SCROLL_ATTEMPTS) # Get page height before scrolling initial_height = self.driver.execute_script("return document.body.scrollHeight") for i in range(attempts): # Calculate a random scroll amount (25-90% of viewport) scroll_percent = random.uniform(0.25, 0.9) viewport_height = self.driver.execute_script("return window.innerHeight") scroll_amount = int(viewport_height * scroll_percent) # Scroll with a random speed scroll_steps = random.randint(5, 15) current_position = self.driver.execute_script("return window.pageYOffset") target_position = current_position + scroll_amount for step in range(scroll_steps): # Calculate next position with easing t = (step + 1) / scroll_steps # Ease in-out function factor = t * t * (3.0 - 2.0 * t) next_position = current_position + (target_position - current_position) * factor self.driver.execute_script(f"window.scrollTo(0, {next_position})") time.sleep(random.uniform(0.01, 0.05)) # Occasionally pause longer as if reading content if random.random() < 0.3: # 30% chance to pause self.random_wait(1.5, 3.5) else: self.random_wait(0.5, 1.5) # Log progress logger.info(f"Human-like scroll {i + 1}/{attempts} completed") # Check if we've reached the bottom of the page new_height = self.driver.execute_script("return document.body.scrollHeight") if new_height == initial_height and i > 1: # We haven't loaded new content after a couple of scrolls # Do one big scroll to the bottom to trigger any lazy loading self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)") self.random_wait() initial_height = new_height def simulate_human_behavior(self): """Simulate random human-like interactions with the page""" # Random chance to move the mouse around if random.random() < 0.7: # 70% chance try: # Find a random element to hover over elements = self.driver.find_elements(By.CSS_SELECTOR, "a, button, input, div") if elements: element = random.choice(elements) ActionChains(self.driver).move_to_element(element).perform() self.random_wait(0.2, 1.0) except: # Ignore any errors, this is just for randomness pass # Random chance to click somewhere non-interactive if random.random() < 0.2: # 20% chance try: # Find a safe area to click (like a paragraph or heading) safe_elements = self.driver.find_elements(By.CSS_SELECTOR, "p, h1, h2, h3, h4, span") if safe_elements: safe_element = random.choice(safe_elements) ActionChains(self.driver).move_to_element(safe_element).click().perform() self.random_wait(0.2, 1.0) except: # Ignore any errors, this is just for randomness pass def check_headless_visibility(self): """ Check if elements are visible in headless mode Returns True if everything is working properly """ if not self.headless: # If not in headless mode, no need to check return True logger.info("Performing headless visibility check...") # Use a simpler page for testing interactivity test_url = "https://www.example.com" try: self.driver.get(test_url) # Just check if the page loads at all - don't try to interact with elements WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) logger.info("Headless check passed: Page loaded successfully") return True except Exception as e: logger.error(f"Headless check failed: {e}") # Try switching to non-headless mode logger.info("Switching to non-headless mode...") self.driver.quit() self.headless = False self.driver = self._setup_driver(headless=False) return True # Continue without rechecking def fetch_facebook_ads(self, query): """Fetch ads from Facebook's Ad Library with anti-detection measures""" ads_data = [] base_url = "https://www.facebook.com/ads/library/" logger.info(f"Fetching Facebook ads for {query}") try: # Add some randomness to URL parameters params = { "active_status": "all", "ad_type": "all", "country": "ALL", "q": query, # Random parameters to avoid fingerprinting "_": int(time.time() * 1000), "session_id": self.session_id } # Construct URL with parameters url = base_url + "?" + "&".join(f"{k}={v}" for k, v in params.items()) logger.info(f"Navigating to Facebook URL: {url}") # Navigate to the URL self.driver.get(url) # Wait for page to initially load try: WebDriverWait(self.driver, FB_DEFAULT_TIMEOUT).until( EC.any_of( EC.presence_of_element_located((By.CSS_SELECTOR, "div[role='main']")), EC.presence_of_element_located((By.TAG_NAME, "body")) ) ) except TimeoutException: logger.warning("Timeout waiting for Facebook page to load initially, continuing anyway") # Human-like scrolling to trigger lazy loading self.human_like_scroll() # Simulate human behavior self.simulate_human_behavior() # Save debug data at this point if self.debug_mode: self._save_debug_data("facebook_after_scroll", query) # Find ad elements using self-healing selectors ad_elements = self._find_facebook_ad_elements() if not ad_elements: logger.info("No Facebook ads found") if self.debug_mode: self._save_debug_data("facebook_no_ads", query) # Return placeholder data as fallback return self._generate_placeholder_facebook_data(query) # Process the found ad elements for i, ad in enumerate(ad_elements[:10]): # Limit to 10 ads for performance try: ad_data = { "platform": "Facebook", "query": query, "timestamp": datetime.now().isoformat(), "index": i + 1, "session_id": self.session_id } # Extract data using smarter methods full_text = ad.text.strip() # Log first ad text for debugging if i == 0: logger.info(f"First Facebook ad full text (first 150 chars): {full_text[:150]}...") # Smart data extraction extracted_data = self._extract_facebook_ad_data(ad, full_text) # Merge extracted data ad_data.update(extracted_data) # Add fallback values if needed if "advertiser" not in ad_data or not ad_data["advertiser"]: ad_data["advertiser"] = "Unknown Advertiser" if "text" not in ad_data or not ad_data["text"]: ad_data["text"] = "Ad content not available" ads_data.append(ad_data) except Exception as e: logger.warning(f"Error processing Facebook ad {i + 1}: {e}") return ads_data if ads_data else self._generate_placeholder_facebook_data(query) except Exception as e: logger.error(f"Error fetching Facebook ads: {e}") if self.debug_mode: self._save_debug_data("facebook_error", query) return self._generate_placeholder_facebook_data(query) def _find_facebook_ad_elements(self): """Find Facebook ad elements using a self-healing selector strategy""" # Historical best performers historical_best = self.selector_stats.get_best_selectors() # Base selectors base_selectors = [ "div[class*='_7jvw']", "div[data-testid='ad_library_card']", "div[class*='AdLibraryCard']", "div.AdLibraryCard", "div[class*='adCard']", "div[class*='ad_card']" ] # Combine selectors, prioritizing historical best combined_selectors = historical_best + [s for s in base_selectors if s not in historical_best] # Try each selector for selector in combined_selectors: try: # Record attempt self.selector_stats.update_selector_attempt(selector) elements = self.driver.find_elements(By.CSS_SELECTOR, selector) if elements: logger.info(f"Found {len(elements)} Facebook ads using selector: {selector}") # Record success self.selector_stats.update_selector_success(selector, len(elements)) return elements except Exception as e: logger.debug(f"Facebook selector {selector} failed: {e}") # No elements found with standard selectors, try a more aggressive approach try: # Look for text patterns that typically appear in ads patterns = [ "//div[contains(., 'Library ID:')]", "//div[contains(., 'Sponsored')]", "//div[contains(., 'Active')][contains(., 'Library ID')]", "//div[contains(., 'Inactive')][contains(., 'Library ID')]" ] for pattern in patterns: elements = self.driver.find_elements(By.XPATH, pattern) if elements: ad_containers = [] for element in elements: try: # Try to find containing card by navigating up container = element for _ in range(5): # Try up to 5 levels up if container.get_attribute("class") and "card" in container.get_attribute( "class").lower(): ad_containers.append(container) break container = container.find_element(By.XPATH, "..") except: continue if ad_containers: logger.info(f"Found {len(ad_containers)} Facebook ads using text pattern approach") # Record this special method self.selector_stats.update_selector_success("text_pattern_method", len(ad_containers)) return ad_containers except Exception as e: logger.debug(f"Facebook text pattern approach failed: {e}") return [] def _extract_facebook_ad_data(self, ad_element, full_text): """Extract data from Facebook ad using multiple intelligent methods""" extracted_data = {} # Process text content if available if full_text: # Split into lines lines = full_text.split('\n') # Check for status (Active/Inactive) if lines and lines[0] in ["Active", "Inactive"]: extracted_data["status"] = lines[0] # Look for advertiser - typically after "See ad details" for i, line in enumerate(lines): if "See ad details" in line or "See summary details" in line: if i + 1 < len(lines): extracted_data["advertiser"] = lines[i + 1].strip() break else: # First line is likely the advertiser if lines: extracted_data["advertiser"] = lines[0].strip() # Extract ad content # Look for patterns to determine content boundaries content_start_idx = -1 content_end_idx = len(lines) # Find where "Sponsored" appears for i, line in enumerate(lines): if "Sponsored" in line: content_start_idx = i + 1 break # If no "Sponsored" found, look for advertiser + status if content_start_idx == -1: # Skip metadata lines metadata_patterns = [ "Library ID:", "Started running on", "Platforms", "Open Drop-down", "See ad details", "See summary details", "This ad has multiple versions" ] for i, line in enumerate(lines): if any(pattern in line for pattern in metadata_patterns): continue if i > 0: # Skip first line (advertiser) content_start_idx = i break # Find where UI elements start ui_elements = [ "Like", "Comment", "Share", "Learn More", "Shop Now", "Sign Up", "Visit Instagram profile", "See More" ] for i, line in enumerate(lines): # Skip lines before content start if i <= content_start_idx: continue if any(ui in line for ui in ui_elements): content_end_idx = i break # Extract content between boundaries if content_start_idx != -1 and content_start_idx < content_end_idx: content_lines = lines[content_start_idx:content_end_idx] extracted_data["text"] = "\n".join(content_lines).strip() # If text extraction failed, try element-based approaches if "text" not in extracted_data or not extracted_data["text"]: facebook_text_selectors = [ "div[data-ad-preview='message']", # Direct message container "div[class*='_7jy6']", # Known ad text container "div[data-testid='ad-creative-text']", # Test ID for ad text "div[class*='_38ki']", # Another text container "span[class*='_7oe']", # Text span "div.text_exposed_root" # Exposed text root ] for selector in facebook_text_selectors: try: elements = ad_element.find_elements(By.CSS_SELECTOR, selector) text_content = " ".join([e.text.strip() for e in elements if e.text.strip()]) if text_content: extracted_data["text"] = text_content break except: pass # If advertiser extraction failed, try element-based approaches if "advertiser" not in extracted_data or not extracted_data["advertiser"]: facebook_advertiser_selectors = [ "span[class*='fsl']", # Facebook specific large text class "a[aria-label*='profile']", # Profile links often contain advertiser name "h4", # Often contains advertiser name "div[class*='_8jh5']", # Known advertiser class "a[role='link']", # Links are often advertiser names "div[class*='_3qn7']", # Another known advertiser container "div[class*='_7jvw'] a", # Links within the ad card ] for selector in facebook_advertiser_selectors: try: elements = ad_element.find_elements(By.CSS_SELECTOR, selector) for element in elements: text = element.text.strip() if text and len(text) < 50: # Advertiser names are usually short extracted_data["advertiser"] = text break if "advertiser" in extracted_data and extracted_data["advertiser"]: break except: pass return extracted_data def _generate_placeholder_facebook_data(self, query): """Generate placeholder Facebook ad data when real ads cannot be scraped""" logger.info(f"Returning placeholder Facebook ad data for query: {query}") return [ { "platform": "Facebook", "query": query, "advertiser": "Placeholder Advertiser 1", "text": f"This is a placeholder ad for {query} since no actual ads could be scraped.", "timestamp": datetime.now().isoformat(), "index": 1, "is_placeholder": True, "session_id": self.session_id }, { "platform": "Facebook", "query": query, "advertiser": "Placeholder Advertiser 2", "text": f"Another placeholder ad for {query}. Please check your scraping settings.", "timestamp": datetime.now().isoformat(), "index": 2, "is_placeholder": True, "session_id": self.session_id } ] def _save_debug_data(self, prefix, query): """Save debugging data for investigation""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") debug_dir = "debug_data" if not os.path.exists(debug_dir): os.makedirs(debug_dir) # Save screenshot screenshot_path = f"{debug_dir}/{prefix}_{query}_{timestamp}.png" self.driver.save_screenshot(screenshot_path) logger.info(f"Saved debug screenshot to {screenshot_path}") # Save HTML html_path = f"{debug_dir}/{prefix}_{query}_{timestamp}.html" with open(html_path, "w", encoding="utf-8") as f: f.write(self.driver.page_source) logger.info(f"Saved debug HTML to {html_path}") # Save sample of first ad structure if available try: ad_elements = self.driver.find_elements(By.CSS_SELECTOR, "div[class*='_7jvw']") if ad_elements: first_ad = ad_elements[0] # Get sample HTML structure first_ad_html = first_ad.get_attribute('outerHTML') # Save first ad HTML sample_path = f"{debug_dir}/{prefix}_sample_ad_{timestamp}.html" with open(sample_path, "w", encoding="utf-8") as f: f.write(first_ad_html) logger.info(f"Saved sample ad HTML to {sample_path}") # Log the text structure logger.info(f"Sample ad text structure: {first_ad.text[:300]}...") except Exception as e: logger.error(f"Error saving ad sample: {e}") def close(self): """Close the WebDriver and save stats""" if self.driver: self.driver.quit() # Save selector stats one last time self.selector_stats._save_stats() # Facebook Gradio Interface Function def fetch_facebook_ads(query): """Fetch Facebook ads only for Gradio interface""" logger.info(f"Processing Facebook ad search for: {query}") scraper = FacebookAdsScraper(headless=True, debug_mode=True) # Perform headless check first visibility_ok = scraper.check_headless_visibility() if not visibility_ok: logger.warning("Headless visibility check failed, results may be affected") # Fetch ads from Facebook facebook_ads = scraper.fetch_facebook_ads(query) # Format for display formatted_results = [] for ad in facebook_ads: formatted_ad = f"Platform: {ad['platform']}\n" # Include status if available if 'status' in ad: formatted_ad += f"Status: {ad['status']}\n" formatted_ad += f"Advertiser: {ad['advertiser']}\n" # Format ad text with word wrapping text_lines = [] if ad['text'] and ad['text'] != "Ad content not available": # Split long text into readable chunks words = ad['text'].split() current_line = "" for word in words: if len(current_line) + len(word) + 1 <= 80: # 80 chars per line current_line += (" " + word if current_line else word) else: text_lines.append(current_line) current_line = word if current_line: text_lines.append(current_line) formatted_text = "\n".join(text_lines) else: formatted_text = ad['text'] formatted_ad += f"Ad Text: {formatted_text}\n" formatted_ad += f"Timestamp: {ad['timestamp']}\n" if ad.get('is_placeholder', False): formatted_ad += "[THIS IS PLACEHOLDER DATA]\n" formatted_ad += "-" * 50 formatted_results.append(formatted_ad) scraper.close() return "\n\n".join(formatted_results) if formatted_results else "No Facebook ads found for your query." # Create a function to save ads to JSON def save_ads_to_json(ads, query): """Save ads to a JSON file""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"facebook_ads_{query.replace(' ', '_')}_{timestamp}.json" try: with open(filename, 'w', encoding='utf-8') as f: json.dump(ads, f, indent=2, ensure_ascii=False) logger.info(f"Saved ads to {filename}") return filename except Exception as e: logger.error(f"Error saving ads to JSON: {e}") return None ##################################### ### GOOGLE ADS SCRAPER SECTION ##### ##################################### # Constants for Google Ads Scraper MAX_ADS_DEFAULT = 5 # Import the actual GoogleAds class and regions try: from GoogleAds.main import GoogleAds, show_regions_list from GoogleAds.regions import Regions USING_ACTUAL_GOOGLE_ADS = True logger.info("Successfully imported GoogleAds module") except ImportError as e: # Fallback to mock implementation if module is missing logger.warning(f"GoogleAds module not found: {e}. Using mock implementation.") USING_ACTUAL_GOOGLE_ADS = False # Mock Regions dictionary - only used if real module fails to import Regions = { "GB": {"Region": "United Kingdom"} } def show_regions_list(): """Mock function - only used if real module fails to import""" return [("GB", "United Kingdom"), ("US", "United States")] # Mock GoogleAds class - only used if real module fails to import class GoogleAds: def __init__(self, region="GB"): self.region = region logger.warning(f"Using MOCK GoogleAds implementation with region: {region}") logger.warning("Please install the GoogleAds module for actual data") def creative_search_by_advertiser_id(self, advertiser_id, count=5): # Mock implementation - only used if real module fails to import logger.warning(f"MOCK: Searching for creatives from advertiser {advertiser_id}") return [f"creative_{i}_{advertiser_id}" for i in range(min(count, 3))] def get_detailed_ad(self, advertiser_id, creative_id): # Mock implementation - only used if real module fails to import logger.warning(f"MOCK: Getting details for creative {creative_id}") # Find advertiser name advertiser_name = "Unknown" for adv in ADVERTISERS: if adv["id"] == advertiser_id: advertiser_name = adv["name"] break # Return mock ad details return { "Ad Format": "Text", "Advertiser": advertiser_name, "Advertiser Name": advertiser_name, "Ad Title": f"MOCK DATA - INSTALL GOOGLE ADS MODULE", "Ad Body": f"This is MOCK data because the GoogleAds module is not installed. Please install the proper module.", "Last Shown": datetime.now().strftime("%Y-%m-%d"), "Creative Id": creative_id, "Ad Link": "#" } def clean_ad_text(text): """Clean ad text by removing special characters and formatting issues.""" if text is None or not isinstance(text, str): return "" # Remove Unicode special characters often found in Google ads data cleaned = text.replace('â¦', '') # Opening symbol cleaned = cleaned.replace('â©', '') # Closing symbol cleaned = cleaned.replace('', '[Dynamic Content]') # Remove any other strange characters that might appear cleaned = re.sub(r'[^\x00-\x7F]+', '', cleaned) return cleaned.strip() def get_regions_list(): """Get a limited list of regions - only GB and anywhere.""" regions = [ ("anywhere", "Global (anywhere)"), ("GB", f"{Regions['GB']['Region']} (GB)") ] return regions def search_by_advertiser_id(advertiser_id: str, max_ads=MAX_ADS_DEFAULT, region="GB", progress=gr.Progress(), provided_name=None) -> Tuple[ str, Optional[pd.DataFrame], Optional[Dict]]: try: progress(0, desc="Initializing scraper...") # Fix for region handling region_val = region if isinstance(region, tuple) and len(region) > 0: region_val = region[0] # Ensure 'anywhere' is handled correctly if region_val == "Global (anywhere)" or "anywhere" in str(region_val).lower(): region_val = "anywhere" # Initialize the Google Ads scraper scraper = GoogleAds(region=region_val) progress(0.2, desc=f"Fetching ads for advertiser ID: {advertiser_id}") # Get creative IDs for this advertiser creative_ids = scraper.creative_search_by_advertiser_id(advertiser_id, count=max_ads) if not creative_ids: return f"No ads found for advertiser ID: {advertiser_id}", None, None progress(0.3, desc=f"Found {len(creative_ids)} ads. Fetching details...") # Fetch detailed information for each ad ads_data = [] ad_formats = {} for i, creative_id in enumerate(creative_ids): progress_val = 0.3 + (0.7 * (i / len(creative_ids))) progress(progress_val, desc=f"Processing ad {i + 1}/{len(creative_ids)}") try: ad_details = scraper.get_detailed_ad(advertiser_id, creative_id) # Fix encoding issues for Ad Title and Ad Body fields if 'Ad Title' in ad_details: ad_details['Ad Title'] = clean_ad_text(ad_details['Ad Title']) if 'Ad Body' in ad_details: ad_details['Ad Body'] = clean_ad_text(ad_details['Ad Body']) ads_data.append(ad_details) # Count ad formats ad_format = ad_details.get("Ad Format", "Unknown") ad_formats[ad_format] = ad_formats.get(ad_format, 0) + 1 # Brief pause to avoid overwhelming the server time.sleep(0.2) except Exception as e: print(f"Error fetching details for ad {creative_id}: {e}") if not ads_data: return f"Retrieved creative IDs but couldn't fetch ad details for advertiser ID: {advertiser_id}", None, None # Create a DataFrame for display df = pd.DataFrame(ads_data) # Generate summary info timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") # Use provided name if available, otherwise try to determine from predefined list or ad data advertiser_name = "Unknown" # First, use the provided name if it exists if provided_name: advertiser_name = provided_name else: # Check our predefined list for adv in ADVERTISERS: if adv["id"] == advertiser_id: advertiser_name = adv["name"] break # If still unknown, try to get from the ad data if advertiser_name == "Unknown" and ads_data and len(ads_data) > 0: # The field might be "Advertiser" or "Advertiser Name" depending on the version for field in ["Advertiser", "Advertiser Name", "advertiser_name"]: if field in ads_data[0]: advertiser_name = ads_data[0][field] break summary = { 'advertiser_id': advertiser_id, 'advertiser_name': advertiser_name, 'ads_count': len(ads_data), 'timestamp': timestamp, 'region': region_val, 'ad_formats': ad_formats } # Find the earliest and latest ad dates = [] for ad in ads_data: # The field might be "Last Shown" or "last_shown_date" depending on the version for field in ["Last Shown", "last_shown_date"]: if field in ad and ad[field]: dates.append(ad[field]) break if dates: summary['earliest_ad'] = min(dates) summary['latest_ad'] = max(dates) # Don't save the data, just prepare the summary info summary = { 'advertiser_id': advertiser_id, 'advertiser_name': advertiser_name, 'ads_count': len(ads_data), 'timestamp': timestamp, 'region': region_val, 'ad_formats': ad_formats } # Find the earliest and latest ad dates = [] for ad in ads_data: # The field might be "Last Shown" or "last_shown_date" depending on the version for field in ["Last Shown", "last_shown_date"]: if field in ad and ad[field]: dates.append(ad[field]) break if dates: summary['earliest_ad'] = min(dates) summary['latest_ad'] = max(dates) success_message = ( f"Found {len(ads_data)} ads for advertiser '{advertiser_name}' (ID: {advertiser_id})." ) progress(1.0, desc="Complete!") return success_message, df, summary except Exception as e: error_message = f"Error searching for advertiser ID: {str(e)}" return error_message, None, None def process_advertiser_search(advertiser_selection, region, max_ads, progress=gr.Progress()): """Handle the advertiser selection form submission and update the UI.""" # Extract advertiser ID and name from the selection format "ID: Name" if not advertiser_selection: return "Please select an advertiser to search", None, None, None # Split the selection string to get the ID and name parts = advertiser_selection.split(":", 1) advertiser_id = parts[0].strip() advertiser_name = parts[1].strip() if len(parts) > 1 else "Unknown" # Perform the search result_message, ads_df, summary_info = search_by_advertiser_id( advertiser_id, max_ads, region, progress, advertiser_name ) # Generate analysis if data is available analysis_html = analyze_ads(ads_df, summary_info) if ads_df is not None and not ads_df.empty else None return result_message, ads_df, analysis_html, summary_info def analyze_ads(df: pd.DataFrame, summary: Dict) -> str: """ Analyze ads data and generate insights. Args: df: DataFrame containing ad data summary: Dictionary with summary information Returns: HTML string with analysis results """ if df is None or df.empty or summary is None: return "

No data available for analysis

" try: # Create a simple HTML report with the analysis html = f"""

{summary.get('advertiser_name', 'Unknown Advertiser')} - Ad Analysis

Overview

Advertiser ID: {summary.get('advertiser_id', 'Unknown')}

Total Ads Found: {summary['ads_count']}

Region: {summary['region']}

Data Collected: {summary['timestamp'].replace('_', ' ').replace('-', '/')}

{f"

Ad Date Range: {summary.get('earliest_ad')} to {summary.get('latest_ad')}

" if 'earliest_ad' in summary else ""}

Ad Format Distribution

""" total = sum(summary['ad_formats'].values()) for format_name, count in summary['ad_formats'].items(): percentage = (count / total) * 100 html += f""" """ html += """
Format Count Percentage
{format_name} {count} {percentage:.1f}%
""" # Common words in ad titles if 'Ad Title' in df.columns and not df['Ad Title'].isna().all(): from collections import Counter import re # Extract words from titles all_titles = ' '.join(df['Ad Title'].dropna().astype(str).tolist()) words = re.findall(r'\b\w+\b', all_titles.lower()) # Remove common stop words stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'by', 'of', 'is', 'are'} filtered_words = [word for word in words if word not in stop_words and len(word) > 2] # Count word frequencies word_counts = Counter(filtered_words).most_common(10) if word_counts: html += """

Most Common Words in Ad Titles

""" for word, count in word_counts: html += f""" """ html += """
Word Frequency
{word} {count}
""" html += """

SEO & Marketing Insights

""" # Add general insights html += f"""

Competitive Intelligence

  • The advertiser has been active in advertising until {summary.get('latest_ad', 'recently')}
  • Their ad strategy focuses primarily on {max(summary['ad_formats'].items(), key=lambda x: x[1])[0]} ads
  • Consider monitoring changes in their ad frequency and creative strategy over time

UK Market Insights

  • The ads were collected for the {summary['region']} market
  • Regular monitoring can reveal seasonal UK advertising patterns
  • Compare with other regions to identify UK-specific marketing approaches
""" html += """

All Ad Examples

""" # Add example ads (all of them, not just the most recent) if not df.empty: # Sort by Last Shown date if available if 'Last Shown' in df.columns: df = df.sort_values(by='Last Shown', ascending=False) # Get all ads, not just the top 3 for i, (_, ad) in enumerate(df.iterrows()): html += f"""

Ad {i + 1}: {ad.get('Creative Id', '')}

Format: {ad.get('Ad Format', 'Unknown')}

Last Shown: {ad.get('Last Shown', 'Unknown')}

""" # Display title and body if available if 'Ad Title' in ad and pd.notna(ad['Ad Title']) and ad['Ad Title']: html += f"

Title: {ad['Ad Title']}

" if 'Ad Body' in ad and pd.notna(ad['Ad Body']) and ad['Ad Body']: body = ad['Ad Body'] if len(body) > 150: body = body[:150] + "..." html += f"

Body: {body}

" # Display image or video links if available if 'Image URL' in ad and pd.notna(ad['Image URL']) and ad['Image URL']: html += f"""

""" if 'Ad Link' in ad and pd.notna(ad['Ad Link']) and ad['Ad Link'] and ad.get('Ad Format') != 'Text': html += f"""

Ad Link: View Ad

""" html += "
" html += """
""" return html except Exception as e: return f"

Error analyzing data: {str(e)}

" ##################################### ### COMBINED INTERFACE SECTION ##### ##################################### def create_combined_app(): """Create the combined Gradio interface with Facebook and Google Ads scrapers""" # Create dropdown choices for advertiser selection advertiser_choices = [f"{adv['id']}: {adv['name']}" for adv in ADVERTISERS] with gr.Blocks(title="Combined Ads Transparency Scraper") as app: gr.Markdown("# Combined Ads Transparency Scraper") gr.Markdown("## Search for ads from Facebook and Google Ads transparency tools") # Create tabs for the two different scrapers with gr.Tabs() as tabs: # Tab 1: Facebook Ad Library Scraper with gr.TabItem("Facebook Ad Library"): gr.Markdown("### Facebook Ad Library Search") gr.Markdown("Search for ads by brand, domain, or keyword") with gr.Row(): fb_query_input = gr.Textbox( label="Search Query", placeholder="Enter brand, domain or product name", value="" ) fb_search_button = gr.Button("Find Facebook Ads", variant="primary") fb_results_output = gr.Textbox(label="Search Results", lines=20) fb_save_button = gr.Button("Save Results to JSON") fb_save_status = gr.Textbox(label="Save Status", lines=1) # Define the save function for Facebook def save_fb_results(query, results_text): if not results_text or "No Facebook ads found" in results_text: return "No ads to save" # Get the scraper to fetch fresh ads for JSON format scraper = FacebookAdsScraper(headless=True, debug_mode=False) ads = scraper.fetch_facebook_ads(query) scraper.close() # Save to JSON filename = save_ads_to_json(ads, query) if filename: return f"Saved {len(ads)} ads to {filename}" else: return "Error saving ads to JSON" # Connect Facebook interface components fb_search_button.click( fn=fetch_facebook_ads, inputs=[fb_query_input], outputs=[fb_results_output] ) fb_save_button.click( fn=save_fb_results, inputs=[fb_query_input, fb_results_output], outputs=[fb_save_status] ) # Tab 2: Lightsaber Companies Google Ads Scraper with gr.TabItem("Google Ads (Lightsaber Companies)"): gr.Markdown("### Lightsaber Companies Ads Transparency Scraper") gr.Markdown("View Google Ads data for popular lightsaber companies") with gr.Row(): with gr.Column(scale=3): advertiser_dropdown = gr.Dropdown( choices=advertiser_choices, label="Select Lightsaber Company", info="Choose a company to view their Google Ads data" ) with gr.Row(): region_dropdown = gr.Dropdown( choices=get_regions_list(), value="GB", # UK is the default label="Region", info="Choose between Global or UK" ) max_ads_slider = gr.Slider( minimum=1, maximum=10, value=5, step=1, label="Max Ads to Retrieve" ) search_button = gr.Button("Search Ads", variant="primary") with gr.Column(scale=2): result_message = gr.Markdown(label="Search Result") # Tabs for displaying Google Ads search results with gr.Tabs() as google_result_tabs: with gr.Tab("Analysis"): analysis_html = gr.HTML() with gr.Tab("Raw Data"): ads_table = gr.DataFrame() # State for storing summary info summary_info = gr.State() # Connect the Google Ads inputs to the output function search_button.click( fn=process_advertiser_search, inputs=[advertiser_dropdown, region_dropdown, max_ads_slider], outputs=[result_message, ads_table, analysis_html, summary_info] ) # About section for the combined app with gr.Accordion("About This Tool", open=False): gr.Markdown(""" ## About Combined Ads Transparency Scraper This tool combines two different ad transparency scrapers: 1. **Facebook Ad Library Scraper**: Search for any advertiser's ads on Facebook. 2. **Google Ads Transparency Scraper**: View ads for popular lightsaber companies. ### Technical Details - The Facebook scraper uses Selenium WebDriver with anti-detection techniques. - The Google Ads scraper leverages the Google Ad Transparency API. - Both scrapers include adaptive error handling and fallback mechanisms. ### Usage Notes - Facebook scraping may take 30-60 seconds to complete - Search results are not stored permanently - Use the "Save Results" button to save data for later analysis **Note**: This tool is intended for research and educational purposes only. """) return app # Main execution if __name__ == "__main__": parser = argparse.ArgumentParser(description="Combined Ads Transparency Scraper") parser.add_argument("--headless", action="store_true", default=True, help="Run in headless mode") parser.add_argument("--debug", action="store_true", help="Enable debug mode with extra logging") parser.add_argument("--fb-query", type=str, help="Facebook search query to run directly without Gradio") parser.add_argument("--google-advertiser", type=str, help="Google Ads advertiser ID to run directly without Gradio") parser.add_argument("--save", action="store_true", help="Save results to JSON file when using direct query") args = parser.parse_args() if args.fb_query: # Run direct query mode for Facebook scraper = FacebookAdsScraper(headless=args.headless, debug_mode=args.debug) scraper.check_headless_visibility() facebook_ads = scraper.fetch_facebook_ads(args.fb_query) # Display results print(f"\nFound {len(facebook_ads)} Facebook ads for '{args.fb_query}'") if facebook_ads: for i, ad in enumerate(facebook_ads): print(f"\n--- Ad {i + 1} ---") print(f"Platform: {ad['platform']}") if 'status' in ad: print(f"Status: {ad['status']}") print(f"Advertiser: {ad['advertiser']}") print(f"Text: {ad['text']}") if ad.get('is_placeholder', False): print("[THIS IS PLACEHOLDER DATA]") # Save to JSON if requested if args.save: filename = save_ads_to_json(facebook_ads, args.fb_query) if filename: print(f"\nSaved {len(facebook_ads)} ads to {filename}") else: print("No Facebook ads found.") scraper.close() elif args.google_advertiser: # Run direct query mode for Google Ads advertiser_id = args.google_advertiser # Find advertiser name if it's in our list advertiser_name = "Unknown" for adv in ADVERTISERS: if adv["id"] == advertiser_id: advertiser_name = adv["name"] break print(f"\nSearching for Google Ads from advertiser '{advertiser_name}' (ID: {advertiser_id})") # Use a dummy progress object for CLI class DummyProgress: def __call__(self, value, desc=None): if desc: print(f"{desc} ({value * 100:.0f}%)") result_message, ads_df, summary_info = search_by_advertiser_id( advertiser_id, max_ads=5, region="GB", progress=DummyProgress(), provided_name=advertiser_name ) print(f"\n{result_message}") if ads_df is not None and not ads_df.empty: print("\nFound ads:") for i, (_, ad) in enumerate(ads_df.iterrows()): print(f"\n--- Ad {i + 1} ---") print(f"Format: {ad.get('Ad Format', 'Unknown')}") print(f"Title: {ad.get('Ad Title', 'Unknown')}") body_text = ad.get('Ad Body', 'Unknown') if len(body_text) > 100: body_text = body_text[:100] + "..." print(f"Body: {body_text}") print(f"Last Shown: {ad.get('Last Shown', 'Unknown')}") print(f"Creative ID: {ad.get('Creative Id', 'Unknown')}") else: print("No Google ads found or error occurred.") else: # Run Gradio interface app = create_combined_app() print("Starting Combined Ads Transparency Scraper") print("Facebook: Search for any brand or company") print("Google Ads: Available lightsaber companies:") for adv in ADVERTISERS: print(f" - {adv['name']}") app.launch()