Spaces:
Running
Running
Update utils/functions.py
Browse files- utils/functions.py +86 -28
utils/functions.py
CHANGED
@@ -1,15 +1,16 @@
|
|
1 |
# utils/functions.py
|
2 |
|
3 |
import phonenumbers
|
4 |
-
from phonenumbers import geocoder, carrier
|
5 |
import re
|
6 |
import requests
|
7 |
import os
|
8 |
from datetime import datetime
|
9 |
import logging
|
10 |
import json
|
11 |
-
|
12 |
-
|
|
|
13 |
|
14 |
# Konfiguracja logowania
|
15 |
logging.basicConfig(
|
@@ -25,13 +26,16 @@ FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json')
|
|
25 |
HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
|
26 |
STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
|
27 |
|
|
|
|
|
|
|
28 |
# Funkcje pomocnicze
|
29 |
|
30 |
def load_json(file_path):
|
31 |
"""Ładuje dane z pliku JSON. Jeśli plik nie istnieje, zwraca pustą listę lub domyślny obiekt."""
|
32 |
if not os.path.exists(file_path):
|
33 |
if file_path.endswith('stats.json'):
|
34 |
-
return {"
|
35 |
else:
|
36 |
return []
|
37 |
with open(file_path, 'r', encoding='utf-8') as file:
|
@@ -41,7 +45,7 @@ def load_json(file_path):
|
|
41 |
except json.JSONDecodeError:
|
42 |
logging.error(f"Nie można załadować danych z {file_path}. Plik jest uszkodzony.")
|
43 |
if file_path.endswith('stats.json'):
|
44 |
-
return {"
|
45 |
return []
|
46 |
|
47 |
def save_json(file_path, data):
|
@@ -80,21 +84,24 @@ def get_fake_numbers():
|
|
80 |
fake_numbers = load_json(FAKE_NUMBERS_FILE)
|
81 |
return fake_numbers
|
82 |
|
83 |
-
def add_to_history(
|
84 |
"""
|
85 |
Dodaje wpis do historii analiz w pliku history.json.
|
86 |
"""
|
87 |
history = load_json(HISTORY_FILE)
|
88 |
history.append({
|
89 |
-
"
|
90 |
-
"
|
91 |
-
"
|
92 |
-
|
93 |
-
|
94 |
-
|
|
|
|
|
|
|
95 |
})
|
96 |
save_json(HISTORY_FILE, history)
|
97 |
-
logging.info(f"Dodano wpis do history.json dla
|
98 |
|
99 |
def get_history():
|
100 |
"""
|
@@ -109,11 +116,13 @@ def update_stats(fraud_detected=False):
|
|
109 |
Aktualizuje statystyki analiz w pliku stats.json.
|
110 |
"""
|
111 |
stats = load_json(STATS_FILE)
|
112 |
-
stats["
|
113 |
if fraud_detected:
|
114 |
-
stats["
|
|
|
|
|
115 |
save_json(STATS_FILE, stats)
|
116 |
-
logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats['
|
117 |
|
118 |
def get_stats():
|
119 |
"""
|
@@ -129,21 +138,21 @@ def get_phone_info(phone_number):
|
|
129 |
"""
|
130 |
try:
|
131 |
parsed_number = phonenumbers.parse(phone_number, None)
|
132 |
-
country = geocoder.description_for_number(parsed_number, 'pl')
|
133 |
-
operator = carrier.name_for_number(parsed_number, 'pl')
|
134 |
if not country:
|
135 |
country = "Nieznany"
|
136 |
if not operator:
|
137 |
operator = "Nieznany"
|
138 |
logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.")
|
139 |
return country, operator
|
140 |
-
except
|
141 |
logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}")
|
142 |
return "Nieznany", "Nieznany"
|
143 |
|
144 |
def simple_checks(message, language):
|
145 |
"""
|
146 |
-
Przeprowadza proste sprawdzenia heurystyczne wiadomości
|
147 |
"""
|
148 |
warnings = []
|
149 |
# Baza słów kluczowych (polski, niemiecki, angielski)
|
@@ -165,9 +174,9 @@ def simple_checks(message, language):
|
|
165 |
warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
|
166 |
return warnings
|
167 |
|
168 |
-
def analyze_message(
|
169 |
"""
|
170 |
-
Analizuje
|
171 |
"""
|
172 |
if not api_key:
|
173 |
logging.error("Brak klucza API.")
|
@@ -181,7 +190,7 @@ def analyze_message(message, phone_number, additional_info, api_key, language):
|
|
181 |
|
182 |
system_prompts = {
|
183 |
'Polish': """
|
184 |
-
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości
|
185 |
|
186 |
<analysis>
|
187 |
**Analiza Treści Wiadomości:**
|
@@ -206,7 +215,7 @@ Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałsz
|
|
206 |
Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników <analysis>, <risk_assessment> i <recommendations>. Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo.
|
207 |
""",
|
208 |
'German': """
|
209 |
-
Du bist ein fortgeschrittener KI-Assistent, spezialisiert auf die Identifizierung gefälschter
|
210 |
|
211 |
<analysis>
|
212 |
**Nachrichteninhaltsanalyse:**
|
@@ -231,7 +240,7 @@ Du bist ein fortgeschrittener KI-Assistent, spezialisiert auf die Identifizierun
|
|
231 |
Deine Antwort sollte genau nach den oben genannten Richtlinien formatiert sein und die Markierungen <analysis>, <risk_assessment> und <recommendations> verwenden. Stelle sicher, dass jeder Abschnitt vollständig und detailliert ausgefüllt ist.
|
232 |
""",
|
233 |
'English': """
|
234 |
-
You are an advanced AI assistant specializing in identifying fake
|
235 |
|
236 |
<analysis>
|
237 |
**Message Content Analysis:**
|
@@ -257,12 +266,12 @@ Your response should be formatted exactly as specified above, using the <analysi
|
|
257 |
"""
|
258 |
}
|
259 |
|
260 |
-
system_prompt = system_prompts.get(language, system_prompts['English'])
|
261 |
|
262 |
user_prompt = f"""Analyze the following message for potential fraud:
|
263 |
|
264 |
-
Message: "{
|
265 |
-
Sender
|
266 |
|
267 |
Additional Information:
|
268 |
{additional_info}
|
@@ -302,3 +311,52 @@ Provide your analysis and conclusions following the guidelines above."""
|
|
302 |
except Exception as e:
|
303 |
logging.error(f"Błąd połączenia z API: {e}")
|
304 |
return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
# utils/functions.py
|
2 |
|
3 |
import phonenumbers
|
4 |
+
from phonenumbers import geocoder, carrier, NumberParseException
|
5 |
import re
|
6 |
import requests
|
7 |
import os
|
8 |
from datetime import datetime
|
9 |
import logging
|
10 |
import json
|
11 |
+
import whois
|
12 |
+
from PIL import Image
|
13 |
+
import pytesseract
|
14 |
|
15 |
# Konfiguracja logowania
|
16 |
logging.basicConfig(
|
|
|
26 |
HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
|
27 |
STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
|
28 |
|
29 |
+
# Upewnij się, że katalog 'data' istnieje
|
30 |
+
os.makedirs(DATA_DIR, exist_ok=True)
|
31 |
+
|
32 |
# Funkcje pomocnicze
|
33 |
|
34 |
def load_json(file_path):
|
35 |
"""Ładuje dane z pliku JSON. Jeśli plik nie istnieje, zwraca pustą listę lub domyślny obiekt."""
|
36 |
if not os.path.exists(file_path):
|
37 |
if file_path.endswith('stats.json'):
|
38 |
+
return {"total": 0, "fraud": 0, "safe": 0}
|
39 |
else:
|
40 |
return []
|
41 |
with open(file_path, 'r', encoding='utf-8') as file:
|
|
|
45 |
except json.JSONDecodeError:
|
46 |
logging.error(f"Nie można załadować danych z {file_path}. Plik jest uszkodzony.")
|
47 |
if file_path.endswith('stats.json'):
|
48 |
+
return {"total": 0, "fraud": 0, "safe": 0}
|
49 |
return []
|
50 |
|
51 |
def save_json(file_path, data):
|
|
|
84 |
fake_numbers = load_json(FAKE_NUMBERS_FILE)
|
85 |
return fake_numbers
|
86 |
|
87 |
+
def add_to_history(content, sender_info, analysis_text, risk_text, recommendations_text):
|
88 |
"""
|
89 |
Dodaje wpis do historii analiz w pliku history.json.
|
90 |
"""
|
91 |
history = load_json(HISTORY_FILE)
|
92 |
history.append({
|
93 |
+
"date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
94 |
+
"type": "Analysis",
|
95 |
+
"details": {
|
96 |
+
"content": content,
|
97 |
+
"sender_info": sender_info,
|
98 |
+
"analysis": analysis_text,
|
99 |
+
"risk_assessment": risk_text,
|
100 |
+
"recommendations": recommendations_text
|
101 |
+
}
|
102 |
})
|
103 |
save_json(HISTORY_FILE, history)
|
104 |
+
logging.info(f"Dodano wpis do history.json dla nadawcy {sender_info}.")
|
105 |
|
106 |
def get_history():
|
107 |
"""
|
|
|
116 |
Aktualizuje statystyki analiz w pliku stats.json.
|
117 |
"""
|
118 |
stats = load_json(STATS_FILE)
|
119 |
+
stats["total"] += 1
|
120 |
if fraud_detected:
|
121 |
+
stats["fraud"] += 1
|
122 |
+
else:
|
123 |
+
stats["safe"] += 1
|
124 |
save_json(STATS_FILE, stats)
|
125 |
+
logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats['total']}, Oszustw {stats['fraud']}.")
|
126 |
|
127 |
def get_stats():
|
128 |
"""
|
|
|
138 |
"""
|
139 |
try:
|
140 |
parsed_number = phonenumbers.parse(phone_number, None)
|
141 |
+
country = geocoder.description_for_number(parsed_number, 'pl')
|
142 |
+
operator = carrier.name_for_number(parsed_number, 'pl')
|
143 |
if not country:
|
144 |
country = "Nieznany"
|
145 |
if not operator:
|
146 |
operator = "Nieznany"
|
147 |
logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.")
|
148 |
return country, operator
|
149 |
+
except NumberParseException as e:
|
150 |
logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}")
|
151 |
return "Nieznany", "Nieznany"
|
152 |
|
153 |
def simple_checks(message, language):
|
154 |
"""
|
155 |
+
Przeprowadza proste sprawdzenia heurystyczne wiadomości.
|
156 |
"""
|
157 |
warnings = []
|
158 |
# Baza słów kluczowych (polski, niemiecki, angielski)
|
|
|
174 |
warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
|
175 |
return warnings
|
176 |
|
177 |
+
def analyze_message(content, sender_info, additional_info, api_key, language):
|
178 |
"""
|
179 |
+
Analizuje treść wiadomości za pomocą modelu AI, wykorzystując system prompts.
|
180 |
"""
|
181 |
if not api_key:
|
182 |
logging.error("Brak klucza API.")
|
|
|
190 |
|
191 |
system_prompts = {
|
192 |
'Polish': """
|
193 |
+
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości. Twoim zadaniem jest przeprowadzenie szczegółowej analizy poniższej wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje:
|
194 |
|
195 |
<analysis>
|
196 |
**Analiza Treści Wiadomości:**
|
|
|
215 |
Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników <analysis>, <risk_assessment> i <recommendations>. Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo.
|
216 |
""",
|
217 |
'German': """
|
218 |
+
Du bist ein fortgeschrittener KI-Assistent, spezialisiert auf die Identifizierung gefälschter Nachrichten. Deine Aufgabe ist es, eine detaillierte Analyse der folgenden Nachricht durchzuführen, indem du einen tiefgreifenden Denkprozess nutzt und eine umfassende Bewertung lieferst. Deine Antwort sollte in drei Abschnitte unterteilt sein:
|
219 |
|
220 |
<analysis>
|
221 |
**Nachrichteninhaltsanalyse:**
|
|
|
240 |
Deine Antwort sollte genau nach den oben genannten Richtlinien formatiert sein und die Markierungen <analysis>, <risk_assessment> und <recommendations> verwenden. Stelle sicher, dass jeder Abschnitt vollständig und detailliert ausgefüllt ist.
|
241 |
""",
|
242 |
'English': """
|
243 |
+
You are an advanced AI assistant specializing in identifying fake messages. Your task is to conduct a detailed analysis of the following message, utilizing a deep thinking process and providing a comprehensive assessment. Your response should be divided into three sections:
|
244 |
|
245 |
<analysis>
|
246 |
**Message Content Analysis:**
|
|
|
266 |
"""
|
267 |
}
|
268 |
|
269 |
+
system_prompt = system_prompts.get(language, system_prompts['English'])
|
270 |
|
271 |
user_prompt = f"""Analyze the following message for potential fraud:
|
272 |
|
273 |
+
Message: "{content}"
|
274 |
+
Sender Information: "{sender_info}"
|
275 |
|
276 |
Additional Information:
|
277 |
{additional_info}
|
|
|
311 |
except Exception as e:
|
312 |
logging.error(f"Błąd połączenia z API: {e}")
|
313 |
return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy."
|
314 |
+
|
315 |
+
def get_email_info(email_address):
|
316 |
+
"""
|
317 |
+
Pobiera informacje o domenie nadawcy e-mail za pomocą zapytania WHOIS.
|
318 |
+
"""
|
319 |
+
try:
|
320 |
+
domain = email_address.split('@')[-1]
|
321 |
+
w = whois.whois(domain)
|
322 |
+
organization = w.org if w.org else 'Nieznana'
|
323 |
+
country = w.country if w.country else 'Nieznany'
|
324 |
+
logging.info(f"Domena {domain} - Organizacja: {organization}, Kraj: {country}.")
|
325 |
+
return {
|
326 |
+
'domain': domain,
|
327 |
+
'organization': organization,
|
328 |
+
'country': country
|
329 |
+
}
|
330 |
+
except Exception as e:
|
331 |
+
logging.error(f"Nie udało się pobrać informacji WHOIS dla domeny {email_address}: {e}")
|
332 |
+
return None
|
333 |
+
|
334 |
+
def analyze_url(url):
|
335 |
+
"""
|
336 |
+
Pobiera zawartość strony internetowej pod podanym URL.
|
337 |
+
"""
|
338 |
+
try:
|
339 |
+
response = requests.get(url)
|
340 |
+
if response.status_code == 200:
|
341 |
+
logging.info(f"Pobrano zawartość strony {url}.")
|
342 |
+
# Zwróć pierwsze 500 znaków treści strony
|
343 |
+
return response.text[:500]
|
344 |
+
else:
|
345 |
+
logging.error(f"Nie udało się pobrać zawartości strony {url}. Kod statusu HTTP: {response.status_code}")
|
346 |
+
return f"Nie udało się pobrać zawartości strony. Kod statusu HTTP: {response.status_code}"
|
347 |
+
except Exception as e:
|
348 |
+
logging.error(f"Błąd podczas pobierania URL {url}: {e}")
|
349 |
+
return f"Błąd podczas pobierania URL: {e}"
|
350 |
+
|
351 |
+
def extract_text_from_image(image_file):
|
352 |
+
"""
|
353 |
+
Ekstraktuje tekst z obrazu za pomocą Tesseract OCR.
|
354 |
+
"""
|
355 |
+
try:
|
356 |
+
image = Image.open(image_file)
|
357 |
+
text = pytesseract.image_to_string(image, lang='pol') # Upewnij się, że masz zainstalowane języki dla Tesseract
|
358 |
+
logging.info("Tekst został wyodrębniony z obrazu.")
|
359 |
+
return text
|
360 |
+
except Exception as e:
|
361 |
+
logging.error(f"Błąd podczas ekstrakcji tekstu z obrazu: {e}")
|
362 |
+
return f"Błąd podczas ekstrakcji tekstu z obrazu: {e}"
|