import requests from bs4 import BeautifulSoup import pandas as pd import gradio as gr import time import os import json import PyPDF2 import io import asyncio import aiohttp import aiofiles import re from datetime import datetime import base64 # ๐Ÿง™โ€โ™‚๏ธ Magical Utility Functions ๐Ÿง™โ€โ™‚๏ธ def safe_filename(title): """Convert a string to a safe filename. No more 'file not found' nightmares! ๐Ÿ™…โ€โ™‚๏ธ๐Ÿ“""" return re.sub(r'[^\w\-_\. ]', '_', title) def create_date_directory(): """Create a directory named with the current date. It's like a time capsule for your downloads! ๐Ÿ—“๏ธ๐Ÿ“ฆ""" date_str = datetime.now().strftime("%Y-%m-%d") os.makedirs(date_str, exist_ok=True) return date_str def get_base64_download_link(content, filename): """Create a base64 download link for text content. It's like teleportation for your files! ๐ŸŒŸ๐Ÿ“ฒ""" b64 = base64.b64encode(content.encode()).decode() return f'Download {filename}' # ๐ŸŽฌ Animated Banner Messages ๐ŸŽฌ def animated_banner(message, emoji): """Create an animated banner message. It's like a tiny parade for your console! ๐ŸŽ‰๐Ÿšฉ""" frames = [ f"โ•”โ•โ•โ•โ• {emoji} โ•โ•โ•โ•โ•—\nโ•‘ {message:^16} โ•‘\nโ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•", f"โ•”โ•โ•โ•โ• {emoji} โ•โ•โ•โ•โ•—\nโ•‘ {message:^16} โ•‘\nโ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•", f"โ•”โ•โ•โ•โ•{emoji}โ•โ•โ•โ•โ•—\nโ•‘ {message:^14} โ•‘\nโ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•", f"โ•”โ•โ•โ•{emoji}โ•โ•โ•โ•—\nโ•‘ {message:^12} โ•‘\nโ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•", f"โ•”โ•โ•{emoji}โ•โ•โ•—\nโ•‘ {message:^10} โ•‘\nโ•šโ•โ•โ•โ•โ•โ•โ•", f"โ•”โ•{emoji}โ•โ•—\nโ•‘ {message:^8} โ•‘\nโ•šโ•โ•โ•โ•โ•", f"โ•”{emoji}โ•—\nโ•‘ {message:^6} โ•‘\nโ•šโ•โ•โ•", ] return frames # ๐Ÿ•ต๏ธโ€โ™‚๏ธ Data Fetching and Caching Shenanigans ๐Ÿ•ต๏ธโ€โ™‚๏ธ def get_rank_papers(url, progress=gr.Progress(track_tqdm=True)): """Fetch papers from the interwebs. It's like fishing, but for knowledge! ๐ŸŽฃ๐Ÿ“š""" base_url = "https://paperswithcode.com" session = requests.Session() headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3', 'Cache-Control': 'no-cache' } print("Time run at : ", time.ctime()) offset = 0 data_list = {} break_duplicate = 10 while True: response = session.get(url, headers=headers, params={'page': offset}) if response.status_code != 200: print('Failed to retrieve data') break soup = BeautifulSoup(response.text, 'html.parser') paper_info = soup.find_all('div', class_='row infinite-item item paper-card') if not paper_info: print("No paper information found.") break for ppr in paper_info: title = ppr.find('h1').text.strip() if "paper" in ppr.find('a')['href']: link = base_url + ppr.find('a')['href'] else: link = ppr.find('a')['href'] Github_Star = ppr.find('span', class_='badge badge-secondary').text.strip().replace(',', '') if ppr.find('span', class_='badge badge-secondary') else "0" pdf_link = '' try: response_link = session.get(link, headers=headers) soup_link = BeautifulSoup(response_link.text, 'html.parser') paper_info_link = soup_link.find_all('div', class_='paper-abstract') pdf_link = paper_info_link[0].find('div', class_='col-md-12').find('a')['href'] except Exception as e: print(f"Failed to retrieve PDF link for {title}: {e}") print(f"Title: {title}, Link: {link}, Github Star: {Github_Star}, PDF Link: {pdf_link}") if title not in data_list: data_list[title] = {'link': link, 'Github Star': int(Github_Star), 'pdf_link': pdf_link.strip()} else: break_duplicate -= 1 if break_duplicate == 0: return data_list offset += 1 progress.update(offset) print('Data retrieval complete') return data_list def load_cached_data(cache_file): """Load cached data. It's like finding money in your old jeans! ๐Ÿ’ฐ๐Ÿงต""" if os.path.exists(cache_file): with open(cache_file, 'r') as f: return json.load(f) return None def save_cached_data(data, cache_file): """Save data to cache. Future you will thank present you! ๐Ÿฆธโ€โ™‚๏ธ๐Ÿ•ฐ๏ธ""" with open(cache_file, 'w') as f: json.dump(data, f) def load_and_cache_data(url, cache_file): """Load data from cache or fetch new data. It's like a time machine for your data! โฐ๐Ÿ”„""" cached_data = load_cached_data(cache_file) if cached_data: print(f"Loading cached data from {cache_file}") return cached_data print(f"Fetching new data from {url}") new_data = get_rank_papers(url) save_cached_data(new_data, cache_file) return new_data # ๐Ÿ“Š Data Processing and Display Magic ๐Ÿ“Š def format_dataframe(data): """Format data into a pretty DataFrame. It's like giving your data a makeover! ๐Ÿ’…๐Ÿ“ˆ""" if not data: print("No data found to format.") return pd.DataFrame() df = pd.DataFrame(data).T df['title'] = df.index # Check if required columns are present if 'Github Star' in df.columns and 'link' in df.columns and 'pdf_link' in df.columns: df = df[['title', 'Github Star', 'link', 'pdf_link']] df = df.sort_values(by='Github Star', ascending=False) df['link'] = df['link'].apply(lambda x: f'Link') df['pdf_link'] = df['pdf_link'].apply(lambda x: f'{x}') else: print("Required columns are missing in the dataframe.") print(f"Columns available: {df.columns}") return df def update_display(category): """Update the display for a category. Freshen up your data like it's spring cleaning! ๐Ÿงน๐ŸŒธ""" cache_file = f"{category}_papers_cache.json" url = f"https://paperswithcode.com/{category}" if category != "top" else "https://paperswithcode.com/" data = load_and_cache_data(url, cache_file) df = format_dataframe(data) return len(df), df.to_html(escape=False, index=False) def load_all_data(): """Load data for all categories. It's like a buffet for your brain! ๐Ÿง ๐Ÿฝ๏ธ""" top_count, top_html = update_display("top") new_count, new_html = update_display("latest") greatest_count, greatest_html = update_display("greatest") return top_count, top_html, new_count, new_html, greatest_count, greatest_html # ๐Ÿš€ Asynchronous Paper Processing Wizardry ๐Ÿš€ async def download_and_process_pdf(session, title, paper_info, directory): """Download and process a PDF. It's like turning lead into gold, but with papers! ๐Ÿ“œโœจ""" pdf_url = paper_info['pdf_link'] if not pdf_url: return f"๐Ÿšซ No PDF link for: {title}. It's playing hide and seek! ๐Ÿ™ˆ", None, None try: timeout = aiohttp.ClientTimeout(total=60) # 60 seconds timeout async with session.get(pdf_url, timeout=timeout) as response: if response.status != 200: return f"๐Ÿšจ Failed to grab PDF for {title}: HTTP {response.status}. The internet gremlins strike again! ๐Ÿ‘น", None, None pdf_content = await response.read() file_length = len(pdf_content) if file_length < 5000: # Check if the PDF is less than 5KB return f"๐Ÿœ PDF for {title} is tiny ({file_length} bytes). It's like a paper for ants! ๐Ÿœ๐Ÿ“„", None, None # Convert PDF to text pdf_file = io.BytesIO(pdf_content) pdf_reader = PyPDF2.PdfReader(pdf_file) text = "" for page in pdf_reader.pages: text += page.extract_text() if len(text) < 5000: # Check if the extracted text is less than 5KB return f"๐Ÿ“‰ Extracted text for {title} is too small ({len(text)} characters). It's not you, it's the PDF! ๐Ÿ’”", None, None safe_title = safe_filename(title) txt_filename = f"{safe_title}.txt" txt_filepath = os.path.join(directory, txt_filename) async with aiofiles.open(txt_filepath, 'w', encoding='utf-8') as f: await f.write(text) return f"๐ŸŽ‰ Successfully processed: {txt_filename} (File length: {file_length} bytes). It's alive! ๐Ÿงฌ", txt_filepath, text except asyncio.TimeoutError: return f"โณ Timeout for {title}. The PDF is playing hard to get! ๐Ÿ’ƒ", None, None except Exception as e: return f"๐Ÿ’ฅ Oops! Error processing {title}: {str(e)}. Gremlins in the system! ๐Ÿ› ๏ธ", None, None async def process_papers(data, directory, progress=gr.Progress()): """Process multiple papers asynchronously. It's like juggling papers, but faster! ๐Ÿคนโ€โ™‚๏ธ๐Ÿ“š""" async with aiohttp.ClientSession() as session: tasks = [] for title, paper_info in data.items(): task = asyncio.ensure_future(download_and_process_pdf(session, title, paper_info, directory)) tasks.append(task) results = [] successful_downloads = [] errors = [] for i, task in enumerate(asyncio.as_completed(tasks), start=1): result, filepath, text = await task results.append(result) if filepath and text: successful_downloads.append((filepath, text)) else: errors.append(result) progress(i / len(tasks), f"๐Ÿš€ Processed {i}/{len(tasks)} papers. Science waits for no one!") # Display animated banner banner_frames = animated_banner("Processing", "๐Ÿ“„") for frame in banner_frames: print(frame, end='\r') await asyncio.sleep(0.1) print(" " * len(banner_frames[-1]), end='\r') # Clear the last frame return results, successful_downloads, errors def download_all_papers(progress=gr.Progress()): """Download and process all papers. It's like hosting a paper party, and everyone's invited! ๐ŸŽ‰๐Ÿ“š""" all_data = {} for category in ["top", "latest", "greatest"]: cache_file = f"{category}_papers_cache.json" data = load_cached_data(cache_file) if data: all_data.update(data) date_directory = create_date_directory() results, successful_downloads, errors = asyncio.run(process_papers(all_data, date_directory, progress)) summary = f"๐Ÿ“Š Papers processed: {len(all_data)} (We're basically librarians now!)\n" summary += f"โœ… Successfully downloaded and converted: {len(successful_downloads)} (Take that, PDF gremlins!)\n" summary += f"โŒ Errors: {len(errors)} (Even superheroes have off days)\n\n" summary += "๐Ÿšจ Error List (AKA 'The Wall of Shame'):\n" + "\n".join(errors) if errors else "No errors occurred. It's a miracle! ๐Ÿ™Œ" download_links = [] text_contents = [] for filepath, text in successful_downloads: filename = os.path.basename(filepath) download_links.append(get_base64_download_link(text, filename)) text_contents.append(f"--- {filename} ---\n\n{text[:1000]}... (There's more, but we don't want to spoil the ending! ๐Ÿ“š๐Ÿ”ฎ)\n\n") return summary, "
".join(download_links), "\n".join(text_contents) # ๐ŸŽญ Gradio Interface: Where the Magic Happens ๐ŸŽญ with gr.Blocks() as demo: gr.Markdown("

Papers Leaderboard

") with gr.Tab("Top Trending Papers"): top_count = gr.Textbox(label="Number of Papers Fetched") top_html = gr.HTML() top_button = gr.Button("Refresh Leaderboard") top_button.click(fn=lambda: update_display("top"), inputs=None, outputs=[top_count, top_html]) with gr.Tab("New Papers"): new_count = gr.Textbox(label="Number of Papers Fetched") new_html = gr.HTML() new_button = gr.Button("Refresh Leaderboard") new_button.click(fn=lambda: update_display("latest"), inputs=None, outputs=[new_count, new_html]) with gr.Tab("Greatest Papers"): greatest_count = gr.Textbox(label="Number of Papers Fetched") greatest_html = gr.HTML() greatest_button = gr.Button("Refresh Leaderboard") greatest_button.click(fn=lambda: update_display("greatest"), inputs=None, outputs=[greatest_count, greatest_html]) download_button = gr.Button("๐Ÿ“š Download All Papers", variant="primary") download_output = gr.Textbox(label="Download Status") download_links = gr.HTML(label="Download Links") # Updated this to gr.Code with Python as language for displaying paper contents text_output = gr.Code(label="Paper Contents", language="python") download_button.click(fn=download_all_papers, inputs=None, outputs=[download_output, download_links, text_output]) # Load initial data for all tabs demo.load(fn=load_all_data, outputs=[top_count, top_html, new_count, new_html, greatest_count, greatest_html]) # ๐Ÿš€ Launch the Gradio interface with a public link print("๐ŸŽญ Launching the Papers Leaderboard! Get ready for a wild ride through the land of academia! ๐ŸŽข๐Ÿ“š") demo.launch(share=True)