# main.py
# Hosting-page residue captured with this file: "Spaces: Sleeping".
import io
import json
import logging
import os
import pathlib
import re
import time
import traceback
import unicodedata

import pandas as pd
import unidecode
from dotenv import load_dotenv
from fastapi import FastAPI, Request, Form, File, UploadFile, HTTPException, Depends
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.security import OAuth2PasswordBearer
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jinja2 import Undefined
from pydantic import BaseModel
# Load environment variables from a local .env file (python-dotenv) before
# anything else reads the environment.
load_dotenv()

# Configure logging at the top of the file so every logger created below
# inherits the same level and line format.
logging.basicConfig(
    format='%(asctime)s - %(name)s - [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Global visual map for replacing visually similar characters.
# Keys are single non-ASCII look-alikes; values are the ASCII letters they
# imitate. GamblingFilter._robust_normalize applies this map character by
# character before Unicode normalization, and the map can be extended at
# runtime.
VISUAL_MAP = {
    # Cyrillic capital letters
    'А': 'A',
    'В': 'B',
    'С': 'C',
    'Е': 'E',
    'Н': 'H',
    'К': 'K',
    'М': 'M',
    'О': 'O',
    'Р': 'P',
    'Т': 'T',
    'Х': 'X',
    # Cyrillic small letters
    'а': 'a',
    'в': 'b',
    'с': 'c',
    'е': 'e',
    'о': 'o',
    'р': 'p',
    'х': 'x',
    'у': 'y',
    # Mirrored look-alikes
    'Я': 'R',
    'я': 'r',
    # Greek letters
    'ρ': 'p',
    'Π': 'P',
    # etc...
}
# --- GamblingFilter class (with rule updates) ---
class GamblingFilter:
    """
    A filter for detecting online gambling-related comments.

    Features include aggressive Unicode normalization, keyword matching, and
    pattern detection. Each comment is scored from four signals (platform
    names, gambling terms, context regexes, "lucky" digit runs) and compared
    against a threshold. Rules can be extended at runtime via add_rule().
    """

    # Up to three non-alphanumeric characters may separate the letters of an
    # obfuscated platform name (e.g. "a.e.r.o").
    _SEPARATOR = '[^a-zA-Z0-9]{0,3}'

    def __init__(self):
        """Load the built-in rule sets and compile the derived patterns."""
        logger.info("Initializing GamblingFilter")
        self._platform_names = {
            'agustoto', 'aero', 'aero88', 'dora', 'dora77', 'dewadora', 'pulau777', 'pulau', '777',
            'jptogel', 'mandalika', 'cnd88', 'axl', 'berkah99', 'weton88', 'garuda', 'hoki'
        }
        self._gambling_terms = {
            'jackpot', 'jp', 'wd', 'depo', 'cuan', 'gacor', 'gacir', 'jekpot', 'sultan',
            'rezeki nomplok', 'rezeki', 'menang', 'nomplok', 'deposit', 'withdraw', 'maxwin',
            'auto sultan', 'jepe', 'jepee', 'bikin nagih', 'berkah'
        }
        # Stored and editable, but not consulted by the scoring in
        # is_gambling_comment().
        self._ambiguous_terms = {
            'auto', 'main', 'bermain', 'hasil', 'dapat', 'dapet', 'berkat'
        }
        # Any of these phrases short-circuits classification to "safe".
        self._safe_indicators = {
            'tidak mengandung', 'bukan perjudian', 'tanpa perjudian',
            'dokumentasi', 'profesional', 'pembelajaran'
        }
        self._gambling_contexts = [
            r'(main|bermain|coba).{1,30}(dapat|dapet|pro|jadi|langsung|menang|jp|cuan)',
            r'(modal|depo).{1,30}(jadi|langsung|wd|cuan)',
            r'(jp|jackpot|jekpot).{1,30}(gede|besar|pecah)',
            r'(berkat|dari).{1,30}(rezeki|menang|cuan|sultan)',
            r'(gacor|gacir).{1,30}(terus|parah|tiap|hari)',
            r'(rezeki|cuan).{1,30}(nomplok|datang|mengalir|lancar)',
            r'(hari ini).{1,30}(menang|cuan|rezeki|berkat)',
            r'(malah|eh).{1,30}(jadi|dapat|dapet|rezeki)',
            r'(auto).{1,30}(sultan|cuan|rezeki|kaya)',
            r'(0\d:[0-5]\d).{1,30}(menang|rezeki|cuan|gacor)',
            r'(iseng|coba).{1,30}(malah|jadi|eh|pro)',
            r'(deposit|depo|wd).{1,30}(jadi|langsung)',
            r'(langsung|auto).{1,30}(jp|cuan|sultan|rezeki)',
            r'bikin\s+nagih',
            r'gak\s+ada\s+duanya',
            r'berkah.{0,20}rezeki',
            r'puji\s+syukur'
        ]
        self._compiled_gambling_contexts = [
            re.compile(pattern, re.IGNORECASE | re.DOTALL)
            for pattern in self._gambling_contexts
        ]
        self._update_platform_pattern()
        self._number_pattern = re.compile(r'(88|777|77|99|7+)')

    def _platforms_longest_first(self) -> list:
        """Return the non-empty platform names, longest first, ties alphabetical."""
        return sorted(
            (name for name in self._platform_names if name),
            key=lambda name: (-len(name), name),
        )

    def _update_platform_pattern(self):
        """Recompile the platform name regex based on current _platform_names.

        Each name becomes a sequence of case-insensitive single-character
        classes joined by a short run of optional separator characters, so
        spaced-out or punctuated spellings still match.
        """
        alternatives = []
        # Longest names go first so "aero88" is preferred over its prefix
        # "aero"; regex alternation takes the first alternative that matches.
        for platform in self._platforms_longest_first():
            # re.escape keeps characters such as '^', ']' or '\' literal
            # inside the character class instead of changing its meaning.
            letters = [
                f'[{re.escape(ch.upper())}{re.escape(ch.lower())}]'
                for ch in platform
            ]
            alternatives.append(self._SEPARATOR.join(letters))
        # '(?!)' never matches; an empty pattern would match everywhere.
        source = '|'.join(alternatives) if alternatives else r'(?!)'
        self._platform_pattern = re.compile(source, re.DOTALL)

    def add_rule(self, rule_type: str, rule_value: str):
        """
        Add a new rule based on the rule type.

        Allowed types: 'platform', 'gambling_term', 'safe_indicator',
        'gambling_context', 'ambiguous_term'.

        Term-like values are stored lowercased because they are matched by
        substring against lowercased text. A 'gambling_context' value is a
        regular expression and is stored verbatim.

        Raises:
            ValueError: for an unsupported rule type, a blank rule value, or
                a 'gambling_context' value that is not a valid regex.
        """
        rule_type = rule_type.strip().lower()
        term_sets = {
            'platform': self._platform_names,
            'gambling_term': self._gambling_terms,
            'safe_indicator': self._safe_indicators,
            'ambiguous_term': self._ambiguous_terms,
        }
        if rule_type != 'gambling_context' and rule_type not in term_sets:
            raise ValueError("Unsupported rule type")
        # An empty term is a substring of every text and would flag everything.
        if not rule_value or not rule_value.strip():
            raise ValueError("Rule value must not be empty")

        if rule_type == 'gambling_context':
            # Compile before storing so a bad pattern cannot leave the source
            # list and the compiled list out of step.
            try:
                compiled = re.compile(rule_value, re.IGNORECASE | re.DOTALL)
            except re.error as exc:
                raise ValueError(f"Invalid regular expression: {exc}") from exc
            self._gambling_contexts.append(rule_value)
            self._compiled_gambling_contexts.append(compiled)
            return

        term_sets[rule_type].add(rule_value.lower())
        if rule_type == 'platform':
            self._update_platform_pattern()

    def _strip_all_formatting(self, text: str) -> str:
        """Lowercase the text and drop everything except letters, digits and whitespace."""
        return ''.join(c.lower() for c in text if c.isalnum() or c.isspace())

    def _robust_normalize(self, text: str) -> str:
        """Map look-alike characters to ASCII, transliterate the rest, and lowercase."""
        # Step 1: custom mapping for visually similar characters
        mapped_text = ''.join(VISUAL_MAP.get(ch, ch) for ch in text)
        # Step 2: Unicode normalization + unidecode
        decomposed = unicodedata.normalize('NFKD', mapped_text)
        ascii_equiv = unidecode.unidecode(decomposed)
        return ascii_equiv.lower()

    def _extract_platform_names(self, text: str) -> list:
        """Return the platform mentions found in text.

        The result holds the raw spans matched by the obfuscation-tolerant
        regex, then any known platform name found as a plain substring of the
        normalized or stripped text that is not already covered by a match,
        then the bare markers '88' and '777' when present and not covered.
        """
        # The pattern has no capture groups, so each match is the whole span.
        matches = [found.group(0) for found in self._platform_pattern.finditer(text)]
        normalized = self._robust_normalize(text)
        stripped = self._strip_all_formatting(text)
        # Longest-first keeps the result independent of set ordering: once
        # "aero88" is recorded, its prefix "aero" is treated as covered.
        for platform in self._platforms_longest_first():
            if platform in normalized or platform in stripped:
                if not any(platform in m.lower() for m in matches):
                    matches.append(platform)
        for marker in ('88', '777'):
            if marker in text or marker in normalized:
                if not any(marker in m for m in matches):
                    matches.append(marker)
        return matches

    def normalize_text(self, text: str) -> str:
        """NFKD-decompose, drop non-ASCII characters (whitespace is kept), and lowercase."""
        normalized = unicodedata.normalize('NFKD', text)
        normalized = ''.join(c for c in normalized if ord(c) < 128 or c.isspace())
        return normalized.lower()

    def is_gambling_comment(self, text: str, threshold: float = 0.55) -> tuple:
        """Classify one comment.

        Args:
            text: the raw comment.
            threshold: minimum confidence score for a positive result.

        Returns:
            A pair (is_gambling, metrics). metrics holds the matched
            platforms, terms, contexts and safe indicators, the has_numbers
            flag, the confidence score (capped at 1.0) and the processing
            time in milliseconds.
        """
        started = time.perf_counter()
        logger.info("Analyzing comment for gambling content: %s...", text[:100])
        metrics = {
            'platform_matches': [],
            'gambling_term_matches': [],
            'context_matches': [],
            'safe_indicators': [],
            'has_numbers': False,
            'confidence_score': 0.0,
            'processing_time_ms': 0
        }
        normalized_text = self.normalize_text(text)
        stripped_text = self._strip_all_formatting(text)
        aggressive_text = self._robust_normalize(text)

        # A safe indicator overrides every other signal.
        metrics['safe_indicators'] = [
            indicator for indicator in self._safe_indicators
            if indicator in normalized_text
        ]
        if metrics['safe_indicators']:
            metrics['confidence_score'] = 0.0
            metrics['processing_time_ms'] = (time.perf_counter() - started) * 1000
            return False, metrics

        metrics['platform_matches'] = self._extract_platform_names(text)
        metrics['gambling_term_matches'] = [
            term for term in self._gambling_terms
            if term in normalized_text or term in stripped_text or term in aggressive_text
        ]
        if self._number_pattern.search(normalized_text):
            metrics['has_numbers'] = True
        for pattern in self._compiled_gambling_contexts:
            match = pattern.search(normalized_text)
            if match:
                metrics['context_matches'].append(match.group(0))
            match = pattern.search(aggressive_text)
            if match and match.group(0) not in metrics['context_matches']:
                metrics['context_matches'].append(match.group(0))

        platform_score = min(len(metrics['platform_matches']) * 1.0, 1)
        term_score = min(len(metrics['gambling_term_matches']) * 0.2, 0.4)
        context_score = min(len(metrics['context_matches']) * 0.2, 0.4)
        number_score = 0.1 if metrics['has_numbers'] else 0
        if platform_score > 0 and (term_score > 0 or context_score > 0):
            # A platform plus supporting evidence: every signal counts.
            total_score = platform_score + term_score + context_score + number_score
        elif context_score > 0.2 and term_score > 0:
            # No platform, but at least two context hits plus a term.
            total_score = context_score + term_score + number_score
        else:
            # A lone signal is discounted.
            total_score = max(platform_score, term_score, context_score) * 0.8
        metrics['confidence_score'] = min(total_score, 1.0)

        def mentioned(word: str) -> bool:
            return word in normalized_text or word in aggressive_text

        # Religious-sounding phrasing next to a platform name gets a floor of
        # 0.7; only the first matching special case applies.
        if mentioned("berkah") and mentioned("rezeki") and metrics['platform_matches']:
            metrics['confidence_score'] = max(metrics['confidence_score'], 0.7)
            if "Special case: berkah+rezeki+platform" not in metrics['context_matches']:
                metrics['context_matches'].append("Special case: berkah+rezeki+platform")
        elif mentioned("puji") and mentioned("syukur") and metrics['platform_matches']:
            metrics['confidence_score'] = max(metrics['confidence_score'], 0.7)
            if "Special case: puji+syukur+platform" not in metrics['context_matches']:
                metrics['context_matches'].append("Special case: puji+syukur+platform")

        metrics['processing_time_ms'] = (time.perf_counter() - started) * 1000
        is_gambling = metrics['confidence_score'] >= threshold
        return is_gambling, metrics

    def filter_comments(self, comments: list, threshold: float = 0.55) -> dict:
        """Classify every comment and split them into gambling and safe lists.

        Returns a dict with 'gambling_comments', 'safe_comments' and
        'metrics'. 'metrics' has one entry per input comment, in input order,
        each extended with an 'original_text' key.
        """
        result = {
            'gambling_comments': [],
            'safe_comments': [],
            'metrics': []
        }
        for comment in comments:
            is_gambling, metrics = self.is_gambling_comment(comment, threshold)
            bucket = 'gambling_comments' if is_gambling else 'safe_comments'
            result[bucket].append(comment)
            metrics["original_text"] = comment
            result["metrics"].append(metrics)
        return result
# --- FastAPI application setup ---
# ASGI application object; the __main__ guard at the bottom of the file
# points uvicorn at "main:app".
app = FastAPI()
# Templates are loaded from the "templates" directory, resolved relative to
# the process working directory.
templates = Jinja2Templates(directory="templates")
# Create a single instance of the GamblingFilter. It is shared by every
# handler, so rules added at runtime persist for the life of the process.
filter_instance = GamblingFilter()
# `from jinja2 import Undefined` used to sit here; it now lives in the
# import block at the top of the file.
def pretty_json(value):
    """Jinja filter: render value as indented JSON, keeping non-ASCII text readable.

    A template variable that is undefined renders as an empty string instead
    of being passed to the JSON encoder.
    """
    if not isinstance(value, Undefined):
        return json.dumps(value, indent=2, ensure_ascii=False)
    return ""


# Make the filter available to templates as `{{ value | pretty_json }}`.
templates.env.filters["pretty_json"] = pretty_json
# NOTE(review): this handler appeared twice, byte-for-byte identical, with the
# second definition silently replacing the first; a single definition is kept.
# No route decorator is visible in this view - confirm how (and for which
# HTTP methods) the handler is registered on `app`.
async def read_root(request: Request):
    """Render the index page with no result and the current rule sets."""
    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": None,
        "comment": "",
        "rules": {
            "platform": sorted(filter_instance._platform_names),
            "gambling_term": sorted(filter_instance._gambling_terms),
            "safe_indicator": sorted(filter_instance._safe_indicators),
            "gambling_context": sorted(filter_instance._gambling_contexts),
            "ambiguous_term": sorted(filter_instance._ambiguous_terms),
        },
    })
async def classify_comment(request: Request, comment: str = Form(...)):
    """Classify a single submitted comment and re-render the index page.

    The template receives `result` as {"is_gambling": bool, "metrics": dict}
    and the submitted text echoed back in `comment`.

    NOTE(review): no route decorator is visible in this view - confirm how
    this handler is registered on `app`.
    """
    is_gambling, metrics = filter_instance.is_gambling_comment(comment)
    result = {"is_gambling": is_gambling, "metrics": metrics}
    # Goes through the module logger rather than a bare print so the output
    # follows the configured log format and level.
    logger.info("Classification metrics: %s", metrics)
    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": result,
        "comment": comment,
        "rules": {
            "platform": sorted(filter_instance._platform_names),
            "gambling_term": sorted(filter_instance._gambling_terms),
            "safe_indicator": sorted(filter_instance._safe_indicators),
            "gambling_context": sorted(filter_instance._gambling_contexts),
            "ambiguous_term": sorted(filter_instance._ambiguous_terms),
        },
    })
async def add_rule(request: Request, rule_type: str = Form(...), rule_value: str = Form(...)):
    """Add a detection rule to the shared filter and re-render the index page.

    A rejected rule is reported to the user as a message instead of failing
    the request.

    NOTE(review): no route decorator is visible in this view - confirm how
    this handler is registered on `app`.
    """
    try:
        filter_instance.add_rule(rule_type, rule_value)
    except (ValueError, re.error) as e:
        # re.error covers a 'gambling_context' value that is not a valid
        # regular expression; without it the request ends in a server error.
        message = str(e)
    else:
        message = f"Added rule '{rule_value}' as type '{rule_type}'."
    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": {"message": message},
        "comment": "",
        "rules": {
            "platform": sorted(filter_instance._platform_names),
            "gambling_term": sorted(filter_instance._gambling_terms),
            "safe_indicator": sorted(filter_instance._safe_indicators),
            "gambling_context": sorted(filter_instance._gambling_contexts),
            "ambiguous_term": sorted(filter_instance._ambiguous_terms),
        },
    })
async def upload_file(request: Request, file: UploadFile = File(...), column: str = Form("comment")):
    """Classify every comment in one column of an uploaded CSV or Excel file.

    Args:
        request: the incoming request, passed through to the template.
        file: the upload; the type is chosen from the filename extension
            (.csv, .xls, .xlsx), compared case-insensitively.
        column: name of the column holding the comments.

    The index page is re-rendered with either {"message": ...} describing a
    problem (unreadable file, unsupported type, missing column) or
    {"upload_result": ...} holding the filter_comments() output.

    NOTE(review): no route decorator is visible in this view - confirm how
    this handler is registered on `app`.
    """
    def render(result):
        # Single place that builds the template context; the rule sets are
        # read at render time.
        return templates.TemplateResponse("index.html", {
            "request": request,
            "result": result,
            "comment": "",
            "rules": {
                "platform": sorted(filter_instance._platform_names),
                "gambling_term": sorted(filter_instance._gambling_terms),
                "safe_indicator": sorted(filter_instance._safe_indicators),
                "gambling_context": sorted(filter_instance._gambling_contexts),
                "ambiguous_term": sorted(filter_instance._ambiguous_terms),
            },
        })

    content = await file.read()
    # The filename may be missing, and extensions are often upper-case.
    filename = (file.filename or "").lower()
    try:
        if filename.endswith('.csv'):
            df = pd.read_csv(io.BytesIO(content))
        elif filename.endswith(('.xls', '.xlsx')):
            df = pd.read_excel(io.BytesIO(content))
        else:
            raise ValueError("Unsupported file type.")
    except Exception as e:
        # Request boundary: any parser failure is shown to the user rather
        # than turned into a server error.
        logger.warning("Could not read uploaded file %r: %s", file.filename, e)
        return render({"message": f"Error reading file: {e}"})

    if column not in df.columns:
        return render({
            "message": f"Column '{column}' not found. Available columns: {list(df.columns)}"
        })

    # Blank cells are skipped; astype(str) alone would turn them into the
    # literal comment "nan".
    comments = df[column].dropna().astype(str).tolist()
    results = filter_instance.filter_comments(comments)
    return render({"upload_result": results})
async def add_visual_char(request: Request, char: str = Form(...), ascii_equiv: str = Form(...)):
    """Register a look-alike character in VISUAL_MAP and re-render the index page.

    Args:
        request: the incoming request, passed through to the template.
        char: the single character to be replaced during normalization.
        ascii_equiv: the replacement text.

    NOTE(review): no route decorator is visible in this view - confirm how
    this handler is registered on `app`.
    """
    # Normalization looks VISUAL_MAP up one character at a time, so a key of
    # any other length could never match and would only clutter the map.
    if len(char) == 1:
        VISUAL_MAP[char] = ascii_equiv
        message = f"Added visual map entry '{char}' -> '{ascii_equiv}'."
    else:
        message = f"Visual map key must be exactly one character; got '{char}'."
    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": {"message": message},
        "comment": "",
        "rules": {
            "platform": sorted(filter_instance._platform_names),
            "gambling_term": sorted(filter_instance._gambling_terms),
            "safe_indicator": sorted(filter_instance._safe_indicators),
            "gambling_context": sorted(filter_instance._gambling_contexts),
            "ambiguous_term": sorted(filter_instance._ambiguous_terms),
        },
    })
if __name__ == "__main__":
    # Direct execution: serve "main:app" on every interface at port 8000
    # with auto-reload enabled. uvicorn is imported here so that merely
    # importing this module does not require it.
    import uvicorn

    uvicorn.run("main:app", port=8000, host="0.0.0.0", reload=True)