ScamDetector / utils /functions.py
rafaldembski's picture
Update utils/functions.py
f18485b verified
raw
history blame
16.5 kB
# utils/functions.py
import phonenumbers
from phonenumbers import geocoder, carrier
import re
import requests
import os
from datetime import datetime
import logging
from sqlalchemy import create_engine, Column, String, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Konfiguracja logowania
logging.basicConfig(
filename='app.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s:%(message)s'
)
# Konfiguracja bazy danych SQLite
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATABASE_URL = f"sqlite:///{os.path.join(BASE_DIR, '..', 'scam_detector.db')}"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Definicje modeli
class FakeNumber(Base):
__tablename__ = "fake_numbers"
id = Column(Integer, primary_key=True, index=True)
phone_number = Column(String, unique=True, index=True, nullable=False)
reported_at = Column(DateTime, default=datetime.utcnow)
class AnalysisHistory(Base):
__tablename__ = "analysis_history"
id = Column(Integer, primary_key=True, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
message = Column(String, nullable=False)
phone_number = Column(String, nullable=False)
analysis = Column(String, nullable=False)
risk_assessment = Column(String, nullable=False)
recommendations = Column(String, nullable=False)
class Stats(Base):
__tablename__ = "stats"
id = Column(Integer, primary_key=True, index=True)
total_analyses = Column(Integer, default=0)
total_frauds_detected = Column(Integer, default=0)
# Tworzenie tabel w bazie danych (jeśli jeszcze nie istnieją)
Base.metadata.create_all(bind=engine)
# Funkcje pomocnicze
def add_fake_number(phone_number):
"""
Dodaje numer telefonu do bazy danych jako fałszywy, jeśli jeszcze go tam nie ma.
"""
db = SessionLocal()
try:
logging.info(f"Próbuję dodać numer: {phone_number}")
existing_number = db.query(FakeNumber).filter(FakeNumber.phone_number == phone_number).first()
if not existing_number:
new_number = FakeNumber(phone_number=phone_number)
db.add(new_number)
db.commit()
db.refresh(new_number)
logging.info(f"Numer {phone_number} został pomyślnie dodany do bazy danych.")
return True
else:
logging.info(f"Numer {phone_number} już istnieje w bazie danych.")
return False
except Exception as e:
db.rollback()
logging.error(f"Nie udało się zapisać numeru {phone_number}: {e}")
return False
finally:
db.close()
def is_fake_number(phone_number):
"""
Sprawdza, czy dany numer telefonu jest oznaczony jako fałszywy w bazie danych.
"""
db = SessionLocal()
try:
exists = db.query(FakeNumber).filter(FakeNumber.phone_number == phone_number).first() is not None
logging.info(f"Sprawdzanie numeru {phone_number}: {'znaleziony' if exists else 'nie znaleziony'}.")
return exists
except Exception as e:
logging.error(f"Nie udało się sprawdzić numeru {phone_number}: {e}")
return False
finally:
db.close()
def add_to_history(message, phone_number, analysis, risk, recommendations):
"""
Dodaje wpis do historii analiz.
"""
db = SessionLocal()
try:
new_entry = AnalysisHistory(
message=message,
phone_number=phone_number,
analysis=analysis,
risk_assessment=risk,
recommendations=recommendations
)
db.add(new_entry)
db.commit()
logging.info(f"Dodano wpis do historii analiz dla numeru {phone_number}.")
except Exception as e:
db.rollback()
logging.error(f"Nie udało się zapisać historii analiz dla numeru {phone_number}: {e}")
finally:
db.close()
def update_stats(fraud_detected=False):
"""
Aktualizuje statystyki analiz w bazie danych.
"""
db = SessionLocal()
try:
stats = db.query(Stats).first()
if not stats:
stats = Stats()
db.add(stats)
stats.total_analyses += 1
if fraud_detected:
stats.total_frauds_detected += 1
db.commit()
logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats.total_analyses}, Oszustw {stats.total_frauds_detected}.")
except Exception as e:
db.rollback()
logging.error(f"Nie udało się zaktualizować statystyk: {e}")
finally:
db.close()
def get_stats():
"""
Pobiera statystyki analiz z bazy danych.
"""
db = SessionLocal()
try:
stats = db.query(Stats).first()
if stats:
logging.info("Statystyki zostały pobrane pomyślnie.")
return {"total_analyses": stats.total_analyses, "total_frauds_detected": stats.total_frauds_detected}
else:
logging.info("Brak statystyk w bazie danych.")
return {"total_analyses": 0, "total_frauds_detected": 0}
except Exception as e:
logging.error(f"Nie udało się pobrać statystyk: {e}")
return {"total_analyses": 0, "total_frauds_detected": 0}
finally:
db.close()
def get_history():
"""
Pobiera historię analiz z bazy danych jako listę słowników.
"""
db = SessionLocal()
try:
history_entries = db.query(AnalysisHistory).order_by(AnalysisHistory.timestamp.desc()).all()
history = []
for entry in history_entries:
history.append({
'timestamp': entry.timestamp,
'message': entry.message,
'phone_number': entry.phone_number,
'analysis': entry.analysis,
'risk_assessment': entry.risk_assessment,
'recommendations': entry.recommendations
})
logging.info("Historia analiz została pobrana pomyślnie.")
return history
except Exception as e:
logging.error(f"Nie udało się pobrać historii analiz: {e}")
return []
finally:
db.close()
def get_phone_info(phone_number):
"""
Weryfikuje numer telefonu i zwraca informacje o kraju i operatorze.
"""
try:
parsed_number = phonenumbers.parse(phone_number, None)
country = geocoder.description_for_number(parsed_number, 'en') # Możesz zmienić język na 'pl' lub 'de'
operator = carrier.name_for_number(parsed_number, 'en') # Możesz zmienić język na 'pl' lub 'de'
logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.")
return country, operator
except phonenumbers.NumberParseException as e:
logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}")
return None, None
def simple_checks(message, language):
"""
Przeprowadza proste sprawdzenia heurystyczne wiadomości SMS.
"""
warnings = []
# Baza słów kluczowych (polski, niemiecki, angielski)
scam_keywords = {
'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'],
'German': ['Geld', 'Überweisung', 'Passwort', 'Code', 'Preis', 'Gewinn', 'dringend', 'Hilfe', 'Gebühr', 'Bank', 'Karte', 'Konto', 'Anmeldung', 'Transaktion', 'Verifizierung', 'persönliche Daten', 'schnelle Zahlung', 'Kontosperrung', 'Benachrichtigung'],
'English': ['money', 'transfer', 'password', 'code', 'prize', 'win', 'urgent', 'help', 'fee', 'bank', 'card', 'account', 'login', 'transaction', 'verification', 'personal information', 'quick payment', 'account lock', 'notification']
}
selected_keywords = scam_keywords.get(language, scam_keywords['English'])
if any(keyword in message.lower() for keyword in selected_keywords):
warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.")
if re.search(r'http[s]?://', message):
warnings.append("Wiadomość zawiera link.")
if re.search(r'\b(podaj|prześlij|udostępnij)\b.*\b(hasło|kod|dane osobowe|numer konta)\b', message.lower()):
warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
return warnings
def analyze_message(message, phone_number, additional_info, api_key, language):
"""
Analizuje wiadomość SMS za pomocą API SambaNova.
"""
if not api_key:
logging.error("Brak klucza API.")
return "Brak klucza API.", "Brak klucza API.", "Brak klucza API."
url = "https://api.sambanova.ai/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}"
}
# System prompts w trzech językach
system_prompts = {
'Polish': """
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje:
<analysis>
**Analiza Treści Wiadomości:**
- Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp.
- Opisz kontekst językowy i kulturowy wiadomości.
- Zidentyfikuj wszelkie elementy, które mogą sugerować, że wiadomość jest próbą wyłudzenia informacji lub pieniędzy.
</analysis>
<risk_assessment>
**Ocena Ryzyka Oszustwa:**
- Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko.
- Wyjaśnij, jakie czynniki wpływają na tę ocenę.
</risk_assessment>
<recommendations>
**Zalecenia dla Użytkownika:**
- Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć.
- Uwzględnij sugestie dotyczące bezpieczeństwa, takie jak blokowanie nadawcy, zgłaszanie wiadomości do odpowiednich instytucji, czy też ignorowanie wiadomości.
- Jeśli to możliwe, zasugeruj dodatkowe środki ostrożności, które użytkownik może podjąć, aby chronić swoje dane osobowe i finansowe.
</recommendations>
Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników <analysis>, <risk_assessment> i <recommendations>. Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo.
""",
'German': """
Du bist ein fortgeschrittener KI-Assistent, spezialisiert auf die Identifizierung gefälschter SMS-Nachrichten. Deine Aufgabe ist es, eine detaillierte Analyse der Nachricht durchzuführen, indem du einen tiefgreifenden Denkprozess nutzt und eine umfassende Bewertung lieferst. Deine Antwort sollte in drei Abschnitte unterteilt sein:
<analysis>
**Nachrichteninhaltsanalyse:**
- Führe eine detaillierte Analyse des Nachrichteninhalts durch und identifiziere potenzielle rote Flaggen wie sprachliche Fehler, Aufforderungen zur Preisgabe persönlicher Daten, dringende Kontaktanfragen usw.
- Beschreibe den sprachlichen und kulturellen Kontext der Nachricht.
- Identifiziere alle Elemente, die darauf hindeuten könnten, dass die Nachricht ein Versuch ist, Informationen oder Geld zu erlangen.
</analysis>
<risk_assessment>
**Betrugsrisikobewertung:**
- Basierend auf der Inhaltsanalyse und den verfügbaren Informationen, bewerte die Wahrscheinlichkeit, dass die Nachricht ein Betrug ist. Verwende eine Skala von 1 bis 10, wobei 1 sehr geringes Risiko und 10 sehr hohes Risiko bedeutet.
- Erkläre, welche Faktoren diese Bewertung beeinflussen.
</risk_assessment>
<recommendations>
**Empfehlungen für den Benutzer:**
- Gib klare und konkrete Empfehlungen zu den nächsten Schritten, die der Benutzer unternehmen sollte.
- Berücksichtige Sicherheitsempfehlungen wie das Blockieren des Absenders, das Melden der Nachricht an entsprechende Behörden oder das Ignorieren der Nachricht.
- Wenn möglich, schlage zusätzliche Vorsichtsmaßnahmen vor, die der Benutzer ergreifen kann, um seine persönlichen und finanziellen Daten zu schützen.
</recommendations>
Deine Antwort sollte genau nach den oben genannten Richtlinien formatiert sein und die Markierungen <analysis>, <risk_assessment> und <recommendations> verwenden. Stelle sicher, dass jeder Abschnitt vollständig und detailliert ausgefüllt ist.
""",
'English': """
You are an advanced AI assistant specializing in identifying fake SMS messages. Your task is to conduct a detailed analysis of the message, utilizing a deep thinking process and providing a comprehensive assessment. Your response should be divided into three sections:
<analysis>
**Message Content Analysis:**
- Conduct a detailed analysis of the message content, identifying potential red flags such as language errors, requests for personal information, urgent contact requests, etc.
- Describe the linguistic and cultural context of the message.
- Identify any elements that may suggest the message is an attempt to solicit information or money.
</analysis>
<risk_assessment>
**Fraud Risk Assessment:**
- Based on the content analysis and available information, assess the likelihood that the message is fraudulent. Use a scale from 1 to 10, where 1 indicates very low risk and 10 indicates very high risk.
- Explain the factors that influence this assessment.
</risk_assessment>
<recommendations>
**User Recommendations:**
- Provide clear and concrete recommendations regarding the next steps the user should take.
- Include security suggestions such as blocking the sender, reporting the message to appropriate authorities, or ignoring the message.
- If possible, suggest additional precautionary measures the user can take to protect their personal and financial information.
</recommendations>
Your response should be formatted exactly as specified above, using the <analysis>, <risk_assessment>, and <recommendations> tags. Ensure that each section is thoroughly and comprehensively filled out.
"""
}
system_prompt = system_prompts.get(language, system_prompts['English']) # Default to English if language not found
user_prompt = f"""Analyze the following message for potential fraud:
Message: "{message}"
Sender's Phone Number: "{phone_number}"
Additional Information:
{additional_info}
Provide your analysis and conclusions following the guidelines above."""
payload = {
"model": "Meta-Llama-3.1-8B-Instruct",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"max_tokens": 1000,
"temperature": 0.2,
"top_p": 0.9,
"stop": ["<|eot_id|>"]
}
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
data = response.json()
ai_response = data['choices'][0]['message']['content']
# Parsowanie odpowiedzi
analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL)
risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL)
recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL)
analysis_text = analysis.group(1).strip() if analysis else "No analysis available."
risk_text = risk_assessment.group(1).strip() if risk_assessment else "No risk assessment available."
recommendations_text = recommendations.group(1).strip() if recommendations else "No recommendations available."
return analysis_text, risk_text, recommendations_text
else:
logging.error(f"API Error: {response.status_code} - {response.text}")
return f"API Error: {response.status_code} - {response.text}", "Analysis Error.", "Analysis Error."
except Exception as e:
logging.error(f"API Connection Error: {e}")
return f"API Connection Error: {e}", "Analysis Error.", "Analysis Error."