Spaces:
Running
Running
# utils/functions.py | |
import phonenumbers | |
from phonenumbers import geocoder, carrier | |
import re | |
import requests | |
import os | |
from datetime import datetime | |
import logging | |
import json | |
import pycountry # Upewnij się, że zainstalowałeś tę bibliotekę: pip install pycountry | |
# Konfiguracja logowania | |
logging.basicConfig( | |
filename='app.log', | |
level=logging.INFO, | |
format='%(asctime)s %(levelname)s:%(message)s' | |
) | |
# Definiowanie ścieżek do plików JSON | |
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
DATA_DIR = os.path.join(BASE_DIR, '..', 'data') | |
FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json') | |
HISTORY_FILE = os.path.join(DATA_DIR, 'history.json') | |
STATS_FILE = os.path.join(DATA_DIR, 'stats.json') | |
# Funkcje pomocnicze | |
def load_json(file_path): | |
"""Ładuje dane z pliku JSON. Jeśli plik nie istnieje, zwraca pustą listę lub domyślny obiekt.""" | |
if not os.path.exists(file_path): | |
if file_path.endswith('stats.json'): | |
return {"total_analyses": 0, "total_frauds_detected": 0} | |
else: | |
return [] | |
with open(file_path, 'r', encoding='utf-8') as file: | |
try: | |
data = json.load(file) | |
return data | |
except json.JSONDecodeError: | |
logging.error(f"Nie można załadować danych z {file_path}. Plik jest uszkodzony.") | |
if file_path.endswith('stats.json'): | |
return {"total_analyses": 0, "total_frauds_detected": 0} | |
return [] | |
def save_json(file_path, data): | |
"""Zapisuje dane do pliku JSON.""" | |
with open(file_path, 'w', encoding='utf-8') as file: | |
json.dump(data, file, ensure_ascii=False, indent=4) | |
logging.info(f"Dane zostały zapisane do {file_path}.") | |
def add_fake_number(phone_number): | |
""" | |
Dodaje numer telefonu do pliku fake_numbers.json jako fałszywy, jeśli jeszcze go tam nie ma. | |
""" | |
fake_numbers = load_json(FAKE_NUMBERS_FILE) | |
if phone_number not in fake_numbers: | |
fake_numbers.append(phone_number) | |
save_json(FAKE_NUMBERS_FILE, fake_numbers) | |
logging.info(f"Numer {phone_number} został pomyślnie dodany do fake_numbers.json.") | |
return True | |
else: | |
logging.info(f"Numer {phone_number} już istnieje w fake_numbers.json.") | |
return False | |
def is_fake_number(phone_number): | |
""" | |
Sprawdza, czy dany numer telefonu jest oznaczony jako fałszywy w pliku fake_numbers.json. | |
""" | |
fake_numbers = load_json(FAKE_NUMBERS_FILE) | |
exists = phone_number in fake_numbers | |
logging.info(f"Sprawdzanie numeru {phone_number}: {'znaleziony' if exists else 'nie znaleziony'}.") | |
return exists | |
def get_fake_numbers(): | |
""" | |
Pobiera listę fałszywych numerów z pliku fake_numbers.json. | |
""" | |
fake_numbers = load_json(FAKE_NUMBERS_FILE) | |
return fake_numbers | |
def add_to_history(message, phone_number, analysis, risk, recommendations): | |
""" | |
Dodaje wpis do historii analiz w pliku history.json. | |
""" | |
history = load_json(HISTORY_FILE) | |
history.append({ | |
"timestamp": datetime.now().isoformat(), | |
"message": message, | |
"phone_number": phone_number, | |
"analysis": analysis, | |
"risk_assessment": risk, | |
"recommendations": recommendations | |
}) | |
save_json(HISTORY_FILE, history) | |
logging.info(f"Dodano wpis do history.json dla numeru {phone_number}.") | |
def get_history(): | |
""" | |
Pobiera historię analiz z pliku history.json jako listę słowników. | |
""" | |
history = load_json(HISTORY_FILE) | |
logging.info("Historia analiz została pobrana pomyślnie.") | |
return history | |
def update_stats(fraud_detected=False): | |
""" | |
Aktualizuje statystyki analiz w pliku stats.json. | |
""" | |
stats = load_json(STATS_FILE) | |
stats["total_analyses"] += 1 | |
if fraud_detected: | |
stats["total_frauds_detected"] += 1 | |
save_json(STATS_FILE, stats) | |
logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats['total_analyses']}, Oszustw {stats['total_frauds_detected']}.") | |
def get_stats(): | |
""" | |
Pobiera statystyki analiz z pliku stats.json. | |
""" | |
stats = load_json(STATS_FILE) | |
logging.info("Statystyki zostały pobrane pomyślnie.") | |
return stats | |
def get_phone_info(phone_number): | |
""" | |
Weryfikuje numer telefonu i zwraca informacje o kraju i operatorze. | |
""" | |
try: | |
parsed_number = phonenumbers.parse(phone_number, None) | |
country = geocoder.description_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego | |
operator = carrier.name_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego | |
if not country: | |
country = "Nieznany" | |
if not operator: | |
operator = "Nieznany" | |
logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.") | |
return country, operator | |
except phonenumbers.NumberParseException as e: | |
logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}") | |
return "Nieznany", "Nieznany" | |
def simple_checks(message, language): | |
""" | |
Przeprowadza proste sprawdzenia heurystyczne wiadomości SMS. | |
""" | |
warnings = [] | |
# Baza słów kluczowych (polski, niemiecki, angielski) | |
scam_keywords = { | |
'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'], | |
'German': ['Geld', 'Überweisung', 'Passwort', 'Code', 'Preis', 'Gewinn', 'dringend', 'Hilfe', 'Gebühr', 'Bank', 'Karte', 'Konto', 'Anmeldung', 'Transaktion', 'Verifizierung', 'persönliche Daten', 'schnelle Zahlung', 'Kontosperrung', 'Benachrichtigung'], | |
'English': ['money', 'transfer', 'password', 'code', 'prize', 'win', 'urgent', 'help', 'fee', 'bank', 'card', 'account', 'login', 'transaction', 'verification', 'personal information', 'quick payment', 'account lock', 'notification'] | |
} | |
selected_keywords = scam_keywords.get(language, scam_keywords['English']) | |
message_lower = message.lower() | |
if any(keyword.lower() in message_lower for keyword in selected_keywords): | |
warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.") | |
if re.search(r'http[s]?://', message): | |
warnings.append("Wiadomość zawiera link.") | |
if re.search(r'\b(podaj|prześlij|udostępnij|sende|übermittle|teile|send|provide|share)\b.*\b(hasło|kod|dane osobowe|numer konta|Passwort|Code|persönliche Daten|Kontonummer|password|code|personal information|account number)\b', message_lower): | |
warnings.append("Wiadomość zawiera prośbę o poufne informacje.") | |
return warnings | |
def analyze_message(message, phone_number, additional_info, api_key, language): | |
""" | |
Analizuje wiadomość SMS za pomocą API SambaNova. | |
""" | |
if not api_key: | |
logging.error("Brak klucza API.") | |
return "Brak klucza API.", "Brak klucza API.", "Brak klucza API." | |
url = "https://api.sambanova.ai/v1/chat/completions" # Upewnij się, że to poprawny URL | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
system_prompts = { | |
'Polish': """ | |
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje: | |
<analysis> | |
**Analiza Treści Wiadomości:** | |
- Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp. | |
- Opisz kontekst językowy i kulturowy wiadomości. | |
- Zidentyfikuj wszelkie elementy, które mogą sugerować, że wiadomość jest próbą wyłudzenia informacji lub pieniędzy. | |
</analysis> | |
<risk_assessment> | |
**Ocena Ryzyka Oszustwa:** | |
- Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko. | |
- Wyjaśnij, jakie czynniki wpływają na tę ocenę. | |
</risk_assessment> | |
<recommendations> | |
**Zalecenia dla Użytkownika:** | |
- Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć. | |
- Uwzględnij sugestie dotyczące bezpieczeństwa, takie jak blokowanie nadawcy, zgłaszanie wiadomości do odpowiednich instytucji, czy też ignorowanie wiadomości. | |
- Jeśli to możliwe, zasugeruj dodatkowe środki ostrożności, które użytkownik może podjąć, aby chronić swoje dane osobowe i finansowe. | |
</recommendations> | |
Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników <analysis>, <risk_assessment> i <recommendations>. Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo. | |
""", | |
'German': """ | |
Du bist ein fortgeschrittener KI-Assistent, spezialisiert auf die Identifizierung gefälschter SMS-Nachrichten. Deine Aufgabe ist es, eine detaillierte Analyse der Nachricht durchzuführen, indem du einen tiefgreifenden Denkprozess nutzt und eine umfassende Bewertung lieferst. Deine Antwort sollte in drei Abschnitte unterteilt sein: | |
<analysis> | |
**Nachrichteninhaltsanalyse:** | |
- Führe eine detaillierte Analyse des Nachrichteninhalts durch und identifiziere potenzielle rote Flaggen wie sprachliche Fehler, Aufforderungen zur Preisgabe persönlicher Daten, dringende Kontaktanfragen usw. | |
- Beschreibe den sprachlichen und kulturellen Kontext der Nachricht. | |
- Identifiziere alle Elemente, die darauf hindeuten könnten, dass die Nachricht ein Versuch ist, Informationen oder Geld zu erlangen. | |
</analysis> | |
<risk_assessment> | |
**Betrugsrisikobewertung:** | |
- Basierend auf der Inhaltsanalyse und den verfügbaren Informationen, bewerte die Wahrscheinlichkeit, dass die Nachricht ein Betrug ist. Verwende eine Skala von 1 bis 10, wobei 1 sehr geringes Risiko und 10 sehr hohes Risiko bedeutet. | |
- Erkläre, welche Faktoren diese Bewertung beeinflussen. | |
</risk_assessment> | |
<recommendations> | |
**Empfehlungen für den Benutzer:** | |
- Gib klare und konkrete Empfehlungen zu den nächsten Schritten, die der Benutzer unternehmen sollte. | |
- Berücksichtige Sicherheitsempfehlungen wie das Blockieren des Absenders, das Melden der Nachricht an entsprechende Behörden oder das Ignorieren der Nachricht. | |
- Wenn möglich, schlage zusätzliche Vorsichtsmaßnahmen vor, die der Benutzer ergreifen kann, um seine persönlichen und finanziellen Daten zu schützen. | |
</recommendations> | |
Deine Antwort sollte genau nach den oben genannten Richtlinien formatiert sein und die Markierungen <analysis>, <risk_assessment> und <recommendations> verwenden. Stelle sicher, dass jeder Abschnitt vollständig und detailliert ausgefüllt ist. | |
""", | |
'English': """ | |
You are an advanced AI assistant specializing in identifying fake SMS messages. Your task is to conduct a detailed analysis of the message, utilizing a deep thinking process and providing a comprehensive assessment. Your response should be divided into three sections: | |
<analysis> | |
**Message Content Analysis:** | |
- Conduct a detailed analysis of the message content, identifying potential red flags such as language errors, requests for personal information, urgent contact requests, etc. | |
- Describe the linguistic and cultural context of the message. | |
- Identify any elements that may suggest the message is an attempt to solicit information or money. | |
</analysis> | |
<risk_assessment> | |
**Fraud Risk Assessment:** | |
- Based on the content analysis and available information, assess the likelihood that the message is fraudulent. Use a scale from 1 to 10, where 1 indicates very low risk and 10 indicates very high risk. | |
- Explain the factors that influence this assessment. | |
</risk_assessment> | |
<recommendations> | |
**User Recommendations:** | |
- Provide clear and concrete recommendations regarding the next steps the user should take. | |
- Include security suggestions such as blocking the sender, reporting the message to appropriate authorities, or ignoring the message. | |
- If possible, suggest additional precautionary measures the user can take to protect their personal and financial information. | |
</recommendations> | |
Your response should be formatted exactly as specified above, using the <analysis>, <risk_assessment>, and <recommendations> tags. Ensure that each section is thoroughly and comprehensively filled out. | |
""" | |
} | |
system_prompt = system_prompts.get(language, system_prompts['English']) # Domyślnie angielski, jeśli język nie jest obsługiwany | |
user_prompt = f"""Analyze the following message for potential fraud: | |
Message: "{message}" | |
Sender's Phone Number: "{phone_number}" | |
Additional Information: | |
{additional_info} | |
Provide your analysis and conclusions following the guidelines above.""" | |
payload = { | |
"model": "Meta-Llama-3.1-8B-Instruct", # Upewnij się, że to poprawny model API | |
"messages": [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt} | |
], | |
"max_tokens": 1000, | |
"temperature": 0.2, | |
"top_p": 0.9, | |
"stop": ["<|eot_id|>"] | |
} | |
try: | |
response = requests.post(url, headers=headers, json=payload) | |
if response.status_code == 200: | |
data = response.json() | |
ai_response = data['choices'][0]['message']['content'] | |
# Parsowanie odpowiedzi | |
analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL) | |
risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL) | |
recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL) | |
analysis_text = analysis.group(1).strip() if analysis else "Brak analizy." | |
risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka." | |
recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń." | |
return analysis_text, risk_text, recommendations_text | |
else: | |
logging.error(f"Błąd API: {response.status_code} - {response.text}") | |
return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy." | |
except Exception as e: | |
logging.error(f"Błąd połączenia z API: {e}") | |
return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy." | |