import os
import tempfile
import shutil
import torch
import gradio as gr
from pathlib import Path
from typing import Optional, List, Union
import gc
import time
# Docling imports
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption, WordFormatOption, SimplePipeline
# LangChain imports
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
# Transformers imports for IBM Granite model
import spaces
from transformers import AutoTokenizer, AutoModelForCausalLM
# Initialize IBM Granite model and tokenizer.
# This runs at import time, so the weights are loaded once per process and the
# resulting ``tokenizer`` / ``model`` globals are shared by every request.
print("Loading Granite model and tokenizer...")
model_name = "ibm-granite/granite-3.3-8b-instruct"

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Passing ``load_in_8bit=True`` straight to ``from_pretrained`` is deprecated in
# transformers; the supported way to request 8-bit quantization is an explicit
# ``BitsAndBytesConfig`` handed over through ``quantization_config``.
from transformers import BitsAndBytesConfig

# Load model with optimization for GPU
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=torch.bfloat16,
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),  # 8-bit weights for memory efficiency
)
print("Model loaded successfully!")
# Helper function to detect document format
def get_document_format(file_path) -> Optional[InputFormat]:
    """Map a file's extension to the matching docling ``InputFormat``.

    Args:
        file_path: Path of the document; it is passed through ``str()`` first,
            so path-like objects are accepted.

    Returns:
        The ``InputFormat`` registered for the lower-cased extension, or
        ``None`` when the extension is not in the table or the lookup raised.
    """
    try:
        _, suffix = os.path.splitext(str(file_path))
        suffix = suffix.lower()
        # NOTE(review): legacy ".doc" is routed to the DOCX format here --
        # confirm the converter can actually read binary .doc files.
        known_formats = {
            '.pdf': InputFormat.PDF,
            '.docx': InputFormat.DOCX,
            '.doc': InputFormat.DOCX,
            '.pptx': InputFormat.PPTX,
            '.html': InputFormat.HTML,
            '.htm': InputFormat.HTML,
        }
        return known_formats.get(suffix)
    except Exception as e:
        # Best effort: report the problem and let the caller treat it as
        # "unsupported format".
        print(f"Error in get_document_format: {str(e)}")
        return None
# Function to convert documents to markdown
def convert_document_to_markdown(doc_path) -> str:
    """Convert a document to Markdown and write it next to the input file.

    The input is copied into a temporary directory, converted there with a
    docling ``DocumentConverter`` (OCR off, table structure on for PDFs, the
    simple pipeline for DOCX), and the exported Markdown is written to
    ``<input dir>/<input stem>_converted.md``.

    Args:
        doc_path: Path of the document to convert.

    Returns:
        The path of the Markdown file on success. On any failure the
        exception is swallowed and a string starting with
        ``"Error converting document: "`` is returned instead, which callers
        detect by its prefix.
    """
    try:
        source = os.path.abspath(str(doc_path))
        print(f"Converting document: {doc_path}")

        # Work on a private copy; the directory is removed when the block exits.
        with tempfile.TemporaryDirectory() as workdir:
            staged = os.path.join(workdir, os.path.basename(source))
            shutil.copy2(source, staged)

            # PDF pipeline: skip OCR for speed, keep table structure detection.
            pdf_options = PdfPipelineOptions()
            pdf_options.do_ocr = False
            pdf_options.do_table_structure = True

            supported_formats = [
                InputFormat.PDF,
                InputFormat.DOCX,
                InputFormat.HTML,
                InputFormat.PPTX,
            ]
            per_format_options = {
                InputFormat.PDF: PdfFormatOption(pipeline_options=pdf_options),
                InputFormat.DOCX: WordFormatOption(pipeline_cls=SimplePipeline),
            }
            converter = DocumentConverter(
                allowed_formats=supported_formats,
                format_options=per_format_options,
            )

            print("Starting conversion...")
            result = converter.convert(staged)
            if not (result and result.document):
                raise ValueError(f"Failed to convert document: {doc_path}")

            print("Exporting to markdown...")
            markdown_text = result.document.export_to_markdown()

            # The output lives beside the original input, not in the temp dir.
            stem, _ = os.path.splitext(os.path.basename(source))
            target = os.path.join(os.path.dirname(source), f"{stem}_converted.md")
            with open(target, "w", encoding="utf-8") as handle:
                handle.write(markdown_text)
            return target
    except Exception as e:
        return f"Error converting document: {str(e)}"
# Function to generate a summary using the IBM Granite model
def generate_summary(chunks: List[Document], length_type="sentences", length_count=3):
    """Generate a summary of document chunks with the module-level Granite model.

    The chunk texts are joined with single spaces, wrapped in an instruction
    prompt and passed to ``model.generate`` with sampling enabled
    (``temperature=0.7``, ``top_p=0.9``).

    Args:
        chunks: Document chunks to summarize; only ``page_content`` is read.
        length_type: ``"sentences"``; any other value is treated as paragraphs.
        length_count: Requested number of sentences or paragraphs.

    Returns:
        The generated summary text with surrounding whitespace stripped.
    """
    # Concatenate the chunk texts into one passage
    combined_text = " ".join(chunk.page_content for chunk in chunks)

    # Build the length instruction, pluralizing the unit when needed
    unit = "sentence" if length_type == "sentences" else "paragraph"
    plural = "s" if length_count > 1 else ""
    length_instruction = f"Summarize the following text in {length_count} {unit}{plural}."

    # Construct the prompt (leading newline and line layout kept as before)
    prompt = (
        "\n"
        "Knowledge Cutoff Date: April 2024. You are Granite, developed by IBM. You are a helpful AI assistant. "
        f"{length_instruction} Your response should only include the answer. Do not provide any further explanation.\n"
        f"{combined_text}\n"
    )

    # Token budget: 20 per sentence / 100 per paragraph, plus a 50-token
    # buffer, clamped to the range [100, 1000].
    tokens_per_unit = 20 if length_type == "sentences" else 100
    max_tokens = max(100, min(1000, length_count * tokens_per_unit + 50))

    # Generate the summary using the IBM Granite model
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        output = model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=0.7,
            top_p=0.9,
            do_sample=True
        )

    # The output sequence starts with the prompt tokens. Drop them by token
    # count before decoding: slicing the decoded string by the length of the
    # decoded prompt is unreliable because decoding does not always reproduce
    # the prompt text character-for-character.
    prompt_length = inputs["input_ids"].shape[-1]
    generated_ids = output[0][prompt_length:]
    summary = tokenizer.decode(generated_ids, skip_special_tokens=True)
    return summary.strip()
# Function to process document chunks efficiently
def process_document_chunks(texts, batch_size=8):
    """Embed document chunks and index them in a FAISS vector store.

    Args:
        texts: Document chunks to index.
        batch_size: Accepted for backward compatibility; currently unused.

    Returns:
        A FAISS vector store built from ``texts``.

    Raises:
        Exception: Whatever the embedding model constructor raises, or the
            error from the plain fallback build if that fails as well.
    """
    # Build the embedding model outside the try block. Previously a failure
    # here fell into the fallback path, which then referenced the still
    # unbound ``embeddings`` name and masked the real error.
    embeddings = HuggingFaceEmbeddings(
        model_name="nomic-ai/nomic-embed-text-v1",
        model_kwargs={'trust_remote_code': True}
    )
    try:
        # NOTE(review): the strategy is passed as the plain string "cosine" --
        # confirm the FAISS wrapper honours it rather than silently using its
        # default metric.
        return FAISS.from_documents(
            texts,
            embeddings,
            distance_strategy="cosine"
        )
    except Exception as e:
        print(f"Error in document processing: {str(e)}")
        # Fallback to basic processing if the configured build fails
        return FAISS.from_documents(texts, embeddings)
# Main function to process document and generate summary
def _stored_chunks(vectorstore):
    """Return every document in the store, following its index-to-id mapping."""
    return [
        vectorstore.docstore.search(doc_id)
        for doc_id in vectorstore.index_to_docstore_id.values()
    ]


def _summarize_chunks(chunks, length_type, length_count, batch_size=4, direct_limit=8):
    """Summarize chunks directly, or batch-by-batch and then combine when there are many."""
    if len(chunks) <= direct_limit:
        # Few chunks: a single generation call covers everything.
        return generate_summary(
            chunks,
            length_type=length_type,
            length_count=length_count
        )

    # Many chunks: summarize each batch with a shorter target length...
    partial_count = max(1, length_count // 2)
    partial_summaries = []
    for start in range(0, len(chunks), batch_size):
        partial_summaries.append(
            generate_summary(
                chunks[start:start + batch_size],
                length_type=length_type,
                length_count=partial_count
            )
        )
        # Force garbage collection between generation calls
        gc.collect()

    # ...then summarize the partial summaries at the requested length.
    return generate_summary(
        [Document(page_content=text) for text in partial_summaries],
        length_type=length_type,
        length_count=length_count
    )


@spaces.GPU
def process_document(
    file_obj: Optional[Union[str, tempfile._TemporaryFileWrapper]] = None,
    length_type: str = "sentences",
    length_count: int = 3,
    progress=gr.Progress()
):
    """Process a document file and generate a summary.

    Pipeline: validate the extension, convert the file to Markdown, split it
    into chunks, index the chunks in a vector store, then summarize every
    stored chunk.

    Args:
        file_obj: Path string or file-like object exposing ``.name``.
        length_type: ``"sentences"`` or ``"paragraphs"``.
        length_count: Requested number of sentences or paragraphs.
        progress: Gradio progress tracker, called with a fraction and a label.

    Returns:
        The summary text, or a human-readable message when no file was given,
        the format is unsupported, conversion failed, no text was extracted,
        or any other exception occurred. Exceptions are never propagated.
    """
    try:
        # Process input file
        if not file_obj:
            return "Please provide a file to summarize."
        document_path = file_obj.name if hasattr(file_obj, 'name') else str(file_obj)

        # Validate document format
        if not get_document_format(document_path):
            return "Unsupported file format. Please upload a PDF, DOCX, PPTX, or HTML file."

        # Convert document to markdown; failures come back as "Error..." strings
        progress(0.3, "Converting document to markdown...")
        markdown_path = convert_document_to_markdown(document_path)
        if markdown_path.startswith("Error"):
            return markdown_path

        # Load and split the document
        progress(0.4, "Loading and splitting document...")
        documents = UnstructuredMarkdownLoader(str(markdown_path)).load()
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=100,
            length_function=len,
            separators=["\n\n", "\n", ".", " ", ""]  # Prefer paragraph/sentence boundaries
        )
        texts = text_splitter.split_documents(documents)
        if not texts:
            return "No text could be extracted from the document."

        # Create vector store
        progress(0.6, "Processing document content...")
        vectorstore = process_document_chunks(texts)

        # Read all chunks back in one pass. The earlier version did this in
        # batches of four with a gc pass and a 0.1 s sleep per batch, and also
        # built a retriever that was never queried; both only added latency.
        progress(0.8, "Generating summary...")
        all_chunks = _stored_chunks(vectorstore)

        return _summarize_chunks(all_chunks, length_type, length_count)
    except Exception as e:
        return f"Error processing document: {str(e)}"
# Create Gradio interface
def create_gradio_interface():
    """Build the Gradio Blocks UI for document summarization.

    The left column holds the file upload, the length-type selector, the
    matching count control and the submit button; the right column shows the
    summary. The interface is only constructed here, not launched.

    Returns:
        The configured ``gr.Blocks`` application.
    """
    with gr.Blocks(title="Granite Document Summarization") as app:
        gr.Markdown("# Granite Document Summarization")
        gr.Markdown("Upload a document to generate a summary.")
        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(
                    label="Upload Document (PDF, DOCX, PPTX, HTML)",
                    file_types=[".pdf", ".docx", ".doc", ".pptx", ".html", ".htm"]
                )
                with gr.Row():
                    length_type = gr.Radio(
                        choices=["Sentences", "Paragraphs"],
                        value="Sentences",
                        label="Summary Length Type"
                    )
                with gr.Row():
                    # Use slider for sentence count (1-10)
                    sentence_count = gr.Slider(
                        minimum=1,
                        maximum=10,
                        value=3,
                        step=1,
                        label="Number of Sentences",
                        visible=True
                    )
                    # Use radio for paragraph count (1-3)
                    paragraph_count = gr.Radio(
                        choices=["1", "2", "3"],
                        value="1",
                        label="Number of Paragraphs",
                        visible=False
                    )
                submit_btn = gr.Button("Summarize", variant="primary")
            with gr.Column(scale=2):
                output = gr.TextArea(
                    label="Summary",
                    lines=15,
                    max_lines=30
                )

        # Show only the count selector that matches the chosen length type.
        def update_count_visibility(selected_type):
            # Returning a bare bool per component would overwrite the
            # component's *value*; visibility has to go through gr.update().
            return {
                sentence_count: gr.update(visible=selected_type == "Sentences"),
                paragraph_count: gr.update(visible=selected_type == "Paragraphs"),
            }

        length_type.change(
            fn=update_count_visibility,
            inputs=[length_type],
            outputs=[sentence_count, paragraph_count]
        )

        # Adapt the UI values to process_document: lower-case the length type
        # and pass the count from whichever control applies, as an int.
        def process_document_wrapper(file, selected_type, sentences, paragraphs):
            normalized_type = selected_type.lower()
            if normalized_type == "sentences":
                return process_document(file, normalized_type, int(sentences))
            return process_document(file, normalized_type, int(paragraphs))

        submit_btn.click(
            fn=process_document_wrapper,
            inputs=[file_input, length_type, sentence_count, paragraph_count],
            outputs=output
        )

        gr.Markdown(
            "\n"
            "## How to use:\n"
            "1. Upload a document (PDF, DOCX, PPTX, HTML)\n"
            "2. Choose your summary length preference:\n"
            "- Number of Sentences (1-10)\n"
            "- Number of Paragraphs (1-3)\n"
            '3. Click "Summarize" to process the document\n'
            "*This application uses the IBM Granite 3.3-8b model to generate summaries.*\n"
        )
    return app
# Launch the application
if __name__ == "__main__":
    # Build the interface, then serve it with Gradio's default launch settings.
    app = create_gradio_interface()
    app.launch()