# PaperPulse / app.py
# (Hugging Face page header: awacke1's picture, "Update app.py", commit 02eff1c, verified)
import requests
from bs4 import BeautifulSoup
import pandas as pd
import gradio as gr
import time
import os
import json
import PyPDF2
import io
import asyncio
import aiohttp
import aiofiles
import re
from datetime import datetime
import base64
# 🧙‍♂️ Magical Utility Functions 🧙‍♂️
def safe_filename(title):
    """Return ``title`` with filesystem-unfriendly characters replaced by ``_``.

    Word characters, hyphens, underscores, dots and spaces are kept as-is;
    every other character is swapped for a single underscore.
    """
    unsafe_chars = re.compile(r'[^\w\-_\. ]')
    return unsafe_chars.sub('_', title)
def create_date_directory():
    """Make sure a folder named after today's date exists and return its name.

    The name has the form ``YYYY-MM-DD`` and is relative to the current
    working directory. An already-existing folder is reused.
    """
    today = datetime.now()
    folder_name = today.strftime("%Y-%m-%d")
    os.makedirs(folder_name, exist_ok=True)
    return folder_name
def get_base64_download_link(content, filename):
    """Build an HTML ``<a>`` tag that downloads ``content`` as a text file.

    Args:
        content: Text to embed; it is UTF-8 encoded and then base64 encoded
            into a ``data:text/plain`` URI.
        filename: Name offered to the browser for the download and shown as
            the link text.

    Returns:
        The anchor element as a string.
    """
    import html  # local import: the module header does not import html

    encoded = base64.b64encode(content.encode()).decode()
    # Escape the file name so quotes or angle brackets in it cannot break
    # out of the attribute value or inject markup into the link text.
    safe_name = html.escape(filename, quote=True)
    return f'<a href="data:text/plain;base64,{encoded}" download="{safe_name}">Download {safe_name}</a>'
# 🕵️‍♂️ Data Fetching and Caching Shenanigans 🕵️‍♂️
def _fetch_pdf_link(session, link, headers, timeout):
    """Return the first link inside a paper page's abstract block, or ''.

    This lookup is best-effort: network errors and unexpected page layouts
    yield an empty string instead of aborting the whole scrape.
    """
    try:
        response_link = session.get(link, headers=headers, timeout=timeout)
        soup_link = BeautifulSoup(response_link.text, 'html.parser')
        paper_info_link = soup_link.find_all('div', class_='paper-abstract')
        return paper_info_link[0].find('div', class_='col-md-12').find('a')['href']
    except (requests.RequestException, IndexError, AttributeError, KeyError, TypeError):
        return ''


def get_rank_papers(url, progress=gr.Progress(track_tqdm=True)):
    """Scrape paper cards from a paginated listing page.

    Pages are requested with an increasing ``page`` query parameter. For each
    paper card the title, link, star count and (best-effort) PDF link are
    collected. Scraping stops when a page cannot be retrieved, a page has no
    paper cards, or the tenth already-seen title is encountered.

    Args:
        url: Listing URL to request.
        progress: Progress tracker; ``progress.update`` is called with the
            running page counter after each page.

    Returns:
        Dict mapping paper title to a dict with the keys ``'link'``,
        ``'Github Star'`` (int) and ``'pdf_link'``.
    """
    base_url = "https://paperswithcode.com"
    request_timeout = 30  # seconds; without a timeout a stalled server blocks forever
    session = requests.Session()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
        'Cache-Control': 'no-cache'
    }
    print("Time run at : ", time.ctime())
    offset = 0
    data_list = {}
    break_duplicate = 10  # remaining duplicate titles tolerated before stopping
    while True:
        try:
            response = session.get(url, headers=headers, params={'page': offset}, timeout=request_timeout)
        except requests.RequestException as exc:
            print(f'Failed to retrieve data: {exc}')
            break
        if response.status_code != 200:
            print('Failed to retrieve data')
            break
        soup = BeautifulSoup(response.text, 'html.parser')
        paper_info = soup.find_all('div', class_='row infinite-item item paper-card')
        if not paper_info:
            break
        for ppr in paper_info:
            title_tag = ppr.find('h1')
            anchor = ppr.find('a')
            if title_tag is None or anchor is None or not anchor.get('href'):
                # Malformed card: skip it rather than abort the whole scrape.
                continue
            title = title_tag.text.strip()
            if title in data_list:
                # Checked before the detail-page request so duplicates cost
                # no extra HTTP round trip.
                break_duplicate -= 1
                if break_duplicate == 0:
                    return data_list
                continue
            href = anchor['href']
            link = base_url + href if "paper" in href else href
            star_tag = ppr.find('span', class_='badge badge-secondary')
            try:
                stars = int(star_tag.text.strip().replace(',', ''))
            except (AttributeError, ValueError):
                stars = 0  # badge missing or not numeric
            pdf_link = _fetch_pdf_link(session, link, headers, request_timeout)
            data_list[title] = {'link': link, 'Github Star': stars, 'pdf_link': pdf_link.strip()}
        offset += 1
        progress.update(offset)
    print('Data retrieval complete')
    return data_list
def load_cached_data(cache_file):
    """Read previously cached JSON data from ``cache_file``.

    Args:
        cache_file: Path of the JSON cache file.

    Returns:
        The decoded JSON content, or ``None`` when the file does not exist or
        does not contain valid JSON (so callers treat it as a cache miss).
    """
    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        # A corrupt cache must not make every later load fail.
        print(f"Ignoring unreadable cache file {cache_file}: {exc}")
        return None
def save_cached_data(data, cache_file):
    """Serialize ``data`` as JSON into ``cache_file``.

    The JSON is first written to ``<cache_file>.tmp`` and then moved over the
    target with ``os.replace``, so a failed or interrupted write never leaves
    a truncated cache file behind.

    Args:
        data: JSON-serializable object to store.
        cache_file: Destination path of the cache file.

    Raises:
        TypeError: If ``data`` is not JSON-serializable.
        OSError: If the file cannot be written or replaced.
    """
    tmp_path = f"{cache_file}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            json.dump(data, f)
        os.replace(tmp_path, cache_file)
    finally:
        # After a successful replace the temp file is gone; this only
        # cleans up after a failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def load_and_cache_data(url, cache_file):
    """Return paper data for ``url``, preferring the on-disk cache.

    When ``cache_file`` yields non-empty data it is returned directly.
    Otherwise the data is scraped from ``url``, written to ``cache_file`` and
    then returned.
    """
    papers = load_cached_data(cache_file)
    if not papers:
        # Cache miss (or empty cache): fetch fresh data and store it.
        print(f"Fetching new data from {url}")
        papers = get_rank_papers(url)
        save_cached_data(papers, cache_file)
        return papers
    print(f"Loading cached data from {cache_file}")
    return papers
# 📊 Data Processing and Display Magic 📊
def format_dataframe(data):
    """Turn the scraped paper dict into a display-ready DataFrame.

    Args:
        data: Dict mapping paper title to a dict with the keys ``'link'``,
            ``'Github Star'`` and ``'pdf_link'``.

    Returns:
        DataFrame with the columns ``title``, ``Github Star``, ``link`` and
        ``pdf_link``, sorted by ``Github Star`` descending. ``link`` and
        ``pdf_link`` hold HTML anchor elements. Empty input yields an empty
        DataFrame with the same columns.
    """
    import html  # local import: the module header does not import html

    columns = ['title', 'Github Star', 'link', 'pdf_link']
    if not data:
        # Without rows the column selection below would raise KeyError.
        return pd.DataFrame(columns=columns)
    df = pd.DataFrame(data).T
    df['title'] = df.index
    df = df[columns]
    df = df.sort_values(by='Github Star', ascending=False)
    # The URLs come from scraped pages; escape them before they are embedded
    # in markup that is later rendered with escape=False.
    df['link'] = df['link'].apply(
        lambda x: f'<a href="{html.escape(str(x), quote=True)}" target="_blank">Link</a>'
    )
    df['pdf_link'] = df['pdf_link'].apply(
        lambda x: f'<a href="{html.escape(str(x), quote=True)}" target="_blank">{html.escape(str(x), quote=True)}</a>'
    )
    return df
def update_display(category):
    """Produce the row count and HTML table for one paper category.

    ``"top"`` maps to the site root; any other category is appended to the
    site URL as a path segment. Data is loaded through the per-category JSON
    cache.

    Returns:
        Tuple ``(number_of_rows, html_table)``.
    """
    if category == "top":
        url = "https://paperswithcode.com/"
    else:
        url = f"https://paperswithcode.com/{category}"
    cache_file = f"{category}_papers_cache.json"
    papers = load_and_cache_data(url, cache_file)
    table = format_dataframe(papers)
    row_count = len(table)
    # escape=False keeps the anchor tags built by format_dataframe as markup.
    rendered = table.to_html(escape=False, index=False)
    return row_count, rendered
def load_all_data():
    """Build the display data for the "top", "latest" and "greatest" categories.

    Returns:
        A flat 6-tuple: ``(count, html)`` for each category, in the order
        top, latest, greatest.
    """
    outputs = []
    for category in ("top", "latest", "greatest"):
        count, table_html = update_display(category)
        outputs.append(count)
        outputs.append(table_html)
    return tuple(outputs)
# 🚀 Asynchronous Paper Processing Wizardry 🚀
async def download_and_process_pdf(session, title, paper_info, directory):
    """Download one paper's PDF, extract its text and save it as a ``.txt`` file.

    Args:
        session: HTTP client session used for the download (its ``get`` is
            used as an async context manager).
        title: Paper title; used in status messages and for the file name.
        paper_info: Dict describing the paper; the ``'pdf_link'`` entry is
            the URL to download.
        directory: Folder the text file is written into.

    Returns:
        Tuple ``(status_message, txt_filepath, text)``. On any failure the
        last two items are ``None`` and the message describes the problem;
        this coroutine does not raise for download or parsing errors.
    """
    # .get() so an entry without a 'pdf_link' key is reported like an empty
    # link instead of raising KeyError outside the try block below.
    pdf_url = paper_info.get('pdf_link')
    if not pdf_url:
        return f"No PDF link available for: {title}", None, None
    try:
        timeout = aiohttp.ClientTimeout(total=60)  # 60 seconds timeout
        async with session.get(pdf_url, timeout=timeout) as response:
            if response.status != 200:
                return f"Failed to download PDF for {title}: HTTP {response.status}", None, None
            pdf_content = await response.read()
        # The response is released here; parsing needs only the bytes.
        file_length = len(pdf_content)
        if file_length < 5000:  # Downloads under 5000 bytes are rejected
            return f"Downloaded PDF for {title} is too small ({file_length} bytes). Skipping.", None, None
        # Convert PDF to text
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))
        # join() avoids repeated string concatenation; "or ''" guards
        # against a page whose extraction yields nothing.
        text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
        if len(text) < 5000:  # Extracted text under 5000 characters is rejected
            return f"Extracted text for {title} is too small ({len(text)} characters). Skipping.", None, None
        safe_title = safe_filename(title)
        txt_filename = f"{safe_title}.txt"
        txt_filepath = os.path.join(directory, txt_filename)
        async with aiofiles.open(txt_filepath, 'w', encoding='utf-8') as f:
            await f.write(text)
        return f"Successfully processed: {txt_filename} (File length: {file_length} bytes)", txt_filepath, text
    except asyncio.TimeoutError:
        return f"Timeout while downloading PDF for {title}", None, None
    except Exception as e:
        # Deliberate catch-all: one bad PDF must not abort the whole batch.
        return f"Error processing PDF for {title}: {str(e)}", None, None
async def process_papers(data, directory, progress=gr.Progress()):
    """Download and convert every paper in ``data`` concurrently.

    One task per paper is scheduled on a shared HTTP session. Results are
    consumed in completion order and ``progress`` is updated after each one.

    Returns:
        Tuple ``(results, successful_downloads, errors)``: all status
        messages, a list of ``(filepath, text)`` pairs for papers that
        produced text, and the status messages of the ones that did not.
    """
    results, successful_downloads, errors = [], [], []
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(download_and_process_pdf(session, title, paper_info, directory))
            for title, paper_info in data.items()
        ]
        total = len(tasks)
        done = 0
        for finished in asyncio.as_completed(tasks):
            done += 1
            message, filepath, text = await finished
            results.append(message)
            if not (filepath and text):
                errors.append(message)
            else:
                successful_downloads.append((filepath, text))
            progress(done / total, f"Processed {done}/{total} papers")
    return results, successful_downloads, errors
def download_all_papers(progress=gr.Progress()):
    """Download and convert the PDFs of every cached paper.

    Paper entries are merged from the "top", "latest" and "greatest" cache
    files; categories without cached data are skipped. Text files are written
    into a folder named after today's date.

    Returns:
        Tuple ``(summary, links_html, previews)``: a plain-text summary with
        the error list, ``<br>``-joined download links, and the first 1000
        characters of each converted paper.
    """
    all_data = {}
    for category in ("top", "latest", "greatest"):
        cached = load_cached_data(f"{category}_papers_cache.json")
        if cached:
            all_data.update(cached)

    date_directory = create_date_directory()
    results, successful_downloads, errors = asyncio.run(
        process_papers(all_data, date_directory, progress)
    )

    if errors:
        error_section = "Error List:\n" + "\n".join(errors)
    else:
        error_section = "No errors occurred."
    summary = (
        f"Papers processed: {len(all_data)}\n"
        f"Successfully downloaded and converted: {len(successful_downloads)}\n"
        f"Errors: {len(errors)}\n\n"
        f"{error_section}"
    )

    download_links = [
        get_base64_download_link(text, os.path.basename(filepath))
        for filepath, text in successful_downloads
    ]
    # Show first 1000 characters of each paper
    text_contents = [
        f"--- {os.path.basename(filepath)} ---\n\n{text[:1000]}...\n\n"
        for filepath, text in successful_downloads
    ]
    return summary, "<br>".join(download_links), "\n".join(text_contents)
# 🎭 Gradio Interface: Where the Magic Happens 🎭
# Build the UI: three tabs (one per paper category), each with a count box,
# an HTML table and a refresh button, followed by the shared download controls.
with gr.Blocks() as demo:
    gr.Markdown("<h1><center>๐Ÿ“–โš™๏ธPaperPulse๐Ÿ“๐ŸŒ - Your ๐Ÿ“šAI๐Ÿ”Research๐Ÿ”ฅAccelerator๐Ÿš€</center></h1>")
    # Tab for the "top" category.
    with gr.Tab("๐Ÿง™โ€โ™‚๏ธTop๐Ÿ•ต๏ธโ€โ™‚๏ธTrending๐Ÿ“–Papers"):
        top_count = gr.Textbox(label="Number of Papers Fetched")
        top_html = gr.HTML()
        top_button = gr.Button("Refresh Leaderboard")
        # update_display returns (row count, HTML table), matching the two outputs.
        top_button.click(fn=lambda: update_display("top"), inputs=None, outputs=[top_count, top_html])
    # Tab for the "latest" category.
    with gr.Tab("๐ŸคดNew๐Ÿ“–PapersWithCode๐Ÿค–"):
        new_count = gr.Textbox(label="Number of Papers Fetched")
        new_html = gr.HTML()
        new_button = gr.Button("Refresh Leaderboard")
        new_button.click(fn=lambda: update_display("latest"), inputs=None, outputs=[new_count, new_html])
    # Tab for the "greatest" category.
    with gr.Tab("โœจGithub๐Ÿ™Superstars๐Ÿ’ซ"):
        greatest_count = gr.Textbox(label="Number of Papers Fetched")
        greatest_html = gr.HTML()
        greatest_button = gr.Button("Refresh Leaderboard")
        greatest_button.click(fn=lambda: update_display("greatest"), inputs=None, outputs=[greatest_count, greatest_html])
    # Download controls; download_all_papers returns (summary, links HTML,
    # text previews), one value per output component below.
    download_button = gr.Button("๐Ÿ“š Download All Papers", variant="primary")
    download_output = gr.Textbox(label="Download Status")
    download_links = gr.HTML(label="Download Links")
    text_output = gr.Code(label="Paper Contents", language="python")
    download_button.click(fn=download_all_papers, inputs=None, outputs=[download_output, download_links, text_output])
    # Load initial data for all tabs; load_all_data returns six values in the
    # same order as the outputs listed here.
    demo.load(fn=load_all_data, outputs=[top_count, top_html, new_count, new_html, greatest_count, greatest_html])
# Launch the Gradio interface with a public link (share=True)
demo.launch(share=True)