ariansyahdedy's picture
fix gitignore
2ee07d8
raw
history blame contribute delete
44.5 kB
#main.py
import re
import time
import os
import logging
from typing import List, Dict, Optional, Set, Tuple
import google_auth_oauthlib.flow
import googleapiclient.discovery
import googleapiclient.errors
from google_auth_oauthlib.flow import Flow
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from fastapi import FastAPI, Request, Form, File, UploadFile, HTTPException, Depends
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordBearer
from google.oauth2.credentials import Credentials
from pydantic import BaseModel
import unicodedata
import unidecode
import io
import pandas as pd
import json
from dotenv import load_dotenv
# Load variables from a local .env file (if any) so the os.getenv() lookups in this module can see them
load_dotenv()
# Configure root logging once at import time: INFO level, timestamped records
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Module-level logger (used by GamblingFilter below)
logger = logging.getLogger(__name__)
# Look-alike letters from other scripts mapped to the Latin letter they imitate.
# GamblingFilter._robust_normalize applies this table one character at a time.
VISUAL_MAP = {
    # Cyrillic capitals
    'А': 'A',
    'В': 'B',
    'С': 'C',
    'Е': 'E',
    'Н': 'H',
    'К': 'K',
    'М': 'M',
    'О': 'O',
    'Р': 'P',
    'Т': 'T',
    'Х': 'X',
    # Cyrillic lowercase
    'а': 'a',
    'в': 'b',
    'с': 'c',
    'е': 'e',
    'о': 'o',
    'р': 'p',
    'х': 'x',
    'у': 'y',
    # Cyrillic "ya", visually a mirrored R
    'Я': 'R',
    'я': 'r',
    # Greek rho and capital pi
    'ρ': 'p',
    'Π': 'P',
    # Further entries can be added at runtime (see the /add_visual_char route).
}
# In-memory record of manual moderation decisions, keyed by (video_id, comment_id).
# keep_comment() stores the value "safe" here, and YouTubeCommentModerator skips the
# gambling filter for any comment whose key maps to "safe".
# NOTE: this is demonstration storage only; a real deployment would keep it in a
# database table, since a module-level dict is lost when the process restarts.
manual_overrides: Dict[Tuple[str, str], str] = {}
from google.oauth2 import service_account
def get_google_credentials():
    """Return Google API credentials appropriate for the current environment.

    When the ``HF_SPACE`` environment variable equals ``"true"`` the
    credentials are built from the service-account JSON stored in
    ``GOOGLE_SERVICE_ACCOUNT_JSON``; the parsed JSON is also attached to the
    returned object as ``_sa_info`` so callers can retrieve it later.
    Otherwise an installed-app OAuth flow is run against
    ``./app/client_secret.json`` using a local server on a free port.

    Raises:
        RuntimeError: ``HF_SPACE`` is ``"true"`` but
            ``GOOGLE_SERVICE_ACCOUNT_JSON`` is unset or empty.
    """
    if os.getenv("HF_SPACE") != "true":
        # Local development: interactive OAuth flow.
        local_flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
            "./app/client_secret.json",
            scopes=[
                "https://www.googleapis.com/auth/youtube.readonly",
                "https://www.googleapis.com/auth/youtube.force-ssl",
            ],
            redirect_uri=os.getenv('YOUTUBE_REDIRECT_URI'),
        )
        return local_flow.run_local_server(port=0)

    # Hugging Face Space: the service account comes from a secret.
    raw_secret = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON")
    if not raw_secret:
        raise RuntimeError("Missing GOOGLE_SERVICE_ACCOUNT_JSON in Hugging Face secret.")

    sa_info = json.loads(raw_secret)
    sa_credentials = service_account.Credentials.from_service_account_info(sa_info)
    # Keep the raw info on the object so it can be stored alongside the user.
    sa_credentials._sa_info = sa_info
    return sa_credentials
def keep_comment(comment_id: str, video_id: str):
    """Record that the given comment on the given video was manually kept.

    The decision is stored in ``manual_overrides`` under the key
    ``(video_id, comment_id)`` with the value ``"safe"``.
    """
    override_key = (video_id, comment_id)
    manual_overrides[override_key] = "safe"
# --- GamblingFilter class (with rule updates) ---
class GamblingFilter:
    """
    Rule-based detector for online-gambling promotion in comments.

    A comment is scored from four signals: platform names (matched even when
    letters are separated by up to three non-alphanumeric characters),
    gambling terms, contextual regex patterns, and "lucky number" digits.
    Text is compared in several normalized forms (NFKD/ASCII, look-alike
    mapping + unidecode, and alphanumeric-only) so that decorated Unicode
    spellings still match. All rule matching is done on lowercased text.
    """

    def __init__(self):
        logger.info("Initializing GamblingFilter")
        self._platform_names: Set[str] = {
            'agustoto', 'aero', 'aero88', 'dora', 'dora77', 'dewadora', 'pulau777', 'pulau', '777',
            'jptogel', 'mandalika', 'cnd88', 'axl', 'berkah99', 'weton88', 'garuda', 'hoki'
        }
        self._gambling_terms: Set[str] = {
            'jackpot', 'jp', 'wd', 'depo', 'cuan', 'gacor', 'gacir', 'jekpot', 'sultan',
            'rezeki nomplok', 'rezeki', 'menang', 'nomplok', 'deposit', 'withdraw', 'maxwin',
            'auto sultan', 'jepe', 'jepee', 'bikin nagih', 'berkah'
        }
        self._ambiguous_terms: Set[str] = {
            'auto', 'main', 'bermain', 'hasil', 'dapat', 'dapet', 'berkat'
        }
        self._safe_indicators: Set[str] = {
            'tidak mengandung', 'bukan perjudian', 'tanpa perjudian',
            'dokumentasi', 'profesional', 'pembelajaran'
        }
        self._gambling_contexts: List[str] = [
            r'(main|bermain|coba).{1,30}(dapat|dapet|pro|jadi|langsung|menang|jp|cuan)',
            r'(modal|depo).{1,30}(jadi|langsung|wd|cuan)',
            r'(jp|jackpot|jekpot).{1,30}(gede|besar|pecah)',
            r'(berkat|dari).{1,30}(rezeki|menang|cuan|sultan)',
            r'(gacor|gacir).{1,30}(terus|parah|tiap|hari)',
            r'(rezeki|cuan).{1,30}(nomplok|datang|mengalir|lancar)',
            r'(hari ini).{1,30}(menang|cuan|rezeki|berkat)',
            r'(malah|eh).{1,30}(jadi|dapat|dapet|rezeki)',
            r'(auto).{1,30}(sultan|cuan|rezeki|kaya)',
            r'(0\d:[0-5]\d).{1,30}(menang|rezeki|cuan|gacor)',
            r'(iseng|coba).{1,30}(malah|jadi|eh|pro)',
            r'(deposit|depo|wd).{1,30}(jadi|langsung)',
            r'(langsung|auto).{1,30}(jp|cuan|sultan|rezeki)',
            r'bikin\s+nagih',
            r'gak\s+ada\s+duanya',
            r'berkah.{0,20}rezeki',
            r'puji\s+syukur'
        ]
        self._compiled_gambling_contexts = [
            re.compile(pattern, re.IGNORECASE | re.DOTALL)
            for pattern in self._gambling_contexts
        ]
        self._update_platform_pattern()
        self._number_pattern = re.compile(r'(88|777|77|99|7+)')

    @staticmethod
    def _char_class(ch: str) -> str:
        """Return a regex character class matching *ch* in either case.

        Both forms are escaped so characters such as ``^``, ``]`` or ``\\``
        cannot change the meaning of the class.
        """
        return f'[{re.escape(ch.upper())}{re.escape(ch.lower())}]'

    def _update_platform_pattern(self):
        """Recompile the platform name regex based on current _platform_names.

        Every character of a name may be followed by up to three
        non-alphanumeric characters (the last one excepted), so "p.u.l.a.u"
        matches "pulau". Longer names are tried first so that the regex
        reports "aero88" rather than stopping at "aero".
        """
        gap = '[^a-zA-Z0-9]{0,3}'
        alternatives = [
            gap.join(self._char_class(ch) for ch in platform)
            for platform in sorted(self._platform_names, key=lambda p: (-len(p), p))
            if platform  # an empty name would produce an empty, match-everything branch
        ]
        # '(?!)' never matches; it keeps the pattern valid when no names are configured.
        self._platform_pattern = re.compile('|'.join(alternatives) or r'(?!)', re.DOTALL)

    def add_rule(self, rule_type: str, rule_value: str):
        """
        Add a new rule based on the rule type.

        Allowed types: 'platform', 'gambling_term', 'safe_indicator',
        'gambling_context', 'ambiguous_term'.

        Plain-text rules are stored lowercased because comments are
        lowercased before matching. A 'gambling_context' value is a regular
        expression and is compiled before anything is stored, so a bad
        pattern leaves the filter unchanged.

        Raises:
            ValueError: unknown rule type, empty rule value, or a
                'gambling_context' value that is not a valid regex.
        """
        rule_type = rule_type.lower()
        if not rule_value or not rule_value.strip():
            # An empty term is a substring of every comment and would flag everything.
            raise ValueError("Rule value must not be empty")

        if rule_type == 'gambling_context':
            try:
                compiled = re.compile(rule_value, re.IGNORECASE | re.DOTALL)
            except re.error as exc:
                raise ValueError(f"Invalid regular expression: {exc}") from exc
            self._gambling_contexts.append(rule_value)
            self._compiled_gambling_contexts.append(compiled)
            return

        plain_value = rule_value.lower()
        if rule_type == 'platform':
            self._platform_names.add(plain_value)
            self._update_platform_pattern()
        elif rule_type == 'gambling_term':
            self._gambling_terms.add(plain_value)
        elif rule_type == 'safe_indicator':
            self._safe_indicators.add(plain_value)
        elif rule_type == 'ambiguous_term':
            self._ambiguous_terms.add(plain_value)
        else:
            raise ValueError("Unsupported rule type")

    def _strip_all_formatting(self, text: str) -> str:
        """Return *text* lowercased with everything but alphanumerics and whitespace removed."""
        return ''.join(c.lower() for c in text if c.isalnum() or c.isspace())

    def _aggressive_normalize_text(self, text: str) -> str:
        """Return the NFKD form of *text* restricted to ASCII code points, lowercased."""
        normalized = unicodedata.normalize('NFKD', text)
        ascii_text = ''.join(c for c in normalized if ord(c) < 128)
        return ascii_text.lower()

    def _robust_normalize(self, text: str) -> str:
        """
        1) Replace visually-similar letters (Cyrillic/Greek) with Latin equivalents.
        2) Then use unidecode to handle bold/italic forms, fullwidth, etc.
        3) Lowercase the result.
        """
        # Step 1: custom pass for visual lookalikes
        mapped_text = ''.join(VISUAL_MAP.get(ch, ch) for ch in text)
        # Step 2: apply normal Unicode decomposition + unidecode
        decomposed = unicodedata.normalize('NFKD', mapped_text)
        ascii_equiv = unidecode.unidecode(decomposed)
        # Step 3: lowercase the result
        return ascii_equiv.lower()

    def _find_obfuscated_platforms(self, text: str) -> List[str]:
        """Return every substring of *text* matched by the platform regex, as written."""
        # finditer + group(0) yields whole matches regardless of capture groups.
        return [m.group(0) for m in self._platform_pattern.finditer(text)]

    def _extract_platform_names(self, text: str) -> List[str]:
        """Return platform evidence found in *text*.

        The result holds the raw regex matches first, followed by any
        configured platform name (and the markers '88' / '777') that occurs
        in a normalized form of the text and is not already covered by an
        earlier entry.
        """
        matches = self._find_obfuscated_platforms(text)
        # Each entry is compared both as written and reduced to its alphanumerics,
        # so the raw match "P.u.l.a.u" covers the platform name "pulau".
        covered = []
        for m in matches:
            covered.append(m.lower())
            covered.append(''.join(c.lower() for c in m if c.isalnum()))

        def is_covered(token: str) -> bool:
            return any(token in seen for seen in covered)

        normalized = self._robust_normalize(text)
        stripped = self._strip_all_formatting(text)
        for platform in sorted(self._platform_names):
            if (platform in normalized or platform in stripped) and not is_covered(platform):
                matches.append(platform)
                covered.append(platform)
        for marker in ('88', '777'):
            if (marker in text or marker in normalized) and not is_covered(marker):
                matches.append(marker)
                covered.append(marker)
        return matches

    def normalize_text(self, text: str) -> str:
        """Return the NFKD form of *text*, keeping ASCII and whitespace characters, lowercased."""
        normalized = unicodedata.normalize('NFKD', text)
        normalized = ''.join(c for c in normalized if ord(c) < 128 or c.isspace())
        return normalized.lower()

    def is_gambling_comment(self, text: str, threshold: float = 0.55) -> Tuple[bool, Dict]:
        """Classify one comment.

        :param text: raw comment text
        :param threshold: minimum confidence score for a positive verdict
        :return: ``(is_gambling, metrics)`` where *metrics* lists the matched
            platforms, terms, context snippets and safe indicators, whether
            a number pattern was seen, the confidence score (0.0-1.0) and
            the processing time in milliseconds.

        Any safe indicator short-circuits to ``(False, metrics)`` with a
        score of 0.0.
        """
        started = time.perf_counter()
        logger.info(f"Analyzing comment for gambling content: {text[:100]}...")
        metrics = {
            'platform_matches': [],
            'gambling_term_matches': [],
            'context_matches': [],
            'safe_indicators': [],
            'has_numbers': False,
            'confidence_score': 0.0,
            'processing_time_ms': 0
        }
        # All three forms are already lowercased by their helpers.
        normalized_text = self.normalize_text(text)
        stripped_text = self._strip_all_formatting(text)
        aggressive_text = self._robust_normalize(text)

        metrics['safe_indicators'] = [
            indicator for indicator in self._safe_indicators if indicator in normalized_text
        ]
        if metrics['safe_indicators']:
            metrics['confidence_score'] = 0.0
            metrics['processing_time_ms'] = (time.perf_counter() - started) * 1000
            return False, metrics

        metrics['platform_matches'] = self._extract_platform_names(text)
        metrics['gambling_term_matches'] = [
            term for term in self._gambling_terms
            if term in normalized_text or term in stripped_text or term in aggressive_text
        ]
        metrics['has_numbers'] = self._number_pattern.search(normalized_text) is not None

        for pattern in self._compiled_gambling_contexts:
            match = pattern.search(normalized_text)
            if match:
                metrics['context_matches'].append(match.group(0))
            match = pattern.search(aggressive_text)
            if match and match.group(0) not in metrics['context_matches']:
                metrics['context_matches'].append(match.group(0))

        platform_score = min(len(metrics['platform_matches']) * 1.0, 1)
        term_score = min(len(metrics['gambling_term_matches']) * 0.2, 0.4)
        context_score = min(len(metrics['context_matches']) * 0.2, 0.4)
        number_score = 0.1 if metrics['has_numbers'] else 0

        if platform_score > 0 and (term_score > 0 or context_score > 0):
            # A platform name plus any supporting signal: add everything up.
            total_score = platform_score + term_score + context_score + number_score
        elif context_score > 0.2 and term_score > 0:
            # No platform, but several context patterns backed by a term.
            total_score = context_score + term_score + number_score
        else:
            # A single kind of evidence is discounted.
            total_score = max(platform_score, term_score, context_score) * 0.8
        metrics['confidence_score'] = min(total_score, 1.0)

        def mentions(word: str) -> bool:
            return word in normalized_text or word in aggressive_text

        # Religious-sounding phrases combined with a platform name get a score floor of 0.7.
        special_case = None
        if metrics['platform_matches']:
            if mentions("berkah") and mentions("rezeki"):
                special_case = "Special case: berkah+rezeki+platform"
            elif mentions("puji") and mentions("syukur"):
                special_case = "Special case: puji+syukur+platform"
        if special_case is not None:
            metrics['confidence_score'] = max(metrics['confidence_score'], 0.7)
            if special_case not in metrics['context_matches']:
                metrics['context_matches'].append(special_case)

        metrics['processing_time_ms'] = (time.perf_counter() - started) * 1000
        is_gambling = metrics['confidence_score'] >= threshold
        return is_gambling, metrics

    def filter_comments(self, comments: List[str], threshold: float = 0.55) -> Dict[str, List]:
        """Classify each comment and split them into gambling and safe lists.

        :return: dict with 'gambling_comments', 'safe_comments' and 'metrics'
            (one metrics dict per comment, each with 'original_text' added).
        """
        result = {
            'gambling_comments': [],
            'safe_comments': [],
            'metrics': []
        }
        for comment in comments:
            is_gambling, metrics = self.is_gambling_comment(comment, threshold)
            bucket = 'gambling_comments' if is_gambling else 'safe_comments'
            result[bucket].append(comment)
            metrics['original_text'] = comment
            result['metrics'].append(metrics)
        return result
class YouTubeCommentModerator:
    """Fetches YouTube comments, classifies them with a GamblingFilter and
    applies moderation actions through the YouTube Data API client stored in
    ``youtube_service``.
    """

    def __init__(self,
                 client_secrets_path: str = "./app/client_secret.json",
                 gambling_filter: Optional["GamblingFilter"] = None):
        """
        Initialize the YouTube Comment Moderator with configurable settings.
        :param client_secrets_path: Path to OAuth 2.0 client secrets file
        :param gambling_filter: Optional pre-configured GamblingFilter instance;
            a fresh one is created when omitted
        """
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)
        # OAuth configuration
        self.client_secrets_path = client_secrets_path
        self.scopes = [
            "https://www.googleapis.com/auth/youtube.readonly",
            "https://www.googleapis.com/auth/youtube.force-ssl"
        ]
        # Disable OAuthlib's HTTPS verification when running locally.
        # NOTE(review): this is set process-wide on every construction, including
        # non-local deployments - confirm that is intended.
        os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
        # YouTube service; stays None until authenticate() succeeds or a caller assigns one
        self.youtube_service = None
        # Gambling Filter
        self.gambling_filter = gambling_filter or GamblingFilter()

    def authenticate(self) -> bool:
        """
        Authenticate with YouTube Data API.
        :return: Boolean indicating successful authentication
        """
        try:
            credentials = get_google_credentials()
            self.youtube_service = googleapiclient.discovery.build(
                "youtube", "v3", credentials=credentials
            )
            self.logger.info("YouTube API authentication successful.")
            return True
        except Exception as e:
            self.logger.error(f"Authentication failed: {e}")
            return False

    def moderate_video_comments(self, video_id: str, threshold: float = 0.55) -> Dict:
        """
        Classify every top-level comment of a video, following pagination.
        :param video_id: YouTube video ID
        :param threshold: Confidence threshold passed to the gambling filter
        :return: Dict with 'total_comments', 'gambling_comments',
            'safe_comments' and 'moderation_metrics', or ``{"error": ...}``
            when not authenticated or when the API call fails
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return {"error": "Not authenticated"}
        try:
            threads = self.youtube_service.commentThreads()
            request = threads.list(
                part="snippet",
                videoId=video_id,
                maxResults=100,
                textFormat="plainText"
            )
            moderation_results = {
                "total_comments": 0,
                "gambling_comments": [],
                "safe_comments": [],
                "moderation_metrics": []
            }
            # list_next() returns None once the last page has been consumed.
            while request is not None:
                response = request.execute()
                for item in response.get("items", []):
                    top_level = item["snippet"]["topLevelComment"]
                    comment_id = top_level["id"]
                    comment_snippet = top_level["snippet"]
                    comment_text = comment_snippet["textDisplay"]
                    if manual_overrides.get((video_id, comment_id)) == "safe":
                        # The user previously pressed "Keep" - skip the gambling filter
                        is_gambling = False
                        metrics = {"confidence_score": 0.0}
                    else:
                        is_gambling, metrics = self.gambling_filter.is_gambling_comment(
                            comment_text, threshold
                        )
                    comment_info = {
                        "id": comment_id,
                        "text": comment_text,
                        "author": comment_snippet["authorDisplayName"],
                        "is_gambling": is_gambling,
                        "metrics": metrics
                    }
                    moderation_results["total_comments"] += 1
                    bucket = "gambling_comments" if is_gambling else "safe_comments"
                    moderation_results[bucket].append(comment_info)
                    metrics["original_text"] = comment_text
                    moderation_results["moderation_metrics"].append(metrics)
                request = threads.list_next(request, response)
            return moderation_results
        except Exception as e:
            self.logger.error(f"Error moderating comments: {e}")
            return {"error": str(e)}

    def delete_comment(self, comment_id: str) -> bool:
        """
        Reject a specific comment by setting its moderation status to "rejected".
        :param comment_id: YouTube comment ID
        :return: Boolean indicating whether the API call succeeded
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return False
        try:
            self.youtube_service.comments().setModerationStatus(
                id=comment_id,
                moderationStatus="rejected"
            ).execute()
            self.logger.info(f"Comment {comment_id} deleted successfully.")
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete comment {comment_id}: {e}")
            return False

    def get_channel_videos(self, max_results: int = 50) -> List[Dict]:
        """
        Retrieve videos from authenticated user's channel.
        :param max_results: Maximum number of videos to retrieve
        :return: List of dicts with 'id', 'title' and 'thumbnail'; empty when
            not authenticated, when the channel cannot be determined, or on
            an API error
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return []
        channel_id = self._get_channel_id()
        if channel_id is None:
            # _get_channel_id() already logged the cause; do not send
            # channelId=None to the search endpoint.
            return []
        try:
            response = self.youtube_service.search().list(
                part="snippet",
                channelId=channel_id,
                maxResults=max_results,
                type="video"
            ).execute()
            return [
                {
                    "id": item["id"]["videoId"],
                    "title": item["snippet"]["title"],
                    "thumbnail": item["snippet"]["thumbnails"]["default"]["url"]
                }
                for item in response.get("items", [])
            ]
        except Exception as e:
            self.logger.error(f"Error retrieving videos: {e}")
            return []

    def _get_channel_id(self) -> Optional[str]:
        """
        Retrieve the authenticated user's channel ID.
        :return: Channel ID, or None when the lookup fails or returns no channel
        """
        try:
            response = self.youtube_service.channels().list(part="id", mine=True).execute()
            return response["items"][0]["id"]
        except Exception as e:
            self.logger.error(f"Error retrieving channel ID: {e}")
            return None
class User(BaseModel):
    """Account record kept in UserDatabase for one signed-in user."""

    # Key the record is stored under in UserDatabase.users
    username: str
    # Optional contact address
    email: Optional[str] = None
    # Credential data as a plain dict; routes pass it to
    # Credentials.from_authorized_user_info()
    youtube_credentials: Optional[Dict] = None
class UserDatabase:
    """
    Process-wide, in-memory user store shared through the class attribute
    ``users``. Intended as a stand-in; a production app would use a real
    database (e.g. via SQLAlchemy).
    """

    users = {}

    @classmethod
    def create_user(cls, username: str, credentials: Dict):
        """Create a User for *username*, replacing any existing entry, and return it."""
        record = User(username=username, youtube_credentials=credentials)
        cls.users[username] = record
        return record

    @classmethod
    def get_user(cls, username: str):
        """Return the stored User for *username*, or None when there is none."""
        if username in cls.users:
            return cls.users[username]
        return None
class YouTubeAuthenticator:
    """Thin wrapper that turns credential failures into HTTP 400 errors."""

    @staticmethod
    def authenticate_with_client_secrets(client_secrets_file=None):
        """Return credentials from get_google_credentials().

        *client_secrets_file* is accepted but not used.

        Raises:
            HTTPException: status 400 when obtaining credentials fails.
        """
        try:
            obtained = get_google_credentials()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}")
        else:
            return obtained
# --- FastAPI application setup ---
app = FastAPI()
# Serve the contents of the local "static" directory under the /static URL prefix
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from the local "templates" directory
templates = Jinja2Templates(directory="templates")
# Single GamblingFilter instance shared by the module-level moderator and by the
# classify / add_rule / upload routes, so rules added at runtime apply to all of them
filter_instance = GamblingFilter()
# ----Google ----
class GoogleOAuthHandler:
    """Helper for the web (redirect-based) Google OAuth 2.0 flow."""

    def __init__(self):
        # Configuration paths and settings
        self.client_secrets_file = "./app/client_secret.json"
        self.scopes = [
            'https://www.googleapis.com/auth/youtube.readonly',
            'https://www.googleapis.com/auth/userinfo.profile'
        ]
        # Falls back to the local callback URL when the env var is not set
        self.redirect_uri = os.getenv('YOUTUBE_REDIRECT_URI', 'http://localhost:8000/oauth/callback')

    def create_oauth_flow(self):
        """
        Build a fresh OAuth 2.0 Flow from the client secrets file, scopes
        and redirect URI configured on this handler.
        """
        return Flow.from_client_secrets_file(
            self.client_secrets_file,
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

    def initiate_oauth_flow(self):
        """
        Return the authorization URL that starts the OAuth flow.
        The state value produced alongside the URL is not returned.
        """
        auth_url, _state = self.create_oauth_flow().authorization_url(
            access_type='offline',          # Ensures we get a refresh token
            prompt='consent',               # Forces user to see and accept consent screen
            include_granted_scopes='true'
        )
        return auth_url

    def handle_oauth_callback(self, authorization_code):
        """
        Exchange *authorization_code* for tokens and look up who signed in.

        :return: dict with 'username' (channel title, else the account
            email, else 'unknown_user'), 'credentials' (plain dict of token
            fields) and 'user_info' (userinfo API response)
        :raises HTTPException: status 400 when any step fails
        """
        try:
            oauth_flow = self.create_oauth_flow()
            oauth_flow.fetch_token(code=authorization_code)
            credentials = oauth_flow.credentials

            # Who is this? Ask the userinfo endpoint first ...
            user_info = build('oauth2', 'v2', credentials=credentials).userinfo().get().execute()

            # ... then the YouTube channel owned by the account.
            youtube = build('youtube', 'v3', credentials=credentials)
            channel_resp = youtube.channels().list(part="snippet", mine=True).execute()

            has_channel = "items" in channel_resp and len(channel_resp["items"]) > 0
            if has_channel:
                display_name = channel_resp['items'][0]['snippet']['title']
            else:
                display_name = user_info.get('email', 'unknown_user')

            # Plain-dict form of the credentials for storage
            stored_fields = ('token', 'refresh_token', 'token_uri', 'client_id', 'client_secret', 'scopes')
            credentials_dict = {field: getattr(credentials, field) for field in stored_fields}

            return {
                'username': display_name,
                'credentials': credentials_dict,
                'user_info': user_info
            }
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"OAuth callback failed: {str(e)}")
# Module-level moderator used by POST /moderate_video. Its youtube_service stays
# None (and that route reports "not authenticated") until authenticate() is
# called on it or a service is assigned.
moderator = YouTubeCommentModerator(gambling_filter=filter_instance)
@app.post("/moderate_video")
async def moderate_video(request: Request, video_id: str = Form(...), threshold: float = Form(0.55)):
    """Moderate a video's comments with the module-level moderator and render index.html.

    When that moderator has no YouTube service, the rendered result is an
    error dict instead of moderation output.
    """
    if moderator.youtube_service:
        result = moderator.moderate_video_comments(video_id, threshold)
    else:
        result = {"error": "YouTube service not authenticated. Please authenticate first."}

    rule_sets = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    context = {
        "request": request,
        "result": result,
        "video_id": video_id,
        "rules": {kind: sorted(values) for kind, values in rule_sets.items()},
    }
    return templates.TemplateResponse("index.html", context)
@app.delete("/api/comments/{comment_id}")
async def api_delete_comment(request: Request, comment_id: str, video_id: str):
    """Reject a comment on behalf of the user identified by the "token" cookie.

    *video_id* is a required parameter but is not used by this handler.
    Returns ``{"success": <bool>}`` as reported by the moderator.
    """
    account = get_current_user_from_cookie(request)
    creds = Credentials.from_authorized_user_info(account.youtube_credentials)

    # A per-request moderator bound to the caller's own credentials.
    user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    user_moderator.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=creds
    )
    return {"success": user_moderator.delete_comment(comment_id)}
# Bearer-token dependency used by get_current_user(); "token" is the relative
# token URL passed to OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Hardcoded path to the OAuth client secrets file; replace with the actual path
# for your deployment
CLIENT_SECRETS_PATH = "./app/client_secret.json"
# 1) Root route => Decide if user is logged in; if not, go to /login
@app.get("/", response_class=HTMLResponse)
async def root_redirect(request: Request):
    """Send visitors with a "token" cookie to /videos and everyone else to /login."""
    destination = "/videos" if request.cookies.get("token") else "/login"
    return RedirectResponse(url=destination, status_code=303)
# 2) Show the login form (GET /login)
@app.get("/login", response_class=HTMLResponse)
async def login_form(request: Request):
    """Render the login page."""
    page_context = {"request": request}
    return templates.TemplateResponse("login.html", page_context)
# 3) Handle login submission (POST /login) => Google OAuth => /videos
@app.post("/login", response_class=HTMLResponse)
async def login(
    request: Request,
    username: str = Form(None),  # optional; not used by the handler
):
    """Obtain Google credentials, register the user and redirect to /videos.

    On success a "token" cookie holding the channel username is set for
    1800 seconds. Any failure re-renders login.html with an error message.
    """
    try:
        # Service-account credentials on Hugging Face, OAuth credentials locally.
        credentials = get_google_credentials()
        youtube_service = googleapiclient.discovery.build(
            "youtube", "v3", credentials=credentials
        )
        on_hf_space = os.getenv("HF_SPACE") == "true"

        if on_hf_space:
            # Hugging Face Space: use a fixed default username.
            channel_username = "hf_space_user"
        else:
            resp = youtube_service.channels().list(part="snippet", mine=True).execute()
            if "items" in resp and len(resp["items"]) > 0:
                channel_username = resp['items'][0]['snippet']['title']
            else:
                channel_username = "unknown_user"

        # Credentials as a plain dict for storage.
        if hasattr(credentials, "to_json"):
            credentials_dict = json.loads(credentials.to_json())
        elif hasattr(credentials, "_sa_info"):
            credentials_dict = credentials._sa_info
        else:
            credentials_dict = {}

        # Create or update the user in the in-memory "database".
        UserDatabase.create_user(channel_username, credentials_dict)

        # Cookie attributes depend on the environment.
        cookie_flags = (
            {"secure": True, "samesite": "none"}
            if on_hf_space
            else {"secure": False, "samesite": "lax"}
        )
        response = RedirectResponse(url="/videos", status_code=303)
        response.set_cookie(
            key="token",
            value=channel_username,
            max_age=1800,
            httponly=True,
            **cookie_flags,
        )
        return response
    except Exception as e:
        return templates.TemplateResponse("login.html", {
            "request": request,
            "error": f"Authentication failed: {str(e)}"
        })
@app.post("/api/comments/keep/{comment_id}")
async def api_keep_comment(request: Request, comment_id: str, video_id: str):
    """Approve a comment on YouTube and remember the decision locally.

    The comment's moderation status is set to "published" with the caller's
    credentials, then a manual override is recorded so the filter will not
    flag it again. Every failure is reported in the JSON body as
    ``{"success": False, "error": ...}``.
    """
    try:
        logging.debug(f"Received keep request for comment_id: {comment_id}, video_id: {video_id}")
        account = get_current_user_from_cookie(request)
        logging.debug(f"Current user: {account.username}")
        creds = Credentials.from_authorized_user_info(account.youtube_credentials)

        # Moderator bound to the caller's own credentials.
        user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
        user_moderator.youtube_service = googleapiclient.discovery.build(
            "youtube", "v3", credentials=creds
        )

        logging.debug("Setting moderation status to 'published' on YouTube...")
        status_call = user_moderator.youtube_service.comments().setModerationStatus(
            id=comment_id,
            moderationStatus="published",  # marks the comment as approved
        )
        api_reply = status_call.execute()
        logging.debug(f"YouTube API response: {api_reply}")

        # Remember the decision so the comment is not reflagged.
        keep_comment(comment_id, video_id)
        logging.debug("Manual override saved for comment.")
    except Exception as e:
        logging.error(f"Error keeping comment: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
    else:
        return {"success": True, "message": "Comment kept successfully"}
@app.get("/refresh_comments/{video_id}")
async def refresh_video_comments(
    request: Request,
    video_id: str,
    threshold: float = 0.55
):
    """
    Refresh comments for a specific video, reapplying moderation.
    :param request: Request object
    :param video_id: ID of the video to refresh comments for
    :param threshold: Gambling confidence threshold
    :return: Rendered video_comments.html with updated comments, or
        error.html when moderation or the video lookup fails
    """
    # Get current user's credentials (raises HTTPException 401 when missing)
    current_user = get_current_user_from_cookie(request)
    if not current_user:
        return RedirectResponse(url="/login", status_code=303)
    try:
        user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
        # One API client serves both the moderator and the video lookup.
        youtube_service = googleapiclient.discovery.build(
            "youtube", "v3",
            credentials=user_creds
        )
        user_moderator = YouTubeCommentModerator(gambling_filter=filter_instance)
        user_moderator.youtube_service = youtube_service

        result = user_moderator.moderate_video_comments(video_id, threshold)
        if "error" in result:
            # moderate_video_comments() reports failures in-band; surface them
            # instead of rendering an empty comment list.
            raise RuntimeError(result["error"])

        # Fetch video details to pass to template
        video_response = youtube_service.videos().list(
            part="snippet",
            id=video_id
        ).execute()
        items = video_response.get('items') or []
        video_info = items[0]['snippet'] if items else {}
        return templates.TemplateResponse("video_comments.html", {
            "request": request,
            "video": {
                "id": video_id,
                "title": video_info.get('title', 'Unknown Video')
            },
            "safe_comments": result.get('safe_comments', []),
            "flagged_comments": result.get('gambling_comments', []),
            "total_comments": result.get('total_comments', 0)
        })
    except Exception as e:
        logging.error(f"Error refreshing comments: {e}")
        return templates.TemplateResponse("error.html", {
            "request": request,
            "error": f"Failed to refresh comments: {str(e)}"
        })
# 4) Protected route to fetch current user from cookie
def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve a bearer token to a stored user.

    The token is used directly as the username; a real app would decode
    and validate it.

    Raises:
        HTTPException: status 401 when no such user is stored.
    """
    found = UserDatabase.get_user(token)
    if found:
        return found
    raise HTTPException(status_code=401, detail="Invalid authentication credentials")
def get_current_user_from_cookie(request: Request):
    """Resolve the request's "token" cookie to a stored user.

    Raises:
        HTTPException: status 401 when the cookie is missing or empty, or
            when it does not name a stored user.
    """
    cookie_value = request.cookies.get("token")
    if cookie_value:
        found = UserDatabase.get_user(cookie_value)
        if found:
            return found
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    raise HTTPException(status_code=401, detail="Not authenticated")
@app.exception_handler(HTTPException)
async def auth_exception_handler(request: Request, exc: HTTPException):
    """Redirect 401 errors to the login page; answer other HTTP errors as JSON."""
    if exc.status_code != 401:
        return JSONResponse(
            status_code=exc.status_code,
            content={"detail": exc.detail},
        )
    return RedirectResponse(url="/login")
# 5) List user's videos (GET /videos) - requires login
@app.get("/videos", response_class=HTMLResponse)
async def list_videos(request: Request, current_user: User = Depends(get_current_user_from_cookie)):
    """Render videos.html with the signed-in user's channel videos."""
    # Rebuild a Credentials object from the stored dictionary.
    creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
    user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    user_moderator.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=creds
    )
    page_context = {
        "request": request,
        "current_user": current_user,
        "videos": user_moderator.get_channel_videos(),
    }
    return templates.TemplateResponse("videos.html", page_context)
# 6) Moderate a specific video's comments (GET /video/{video_id}) - requires login
@app.get("/video/{video_id}", response_class=HTMLResponse)
async def moderate_video_comments(
    request: Request,
    video_id: str,
    current_user: User = Depends(get_current_user_from_cookie)
):
    """Moderate one video's comments for the signed-in user and render video_comments.html.

    The default filter threshold is used, and the video title passed to the
    template is a fixed placeholder.
    """
    # Rebuild a Credentials object from the stored dictionary.
    creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
    user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter())
    user_moderator.youtube_service = googleapiclient.discovery.build(
        "youtube", "v3", credentials=creds
    )
    outcome = user_moderator.moderate_video_comments(video_id)

    page_context = {
        "request": request,
        "current_user": current_user,
        "video": {"id": video_id, "title": "Sample Video Title"},  # Optionally fetch actual title
        "safe_comments": outcome.get('safe_comments', []),
        "flagged_comments": outcome.get('gambling_comments', []),
    }
    return templates.TemplateResponse("video_comments.html", page_context)
# 7) Logout => remove token
@app.get("/logout")
async def logout():
    """Clear the "token" cookie and redirect to the login page."""
    to_login = RedirectResponse(url="/login")
    to_login.delete_cookie("token")
    return to_login
from jinja2 import Undefined
import json
def pretty_json(value):
    """Jinja filter: render *value* as indented JSON, keeping non-ASCII text readable.

    An undefined template variable renders as an empty string.
    """
    if not isinstance(value, Undefined):
        return json.dumps(value, ensure_ascii=False, indent=2)
    return ""


# Make the filter available to templates as "pretty_json".
templates.env.filters["pretty_json"] = pretty_json
@app.get("/classify", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the empty classification page together with the current rule lists."""
    rule_sets = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    page_context = {
        "request": request,
        "result": None,
        "comment": "",
        "rules": {kind: sorted(values) for kind, values in rule_sets.items()},
    }
    return templates.TemplateResponse("index.html", page_context)
@app.post("/classify", response_class=HTMLResponse)
async def classify_comment(request: Request, comment: str = Form(...)):
    """Classify a single submitted comment and render the verdict on index.html."""
    verdict, details = filter_instance.is_gambling_comment(comment)

    rule_sets = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    page_context = {
        "request": request,
        "result": {"is_gambling": verdict, "metrics": details},
        "comment": comment,
        "rules": {kind: sorted(values) for kind, values in rule_sets.items()},
    }
    return templates.TemplateResponse("index.html", page_context)
@app.post("/add_rule", response_class=HTMLResponse)
async def add_rule(request: Request, rule_type: str = Form(...), rule_value: str = Form(...)):
    """Add a rule to the shared filter and render index.html with a status message.

    A rejected rule (unsupported type, or a 'gambling_context' value that is
    not a valid regular expression) is reported in the message instead of
    failing the request.
    """
    try:
        filter_instance.add_rule(rule_type, rule_value)
        message = f"Added rule '{rule_value}' as type '{rule_type}'."
    except (ValueError, re.error) as e:
        # re.error is not a ValueError; without it a bad regex becomes a 500.
        message = str(e)
    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": {"message": message},
        "comment": "",
        "rules": {
            "platform": sorted(filter_instance._platform_names),
            "gambling_term": sorted(filter_instance._gambling_terms),
            "safe_indicator": sorted(filter_instance._safe_indicators),
            "gambling_context": sorted(filter_instance._gambling_contexts),
            "ambiguous_term": sorted(filter_instance._ambiguous_terms)
        }
    })
@app.post("/upload", response_class=HTMLResponse)
async def upload_file(request: Request, file: UploadFile = File(...), column: str = Form("comment")):
    """Classify every comment in one column of an uploaded CSV/Excel file.

    :param file: uploaded .csv, .xls or .xlsx file (extension matched
        case-insensitively)
    :param column: name of the column holding the comment text
    :return: index.html rendered with either the filter results under
        'upload_result' or a 'message' describing why the file was rejected
    """
    def render(result):
        # Every outcome renders the same page; only "result" differs.
        return templates.TemplateResponse("index.html", {
            "request": request,
            "result": result,
            "comment": "",
            "rules": {
                "platform": sorted(filter_instance._platform_names),
                "gambling_term": sorted(filter_instance._gambling_terms),
                "safe_indicator": sorted(filter_instance._safe_indicators),
                "gambling_context": sorted(filter_instance._gambling_contexts),
                "ambiguous_term": sorted(filter_instance._ambiguous_terms)
            }
        })

    content = await file.read()
    # The filename may be missing, and "DATA.CSV" should be accepted as well.
    filename = (file.filename or "").lower()
    try:
        if filename.endswith('.csv'):
            df = pd.read_csv(io.BytesIO(content))
        elif filename.endswith(('.xls', '.xlsx')):
            df = pd.read_excel(io.BytesIO(content))
        else:
            raise ValueError("Unsupported file type.")
    except Exception as e:
        return render({"message": f"Error reading file: {e}"})

    if column not in df.columns:
        return render({"message": f"Column '{column}' not found. Available columns: {list(df.columns)}"})

    # Drop empty cells first; otherwise each one is classified as the literal text "nan".
    comments = df[column].dropna().astype(str).tolist()
    results = filter_instance.filter_comments(comments)
    # Return the results as part of the template context.
    return render({"upload_result": results})
@app.post("/add_visual_char")
async def add_visual_char(request: Request,
                          char: str = Form(...),
                          ascii_equiv: str = Form(...)):
    """Add a look-alike character mapping to VISUAL_MAP and render index.html.

    The map is consulted one character at a time, so only single-character
    keys can ever match; anything else is rejected with a message and the
    map is left unchanged.
    """
    if len(char) == 1:
        # Add a new mapping
        VISUAL_MAP[char] = ascii_equiv
        message = f"Added visual map entry '{char}' -> '{ascii_equiv}'."
    else:
        message = f"Visual map keys must be exactly one character; got '{char}'."
    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": {"message": message},
        "comment": "",
        "rules": {
            "platform": sorted(filter_instance._platform_names),
            "gambling_term": sorted(filter_instance._gambling_terms),
            "safe_indicator": sorted(filter_instance._safe_indicators),
            "gambling_context": sorted(filter_instance._gambling_contexts),
            "ambiguous_term": sorted(filter_instance._ambiguous_terms)
        }
    })