"""OSINT engine for username and person search."""

from typing import Dict, List, Any, Optional
import asyncio
import json

import requests
from bs4 import BeautifulSoup
import whois
from holehe.core import *
from geopy.geocoders import Nominatim
from tenacity import retry, stop_after_attempt, wait_exponential
from duckduckgo_search import DDGS


class OSINTEngine:
    """Aggregates OSINT lookups.

    Provides username presence checks across social platforms (direct HEAD
    probes plus holehe modules), person search via DuckDuckGo, WHOIS domain
    lookups, and basic social-profile page scraping.
    """

    def __init__(self) -> None:
        self.geolocator = Nominatim(user_agent="ise_search")
        # `import_submodules` is provided by the `holehe.core` star import.
        self.holehe_modules = import_submodules("holehe.modules")
        self.known_platforms: List[str] = [
            "twitter.com", "facebook.com", "instagram.com", "linkedin.com",
            "github.com", "youtube.com", "reddit.com", "pinterest.com",
            "medium.com", "tumblr.com", "flickr.com", "vimeo.com",
        ]

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for a username across known platforms.

        Args:
            username: The handle to probe.

        Returns:
            Dict with keys "found" (list of {platform, url, ...}),
            "not_found" (list of platform names), "error" (list of
            {platform, error}), and "holehe" (holehe-confirmed hits).
        """
        results: Dict[str, Any] = {
            "found": [],
            "not_found": [],
            "error": [],
        }

        # Manual platform check via HEAD probes.
        # NOTE(review): `requests` blocks the event loop inside this async
        # method — consider asyncio.to_thread or an async HTTP client. Also,
        # many platforms answer 200 for missing profiles (soft 404s), so
        # "found" entries may include false positives.
        for platform in self.known_platforms:
            try:
                url = f"https://{platform}/{username}"
                response = requests.head(url, timeout=5, allow_redirects=True)
                if response.status_code == 200:
                    results["found"].append({
                        "platform": platform.split(".")[0].title(),
                        "url": url,
                    })
                else:
                    results["not_found"].append(platform.split(".")[0].title())
            except Exception as e:
                results["error"].append({
                    "platform": platform,
                    "error": str(e),
                })

        # Run holehe checks.
        try:
            holehe_results: List[Dict[str, Any]] = []
            for module in self.holehe_modules:
                try:
                    # NOTE(review): holehe modules conventionally expose a
                    # coroutine named after the platform with signature
                    # (email, client, out) — confirm a `check` attribute
                    # actually exists on these modules.
                    check_func = getattr(module, "check")
                    out = await check_func(username)
                    if out and out.get("exists"):
                        entry = {
                            "platform": out["name"],
                            "url": out.get("url", ""),
                            "email": out.get("email", ""),
                        }
                        results["found"].append(entry)
                        # BUG FIX: holehe_results was never populated before,
                        # so results["holehe"] was always empty.
                        holehe_results.append(entry)
                except Exception as e:
                    print(f"Error in holehe module {module.__name__}: {e}")
            results["holehe"] = holehe_results
        except Exception as e:
            print(f"Error running holehe: {e}")

        return results

    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(multiplier=1, min=4, max=10))
    async def search_person(self, name: str,
                            location: Optional[str] = None,
                            age: Optional[int] = None) -> Dict[str, Any]:
        """Search for person information.

        Args:
            name: Full name to search for.
            location: Optional location string; geocoded and appended to the
                search query.
            age: Optional age; appended to the search query.

        Returns:
            Dict with "basic_info", "social_profiles", "locations",
            "possible_relatives", and "error" (None on success).
        """
        results: Dict[str, Any] = {
            "basic_info": {},
            "social_profiles": [],
            "locations": [],
            "possible_relatives": [],
            "error": None,
        }

        try:
            # Geocode location if provided.
            if location:
                try:
                    loc = self.geolocator.geocode(location)
                    if loc:
                        results["locations"].append({
                            "address": loc.address,
                            "latitude": loc.latitude,
                            "longitude": loc.longitude,
                        })
                except Exception as e:
                    print(f"Error geocoding location: {e}")

            # Build the free-text search query.
            search_query = f"{name}"
            if location:
                search_query += f" {location}"
            if age:
                search_query += f" {age} years old"

            # Use DuckDuckGo for initial search.
            with DDGS() as ddgs:
                search_results = list(ddgs.text(search_query, max_results=10))

            for result in search_results:
                try:
                    # BUG FIX: modern duckduckgo_search returns the URL under
                    # "href"; older versions used "link". The old code read
                    # result["link"] unconditionally, raising KeyError for
                    # every hit (silently swallowed below), so no profiles
                    # were ever collected.
                    url = result.get("href") or result.get("link")
                    if not url:
                        continue
                    # Check if the URL belongs to a known social platform.
                    lowered = url.lower()
                    if any(p in lowered for p in self.known_platforms):
                        platform = next(p for p in self.known_platforms
                                        if p in lowered)
                        results["social_profiles"].append({
                            "platform": platform.split(".")[0].title(),
                            "url": url,
                            "title": result.get("title", ""),
                        })
                except Exception as e:
                    print(f"Error processing search result: {e}")

        except Exception as e:
            results["error"] = str(e)

        return results

    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(multiplier=1, min=4, max=10))
    async def domain_lookup(self, domain: str) -> Dict[str, Any]:
        """Perform a WHOIS lookup for a domain.

        Args:
            domain: Domain name, e.g. "example.com".

        Returns:
            Dict of WHOIS fields, or {"error": message} on failure.
        """
        try:
            w = whois.whois(domain)
            return {
                "domain_name": w.domain_name,
                "registrar": w.registrar,
                "creation_date": w.creation_date,
                "expiration_date": w.expiration_date,
                "name_servers": w.name_servers,
                "status": w.status,
                "emails": w.emails,
                "dnssec": w.dnssec,
                "name": w.name,
                "org": w.org,
                "address": w.address,
                "city": w.city,
                "state": w.state,
                "zipcode": w.zipcode,
                "country": w.country,
            }
        except Exception as e:
            return {"error": str(e)}

    async def analyze_social_profile(self, url: str) -> Dict[str, Any]:
        """Analyze a social media profile page.

        Fetches the page and extracts the <title> plus OpenGraph meta tags.

        Args:
            url: Profile URL to fetch.

        Returns:
            Dict with "profile_info", "recent_activity", "connections",
            and "error" (None on success).
        """
        results: Dict[str, Any] = {
            "profile_info": {},
            "recent_activity": [],
            "connections": [],
            "error": None,
        }

        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            # NOTE(review): blocking requests call inside an async method.
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, "html.parser")

            # Extract basic profile info.
            results["profile_info"]["title"] = soup.title.string if soup.title else None

            # Extract OpenGraph meta information.
            # (Renamed `property` -> `prop`: the old name shadowed the builtin.)
            for meta in soup.find_all("meta"):
                prop = meta.get("property", "")
                content = meta.get("content", "")
                if "og:title" in prop:
                    results["profile_info"]["og_title"] = content
                elif "og:description" in prop:
                    results["profile_info"]["og_description"] = content
                elif "og:image" in prop:
                    results["profile_info"]["og_image"] = content

        except Exception as e:
            results["error"] = str(e)

        return results