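"""Papers Leaderboard: a Gradio app that scrapes the top, latest, and greatest
paper listings from paperswithcode.com, caches each category as JSON, renders
the results as HTML tables, and can bulk-download the linked PDFs into a
dated directory that is zipped and offered back as a base64 download link."""
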
import requests
from bs4 import BeautifulSoup
import pandas as pd
import gradio as gr
import time
import os
import json
import asyncio
import aiohttp
import aiofiles
import re
from datetime import datetime
import zipfile
import base64

def get_rank_papers(url, progress=gr.Progress(track_tqdm=True)):
    """Scrape paginated paper listings from Papers with Code.

    Returns a dict keyed by paper title, each entry holding the paper link,
    its GitHub star count, and a PDF link. Stops once ten duplicate titles
    have been seen, which signals that pagination is revisiting old results.
    """
    base_url = "https://paperswithcode.com"
    session = requests.Session()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        'Cache-Control': 'no-cache'
    }
    print("Run started at:", time.ctime())
    offset = 0
    data_list = {}
    break_duplicate = 10

    while True:
        response = session.get(url, headers=headers, params={'page': offset})
        if response.status_code != 200:
            print('Failed to retrieve data')
            break
        soup = BeautifulSoup(response.text, 'html.parser')
        paper_info = soup.find_all('div', class_='row infinite-item item paper-card')
        if not paper_info:
            break
        for ppr in paper_info:
            title = ppr.find('h1').text.strip()
            if "paper" in ppr.find('a')['href']:
                link = base_url + ppr.find('a')['href']
            else:
                link = ppr.find('a')['href']
            star_badge = ppr.find('span', class_='badge badge-secondary')
            # Some cards carry no star badge; treat those as zero stars.
            github_star = star_badge.text.strip().replace(',', '') if star_badge else '0'
            pdf_link = ''
            try:
                response_link = session.get(link, headers=headers)
                soup_link = BeautifulSoup(response_link.text, 'html.parser')
                paper_info_link = soup_link.find_all('div', class_='paper-abstract')
                pdf_link = paper_info_link[0].find('div', class_='col-md-12').find('a')['href']
            except Exception:
                pass  # leave pdf_link empty if the abstract page is missing or malformed
            if title not in data_list:
                data_list[title] = {'link': link, 'Github Star': int(github_star), 'pdf_link': pdf_link.strip()}
            else:
                break_duplicate -= 1
                if break_duplicate == 0:
                    return data_list
        offset += 1
        progress.update(1)  # advance the tracked progress bar by one page
    print('Data retrieval complete')
    return data_list

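# NOTE: get_rank_papers() assumes Papers with Code's markup at the time of
# writing ('row infinite-item item paper-card' rows, 'badge badge-secondary'
# star counts, 'paper-abstract' blocks). If the site changes its layout,
# these selectors will quietly stop matching and must be updated.
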
def load_cached_data(cache_file):
    """Return cached JSON data if the cache file exists, else None."""
    if os.path.exists(cache_file):
        with open(cache_file, 'r') as f:
            return json.load(f)
    return None

def save_cached_data(data, cache_file):
    """Write the scraped data to a JSON cache file."""
    with open(cache_file, 'w') as f:
        json.dump(data, f)

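# Each category's cache is a flat {title: {'link', 'Github Star', 'pdf_link'}}
# JSON dump; deleting a *_papers_cache.json file forces a fresh scrape on the
# next refresh of that tab.
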
def format_dataframe(data):
    """Convert the scraped dict into a DataFrame with clickable HTML links."""
    df = pd.DataFrame(data).T
    df['title'] = df.index
    df = df[['title', 'Github Star', 'link', 'pdf_link']]
    df['link'] = df['link'].apply(lambda x: f'<a href="{x}" target="_blank">Link</a>')
    df['pdf_link'] = df['pdf_link'].apply(lambda x: f'<a href="{x}" target="_blank">{x}</a>')
    return df

def load_and_cache_data(url, cache_file):
    """Return cached data when available; otherwise scrape and cache it."""
    cached_data = load_cached_data(cache_file)
    if cached_data:
        print(f"Loading cached data from {cache_file}")
        return cached_data
    print(f"Fetching new data from {url}")
    new_data = get_rank_papers(url)
    save_cached_data(new_data, cache_file)
    return new_data

def update_display(category):
    """Fetch (or load cached) data for a category and render it as HTML."""
    cache_file = f"{category}_papers_cache.json"
    url = f"https://paperswithcode.com/{category}" if category != "top" else "https://paperswithcode.com/"
    data = load_and_cache_data(url, cache_file)
    df = format_dataframe(data)
    return len(df), df.to_html(escape=False, index=False)

def load_all_data():
    """Populate all three leaderboard tabs at startup."""
    top_count, top_html = update_display("top")
    new_count, new_html = update_display("latest")
    greatest_count, greatest_html = update_display("greatest")
    return top_count, top_html, new_count, new_html, greatest_count, greatest_html

def safe_filename(title):
    """Convert a string to a safe filename."""
    return re.sub(r'[^\w\-_\. ]', '_', title)

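# For example, safe_filename("GPT-4: What's next?") returns
# "GPT-4_ What_s next_": every character outside letters, digits, '_', '-',
# '.', and spaces is replaced with an underscore.
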
def create_date_directory():
    """Create a directory named with the current date."""
    date_str = datetime.now().strftime("%Y-%m-%d")
    os.makedirs(date_str, exist_ok=True)
    return date_str

async def download_and_save_pdf(session, title, paper_info, directory):
    """Download one paper's PDF into `directory`.

    Returns a (status message, filepath) tuple; filepath is None on failure.
    """
    pdf_url = paper_info['pdf_link']
    if not pdf_url:
        return f"No PDF link available for: {title}", None
    try:
        timeout = aiohttp.ClientTimeout(total=60)  # 60-second cap per download
        async with session.get(pdf_url, timeout=timeout) as response:
            if response.status != 200:
                return f"Failed to download PDF for {title}: HTTP {response.status}", None
            pdf_content = await response.read()
            if len(pdf_content) < 2048:  # under 2 KB is almost certainly an error page, not a PDF
                return f"Downloaded PDF for {title} is too small (less than 2KB). Skipping.", None
            safe_title = safe_filename(title)
            filename = f"{safe_title}.pdf"
            filepath = os.path.join(directory, filename)
            async with aiofiles.open(filepath, 'wb') as f:
                await f.write(pdf_content)
            return f"Successfully saved: {filename}", filepath
    except asyncio.TimeoutError:
        return f"Timeout while downloading PDF for {title}", None
    except Exception as e:
        return f"Error saving PDF for {title}: {str(e)}", None

async def process_papers(data, directory, progress=gr.Progress()):
    """Download all PDFs concurrently, reporting progress as tasks complete."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for title, paper_info in data.items():
            task = asyncio.ensure_future(download_and_save_pdf(session, title, paper_info, directory))
            tasks.append(task)
        results = []
        successful_downloads = []
        errors = []
        for i, task in enumerate(asyncio.as_completed(tasks), start=1):
            result, filepath = await task
            results.append(result)
            if filepath:
                successful_downloads.append(filepath)
            else:
                errors.append(result)
            progress(i / len(tasks), f"Processed {i}/{len(tasks)} papers")
    return results, successful_downloads, errors

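# Design note: every download is launched at once inside a single shared
# aiohttp session, and asyncio.as_completed() yields tasks in completion
# order, so the progress bar advances as soon as any PDF finishes rather
# than in submission order.
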
def zip_directory(files_to_zip, directory):
    """Zip the specified files into <directory>.zip, preserving relative paths."""
    zip_filename = f"{directory}.zip"
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file in files_to_zip:
            zipf.write(file, os.path.relpath(file, os.path.join(directory, '..')))
    return zip_filename

def get_base64_download_link(file_path):
    """Create a base64 data-URI download link for a file."""
    with open(file_path, "rb") as file:
        content = file.read()
    b64 = base64.b64encode(content).decode()
    return f'<a href="data:application/zip;base64,{b64}" download="{os.path.basename(file_path)}">Download {os.path.basename(file_path)}</a>'

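# Caveat: base64-encoding inflates the archive by roughly a third and inlines
# it into the page HTML, so very large zips can make the browser sluggish.
# Serving the file through a gr.File component would likely scale better.
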
def get_existing_zip_links():
    """Get download links for existing zip files in the working directory."""
    links = []
    for file in os.listdir('.'):
        if file.endswith('.zip') and os.path.isfile(file):
            links.append(get_base64_download_link(file))
    return "<br>".join(links)

def download_all_papers(progress=gr.Progress()):
    """Merge all cached categories, download their PDFs, and zip the results."""
    all_data = {}
    for category in ["top", "latest", "greatest"]:
        cache_file = f"{category}_papers_cache.json"
        data = load_cached_data(cache_file)
        if data:
            all_data.update(data)
    date_directory = create_date_directory()
    results, successful_downloads, errors = asyncio.run(process_papers(all_data, date_directory, progress))
    if successful_downloads:
        zip_file = zip_directory(successful_downloads, date_directory)
        download_link = get_base64_download_link(zip_file)
    else:
        download_link = "No papers were successfully downloaded."
    existing_links = get_existing_zip_links()
    summary = f"Papers processed: {len(all_data)}\n"
    summary += f"Successfully downloaded: {len(successful_downloads)}\n"
    summary += f"Errors: {len(errors)}\n\n"
    summary += ("Error List:\n" + "\n".join(errors)) if errors else "No errors occurred."
    return summary, f"{download_link}<br><br>Previous downloads:<br>{existing_links}"

with gr.Blocks() as demo:
    gr.Markdown("<h1><center>Papers Leaderboard</center></h1>")

    with gr.Tab("Top Trending Papers"):
        top_count = gr.Textbox(label="Number of Papers Fetched")
        top_html = gr.HTML()
        top_button = gr.Button("Refresh Leaderboard")
        top_button.click(fn=lambda: update_display("top"), inputs=None, outputs=[top_count, top_html])

    with gr.Tab("New Papers"):
        new_count = gr.Textbox(label="Number of Papers Fetched")
        new_html = gr.HTML()
        new_button = gr.Button("Refresh Leaderboard")
        new_button.click(fn=lambda: update_display("latest"), inputs=None, outputs=[new_count, new_html])

    with gr.Tab("Greatest Papers"):
        greatest_count = gr.Textbox(label="Number of Papers Fetched")
        greatest_html = gr.HTML()
        greatest_button = gr.Button("Refresh Leaderboard")
        greatest_button.click(fn=lambda: update_display("greatest"), inputs=None, outputs=[greatest_count, greatest_html])

    download_button = gr.Button("Download All Papers", variant="primary")
    download_output = gr.Textbox(label="Download Status")
    download_links = gr.HTML(label="Download Links")
    download_button.click(fn=download_all_papers, inputs=None, outputs=[download_output, download_links])

    # Load initial data for all tabs
    demo.load(fn=load_all_data, outputs=[top_count, top_html, new_count, new_html, greatest_count, greatest_html])

# Launch the Gradio interface with a public link
demo.launch(share=True)
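# share=True additionally requests a temporary public gradio.live link; when
# the app runs on Hugging Face Spaces the flag is unnecessary (the Space URL
# is already public), so it may simply be ignored there.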