"""
OSINT engine for username and person search.
"""
from typing import Dict, List, Any, Optional | |
import asyncio | |
import json | |
import requests | |
from bs4 import BeautifulSoup | |
import whois | |
from holehe.core import * | |
from geopy.geocoders import Nominatim | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
from duckduckgo_search import DDGS | |
class OSINTEngine:
    """Aggregate OSINT lookups: username presence probes, person search via
    DuckDuckGo, WHOIS domain lookups, and basic social-profile scraping.

    All public methods are best-effort: per-item failures are recorded (or
    logged to stdout) rather than raised, so a single dead platform never
    aborts a whole scan.
    """

    def __init__(self):
        # Geocoder used to turn free-form location strings into coordinates.
        self.geolocator = Nominatim(user_agent="ise_search")
        # Mapping of holehe submodule name -> module object, each providing
        # an account-existence check (see search_username).
        self.holehe_modules = import_submodules("holehe.modules")
        # Platforms probed directly over HTTP in search_username().
        self.known_platforms = [
            "twitter.com", "facebook.com", "instagram.com", "linkedin.com",
            "github.com", "youtube.com", "reddit.com", "pinterest.com",
            "medium.com", "tumblr.com", "flickr.com", "vimeo.com"
        ]

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for *username* across known platforms.

        Returns a dict with keys:
            "found":     list of {"platform", "url", ...} hits,
            "not_found": list of platform display names that returned non-200,
            "error":     list of {"platform", "error"} probe failures,
            "holehe":    hits contributed by holehe modules (subset of "found").
        """
        results: Dict[str, Any] = {
            "found": [],
            "not_found": [],
            "error": []
        }

        # Direct HTTP probe: a 200 on https://<platform>/<username> is taken
        # as "profile exists". Heuristic only — some sites answer 200 for
        # any path.
        for platform in self.known_platforms:
            display_name = platform.split(".")[0].title()
            try:
                url = f"https://{platform}/{username}"
                response = requests.head(url, timeout=5, allow_redirects=True)
                if response.status_code == 200:
                    results["found"].append({
                        "platform": display_name,
                        "url": url
                    })
                else:
                    results["not_found"].append(display_name)
            except Exception as e:
                results["error"].append({
                    "platform": platform,
                    "error": str(e)
                })

        # Run holehe account-existence checks.
        try:
            holehe_results = []
            # BUG FIX: import_submodules() returns a dict keyed by module
            # name; iterating the dict directly yields *strings*, so the
            # original getattr(module, "check") raised for every module.
            # Iterate the module objects instead.
            for module in self.holehe_modules.values():
                try:
                    # NOTE(review): assumes each holehe module exposes an
                    # awaitable "check" callable taking the identifier —
                    # confirm against the installed holehe version's API.
                    check_func = getattr(module, "check", None)
                    if check_func is None:
                        continue
                    out = await check_func(username)
                    if out and out.get("exists"):
                        entry = {
                            "platform": out["name"],
                            "url": out.get("url", ""),
                            "email": out.get("email", "")
                        }
                        results["found"].append(entry)
                        # BUG FIX: the original never appended to
                        # holehe_results, so results["holehe"] was always
                        # empty; record the hit here as well.
                        holehe_results.append(entry)
                except Exception as e:
                    print(f"Error in holehe module {getattr(module, '__name__', module)}: {e}")
            results["holehe"] = holehe_results
        except Exception as e:
            print(f"Error running holehe: {e}")

        return results

    async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
        """Search for publicly available information about a person.

        Args:
            name:     full name to search for.
            location: optional free-form location; geocoded and appended to
                      the search query.
            age:      optional age; appended to the search query.

        Returns a dict with "basic_info", "social_profiles", "locations",
        "possible_relatives" and "error" (message string on hard failure,
        else None).
        """
        results: Dict[str, Any] = {
            "basic_info": {},
            "social_profiles": [],
            "locations": [],
            "possible_relatives": [],
            "error": None
        }
        try:
            # Geocode the location if provided; failure is non-fatal.
            if location:
                try:
                    loc = self.geolocator.geocode(location)
                    if loc:
                        results["locations"].append({
                            "address": loc.address,
                            "latitude": loc.latitude,
                            "longitude": loc.longitude
                        })
                except Exception as e:
                    print(f"Error geocoding location: {e}")

            # Build the text query from whatever details were supplied.
            search_query = f"{name}"
            if location:
                search_query += f" {location}"
            if age:
                search_query += f" {age} years old"

            # Use DuckDuckGo for the initial web search.
            with DDGS() as ddgs:
                search_results = list(ddgs.text(search_query, max_results=10))

            for result in search_results:
                try:
                    # BUG FIX: duckduckgo_search returns the result URL under
                    # "href", not "link"; the original raised KeyError for
                    # every result (silently swallowed below), so
                    # social_profiles was always empty. Accept either key.
                    url = result.get("href") or result.get("link") or ""
                    if not url:
                        continue
                    lowered = url.lower()
                    # Keep only results pointing at a known social platform.
                    if any(platform in lowered for platform in self.known_platforms):
                        platform = next(p for p in self.known_platforms if p in lowered)
                        results["social_profiles"].append({
                            "platform": platform.split(".")[0].title(),
                            "url": url,
                            "title": result.get("title", "")
                        })
                except Exception as e:
                    print(f"Error processing search result: {e}")
        except Exception as e:
            results["error"] = str(e)
        return results

    async def domain_lookup(self, domain: str) -> Dict[str, Any]:
        """Perform a WHOIS lookup for *domain*.

        Returns the registration fields as a flat dict, or {"error": msg}
        when the lookup fails.
        """
        try:
            w = whois.whois(domain)
            return {
                "domain_name": w.domain_name,
                "registrar": w.registrar,
                "creation_date": w.creation_date,
                "expiration_date": w.expiration_date,
                "name_servers": w.name_servers,
                "status": w.status,
                "emails": w.emails,
                "dnssec": w.dnssec,
                "name": w.name,
                "org": w.org,
                "address": w.address,
                "city": w.city,
                "state": w.state,
                "zipcode": w.zipcode,
                "country": w.country
            }
        except Exception as e:
            return {"error": str(e)}

    async def analyze_social_profile(self, url: str) -> Dict[str, Any]:
        """Fetch *url* and extract basic profile metadata.

        Returns a dict with "profile_info" (page title + OpenGraph tags),
        "recent_activity", "connections" (currently unpopulated), and
        "error" (message string on failure, else None).
        """
        results: Dict[str, Any] = {
            "profile_info": {},
            "recent_activity": [],
            "connections": [],
            "error": None
        }
        try:
            # A browser-like User-Agent avoids trivial bot blocking.
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            # Basic profile info: the HTML <title>, if any.
            results["profile_info"]["title"] = soup.title.string if soup.title else None

            # Extract OpenGraph meta tags.
            # FIX: renamed the loop variable "property" -> "prop" so it no
            # longer shadows the builtin.
            for meta in soup.find_all("meta"):
                prop = meta.get("property", "")
                content = meta.get("content", "")
                if "og:title" in prop:
                    results["profile_info"]["og_title"] = content
                elif "og:description" in prop:
                    results["profile_info"]["og_description"] = content
                elif "og:image" in prop:
                    results["profile_info"]["og_image"] = content
        except Exception as e:
            results["error"] = str(e)
        return results