from pdfminer.high_level import extract_text from pdf2image import convert_from_path # Convert PDF pages to images import base64 import io import os from PIL import Image import json from openai import OpenAI from dotenv import load_dotenv import gradio as gr load_dotenv() client = OpenAI() # Function to encode image to Base64 def encode_image(image_input): """ Encode an image to Base64. Supports both file paths (str) and in-memory PIL images. """ if isinstance(image_input, str): # If input is a file path with open(image_input, "rb") as image_file: return base64.b64encode(image_file.read()).decode("utf-8") elif isinstance(image_input, Image.Image): # If input is a PIL image buffered = io.BytesIO() image_input.save(buffered, format="JPEG") return base64.b64encode(buffered.getvalue()).decode("utf-8") else: raise ValueError("Unsupported input type. Provide a file path or a PIL image.") # Function to process image files def process_image(image_path): print(f"🖼️ Processing image file: {image_path}") image_base64 = encode_image(image_path) image_url = f"data:image/jpeg;base64,{image_base64}" response = client.chat.completions.create( model="gpt-4o", messages=[ { "role": "user", "content": [ {"type": "text", "text": "Extract all text from this image."}, {"type": "image_url", "image_url": {"url": image_url}}, ], } ], ) extracted_text = response.choices[0].message.content.strip() # print(f"📝 Extracted text: {extracted_text}") return extracted_text # Function to process text-based PDFs def process_text_pdf(pdf_path): text_content = extract_text(pdf_path).strip() if text_content: print(f"📄 Extracting text from PDF: {pdf_path}") return text_content return None # No text found, fallback to image processing # Function to process scanned PDFs (image-based) def process_image_pdf(pdf_path): print(f"🖼️ No text found! Processing as an image-based (scanned) PDF: {pdf_path}") images = convert_from_path(pdf_path) extracted_text = [] for i, image in enumerate(images): image_text = process_image(image) extracted_text.append(image_text) return "\n\n".join(extracted_text) # Function to detect file type and extract text accordingly def process_file(file_path): if not os.path.exists(file_path): print(f"❌ Error: File not found: {file_path}") return None file_extension = file_path.lower().split(".")[-1] if file_extension in ["jpg", "jpeg", "png"]: return process_image(file_path) # Process images elif file_extension == "pdf": text_data = process_text_pdf(file_path) if text_data: # If text extraction succeeds, return it return text_data return process_image_pdf(file_path) # Otherwise, process as image else: print(f"❌ Unsupported file type: {file_path}") return None def extract_certificate_details(certificate_path): certificate_text = process_file(certificate_path) print(f"🖼️ Extracting details from certificate: {certificate_path}") if not certificate_text: print(f"❌ Error: Certificate text could not be extracted from {certificate_path}") return None # Ask GPT-4o to extract the details response = client.chat.completions.create( model="gpt-4o", response_format={ "type": "json_object" }, seed=123, temperature=0, messages=[ { "role": "developer", "content": f"""Extract the following details from the certificate text in JSON format, leave blank if not found: {{ "Certificate Name": "", "Certificate ID": "", "Ship Name": "", "Date of Issue": "", "Expiration Date": "" }} Certificate Text: {certificate_text} """ } ], ) result = response.choices[0].message.content result_json = json.loads(result) # Parse the result as JSON certificate_name = result_json.get("Certificate Name", "") certificate_id = result_json.get("Certificate ID", "") ship_name = result_json.get("Ship Name", "") date_of_issue = result_json.get("Date of Issue", "") expiration_date = result_json.get("Expiration Date", "") print(f"✅ Extracted details:\n- Certificate Name: {certificate_name}\n- Certificate ID: {certificate_id}\n- Ship Name: {ship_name}\n- Date of Issue: {date_of_issue}\n- Expiration Date: {expiration_date}") return { "Certificate Name": certificate_name, "Certificate ID": certificate_id, "Ship Name": ship_name, "Date of Issue": date_of_issue, "Expiration Date": expiration_date, "Certificate Text": certificate_text } # Function to compare two certificates using AI def compare_certificates(new_cert_details, old_cert_details): # Ask GPT-4o to compare the texts response = client.chat.completions.create( model="gpt-4o", messages=[ { "role": "user", "content": f"""Compare the two certificates below and provide a structured summary highlighting key differences in the format below: ### Comparison Summary: - Identify differences in terms of: - Certificate ID - Date of Issue - Expiration Date - Highlight any changes in other key details, if applicable. ### Take Note: - Clearly structure the output for easy reading - Do not include any structural changes in the text, only content changes ### Old Certificate: {old_cert_details} ### New Certificate: {new_cert_details}""" } ], ) comparison_result = response.choices[0].message.content.strip() return comparison_result def gradio_process_certificate(certificate, old_cert_details=""): # Process the certificate cert_details = extract_certificate_details(certificate) if not cert_details: return "❌ Failed to extract certificate details." # If old_certificate is provided, compare the certificates if old_cert_details: print(f"🔍 Comparing certificates") # Compare the certificates comparison_result = compare_certificates(cert_details, old_cert_details) # Return both certificate details and comparison result return { "new_certificate": cert_details, "old_certificate": old_cert_details, "comparison": comparison_result } # If only one certificate is provided, return just its details return cert_details # Launch Gradio UI gr.Interface( fn=gradio_process_certificate, inputs=[ gr.File(label="Certificate (PDF or Image)"), gr.Textbox(label="Old Certificate Details (JSON) - Optional") ], outputs=gr.JSON(label="Certificate Details"), title="📜 Certificate Details Extractor", description="Upload a certificate to extract details, or upload two certificates to compare them.", show_progress='full', allow_flagging="never" ).launch()