rafaldembski commited on
Commit
10c0ef6
verified
1 Parent(s): 6e630eb

Update utils/functions.py

Browse files
Files changed (1) hide show
  1. utils/functions.py +70 -41
utils/functions.py CHANGED
@@ -8,10 +8,60 @@ import os
8
  import json
9
  from datetime import datetime
10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  def get_phone_info(phone_number):
12
- """
13
- Weryfikuje numer telefonu i zwraca kraj oraz operatora.
14
- """
15
  try:
16
  parsed_number = phonenumbers.parse(phone_number, None)
17
  country = geocoder.description_for_number(parsed_number, 'pl')
@@ -20,28 +70,31 @@ def get_phone_info(phone_number):
20
  except phonenumbers.NumberParseException:
21
  return None, None
22
 
 
23
  def simple_checks(message):
24
- """
25
- Wykonuje podstawowe sprawdzenia heurystyczne wiadomo艣ci SMS.
26
- """
27
  warnings = []
 
28
  scam_keywords = ['pieni膮dze', 'przelew', 'has艂o', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'op艂ata']
29
  if any(keyword in message.lower() for keyword in scam_keywords):
30
  warnings.append("Wiadomo艣膰 zawiera s艂owa kluczowe zwi膮zane z potencjalnym oszustwem.")
 
31
  if re.search(r'http[s]?://', message):
32
  warnings.append("Wiadomo艣膰 zawiera link.")
 
33
  if re.search(r'\b(podaj|prze艣lij|udost臋pnij)\b.*\b(has艂o|kod|dane osobowe|numer konta)\b', message.lower()):
34
  warnings.append("Wiadomo艣膰 zawiera pro艣b臋 o poufne informacje.")
35
  return warnings
36
 
37
- def analyze_message(message, phone_number, additional_info, api_key):
38
- """
39
- Analizuje wiadomo艣膰 SMS za pomoc膮 API SambaNova.
40
- """
 
41
  url = "https://api.sambanova.ai/v1/chat/completions"
42
  headers = {
43
- "Authorization": f"Bearer {api_key}"
44
  }
 
45
  system_prompt = """
46
  Jeste艣 zaawansowanym asystentem AI specjalizuj膮cym si臋 w identyfikacji fa艂szywych wiadomo艣ci SMS. Twoim zadaniem jest przeprowadzenie szczeg贸艂owej analizy wiadomo艣ci, wykorzystuj膮c g艂臋boki proces my艣lenia i dostarczaj膮c kompleksow膮 ocen臋. Twoja odpowied藕 powinna by膰 podzielona na trzy sekcje:
47
 
@@ -73,9 +126,6 @@ Twoja odpowied藕 powinna by膰 sformatowana dok艂adnie w powy偶szy spos贸b, u偶yw
73
  Wiadomo艣膰: "{message}"
74
  Numer telefonu nadawcy: {phone_number}"
75
 
76
- Dodatkowe informacje:
77
- {additional_info}
78
-
79
  Podaj swoj膮 analiz臋 i wnioski zgodnie z powy偶szymi wytycznymi."""
80
 
81
  payload = {
@@ -95,6 +145,7 @@ Podaj swoj膮 analiz臋 i wnioski zgodnie z powy偶szymi wytycznymi."""
95
  if response.status_code == 200:
96
  data = response.json()
97
  ai_response = data['choices'][0]['message']['content']
 
98
  analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL)
99
  risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL)
100
  recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL)
@@ -109,19 +160,15 @@ Podaj swoj膮 analiz臋 i wnioski zgodnie z powy偶szymi wytycznymi."""
109
  except Exception as e:
110
  return f"B艂膮d po艂膮czenia z API: {e}", "B艂膮d analizy.", "B艂膮d analizy."
111
 
 
112
  def init_stats_file():
113
- """
114
- Inicjalizuje plik statystyk, je艣li nie istnieje.
115
- """
116
  stats_file = 'stats.json'
117
  if not os.path.exists(stats_file):
118
  with open(stats_file, 'w') as f:
119
  json.dump({"total_analyses": 0, "total_frauds_detected": 0}, f)
120
 
 
121
  def update_stats(fraud_detected=False):
122
- """
123
- Aktualizuje statystyki analizy.
124
- """
125
  stats_file = 'stats.json'
126
  try:
127
  with open(stats_file, 'r') as f:
@@ -136,31 +183,15 @@ def update_stats(fraud_detected=False):
136
  with open(stats_file, 'w') as f:
137
  json.dump(stats, f, indent=4)
138
 
139
- def get_stats():
140
- """
141
- Pobiera aktualne statystyki analizy.
142
- """
143
- stats_file = 'stats.json'
144
- try:
145
- with open(stats_file, 'r') as f:
146
- stats = json.load(f)
147
- return stats
148
- except (json.JSONDecodeError, FileNotFoundError):
149
- return {"total_analyses": 0, "total_frauds_detected": 0}
150
-
151
  def init_history_file():
152
- """
153
- Inicjalizuje plik historii analiz, je艣li nie istnieje.
154
- """
155
  history_file = 'history.json'
156
  if not os.path.exists(history_file):
157
  with open(history_file, 'w') as f:
158
  json.dump([], f)
159
 
 
160
  def add_to_history(message, phone_number, analysis, risk, recommendations):
161
- """
162
- Dodaje wpis do historii analiz.
163
- """
164
  history_file = 'history.json'
165
  try:
166
  with open(history_file, 'r') as f:
@@ -180,10 +211,8 @@ def add_to_history(message, phone_number, analysis, risk, recommendations):
180
  with open(history_file, 'w') as f:
181
  json.dump(history, f, indent=4)
182
 
 
183
  def get_history():
184
- """
185
- Pobiera histori臋 analiz.
186
- """
187
  history_file = 'history.json'
188
  try:
189
  with open(history_file, 'r') as f:
 
8
  import json
9
  from datetime import datetime
10
 
11
+ # 艢cie偶ka do pliku JSON przechowuj膮cego fa艂szywe numery
12
+ FAKE_NUMBERS_FILE = 'fake_numbers.json'
13
+
14
+ # Inicjalizacja pliku JSON przechowuj膮cego fa艂szywe numery
15
+ def init_fake_numbers_file():
16
+ if not os.path.exists(FAKE_NUMBERS_FILE):
17
+ with open(FAKE_NUMBERS_FILE, 'w') as f:
18
+ json.dump([], f)
19
+ else:
20
+ # Sprawdzenie, czy plik nie jest pusty i zawiera prawid艂owy JSON
21
+ try:
22
+ with open(FAKE_NUMBERS_FILE, 'r') as f:
23
+ json.load(f)
24
+ except json.JSONDecodeError:
25
+ # Je艣li plik jest uszkodzony lub pusty, zresetuj go do pustej listy
26
+ with open(FAKE_NUMBERS_FILE, 'w') as f:
27
+ json.dump([], f)
28
+
29
+ # Dodanie numeru telefonu do pliku JSON
30
+ def add_fake_number(phone_number):
31
+ try:
32
+ with open(FAKE_NUMBERS_FILE, 'r') as f:
33
+ fake_numbers = json.load(f)
34
+ except (json.JSONDecodeError, FileNotFoundError):
35
+ fake_numbers = []
36
+
37
+ if not any(entry["phone_number"] == phone_number for entry in fake_numbers):
38
+ fake_numbers.append({
39
+ "phone_number": phone_number,
40
+ "reported_at": datetime.now().isoformat()
41
+ })
42
+ try:
43
+ with open(FAKE_NUMBERS_FILE, 'w') as f:
44
+ json.dump(fake_numbers, f, indent=4)
45
+ return True
46
+ except Exception as e:
47
+ return False
48
+ else:
49
+ return False # Numer ju偶 istnieje
50
+
51
+ # Sprawdzenie, czy numer telefonu jest w pliku JSON
52
+ def is_fake_number(phone_number):
53
+ try:
54
+ with open(FAKE_NUMBERS_FILE, 'r') as f:
55
+ fake_numbers = json.load(f)
56
+ return any(entry["phone_number"] == phone_number for entry in fake_numbers)
57
+ except (json.JSONDecodeError, FileNotFoundError):
58
+ return False
59
+
60
+ # Pobierz klucz API z zmiennej 艣rodowiskowej
61
+ API_KEY = os.getenv('SAMBANOVA_API_KEY')
62
+
63
+ # Funkcja do weryfikacji numeru telefonu
64
  def get_phone_info(phone_number):
 
 
 
65
  try:
66
  parsed_number = phonenumbers.parse(phone_number, None)
67
  country = geocoder.description_for_number(parsed_number, 'pl')
 
70
  except phonenumbers.NumberParseException:
71
  return None, None
72
 
73
+ # Proste sprawdzenia heurystyczne wiadomo艣ci
74
  def simple_checks(message):
 
 
 
75
  warnings = []
76
+ # S艂owa kluczowe cz臋sto u偶ywane w oszustwach
77
  scam_keywords = ['pieni膮dze', 'przelew', 'has艂o', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'op艂ata']
78
  if any(keyword in message.lower() for keyword in scam_keywords):
79
  warnings.append("Wiadomo艣膰 zawiera s艂owa kluczowe zwi膮zane z potencjalnym oszustwem.")
80
+ # Sprawdzenie obecno艣ci link贸w
81
  if re.search(r'http[s]?://', message):
82
  warnings.append("Wiadomo艣膰 zawiera link.")
83
+ # Sprawdzenie, czy nadawca prosi o poufne informacje
84
  if re.search(r'\b(podaj|prze艣lij|udost臋pnij)\b.*\b(has艂o|kod|dane osobowe|numer konta)\b', message.lower()):
85
  warnings.append("Wiadomo艣膰 zawiera pro艣b臋 o poufne informacje.")
86
  return warnings
87
 
88
+ # Funkcja do analizy wiadomo艣ci za pomoc膮 API SambaNova z g艂臋bszym procesem my艣lenia
89
+ def analyze_message(message, phone_number):
90
+ if not API_KEY:
91
+ return "Brak klucza API.", "Brak klucza API.", "Brak klucza API."
92
+
93
  url = "https://api.sambanova.ai/v1/chat/completions"
94
  headers = {
95
+ "Authorization": f"Bearer {API_KEY}"
96
  }
97
+ # Rozbudowany system prompt z g艂臋bszym procesem my艣lenia
98
  system_prompt = """
99
  Jeste艣 zaawansowanym asystentem AI specjalizuj膮cym si臋 w identyfikacji fa艂szywych wiadomo艣ci SMS. Twoim zadaniem jest przeprowadzenie szczeg贸艂owej analizy wiadomo艣ci, wykorzystuj膮c g艂臋boki proces my艣lenia i dostarczaj膮c kompleksow膮 ocen臋. Twoja odpowied藕 powinna by膰 podzielona na trzy sekcje:
100
 
 
126
  Wiadomo艣膰: "{message}"
127
  Numer telefonu nadawcy: {phone_number}"
128
 
 
 
 
129
  Podaj swoj膮 analiz臋 i wnioski zgodnie z powy偶szymi wytycznymi."""
130
 
131
  payload = {
 
145
  if response.status_code == 200:
146
  data = response.json()
147
  ai_response = data['choices'][0]['message']['content']
148
+ # Parsowanie odpowiedzi
149
  analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL)
150
  risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL)
151
  recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL)
 
160
  except Exception as e:
161
  return f"B艂膮d po艂膮czenia z API: {e}", "B艂膮d analizy.", "B艂膮d analizy."
162
 
163
+ # Inicjalizacja pliku statystyk
164
  def init_stats_file():
 
 
 
165
  stats_file = 'stats.json'
166
  if not os.path.exists(stats_file):
167
  with open(stats_file, 'w') as f:
168
  json.dump({"total_analyses": 0, "total_frauds_detected": 0}, f)
169
 
170
+ # Aktualizacja statystyk analizy
171
  def update_stats(fraud_detected=False):
 
 
 
172
  stats_file = 'stats.json'
173
  try:
174
  with open(stats_file, 'r') as f:
 
183
  with open(stats_file, 'w') as f:
184
  json.dump(stats, f, indent=4)
185
 
186
+ # Inicjalizacja pliku historii analiz
 
 
 
 
 
 
 
 
 
 
 
187
  def init_history_file():
 
 
 
188
  history_file = 'history.json'
189
  if not os.path.exists(history_file):
190
  with open(history_file, 'w') as f:
191
  json.dump([], f)
192
 
193
+ # Dodanie wpisu do historii analiz
194
  def add_to_history(message, phone_number, analysis, risk, recommendations):
 
 
 
195
  history_file = 'history.json'
196
  try:
197
  with open(history_file, 'r') as f:
 
211
  with open(history_file, 'w') as f:
212
  json.dump(history, f, indent=4)
213
 
214
+ # Pobranie historii analiz
215
  def get_history():
 
 
 
216
  history_file = 'history.json'
217
  try:
218
  with open(history_file, 'r') as f: