File size: 5,816 Bytes
48922fa
1f9ba54
48922fa
 
 
 
1f9ba54
 
 
 
48922fa
 
1f9ba54
 
48922fa
 
 
 
1f9ba54
 
 
 
48922fa
 
1f9ba54
 
 
 
48922fa
 
1f9ba54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48922fa
1f9ba54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48922fa
1f9ba54
48922fa
 
1f9ba54
 
48922fa
1f9ba54
 
 
 
 
 
 
 
 
48922fa
 
 
 
1f9ba54
48922fa
1f9ba54
48922fa
 
1f9ba54
 
 
 
 
 
 
 
48922fa
 
 
 
 
1f9ba54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48922fa
1f9ba54
 
 
 
 
 
 
 
 
 
48922fa
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
OSINT engine for gathering intelligence from various sources.
"""
import asyncio
import json
from datetime import datetime
from typing import Dict, List, Any, Optional

import python_sherlock
import whois
from geopy.exc import GeocoderTimedOut
from geopy.geocoders import Nominatim
from holehe.core import get_functions
from holehe.core import import_submodules
from python_sherlock import sherlock_module
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

class OSINTEngine:
    """Run OSINT lookups (username, domain, location, person) behind one async API.

    Every public ``search*`` coroutine reports failures as data -- an
    ``{"error": ...}`` dict or a console message -- rather than raising.
    """

    def __init__(self):
        """Load the holehe checker functions and create the Nominatim geocoder."""
        self.holehe_modules = import_submodules("holehe.modules")
        self.holehe_functions = get_functions(self.holehe_modules)
        self.geolocator = Nominatim(user_agent="my_osint_app")

    @staticmethod
    async def _run_with_retry(transient, func, *args, **kwargs):
        """Run blocking ``func`` in a worker thread, retrying transient errors.

        Args:
            transient: Exception type (or tuple of types) that is worth retrying.
            func: Synchronous callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: The exception raised by ``func`` -- at once when it is
                not ``transient``, otherwise after the third failed attempt.
        """
        attempt = retry(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=4, max=10),
            retry=retry_if_exception_type(transient),
            # Surface the original exception instead of tenacity's RetryError
            # so callers can still match on the real failure type and message.
            reraise=True,
        )(func)
        # Both the call and the back-off sleeps block, so keep them off the
        # event-loop thread.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: attempt(*args, **kwargs))

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across multiple platforms.

        Args:
            username: Handle to look up.

        Returns:
            Dict with three lists: ``"found"`` (dicts with ``platform``,
            ``url`` and ``source``), ``"not_found"`` (platform names) and
            ``"errors"`` (platform names whose status was neither
            ``"found"`` nor ``"not found"``).  A failure of a whole backend
            is printed and leaves that backend's entries out.
        """
        results = {
            "found": [],
            "not_found": [],
            "errors": []
        }

        # Sherlock search
        # NOTE(review): this call is synchronous, so it runs on the event-loop
        # thread for its whole duration.
        try:
            sherlock_results = sherlock_module.search_username(username)
            for site, data in sherlock_results.items():
                status = data.get("status")
                if status == "found":
                    results["found"].append({
                        "platform": site,
                        "url": data.get("url", ""),
                        "source": "sherlock"
                    })
                elif status == "not found":
                    results["not_found"].append(site)
                else:
                    results["errors"].append(site)
        except Exception as e:
            print(f"Sherlock error: {e}")

        # Holehe search
        # NOTE(review): upstream holehe checkers appear to take
        # (email, client, out) and report through ``out``; if that holds for
        # the installed version, this one-argument call raises TypeError and
        # the whole section falls through to the except below -- confirm.
        try:
            holehe_tasks = [platform(username) for platform in self.holehe_functions]

            holehe_results = await asyncio.gather(*holehe_tasks, return_exceptions=True)

            for result in holehe_results:
                # BaseException also covers CancelledError, which gather()
                # hands back as a result and which has no ``.get``.
                if isinstance(result, BaseException):
                    continue

                if result.get("exists"):
                    results["found"].append({
                        "platform": result.get("name", "unknown"),
                        "url": result.get("url", ""),
                        "source": "holehe"
                    })
                else:
                    results["not_found"].append(result.get("name", "unknown"))
        except Exception as e:
            print(f"Holehe error: {e}")

        return results

    async def search_domain(self, domain: str) -> Dict[str, Any]:
        """Get information about a domain.

        The WHOIS query runs in a worker thread and is retried (up to three
        attempts with exponential back-off) only on ``OSError``.

        Args:
            domain: Domain name to query.

        Returns:
            Dict with ``registrar``, ``creation_date``, ``expiration_date``,
            ``last_updated``, ``status``, ``name_servers`` and ``emails``,
            or ``{"error": message}`` when the lookup fails.
        """
        try:
            # OSError is the base of socket timeouts and connection errors;
            # other failures are returned without retrying.
            record = await self._run_with_retry(OSError, whois.whois, domain)
            return {
                "registrar": record.registrar,
                "creation_date": record.creation_date,
                "expiration_date": record.expiration_date,
                "last_updated": record.updated_date,
                "status": record.status,
                "name_servers": record.name_servers,
                "emails": record.emails
            }
        except Exception as e:
            return {"error": str(e)}

    async def search_location(self, location: str) -> Dict[str, Any]:
        """Get information about a location.

        The geocoding request runs in a worker thread with a 10 second
        timeout per attempt and is retried (up to three attempts with
        exponential back-off) only when it times out.

        Args:
            location: Free-form place description to geocode.

        Returns:
            Dict with ``address``, ``latitude``, ``longitude`` and ``raw``,
            or ``{"error": message}`` when nothing matches or the lookup
            fails.
        """
        try:
            location_data = await self._run_with_retry(
                GeocoderTimedOut, self.geolocator.geocode, location, timeout=10
            )
            if location_data:
                return {
                    "address": location_data.address,
                    "latitude": location_data.latitude,
                    "longitude": location_data.longitude,
                    "raw": location_data.raw
                }
            return {"error": "Location not found"}
        except GeocoderTimedOut:
            return {"error": "Geocoding service timed out"}
        except Exception as e:
            return {"error": str(e)}

    async def search_person(self, name: str, location: Optional[str] = None) -> Dict[str, Any]:
        """Search for information about a person.

        Args:
            name: Person's name; the first and last whitespace-separated
                words are used to guess e-mail addresses.
            location: Optional place to geocode alongside the name.

        Returns:
            Dict with ``name``, ``location``, ``social_profiles`` (always
            empty here), ``possible_emails`` (guessed addresses grouped by
            domain; empty when ``name`` has fewer than two words) and
            ``location_info`` (the ``search_location`` result, or ``None``
            when no location was given).
        """
        results = {
            "name": name,
            "location": location,
            "social_profiles": [],
            "possible_emails": [],
            "location_info": None
        }

        # Get location information if provided
        if location:
            results["location_info"] = await self.search_location(location)

        # Generate possible email formats
        name_parts = name.lower().split()
        if len(name_parts) >= 2:
            first, last = name_parts[0], name_parts[-1]
            common_domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com"]
            local_parts = [
                f"{first}.{last}",
                f"{first}{last}",
                f"{first[0]}{last}",
                f"{first}_{last}",
            ]
            # Every local-part pattern is paired with every provider domain.
            results["possible_emails"] = [
                f"{local}@{domain}"
                for domain in common_domains
                for local in local_parts
            ]

        return results

    async def search(self, query: str, search_type: str = "username") -> Dict[str, Any]:
        """Main search interface.

        Args:
            query: Username, domain, location or person name to look up.
            search_type: One of ``"username"``, ``"domain"``, ``"location"``
                or ``"person"``.

        Returns:
            The result dict of the selected lookup, or ``{"error": message}``
            for an unknown ``search_type`` or an unexpected failure.
        """
        try:
            handlers = {
                "username": self.search_username,
                "domain": self.search_domain,
                "location": self.search_location,
                "person": self.search_person,
            }
            handler = handlers.get(search_type)
            if handler is None:
                return {"error": f"Unknown search type: {search_type}"}
            return await handler(query)
        except Exception as e:
            return {"error": str(e)}