#main.py import re import time import os import logging from typing import List, Dict, Optional, Set, Tuple import google_auth_oauthlib.flow import googleapiclient.discovery import googleapiclient.errors from google_auth_oauthlib.flow import Flow from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from fastapi import FastAPI, Request, Form, File, UploadFile, HTTPException, Depends from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from fastapi.security import OAuth2PasswordBearer from google.oauth2.credentials import Credentials from pydantic import BaseModel import unicodedata import unidecode import io import pandas as pd import json from dotenv import load_dotenv # For monitoring with Prometheus load_dotenv() # Configure logging at the top of the file logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) VISUAL_MAP = { 'А': 'A','В': 'B','С': 'C','Е': 'E','Н': 'H','К': 'K','М': 'M','О': 'O','Р': 'P','Т': 'T','Х': 'X', 'а': 'a','в': 'b','с': 'c','е': 'e','о': 'o','р': 'p','х': 'x','у': 'y', 'Я': 'R','я': 'r', 'ρ': 'p', 'Π': 'P', # etc... } # At the top of your main.py, after your imports: # In a real DB model, you'd do this in a table. # But for demonstration, let's store it in memory: manual_overrides = {} # This might be a class-level dict keyed by comment_id or (video_id, comment_id) from google.oauth2 import service_account def get_google_credentials(): if os.getenv("HF_SPACE") == "true": # In Hugging Face Spaces: load from secrets service_account_str = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON") if not service_account_str: raise RuntimeError("Missing GOOGLE_SERVICE_ACCOUNT_JSON in Hugging Face secret.") service_account_info = json.loads(service_account_str) credentials = service_account.Credentials.from_service_account_info(service_account_info) # Attach the service account info so we can retrieve it later credentials._sa_info = service_account_info return credentials else: # Local development: use OAuth flow flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file( "./app/client_secret.json", scopes=[ "https://www.googleapis.com/auth/youtube.readonly", "https://www.googleapis.com/auth/youtube.force-ssl" ], redirect_uri=os.getenv('YOUTUBE_REDIRECT_URI') ) return flow.run_local_server(port=0) def keep_comment(comment_id: str, video_id: str): # Mark this comment as manually kept manual_overrides[(video_id, comment_id)] = "safe" # --- GamblingFilter class (with rule updates) --- class GamblingFilter: """ A high-performance filter for detecting online gambling-related comments. Features include aggressive Unicode normalization, keyword matching, and pattern detection. """ def __init__(self): logger.info("Initializing GamblingFilter") self._platform_names: Set[str] = { 'agustoto', 'aero', 'aero88', 'dora', 'dora77', 'dewadora', 'pulau777', 'pulau', '777', 'jptogel', 'mandalika', 'cnd88', 'axl', 'berkah99', 'weton88', 'garuda', 'hoki' } self._gambling_terms: Set[str] = { 'jackpot', 'jp', 'wd', 'depo', 'cuan', 'gacor', 'gacir', 'jekpot', 'sultan', 'rezeki nomplok', 'rezeki', 'menang', 'nomplok', 'deposit', 'withdraw', 'maxwin', 'auto sultan', 'jepe', 'jepee', 'bikin nagih', 'berkah' } self._ambiguous_terms: Set[str] = { 'auto', 'main', 'bermain', 'hasil', 'dapat', 'dapet', 'berkat' } self._safe_indicators: Set[str] = { 'tidak mengandung', 'bukan perjudian', 'tanpa perjudian', 'dokumentasi', 'profesional', 'pembelajaran' } self._gambling_contexts: List[str] = [ r'(main|bermain|coba).{1,30}(dapat|dapet|pro|jadi|langsung|menang|jp|cuan)', r'(modal|depo).{1,30}(jadi|langsung|wd|cuan)', r'(jp|jackpot|jekpot).{1,30}(gede|besar|pecah)', r'(berkat|dari).{1,30}(rezeki|menang|cuan|sultan)', r'(gacor|gacir).{1,30}(terus|parah|tiap|hari)', r'(rezeki|cuan).{1,30}(nomplok|datang|mengalir|lancar)', r'(hari ini).{1,30}(menang|cuan|rezeki|berkat)', r'(malah|eh).{1,30}(jadi|dapat|dapet|rezeki)', r'(auto).{1,30}(sultan|cuan|rezeki|kaya)', r'(0\d:[0-5]\d).{1,30}(menang|rezeki|cuan|gacor)', r'(iseng|coba).{1,30}(malah|jadi|eh|pro)', r'(deposit|depo|wd).{1,30}(jadi|langsung)', r'(langsung|auto).{1,30}(jp|cuan|sultan|rezeki)', r'bikin\s+nagih', r'gak\s+ada\s+duanya', r'berkah.{0,20}rezeki', r'puji\s+syukur' ] self._compiled_gambling_contexts = [ re.compile(pattern, re.IGNORECASE | re.DOTALL) for pattern in self._gambling_contexts ] self._update_platform_pattern() self._number_pattern = re.compile(r'(88|777|77|99|7+)') def _update_platform_pattern(self): """Recompile the platform name regex based on current _platform_names.""" platform_patterns = [] for platform in self._platform_names: # chars = list(platform) # strict = ''.join(f'[{c.upper()}{c.lower()}][^a-zA-Z0-9]*' for c in chars[:-1]) + f'[{chars[-1].upper()}{chars[-1].lower()}]' # flexible = '.*?'.join(re.escape(c) for c in chars) # platform_patterns.append(f'({strict})') # platform_patterns.append(f'({flexible})') chars = list(platform) # e.g. ['p', 'u', 'l', 'a', 'u'] # Each letter can be followed by up to 3 non-alphanumeric chars: # (or fewer if you want to be more strict) segments = [ f'[{c.upper()}{c.lower()}][^a-zA-Z0-9]{{0,3}}' for c in chars[:-1] ] # Then the last char without trailing non-alphanumerics segments.append(f'[{chars[-1].upper()}{chars[-1].lower()}]') strict = ''.join(segments) platform_patterns.append(strict) self._platform_pattern = re.compile('|'.join(platform_patterns), re.DOTALL) def add_rule(self, rule_type: str, rule_value: str): """ Add a new rule based on the rule type. Allowed types: 'platform', 'gambling_term', 'safe_indicator', 'gambling_context', 'ambiguous_term' """ rule_type = rule_type.lower() if rule_type == 'platform': self._platform_names.add(rule_value) self._update_platform_pattern() elif rule_type == 'gambling_term': self._gambling_terms.add(rule_value) elif rule_type == 'safe_indicator': self._safe_indicators.add(rule_value) elif rule_type == 'gambling_context': self._gambling_contexts.append(rule_value) self._compiled_gambling_contexts.append(re.compile(rule_value, re.IGNORECASE | re.DOTALL)) elif rule_type == 'ambiguous_term': self._ambiguous_terms.add(rule_value) else: raise ValueError("Unsupported rule type") def _strip_all_formatting(self, text: str) -> str: result = [] for c in text: if c.isalnum() or c.isspace(): result.append(c.lower()) return ''.join(result) def _aggressive_normalize_text(self, text: str) -> str: normalized = unicodedata.normalize('NFKD', text) ascii_text = ''.join(c for c in normalized if ord(c) < 128) return ascii_text.lower() def _robust_normalize(self, text: str) -> str: """ 1) Replace visually-similar letters (Cyrillic/Greek) with Latin equivalents. 2) Then use unidecode to handle bold/italic forms, fullwidth, etc. 3) Lowercase the result. """ # Step 1: custom pass for visual lookalikes mapped_chars = [] for ch in text: if ch in VISUAL_MAP: mapped_chars.append(VISUAL_MAP[ch]) else: mapped_chars.append(ch) mapped_text = ''.join(mapped_chars) # Step 2: apply normal Unicode decomposition + unidecode # This handles bold/italic/mathematical letters, fullwidth forms, etc. decomposed = unicodedata.normalize('NFKD', mapped_text) ascii_equiv = unidecode.unidecode(decomposed) # Step 3: lowercase the result return ascii_equiv.lower() def _extract_platform_names(self, text: str) -> List[str]: matches = [] pattern_matches = self._platform_pattern.findall(text) if pattern_matches: pattern_matches = [m for sublist in pattern_matches for m in sublist if m] matches.extend(pattern_matches) normalized = self._robust_normalize(text) stripped = self._strip_all_formatting(text) for platform in self._platform_names: if platform in normalized or platform in stripped: if not any(platform in m.lower() for m in matches): matches.append(platform) if '88' in text or '88' in normalized: if not any('88' in m for m in matches): matches.append('88') if '777' in text or '777' in normalized: if not any('777' in m for m in matches): matches.append('777') return matches def normalize_text(self, text: str) -> str: normalized = unicodedata.normalize('NFKD', text) normalized = ''.join(c for c in normalized if ord(c) < 128 or c.isspace()) return normalized.lower() def is_gambling_comment(self, text: str, threshold: float = 0.55) -> Tuple[bool, Dict]: start_time = time.time() logger.info(f"Analyzing comment for gambling content: {text[:100]}...") metrics = { 'platform_matches': [], 'gambling_term_matches': [], 'context_matches': [], 'safe_indicators': [], 'has_numbers': False, 'confidence_score': 0.0, 'processing_time_ms': 0 } normalized_text = self.normalize_text(text) stripped_text = self._strip_all_formatting(text) aggressive_text = self._robust_normalize(text) for indicator in self._safe_indicators: if indicator in normalized_text.lower(): metrics['safe_indicators'].append(indicator) if len(metrics['safe_indicators']) > 0: metrics['confidence_score'] = 0.0 metrics['processing_time_ms'] = (time.time() - start_time) * 1000 return False, metrics platform_matches = self._extract_platform_names(text) if platform_matches: metrics['platform_matches'] = platform_matches for term in self._gambling_terms: if (term in normalized_text.lower() or term in stripped_text.lower() or term in aggressive_text.lower()): metrics['gambling_term_matches'].append(term) if self._number_pattern.search(normalized_text): metrics['has_numbers'] = True for pattern in self._compiled_gambling_contexts: match = pattern.search(normalized_text) if match: metrics['context_matches'].append(match.group(0)) match = pattern.search(aggressive_text) if match and match.group(0) not in metrics['context_matches']: metrics['context_matches'].append(match.group(0)) platform_score = min(len(metrics['platform_matches']) * 1.0, 1) term_score = min(len(metrics['gambling_term_matches']) * 0.2, 0.4) context_score = min(len(metrics['context_matches']) * 0.2, 0.4) number_score = 0.1 if metrics['has_numbers'] else 0 if platform_score > 0 and (term_score > 0 or context_score > 0): total_score = platform_score + term_score + context_score + number_score elif context_score > 0.2 and term_score > 0: total_score = context_score + term_score + number_score else: total_score = max(platform_score, term_score, context_score) * 0.8 metrics['confidence_score'] = min(total_score, 1.0) if ("berkah" in normalized_text.lower() or "berkah" in aggressive_text.lower()) and \ ("rezeki" in normalized_text.lower() or "rezeki" in aggressive_text.lower()) and \ len(metrics['platform_matches']) > 0: metrics['confidence_score'] = max(metrics['confidence_score'], 0.7) if "Special case: berkah+rezeki+platform" not in metrics['context_matches']: metrics['context_matches'].append("Special case: berkah+rezeki+platform") elif ("puji" in normalized_text.lower() or "puji" in aggressive_text.lower()) and \ ("syukur" in normalized_text.lower() or "syukur" in aggressive_text.lower()) and \ len(metrics['platform_matches']) > 0: metrics['confidence_score'] = max(metrics['confidence_score'], 0.7) if "Special case: puji+syukur+platform" not in metrics['context_matches']: metrics['context_matches'].append("Special case: puji+syukur+platform") metrics['processing_time_ms'] = (time.time() - start_time) * 1000 is_gambling = metrics['confidence_score'] >= threshold return is_gambling, metrics def filter_comments(self, comments: List[str], threshold: float = 0.55) -> Dict[str, List]: result = { 'gambling_comments': [], 'safe_comments': [], 'metrics': [] } for comment in comments: is_gambling, metrics = self.is_gambling_comment(comment, threshold) if is_gambling: result['gambling_comments'].append(comment) else: result['safe_comments'].append(comment) metrics['original_text'] = comment result['metrics'].append(metrics) return result class YouTubeCommentModerator: def __init__(self, client_secrets_path: str = "./app/client_secret.json", gambling_filter: Optional[GamblingFilter] = None): """ Initialize the YouTube Comment Moderator with configurable settings. :param client_secrets_path: Path to OAuth 2.0 client secrets file :param gambling_filter: Optional pre-configured GamblingFilter instance """ # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) self.logger = logging.getLogger(__name__) # OAuth configuration self.client_secrets_path = client_secrets_path self.scopes = [ "https://www.googleapis.com/auth/youtube.readonly", "https://www.googleapis.com/auth/youtube.force-ssl" ] # Disable OAuthlib's HTTPS verification when running locally os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" # YouTube service self.youtube_service = None # Gambling Filter self.gambling_filter = gambling_filter or GamblingFilter() def authenticate(self) -> bool: """ Authenticate with YouTube Data API. :return: Boolean indicating successful authentication """ try: # flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file( # self.client_secrets_path, self.scopes) credentials = get_google_credentials() self.youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=credentials ) self.logger.info("YouTube API authentication successful.") return True except Exception as e: self.logger.error(f"Authentication failed: {e}") return False def moderate_video_comments(self, video_id: str, threshold: float = 0.55) -> Dict: if not self.youtube_service: self.logger.error("YouTube service not authenticated.") return {"error": "Not authenticated"} try: comments = [] request = self.youtube_service.commentThreads().list( part="snippet", videoId=video_id, maxResults=100, textFormat="plainText" ) response = request.execute() moderation_results = { "total_comments": 0, "gambling_comments": [], "safe_comments": [], "moderation_metrics": [] } while request is not None: for item in response.get("items", []): comment_id = item["snippet"]["topLevelComment"]["id"] comment_snippet = item["snippet"]["topLevelComment"]["snippet"] comment_text = comment_snippet["textDisplay"] # Check for manual override first if manual_overrides.get((video_id, comment_id)) == "safe": # The user previously pressed "Keep" - skip the gambling filter is_gambling = False metrics = {"confidence_score": 0.0} else: # Normal path - filter it is_gambling, metrics = self.gambling_filter.is_gambling_comment(comment_text, threshold) comment_info = { "id": comment_id, "text": comment_text, "author": comment_snippet["authorDisplayName"], "is_gambling": is_gambling, "metrics": metrics } moderation_results["total_comments"] += 1 if is_gambling: moderation_results["gambling_comments"].append(comment_info) else: moderation_results["safe_comments"].append(comment_info) metrics["original_text"] = comment_text moderation_results["moderation_metrics"].append(metrics) # Handle pagination if available request = self.youtube_service.commentThreads().list_next(request, response) if request: response = request.execute() else: break return moderation_results except Exception as e: self.logger.error(f"Error moderating comments: {e}") return {"error": str(e)} def delete_comment(self, comment_id: str) -> bool: """ Delete a specific comment. :param comment_id: YouTube comment ID :return: Boolean indicating successful deletion """ try: # self.youtube_service.comments().delete(id=comment_id).execute() self.youtube_service.comments().setModerationStatus( id=comment_id, moderationStatus="rejected" ).execute() self.logger.info(f"Comment {comment_id} deleted successfully.") return True except Exception as e: self.logger.error(f"Failed to delete comment {comment_id}: {e}") return False def get_channel_videos(self, max_results: int = 50) -> List[Dict]: """ Retrieve videos from authenticated user's channel. :param max_results: Maximum number of videos to retrieve :return: List of video details """ if not self.youtube_service: self.logger.error("YouTube service not authenticated.") return [] try: request = self.youtube_service.search().list( part="snippet", channelId=self._get_channel_id(), maxResults=max_results, type="video" ) response = request.execute() videos = [] for item in response.get("items", []): video_info = { "id": item["id"]["videoId"], "title": item["snippet"]["title"], "thumbnail": item["snippet"]["thumbnails"]["default"]["url"] } videos.append(video_info) return videos except Exception as e: self.logger.error(f"Error retrieving videos: {e}") return [] def _get_channel_id(self) -> Optional[str]: """ Retrieve the authenticated user's channel ID. :return: Channel ID or None """ try: request = self.youtube_service.channels().list(part="id", mine=True) response = request.execute() return response["items"][0]["id"] except Exception as e: self.logger.error(f"Error retrieving channel ID: {e}") return None class User(BaseModel): username: str email: Optional[str] = None youtube_credentials: Optional[Dict] = None class UserDatabase: """ In-memory user database. In a production app, replace with a proper database like SQLAlchemy """ users = {} @classmethod def create_user(cls, username: str, credentials: Dict): user = User(username=username, youtube_credentials=credentials) cls.users[username] = user return user @classmethod def get_user(cls, username: str): return cls.users.get(username) class YouTubeAuthenticator: @staticmethod def authenticate_with_client_secrets(client_secrets_file=None): try: credentials = get_google_credentials() return credentials except Exception as e: raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}") # --- FastAPI application setup --- app = FastAPI() app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") # Create a single instance of the GamblingFilter filter_instance = GamblingFilter() # ----Google ---- class GoogleOAuthHandler: def __init__(self): # Configuration paths and settings self.client_secrets_file = "./app/client_secret.json" self.scopes = [ 'https://www.googleapis.com/auth/youtube.readonly', 'https://www.googleapis.com/auth/userinfo.profile' ] self.redirect_uri = os.getenv('YOUTUBE_REDIRECT_URI', 'http://localhost:8000/oauth/callback') def create_oauth_flow(self): """ Create OAuth 2.0 Flow for Google Authorization """ flow = Flow.from_client_secrets_file( self.client_secrets_file, scopes=self.scopes, redirect_uri=self.redirect_uri ) return flow def initiate_oauth_flow(self): """ Generate Authorization URL for OAuth Flow This method can be called when you want to start the OAuth authentication process. In your case, it would be triggered from the login route. """ flow = self.create_oauth_flow() # Generate authorization URL authorization_url, state = flow.authorization_url( access_type='offline', # Ensures we get a refresh token prompt='consent', # Forces user to see and accept consent screen include_granted_scopes='true' ) return authorization_url def handle_oauth_callback(self, authorization_code): """ Handle the OAuth callback and retrieve user credentials This method exchanges the authorization code for access and refresh tokens """ try: # Create flow and exchange authorization code for tokens flow = self.create_oauth_flow() flow.fetch_token(code=authorization_code) # Get credentials credentials = flow.credentials # Fetch user information oauth2_client = build('oauth2', 'v2', credentials=credentials) user_info = oauth2_client.userinfo().get().execute() # Build YouTube service to get channel details youtube_service = build('youtube', 'v3', credentials=credentials) channel_req = youtube_service.channels().list(part="snippet", mine=True) channel_resp = channel_req.execute() # Extract channel username or use email as fallback if "items" in channel_resp and len(channel_resp["items"]) > 0: channel_username = channel_resp['items'][0]['snippet']['title'] else: channel_username = user_info.get('email', 'unknown_user') # Convert credentials to dict for storage credentials_dict = { 'token': credentials.token, 'refresh_token': credentials.refresh_token, 'token_uri': credentials.token_uri, 'client_id': credentials.client_id, 'client_secret': credentials.client_secret, 'scopes': credentials.scopes } return { 'username': channel_username, 'credentials': credentials_dict, 'user_info': user_info } except Exception as e: raise HTTPException(status_code=400, detail=f"OAuth callback failed: {str(e)}") moderator = YouTubeCommentModerator(gambling_filter=filter_instance) @app.post("/moderate_video") async def moderate_video(request: Request, video_id: str = Form(...), threshold: float = Form(0.55)): if not moderator.youtube_service: result = {"error": "YouTube service not authenticated. Please authenticate first."} else: result = moderator.moderate_video_comments(video_id, threshold) return templates.TemplateResponse("index.html", { "request": request, "result": result, "video_id": video_id, "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } }) @app.delete("/api/comments/{comment_id}") async def api_delete_comment( request: Request, comment_id: str, video_id: str ): current_user = get_current_user_from_cookie(request) user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials) user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter()) user_moderator.youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=user_creds ) success = user_moderator.delete_comment(comment_id) return {"success": success} # OAuth2 Password Bearer for session management oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # Hardcoded client secrets path (you'll need to replace this with your actual path) CLIENT_SECRETS_PATH = "./app/client_secret.json" # 1) Root route => Decide if user is logged in; if not, go to /login @app.get("/", response_class=HTMLResponse) async def root_redirect(request: Request): token = request.cookies.get("token") if token: return RedirectResponse(url="/videos", status_code=303) else: return RedirectResponse(url="/login", status_code=303) # 2) Show the login form (GET /login) @app.get("/login", response_class=HTMLResponse) async def login_form(request: Request): return templates.TemplateResponse("login.html", {"request": request}) # 3) Handle login submission (POST /login) => Google OAuth => /videos @app.post("/login", response_class=HTMLResponse) async def login( request: Request, username: str = Form(None) # Make username optional ): try: # Get credentials (will return service account creds on HF, OAuth creds locally) credentials = get_google_credentials() youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=credentials ) # If running in Hugging Face Space, use a default username if os.getenv("HF_SPACE") == "true": channel_username = "hf_space_user" else: req = youtube_service.channels().list(part="snippet", mine=True) resp = req.execute() if "items" in resp and len(resp["items"]) > 0: channel_username = resp['items'][0]['snippet']['title'] else: channel_username = "unknown_user" # Convert credentials to dict for storage import json if hasattr(credentials, "to_json"): credentials_dict = json.loads(credentials.to_json()) elif hasattr(credentials, "_sa_info"): credentials_dict = credentials._sa_info else: credentials_dict = {} # Create or update user in our "database" user = UserDatabase.create_user(channel_username, credentials_dict) # Determine cookie settings based on environment if os.getenv("HF_SPACE") == "true": secure_cookie = True samesite_value = "none" else: secure_cookie = False samesite_value = "lax" # Set the user token in a cookie and redirect to /videos response = RedirectResponse(url="/videos", status_code=303) response.set_cookie( key="token", value=channel_username, max_age=1800, httponly=True, secure=secure_cookie, samesite=samesite_value ) return response except Exception as e: return templates.TemplateResponse("login.html", { "request": request, "error": f"Authentication failed: {str(e)}" }) @app.post("/api/comments/keep/{comment_id}") async def api_keep_comment( request: Request, comment_id: str, video_id: str ): try: logging.debug(f"Received keep request for comment_id: {comment_id}, video_id: {video_id}") # Get current user's credentials current_user = get_current_user_from_cookie(request) logging.debug(f"Current user: {current_user.username}") user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials) # Create a moderator instance with user credentials user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter()) user_moderator.youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=user_creds ) logging.debug("Setting moderation status to 'published' on YouTube...") # Mark comment as approved on YouTube result = user_moderator.youtube_service.comments().setModerationStatus( id=comment_id, moderationStatus="published" # This marks the comment as approved ).execute() logging.debug(f"YouTube API response: {result}") # Add the comment ID to the manual overrides so it won't be reflagged keep_comment(comment_id, video_id) # Ensure this function is defined and working logging.debug("Manual override saved for comment.") return {"success": True, "message": "Comment kept successfully"} except Exception as e: logging.error(f"Error keeping comment: {e}", exc_info=True) return {"success": False, "error": str(e)} @app.get("/refresh_comments/{video_id}") async def refresh_video_comments( request: Request, video_id: str, threshold: float = 0.55 ): """ Refresh comments for a specific video, reapplying moderation. :param request: Request object :param video_id: ID of the video to refresh comments for :param threshold: Gambling confidence threshold :return: Rendered template with updated comments """ # Get current user's credentials current_user = get_current_user_from_cookie(request) if not current_user: return RedirectResponse(url="/login", status_code=303) try: # Recreate moderator with current user's credentials user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials) moderator = YouTubeCommentModerator(gambling_filter=filter_instance) moderator.youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=user_creds ) # Moderate comments for the video result = moderator.moderate_video_comments(video_id, threshold) # Fetch video details to pass to template youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=user_creds ) video_request = youtube_service.videos().list( part="snippet", id=video_id ) video_response = video_request.execute() video_info = video_response['items'][0]['snippet'] if video_response['items'] else {} return templates.TemplateResponse("video_comments.html", { "request": request, "video": { "id": video_id, "title": video_info.get('title', 'Unknown Video') }, "safe_comments": result.get('safe_comments', []), "flagged_comments": result.get('gambling_comments', []), "total_comments": result.get('total_comments', 0) }) except Exception as e: logging.error(f"Error refreshing comments: {e}") return templates.TemplateResponse("error.html", { "request": request, "error": f"Failed to refresh comments: {str(e)}" }) # 4) Protected route to fetch current user from cookie def get_current_user(token: str = Depends(oauth2_scheme)): username = token # In a real app, decode/validate token properly user = UserDatabase.get_user(username) if not user: raise HTTPException(status_code=401, detail="Invalid authentication credentials") return user def get_current_user_from_cookie(request: Request): token = request.cookies.get("token") if not token: raise HTTPException(status_code=401, detail="Not authenticated") user = UserDatabase.get_user(token) if not user: raise HTTPException(status_code=401, detail="Invalid authentication credentials") return user @app.exception_handler(HTTPException) async def auth_exception_handler(request: Request, exc: HTTPException): if exc.status_code == 401: # Redirect the user to the login page return RedirectResponse(url="/login") # For other HTTP errors, return a JSON response return JSONResponse( status_code=exc.status_code, content={"detail": exc.detail}, ) # 5) List user's videos (GET /videos) - requires login @app.get("/videos", response_class=HTMLResponse) async def list_videos(request: Request, current_user: User = Depends(get_current_user_from_cookie)): # Reconstruct the credentials from the stored dictionary user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials) user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter()) user_moderator.youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=user_creds ) videos = user_moderator.get_channel_videos() return templates.TemplateResponse("videos.html", { "request": request, "current_user": current_user, "videos": videos }) # 6) Moderate a specific video's comments (GET /video/{video_id}) - requires login @app.get("/video/{video_id}", response_class=HTMLResponse) async def moderate_video_comments( request: Request, video_id: str, current_user: User = Depends(get_current_user_from_cookie) ): # Reconstruct the Credentials object from the stored dict user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials) user_moderator = YouTubeCommentModerator(gambling_filter=GamblingFilter()) user_moderator.youtube_service = googleapiclient.discovery.build( "youtube", "v3", credentials=user_creds ) moderation_results = user_moderator.moderate_video_comments(video_id) return templates.TemplateResponse("video_comments.html", { "request": request, "current_user": current_user, "video": {"id": video_id, "title": "Sample Video Title"}, # Optionally fetch actual title "safe_comments": moderation_results.get('safe_comments', []), "flagged_comments": moderation_results.get('gambling_comments', []) }) # 7) Logout => remove token @app.get("/logout") async def logout(): response = RedirectResponse(url="/login") response.delete_cookie("token") return response from jinja2 import Undefined import json def pretty_json(value): if isinstance(value, Undefined): return "" return json.dumps(value, ensure_ascii=False, indent=2) templates.env.filters["pretty_json"] = pretty_json @app.get("/classify", response_class=HTMLResponse) async def read_root(request: Request): return templates.TemplateResponse("index.html", { "request": request, "result": None, "comment": "", "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } }) @app.post("/classify", response_class=HTMLResponse) async def classify_comment(request: Request, comment: str = Form(...)): is_gambling, metrics = filter_instance.is_gambling_comment(comment) result = {"is_gambling": is_gambling, "metrics": metrics} return templates.TemplateResponse("index.html", { "request": request, "result": result, "comment": comment, "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } }) @app.post("/add_rule", response_class=HTMLResponse) async def add_rule(request: Request, rule_type: str = Form(...), rule_value: str = Form(...)): try: filter_instance.add_rule(rule_type, rule_value) message = f"Added rule '{rule_value}' as type '{rule_type}'." except ValueError as e: message = str(e) return templates.TemplateResponse("index.html", { "request": request, "result": {"message": message}, "comment": "", "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } }) @app.post("/upload", response_class=HTMLResponse) async def upload_file(request: Request, file: UploadFile = File(...), column: str = Form("comment")): content = await file.read() try: if file.filename.endswith('.csv'): df = pd.read_csv(io.BytesIO(content)) elif file.filename.endswith('.xls') or file.filename.endswith('.xlsx'): df = pd.read_excel(io.BytesIO(content)) else: raise ValueError("Unsupported file type.") except Exception as e: return templates.TemplateResponse("index.html", { "request": request, "result": {"message": f"Error reading file: {e}"}, "comment": "", "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } }) if column not in df.columns: return templates.TemplateResponse("index.html", { "request": request, "result": {"message": f"Column '{column}' not found. Available columns: {list(df.columns)}"}, "comment": "", "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } }) comments = df[column].astype(str).tolist() results = filter_instance.filter_comments(comments) # Return the results as part of the template context. return templates.TemplateResponse("index.html", { "request": request, "result": {"upload_result": results}, "comment": "", "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } }) @app.post("/add_visual_char") async def add_visual_char(request: Request, char: str = Form(...), ascii_equiv: str = Form(...)): # Add a new mapping VISUAL_MAP[char] = ascii_equiv message = f"Added visual map entry '{char}' -> '{ascii_equiv}'." return templates.TemplateResponse("index.html", { "request": request, "result": {"message": message}, "comment": "", "rules": { "platform": sorted(list(filter_instance._platform_names)), "gambling_term": sorted(list(filter_instance._gambling_terms)), "safe_indicator": sorted(list(filter_instance._safe_indicators)), "gambling_context": sorted(list(filter_instance._gambling_contexts)), "ambiguous_term": sorted(list(filter_instance._ambiguous_terms)) } })