Spaces:
Runtime error
Runtime error
""" | |
OSINT engine for gathering intelligence from various sources. | |
""" | |
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional

import python_sherlock
import whois
from geopy.exc import GeocoderTimedOut
from geopy.geocoders import Nominatim
from holehe.core import get_functions
from holehe.core import import_submodules
from python_sherlock import sherlock_module
from tenacity import retry, stop_after_attempt, wait_exponential
class OSINTEngine:
    """Facade over several OSINT lookups: usernames, domains, locations, people.

    Every public ``search*`` coroutine returns a plain ``dict``. Lookup
    failures are reported inside that dict (an ``"error"`` key, or simply
    missing entries for username searches) rather than raised.
    """

    # Mail providers used when guessing a person's e-mail addresses.
    _COMMON_EMAIL_DOMAINS = ("gmail.com", "yahoo.com", "hotmail.com", "outlook.com")

    _logger = logging.getLogger(__name__)

    def __init__(self):
        """Load the holehe platform checkers and create the Nominatim geocoder."""
        self.holehe_modules = import_submodules("holehe.modules")
        self.holehe_functions = get_functions(self.holehe_modules)
        self.geolocator = Nominatim(user_agent="my_osint_app")

    @staticmethod
    async def _run_blocking(func, *args, **kwargs) -> Any:
        """Await a synchronous callable in the default executor.

        Keeps blocking network calls off the event loop so other coroutines
        can make progress while the lookup is in flight.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for a username across multiple platforms.

        Args:
            username: The handle to look up.

        Returns:
            A dict with three lists:
            ``"found"`` (dicts with ``platform``, ``url`` and ``source``),
            ``"not_found"`` (platform names) and ``"errors"`` (platform names
            whose Sherlock status was neither "found" nor "not found").
            A backend that fails as a whole is logged and contributes nothing.
        """
        results: Dict[str, Any] = {"found": [], "not_found": [], "errors": []}

        # Sherlock search
        try:
            sherlock_results = await self._run_blocking(
                sherlock_module.search_username, username
            )
            for site, data in sherlock_results.items():
                status = data.get("status")
                if status == "found":
                    results["found"].append({
                        "platform": site,
                        "url": data.get("url", ""),
                        "source": "sherlock",
                    })
                elif status == "not found":
                    results["not_found"].append(site)
                else:
                    results["errors"].append(site)
        except Exception as exc:  # best-effort: one backend must not sink the other
            self._logger.warning("Sherlock error: %s", exc)

        # Holehe search
        # NOTE(review): holehe's documented module signature appears to be
        # (email, client, out) and it checks e-mail addresses, not usernames;
        # confirm this call shape against the installed holehe version.
        try:
            holehe_tasks = [platform(username) for platform in self.holehe_functions]
            holehe_results = await asyncio.gather(*holehe_tasks, return_exceptions=True)
        except Exception as exc:
            self._logger.warning("Holehe error: %s", exc)
        else:
            for result in holehe_results:
                if isinstance(result, BaseException):
                    self._logger.debug("Holehe platform check failed: %s", result)
                    continue
                if not isinstance(result, dict):
                    # Skip malformed results individually instead of letting an
                    # AttributeError discard every remaining platform.
                    continue
                if result.get("exists"):
                    results["found"].append({
                        "platform": result.get("name", "unknown"),
                        "url": result.get("url", ""),
                        "source": "holehe",
                    })
                else:
                    results["not_found"].append(result.get("name", "unknown"))

        return results

    async def search_domain(self, domain: str) -> Dict[str, Any]:
        """Get WHOIS information about a domain.

        Args:
            domain: The domain name to query.

        Returns:
            A dict with ``registrar``, ``creation_date``, ``expiration_date``,
            ``last_updated``, ``status``, ``name_servers`` and ``emails`` taken
            unchanged from the WHOIS record, or ``{"error": <message>}`` if the
            lookup raised.
        """
        try:
            record = await self._run_blocking(whois.whois, domain)
            return {
                "registrar": record.registrar,
                "creation_date": record.creation_date,
                "expiration_date": record.expiration_date,
                "last_updated": record.updated_date,
                "status": record.status,
                "name_servers": record.name_servers,
                "emails": record.emails,
            }
        except Exception as exc:
            return {"error": str(exc)}

    async def search_location(self, location: str) -> Dict[str, Any]:
        """Geocode a free-form location string.

        Args:
            location: The place description to geocode.

        Returns:
            A dict with ``address``, ``latitude``, ``longitude`` and ``raw``
            for a match, otherwise ``{"error": <message>}`` (no match, timeout
            or any other geocoder failure).
        """
        try:
            location_data = await self._run_blocking(
                self.geolocator.geocode, location, timeout=10
            )
        except GeocoderTimedOut:
            return {"error": "Geocoding service timed out"}
        except Exception as exc:
            return {"error": str(exc)}

        if not location_data:
            return {"error": "Location not found"}
        return {
            "address": location_data.address,
            "latitude": location_data.latitude,
            "longitude": location_data.longitude,
            "raw": location_data.raw,
        }

    async def search_person(self, name: str, location: Optional[str] = None) -> Dict[str, Any]:
        """Search for information about a person.

        Args:
            name: The person's name; the first and last whitespace-separated
                words are used to guess e-mail addresses.
            location: Optional place to geocode alongside the name.

        Returns:
            A dict with ``name``, ``location``, ``social_profiles`` (always
            empty here), ``possible_emails`` and ``location_info``.
            ``possible_emails`` is empty when the name has fewer than two words.
        """
        results: Dict[str, Any] = {
            "name": name,
            "location": location,
            "social_profiles": [],
            "possible_emails": [],
            "location_info": None,
        }

        # Get location information if provided
        if location:
            results["location_info"] = await self.search_location(location)

        # Generate possible email formats for every common provider
        name_parts = name.lower().split()
        if len(name_parts) >= 2:
            first, last = name_parts[0], name_parts[-1]
            possible_emails: List[str] = []
            for domain in self._COMMON_EMAIL_DOMAINS:
                possible_emails.extend([
                    f"{first}.{last}@{domain}",
                    f"{first}{last}@{domain}",
                    f"{first[0]}{last}@{domain}",
                    f"{first}_{last}@{domain}",
                ])
            results["possible_emails"] = possible_emails

        return results

    async def search(
        self,
        query: str,
        search_type: str = "username",
        location: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Main search interface: dispatch ``query`` to the matching lookup.

        Args:
            query: The username, domain, location or person name to look up.
            search_type: One of ``"username"``, ``"domain"``, ``"location"``
                or ``"person"``.
            location: Optional location forwarded to person searches only.

        Returns:
            The selected lookup's result dict, or ``{"error": <message>}`` for
            an unknown ``search_type`` or an unexpected failure.
        """
        try:
            if search_type == "username":
                return await self.search_username(query)
            if search_type == "domain":
                return await self.search_domain(query)
            if search_type == "location":
                return await self.search_location(query)
            if search_type == "person":
                return await self.search_person(query, location)
            return {"error": f"Unknown search type: {search_type}"}
        except Exception as exc:
            return {"error": str(exc)}