""" OSINT engine for gathering intelligence from various sources. """ from typing import Dict, List, Any, Optional import asyncio import json from datetime import datetime import whois from holehe.core import import_submodules from holehe.core import get_functions from geopy.geocoders import Nominatim from geopy.exc import GeocoderTimedOut import python_sherlock from python_sherlock import sherlock_module from tenacity import retry, stop_after_attempt, wait_exponential class OSINTEngine: def __init__(self): self.holehe_modules = import_submodules("holehe.modules") self.holehe_functions = get_functions(self.holehe_modules) self.geolocator = Nominatim(user_agent="my_osint_app") async def search_username(self, username: str) -> Dict[str, Any]: """Search for username across multiple platforms.""" results = { "found": [], "not_found": [], "errors": [] } # Sherlock search try: sherlock_results = sherlock_module.search_username(username) for site, data in sherlock_results.items(): if data.get("status") == "found": results["found"].append({ "platform": site, "url": data.get("url", ""), "source": "sherlock" }) elif data.get("status") == "not found": results["not_found"].append(site) else: results["errors"].append(site) except Exception as e: print(f"Sherlock error: {e}") # Holehe search try: holehe_tasks = [] for platform in self.holehe_functions: holehe_tasks.append(platform(username)) holehe_results = await asyncio.gather(*holehe_tasks, return_exceptions=True) for result in holehe_results: if isinstance(result, Exception): continue if result.get("exists"): results["found"].append({ "platform": result.get("name", "unknown"), "url": result.get("url", ""), "source": "holehe" }) else: results["not_found"].append(result.get("name", "unknown")) except Exception as e: print(f"Holehe error: {e}") return results @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def search_domain(self, domain: str) -> Dict[str, Any]: """Get information about a domain.""" try: w = whois.whois(domain) return { "registrar": w.registrar, "creation_date": w.creation_date, "expiration_date": w.expiration_date, "last_updated": w.updated_date, "status": w.status, "name_servers": w.name_servers, "emails": w.emails } except Exception as e: return {"error": str(e)} @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def search_location(self, location: str) -> Dict[str, Any]: """Get information about a location.""" try: location_data = self.geolocator.geocode(location, timeout=10) if location_data: return { "address": location_data.address, "latitude": location_data.latitude, "longitude": location_data.longitude, "raw": location_data.raw } return {"error": "Location not found"} except GeocoderTimedOut: return {"error": "Geocoding service timed out"} except Exception as e: return {"error": str(e)} async def search_person(self, name: str, location: Optional[str] = None) -> Dict[str, Any]: """Search for information about a person.""" results = { "name": name, "location": location, "social_profiles": [], "possible_emails": [], "location_info": None } # Get location information if provided if location: results["location_info"] = await self.search_location(location) # Generate possible email formats name_parts = name.lower().split() if len(name_parts) >= 2: first, last = name_parts[0], name_parts[-1] common_domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com"] email_formats = [ f"{first}.{last}@{domain}", f"{first}{last}@{domain}", f"{first[0]}{last}@{domain}", f"{first}_{last}@{domain}" ] results["possible_emails"] = email_formats return results async def search(self, query: str, search_type: str = "username") -> Dict[str, Any]: """Main search interface.""" try: if search_type == "username": return await self.search_username(query) elif search_type == "domain": return await self.search_domain(query) elif search_type == "location": return await self.search_location(query) elif search_type == "person": return await self.search_person(query) else: return {"error": f"Unknown search type: {search_type}"} except Exception as e: return {"error": str(e)}