Spaces:
Sleeping
Sleeping
File size: 9,774 Bytes
f2c0706 657eccf f2c0706 a7988d4 f2c0706 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf 58bbfbc 657eccf f2c0706 657eccf f2c0706 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
import requests
from bs4 import BeautifulSoup
import pandas as pd
import gradio as gr
import time
import os
import json
import PyPDF2
import io
import asyncio
import aiohttp
import aiofiles
from concurrent.futures import ThreadPoolExecutor
import re
from datetime import datetime
import zipfile
import base64
def get_rank_papers(url, progress=gr.Progress(track_tqdm=True)):
base_url = "https://paperswithcode.com"
session = requests.Session()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
'Cache-Control': 'no-cache'
}
print("Time run at : ", time.ctime())
offset = 0
data_list = {}
break_duplicate = 10
while True:
response = session.get(url, headers=headers, params={'page': offset})
if response.status_code != 200:
print('Failed to retrieve data')
break
soup = BeautifulSoup(response.text, 'html.parser')
paper_info = soup.find_all('div', class_='row infinite-item item paper-card')
if not paper_info:
break
for ppr in paper_info:
title = ppr.find('h1').text.strip()
if "paper" in ppr.find('a')['href']:
link = base_url + ppr.find('a')['href']
else:
link = ppr.find('a')['href']
Github_Star = ppr.find('span', class_='badge badge-secondary').text.strip().replace(',', '')
pdf_link = ''
try:
response_link = session.get(link, headers=headers)
soup_link = BeautifulSoup(response_link.text, 'html.parser')
paper_info_link = soup_link.find_all('div', class_='paper-abstract')
pdf_link = paper_info_link[0].find('div', class_='col-md-12').find('a')['href']
except:
pass
if title not in data_list:
data_list[title] = {'link': link, 'Github Star': int(Github_Star), 'pdf_link': pdf_link.strip()}
else:
break_duplicate -= 1
if break_duplicate == 0:
return data_list
offset += 1
progress.update(offset)
print('Data retrieval complete')
return data_list
def load_cached_data(cache_file):
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
return json.load(f)
return None
def save_cached_data(data, cache_file):
with open(cache_file, 'w') as f:
json.dump(data, f)
def format_dataframe(data):
df = pd.DataFrame(data).T
df['title'] = df.index
df = df[['title', 'Github Star', 'link', 'pdf_link']]
df['link'] = df['link'].apply(lambda x: f'<a href="{x}" target="_blank">Link</a>')
df['pdf_link'] = df['pdf_link'].apply(lambda x: f'<a href="{x}" target="_blank">{x}</a>')
return df
def load_and_cache_data(url, cache_file):
cached_data = load_cached_data(cache_file)
if cached_data:
print(f"Loading cached data from {cache_file}")
return cached_data
print(f"Fetching new data from {url}")
new_data = get_rank_papers(url)
save_cached_data(new_data, cache_file)
return new_data
def update_display(category):
cache_file = f"{category}_papers_cache.json"
url = f"https://paperswithcode.com/{category}" if category != "top" else "https://paperswithcode.com/"
data = load_and_cache_data(url, cache_file)
df = format_dataframe(data)
return len(df), df.to_html(escape=False, index=False)
def load_all_data():
top_count, top_html = update_display("top")
new_count, new_html = update_display("latest")
greatest_count, greatest_html = update_display("greatest")
return top_count, top_html, new_count, new_html, greatest_count, greatest_html
def safe_filename(title):
"""Convert a string to a safe filename."""
return re.sub(r'[^\w\-_\. ]', '_', title)
def create_date_directory():
"""Create a directory named with the current date."""
date_str = datetime.now().strftime("%Y-%m-%d")
os.makedirs(date_str, exist_ok=True)
return date_str
async def download_and_save_pdf(session, title, paper_info, directory):
pdf_url = paper_info['pdf_link']
if not pdf_url:
return f"No PDF link available for: {title}", None
try:
timeout = aiohttp.ClientTimeout(total=60) # 60 seconds timeout
async with session.get(pdf_url, timeout=timeout) as response:
if response.status != 200:
return f"Failed to download PDF for {title}: HTTP {response.status}", None
pdf_content = await response.read()
if len(pdf_content) < 2048: # Check if the PDF is less than 2KB
return f"Downloaded PDF for {title} is too small (less than 2KB). Skipping.", None
safe_title = safe_filename(title)
filename = f"{safe_title}.pdf"
filepath = os.path.join(directory, filename)
async with aiofiles.open(filepath, 'wb') as f:
await f.write(pdf_content)
return f"Successfully saved: {filename}", filepath
except asyncio.TimeoutError:
return f"Timeout while downloading PDF for {title}", None
except Exception as e:
return f"Error saving PDF for {title}: {str(e)}", None
async def process_papers(data, directory, progress=gr.Progress()):
async with aiohttp.ClientSession() as session:
tasks = []
for title, paper_info in data.items():
task = asyncio.ensure_future(download_and_save_pdf(session, title, paper_info, directory))
tasks.append(task)
results = []
successful_downloads = []
errors = []
for i, task in enumerate(asyncio.as_completed(tasks), start=1):
result, filepath = await task
results.append(result)
if filepath:
successful_downloads.append(filepath)
else:
errors.append(result)
progress(i / len(tasks), f"Processed {i}/{len(tasks)} papers")
return results, successful_downloads, errors
def zip_directory(files_to_zip, directory):
"""Zip the specified files."""
zip_filename = f"{directory}.zip"
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file in files_to_zip:
zipf.write(file, os.path.relpath(file, os.path.join(directory, '..')))
return zip_filename
def get_base64_download_link(file_path):
"""Create a base64 download link for a file."""
with open(file_path, "rb") as file:
content = file.read()
b64 = base64.b64encode(content).decode()
return f'<a href="data:application/zip;base64,{b64}" download="{os.path.basename(file_path)}">Download {os.path.basename(file_path)}</a>'
def get_existing_zip_links():
"""Get download links for existing zip files."""
links = []
for file in os.listdir('.'):
if file.endswith('.zip') and os.path.isfile(file):
links.append(get_base64_download_link(file))
return "<br>".join(links)
def download_all_papers(progress=gr.Progress()):
all_data = {}
for category in ["top", "latest", "greatest"]:
cache_file = f"{category}_papers_cache.json"
data = load_cached_data(cache_file)
if data:
all_data.update(data)
date_directory = create_date_directory()
results, successful_downloads, errors = asyncio.run(process_papers(all_data, date_directory, progress))
if successful_downloads:
zip_file = zip_directory(successful_downloads, date_directory)
download_link = get_base64_download_link(zip_file)
else:
download_link = "No papers were successfully downloaded."
existing_links = get_existing_zip_links()
summary = f"Papers processed: {len(all_data)}\n"
summary += f"Successfully downloaded: {len(successful_downloads)}\n"
summary += f"Errors: {len(errors)}\n\n"
summary += "Error List:\n" + "\n".join(errors) if errors else "No errors occurred."
return summary, f"{download_link}<br><br>Previous downloads:<br>{existing_links}"
with gr.Blocks() as demo:
gr.Markdown("<h1><center>Papers Leaderboard</center></h1>")
with gr.Tab("Top Trending Papers"):
top_count = gr.Textbox(label="Number of Papers Fetched")
top_html = gr.HTML()
top_button = gr.Button("Refresh Leaderboard")
top_button.click(fn=lambda: update_display("top"), inputs=None, outputs=[top_count, top_html])
with gr.Tab("New Papers"):
new_count = gr.Textbox(label="Number of Papers Fetched")
new_html = gr.HTML()
new_button = gr.Button("Refresh Leaderboard")
new_button.click(fn=lambda: update_display("latest"), inputs=None, outputs=[new_count, new_html])
with gr.Tab("Greatest Papers"):
greatest_count = gr.Textbox(label="Number of Papers Fetched")
greatest_html = gr.HTML()
greatest_button = gr.Button("Refresh Leaderboard")
greatest_button.click(fn=lambda: update_display("greatest"), inputs=None, outputs=[greatest_count, greatest_html])
download_button = gr.Button("๐ Download All Papers", variant="primary")
download_output = gr.Textbox(label="Download Status")
download_links = gr.HTML(label="Download Links")
download_button.click(fn=download_all_papers, inputs=None, outputs=[download_output, download_links])
# Load initial data for all tabs
demo.load(fn=load_all_data, outputs=[top_count, top_html, new_count, new_html, greatest_count, greatest_html])
# Launch the Gradio interface with a public link
demo.launch(share=True) |