import streamlit as st
import os
from PIL import Image
import pymupdf
from datetime import datetime
import re
from utils import setup_logger
from .passport import prune_passport_for_display, display_passport
from .driving_license import prune_driving_license_for_display, display_driving_license
from .bank_statement import prune_bank_statement_for_display, display_bank_statement
from .payslip import prune_payslip_for_display, display_payslip
from .p60 import prune_p60_for_display, display_p60
from .others import display_others

logger = setup_logger(__name__)
def load_pdf_as_image(file_path):
    """Render the first page of a PDF as a PIL image."""
    # Open the PDF and render its first page to a pixmap
    doc = pymupdf.Document(file_path)
    page = doc[0]
    pix = page.get_pixmap()
    # Convert the raw pixmap samples to a PIL Image (frombytes copies the data,
    # so the document can be closed afterwards)
    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
    doc.close()
    return img
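
# Illustrative usage sketch (not part of the app flow; "statement.pdf" is a
# hypothetical path, assuming a readable PDF with at least one page):
#
#     first_page = load_pdf_as_image("statement.pdf")
#     first_page.save("statement_page1.png")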

def generate_metadata(file_path):
    """Generate a metadata dictionary from the file path and filesystem properties."""
    file_stat = os.stat(file_path)
    file_name = os.path.basename(file_path)
    parent_dir = os.path.basename(os.path.dirname(file_path))
    metadata = {
        "File Name": file_name,
        "Directory": parent_dir,
        "File Size": f"{file_stat.st_size / 1024:.2f} KB",
        "Last Modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
        # Note: st_ctime is creation time on Windows but metadata-change time on Unix
        "Created": datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
        "File Extension": os.path.splitext(file_name)[1],
        "Full Path": file_path
    }
    # Add image-specific metadata if it's an image
    if file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')):
        try:
            with Image.open(file_path) as img:
                metadata.update({
                    "Image Size": f"{img.size[0]}x{img.size[1]}",
                    "Image Mode": img.mode,
                    "Image Format": img.format
                })
        except Exception as e:
            st.error(f"Error reading image metadata: {str(e)}")
    # Add PDF-specific metadata if it's a PDF
    elif file_name.lower().endswith('.pdf'):
        try:
            doc = pymupdf.Document(file_path)
            metadata.update({
                "Page Count": len(doc),
                "PDF Version": doc.pdf_version,
                "Document Info": doc.metadata if doc.metadata else "No PDF metadata available"
            })
            doc.close()
        except Exception as e:
            st.error(f"Error reading PDF metadata: {str(e)}")
    return metadata
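
# Illustrative usage sketch ("passport.jpg" is a hypothetical path; the keys
# are the ones built above, with values depending on the local file):
#
#     meta = generate_metadata("passport.jpg")
#     # e.g. meta["File Name"] == "passport.jpg", meta["Image Mode"] == "RGB"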

def merge_dict_values(dict1, dict2):
    """
    Merge two dictionaries using the following rules:
    1. If a key is unique to one dict, keep its value.
    2. If a key exists in both dicts and both values are dicts, merge them recursively.
    3. If exactly one value is None, keep the non-None value.
    4. If both values are non-None and differ, keep both in a list.
    5. If both values are None or identical, keep a single value.
    """
    result = {}
    # Gather all unique keys from both dictionaries
    all_keys = set(dict1.keys()).union(set(dict2.keys()))
    for key in all_keys:
        # Case 1: key only in dict1
        if key in dict1 and key not in dict2:
            result[key] = dict1[key]
        # Case 2: key only in dict2
        elif key in dict2 and key not in dict1:
            result[key] = dict2[key]
        # Case 3: key in both dictionaries
        else:
            value1 = dict1[key]
            value2 = dict2[key]
            # If both are dictionaries, merge them recursively
            if isinstance(value1, dict) and isinstance(value2, dict):
                result[key] = merge_dict_values(value1, value2)
            # If exactly one is None, use the non-None value
            elif value1 is None and value2 is not None:
                result[key] = value2
            elif value2 is None and value1 is not None:
                result[key] = value1
            # If both have values and they differ, store both in a list
            elif value1 is not None and value2 is not None and value1 != value2:
                result[key] = [value1, value2]
            # If both are None or identical, keep one
            else:
                result[key] = value1
    return result
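
# Illustrative sketch of the merge rules on toy inputs (values chosen only to
# exercise each branch; not taken from real analysis output):
#
#     merge_dict_values(
#         {"name": "A. Smith", "dob": None, "address": {"city": "Leeds"}},
#         {"name": "A Smith", "dob": "1990-01-01", "address": {"postcode": "LS1"}},
#     )
#     # -> {"name": ["A. Smith", "A Smith"],   # differing values kept in a list
#     #     "dob": "1990-01-01",               # non-None value wins
#     #     "address": {"city": "Leeds", "postcode": "LS1"}}  # dicts merged recursively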

def merge_json_file(json_data):
    """
    Merge the per-page result dictionaries of each file into a single
    consolidated dictionary per file path.
    """
    result = {}
    for file_path, pages in json_data.items():
        page_dicts = list(pages.values())
        # Guard against files with no page results
        if not page_dicts:
            result[file_path] = {}
            continue
        # Start with the first page as the base, then fold in each subsequent page
        merged_dict = page_dicts[0]
        for page_dict in page_dicts[1:]:
            merged_dict = merge_dict_values(merged_dict, page_dict)
        result[file_path] = merged_dict
    return result
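
# Illustrative input/output shape (file path, page keys, and field names are
# hypothetical; real keys come from the upstream analysis step):
#
#     merge_json_file({
#         "docs/statement.pdf": {
#             "page_1": {"document_type": "bank_statement", "sort_code": None},
#             "page_2": {"document_type": "bank_statement", "sort_code": "12-34-56"},
#         }
#     })
#     # -> {"docs/statement.pdf":
#     #        {"document_type": "bank_statement", "sort_code": "12-34-56"}}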

def display_based_on_card(original_file, analysis_results_for_original_file, extracted_files, current_upload):
    # Consolidate the per-page results for this file into a single dictionary
    try:
        analysis_results_for_id = merge_json_file(
            {original_file: analysis_results_for_original_file})
        analysis_results_for_id = analysis_results_for_id[original_file]
        logger.info(f"analysis_results_for_id : {analysis_results_for_id}")
    except Exception as e:
        logger.error(f"Exception while trying to merge results of {original_file}")
        logger.error(f"analysis_results_for_original_file : {analysis_results_for_original_file}")
        logger.error(f"error : {e}")
        return analysis_results_for_original_file

    analysis_results_for_id_updated = analysis_results_for_id
    try:
        # Normalise the document type to a lowercase alphanumeric token
        document_type = analysis_results_for_id.get("document_type", "None")
        logger.info(f"document_type for {original_file}: {document_type}")
        document_type = re.sub('[^A-Za-z0-9]+', '', document_type.lower())
        # Route to the document-specific prune/display pair
        if document_type == "passport":
            analysis_results_pruned = prune_passport_for_display(analysis_results_for_id)
            display_passport(extracted_files, analysis_results_pruned)
        elif document_type == "drivinglicense":
            analysis_results_pruned = prune_driving_license_for_display(analysis_results_for_id)
            display_driving_license(extracted_files, analysis_results_pruned)
        elif document_type == "bankstatement":
            analysis_results_pruned = prune_bank_statement_for_display(analysis_results_for_id)
            display_bank_statement(extracted_files, analysis_results_pruned)
        elif document_type == "payslip":
            analysis_results_pruned = prune_payslip_for_display(analysis_results_for_id)
            display_payslip(extracted_files, analysis_results_pruned)
        elif document_type == "p60":
            analysis_results_pruned = prune_p60_for_display(analysis_results_for_id)
            display_p60(extracted_files, analysis_results_pruned)
        else:
            # Unknown document type: keep whatever type was reported and show the raw results
            analysis_results_for_id_updated["document_type"] = analysis_results_for_id.get(
                "document_type", None)
            display_others(extracted_files, analysis_results_for_id_updated)
    except Exception as e:
        logger.error(f"Exception for processing analysis results of {analysis_results_for_id}: {e}")
        analysis_results_for_id_updated = analysis_results_for_id
        display_others(extracted_files, analysis_results_for_id_updated)

    # Cache the displayed values for this file in the current upload's session state
    if original_file not in st.session_state['uploads'][current_upload]['values_display']:
        st.session_state['uploads'][current_upload]['values_display'][original_file] = analysis_results_for_id_updated
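
# Illustrative call-site sketch (argument names and values are hypothetical;
# assumes the caller has already run the per-page analysis step and initialised
# st.session_state['uploads'][current_upload]['values_display']):
#
#     display_based_on_card(
#         original_file="docs/statement.pdf",
#         analysis_results_for_original_file=per_page_results,  # {page: fields} dict
#         extracted_files=extracted_page_images,
#         current_upload=current_upload_key,
#     )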