import re
import nltk
import spacy
import fitz  # PyMuPDF
from transformers import pipeline
import textwrap
import gradio as gr
# Download NLTK punkt tokenizer data if not already present
nltk.download('punkt')
nltk.download('punkt_tab')
# Load spaCy model, downloading it first if it is not installed
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")
# Initialize the BigBird-Pegasus summarization pipeline for PubMed-style texts
summarizer = pipeline("summarization", model="google/bigbird-pegasus-large-pubmed")
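# Note: BigBird-Pegasus uses block-sparse attention and accepts much longer
# inputs (up to roughly 4096 tokens) than standard Pegasus, and this checkpoint
# is fine-tuned on PubMed papers, so the ~500-word chunks used below should fit
# comfortably within its input window.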
# Helper Function: Read PDF with Content Filter
def read_pdf_with_content_filter(file_path, keywords=("Abstract", "Introduction", "Methods", "Results", "Conclusions")):
    """
    Reads a PDF file and returns text only from pages that contain one of the
    specified keywords. This helps exclude pages that mainly contain
    header/metadata.
    """
    content_pages = []
    # Use a context manager so the document is closed even if reading fails
    with fitz.open(file_path) as doc:
        for page in doc:
            page_text = page.get_text()
            if any(keyword.lower() in page_text.lower() for keyword in keywords):
                content_pages.append(page_text)
    return "\n".join(content_pages)
# Helper Function: Clean Text
def clean_text(text):
    """
    Cleans the text by removing citations, extra whitespace, and unwanted characters.
    """
    text = re.sub(r'\[\d+\]', '', text)  # Remove bracketed citations like [12]
    text = re.sub(r'\(\d+\)', '', text)  # Remove parenthesized citations like (3)
    text = re.sub(r'\s+', ' ', text)     # Normalize whitespace
    return text.strip()
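# Illustrative example (made-up sentence):
#   clean_text("Earlier trials (12) and reviews [3] showed benefit.")
#   -> "Earlier trials and reviews showed benefit."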
# Helper Function: Extract Core Sections
def extract_core_sections(text):
    """
    Attempts to extract core sections using common headings.
    Returns a dictionary with the section name (lowercase) as key and its content as value.
    """
    pattern = r'(?i)(Abstract|Introduction|Methods|Results|Conclusions|Discussion)\s*[:\n\.]'
    # re.split with a capturing group returns [before, heading, content, heading, content, ...]
    splits = re.split(pattern, text)
    sections = {}
    if len(splits) > 1:
        for i in range(1, len(splits), 2):
            heading = splits[i].strip().lower()
            content = splits[i + 1].strip() if i + 1 < len(splits) else ""
            sections[heading] = content
    return sections
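# Illustrative example (made-up text):
#   extract_core_sections("Abstract: We study X. Methods: We did Y.")
#   -> {"abstract": "We study X.", "methods": "We did Y."}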
# Helper Function: Remove Header Metadata
def remove_header_metadata(text, marker="Competing Interests:"):
    """
    Removes header/metadata from the text using a marker.
    If the marker is found, returns the text after it; otherwise, returns the original text.
    """
    idx = text.find(marker)
    if idx != -1:
        return text[idx + len(marker):].strip()
    return text
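# Illustrative example (made-up text):
#   remove_header_metadata("J. Med. 2021. Competing Interests: None. The study began...")
#   -> "None. The study began..."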
# Helper Function: Split Text into Chunks
def split_into_chunks(text, chunk_size=500):
    """
    Splits the text into chunks of approximately chunk_size words.
    """
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size):
        chunks.append(" ".join(words[i:i + chunk_size]))
    return chunks
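# Illustrative example:
#   split_into_chunks("a b c d e f g", chunk_size=3)
#   -> ["a b c", "d e f", "g"]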
# Helper Function: Summarize Text
def summarize_text(text, max_length=200, min_length=50):
    """
    Summarizes the given text using BigBird-Pegasus.
    Adjusts output lengths if the input is very short.
    """
    input_length = len(text.split())
    if input_length < 60:
        max_length = min(max_length, 40)
        min_length = min(min_length, 10)
    summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
    return summary[0]['summary_text']
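# Note: for very short inputs (under 60 words) the output-length caps shrink to
# max_length=40 / min_length=10, so the model is not forced to produce a
# summary longer than its source.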
# Helper Function: Format Bullet Points
def format_bullet_points(summary):
    """
    Splits the summary into sentences and formats each as a bullet point.
    """
    sentences = nltk.sent_tokenize(summary)
    bullets = ["- " + sentence for sentence in sentences]
    return "\n".join(bullets)
# Helper Function: Convert Bullets to Wrapped Paragraph
def bullet_to_paragraph_wrapped(bullet_text, width=80):
    """
    Converts a bullet-point summary into a paragraph and wraps the text to the specified width.
    """
    # "<n>" appears in Pegasus-family outputs as a newline marker
    paragraph = bullet_text.replace("- ", "").replace("<n>", " ")
    paragraph = re.sub(r'\s+', ' ', paragraph).strip()
    return textwrap.fill(paragraph, width=width)
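# Illustrative example:
#   bullet_to_paragraph_wrapped("- Aspirin lowered risk.\n- Side effects were rare.")
#   -> "Aspirin lowered risk. Side effects were rare."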
# Process PDF Function (Gradio Interface)
def process_pdf(file_obj):
    """
    Processes the uploaded PDF file and returns a bullet summary and a wrapped paragraph summary.
    """
    # Depending on the Gradio version, the upload is either a tempfile-like
    # object with a .name attribute or a plain file path string; handle both.
    file_path = file_obj.name if hasattr(file_obj, "name") else file_obj
    full_text = read_pdf_with_content_filter(file_path)
    cleaned_text = clean_text(full_text)
    sections = extract_core_sections(cleaned_text)
    if not sections:
        core_text = remove_header_metadata(cleaned_text)
    else:
        order = ['abstract', 'introduction', 'methods', 'results', 'conclusions', 'discussion']
        core_content = [sections[sec] for sec in order if sec in sections]
        core_text = " ".join(core_content) if core_content else cleaned_text
    chunks = split_into_chunks(core_text, chunk_size=500)
    chunk_summaries = []
    for chunk in chunks:
        try:
            chunk_summary = summarize_text(chunk, max_length=200, min_length=50)
        except Exception:
            chunk_summary = ""
        chunk_summaries.append(chunk_summary)
    # Guard against the degenerate case where every chunk failed to summarize
    final_core_summary_text = " ".join(s for s in chunk_summaries if s).strip()
    if not final_core_summary_text:
        return "No summarizable text could be extracted from the PDF.", ""
    # Summarize the concatenated chunk summaries into one final summary
    final_summary = summarize_text(final_core_summary_text, max_length=200, min_length=50)
    bullet_points = format_bullet_points(final_summary)
    paragraph_summary_wrapped = bullet_to_paragraph_wrapped(bullet_points, width=80)
    return bullet_points, paragraph_summary_wrapped
# Create Gradio Interface
iface = gr.Interface(
    fn=process_pdf,
    inputs=gr.File(label="Upload a Medical PDF"),
    outputs=[
        gr.Textbox(label="Bullet Summary"),
        gr.Textbox(label="Paragraph Summary")
    ],
    title="Medical Document Summarization",
    description="Upload a medical PDF document to get bullet-point and paragraph summaries of its core content."
)
if __name__ == "__main__":
    iface.launch()