import requests
from bs4 import BeautifulSoup
import pandas as pd
import gradio as gr
import time
import os
import json
import asyncio  # needed for asyncio.run, ensure_future, as_completed, and TimeoutError below
import aiohttp
import aiofiles
import re
from datetime import datetime
# Magical Utility Functions
def safe_filename(title):
    """Convert a string to a safe filename. No more 'file not found' nightmares!"""
    return re.sub(r'[^\w\-_\. ]', '_', title)
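# Illustrative example (hypothetical input, not used elsewhere in this script):
#   safe_filename("My Paper: v2?")  ->  "My Paper_ v2_"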
# Animated Banner Messages
def animated_banner(message, emoji):
    """Create an animated banner message. It's like a tiny parade for your console!"""
    frames = [
        f"═════ {emoji} ═════\n║ {message:^16} ║\n══════════════",
        f"═════ {emoji} ═════\n║ {message:^16} ║\n══════════════",
        f"═════{emoji}═════\n║ {message:^14} ║\n════════════",
        f"════{emoji}════\n║ {message:^12} ║\n══════════",
        f"═══{emoji}═══\n║ {message:^10} ║\n════════",
        f"══{emoji}══\n║ {message:^8} ║\n══════",
        f"═{emoji}═\n║ {message:^6} ║\n════",
    ]
    return frames
# Data Fetching and Caching Shenanigans
def get_rank_papers(url, progress=gr.Progress(track_tqdm=True)):
    """Fetch papers from the interwebs. It's like fishing, but for knowledge!"""
    base_url = "https://paperswithcode.com"
    session = requests.Session()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        'Cache-Control': 'no-cache'
    }
    print("Run at:", time.ctime())

    offset = 0
    data_list = {}
    break_duplicate = 10

    while True:
        response = session.get(url, headers=headers, params={'page': offset})
        if response.status_code != 200:
            print('Failed to retrieve data')
            break
        soup = BeautifulSoup(response.text, 'html.parser')
        paper_info = soup.find_all('div', class_='row infinite-item item paper-card')
        if not paper_info:
            print("No paper information found.")
            break
        for ppr in paper_info:
            title = ppr.find('h1').text.strip()
            if "paper" in ppr.find('a')['href']:
                link = base_url + ppr.find('a')['href']
            else:
                link = ppr.find('a')['href']
            Github_Star = ppr.find('span', class_='badge badge-secondary').text.strip().replace(',', '') if ppr.find('span', class_='badge badge-secondary') else "0"

            pdf_link = ''
            try:
                response_link = session.get(link, headers=headers)
                soup_link = BeautifulSoup(response_link.text, 'html.parser')
                paper_info_link = soup_link.find_all('div', class_='paper-abstract')
                pdf_link = paper_info_link[0].find('div', class_='col-md-12').find('a')['href']
            except Exception as e:
                print(f"Failed to retrieve PDF link for {title}: {e}")

            print(f"Title: {title}, Link: {link}, Github Star: {Github_Star}, PDF Link: {pdf_link}")
            if title not in data_list:
                data_list[title] = {'link': link, 'Github Star': int(Github_Star), 'pdf_link': pdf_link.strip()}
            else:
                break_duplicate -= 1
                if break_duplicate == 0:
                    return data_list
        offset += 1
        # gr.Progress objects are callable; report pages fetched (total unknown)
        progress((offset, None), desc="Fetching papers...")
    print('Data retrieval complete')
    return data_list
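# get_rank_papers() returns a dict keyed by paper title, for example:
#   {"Some Paper": {"link": "...", "Github Star": 123, "pdf_link": "..."}}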
def load_cached_data(cache_file):
    """Load cached data. It's like finding money in your old jeans!"""
    if os.path.exists(cache_file):
        with open(cache_file, 'r') as f:
            return json.load(f)
    return None

def save_cached_data(data, cache_file):
    """Save data to cache. Future you will thank present you!"""
    with open(cache_file, 'w') as f:
        json.dump(data, f)

def load_and_cache_data(url, cache_file):
    """Load data from cache or fetch new data. It's like a time machine for your data!"""
    cached_data = load_cached_data(cache_file)
    if cached_data:
        print(f"Loading cached data from {cache_file}")
        return cached_data
    print(f"Fetching new data from {url}")
    new_data = get_rank_papers(url)
    save_cached_data(new_data, cache_file)
    return new_data
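# The cache is a plain JSON file per category (e.g. "top_papers_cache.json"),
# written to the working directory; delete a file to force a fresh scrape.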
# Data Processing and Display Magic
def format_dataframe(data):
    """Format data into a pretty DataFrame. It's like giving your data a makeover!"""
    if not data:
        print("No data found to format.")
        return pd.DataFrame()
    df = pd.DataFrame(data).T
    df['title'] = df.index

    # Check if required columns are present
    if 'Github Star' in df.columns and 'link' in df.columns and 'pdf_link' in df.columns:
        df = df[['title', 'Github Star', 'link', 'pdf_link']]
        df = df.sort_values(by='Github Star', ascending=False)
        df['link'] = df['link'].apply(lambda x: f'<a href="{x}" target="_blank">Link</a>')
        df['pdf_link'] = df['pdf_link'].apply(lambda x: f'<a href="{x}" target="_blank">{x}</a>')
    else:
        print("Required columns are missing in the dataframe.")
        print(f"Columns available: {df.columns}")
    return df
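# The raw <a> tags built in format_dataframe() only render as clickable links
# because update_display() below calls df.to_html(escape=False, ...).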
def update_display(category):
    """Update the display for a category. Freshen up your data like it's spring cleaning!"""
    cache_file = f"{category}_papers_cache.json"
    url = f"https://paperswithcode.com/{category}" if category != "top" else "https://paperswithcode.com/"
    data = load_and_cache_data(url, cache_file)
    df = format_dataframe(data)
    return len(df), df.to_html(escape=False, index=False)

def load_all_data():
    """Load data for all categories. It's like a buffet for your brain!"""
    top_count, top_html = update_display("top")
    new_count, new_html = update_display("latest")
    greatest_count, greatest_html = update_display("greatest")
    return top_count, top_html, new_count, new_html, greatest_count, greatest_html
# Asynchronous Web Page Downloading
async def download_webpage(session, title, paper_info):
    """Download the webpage content instead of the PDF. It's like browsing, but faster!"""
    link_url = paper_info['link']
    if not link_url:
        return f"No link for: {title}. It's playing hide and seek!", None, None
    try:
        timeout = aiohttp.ClientTimeout(total=60)  # 60-second timeout
        async with session.get(link_url, timeout=timeout) as response:
            if response.status != 200:
                return f"Failed to grab webpage for {title}: HTTP {response.status}. The internet gremlins strike again!", None, None
            page_content = await response.text()
            # Combine the content as a Python type representation
            code_block = f'"""\nTitle: {title}\nLink: {link_url}\n"""\n\n# Webpage Content\n{repr(page_content)}\n'
            return f"Successfully downloaded webpage for: {title}.", code_block, page_content
    except asyncio.TimeoutError:
        return f"Timeout for {title}. The webpage is playing hard to get!", None, None
    except Exception as e:
        return f"Oops! Error downloading {title}: {str(e)}. Gremlins in the system!", None, None
async def process_webpages(data, progress=gr.Progress()):
    """Process multiple papers asynchronously by downloading their webpages."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for title, paper_info in data.items():
            task = asyncio.ensure_future(download_webpage(session, title, paper_info))
            tasks.append(task)

        results = []
        codes = []
        for i, task in enumerate(asyncio.as_completed(tasks), start=1):
            result, code_block, page_content = await task
            results.append(result)
            if code_block:
                codes.append(code_block)
            progress(i / len(tasks), desc=f"Processed {i}/{len(tasks)} papers. Downloading...")
        return results, codes
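# process_webpages() starts every download up front with asyncio.ensure_future
# and consumes results with asyncio.as_completed, so progress advances in
# completion order rather than submission order.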
def download_all_webpages(progress=gr.Progress()):
    """Download and display all paper webpages. It's like hosting a web party, and everyone's invited!"""
    all_data = {}
    for category in ["top", "latest", "greatest"]:
        cache_file = f"{category}_papers_cache.json"
        data = load_cached_data(cache_file)
        if data:
            all_data.update(data)

    # Download the webpage content
    results, code_blocks = asyncio.run(process_webpages(all_data, progress))

    summary = f"Papers processed: {len(all_data)} (We're basically librarians now!)\n"
    summary += f"Successfully downloaded: {len(code_blocks)} webpages\n"
    summary += f"Errors: {len(results) - len(code_blocks)} (Even superheroes have off days)\n\n"
    return summary, "\n\n".join(code_blocks)
# Gradio Interface: Where the Magic Happens
with gr.Blocks() as demo:
    gr.Markdown("<h1><center>Papers Leaderboard</center></h1>")

    with gr.Tab("Top Trending Papers"):
        top_count = gr.Textbox(label="Number of Papers Fetched")
        top_html = gr.HTML()
        top_button = gr.Button("Refresh Leaderboard")
        top_button.click(fn=lambda: update_display("top"), inputs=None, outputs=[top_count, top_html])

    with gr.Tab("New Papers"):
        new_count = gr.Textbox(label="Number of Papers Fetched")
        new_html = gr.HTML()
        new_button = gr.Button("Refresh Leaderboard")
        new_button.click(fn=lambda: update_display("latest"), inputs=None, outputs=[new_count, new_html])

    with gr.Tab("Greatest Papers"):
        greatest_count = gr.Textbox(label="Number of Papers Fetched")
        greatest_html = gr.HTML()
        greatest_button = gr.Button("Refresh Leaderboard")
        greatest_button.click(fn=lambda: update_display("greatest"), inputs=None, outputs=[greatest_count, greatest_html])

    download_button = gr.Button("Download All Paper Webpages", variant="primary")
    download_output = gr.Textbox(label="Download Status")
    code_output = gr.Code(label="Paper Webpage Contents", language="python")
    download_button.click(fn=download_all_webpages, inputs=None, outputs=[download_output, code_output])

    # Load initial data for all tabs
    demo.load(fn=load_all_data, outputs=[top_count, top_html, new_count, new_html, greatest_count, greatest_html])

# Launch the Gradio interface with a public link
print("Launching the Papers Leaderboard! Get ready for a wild ride through the land of academia!")
demo.launch(share=True)