Spaces:
Sleeping
Sleeping
import requests | |
from bs4 import BeautifulSoup | |
import pandas as pd | |
import gradio as gr | |
import time | |
import os | |
import json | |
import PyPDF2 | |
import io | |
import asyncio | |
import aiohttp | |
import aiofiles | |
import re | |
from datetime import datetime | |
import base64 | |
# ๐งโโ๏ธ Magical Utility Functions ๐งโโ๏ธ | |
def safe_filename(title): | |
"""Convert a string to a safe filename. No more 'file not found' nightmares! ๐ โโ๏ธ๐""" | |
return re.sub(r'[^\w\-_\. ]', '_', title) | |
def create_date_directory(): | |
"""Create a directory named with the current date. It's like a time capsule for your downloads! ๐๏ธ๐ฆ""" | |
date_str = datetime.now().strftime("%Y-%m-%d") | |
os.makedirs(date_str, exist_ok=True) | |
return date_str | |
def get_base64_download_link(content, filename): | |
"""Create a base64 download link for text content. It's like teleportation for your files! ๐๐ฒ""" | |
b64 = base64.b64encode(content.encode()).decode() | |
return f'<a href="data:text/plain;base64,{b64}" download="{filename}">Download {filename}</a>' | |
# ๐ต๏ธโโ๏ธ Data Fetching and Caching Shenanigans ๐ต๏ธโโ๏ธ | |
def get_rank_papers(url, progress=gr.Progress(track_tqdm=True)): | |
"""Fetch papers from the interwebs. It's like fishing, but for knowledge! ๐ฃ๐""" | |
base_url = "https://paperswithcode.com" | |
session = requests.Session() | |
headers = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3', | |
'Cache-Control': 'no-cache' | |
} | |
print("Time run at : ", time.ctime()) | |
offset = 0 | |
data_list = {} | |
break_duplicate = 10 | |
while True: | |
response = session.get(url, headers=headers, params={'page': offset}) | |
if response.status_code != 200: | |
print('Failed to retrieve data') | |
break | |
soup = BeautifulSoup(response.text, 'html.parser') | |
paper_info = soup.find_all('div', class_='row infinite-item item paper-card') | |
if not paper_info: | |
break | |
for ppr in paper_info: | |
title = ppr.find('h1').text.strip() | |
if "paper" in ppr.find('a')['href']: | |
link = base_url + ppr.find('a')['href'] | |
else: | |
link = ppr.find('a')['href'] | |
Github_Star = ppr.find('span', class_='badge badge-secondary').text.strip().replace(',', '') | |
pdf_link = '' | |
try: | |
response_link = session.get(link, headers=headers) | |
soup_link = BeautifulSoup(response_link.text, 'html.parser') | |
paper_info_link = soup_link.find_all('div', class_='paper-abstract') | |
pdf_link = paper_info_link[0].find('div', class_='col-md-12').find('a')['href'] | |
except: | |
pass | |
if title not in data_list: | |
data_list[title] = {'link': link, 'Github Star': int(Github_Star), 'pdf_link': pdf_link.strip()} | |
else: | |
break_duplicate -= 1 | |
if break_duplicate == 0: | |
return data_list | |
offset += 1 | |
progress.update(offset) | |
print('Data retrieval complete') | |
return data_list | |
def load_cached_data(cache_file): | |
"""Load cached data. It's like finding money in your old jeans! ๐ฐ๐งต""" | |
if os.path.exists(cache_file): | |
with open(cache_file, 'r') as f: | |
return json.load(f) | |
return None | |
def save_cached_data(data, cache_file): | |
"""Save data to cache. Future you will thank present you! ๐ฆธโโ๏ธ๐ฐ๏ธ""" | |
with open(cache_file, 'w') as f: | |
json.dump(data, f) | |
def load_and_cache_data(url, cache_file): | |
"""Load data from cache or fetch new data. It's like a time machine for your data! โฐ๐""" | |
cached_data = load_cached_data(cache_file) | |
if cached_data: | |
print(f"Loading cached data from {cache_file}") | |
return cached_data | |
print(f"Fetching new data from {url}") | |
new_data = get_rank_papers(url) | |
save_cached_data(new_data, cache_file) | |
return new_data | |
# ๐ Data Processing and Display Magic ๐ | |
def format_dataframe(data): | |
"""Format data into a pretty DataFrame. It's like giving your data a makeover! ๐ ๐""" | |
df = pd.DataFrame(data).T | |
df['title'] = df.index | |
df = df[['title', 'Github Star', 'link', 'pdf_link']] | |
df = df.sort_values(by='Github Star', ascending=False) | |
df['link'] = df['link'].apply(lambda x: f'<a href="{x}" target="_blank">Link</a>') | |
df['pdf_link'] = df['pdf_link'].apply(lambda x: f'<a href="{x}" target="_blank">{x}</a>') | |
return df | |
def update_display(category): | |
"""Update the display for a category. Freshen up your data like it's spring cleaning! ๐งน๐ธ""" | |
cache_file = f"{category}_papers_cache.json" | |
url = f"https://paperswithcode.com/{category}" if category != "top" else "https://paperswithcode.com/" | |
data = load_and_cache_data(url, cache_file) | |
df = format_dataframe(data) | |
return len(df), df.to_html(escape=False, index=False) | |
def load_all_data(): | |
"""Load data for all categories. It's like a buffet for your brain! ๐ง ๐ฝ๏ธ""" | |
top_count, top_html = update_display("top") | |
new_count, new_html = update_display("latest") | |
greatest_count, greatest_html = update_display("greatest") | |
return top_count, top_html, new_count, new_html, greatest_count, greatest_html | |
# ๐ Asynchronous Paper Processing Wizardry ๐ | |
async def download_and_process_pdf(session, title, paper_info, directory): | |
"""Download and process a PDF. It's like turning lead into gold, but with papers! ๐โจ""" | |
pdf_url = paper_info['pdf_link'] | |
if not pdf_url: | |
return f"No PDF link available for: {title}", None, None | |
try: | |
timeout = aiohttp.ClientTimeout(total=60) # 60 seconds timeout | |
async with session.get(pdf_url, timeout=timeout) as response: | |
if response.status != 200: | |
return f"Failed to download PDF for {title}: HTTP {response.status}", None, None | |
pdf_content = await response.read() | |
file_length = len(pdf_content) | |
if file_length < 5000: # Check if the PDF is less than 5KB | |
return f"Downloaded PDF for {title} is too small ({file_length} bytes). Skipping.", None, None | |
# Convert PDF to text | |
pdf_file = io.BytesIO(pdf_content) | |
pdf_reader = PyPDF2.PdfReader(pdf_file) | |
text = "" | |
for page in pdf_reader.pages: | |
text += page.extract_text() | |
if len(text) < 5000: # Check if the extracted text is less than 5KB | |
return f"Extracted text for {title} is too small ({len(text)} characters). Skipping.", None, None | |
safe_title = safe_filename(title) | |
txt_filename = f"{safe_title}.txt" | |
txt_filepath = os.path.join(directory, txt_filename) | |
async with aiofiles.open(txt_filepath, 'w', encoding='utf-8') as f: | |
await f.write(text) | |
return f"Successfully processed: {txt_filename} (File length: {file_length} bytes)", txt_filepath, text | |
except asyncio.TimeoutError: | |
return f"Timeout while downloading PDF for {title}", None, None | |
except Exception as e: | |
return f"Error processing PDF for {title}: {str(e)}", None, None | |
async def process_papers(data, directory, progress=gr.Progress()): | |
"""Process multiple papers asynchronously. It's like juggling papers, but faster! ๐คนโโ๏ธ๐""" | |
async with aiohttp.ClientSession() as session: | |
tasks = [] | |
for title, paper_info in data.items(): | |
task = asyncio.ensure_future(download_and_process_pdf(session, title, paper_info, directory)) | |
tasks.append(task) | |
results = [] | |
successful_downloads = [] | |
errors = [] | |
for i, task in enumerate(asyncio.as_completed(tasks), start=1): | |
result, filepath, text = await task | |
results.append(result) | |
if filepath and text: | |
successful_downloads.append((filepath, text)) | |
else: | |
errors.append(result) | |
progress(i / len(tasks), f"Processed {i}/{len(tasks)} papers") | |
return results, successful_downloads, errors | |
def download_all_papers(progress=gr.Progress()): | |
"""Download and process all papers. It's like hosting a paper party, and everyone's invited! ๐๐""" | |
all_data = {} | |
for category in ["top", "latest", "greatest"]: | |
cache_file = f"{category}_papers_cache.json" | |
data = load_cached_data(cache_file) | |
if data: | |
all_data.update(data) | |
date_directory = create_date_directory() | |
results, successful_downloads, errors = asyncio.run(process_papers(all_data, date_directory, progress)) | |
summary = f"Papers processed: {len(all_data)}\n" | |
summary += f"Successfully downloaded and converted: {len(successful_downloads)}\n" | |
summary += f"Errors: {len(errors)}\n\n" | |
summary += "Error List:\n" + "\n".join(errors) if errors else "No errors occurred." | |
download_links = [] | |
text_contents = [] | |
for filepath, text in successful_downloads: | |
filename = os.path.basename(filepath) | |
download_links.append(get_base64_download_link(text, filename)) | |
text_contents.append(f"--- {filename} ---\n\n{text[:1000]}...\n\n") # Show first 1000 characters | |
return summary, "<br>".join(download_links), "\n".join(text_contents) | |
# ๐ญ Gradio Interface: Where the Magic Happens ๐ญ | |
with gr.Blocks() as demo: | |
gr.Markdown("<h1><center>๐โ๏ธPaperPulse๐๐ - Your ๐AI๐Research๐ฅAccelerator๐</center></h1>") | |
with gr.Tab("๐งโโ๏ธTop๐ต๏ธโโ๏ธTrending๐Papers"): | |
top_count = gr.Textbox(label="Number of Papers Fetched") | |
top_html = gr.HTML() | |
top_button = gr.Button("Refresh Leaderboard") | |
top_button.click(fn=lambda: update_display("top"), inputs=None, outputs=[top_count, top_html]) | |
with gr.Tab("๐คดNew๐PapersWithCode๐ค"): | |
new_count = gr.Textbox(label="Number of Papers Fetched") | |
new_html = gr.HTML() | |
new_button = gr.Button("Refresh Leaderboard") | |
new_button.click(fn=lambda: update_display("latest"), inputs=None, outputs=[new_count, new_html]) | |
with gr.Tab("โจGithub๐Superstars๐ซ"): | |
greatest_count = gr.Textbox(label="Number of Papers Fetched") | |
greatest_html = gr.HTML() | |
greatest_button = gr.Button("Refresh Leaderboard") | |
greatest_button.click(fn=lambda: update_display("greatest"), inputs=None, outputs=[greatest_count, greatest_html]) | |
download_button = gr.Button("๐ Download All Papers", variant="primary") | |
download_output = gr.Textbox(label="Download Status") | |
download_links = gr.HTML(label="Download Links") | |
text_output = gr.Code(label="Paper Contents", language="python") | |
download_button.click(fn=download_all_papers, inputs=None, outputs=[download_output, download_links, text_output]) | |
# Load initial data for all tabs | |
demo.load(fn=load_all_data, outputs=[top_count, top_html, new_count, new_html, greatest_count, greatest_html]) | |
# ๐ Launch the Gradio interface with a public link | |
demo.launch(share=True) |