import phonenumbers from phonenumbers import geocoder, carrier import re import requests import os from datetime import datetime import logging import json from PIL import Image # Upewnij się, że zainstalowałeś tę bibliotekę: pip install pillow import pytesseract # Upewnij się, że zainstalowałeś tę bibliotekę: pip install pytesseract # Konfiguracja logowania logging.basicConfig( filename='app.log', level=logging.INFO, format='%(asctime)s %(levelname)s:%(message)s' ) # Definiowanie ścieżek do plików JSON BASE_DIR = os.path.dirname(os.path.abspath(__file__)) DATA_DIR = os.path.join(BASE_DIR, '..', 'data') FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json') HISTORY_FILE = os.path.join(DATA_DIR, 'history.json') STATS_FILE = os.path.join(DATA_DIR, 'stats.json') # Funkcje pomocnicze def load_json(file_path): """Ładuje dane z pliku JSON. Jeśli plik nie istnieje, zwraca pustą listę lub domyślny obiekt.""" if not os.path.exists(file_path): if file_path.endswith('stats.json'): return {"total_analyses": 0, "total_frauds_detected": 0} else: return [] with open(file_path, 'r', encoding='utf-8') as file: try: data = json.load(file) return data except json.JSONDecodeError: logging.error(f"Nie można załadować danych z {file_path}. Plik jest uszkodzony.") if file_path.endswith('stats.json'): return {"total_analyses": 0, "total_frauds_detected": 0} return [] def save_json(file_path, data): """Zapisuje dane do pliku JSON.""" with open(file_path, 'w', encoding='utf-8') as file: json.dump(data, file, ensure_ascii=False, indent=4) logging.info(f"Dane zostały zapisane do {file_path}.") def add_fake_number(phone_number): """Dodaje numer telefonu do pliku fake_numbers.json jako fałszywy, jeśli jeszcze go tam nie ma.""" fake_numbers = load_json(FAKE_NUMBERS_FILE) if phone_number not in fake_numbers: fake_numbers.append(phone_number) save_json(FAKE_NUMBERS_FILE, fake_numbers) logging.info(f"Numer {phone_number} został pomyślnie dodany do fake_numbers.json.") return True else: logging.info(f"Numer {phone_number} już istnieje w fake_numbers.json.") return False def is_fake_number(phone_number): """Sprawdza, czy dany numer telefonu jest oznaczony jako fałszywy w pliku fake_numbers.json.""" fake_numbers = load_json(FAKE_NUMBERS_FILE) exists = phone_number in fake_numbers logging.info(f"Sprawdzanie numeru {phone_number}: {'znaleziony' if exists else 'nie znaleziony'}.") return exists def get_fake_numbers(): """Pobiera listę fałszywych numerów z pliku fake_numbers.json.""" fake_numbers = load_json(FAKE_NUMBERS_FILE) return fake_numbers def add_to_history(message, phone_number, analysis, risk, recommendations): """Dodaje wpis do historii analiz w pliku history.json.""" history = load_json(HISTORY_FILE) history.append({ "timestamp": datetime.now().isoformat(), "message": message, "phone_number": phone_number, "analysis": analysis, "risk_assessment": risk, "recommendations": recommendations }) save_json(HISTORY_FILE, history) logging.info(f"Dodano wpis do history.json dla numeru {phone_number}.") def get_history(): """Pobiera historię analiz z pliku history.json jako listę słowników.""" history = load_json(HISTORY_FILE) logging.info("Historia analiz została pobrana pomyślnie.") return history def update_stats(fraud_detected=False): """Aktualizuje statystyki analiz w pliku stats.json.""" stats = load_json(STATS_FILE) stats["total_analyses"] += 1 if fraud_detected: stats["total_frauds_detected"] += 1 save_json(STATS_FILE, stats) logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats['total_analyses']}, Oszustw {stats['total_frauds_detected']}.") def get_stats(): """Pobiera statystyki analiz z pliku stats.json.""" stats = load_json(STATS_FILE) logging.info("Statystyki zostały pobrane pomyślnie.") return stats def get_phone_info(phone_number): """Weryfikuje numer telefonu i zwraca informacje o kraju i operatorze.""" try: parsed_number = phonenumbers.parse(phone_number, None) country = geocoder.description_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego operator = carrier.name_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego if not country: country = "Nieznany" if not operator: operator = "Nieznany" logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.") return country, operator except phonenumbers.NumberParseException as e: logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}") return "Nieznany", "Nieznany" def simple_checks(message, language): """Przeprowadza proste sprawdzenia heurystyczne wiadomości SMS.""" warnings = [] scam_keywords = { 'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'], 'German': ['Geld', 'Überweisung', 'Passwort', 'Code', 'Preis', 'Gewinn', 'dringend', 'Hilfe', 'Gebühr', 'Bank', 'Karte', 'Konto', 'Anmeldung', 'Transaktion', 'Verifizierung', 'persönliche Daten', 'schnelle Zahlung', 'Kontosperrung', 'Benachrichtigung'], 'English': ['money', 'transfer', 'password', 'code', 'prize', 'win', 'urgent', 'help', 'fee', 'bank', 'card', 'account', 'login', 'transaction', 'verification', 'personal information', 'quick payment', 'account lock', 'notification'] } selected_keywords = scam_keywords.get(language, scam_keywords['English']) message_lower = message.lower() if any(keyword.lower() in message_lower for keyword in selected_keywords): warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.") if re.search(r'http[s]?://', message): warnings.append("Wiadomość zawiera link.") if re.search(r'\b(podaj|prześlij|udostępnij|sende|übermittle|teile|send|provide|share)\b.*\b(hasło|kod|dane osobowe|numer konta|Passwort|Code|persönliche Daten|Kontonummer|password|code|personal information|account number)\b', message_lower): warnings.append("Wiadomość zawiera prośbę o poufne informacje.") return warnings def analyze_message(message, phone_number, additional_info, api_key, language): """Analizuje wiadomość SMS za pomocą API SambaNova.""" if not api_key: logging.error("Brak klucza API.") return "Brak klucza API.", "Brak klucza API.", "Brak klucza API." url = "https://api.sambanova.ai/v1/chat/completions" # Upewnij się, że to poprawny URL headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } system_prompts = { 'Polish': """ Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje: **📝 Analiza Treści Wiadomości:** - Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp. - Jakie elementy treści mogą wskazywać na oszustwo? - Jakie słowa kluczowe są używane w wiadomości? (np. "pieniądze", "przelew", "nagroda") - Jakie są reakcje na tę wiadomość w kontekście kulturowym i językowym? **❓ Dodatkowe pytania do przemyślenia:** - Kiedy i jak często otrzymujesz wiadomości z tego numeru? - Czy numer nadawcy jest znany z innych źródeł? - Jakie są konsekwencje dla Ciebie, jeśli ta wiadomość jest oszustwem? - Jakie masz doświadczenia z podobnymi wiadomościami w przeszłości? **⚖️ Ocena Ryzyka Oszustwa:** - Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko. - Jakie czynniki wpływają na tę ocenę? - Jakie są argumenty za i przeciw ocenie tej wiadomości jako oszustwa? **❓ Dodatkowe pytania do przemyślenia:** - Jakie inne wiadomości z tego numeru otrzymywałeś w przeszłości? - Czy wiadomość zawiera jakiekolwiek inne informacje, które mogłyby być użyteczne w ocenie ryzyka? - Jakie są Twoje dotychczasowe doświadczenia z oszustwami SMS? - Jakie są Twoje przemyślenia na temat nadawcy tej wiadomości? **💡 Zalecenia dla Użytkownika:** - Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć. - Jakie środki bezpieczeństwa powinny być wdrożone? **❓ Dodatkowe pytania do przemyślenia:** - Czy powinieneś zgłosić tę wiadomość do operatora sieci? - Jakie dodatkowe kroki możesz podjąć, aby upewnić się, że nie padłeś ofiarą oszustwa? - Czy masz dostęp do innych narzędzi, które mogą pomóc w ocenie tej wiadomości? - Jakie działania możesz podjąć, aby zwiększyć swoje bezpieczeństwo w przyszłości? """, 'German': """ Du bist ein fortgeschrittener KI-Assistent, spezialisiert auf die Identifizierung gefälschter SMS-Nachrichten. Deine Aufgabe ist es, eine detaillierte Analyse der Nachricht durchzuführen, indem du einen tiefgreifenden Denkprozess nutzt und eine umfassende Bewertung lieferst. Deine Antwort sollte in drei Abschnitte unterteilt sein: **📝 Nachrichteninhaltsanalyse:** - Führe eine detaillierte Analyse des Nachrichteninhalts durch und identifiziere potenzielle rote Flaggen wie sprachliche Fehler, Aufforderungen zur Preisgabe persönlicher Daten, dringende Kontaktanfragen usw. - Welche Elemente im Inhalt könnten auf Betrug hinweisen? - Welche Schlüsselwörter werden in der Nachricht verwendet? (z. B. "Geld", "Überweisung", "Preis") - Wie reagieren die Menschen auf diese Nachricht im kulturellen und sprachlichen Kontext? **❓ Zusätzliche Fragen zur Überlegung:** - Wann und wie oft erhältst du Nachrichten von dieser Nummer? - Ist die Nummer des Absenders aus anderen Quellen bekannt? - Was sind die Konsequenzen für dich, wenn diese Nachricht ein Betrug ist? - Welche Erfahrungen hast du in der Vergangenheit mit ähnlichen Nachrichten gemacht? **⚖️ Betrugsrisikobewertung:** - Bewerte die Wahrscheinlichkeit, dass die Nachricht betrügerisch ist, auf einer Skala von 1 bis 10, wobei 1 sehr geringes Risiko und 10 sehr hohes Risiko bedeutet. - Welche Faktoren beeinflussen diese Bewertung? - Was sind die Argumente für und gegen die Bewertung dieser Nachricht als Betrug? **❓ Zusätzliche Fragen zur Überlegung:** - Welche anderen Nachrichten hast du in der Vergangenheit von dieser Nummer erhalten? - Enthält die Nachricht weitere Informationen, die bei der Risikobewertung hilfreich sein könnten? - Welche bisherigen Erfahrungen hast du mit SMS-Betrügereien gemacht? - Welche Gedanken hast du über den Absender dieser Nachricht? **💡 Empfehlungen für den Benutzer:** - Gib klare und spezifische Empfehlungen zu den nächsten Schritten, die der Benutzer unternehmen sollte. - Welche Sicherheitsmaßnahmen sollten umgesetzt werden? **❓ Zusätzne Fragen zur Überlegung:** - Solltest du diese Nachricht deinem Mobilfunkanbieter melden? - Welche weiteren Schritte kannst du unternehmen, um sicherzustellen, dass du nicht Opfer eines Betrugs geworden bist? - Hast du Zugriff auf andere Werkzeuge, die dir helfen können, diese Nachricht zu bewerten? - Welche Maßnahmen kannst du ergreifen, um deine Sicherheit in Zukunft zu erhöhen? """, 'English': """ You are an advanced AI assistant specializing in identifying fake SMS messages. Your task is to conduct a detailed analysis of the message, utilizing a deep thinking process and providing a comprehensive assessment. Your response should be divided into three sections: **📝 Message Content Analysis:** - Conduct a detailed analysis of the message content, identifying potential red flags such as language errors, requests for personal information, urgent contact requests, etc. - What elements of the content may indicate fraud? - What keywords are used in the message? (e.g., "money", "transfer", "prize") - What are the cultural and linguistic reactions to this message? **❓ Additional questions to consider:** - When and how often do you receive messages from this number? - Is the sender's number known from other sources? - What are the consequences for you if this message is a fraud? - What experiences have you had in the past with similar messages? **⚖️ Fraud Risk Assessment:** - Assess the likelihood that the message is fraudulent on a scale from 1 to 10, where 1 indicates very low risk and 10 indicates very high risk. - What factors influence this assessment? - What are the arguments for and against assessing this message as fraud? **❓ Additional questions to consider:** - What other messages have you received from this number in the past? - Does the message contain any other information that could be useful in assessing the risk? - What previous experiences do you have with SMS scams? - What are your thoughts on the sender of this message? **💡 User Recommendations:** - Provide clear and concrete recommendations regarding the next steps the user should take. - What security measures should be implemented? **❓ Additional questions to consider:** - Should you report this message to your service provider? - What additional steps can you take to ensure that you have not fallen victim to a scam? - Do you have access to other tools that can help you assess this message? - What actions can you take to enhance your security in the future? Your response should be formatted exactly as specified above, using the , , and tags. Ensure that each section is thoroughly and comprehensively filled out. """ } system_prompt = system_prompts.get(language, system_prompts['English']) # Domyślnie angielski, jeśli język nie jest obsługiwany user_prompt = f"""Analyze the following message for potential fraud: Message: "{message}" Sender's Phone Number: "{phone_number}" Additional Information: {additional_info} Provide your analysis and conclusions following the guidelines above.""" payload = { "model": "Meta-Llama-3.1-8B-Instruct", # Upewnij się, że to poprawny model API "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "max_tokens": 1000, "temperature": 0.2, "top_p": 0.9, "stop": ["<|eot_id|>"] } try: response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: data = response.json() ai_response = data['choices'][0]['message']['content'] analysis = re.search(r'(.*?)', ai_response, re.DOTALL) risk_assessment = re.search(r'(.*?)', ai_response, re.DOTALL) recommendations = re.search(r'(.*?)', ai_response, re.DOTALL) analysis_text = analysis.group(1).strip() if analysis else "Brak analizy." risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka." recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń." return analysis_text, risk_text, recommendations_text else: logging.error(f"Błąd API: {response.status_code} - {response.text}") return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy." except Exception as e: logging.error(f"Błąd połączenia z API: {e}") return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy." def analyze_url(url): """Analizuje zawartość strony internetowej pod kątem oszustw.""" phishing_urls = [] # Sprawdzanie URL w PhishTank def check_url_phishtank(url): params = { 'format': 'json', 'url': url } try: response = requests.post('https://checkurl.phishtank.com/checkurl/', data=params) if response.status_code == 200: data = response.json() in_database = data.get('results', {}).get('in_database', False) valid = data.get('results', {}).get('valid', False) if in_database and valid: return True else: logging.warning(f"Błąd podczas sprawdzania URL w PhishTank: {response.status_code}") except Exception as e: logging.error(f"Błąd podczas sprawdzania URL w PhishTank: {e}") return False # Sprawdzanie URL w Google Safe Browsing def check_url_safe_browsing(url): api_key = os.getenv('GOOGLE_SAFE_BROWSING_API_KEY') if not api_key: return None unsafe_urls = [] headers = {'Content-Type': 'application/json'} client_body = { 'client': { 'clientId': 'yourcompanyname', 'clientVersion': '1.0' }, 'threatInfo': { 'threatTypes': ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"], 'platformTypes': ["ANY_PLATFORM"], 'threatEntryTypes': ["URL"], 'threatEntries': [{'url': url}] } } try: response = requests.post( f'https://safebrowsing.googleapis.com/v4/threatMatches:find?key={api_key}', headers=headers, json=client_body ) if response.status_code == 200: data = response.json() matches = data.get('matches', []) return [match['threat']['url'] for match in matches] else: logging.error(f"Błąd podczas komunikacji z Google Safe Browsing API: {response.status_code}") except Exception as e: logging.error(f"Błąd podczas sprawdzania URL w Google Safe Browsing: {e}") return [] # Sprawdzanie URL w PhishTank if check_url_phishtank(url): phishing_urls.append(url) # Sprawdzanie URL w Google Safe Browsing unsafe_urls = check_url_safe_browsing(url) if unsafe_urls: phishing_urls.extend(unsafe_urls) return phishing_urls # Zwraca listę zagrożonych URL def extract_text_from_image(image_file): """Ekstrahuje tekst z przesłanego obrazu przy użyciu pytesseract.""" try: image = Image.open(image_file) text = pytesseract.image_to_string(image) return text except Exception as e: logging.error(f"Błąd podczas ekstrakcji tekstu z obrazu: {e}") return "Błąd podczas ekstrakcji tekstu." def get_email_info(email): """Sprawdza informacje o nadawcy e-maila (np. domena, organizacja, kraj).""" domain = email.split('@')[-1] # Prosta ekstrakcja domeny # Możesz dodać więcej logiki do weryfikacji domeny return { "domain": domain, "organization": "Nieznana organizacja", # Możesz dodać logikę, aby zidentyfikować organizację "country": "Nieznany kraj" # Możesz dodać logikę, aby zidentyfikować kraj } def get_analysis_history(): """Zwraca historię analiz z pliku history.json.""" return get_history() # Używamy get_history do uzyskania historii analiz