Spaces:
Runtime error
Runtime error
File size: 5,816 Bytes
48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa 1f9ba54 48922fa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
"""
OSINT engine for gathering intelligence from various sources.
"""
from typing import Dict, List, Any, Optional
import asyncio
import json
from datetime import datetime
import whois
from holehe.core import import_submodules
from holehe.core import get_functions
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
import python_sherlock
from python_sherlock import sherlock_module
from tenacity import retry, stop_after_attempt, wait_exponential
class OSINTEngine:
def __init__(self):
self.holehe_modules = import_submodules("holehe.modules")
self.holehe_functions = get_functions(self.holehe_modules)
self.geolocator = Nominatim(user_agent="my_osint_app")
async def search_username(self, username: str) -> Dict[str, Any]:
"""Search for username across multiple platforms."""
results = {
"found": [],
"not_found": [],
"errors": []
}
# Sherlock search
try:
sherlock_results = sherlock_module.search_username(username)
for site, data in sherlock_results.items():
if data.get("status") == "found":
results["found"].append({
"platform": site,
"url": data.get("url", ""),
"source": "sherlock"
})
elif data.get("status") == "not found":
results["not_found"].append(site)
else:
results["errors"].append(site)
except Exception as e:
print(f"Sherlock error: {e}")
# Holehe search
try:
holehe_tasks = []
for platform in self.holehe_functions:
holehe_tasks.append(platform(username))
holehe_results = await asyncio.gather(*holehe_tasks, return_exceptions=True)
for result in holehe_results:
if isinstance(result, Exception):
continue
if result.get("exists"):
results["found"].append({
"platform": result.get("name", "unknown"),
"url": result.get("url", ""),
"source": "holehe"
})
else:
results["not_found"].append(result.get("name", "unknown"))
except Exception as e:
print(f"Holehe error: {e}")
return results
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def search_domain(self, domain: str) -> Dict[str, Any]:
"""Get information about a domain."""
try:
w = whois.whois(domain)
return {
"registrar": w.registrar,
"creation_date": w.creation_date,
"expiration_date": w.expiration_date,
"last_updated": w.updated_date,
"status": w.status,
"name_servers": w.name_servers,
"emails": w.emails
}
except Exception as e:
return {"error": str(e)}
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def search_location(self, location: str) -> Dict[str, Any]:
"""Get information about a location."""
try:
location_data = self.geolocator.geocode(location, timeout=10)
if location_data:
return {
"address": location_data.address,
"latitude": location_data.latitude,
"longitude": location_data.longitude,
"raw": location_data.raw
}
return {"error": "Location not found"}
except GeocoderTimedOut:
return {"error": "Geocoding service timed out"}
except Exception as e:
return {"error": str(e)}
async def search_person(self, name: str, location: Optional[str] = None) -> Dict[str, Any]:
"""Search for information about a person."""
results = {
"name": name,
"location": location,
"social_profiles": [],
"possible_emails": [],
"location_info": None
}
# Get location information if provided
if location:
results["location_info"] = await self.search_location(location)
# Generate possible email formats
name_parts = name.lower().split()
if len(name_parts) >= 2:
first, last = name_parts[0], name_parts[-1]
common_domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com"]
email_formats = [
f"{first}.{last}@{domain}",
f"{first}{last}@{domain}",
f"{first[0]}{last}@{domain}",
f"{first}_{last}@{domain}"
]
results["possible_emails"] = email_formats
return results
async def search(self, query: str, search_type: str = "username") -> Dict[str, Any]:
"""Main search interface."""
try:
if search_type == "username":
return await self.search_username(query)
elif search_type == "domain":
return await self.search_domain(query)
elif search_type == "location":
return await self.search_location(query)
elif search_type == "person":
return await self.search_person(query)
else:
return {"error": f"Unknown search type: {search_type}"}
except Exception as e:
return {"error": str(e)}
|