from flask import Flask, request, jsonify
import os
import pdfplumber
import pytesseract
from PIL import Image
from transformers import PegasusForConditionalGeneration, PegasusTokenizer
import torch
import logging

app = Flask(__name__)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load Pegasus model and tokenizer once, globally
logger.info("Loading Pegasus model and tokenizer...")
tokenizer = PegasusTokenizer.from_pretrained("google/pegasus-xsum")
model = PegasusForConditionalGeneration.from_pretrained("google/pegasus-xsum").to("cpu")  # Force CPU to manage memory
logger.info("Model loaded successfully.")

# Extract text from PDF with page limit
def extract_text_from_pdf(file_path, max_pages=5):
    text = ""
    try:
        with pdfplumber.open(file_path) as pdf:
            total_pages = len(pdf.pages)
            pages_to_process = min(total_pages, max_pages)
            logger.info(f"Extracting text from {pages_to_process} of {total_pages} pages in {file_path}")
            for i, page in enumerate(pdf.pages[:pages_to_process]):
                try:
                    extracted = page.extract_text()
                    if extracted:
                        text += extracted + "\n"
                    else:
                        logger.info(f"No text on page {i+1}, attempting OCR...")
                        image = page.to_image().original
                        text += pytesseract.image_to_string(image) + "\n"
                except Exception as e:
                    logger.warning(f"Error processing page {i+1}: {e}")
                    continue
    except Exception as e:
        logger.error(f"Failed to process PDF {file_path}: {e}")
        return ""
    return text.strip()

# Extract text from image (OCR)
def extract_text_from_image(file_path):
    try:
        logger.info(f"Extracting text from image {file_path} using OCR...")
        image = Image.open(file_path)
        text = pytesseract.image_to_string(image)
        return text.strip()
    except Exception as e:
        logger.error(f"Failed to process image {file_path}: {e}")
        return ""

# Summarize text, truncating input to the model's maximum length
def summarize_text(text, max_input_length=512, max_output_length=150):
    try:
        logger.info("Summarizing text...")
        # Tokenize and truncate to max_input_length
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=max_input_length, padding=True)
        input_length = inputs["input_ids"].shape[1]
        logger.info(f"Input length: {input_length} tokens")
        # Generation params tuned for speed on CPU
        summary_ids = model.generate(
            inputs["input_ids"],
            max_length=max_output_length,
            min_length=30,
            num_beams=2,  # Fewer beams for a speedup
            early_stopping=True,
            length_penalty=1.0,  # Neutral length penalty
        )
        summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        logger.info("Summarization completed.")
        return summary
    except Exception as e:
        logger.error(f"Error during summarization: {e}")
        return ""

@app.route('/summarize', methods=['POST'])
def summarize_document():
    if 'file' not in request.files:
        logger.error("No file uploaded in request.")
        return jsonify({"error": "No file uploaded"}), 400
    file = request.files['file']
    filename = file.filename
    if not filename:
        logger.error("Empty filename in request.")
        return jsonify({"error": "No file uploaded"}), 400
    file_path = os.path.join("/tmp", filename)
    try:
        file.save(file_path)
        logger.info(f"File saved to {file_path}")
        if filename.lower().endswith('.pdf'):
            text = extract_text_from_pdf(file_path, max_pages=2)  # Limit to 2 pages to keep memory low
        elif filename.lower().endswith(('.png', '.jpeg', '.jpg')):
            text = extract_text_from_image(file_path)
        else:
            logger.error(f"Unsupported file format: {filename}")
            return jsonify({"error": "Unsupported file format. Use PDF, PNG, JPEG, or JPG"}), 400
        if not text:
            logger.warning(f"No text extracted from {filename}")
            return jsonify({"error": "No text extracted from the file"}), 400
        summary = summarize_text(text)
        if not summary:
            logger.warning("Summarization failed to produce output.")
            return jsonify({"error": "Failed to generate summary"}), 500
        logger.info(f"Summary generated for {filename}")
        return jsonify({"summary": summary})
    except Exception as e:
        logger.error(f"Unexpected error processing {filename}: {e}")
        return jsonify({"error": str(e)}), 500
    finally:
        if os.path.exists(file_path):
            try:
                os.remove(file_path)
                logger.info(f"Cleaned up file: {file_path}")
            except Exception as e:
                logger.warning(f"Failed to delete {file_path}: {e}")

if __name__ == '__main__':
    logger.info("Starting Flask app...")
    app.run(host='0.0.0.0', port=7860)
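
# Example request (assuming the service is reachable at localhost:7860):
#   curl -X POST -F "file=@document.pdf" http://localhost:7860/summarize
# The response is JSON: {"summary": "..."} on success, {"error": "..."} otherwise.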

# ---------------------------------
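# Earlier multi-model version (fine-tuned Pegasus, BERT, and LegalBERT with a keyword-based
# model chooser), kept commented out for reference.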
# import os
# import pdfplumber
# from PIL import Image
# import pytesseract
# import transformers
# from transformers import logging
# logging.set_verbosity_error()
# import numpy as np
# from flask import Flask, request, jsonify
# from flask_cors import CORS
# from transformers import PegasusForConditionalGeneration, PegasusTokenizer, BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
# from datasets import load_dataset, concatenate_datasets
# import torch
# from sklearn.feature_extraction.text import TfidfVectorizer
# from sklearn.metrics.pairwise import cosine_similarity
# app = Flask(__name__)
# CORS(app)
# UPLOAD_FOLDER = 'uploads'
# PEGASUS_MODEL_DIR = 'fine_tuned_pegasus'
# BERT_MODEL_DIR = 'fine_tuned_bert'
# LEGALBERT_MODEL_DIR = 'fine_tuned_legalbert'
# MAX_FILE_SIZE = 100 * 1024 * 1024
# os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# transformers.logging.set_verbosity_error()
# os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
# # Pegasus Fine-Tuning
# def load_or_finetune_pegasus():
#     if os.path.exists(PEGASUS_MODEL_DIR):
#         print("Loading fine-tuned Pegasus model...")
#         tokenizer = PegasusTokenizer.from_pretrained(PEGASUS_MODEL_DIR)
#         model = PegasusForConditionalGeneration.from_pretrained(PEGASUS_MODEL_DIR)
#     else:
#         print("Fine-tuning Pegasus on CNN/Daily Mail and XSUM...")
#         tokenizer = PegasusTokenizer.from_pretrained("google/pegasus-xsum")
#         model = PegasusForConditionalGeneration.from_pretrained("google/pegasus-xsum")
#         # Load and combine datasets
#         cnn_dm = load_dataset("cnn_dailymail", "3.0.0", split="train[:5000]")  # 5K samples
#         xsum = load_dataset("xsum", split="train[:5000]")  # 5K samples
#         combined_dataset = concatenate_datasets([cnn_dm, xsum])
#         def preprocess_function(examples):
#             inputs = tokenizer(examples["article"] if "article" in examples else examples["document"],
#                                max_length=512, truncation=True, padding="max_length")
#             targets = tokenizer(examples["highlights"] if "highlights" in examples else examples["summary"],
#                                 max_length=400, truncation=True, padding="max_length")
#             inputs["labels"] = targets["input_ids"]
#             return inputs
#         tokenized_dataset = combined_dataset.map(preprocess_function, batched=True)
#         train_dataset = tokenized_dataset.select(range(8000))  # 80%
#         eval_dataset = tokenized_dataset.select(range(8000, 10000))  # 20%
#         training_args = TrainingArguments(
#             output_dir="./pegasus_finetune",
#             num_train_epochs=3,  # Increased for better fine-tuning
#             per_device_train_batch_size=1,
#             per_device_eval_batch_size=1,
#             warmup_steps=500,
#             weight_decay=0.01,
#             logging_dir="./logs",
#             logging_steps=10,
#             eval_strategy="epoch",
#             save_strategy="epoch",
#             load_best_model_at_end=True,
#         )
#         trainer = Trainer(
#             model=model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=eval_dataset,
#         )
#         trainer.train()
#         trainer.save_model(PEGASUS_MODEL_DIR)
#         tokenizer.save_pretrained(PEGASUS_MODEL_DIR)
#         print(f"Fine-tuned Pegasus saved to {PEGASUS_MODEL_DIR}")
#     return tokenizer, model
# # BERT Fine-Tuning
# def load_or_finetune_bert():
#     if os.path.exists(BERT_MODEL_DIR):
#         print("Loading fine-tuned BERT model...")
#         tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_DIR)
#         model = BertForSequenceClassification.from_pretrained(BERT_MODEL_DIR, num_labels=2)
#     else:
#         print("Fine-tuning BERT on CNN/Daily Mail for extractive summarization...")
#         tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
#         model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
#         # Load dataset and preprocess for sentence classification
#         cnn_dm = load_dataset("cnn_dailymail", "3.0.0", split="train[:5000]")
#         def preprocess_for_extractive(examples):
#             sentences = []
#             labels = []
#             for article, highlights in zip(examples["article"], examples["highlights"]):
#                 article_sents = article.split(". ")
#                 highlight_sents = highlights.split(". ")
#                 for sent in article_sents:
#                     if sent.strip():
#                         # Label as 1 if sentence is similar to any highlight, else 0
#                         is_summary = any(sent.strip() in h for h in highlight_sents)
#                         sentences.append(sent)
#                         labels.append(1 if is_summary else 0)
#             return {"sentence": sentences, "label": labels}
#         dataset = cnn_dm.map(preprocess_for_extractive, batched=True, remove_columns=["article", "highlights", "id"])
#         tokenized_dataset = dataset.map(
#             lambda x: tokenizer(x["sentence"], max_length=512, truncation=True, padding="max_length"),
#             batched=True
#         )
#         tokenized_dataset = tokenized_dataset.remove_columns(["sentence"])
#         train_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset))))
#         eval_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset)), len(tokenized_dataset)))
#         training_args = TrainingArguments(
#             output_dir="./bert_finetune",
#             num_train_epochs=3,
#             per_device_train_batch_size=8,
#             per_device_eval_batch_size=8,
#             warmup_steps=500,
#             weight_decay=0.01,
#             logging_dir="./logs",
#             logging_steps=10,
#             eval_strategy="epoch",
#             save_strategy="epoch",
#             load_best_model_at_end=True,
#         )
#         trainer = Trainer(
#             model=model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=eval_dataset,
#         )
#         trainer.train()
#         trainer.save_model(BERT_MODEL_DIR)
#         tokenizer.save_pretrained(BERT_MODEL_DIR)
#         print(f"Fine-tuned BERT saved to {BERT_MODEL_DIR}")
#     return tokenizer, model
# # LegalBERT Fine-Tuning
# def load_or_finetune_legalbert():
#     if os.path.exists(LEGALBERT_MODEL_DIR):
#         print("Loading fine-tuned LegalBERT model...")
#         tokenizer = BertTokenizer.from_pretrained(LEGALBERT_MODEL_DIR)
#         model = BertForSequenceClassification.from_pretrained(LEGALBERT_MODEL_DIR, num_labels=2)
#     else:
#         print("Fine-tuning LegalBERT on Billsum for extractive summarization...")
#         tokenizer = BertTokenizer.from_pretrained("nlpaueb/legal-bert-base-uncased")
#         model = BertForSequenceClassification.from_pretrained("nlpaueb/legal-bert-base-uncased", num_labels=2)
#         # Load dataset
#         billsum = load_dataset("billsum", split="train[:5000]")
#         def preprocess_for_extractive(examples):
#             sentences = []
#             labels = []
#             for text, summary in zip(examples["text"], examples["summary"]):
#                 text_sents = text.split(". ")
#                 summary_sents = summary.split(". ")
#                 for sent in text_sents:
#                     if sent.strip():
#                         is_summary = any(sent.strip() in s for s in summary_sents)
#                         sentences.append(sent)
#                         labels.append(1 if is_summary else 0)
#             return {"sentence": sentences, "label": labels}
#         dataset = billsum.map(preprocess_for_extractive, batched=True, remove_columns=["text", "summary", "title"])
#         tokenized_dataset = dataset.map(
#             lambda x: tokenizer(x["sentence"], max_length=512, truncation=True, padding="max_length"),
#             batched=True
#         )
#         tokenized_dataset = tokenized_dataset.remove_columns(["sentence"])
#         train_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset))))
#         eval_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset)), len(tokenized_dataset)))
#         training_args = TrainingArguments(
#             output_dir="./legalbert_finetune",
#             num_train_epochs=3,
#             per_device_train_batch_size=8,
#             per_device_eval_batch_size=8,
#             warmup_steps=500,
#             weight_decay=0.01,
#             logging_dir="./logs",
#             logging_steps=10,
#             eval_strategy="epoch",
#             save_strategy="epoch",
#             load_best_model_at_end=True,
#         )
#         trainer = Trainer(
#             model=model,
#             args=training_args,
#             train_dataset=train_dataset,
#             eval_dataset=eval_dataset,
#         )
#         trainer.train()
#         trainer.save_model(LEGALBERT_MODEL_DIR)
#         tokenizer.save_pretrained(LEGALBERT_MODEL_DIR)
#         print(f"Fine-tuned LegalBERT saved to {LEGALBERT_MODEL_DIR}")
#     return tokenizer, model
# # Load models
# # pegasus_tokenizer, pegasus_model = load_or_finetune_pegasus()
# # bert_tokenizer, bert_model = load_or_finetune_bert()
# # legalbert_tokenizer, legalbert_model = load_or_finetune_legalbert()
# def extract_text_from_pdf(file_path):
#     text = ""
#     with pdfplumber.open(file_path) as pdf:
#         for page in pdf.pages:
#             text += page.extract_text() or ""
#     return text
# def extract_text_from_image(file_path):
#     image = Image.open(file_path)
#     text = pytesseract.image_to_string(image)
#     return text
# def choose_model(text):
#     legal_keywords = ["court", "legal", "law", "judgment", "contract", "statute", "case"]
#     tfidf = TfidfVectorizer(vocabulary=legal_keywords)
#     tfidf_matrix = tfidf.fit_transform([text.lower()])
#     score = np.sum(tfidf_matrix.toarray())
#     if score > 0.1:
#         return "legalbert"
#     elif len(text.split()) > 50:
#         return "pegasus"
#     else:
#         return "bert"
# def summarize_with_pegasus(text):
#     inputs = pegasus_tokenizer(text, truncation=True, padding="longest", return_tensors="pt", max_length=512)
#     summary_ids = pegasus_model.generate(
#         inputs["input_ids"],
#         max_length=400, min_length=80, length_penalty=1.5, num_beams=4
#     )
#     return pegasus_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
# def summarize_with_bert(text):
#     sentences = text.split(". ")
#     if len(sentences) < 6:  # Ensure enough for 5 sentences
#         return text
#     inputs = bert_tokenizer(sentences, return_tensors="pt", padding=True, truncation=True, max_length=512)
#     with torch.no_grad():
#         outputs = bert_model(**inputs)
#         logits = outputs.logits
#     probs = torch.softmax(logits, dim=1)[:, 1]  # Probability of being a summary sentence
#     key_sentence_idx = probs.argsort(descending=True)[:5]  # Top 5 sentences
#     return ". ".join([sentences[idx] for idx in key_sentence_idx if sentences[idx].strip()])
# def summarize_with_legalbert(text):
#     sentences = text.split(". ")
#     if len(sentences) < 6:
#         return text
#     inputs = legalbert_tokenizer(sentences, return_tensors="pt", padding=True, truncation=True, max_length=512)
#     with torch.no_grad():
#         outputs = legalbert_model(**inputs)
#         logits = outputs.logits
#     probs = torch.softmax(logits, dim=1)[:, 1]
#     key_sentence_idx = probs.argsort(descending=True)[:5]
#     return ". ".join([sentences[idx] for idx in key_sentence_idx if sentences[idx].strip()])
# # Load Models
# pegasus_tokenizer, pegasus_model = load_or_finetune_pegasus()
# bert_tokenizer, bert_model = load_or_finetune_bert()
# legalbert_tokenizer, legalbert_model = load_or_finetune_legalbert()
# @app.route('/summarize', methods=['POST'])
# def summarize_document():
#     if 'file' not in request.files:
#         return jsonify({"error": "No file uploaded"}), 400
#     file = request.files['file']
#     filename = file.filename
#     file.seek(0, os.SEEK_END)
#     file_size = file.tell()
#     if file_size > MAX_FILE_SIZE:
#         return jsonify({"error": f"File size exceeds {MAX_FILE_SIZE // (1024 * 1024)} MB"}), 413
#     file.seek(0)
#     file_path = os.path.join(UPLOAD_FOLDER, filename)
#     try:
#         file.save(file_path)
#     except Exception as e:
#         return jsonify({"error": f"Failed to save file: {str(e)}"}), 500
#     try:
#         if filename.endswith('.pdf'):
#             text = extract_text_from_pdf(file_path)
#         elif filename.endswith(('.png', '.jpeg', '.jpg')):
#             text = extract_text_from_image(file_path)
#         else:
#             os.remove(file_path)
#             return jsonify({"error": "Unsupported file format."}), 400
#     except Exception as e:
#         os.remove(file_path)
#         return jsonify({"error": f"Text extraction failed: {str(e)}"}), 500
#     if not text.strip():
#         os.remove(file_path)
#         return jsonify({"error": "No text extracted"}), 400
#     try:
#         model = choose_model(text)
#         if model == "pegasus":
#             summary = summarize_with_pegasus(text)
#         elif model == "bert":
#             summary = summarize_with_bert(text)
#         elif model == "legalbert":
#             summary = summarize_with_legalbert(text)
#     except Exception as e:
#         os.remove(file_path)
#         return jsonify({"error": f"Summarization failed: {str(e)}"}), 500
#     os.remove(file_path)
#     return jsonify({"model_used": model, "summary": summary})
# if __name__ == '__main__':
#     port = int(os.environ.get("PORT", 5000))
#     app.run(debug=False, host='0.0.0.0', port=port)