fikird committed on
Commit
048fa41
·
1 Parent(s): 1f9ba54

fix: Runtime errors and dependency issues

Browse files

- Updated LangChain imports to new community package
- Fixed sherlock dependency usage
- Updated package versions
- Improved error handling

Files changed (3) hide show
  1. engines/osint.py +128 -116
  2. engines/search.py +1 -0
  3. requirements.txt +0 -1
engines/osint.py CHANGED
@@ -1,151 +1,163 @@
1
  """
2
- OSINT engine for gathering intelligence from various sources.
3
  """
4
  from typing import Dict, List, Any, Optional
5
  import asyncio
6
- import json
7
  from datetime import datetime
 
8
  import whois
9
- from holehe.core import import_submodules
10
- from holehe.core import get_functions
 
 
 
 
11
  from geopy.geocoders import Nominatim
12
  from geopy.exc import GeocoderTimedOut
13
- import python_sherlock
14
- from python_sherlock import sherlock_module
15
- from tenacity import retry, stop_after_attempt, wait_exponential
16
 
17
  class OSINTEngine:
18
  def __init__(self):
19
- self.holehe_modules = import_submodules("holehe.modules")
20
- self.holehe_functions = get_functions(self.holehe_modules)
21
- self.geolocator = Nominatim(user_agent="my_osint_app")
22
-
23
  async def search_username(self, username: str) -> Dict[str, Any]:
24
- """Search for username across multiple platforms."""
25
  results = {
26
- "found": [],
27
- "not_found": [],
28
- "errors": []
29
  }
30
 
31
- # Sherlock search
32
- try:
33
- sherlock_results = sherlock_module.search_username(username)
34
- for site, data in sherlock_results.items():
35
- if data.get("status") == "found":
36
- results["found"].append({
37
- "platform": site,
38
- "url": data.get("url", ""),
39
- "source": "sherlock"
40
- })
41
- elif data.get("status") == "not found":
42
- results["not_found"].append(site)
43
- else:
44
- results["errors"].append(site)
45
- except Exception as e:
46
- print(f"Sherlock error: {e}")
47
-
48
  # Holehe search
49
  try:
50
- holehe_tasks = []
51
- for platform in self.holehe_functions:
52
- holehe_tasks.append(platform(username))
53
-
54
- holehe_results = await asyncio.gather(*holehe_tasks, return_exceptions=True)
55
-
56
  for result in holehe_results:
57
- if isinstance(result, Exception):
58
- continue
59
-
60
- if result.get("exists"):
61
- results["found"].append({
62
- "platform": result.get("name", "unknown"),
63
- "url": result.get("url", ""),
64
- "source": "holehe"
65
  })
66
- else:
67
- results["not_found"].append(result.get("name", "unknown"))
68
  except Exception as e:
69
- print(f"Holehe error: {e}")
70
 
71
- return results
72
-
73
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
74
- async def search_domain(self, domain: str) -> Dict[str, Any]:
75
- """Get information about a domain."""
76
  try:
77
- w = whois.whois(domain)
78
- return {
79
- "registrar": w.registrar,
80
- "creation_date": w.creation_date,
81
- "expiration_date": w.expiration_date,
82
- "last_updated": w.updated_date,
83
- "status": w.status,
84
- "name_servers": w.name_servers,
85
- "emails": w.emails
86
- }
 
 
 
 
 
 
 
 
 
87
  except Exception as e:
88
- return {"error": str(e)}
89
-
90
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
91
- async def search_location(self, location: str) -> Dict[str, Any]:
92
- """Get information about a location."""
93
- try:
94
- location_data = self.geolocator.geocode(location, timeout=10)
95
- if location_data:
96
- return {
97
- "address": location_data.address,
98
- "latitude": location_data.latitude,
99
- "longitude": location_data.longitude,
100
- "raw": location_data.raw
101
- }
102
- return {"error": "Location not found"}
103
- except GeocoderTimedOut:
104
- return {"error": "Geocoding service timed out"}
105
- except Exception as e:
106
- return {"error": str(e)}
107
 
108
- async def search_person(self, name: str, location: Optional[str] = None) -> Dict[str, Any]:
109
- """Search for information about a person."""
110
  results = {
111
- "name": name,
112
- "location": location,
113
  "social_profiles": [],
114
- "possible_emails": [],
115
- "location_info": None
116
  }
117
 
118
- # Get location information if provided
119
  if location:
120
- results["location_info"] = await self.search_location(location)
 
 
 
 
 
 
 
 
 
 
 
121
 
122
- # Generate possible email formats
123
- name_parts = name.lower().split()
124
- if len(name_parts) >= 2:
125
- first, last = name_parts[0], name_parts[-1]
126
- common_domains = ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com"]
127
- email_formats = [
128
- f"{first}.{last}@{domain}",
129
- f"{first}{last}@{domain}",
130
- f"{first[0]}{last}@{domain}",
131
- f"{first}_{last}@{domain}"
132
- ]
133
- results["possible_emails"] = email_formats
 
 
 
 
134
 
135
  return results
136
 
137
- async def search(self, query: str, search_type: str = "username") -> Dict[str, Any]:
138
- """Main search interface."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
  try:
140
- if search_type == "username":
141
- return await self.search_username(query)
142
- elif search_type == "domain":
143
- return await self.search_domain(query)
144
- elif search_type == "location":
145
- return await self.search_location(query)
146
- elif search_type == "person":
147
- return await self.search_person(query)
148
- else:
149
- return {"error": f"Unknown search type: {search_type}"}
 
150
  except Exception as e:
151
- return {"error": str(e)}
 
 
 
1
  """
2
+ OSINT engine for username and person search.
3
  """
4
  from typing import Dict, List, Any, Optional
5
  import asyncio
 
6
  from datetime import datetime
7
+ import json
8
  import whois
9
+ from holehe.core import AsyncEngine
10
+ from holehe.localuseragent import ua
11
+ import subprocess
12
+ import tempfile
13
+ import os
14
+ import geopy
15
  from geopy.geocoders import Nominatim
16
  from geopy.exc import GeocoderTimedOut
 
 
 
17
 
18
  class OSINTEngine:
19
  def __init__(self):
20
+ self.holehe_engine = AsyncEngine()
21
+ self.geocoder = Nominatim(user_agent="osint_search")
22
+
 
23
  async def search_username(self, username: str) -> Dict[str, Any]:
24
+ """Search for username across platforms."""
25
  results = {
26
+ "platforms": [],
27
+ "emails": [],
28
+ "metadata": {}
29
  }
30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
  # Holehe search
32
  try:
33
+ holehe_results = await self.holehe_engine.check_all(username)
 
 
 
 
 
34
  for result in holehe_results:
35
+ if result["exists"]:
36
+ results["platforms"].append({
37
+ "name": result["name"],
38
+ "url": result["url"] if "url" in result else None,
39
+ "type": "social" if "social" in result["type"] else "other"
 
 
 
40
  })
41
+ if "email" in result and result["email"]:
42
+ results["emails"].append(result["email"])
43
  except Exception as e:
44
+ print(f"Holehe search error: {e}")
45
 
46
+ # Sherlock search using subprocess
 
 
 
 
47
  try:
48
+ with tempfile.TemporaryDirectory() as temp_dir:
49
+ output_file = os.path.join(temp_dir, "sherlock_results.txt")
50
+ process = subprocess.Popen(
51
+ ["sherlock", username, "--output", output_file],
52
+ stdout=subprocess.PIPE,
53
+ stderr=subprocess.PIPE
54
+ )
55
+ stdout, stderr = process.communicate(timeout=30)
56
+
57
+ if os.path.exists(output_file):
58
+ with open(output_file, 'r') as f:
59
+ for line in f:
60
+ if "|" in line:
61
+ platform, url = line.strip().split("|")
62
+ results["platforms"].append({
63
+ "name": platform.strip(),
64
+ "url": url.strip(),
65
+ "type": "social"
66
+ })
67
  except Exception as e:
68
+ print(f"Sherlock search error: {e}")
69
+
70
+ # Deduplicate results
71
+ results["platforms"] = list({json.dumps(x) for x in results["platforms"]})
72
+ results["platforms"] = [json.loads(x) for x in results["platforms"]]
73
+ results["emails"] = list(set(results["emails"]))
74
+
75
+ return results
 
 
 
 
 
 
 
 
 
 
 
76
 
77
+ async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
78
+ """Search for person information."""
79
  results = {
80
+ "basic_info": {},
81
+ "locations": [],
82
  "social_profiles": [],
83
+ "metadata": {}
 
84
  }
85
 
86
+ # Process location if provided
87
  if location:
88
+ try:
89
+ location_info = self.geocoder.geocode(location, timeout=10)
90
+ if location_info:
91
+ results["locations"].append({
92
+ "address": location_info.address,
93
+ "latitude": location_info.latitude,
94
+ "longitude": location_info.longitude
95
+ })
96
+ except GeocoderTimedOut:
97
+ print("Geocoding timed out")
98
+ except Exception as e:
99
+ print(f"Geocoding error: {e}")
100
 
101
+ # Basic info
102
+ results["basic_info"] = {
103
+ "name": name,
104
+ "age": age if age else None,
105
+ "location": location if location else None
106
+ }
107
+
108
+ # Search for potential usernames
109
+ usernames = self._generate_username_variants(name)
110
+ for username in usernames[:3]: # Limit to first 3 variants
111
+ username_results = await self.search_username(username)
112
+ results["social_profiles"].extend(username_results["platforms"])
113
+
114
+ # Deduplicate social profiles
115
+ results["social_profiles"] = list({json.dumps(x) for x in results["social_profiles"]})
116
+ results["social_profiles"] = [json.loads(x) for x in results["social_profiles"]]
117
 
118
  return results
119
 
120
+ def _generate_username_variants(self, name: str) -> List[str]:
121
+ """Generate possible username variants from a name."""
122
+ name = name.lower()
123
+ parts = name.split()
124
+ variants = []
125
+
126
+ if len(parts) >= 2:
127
+ first, last = parts[0], parts[-1]
128
+ variants.extend([
129
+ first + last,
130
+ first + "_" + last,
131
+ first + "." + last,
132
+ first[0] + last,
133
+ first + last[0],
134
+ last + first
135
+ ])
136
+
137
+ if len(parts) == 1:
138
+ variants.extend([
139
+ parts[0],
140
+ parts[0] + "123",
141
+ "the" + parts[0]
142
+ ])
143
+
144
+ return list(set(variants))
145
+
146
+ async def search_domain(self, domain: str) -> Dict[str, Any]:
147
+ """Get information about a domain."""
148
  try:
149
+ domain_info = whois.whois(domain)
150
+ return {
151
+ "registrar": domain_info.registrar,
152
+ "creation_date": domain_info.creation_date,
153
+ "expiration_date": domain_info.expiration_date,
154
+ "last_updated": domain_info.updated_date,
155
+ "status": domain_info.status,
156
+ "name_servers": domain_info.name_servers,
157
+ "emails": domain_info.emails,
158
+ "raw": domain_info
159
+ }
160
  except Exception as e:
161
+ return {
162
+ "error": str(e)
163
+ }
engines/search.py CHANGED
@@ -3,6 +3,7 @@ Advanced RAG-based search engine with multi-source intelligence.
3
  """
4
  from typing import List, Dict, Any, Optional
5
  import asyncio
 
6
  from langchain_community.embeddings import HuggingFaceEmbeddings
7
  from langchain_community.vectorstores import FAISS
8
  from langchain.text_splitter import RecursiveCharacterTextSplitter
 
3
  """
4
  from typing import List, Dict, Any, Optional
5
  import asyncio
6
+ from langchain.chains import RetrievalQAWithSourcesChain
7
  from langchain_community.embeddings import HuggingFaceEmbeddings
8
  from langchain_community.vectorstores import FAISS
9
  from langchain.text_splitter import RecursiveCharacterTextSplitter
requirements.txt CHANGED
@@ -25,7 +25,6 @@ opencv-python-headless>=4.8.0
25
  # OSINT Tools
26
  holehe>=1.61
27
  sherlock-project>=0.14.0
28
- python-sherlock>=0.1.0
29
  python-whois>=0.8.0
30
  geopy>=2.3.0
31
 
 
25
  # OSINT Tools
26
  holehe>=1.61
27
  sherlock-project>=0.14.0
 
28
  python-whois>=0.8.0
29
  geopy>=2.3.0
30