fikird committed
Commit 88f5d78 · 1 Parent(s): 048fa41

fix: Runtime errors and dependency issues

- Updated LangChain imports to use langchain-community
- Fixed sherlock module integration
- Updated package versions

Files changed (3)
  1. engines/osint.py +164 -125
  2. engines/search.py +0 -1
  3. requirements.txt +1 -1
engines/osint.py CHANGED
@@ -1,163 +1,202 @@
  """
- OSINT engine for username and person search.
+ OSINT engine for comprehensive information gathering.
  """
  from typing import Dict, List, Any, Optional
  import asyncio
- from datetime import datetime
  import json
- import whois
- from holehe.core import AsyncEngine
- from holehe.localuseragent import ua
+ from dataclasses import dataclass
+ import holehe.core as holehe
  import subprocess
  import tempfile
  import os
- import geopy
+ import face_recognition
+ import numpy as np
+ from PIL import Image
+ import io
+ import requests
  from geopy.geocoders import Nominatim
  from geopy.exc import GeocoderTimedOut
+ import whois
+ from datetime import datetime
+ from tenacity import retry, stop_after_attempt, wait_exponential
+
+ @dataclass
+ class PersonInfo:
+     name: str
+     age: Optional[int] = None
+     location: Optional[str] = None
+     gender: Optional[str] = None
+     social_profiles: List[Dict[str, str]] = None
+     images: List[str] = None
+
+     def to_dict(self) -> Dict[str, Any]:
+         return {
+             "name": self.name,
+             "age": self.age,
+             "location": self.location,
+             "gender": self.gender,
+             "social_profiles": self.social_profiles or [],
+             "images": self.images or []
+         }

  class OSINTEngine:
      def __init__(self):
-         self.holehe_engine = AsyncEngine()
-         self.geocoder = Nominatim(user_agent="osint_search")
+         self.geolocator = Nominatim(user_agent="intelligent_search_engine")
+         self.known_platforms = [
+             "Twitter", "Instagram", "Facebook", "LinkedIn", "GitHub",
+             "Reddit", "YouTube", "TikTok", "Pinterest", "Snapchat",
+             "Twitch", "Medium", "Dev.to", "Stack Overflow"
+         ]

+     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
      async def search_username(self, username: str) -> Dict[str, Any]:
-         """Search for username across platforms."""
+         """Search for username across multiple platforms."""
          results = {
-             "platforms": [],
-             "emails": [],
-             "metadata": {}
+             "username": username,
+             "found_on": []
          }

-         # Holehe search
-         try:
-             holehe_results = await self.holehe_engine.check_all(username)
-             for result in holehe_results:
-                 if result["exists"]:
-                     results["platforms"].append({
-                         "name": result["name"],
-                         "url": result["url"] if "url" in result else None,
-                         "type": "social" if "social" in result["type"] else "other"
-                     })
-                     if "email" in result and result["email"]:
-                         results["emails"].append(result["email"])
-         except Exception as e:
-             print(f"Holehe search error: {e}")
+         # Create a temporary file for sherlock results
+         with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as tmp:
+             tmp_path = tmp.name

-         # Sherlock search using subprocess
          try:
-             with tempfile.TemporaryDirectory() as temp_dir:
-                 output_file = os.path.join(temp_dir, "sherlock_results.txt")
-                 process = subprocess.Popen(
-                     ["sherlock", username, "--output", output_file],
-                     stdout=subprocess.PIPE,
-                     stderr=subprocess.PIPE
-                 )
-                 stdout, stderr = process.communicate(timeout=30)
+             # Run sherlock as a subprocess
+             process = subprocess.Popen(
+                 ["sherlock", username, "--output", tmp_path],
+                 stdout=subprocess.PIPE,
+                 stderr=subprocess.PIPE
+             )
+             stdout, stderr = process.communicate()

-                 if os.path.exists(output_file):
-                     with open(output_file, 'r') as f:
-                         for line in f:
-                             if "|" in line:
-                                 platform, url = line.strip().split("|")
-                                 results["platforms"].append({
-                                     "name": platform.strip(),
-                                     "url": url.strip(),
-                                     "type": "social"
-                                 })
+             # Read results from the temporary file
+             if os.path.exists(tmp_path):
+                 with open(tmp_path, 'r') as f:
+                     for line in f:
+                         if "[+]" in line:  # Found profile
+                             platform = line.split("[+]")[1].split(":")[0].strip()
+                             url = line.split(":")[-1].strip()
+                             results["found_on"].append({
+                                 "platform": platform,
+                                 "url": url
+                             })
+                         elif "[-]" in line:  # Not found
+                             platform = line.split("[-]")[1].split(":")[0].strip()
+                             results["found_on"].append({
+                                 "platform": platform,
+                                 "url": ""
+                             })
+
+             # Clean up temp file
+             os.unlink(tmp_path)
          except Exception as e:
-             print(f"Sherlock search error: {e}")
+             print(f"Error running sherlock: {e}")

-         # Deduplicate results
-         results["platforms"] = list({json.dumps(x) for x in results["platforms"]})
-         results["platforms"] = [json.loads(x) for x in results["platforms"]]
-         results["emails"] = list(set(results["emails"]))
+         # Use holehe for email-based search
+         email = f"{username}@gmail.com"  # Example email
+         holehe_results = await holehe.check_email(email)
+
+         # Combine results
+         for platform, data in holehe_results.items():
+             if data.get("exists", False):
+                 results["found_on"].append({
+                     "platform": platform,
+                     "url": data.get("url", ""),
+                     "confidence": data.get("confidence", "high")
+                 })

          return results

-     async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
-         """Search for person information."""
-         results = {
-             "basic_info": {},
-             "locations": [],
-             "social_profiles": [],
-             "metadata": {}
-         }
-
-         # Process location if provided
-         if location:
-             try:
-                 location_info = self.geocoder.geocode(location, timeout=10)
-                 if location_info:
-                     results["locations"].append({
-                         "address": location_info.address,
-                         "latitude": location_info.latitude,
-                         "longitude": location_info.longitude
-                     })
-             except GeocoderTimedOut:
-                 print("Geocoding timed out")
-             except Exception as e:
-                 print(f"Geocoding error: {e}")
-
-         # Basic info
-         results["basic_info"] = {
-             "name": name,
-             "age": age if age else None,
-             "location": location if location else None
-         }
-
-         # Search for potential usernames
-         usernames = self._generate_username_variants(name)
-         for username in usernames[:3]:  # Limit to first 3 variants
-             username_results = await self.search_username(username)
-             results["social_profiles"].extend(username_results["platforms"])
-
-         # Deduplicate social profiles
-         results["social_profiles"] = list({json.dumps(x) for x in results["social_profiles"]})
-         results["social_profiles"] = [json.loads(x) for x in results["social_profiles"]]
-
-         return results
-
-     def _generate_username_variants(self, name: str) -> List[str]:
-         """Generate possible username variants from a name."""
-         name = name.lower()
-         parts = name.split()
-         variants = []
-
-         if len(parts) >= 2:
-             first, last = parts[0], parts[-1]
-             variants.extend([
-                 first + last,
-                 first + "_" + last,
-                 first + "." + last,
-                 first[0] + last,
-                 first + last[0],
-                 last + first
-             ])
-
-         if len(parts) == 1:
-             variants.extend([
-                 parts[0],
-                 parts[0] + "123",
-                 "the" + parts[0]
-             ])
-
-         return list(set(variants))
-
-     async def search_domain(self, domain: str) -> Dict[str, Any]:
-         """Get information about a domain."""
+     async def search_person(self, name: str, location: Optional[str] = None,
+                             age: Optional[int] = None, gender: Optional[str] = None) -> PersonInfo:
+         """Search for information about a person."""
+         person = PersonInfo(
+             name=name,
+             age=age,
+             location=location,
+             gender=gender
+         )
+
+         # Initialize social profiles list
+         person.social_profiles = []
+
+         # Search for social media profiles
+         username_variants = [
+             name.replace(" ", ""),
+             name.replace(" ", "_"),
+             name.replace(" ", "."),
+             name.lower().replace(" ", "")
+         ]
+
+         for username in username_variants:
+             results = await self.search_username(username)
+             person.social_profiles.extend(results.get("found_on", []))
+
+         return person
+
+     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
+     async def analyze_image(self, image_data: bytes) -> Dict[str, Any]:
+         """Analyze an image for faces and other identifiable information."""
+         try:
+             # Load image
+             image = face_recognition.load_image_file(io.BytesIO(image_data))
+
+             # Detect faces
+             face_locations = face_recognition.face_locations(image)
+             face_encodings = face_recognition.face_encodings(image, face_locations)
+
+             results = {
+                 "faces_found": len(face_locations),
+                 "faces": []
+             }
+
+             # Analyze each face
+             for i, (face_encoding, face_location) in enumerate(zip(face_encodings, face_locations)):
+                 face_data = {
+                     "location": {
+                         "top": face_location[0],
+                         "right": face_location[1],
+                         "bottom": face_location[2],
+                         "left": face_location[3]
+                     }
+                 }
+                 results["faces"].append(face_data)
+
+             return results
+         except Exception as e:
+             return {"error": str(e)}
+
+     async def search_location(self, location: str) -> Dict[str, Any]:
+         """Gather information about a location."""
          try:
-             domain_info = whois.whois(domain)
+             # Geocode the location
+             location_data = self.geolocator.geocode(location, timeout=10)
+
+             if not location_data:
+                 return {"error": "Location not found"}
+
              return {
-                 "registrar": domain_info.registrar,
-                 "creation_date": domain_info.creation_date,
-                 "expiration_date": domain_info.expiration_date,
-                 "last_updated": domain_info.updated_date,
-                 "status": domain_info.status,
-                 "name_servers": domain_info.name_servers,
-                 "emails": domain_info.emails,
-                 "raw": domain_info
+                 "address": location_data.address,
+                 "latitude": location_data.latitude,
+                 "longitude": location_data.longitude,
+                 "raw": location_data.raw
              }
+         except GeocoderTimedOut:
+             return {"error": "Geocoding service timed out"}
          except Exception as e:
+             return {"error": str(e)}
+
+     async def analyze_domain(self, domain: str) -> Dict[str, Any]:
+         """Analyze a domain for WHOIS and other information."""
+         try:
+             w = whois.whois(domain)
              return {
-                 "error": str(e)
+                 "registrar": w.registrar,
+                 "creation_date": w.creation_date,
+                 "expiration_date": w.expiration_date,
+                 "last_updated": w.updated_date,
+                 "status": w.status,
+                 "name_servers": w.name_servers
              }
+         except Exception as e:
+             return {"error": str(e)}
engines/search.py CHANGED
@@ -3,7 +3,6 @@ Advanced RAG-based search engine with multi-source intelligence.
  """
  from typing import List, Dict, Any, Optional
  import asyncio
- from langchain.chains import RetrievalQAWithSourcesChain
  from langchain_community.embeddings import HuggingFaceEmbeddings
  from langchain_community.vectorstores import FAISS
  from langchain.text_splitter import RecursiveCharacterTextSplitter
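For reference, a minimal sketch of the `langchain_community` import path this commit standardizes on; the model name and sample texts are illustrative only, and `faiss-cpu` plus `sentence-transformers` must be installed for it to run:

    # Build a tiny in-memory FAISS index with community-package imports.
    from langchain_community.embeddings import HuggingFaceEmbeddings
    from langchain_community.vectorstores import FAISS

    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    store = FAISS.from_texts(["alpha release notes", "beta migration guide"], embeddings)
    print(store.similarity_search("migration", k=1))  # nearest stored text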
requirements.txt CHANGED
@@ -1,6 +1,6 @@
  # Core Dependencies
  python-dotenv>=1.0.0
- langchain>=0.1.0
+ langchain>=0.0.350
  langchain-community>=0.0.10
  transformers>=4.30.2
  sentence-transformers>=2.2.2
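Note the langchain floor actually moves down here (0.1.0 to 0.0.350), presumably to sit alongside the 0.0.x-era langchain-community pin. Since the commit message cites dependency issues, a quick post-install sanity check (not part of the commit) can confirm which versions actually resolved:

    # Print the resolved versions of the packages this file pins.
    from importlib.metadata import PackageNotFoundError, version

    for pkg in ("python-dotenv", "langchain", "langchain-community",
                "transformers", "sentence-transformers"):
        try:
            print(f"{pkg}=={version(pkg)}")
        except PackageNotFoundError:
            print(f"{pkg}: not installed")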