# ISE/engines/osint.py
# fikird
# fix: Update dependencies and imports
# 1f9ba54
# raw
# history blame
# 5.82 kB
"""
OSINT engine for gathering intelligence from various sources.
"""
from typing import Dict, List, Any, Optional
import asyncio
import json
from datetime import datetime
import whois
from holehe.core import import_submodules
from holehe.core import get_functions
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
import python_sherlock
from python_sherlock import sherlock_module
from tenacity import retry, stop_after_attempt, wait_exponential
class OSINTEngine:
    """Gather open-source intelligence about usernames, domains, places and people.

    The engine combines a Sherlock username lookup, the holehe platform
    checkers, WHOIS queries and the Nominatim geocoder behind one async
    entry point, ``search``.
    """

    # Mail providers combined with the name-derived local parts in search_person.
    COMMON_EMAIL_DOMAINS = ("gmail.com", "yahoo.com", "hotmail.com", "outlook.com")

    def __init__(self):
        """Load the holehe platform checkers and create the geocoder client."""
        self.holehe_modules = import_submodules("holehe.modules")
        self.holehe_functions = get_functions(self.holehe_modules)
        self.geolocator = Nominatim(user_agent="my_osint_app")

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a synchronous callable in the default executor and return its result.

        Keeps blocking network calls off the event loop so other coroutines
        can make progress while they are in flight.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    @staticmethod
    async def _probe(checker, username):
        """Await a single holehe checker.

        Calling the checker inside this coroutine means an error raised at
        call time is reported for that checker only (via ``gather``) instead
        of aborting the whole batch.
        """
        return await checker(username)

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across multiple platforms.

        Args:
            username: The handle to look up.

        Returns:
            A dict with three lists: ``"found"`` (dicts with ``platform``,
            ``url`` and ``source``), ``"not_found"`` (platform names) and
            ``"errors"`` (platform names whose lookup failed or gave an
            unusable answer).
        """
        results: Dict[str, List[Any]] = {
            "found": [],
            "not_found": [],
            "errors": [],
        }

        # Sherlock search (best effort: a failure here must not stop holehe).
        try:
            sherlock_results = await self._run_blocking(
                sherlock_module.search_username, username
            )
            for site, data in sherlock_results.items():
                status = data.get("status")
                if status == "found":
                    results["found"].append({
                        "platform": site,
                        "url": data.get("url", ""),
                        "source": "sherlock",
                    })
                elif status == "not found":
                    results["not_found"].append(site)
                else:
                    results["errors"].append(site)
        except Exception as e:
            print(f"Sherlock error: {e}")

        # Holehe search
        # NOTE(review): holehe's own documentation describes its module
        # functions as taking (email, client, out); confirm that the checkers
        # returned by get_functions() can be called with a single username.
        try:
            checkers = list(self.holehe_functions)
            outcomes = await asyncio.gather(
                *(self._probe(checker, username) for checker in checkers),
                return_exceptions=True,
            )
            for checker, outcome in zip(checkers, outcomes):
                if not isinstance(outcome, dict):
                    # Raised exception or unusable return value: record which
                    # checker failed instead of silently dropping it.
                    results["errors"].append(getattr(checker, "__name__", "unknown"))
                    continue
                if outcome.get("exists"):
                    results["found"].append({
                        "platform": outcome.get("name", "unknown"),
                        "url": outcome.get("url", ""),
                        "source": "holehe",
                    })
                else:
                    results["not_found"].append(outcome.get("name", "unknown"))
        except Exception as e:
            print(f"Holehe error: {e}")

        return results

    # NOTE: the body below converts every exception into an error dict, so
    # nothing propagates to @retry and the lookup is attempted only once.
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def search_domain(self, domain: str) -> Dict[str, Any]:
        """Get information about a domain.

        Args:
            domain: Domain name to query via WHOIS.

        Returns:
            A dict with ``registrar``, ``creation_date``, ``expiration_date``,
            ``last_updated``, ``status``, ``name_servers`` and ``emails`` as
            reported by the WHOIS record, or ``{"error": message}`` when the
            lookup fails.
        """
        try:
            # whois.whois blocks on the network; keep it off the event loop.
            record = await self._run_blocking(whois.whois, domain)
            return {
                "registrar": record.registrar,
                "creation_date": record.creation_date,
                "expiration_date": record.expiration_date,
                "last_updated": record.updated_date,
                "status": record.status,
                "name_servers": record.name_servers,
                "emails": record.emails,
            }
        except Exception as e:
            return {"error": str(e)}

    # NOTE: as with search_domain, all exceptions are handled inside the body,
    # so the retry policy never actually triggers.
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def search_location(self, location: str) -> Dict[str, Any]:
        """Get information about a location.

        Args:
            location: Free-form place description passed to the geocoder.

        Returns:
            A dict with ``address``, ``latitude``, ``longitude`` and ``raw``
            for the match, or ``{"error": message}`` when nothing is found,
            the geocoder times out, or any other error occurs.
        """
        try:
            # geocode() blocks on the network; keep it off the event loop.
            location_data = await self._run_blocking(
                self.geolocator.geocode, location, timeout=10
            )
        except GeocoderTimedOut:
            return {"error": "Geocoding service timed out"}
        except Exception as e:
            return {"error": str(e)}

        if not location_data:
            return {"error": "Location not found"}
        return {
            "address": location_data.address,
            "latitude": location_data.latitude,
            "longitude": location_data.longitude,
            "raw": location_data.raw,
        }

    async def search_person(self, name: str, location: Optional[str] = None) -> Dict[str, Any]:
        """Search for information about a person.

        Args:
            name: Full name; the first and last whitespace-separated parts are
                used to guess e-mail addresses.
            location: Optional place to geocode alongside the name.

        Returns:
            A dict with ``name``, ``location``, ``social_profiles`` (always an
            empty list here), ``possible_emails`` and ``location_info`` (the
            ``search_location`` result, or ``None`` when no location is given).
        """
        results: Dict[str, Any] = {
            "name": name,
            "location": location,
            "social_profiles": [],
            "possible_emails": [],
            "location_info": None,
        }

        # Get location information if provided
        if location:
            results["location_info"] = await self.search_location(location)

        # Generate possible email formats; needs at least a first and last name.
        name_parts = name.lower().split()
        if len(name_parts) >= 2:
            first, last = name_parts[0], name_parts[-1]
            local_parts = [
                f"{first}.{last}",
                f"{first}{last}",
                f"{first[0]}{last}",
                f"{first}_{last}",
            ]
            # Every local part is paired with every common mail domain.
            results["possible_emails"] = [
                f"{local}@{domain}"
                for domain in self.COMMON_EMAIL_DOMAINS
                for local in local_parts
            ]

        return results

    async def search(self, query: str, search_type: str = "username") -> Dict[str, Any]:
        """Main search interface.

        Args:
            query: The username, domain, location or person name to look up.
            search_type: One of ``"username"``, ``"domain"``, ``"location"``
                or ``"person"``.

        Returns:
            The result dict of the selected search, or ``{"error": message}``
            for an unknown ``search_type`` or an unexpected failure.
        """
        handlers = {
            "username": self.search_username,
            "domain": self.search_domain,
            "location": self.search_location,
            "person": self.search_person,
        }
        try:
            handler = handlers.get(search_type)
            if handler is None:
                return {"error": f"Unknown search type: {search_type}"}
            return await handler(query)
        except Exception as e:
            return {"error": str(e)}