Spaces:
Build error
Build error
#main.py | |
import re | |
import time | |
import os | |
import logging | |
from typing import List, Dict, Optional, Set, Tuple | |
import google_auth_oauthlib.flow | |
import googleapiclient.discovery | |
import googleapiclient.errors | |
from google_auth_oauthlib.flow import Flow | |
from google.oauth2.credentials import Credentials | |
from googleapiclient.discovery import build | |
from fastapi import FastAPI, Request, Form, File, UploadFile, HTTPException, Depends | |
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse | |
from fastapi.templating import Jinja2Templates | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.security import OAuth2PasswordBearer | |
from google.oauth2.credentials import Credentials | |
from pydantic import BaseModel | |
import unicodedata | |
import unidecode | |
import io | |
import pandas as pd | |
import json | |
from dotenv import load_dotenv | |
# For monitoring with Prometheus | |
# Pull settings from a local .env file into the process environment before
# anything below reads os.getenv().
load_dotenv()

# Process-wide logging setup: timestamp, logger name, level, then the message.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(name)s - [%(levelname)s] %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Look-alike ("homoglyph") table: non-Latin letters that render like a Latin
# letter, mapped to that Latin letter. Used by GamblingFilter._robust_normalize.
VISUAL_MAP = {
    # Cyrillic capitals
    'А': 'A',
    'В': 'B',
    'С': 'C',
    'Е': 'E',
    'Н': 'H',
    'К': 'K',
    'М': 'M',
    'О': 'O',
    'Р': 'P',
    'Т': 'T',
    'Х': 'X',
    # Cyrillic lowercase
    'а': 'a',
    'в': 'b',
    'с': 'c',
    'е': 'e',
    'о': 'o',
    'р': 'p',
    'х': 'x',
    'у': 'y',
    # Cyrillic "ya", which resembles a mirrored R
    'Я': 'R',
    'я': 'r',
    # Greek rho and capital pi
    'ρ': 'p',
    'Π': 'P',
    # etc...
}
# In-memory record of manual moderation decisions, keyed by
# (video_id, comment_id). keep_comment() stores the value "safe" here and
# YouTubeCommentModerator.moderate_video_comments() consults it before running
# the filter. A real deployment would keep this in a database table instead.
manual_overrides: Dict[Tuple[str, str], str] = dict()
from google.oauth2 import service_account | |
def get_google_credentials():
    """
    Build Google API credentials for the current environment.

    When the HF_SPACE environment variable equals "true", a service account is
    loaded from the GOOGLE_SERVICE_ACCOUNT_JSON environment variable. Otherwise
    the installed-app OAuth flow is run against ./app/client_secret.json.

    :return: Credentials object produced by the selected mechanism
    :raises RuntimeError: if HF_SPACE is "true" but the service-account JSON is missing
    """
    on_hugging_face = os.getenv("HF_SPACE") == "true"

    if not on_hugging_face:
        # Local development: use OAuth flow
        oauth_scopes = [
            "https://www.googleapis.com/auth/youtube.readonly",
            "https://www.googleapis.com/auth/youtube.force-ssl"
        ]
        installed_flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
            "./app/client_secret.json",
            scopes=oauth_scopes,
            redirect_uri=os.getenv('YOUTUBE_REDIRECT_URI')
        )
        return installed_flow.run_local_server(port=0)

    # In Hugging Face Spaces: load from secrets
    raw_secret = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON")
    if not raw_secret:
        raise RuntimeError("Missing GOOGLE_SERVICE_ACCOUNT_JSON in Hugging Face secret.")
    sa_info = json.loads(raw_secret)
    sa_credentials = service_account.Credentials.from_service_account_info(sa_info)
    # Keep the parsed JSON on the object so login() can store it later.
    sa_credentials._sa_info = sa_info
    return sa_credentials
def keep_comment(comment_id: str, video_id: str):
    """
    Record that a comment was manually kept.

    Stores "safe" in ``manual_overrides`` under the (video_id, comment_id) key.

    :param comment_id: YouTube comment ID
    :param video_id: ID of the video the comment belongs to
    """
    override_key = (video_id, comment_id)
    manual_overrides[override_key] = "safe"
# --- GamblingFilter class (with rule updates) --- | |
class GamblingFilter:
    """
    Rule-based filter for detecting online gambling-related comments.

    A comment is scored from four signals: platform (brand) names, gambling
    vocabulary, contextual phrase patterns and number sequences. Several
    normalized views of the text are checked so that decorated or look-alike
    characters do not hide a keyword.
    """

    def __init__(self):
        """Load the built-in rule sets and compile their regular expressions."""
        logger.info("Initializing GamblingFilter")
        # Brand names (and brand fragments) of gambling platforms.
        self._platform_names: Set[str] = {
            'agustoto', 'aero', 'aero88', 'dora', 'dora77', 'dewadora', 'pulau777', 'pulau', '777',
            'jptogel', 'mandalika', 'cnd88', 'axl', 'berkah99', 'weton88', 'garuda', 'hoki'
        }
        # Vocabulary that counts towards the term score.
        self._gambling_terms: Set[str] = {
            'jackpot', 'jp', 'wd', 'depo', 'cuan', 'gacor', 'gacir', 'jekpot', 'sultan',
            'rezeki nomplok', 'rezeki', 'menang', 'nomplok', 'deposit', 'withdraw', 'maxwin',
            'auto sultan', 'jepe', 'jepee', 'bikin nagih', 'berkah'
        }
        # Stored and exposed to the UI, but not consulted by is_gambling_comment().
        self._ambiguous_terms: Set[str] = {
            'auto', 'main', 'bermain', 'hasil', 'dapat', 'dapet', 'berkat'
        }
        # Any of these phrases short-circuits the comment to "not gambling".
        self._safe_indicators: Set[str] = {
            'tidak mengandung', 'bukan perjudian', 'tanpa perjudian',
            'dokumentasi', 'profesional', 'pembelajaran'
        }
        # Phrase patterns: two cue words within a short distance of each other.
        self._gambling_contexts: List[str] = [
            r'(main|bermain|coba).{1,30}(dapat|dapet|pro|jadi|langsung|menang|jp|cuan)',
            r'(modal|depo).{1,30}(jadi|langsung|wd|cuan)',
            r'(jp|jackpot|jekpot).{1,30}(gede|besar|pecah)',
            r'(berkat|dari).{1,30}(rezeki|menang|cuan|sultan)',
            r'(gacor|gacir).{1,30}(terus|parah|tiap|hari)',
            r'(rezeki|cuan).{1,30}(nomplok|datang|mengalir|lancar)',
            r'(hari ini).{1,30}(menang|cuan|rezeki|berkat)',
            r'(malah|eh).{1,30}(jadi|dapat|dapet|rezeki)',
            r'(auto).{1,30}(sultan|cuan|rezeki|kaya)',
            r'(0\d:[0-5]\d).{1,30}(menang|rezeki|cuan|gacor)',
            r'(iseng|coba).{1,30}(malah|jadi|eh|pro)',
            r'(deposit|depo|wd).{1,30}(jadi|langsung)',
            r'(langsung|auto).{1,30}(jp|cuan|sultan|rezeki)',
            r'bikin\s+nagih',
            r'gak\s+ada\s+duanya',
            r'berkah.{0,20}rezeki',
            r'puji\s+syukur'
        ]
        self._compiled_gambling_contexts = [
            re.compile(pattern, re.IGNORECASE | re.DOTALL)
            for pattern in self._gambling_contexts
        ]
        self._update_platform_pattern()
        self._number_pattern = re.compile(r'(88|777|77|99|7+)')

    def _update_platform_pattern(self):
        """
        Recompile the platform name regex based on current _platform_names.

        Each name becomes a sequence of per-character classes (either case)
        where up to three non-alphanumeric characters may sit between letters,
        so "p.u-l a.u" still matches "pulau".
        """
        def either_case(ch: str) -> str:
            # Escape both variants: a raw '^', ']' or '\\' would otherwise
            # change the meaning of the character class.
            return '[' + re.escape(ch.upper()) + re.escape(ch.lower()) + ']'

        # Longest names first so "pulau777" wins over "pulau"; the secondary
        # key keeps the alternation order stable between runs (set iteration
        # order is not). Empty names are skipped: they have no last character.
        ordered = sorted(
            (name for name in self._platform_names if name),
            key=lambda name: (-len(name), name),
        )
        alternatives = []
        for platform in ordered:
            *head, last = platform
            pieces = [either_case(ch) + '[^a-zA-Z0-9]{0,3}' for ch in head]
            # The last character carries no trailing separator allowance.
            pieces.append(either_case(last))
            alternatives.append(''.join(pieces))
        # "(?!)" never matches; an empty pattern would match everywhere.
        source = '|'.join(alternatives) if alternatives else r'(?!)'
        self._platform_pattern = re.compile(source, re.DOTALL)

    def add_rule(self, rule_type: str, rule_value: str):
        """
        Add a new rule based on the rule type.

        Allowed types: 'platform', 'gambling_term', 'safe_indicator',
        'gambling_context', 'ambiguous_term'.

        Keyword rules are stripped and lowercased because they are compared
        against lowercased text. A 'gambling_context' value is compiled before
        it is stored, so an invalid regex leaves the filter unchanged.

        :param rule_type: One of the allowed types (case-insensitive)
        :param rule_value: Keyword, or regex source for 'gambling_context'
        :raises ValueError: for an unsupported type or an empty value
        :raises re.error: if a 'gambling_context' value is not a valid regex
        """
        rule_type = rule_type.lower()
        keyword_sets = {
            'platform': self._platform_names,
            'gambling_term': self._gambling_terms,
            'safe_indicator': self._safe_indicators,
            'ambiguous_term': self._ambiguous_terms,
        }

        if rule_type == 'gambling_context':
            if not rule_value.strip():
                raise ValueError("Rule value must not be empty")
            compiled = re.compile(rule_value, re.IGNORECASE | re.DOTALL)
            self._gambling_contexts.append(rule_value)
            self._compiled_gambling_contexts.append(compiled)
            return

        if rule_type not in keyword_sets:
            raise ValueError("Unsupported rule type")

        keyword = rule_value.strip().lower()
        if not keyword:
            # An empty keyword is a substring of every comment.
            raise ValueError("Rule value must not be empty")
        keyword_sets[rule_type].add(keyword)
        if rule_type == 'platform':
            self._update_platform_pattern()

    def _strip_all_formatting(self, text: str) -> str:
        """Return ``text`` lowercased with everything but alphanumerics and whitespace removed."""
        return ''.join(ch.lower() for ch in text if ch.isalnum() or ch.isspace())

    def _aggressive_normalize_text(self, text: str) -> str:
        """Return the NFKD form of ``text`` with all non-ASCII characters dropped, lowercased."""
        decomposed = unicodedata.normalize('NFKD', text)
        return ''.join(ch for ch in decomposed if ord(ch) < 128).lower()

    def _robust_normalize(self, text: str) -> str:
        """
        1) Replace visually-similar letters (Cyrillic/Greek) with Latin equivalents.
        2) Then use unidecode to handle bold/italic forms, fullwidth, etc.
        3) Lowercase the result.
        """
        mapped_text = ''.join(VISUAL_MAP.get(ch, ch) for ch in text)
        decomposed = unicodedata.normalize('NFKD', mapped_text)
        return unidecode.unidecode(decomposed).lower()

    def _extract_platform_names(self, text: str) -> List[str]:
        """
        Collect platform evidence found in ``text``.

        The result holds the raw substrings matched by the platform regex,
        followed by any configured platform names and the number markers
        '88' / '777' that are present but not already covered by an entry.

        :param text: Original (un-normalized) comment text
        :return: List of matched substrings / platform names, without duplicates
        """
        # The platform regex has no capture groups, so findall() returns the
        # matched substrings themselves and they must not be flattened.
        matches = list(dict.fromkeys(self._platform_pattern.findall(text)))

        normalized = self._robust_normalize(text)
        stripped = self._strip_all_formatting(text)
        for platform in sorted(self._platform_names):
            if platform in normalized or platform in stripped:
                if not any(platform in found.lower() for found in matches):
                    matches.append(platform)

        for digits in ('88', '777'):
            if digits in text or digits in normalized:
                if not any(digits in found for found in matches):
                    matches.append(digits)
        return matches

    def normalize_text(self, text: str) -> str:
        """Return the NFKD form of ``text`` keeping only ASCII and whitespace characters, lowercased."""
        decomposed = unicodedata.normalize('NFKD', text)
        kept = ''.join(ch for ch in decomposed if ord(ch) < 128 or ch.isspace())
        return kept.lower()

    def is_gambling_comment(self, text: str, threshold: float = 0.55) -> Tuple[bool, Dict]:
        """
        Score a comment and decide whether it promotes gambling.

        :param text: Comment text
        :param threshold: Minimum confidence score for a positive verdict
        :return: ``(is_gambling, metrics)``; metrics lists the evidence found,
                 the confidence score and the processing time in milliseconds
        """
        # perf_counter() is monotonic, unlike time.time(), so the elapsed
        # value cannot go negative when the wall clock is adjusted.
        started = time.perf_counter()
        logger.info("Analyzing comment for gambling content: %s...", text[:100])
        metrics = {
            'platform_matches': [],
            'gambling_term_matches': [],
            'context_matches': [],
            'safe_indicators': [],
            'has_numbers': False,
            'confidence_score': 0.0,
            'processing_time_ms': 0
        }

        # All three views are already lowercased by their helpers.
        normalized_text = self.normalize_text(text)
        stripped_text = self._strip_all_formatting(text)
        aggressive_text = self._robust_normalize(text)

        # A safe indicator overrides every other signal.
        metrics['safe_indicators'] = sorted(
            indicator for indicator in self._safe_indicators
            if indicator in normalized_text
        )
        if metrics['safe_indicators']:
            metrics['confidence_score'] = 0.0
            metrics['processing_time_ms'] = (time.perf_counter() - started) * 1000
            return False, metrics

        metrics['platform_matches'] = self._extract_platform_names(text)

        views = (normalized_text, stripped_text, aggressive_text)
        metrics['gambling_term_matches'] = sorted(
            term for term in self._gambling_terms
            if any(term in view for view in views)
        )

        if self._number_pattern.search(normalized_text):
            metrics['has_numbers'] = True

        for pattern in self._compiled_gambling_contexts:
            match = pattern.search(normalized_text)
            if match:
                metrics['context_matches'].append(match.group(0))
            match = pattern.search(aggressive_text)
            if match and match.group(0) not in metrics['context_matches']:
                metrics['context_matches'].append(match.group(0))

        platform_score = min(len(metrics['platform_matches']) * 1.0, 1)
        term_score = min(len(metrics['gambling_term_matches']) * 0.2, 0.4)
        context_score = min(len(metrics['context_matches']) * 0.2, 0.4)
        number_score = 0.1 if metrics['has_numbers'] else 0

        if platform_score > 0 and (term_score > 0 or context_score > 0):
            # Platform name plus supporting vocabulary: all signals add up.
            total_score = platform_score + term_score + context_score + number_score
        elif context_score > 0.2 and term_score > 0:
            # No platform, but at least two context hits and some vocabulary.
            total_score = context_score + term_score + number_score
        else:
            # A single kind of evidence is discounted.
            total_score = max(platform_score, term_score, context_score) * 0.8
        metrics['confidence_score'] = min(total_score, 1.0)

        def in_text(word: str) -> bool:
            return word in normalized_text or word in aggressive_text

        # Religious gratitude phrases combined with a platform name are
        # forced to at least 0.7.
        has_platform = len(metrics['platform_matches']) > 0
        if in_text("berkah") and in_text("rezeki") and has_platform:
            metrics['confidence_score'] = max(metrics['confidence_score'], 0.7)
            if "Special case: berkah+rezeki+platform" not in metrics['context_matches']:
                metrics['context_matches'].append("Special case: berkah+rezeki+platform")
        elif in_text("puji") and in_text("syukur") and has_platform:
            metrics['confidence_score'] = max(metrics['confidence_score'], 0.7)
            if "Special case: puji+syukur+platform" not in metrics['context_matches']:
                metrics['context_matches'].append("Special case: puji+syukur+platform")

        metrics['processing_time_ms'] = (time.perf_counter() - started) * 1000
        is_gambling = metrics['confidence_score'] >= threshold
        return is_gambling, metrics

    def filter_comments(self, comments: List[str], threshold: float = 0.55) -> Dict[str, List]:
        """
        Classify a batch of comments.

        :param comments: Comment texts
        :param threshold: Confidence threshold passed to is_gambling_comment()
        :return: Dict with 'gambling_comments', 'safe_comments' and per-comment
                 'metrics' (each extended with 'original_text')
        """
        result = {
            'gambling_comments': [],
            'safe_comments': [],
            'metrics': []
        }
        for comment in comments:
            is_gambling, metrics = self.is_gambling_comment(comment, threshold)
            bucket = 'gambling_comments' if is_gambling else 'safe_comments'
            result[bucket].append(comment)
            metrics['original_text'] = comment
            result['metrics'].append(metrics)
        return result
class YouTubeCommentModerator:
    """
    Fetches YouTube comments, classifies them with a GamblingFilter and applies
    moderation actions through the YouTube Data API.
    """

    def __init__(self,
                 client_secrets_path: str = "./app/client_secret.json",
                 gambling_filter: Optional["GamblingFilter"] = None):
        """
        Initialize the YouTube Comment Moderator with configurable settings.

        :param client_secrets_path: Path to OAuth 2.0 client secrets file
        :param gambling_filter: Optional pre-configured GamblingFilter instance
        """
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)

        # OAuth configuration
        self.client_secrets_path = client_secrets_path
        self.scopes = [
            "https://www.googleapis.com/auth/youtube.readonly",
            "https://www.googleapis.com/auth/youtube.force-ssl"
        ]

        # Disable OAuthlib's HTTPS verification when running locally.
        # NOTE(review): this is set unconditionally, including in hosted
        # deployments - confirm that is intended.
        os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

        # YouTube service; set by authenticate() or assigned by the caller.
        self.youtube_service = None

        # Gambling Filter
        self.gambling_filter = gambling_filter or GamblingFilter()

    def authenticate(self) -> bool:
        """
        Authenticate with YouTube Data API.

        :return: Boolean indicating successful authentication
        """
        try:
            credentials = get_google_credentials()
            self.youtube_service = googleapiclient.discovery.build(
                "youtube", "v3", credentials=credentials
            )
        except Exception as e:
            self.logger.error(f"Authentication failed: {e}")
            return False
        self.logger.info("YouTube API authentication successful.")
        return True

    def moderate_video_comments(self, video_id: str, threshold: float = 0.55) -> Dict:
        """
        Classify every top-level comment of a video.

        Comments recorded as "safe" in ``manual_overrides`` skip the filter.

        :param video_id: YouTube video ID
        :param threshold: Confidence threshold passed to the gambling filter
        :return: Dict with 'total_comments', 'gambling_comments', 'safe_comments'
                 and 'moderation_metrics', or ``{"error": ...}`` on failure
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return {"error": "Not authenticated"}

        moderation_results = {
            "total_comments": 0,
            "gambling_comments": [],
            "safe_comments": [],
            "moderation_metrics": []
        }
        try:
            threads = self.youtube_service.commentThreads()
            request = threads.list(
                part="snippet",
                videoId=video_id,
                maxResults=100,
                textFormat="plainText"
            )
            # list_next() returns None after the last page.
            while request is not None:
                response = request.execute()
                for item in response.get("items", []):
                    top_comment = item["snippet"]["topLevelComment"]
                    comment_id = top_comment["id"]
                    comment_snippet = top_comment["snippet"]
                    comment_text = comment_snippet["textDisplay"]

                    if manual_overrides.get((video_id, comment_id)) == "safe":
                        # The user previously pressed "Keep" - skip the gambling filter
                        is_gambling = False
                        metrics = {"confidence_score": 0.0}
                    else:
                        is_gambling, metrics = self.gambling_filter.is_gambling_comment(
                            comment_text, threshold
                        )

                    comment_info = {
                        "id": comment_id,
                        "text": comment_text,
                        "author": comment_snippet["authorDisplayName"],
                        "is_gambling": is_gambling,
                        "metrics": metrics
                    }
                    moderation_results["total_comments"] += 1
                    bucket = "gambling_comments" if is_gambling else "safe_comments"
                    moderation_results[bucket].append(comment_info)

                    # Same dict object as comment_info["metrics"].
                    metrics["original_text"] = comment_text
                    moderation_results["moderation_metrics"].append(metrics)

                request = threads.list_next(request, response)
            return moderation_results
        except Exception as e:
            self.logger.error(f"Error moderating comments: {e}")
            return {"error": str(e)}

    def delete_comment(self, comment_id: str) -> bool:
        """
        Reject a specific comment.

        The comment is not removed with ``comments().delete``; its moderation
        status is set to "rejected".

        :param comment_id: YouTube comment ID
        :return: Boolean indicating whether the API call succeeded
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return False
        try:
            self.youtube_service.comments().setModerationStatus(
                id=comment_id,
                moderationStatus="rejected"
            ).execute()
        except Exception as e:
            self.logger.error(f"Failed to delete comment {comment_id}: {e}")
            return False
        self.logger.info(f"Comment {comment_id} deleted successfully.")
        return True

    def get_channel_videos(self, max_results: int = 50) -> List[Dict]:
        """
        Retrieve videos from authenticated user's channel.

        :param max_results: Maximum number of videos to retrieve
        :return: List of dicts with 'id', 'title' and 'thumbnail'; empty on any failure
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return []

        # Without a channel ID the search would not be restricted to the
        # user's own channel, so stop here instead of passing channelId=None.
        channel_id = self._get_channel_id()
        if not channel_id:
            self.logger.error("Could not determine the channel ID; no videos retrieved.")
            return []

        try:
            response = self.youtube_service.search().list(
                part="snippet",
                channelId=channel_id,
                maxResults=max_results,
                type="video"
            ).execute()
            return [
                {
                    "id": item["id"]["videoId"],
                    "title": item["snippet"]["title"],
                    "thumbnail": item["snippet"]["thumbnails"]["default"]["url"]
                }
                for item in response.get("items", [])
            ]
        except Exception as e:
            self.logger.error(f"Error retrieving videos: {e}")
            return []

    def _get_channel_id(self) -> Optional[str]:
        """
        Retrieve the authenticated user's channel ID.

        :return: Channel ID, or None if the lookup fails or returns no channel
        """
        try:
            response = self.youtube_service.channels().list(part="id", mine=True).execute()
            return response["items"][0]["id"]
        except Exception as e:
            self.logger.error(f"Error retrieving channel ID: {e}")
            return None
class User(BaseModel):
    """User record held in UserDatabase."""

    # Also the key under which UserDatabase stores the record.
    username: str
    email: Optional[str] = None
    # Credential data as a plain dict; route handlers pass it to
    # Credentials.from_authorized_user_info().
    youtube_credentials: Optional[Dict] = None
class UserDatabase:
    """
    In-memory user database. In a production app,
    replace with a proper database like SQLAlchemy
    """
    # Shared by the whole process: username -> User.
    users = {}

    @classmethod
    def create_user(cls, username: str, credentials: Dict):
        """
        Create the user, or replace an existing entry with the same username.

        :param username: Key for the user
        :param credentials: Credential data stored on the user
        :return: The stored User
        """
        user = User(username=username, youtube_credentials=credentials)
        cls.users[username] = user
        return user

    @classmethod
    def get_user(cls, username: str):
        """
        Look up a user by username.

        :param username: Key for the user
        :return: The stored User, or None if there is no such user
        """
        return cls.users.get(username)
class YouTubeAuthenticator:
    """Thin wrapper that turns credential failures into HTTP 400 errors."""

    @staticmethod
    def authenticate_with_client_secrets(client_secrets_file=None):
        """
        Obtain Google credentials through get_google_credentials().

        :param client_secrets_file: Accepted but not used
        :return: The credentials object
        :raises HTTPException: with status 400 if obtaining credentials fails
        """
        try:
            return get_google_credentials()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}") from e
# --- FastAPI application setup --- | |
# Application object; static assets are served from ./static under /static.
app = FastAPI()
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)

# Jinja2 templates are loaded from ./templates.
templates = Jinja2Templates(directory="templates")

# Create a single instance of the GamblingFilter
filter_instance = GamblingFilter()
# ----Google ---- | |
class GoogleOAuthHandler:
    """Drives the Google OAuth 2.0 web flow and packages the resulting credentials."""

    def __init__(self):
        """Set the client-secrets path, requested scopes and redirect URI."""
        self.client_secrets_file = "./app/client_secret.json"
        self.scopes = [
            'https://www.googleapis.com/auth/youtube.readonly',
            'https://www.googleapis.com/auth/userinfo.profile'
        ]
        # YOUTUBE_REDIRECT_URI overrides the local default.
        self.redirect_uri = os.getenv('YOUTUBE_REDIRECT_URI', 'http://localhost:8000/oauth/callback')

    def create_oauth_flow(self):
        """
        Create OAuth 2.0 Flow for Google Authorization

        :return: Flow configured with this handler's secrets file, scopes and redirect URI
        """
        return Flow.from_client_secrets_file(
            self.client_secrets_file,
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

    def initiate_oauth_flow(self):
        """
        Generate Authorization URL for OAuth Flow

        :return: URL the user should be sent to; the accompanying state value is discarded
        """
        oauth_flow = self.create_oauth_flow()
        url_and_state = oauth_flow.authorization_url(
            access_type='offline',  # Ensures we get a refresh token
            prompt='consent',  # Forces user to see and accept consent screen
            include_granted_scopes='true'
        )
        auth_url, _state = url_and_state
        return auth_url

    def handle_oauth_callback(self, authorization_code):
        """
        Handle the OAuth callback and retrieve user credentials

        Exchanges the authorization code for tokens, then looks up the user
        info and the user's channel title.

        :param authorization_code: Code received on the OAuth redirect
        :return: Dict with 'username', 'credentials' (plain dict) and 'user_info'
        :raises HTTPException: with status 400 if any step fails
        """
        try:
            oauth_flow = self.create_oauth_flow()
            oauth_flow.fetch_token(code=authorization_code)
            creds = oauth_flow.credentials

            # Fetch user information
            user_info = build('oauth2', 'v2', credentials=creds).userinfo().get().execute()

            # Build YouTube service to get channel details
            yt = build('youtube', 'v3', credentials=creds)
            channel_resp = yt.channels().list(part="snippet", mine=True).execute()

            # Prefer the channel title; fall back to the e-mail address.
            has_channel = "items" in channel_resp and len(channel_resp["items"]) > 0
            channel_username = (
                channel_resp['items'][0]['snippet']['title']
                if has_channel
                else user_info.get('email', 'unknown_user')
            )

            # Convert credentials to dict for storage
            stored = {
                'token': creds.token,
                'refresh_token': creds.refresh_token,
                'token_uri': creds.token_uri,
                'client_id': creds.client_id,
                'client_secret': creds.client_secret,
                'scopes': creds.scopes
            }
            return {
                'username': channel_username,
                'credentials': stored,
                'user_info': user_info
            }
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"OAuth callback failed: {str(e)}")
# Module-level moderator that shares the single GamblingFilter instance.
moderator = YouTubeCommentModerator(
    gambling_filter=filter_instance,
)
async def moderate_video(request: Request, video_id: str = Form(...), threshold: float = Form(0.55)):
    """
    Moderate a video's comments with the module-level moderator and render index.html.

    :param request: Request object
    :param video_id: ID of the video to moderate (form field)
    :param threshold: Gambling confidence threshold (form field)
    :return: Rendered template with the result and the current rule sets
    """
    if moderator.youtube_service:
        result = moderator.moderate_video_comments(video_id, threshold)
    else:
        result = {"error": "YouTube service not authenticated. Please authenticate first."}

    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    context = {
        "request": request,
        "result": result,
        "video_id": video_id,
        "rules": {name: sorted(list(values)) for name, values in rule_sources.items()},
    }
    return templates.TemplateResponse("index.html", context)
async def api_delete_comment(
    request: Request,
    comment_id: str,
    video_id: str
):
    """
    Reject a comment using the logged-in user's stored credentials.

    :param request: Request object (its "token" cookie identifies the user)
    :param comment_id: YouTube comment ID
    :param video_id: Accepted but not used
    :return: ``{"success": bool}``
    """
    account = get_current_user_from_cookie(request)
    creds = Credentials.from_authorized_user_info(account.youtube_credentials)

    per_user = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    per_user.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=creds
    )
    return {"success": per_user.delete_comment(comment_id)}
# OAuth2 Password Bearer for session management | |
# Hardcoded client secrets path (you'll need to replace this with your actual path)
CLIENT_SECRETS_PATH = "./app/client_secret.json"

# OAuth2 Password Bearer for session management; used as the dependency of
# get_current_user().
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
)
# 1) Root route => Decide if user is logged in; if not, go to /login | |
async def root_redirect(request: Request):
    """
    Send the visitor to /videos when a "token" cookie is present, else to /login.

    :param request: Request object
    :return: 303 redirect
    """
    destination = "/videos" if request.cookies.get("token") else "/login"
    return RedirectResponse(url=destination, status_code=303)
# 2) Show the login form (GET /login) | |
async def login_form(request: Request):
    """
    Render the login page.

    :param request: Request object
    :return: Rendered login.html
    """
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
# 3) Handle login submission (POST /login) => Google OAuth => /videos | |
async def login(
    request: Request,
    username: str = Form(None)  # Make username optional
):
    """
    Obtain Google credentials, register the user and start a cookie session.

    :param request: Request object
    :param username: Optional form field; not used
    :return: 303 redirect to /videos with the "token" cookie set, or the login
             page with an error message if anything fails
    """
    try:
        import json

        on_hf = os.getenv("HF_SPACE") == "true"

        # Service account creds on HF, OAuth creds locally.
        creds = get_google_credentials()
        yt = googleapiclient.discovery.build("youtube", "v3", credentials=creds)

        if on_hf:
            # If running in Hugging Face Space, use a default username
            channel_username = "hf_space_user"
        else:
            resp = yt.channels().list(part="snippet", mine=True).execute()
            has_channel = "items" in resp and len(resp["items"]) > 0
            channel_username = resp['items'][0]['snippet']['title'] if has_channel else "unknown_user"

        # Convert credentials to dict for storage
        if hasattr(creds, "to_json"):
            creds_dict = json.loads(creds.to_json())
        elif hasattr(creds, "_sa_info"):
            creds_dict = creds._sa_info
        else:
            creds_dict = {}

        # Create or update user in our "database"
        UserDatabase.create_user(channel_username, creds_dict)

        # Cookie attributes depend on the environment.
        secure_cookie, samesite_value = (True, "none") if on_hf else (False, "lax")

        # NOTE(review): the cookie value is the plain username, which is also
        # the lookup key in get_current_user_from_cookie() - confirm whether
        # an unguessable session token is needed.
        redirect = RedirectResponse(url="/videos", status_code=303)
        redirect.set_cookie(
            key="token",
            value=channel_username,
            max_age=1800,
            httponly=True,
            secure=secure_cookie,
            samesite=samesite_value
        )
        return redirect
    except Exception as e:
        failure_context = {
            "request": request,
            "error": f"Authentication failed: {str(e)}"
        }
        return templates.TemplateResponse("login.html", failure_context)
async def api_keep_comment(
    request: Request,
    comment_id: str,
    video_id: str
):
    """
    Approve a comment on YouTube and remember the decision locally.

    :param request: Request object (its "token" cookie identifies the user)
    :param comment_id: YouTube comment ID
    :param video_id: ID of the video the comment belongs to
    :return: ``{"success": True, "message": ...}`` or ``{"success": False, "error": ...}``
    """
    try:
        logging.debug(f"Received keep request for comment_id: {comment_id}, video_id: {video_id}")

        account = get_current_user_from_cookie(request)
        logging.debug(f"Current user: {account.username}")
        creds = Credentials.from_authorized_user_info(account.youtube_credentials)

        # Moderator bound to this user's credentials.
        per_user = YouTubeCommentModerator(gambling_filter=GamblingFilter())
        per_user.youtube_service = googleapiclient.discovery.build(
            "youtube", "v3", credentials=creds
        )

        logging.debug("Setting moderation status to 'published' on YouTube...")
        approve_call = per_user.youtube_service.comments().setModerationStatus(
            id=comment_id,
            moderationStatus="published"  # This marks the comment as approved
        )
        api_result = approve_call.execute()
        logging.debug(f"YouTube API response: {api_result}")

        # Record the override so the comment is not flagged again.
        keep_comment(comment_id, video_id)
        logging.debug("Manual override saved for comment.")
    except Exception as e:
        logging.error(f"Error keeping comment: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
    return {"success": True, "message": "Comment kept successfully"}
async def refresh_video_comments(
    request: Request,
    video_id: str,
    threshold: float = 0.55
):
    """
    Refresh comments for a specific video, reapplying moderation.

    :param request: Request object
    :param video_id: ID of the video to refresh comments for
    :param threshold: Gambling confidence threshold
    :return: Rendered template with updated comments, or error.html on failure
    :raises HTTPException: with status 401 when the request carries no valid "token" cookie
    """
    # Raises 401 for a missing or unknown token; it never returns a falsy
    # value, so no separate "not logged in" branch is needed here.
    current_user = get_current_user_from_cookie(request)

    try:
        user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
        # One API client serves both the moderation run and the title lookup.
        youtube_service = googleapiclient.discovery.build(
            "youtube", "v3",
            credentials=user_creds
        )

        # Named user_moderator so the module-level ``moderator`` is not shadowed.
        user_moderator = YouTubeCommentModerator(gambling_filter=filter_instance)
        user_moderator.youtube_service = youtube_service
        result = user_moderator.moderate_video_comments(video_id, threshold)

        # Fetch video details to pass to template
        video_response = youtube_service.videos().list(
            part="snippet",
            id=video_id
        ).execute()
        items = video_response.get('items') or []
        video_info = items[0]['snippet'] if items else {}

        return templates.TemplateResponse("video_comments.html", {
            "request": request,
            "video": {
                "id": video_id,
                "title": video_info.get('title', 'Unknown Video')
            },
            "safe_comments": result.get('safe_comments', []),
            "flagged_comments": result.get('gambling_comments', []),
            "total_comments": result.get('total_comments', 0)
        })
    except Exception as e:
        logging.error(f"Error refreshing comments: {e}")
        return templates.TemplateResponse("error.html", {
            "request": request,
            "error": f"Failed to refresh comments: {str(e)}"
        })
# 4) Protected route to fetch current user from cookie | |
def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    Resolve the bearer token to a stored user.

    The token is used directly as the username; no decoding or validation
    takes place.

    :param token: Bearer token supplied by ``oauth2_scheme``
    :return: The matching User
    :raises HTTPException: with status 401 if no such user exists
    """
    found = UserDatabase.get_user(token)
    if found:
        return found
    raise HTTPException(status_code=401, detail="Invalid authentication credentials")
def get_current_user_from_cookie(request: Request):
    """
    Resolve the "token" cookie to a stored user.

    :param request: Request object
    :return: The matching User
    :raises HTTPException: with status 401 if the cookie is missing or names no known user
    """
    session_token = request.cookies.get("token")
    if not session_token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    found = UserDatabase.get_user(session_token)
    if found:
        return found
    raise HTTPException(status_code=401, detail="Invalid authentication credentials")
async def auth_exception_handler(request: Request, exc: HTTPException):
    """
    Turn a 401 into a redirect to /login; report any other HTTP error as JSON.

    :param request: Request object
    :param exc: The HTTPException being handled
    :return: RedirectResponse for status 401, otherwise a JSONResponse with the detail
    """
    if exc.status_code != 401:
        # For other HTTP errors, return a JSON response
        return JSONResponse(
            status_code=exc.status_code,
            content={"detail": exc.detail},
        )
    # Redirect the user to the login page
    return RedirectResponse(url="/login")
# 5) List user's videos (GET /videos) - requires login | |
async def list_videos(request: Request, current_user: User = Depends(get_current_user_from_cookie)):
    """
    Render the logged-in user's channel videos.

    :param request: Request object
    :param current_user: User resolved from the "token" cookie
    :return: Rendered videos.html
    """
    # Rebuild a Credentials object from the stored dictionary.
    creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
    per_user = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    per_user.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=creds
    )

    context = {
        "request": request,
        "current_user": current_user,
        "videos": per_user.get_channel_videos(),
    }
    return templates.TemplateResponse("videos.html", context)
# 6) Moderate a specific video's comments (GET /video/{video_id}) - requires login | |
async def moderate_video_comments(
    request: Request,
    video_id: str,
    current_user: User = Depends(get_current_user_from_cookie),
):
    """Render ``video_comments.html`` with moderation results for one video.

    A per-request ``YouTubeCommentModerator`` is given a YouTube v3 service
    built from the credentials stored on ``current_user``; its
    ``moderate_video_comments(video_id)`` result supplies the ``safe_comments``
    and ``gambling_comments`` lists (each defaulting to an empty list).
    """
    # Rebuild a Credentials object from the dict kept on the user record.
    credentials = Credentials.from_authorized_user_info(current_user.youtube_credentials)

    moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    moderator.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=credentials
    )

    outcome = moderator.moderate_video_comments(video_id)
    safe = outcome.get("safe_comments", [])
    flagged = outcome.get("gambling_comments", [])

    context = {
        "request": request,
        "current_user": current_user,
        # The title is a fixed placeholder; optionally fetch the actual title.
        "video": {"id": video_id, "title": "Sample Video Title"},
        "safe_comments": safe,
        "flagged_comments": flagged,
    }
    return templates.TemplateResponse("video_comments.html", context)
# 7) Logout => remove token | |
async def logout():
    """Return a redirect to ``/login`` that also deletes the ``token`` cookie."""
    redirect = RedirectResponse(url="/login")
    redirect.delete_cookie("token")
    return redirect
from jinja2 import Undefined | |
import json | |
def pretty_json(value):
    """Template filter: format ``value`` as indented, non-ASCII-preserving JSON.

    A Jinja ``Undefined`` value yields an empty string instead of being
    serialized.
    """
    if not isinstance(value, Undefined):
        return json.dumps(value, indent=2, ensure_ascii=False)
    return ""


# Make the filter available to templates as ``pretty_json``.
templates.env.filters["pretty_json"] = pretty_json
async def read_root(request: Request):
    """Render ``index.html`` with no result, an empty comment and the rule lists.

    Each rule list is the sorted content of the corresponding collection on
    the module-level ``filter_instance``.
    """
    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    context = {
        "request": request,
        "result": None,
        "comment": "",
        "rules": {label: sorted(values) for label, values in rule_sources.items()},
    }
    return templates.TemplateResponse("index.html", context)
async def classify_comment(request: Request, comment: str = Form(...)):
    """Classify one submitted comment and render ``index.html`` with the outcome.

    The template receives ``result`` as ``{"is_gambling": ..., "metrics": ...}``
    (the two values returned by ``filter_instance.is_gambling_comment``), the
    submitted comment text, and the sorted rule lists.
    """
    verdict, details = filter_instance.is_gambling_comment(comment)

    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    context = {
        "request": request,
        "result": {"is_gambling": verdict, "metrics": details},
        "comment": comment,
        "rules": {label: sorted(values) for label, values in rule_sources.items()},
    }
    return templates.TemplateResponse("index.html", context)
async def add_rule(request: Request, rule_type: str = Form(...), rule_value: str = Form(...)):
    """Add a rule to ``filter_instance`` and render ``index.html`` with a message.

    A ``ValueError`` raised by ``filter_instance.add_rule`` is not propagated;
    its text is shown as the message instead of the success confirmation.
    """
    try:
        filter_instance.add_rule(rule_type, rule_value)
    except ValueError as err:
        message = str(err)
    else:
        message = f"Added rule '{rule_value}' as type '{rule_type}'."

    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    context = {
        "request": request,
        "result": {"message": message},
        "comment": "",
        "rules": {label: sorted(values) for label, values in rule_sources.items()},
    }
    return templates.TemplateResponse("index.html", context)
async def upload_file(request: Request, file: UploadFile = File(...), column: str = Form("comment")):
    """Classify every value in one column of an uploaded CSV or Excel file.

    Args:
        request: Incoming request, forwarded to the template.
        file: Uploaded ``.csv``, ``.xls`` or ``.xlsx`` file. The extension is
            matched case-insensitively.
        column: Name of the column holding the comments (default "comment").

    Returns:
        The rendered ``index.html``. ``result`` is ``{"upload_result": ...}``
        with the output of ``filter_instance.filter_comments`` on success, or
        ``{"message": ...}`` when the file cannot be read or the column is
        missing.
    """
    content = await file.read()

    # ``filename`` can be None when the multipart part carries no file name,
    # and real-world uploads often use upper-case extensions ("DATA.CSV").
    # Normalise once so both cases are handled by the checks below.
    filename = (file.filename or "").lower()

    try:
        if filename.endswith(".csv"):
            df = pd.read_csv(io.BytesIO(content))
        elif filename.endswith((".xls", ".xlsx")):
            df = pd.read_excel(io.BytesIO(content))
        else:
            raise ValueError("Unsupported file type.")
    except Exception as e:
        # Deliberately broad: any failure while parsing the upload is reported
        # back on the page instead of surfacing as a server error.
        return _upload_index_response(request, {"message": f"Error reading file: {e}"})

    if column not in df.columns:
        return _upload_index_response(
            request,
            {"message": f"Column '{column}' not found. Available columns: {list(df.columns)}"},
        )

    # NOTE: astype(str) also stringifies missing cells, so they reach the
    # filter as text rather than being dropped.
    comments = df[column].astype(str).tolist()
    results = filter_instance.filter_comments(comments)
    return _upload_index_response(request, {"upload_result": results})


def _upload_index_response(request, result):
    """Render ``index.html`` with ``result``, an empty comment and the sorted rule lists."""
    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": result,
        "comment": "",
        "rules": {label: sorted(values) for label, values in rule_sources.items()},
    })
async def add_visual_char(request: Request,
                          char: str = Form(...),
                          ascii_equiv: str = Form(...)):
    """Store ``char -> ascii_equiv`` in ``VISUAL_MAP`` and render ``index.html``.

    An existing entry for ``char`` is overwritten. The page shows a
    confirmation message together with the sorted rule lists.
    """
    VISUAL_MAP[char] = ascii_equiv  # add (or replace) the mapping

    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    context = {
        "request": request,
        "result": {"message": f"Added visual map entry '{char}' -> '{ascii_equiv}'."},
        "comment": "",
        "rules": {label: sorted(values) for label, values in rule_sources.items()},
    }
    return templates.TemplateResponse("index.html", context)