"""Hackathon contribution leaderboard.

Aggregates per-user contribution counts from four sources — per-country
Argilla "Responder" datasets (blend-es), an INCLUDE CSV, an estereotipos
token-count CSV, and an arena JSON dump — consolidates them by Discord
username, and serves an auto-refreshing HTML leaderboard through a Gradio
app mounted on a FastAPI server.
"""

import gradio as gr
import argilla as rg
import pandas as pd
import os
import time
from collections import defaultdict
from fastapi import FastAPI
from functools import lru_cache

# Argilla client; credentials come from the environment so the deployment
# carries no baked-in secrets.
client = rg.Argilla(
    api_url=os.getenv("ARGILLA_API_URL", ""),
    api_key=os.getenv("ARGILLA_API_KEY", ""),
)

# Spanish-speaking countries: ISO-3166 alpha-3 code + flag emoji, used to
# reconstruct each per-country Argilla dataset name.
countries = {
    "Argentina": {"iso": "ARG", "emoji": "🇦🇷"},
    "Bolivia": {"iso": "BOL", "emoji": "🇧🇴"},
    "Chile": {"iso": "CHL", "emoji": "🇨🇱"},
    "Colombia": {"iso": "COL", "emoji": "🇨🇴"},
    "Costa Rica": {"iso": "CRI", "emoji": "🇨🇷"},
    "Cuba": {"iso": "CUB", "emoji": "🇨🇺"},
    "Ecuador": {"iso": "ECU", "emoji": "🇪🇨"},
    "El Salvador": {"iso": "SLV", "emoji": "🇸🇻"},
    "España": {"iso": "ESP", "emoji": "🇪🇸"},
    "Guatemala": {"iso": "GTM", "emoji": "🇬🇹"},
    "Honduras": {"iso": "HND", "emoji": "🇭🇳"},
    "México": {"iso": "MEX", "emoji": "🇲🇽"},
    "Nicaragua": {"iso": "NIC", "emoji": "🇳🇮"},
    "Panamá": {"iso": "PAN", "emoji": "🇵🇦"},
    "Paraguay": {"iso": "PRY", "emoji": "🇵🇾"},
    "Perú": {"iso": "PER", "emoji": "🇵🇪"},
    "Puerto Rico": {"iso": "PRI", "emoji": "🇵🇷"},
    "República Dominicana": {"iso": "DOM", "emoji": "🇩🇴"},
    "Uruguay": {"iso": "URY", "emoji": "🇺🇾"},
    "Venezuela": {"iso": "VEN", "emoji": "🇻🇪"},
}


def get_blend_es_data():
    """Count "answer_1" responses per user across every country dataset.

    Returns:
        list[dict]: ``{"source": "blend-es", "username": str, "count": int}``
        entries, one per (dataset, user) pair.

    Each dataset is processed best-effort: a failure to load one dataset
    (or to resolve one username) is logged and skipped, never fatal.
    """
    data = []
    for country, meta in countries.items():
        dataset_name = f"{meta['emoji']} {country} - {meta['iso']} - Responder"
        try:
            print(f"Processing dataset: {dataset_name}")
            dataset = client.datasets(dataset_name)
            records = list(dataset.records(with_responses=True))

            dataset_contributions = defaultdict(int)
            user_mapping = {}  # user_id -> username, resolved lazily once each

            for record in records:
                record_dict = record.to_dict()
                if "answer_1" in record_dict["responses"]:
                    for answer in record_dict["responses"]["answer_1"]:
                        if answer["user_id"]:
                            user_id = answer["user_id"]
                            dataset_contributions[user_id] += 1
                            if user_id not in user_mapping:
                                try:
                                    user = client.users(id=user_id)
                                    user_mapping[user_id] = user.username
                                except Exception as e:
                                    print(f"Error getting username for {user_id}: {e}")
                                    # Fall back to a short, stable pseudo-name.
                                    user_mapping[user_id] = f"User-{user_id[:8]}"

            for user_id, count in dataset_contributions.items():
                username = user_mapping.get(user_id, f"User-{user_id[:8]}")
                data.append({"source": "blend-es", "username": username, "count": count})
        except Exception as e:
            print(f"Error processing dataset {dataset_name}: {e}")
    return data


def get_include_data():
    """Load per-user question counts from the local ``include.csv``.

    Returns:
        list[dict]: ``{"source": "include", "username": str, "count": int}``
        entries keyed by lowercased Discord username.

    Missing file or malformed content is logged and yields an empty list.
    """
    data = []
    try:
        if os.path.exists("include.csv"):
            include_df = pd.read_csv("include.csv")
            user_col = "Nombre en Discord / username"
            count_col = "Número de preguntas / number of questions"
            if user_col in include_df.columns and count_col in include_df.columns:
                discord_users = defaultdict(int)
                for _, row in include_df.iterrows():
                    raw_username = row[user_col]
                    questions = row[count_col]
                    # BUG FIX: check notna() BEFORE slicing. The previous
                    # code sliced first, so a NaN cell (a float) raised
                    # TypeError and the broad except aborted the whole load.
                    if pd.notna(raw_username) and pd.notna(questions):
                        # Drop the leading marker character (e.g. "@") —
                        # same [1:] slice the original applied.
                        username = raw_username[1:]
                        discord_users[username.lower()] += int(questions)
                for username, count in discord_users.items():
                    data.append({"source": "include", "username": username, "count": count})
    except Exception as e:
        print(f"Error loading include.csv: {e}")
    return data


def get_mail_to_username_mapping():
    """Load the gmail -> Discord username mapping from ``mail_to_username.csv``.

    Returns:
        dict[str, str]: lowercased email -> lowercased Discord username.
        Empty on missing file or error (logged, never raised).
    """
    mail_to_discord = {}
    try:
        if os.path.exists("mail_to_username.csv"):
            mapping_df = pd.read_csv("mail_to_username.csv")
            if "gmail" in mapping_df.columns and "discord" in mapping_df.columns:
                for _, row in mapping_df.iterrows():
                    mail = row["gmail"]
                    discord = row["discord"]
                    if pd.notna(mail) and pd.notna(discord):
                        mail_to_discord[mail.lower()] = discord.lower()
    except Exception as e:
        print(f"Error loading mail_to_username.csv: {e}")
    return mail_to_discord


def get_estereotipos_data():
    """Load per-mail counts from ``token_id_counts.csv``.

    Email addresses are translated to Discord usernames via
    ``mail_to_username.csv``; unmapped mails fall back to their local part
    (the text before ``@``).

    Returns:
        list[dict]: ``{"source": "estereotipos", "username": str, "count": int}``.
    """
    data = []
    mail_to_discord = get_mail_to_username_mapping()
    try:
        if os.path.exists("token_id_counts.csv"):
            counts_df = pd.read_csv("token_id_counts.csv")
            if "token_id" in counts_df.columns and "count" in counts_df.columns:
                mail_counts = defaultdict(int)
                for _, row in counts_df.iterrows():
                    mail = row["token_id"]
                    count = row["count"]
                    if pd.notna(mail) and pd.notna(count):
                        mail_counts[mail.lower()] += int(count)
                for mail, count in mail_counts.items():
                    # Keys are already lowercased above, so they match the
                    # lowercased mapping keys directly.
                    username = mail_to_discord.get(mail, "")
                    if not username:
                        username = mail.split('@')[0] if '@' in mail else mail
                    data.append({"source": "estereotipos", "username": username, "count": count})
    except Exception as e:
        print(f"Error loading estereotipos data: {e}")
    return data


def get_arena_data():
    """Count conversations per user from the local ``arena.json`` dump.

    The file maps country -> list of conversations; each conversation may
    carry a ``"username"`` (an email) which is translated to a Discord
    username like in :func:`get_estereotipos_data`.

    Returns:
        list[dict]: ``{"source": "arena", "username": str, "count": int}``.
    """
    data = []
    mail_to_discord = get_mail_to_username_mapping()
    try:
        if os.path.exists("arena.json"):
            import json  # local import: only needed when the file exists
            with open("arena.json", "r", encoding="utf-8") as f:
                arena_data = json.load(f)

            mail_counts = defaultdict(int)
            for country, conversations in arena_data.items():
                for conversation in conversations:
                    if "username" in conversation:
                        mail = conversation["username"]
                        if mail:
                            mail_counts[mail.lower()] += 1

            for mail, count in mail_counts.items():
                username = mail_to_discord.get(mail, "")
                if not username:
                    username = mail.split('@')[0] if '@' in mail else mail
                data.append({"source": "arena", "username": username, "count": count})
    except Exception as e:
        print(f"Error loading arena data: {e}")
    return data


@lru_cache(maxsize=32)
def get_consolidated_cached(cache_buster: int):
    """Memoized wrapper around :func:`consolidate_all_data`.

    ``cache_buster`` carries no meaning itself; callers pass a coarse
    timestamp so that a new value forces a recomputation while repeated
    calls within the same window hit the cache.
    """
    return consolidate_all_data()


# Backward-compatible alias: keep the original public name.
get_user_contributions_cached = get_consolidated_cached


def consolidate_all_data():
    """Merge all four sources into one leaderboard DataFrame.

    Users are merged case-insensitively on username; the first-seen casing
    is displayed. Returns a DataFrame with columns Username, Total,
    Blend-es, INCLUDE, Estereotipos, Arena, sorted by Total descending
    (column-less and empty when no data is available).
    """
    all_data = []
    all_data.extend(get_blend_es_data())
    all_data.extend(get_include_data())
    all_data.extend(get_estereotipos_data())
    all_data.extend(get_arena_data())

    user_contributions = defaultdict(
        lambda: {"username": "", "blend_es": 0, "include": 0, "estereotipos": 0, "arena": 0}
    )

    # Map each source label to its accumulator field.
    source_field = {
        "blend-es": "blend_es",
        "include": "include",
        "estereotipos": "estereotipos",
        "arena": "arena",
    }

    for item in all_data:
        source = item["source"]
        username = item["username"]
        count = item["count"]
        user_key = username.lower()  # case-insensitive merge key
        if not user_contributions[user_key]["username"]:
            user_contributions[user_key]["username"] = username
        field = source_field.get(source)
        if field:
            user_contributions[user_key][field] += count

    rows = []
    for _, data in user_contributions.items():
        total = data["blend_es"] + data["include"] + data["estereotipos"] + data["arena"]
        rows.append({
            "Username": data["username"],
            "Total": total,
            "Blend-es": data["blend_es"],
            "INCLUDE": data["include"],
            "Estereotipos": data["estereotipos"],
            "Arena": data["arena"],
        })

    df = pd.DataFrame(rows)
    if not df.empty:
        df = df.sort_values("Total", ascending=False)
    return df


app = FastAPI()

# Module-level cache for the rendered leaderboard; refreshed at most every
# 300 s (or explicitly via refresh_data). NOTE(review): plain globals, not
# thread-safe — acceptable for a single-worker demo; confirm for production.
last_update_time = 0
cached_data = None


def create_leaderboard_ui():
    """Build the leaderboard HTML, reusing the cached DataFrame when fresh.

    Recomputes if the cache is older than 300 seconds, using the current
    timestamp as an lru_cache buster. Returns an HTML string with a rank
    column, a last-updated line, and the rendered table.
    """
    global cached_data, last_update_time

    current_time = time.time()
    if cached_data is not None and current_time - last_update_time < 300:
        df = cached_data
    else:
        cache_buster = int(current_time)
        df = get_user_contributions_cached(cache_buster)
        cached_data = df
        last_update_time = current_time

    if not df.empty:
        # Turn the positional index into a 1-based "Rank" column.
        df = df.reset_index(drop=True)
        df.index = df.index + 1
        df = df.rename_axis("Rank")
        df = df.reset_index()

    df_html = df.to_html(classes="leaderboard-table", border=0, index=False)

    styled_html = f"""

Última Actualización: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_update_time))}

{df_html}
"""
    return styled_html


def refresh_data():
    """Invalidate the cache and return freshly rendered leaderboard HTML."""
    global cached_data, last_update_time
    cached_data = None
    last_update_time = 0
    return create_leaderboard_ui()


with gr.Blocks(theme=gr.themes.Default()) as demo:
    with gr.Column(scale=1):
        gr.Markdown("""# 🏆 Hackaton Leaderboard""")
        # Passing the callable lets Gradio re-evaluate it on page load.
        leaderboard_html = gr.HTML(create_leaderboard_ui)
        refresh_btn = gr.Button("🔄 Actualizar Datos", variant="primary")
        refresh_btn.click(fn=refresh_data, outputs=leaderboard_html)

gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)