#!/usr/bin/env python3
import os
import glob
import pandas as pd
import gradio as gr
import time
import pyarrow as pa
import pyarrow.parquet as pq
import json
from pathlib import Path

# Configuration
DATA_DIR = Path("../data/tiktok_profiles")
CACHE_FILE = Path("../data/tiktok_profiles_combined.parquet")
PROCESSED_FILES_LOG = Path("../data/processed_files.json")
COLUMNS = [
    "id",
    "unique_id",
    "follower_count",
    "nickname",
    "video_count",
    "following_count",
    "signature",
    "email",
    "bio_link",
    "updated_at",
    "tt_seller",
    "region",
    "language",
    "url",
]


def get_processed_files():
    """
    Get the list of already processed files from the log.
    Returns a set of filenames that have been processed.
    """
    if PROCESSED_FILES_LOG.exists():
        with open(PROCESSED_FILES_LOG, "r") as f:
            return set(json.load(f))
    return set()


def update_processed_files(processed_files):
    """
    Update the log of processed files.
    """
    PROCESSED_FILES_LOG.parent.mkdir(exist_ok=True)
    with open(PROCESSED_FILES_LOG, "w") as f:
        json.dump(list(processed_files), f)
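
# Illustrative sketch (not called anywhere in the app): the processed-files log
# is just a flat JSON list of CSV filenames, so a round trip through the two
# helpers above looks like the following. The filenames and the function name
# here are made up for the example; only the helpers themselves come from this
# script.
def _processed_files_log_example():
    update_processed_files({"batch_001.csv", "batch_002.csv"})
    seen = get_processed_files()  # -> {"batch_001.csv", "batch_002.csv"}
    assert "batch_001.csv" in seen
    return seen
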
def load_data(force_reload=False):
    """
    Load data from either the cache file or from individual CSV files.
    Only processes new files that haven't been processed before.
    Returns a pandas DataFrame with all the data.

    Args:
        force_reload: If True, reprocess all files regardless of whether
            they've been processed before.
    """
    start_time = time.time()

    # Shared dtype mapping for the profile CSV columns
    csv_dtypes = {
        "id": "str",
        "unique_id": "str",
        "follower_count": "Int64",
        "nickname": "str",
        "video_count": "Int64",
        "following_count": "Int64",
        "signature": "str",
        "email": "str",
        "bio_link": "str",
        "updated_at": "str",
        "tt_seller": "str",
        "region": "str",
        "language": "str",
        "url": "str",
    }

    # Get all available CSV files
    all_csv_files = {file.name: file for file in DATA_DIR.glob("*.csv")}

    # If cache exists and we're not forcing a reload, load from cache
    if CACHE_FILE.exists() and not force_reload:
        print(f"Loading data from cache file: {CACHE_FILE}")
        df = pd.read_parquet(CACHE_FILE)

        # Check for new files
        processed_files = get_processed_files()
        new_files = [
            all_csv_files[name]
            for name in all_csv_files
            if name not in processed_files
        ]

        if not new_files:
            print(
                "No new files to process. "
                f"Data loaded in {time.time() - start_time:.2f} seconds"
            )
            return df

        print(f"Found {len(new_files)} new files to process")

        # Process only the new files
        new_dfs = []
        for i, file in enumerate(new_files):
            print(f"Loading new file {i+1}/{len(new_files)}: {file.name}")

            # Read CSV with explicit dtypes
            chunk_df = pd.read_csv(file, dtype=csv_dtypes, low_memory=False)
            new_dfs.append(chunk_df)
            processed_files.add(file.name)

        if new_dfs:
            # Combine new data with existing data
            print("Combining new data with existing data...")
            new_data = pd.concat(new_dfs, ignore_index=True)
            df = pd.concat([df, new_data], ignore_index=True)

            # Remove duplicates based on unique_id
            df = df.drop_duplicates(subset=["unique_id"], keep="last")

            # Save updated data to cache file
            print(f"Saving updated data to {CACHE_FILE}")
            df.to_parquet(CACHE_FILE, index=False)

            # Update the processed files log
            update_processed_files(processed_files)

        print(f"Data loaded and updated in {time.time() - start_time:.2f} seconds")
        return df

    # If no cache file or force_reload is True, process all files
    print(f"Loading data from CSV files in {DATA_DIR}")

    # Get all CSV files
    csv_files = list(all_csv_files.values())
    total_files = len(csv_files)
    print(f"Found {total_files} CSV files")

    # Load data one CSV file at a time
    dfs = []
    processed_files = set()
    for i, file in enumerate(csv_files):
        if i % 10 == 0:
            print(f"Loading file {i+1}/{total_files}: {file.name}")

        # Read CSV with explicit dtypes
        chunk_df = pd.read_csv(file, dtype=csv_dtypes, low_memory=False)
        dfs.append(chunk_df)
        processed_files.add(file.name)

    # Combine all dataframes
    print("Combining all dataframes...")
    df = pd.concat(dfs, ignore_index=True)

    # Remove duplicates based on unique_id
    df = df.drop_duplicates(subset=["unique_id"], keep="last")

    # Save to cache file
    print(f"Saving combined data to {CACHE_FILE}")
    CACHE_FILE.parent.mkdir(exist_ok=True)
    df.to_parquet(CACHE_FILE, index=False)

    # Update the processed files log
    update_processed_files(processed_files)

    print(f"Data loaded and cached in {time.time() - start_time:.2f} seconds")
    return df
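
# Illustrative sketch (not wired into the app): how the incremental cache is
# expected to behave. The first call parses every CSV and writes the parquet
# cache plus the processed-files log; later calls only parse CSVs whose names
# are not yet in the log; force_reload=True ignores both and rebuilds
# everything. The function name below is made up for the example.
def _example_incremental_refresh():
    df_initial = load_data()        # full scan on the very first run
    df_refreshed = load_data()      # only CSVs added since the last run are parsed
    df_rebuilt = load_data(force_reload=True)  # reprocess all files from scratch
    return df_initial, df_refreshed, df_rebuilt
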
def search_by_username(df, username):
    """Search for profiles by username (unique_id)"""
    if not username:
        return pd.DataFrame()
    # Case-insensitive search
    results = df[df["unique_id"].str.lower().str.contains(username.lower(), na=False)]
    return results.head(100)  # Limit results to prevent UI overload


def search_by_nickname(df, nickname):
    """Search for profiles by nickname"""
    if not nickname:
        return pd.DataFrame()
    # Case-insensitive search
    results = df[df["nickname"].str.lower().str.contains(nickname.lower(), na=False)]
    return results.head(100)  # Limit results to prevent UI overload


def search_by_follower_count(df, min_followers, max_followers):
    """Search for profiles by follower count range"""
    if min_followers is None:
        min_followers = 0
    if max_followers is None:
        max_followers = df["follower_count"].max()
    results = df[
        (df["follower_count"] >= min_followers)
        & (df["follower_count"] <= max_followers)
    ]
    return results.head(100)  # Limit results to prevent UI overload


def format_results(df):
    """Format the results for display"""
    if df.empty:
        # Return an empty DataFrame with the same columns instead of a string
        return pd.DataFrame(columns=df.columns)

    # Format the DataFrame for display
    display_df = df.copy()

    # Convert follower count to human-readable format
    def format_number(num):
        if pd.isna(num):
            return "N/A"
        if num >= 1_000_000:
            return f"{num/1_000_000:.1f}M"
        elif num >= 1_000:
            return f"{num/1_000:.1f}K"
        return str(num)

    display_df["follower_count"] = display_df["follower_count"].apply(format_number)
    display_df["video_count"] = display_df["video_count"].apply(format_number)
    display_df["following_count"] = display_df["following_count"].apply(format_number)

    return display_df


def combined_search(
    df,
    min_followers,
    max_followers,
    min_videos,
    max_videos,
    signature_query,
    region,
    has_email,
):
    """Combined search function using all criteria"""
    results = df.copy()

    # Apply each filter if provided
    if min_followers is not None:
        results = results[results["follower_count"] >= min_followers]
    if max_followers is not None:
        results = results[results["follower_count"] <= max_followers]
    if min_videos is not None:
        results = results[results["video_count"] >= min_videos]
    if max_videos is not None:
        results = results[results["video_count"] <= max_videos]
    if signature_query:
        results = results[
            results["signature"]
            .str.lower()
            .str.contains(signature_query.lower(), na=False)
        ]
    if region:
        results = results[results["region"].str.lower() == region.lower()]

    # Filter for profiles with email
    if has_email:
        results = results[results["email"].notna() & (results["email"] != "")]

    return results.head(1000)  # Limit to 1000 results to prevent UI overload
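
# Illustrative sketch (not called by the app): a typical query against the
# loaded dataframe, combining the filter helper with the display formatter.
# The threshold values and the function name are arbitrary examples; with
# format_results applied, a follower_count of 1_500_000 renders as "1.5M"
# and 12_300 as "12.3K".
def _example_query(df):
    hits = combined_search(
        df,
        min_followers=10_000,
        max_followers=1_000_000,
        min_videos=50,
        max_videos=None,          # None leaves the upper bound open
        signature_query="fitness",
        region="US",
        has_email=True,
    )
    return format_results(hits)
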
def create_interface(df):
    """Create the Gradio interface"""
    # Get min and max follower counts for slider
    min_followers_global = max(1000, int(df["follower_count"].min()))
    max_followers_global = min(10000000, int(df["follower_count"].max()))

    # Get min and max video counts for slider
    min_videos_global = max(1, int(df["video_count"].min()))
    max_videos_global = min(10000, int(df["video_count"].max()))

    # Get unique regions for dropdown
    regions = sorted(df["region"].dropna().unique().tolist())
    regions = [""] + regions  # Add empty option

    with gr.Blocks(title="TikTok Creator Analyzer") as interface:
        gr.Markdown("# TikTok Creator Analyzer")
        gr.Markdown(f"Database contains {len(df):,} creator profiles")

        # Show top 100 profiles by default
        top_profiles = df.sort_values(by="follower_count", ascending=False).head(100)
        default_view = format_results(top_profiles)

        with gr.Tab("Overview"):
            gr.Markdown("## Top 100 Profiles by Follower Count")
            overview_results = gr.Dataframe(value=default_view, label="Top Profiles")
            refresh_btn = gr.Button("Refresh")
            refresh_btn.click(
                fn=lambda: format_results(
                    df.sort_values(by="follower_count", ascending=False).head(100)
                ),
                inputs=[],
                outputs=overview_results,
            )

        with gr.Tab("Advanced Search"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Follower Count")
                    min_followers_slider = gr.Slider(
                        minimum=min_followers_global,
                        maximum=max_followers_global,
                        value=min_followers_global,
                        step=1000,
                        label="Minimum Followers",
                        interactive=True,
                    )
                    max_followers_slider = gr.Slider(
                        minimum=min_followers_global,
                        maximum=max_followers_global,
                        value=max_followers_global,
                        step=1000,
                        label="Maximum Followers",
                        interactive=True,
                    )

                    gr.Markdown("### Video Count")
                    min_videos_slider = gr.Slider(
                        minimum=min_videos_global,
                        maximum=max_videos_global,
                        value=min_videos_global,
                        step=10,
                        label="Minimum Videos",
                        interactive=True,
                    )
                    max_videos_slider = gr.Slider(
                        minimum=min_videos_global,
                        maximum=max_videos_global,
                        value=max_videos_global,
                        step=10,
                        label="Maximum Videos",
                        interactive=True,
                    )

                with gr.Column(scale=1):
                    signature_input = gr.Textbox(label="Keywords in Signature")
                    region_input = gr.Dropdown(label="Region", choices=regions)
                    has_email_checkbox = gr.Checkbox(label="Has Email", value=False)

            search_btn = gr.Button("Search", variant="primary", size="lg")
            results_count = gr.Markdown("### Results: 0 profiles found")

            # Create a dataframe with download button
            with gr.Row():
                search_results = gr.Dataframe(label="Results")
                download_btn = gr.Button("Download Results as CSV")
            # Place the file output in the layout so the CSV link actually renders
            download_file = gr.File(label="Download")

            # Function to update results count
            def update_results_count(results_df):
                count = len(results_df)
                return f"### Results: {count:,} profiles found"

            # Function to perform search and update results
            def perform_search(
                min_followers,
                max_followers,
                min_videos,
                max_videos,
                signature,
                region,
                has_email,
            ):
                results = combined_search(
                    df,
                    min_followers,
                    max_followers,
                    min_videos,
                    max_videos,
                    signature,
                    region,
                    has_email,
                )
                formatted_results = format_results(results)
                count_text = update_results_count(results)
                return formatted_results, count_text

            # Function to download results as CSV
            def download_results(results_df):
                if results_df.empty:
                    return None
                # Convert back to original format for download
                download_df = df[df["unique_id"].isin(results_df["unique_id"])]
                # Save to temporary CSV file
                temp_csv = "temp_results.csv"
                download_df.to_csv(temp_csv, index=False)
                return temp_csv

            # Connect the search button
            search_btn.click(
                fn=perform_search,
                inputs=[
                    min_followers_slider,
                    max_followers_slider,
                    min_videos_slider,
                    max_videos_slider,
                    signature_input,
                    region_input,
                    has_email_checkbox,
                ],
                outputs=[search_results, results_count],
            )

            # Connect the download button
            download_btn.click(
                fn=download_results,
                inputs=[search_results],
                outputs=[download_file],
            )

        with gr.Tab("Statistics"):
            gr.Markdown("## Database Statistics")

            # Calculate some basic statistics
            total_creators = len(df)
            total_followers = df["follower_count"].sum()
            avg_followers = df["follower_count"].mean()
            median_followers = df["follower_count"].median()
            max_followers = df["follower_count"].max()

            stats_md = f"""
- Total Creators: {total_creators:,}
- Total Followers: {total_followers:,}
- Average Followers: {avg_followers:,.2f}
- Median Followers: {median_followers:,}
- Max Followers: {max_followers:,}
"""
            gr.Markdown(stats_md)

        with gr.Tab("Maintenance"):
            gr.Markdown("## Database Maintenance")

            # Get processed files info
            processed_files = get_processed_files()
            maintenance_md = f"""
- Total processed files: {len(processed_files)}
- Last update: {time.ctime(CACHE_FILE.stat().st_mtime) if CACHE_FILE.exists() else 'Never'}
"""
            gr.Markdown(maintenance_md)

            with gr.Row():
                force_reload_btn = gr.Button("Force Reload All Files")
                reload_status = gr.Markdown("Click to reload all files from scratch")

            def reload_all_files():
                return "Reloading all files... This may take a while. Please restart the application."

            force_reload_btn.click(
                fn=reload_all_files, inputs=[], outputs=reload_status
            )

    return interface


def main():
    print("Loading TikTok creator data...")
    df = load_data()
    print(f"Loaded {len(df):,} creator profiles")

    # Create and launch the interface
    interface = create_interface(df)
    interface.launch(share=True, server_name="0.0.0.0")


if __name__ == "__main__":
    main()
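
# Usage sketch (assumptions: this file is saved as app.py and the CSV exports
# live in ../data/tiktok_profiles/ relative to it, as configured above):
#
#   python app.py
#
# The first run parses every CSV and writes the parquet cache; later runs only
# pick up CSV files that are not yet in the processed-files log.
# launch(share=True, server_name="0.0.0.0") serves the UI on all interfaces
# (Gradio's default port is 7860) and also prints a temporary public share link.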