import streamlit as st
import os
from PIL import Image
import pymupdf
from datetime import datetime
import re
from utils import setup_logger
from .passport import prune_passport_for_display, display_passport
from .driving_license import prune_driving_license_for_display, display_driving_license
from .bank_statement import prune_bank_statement_for_display, display_bank_statement
from .payslip import prune_payslip_for_display, display_payslip
from .p60 import prune_p60_for_display, display_p60
from .others import display_others

logger = setup_logger(__name__)


def load_pdf_as_image(file_path):
    """Render the first page of a PDF file as a PIL Image."""
    # Open the PDF; the context manager closes it when done
    with pymupdf.Document(file_path) as doc:
        # Rasterise the first page
        pix = doc[0].get_pixmap()
    # The pixmap owns its sample buffer, so it stays valid after the doc closes
    return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
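
# Note: get_pixmap() renders at PyMuPDF's default 72 dpi. If more detail is
# needed (e.g. for OCR), a zoom matrix can be passed (a sketch, not used
# elsewhere in this module):
#   pix = doc[0].get_pixmap(matrix=pymupdf.Matrix(2, 2))  # 2x zoom, 144 dpi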


def generate_metadata(file_path):
    """Generate a metadata dictionary from a file's path and filesystem properties."""
file_stat = os.stat(file_path)
file_name = os.path.basename(file_path)
parent_dir = os.path.basename(os.path.dirname(file_path))
metadata = {
"File Name": file_name,
"Directory": parent_dir,
"File Size": f"{file_stat.st_size / 1024:.2f} KB",
"Last Modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
"Created": datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
"File Extension": os.path.splitext(file_name)[1],
"Full Path": file_path
}
# Add image-specific metadata if it's an image
if file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')):
try:
with Image.open(file_path) as img:
metadata.update({
"Image Size": f"{img.size[0]}x{img.size[1]}",
"Image Mode": img.mode,
"Image Format": img.format
})
except Exception as e:
st.error(f"Error reading image metadata: {str(e)}")
# Add PDF-specific metadata if it's a PDF
    elif file_name.lower().endswith('.pdf'):
        try:
            # Context manager ensures the document is closed after reading
            with pymupdf.Document(file_path) as doc:
                metadata.update({
                    "Page Count": len(doc),
                    "PDF Version": doc.pdf_version,
                    "Document Info": doc.metadata if doc.metadata else "No PDF metadata available"
                })
        except Exception as e:
            st.error(f"Error reading PDF metadata: {str(e)}")
return metadata
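
# Example of the returned dict for a hypothetical "passport.jpg" (values illustrative only):
#   {"File Name": "passport.jpg", "Directory": "uploads", "File Size": "182.40 KB",
#    "Last Modified": "2024-01-05 10:12:00", ..., "Image Format": "JPEG"}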


def merge_dict_values(dict1, dict2):
    """
    Merge two dictionaries according to the following rules:
    1. If a key appears in only one dict, keep its value.
    2. If a key appears in both, prefer the non-None value; if both values
       are non-None and differ, keep both in a list.
    3. If both values are None (or equal), keep a single value.
    Nested dictionaries are merged recursively.
    """
result = {}
# Get all unique keys from both dictionaries
all_keys = set(dict1.keys()).union(set(dict2.keys()))
for key in all_keys:
# Case 1: Key only in dict1
if key in dict1 and key not in dict2:
result[key] = dict1[key]
# Case 2: Key only in dict2
elif key in dict2 and key not in dict1:
result[key] = dict2[key]
# Case 3: Key in both dictionaries
else:
value1 = dict1[key]
value2 = dict2[key]
# If both are dictionaries, recursively merge them
if isinstance(value1, dict) and isinstance(value2, dict):
result[key] = merge_dict_values(value1, value2)
# If one is None, use the non-None value
elif value1 is None and value2 is not None:
result[key] = value2
elif value2 is None and value1 is not None:
result[key] = value1
# If both have values, store in a list (unless they're the same)
elif value1 is not None and value2 is not None and value1 != value2:
result[key] = [value1, value2]
# If both are None or identical, keep one
else:
result[key] = value1
return result
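
# Illustrative behaviour with hypothetical per-page fields:
#   merge_dict_values({"name": "A", "dob": None}, {"name": "A", "dob": "1990-01-01"})
#   -> {"name": "A", "dob": "1990-01-01"}   (non-None value wins)
#   merge_dict_values({"address": "12 High St"}, {"address": "12 High Street"})
#   -> {"address": ["12 High St", "12 High Street"]}   (conflicting values kept in a list)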


def merge_json_file(json_data):
    """
    Consolidate per-page extraction results into a single dict per file.

    `json_data` maps each file path to a dict of page-level result dicts;
    the pages are folded together with merge_dict_values.
    """
    result = {}
    # Process each top-level key (one per source file)
    for file_path, pages in json_data.items():
        page_dicts = list(pages.values())
        # Guard against a file with no page results (would otherwise raise IndexError)
        if not page_dicts:
            result[file_path] = {}
            continue
        # Start with the first page as the base, then fold in each subsequent page
        merged_dict = page_dicts[0]
        for page_dict in page_dicts[1:]:
            merged_dict = merge_dict_values(merged_dict, page_dict)
        result[file_path] = merged_dict
    return result
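
# Expected shape, with hypothetical paths and fields:
#   merge_json_file({"statement.pdf": {"page_1": {"name": "A", "sort_code": None},
#                                      "page_2": {"name": "A", "sort_code": "12-34-56"}}})
#   -> {"statement.pdf": {"name": "A", "sort_code": "12-34-56"}}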


def display_based_on_card(original_file, analysis_results_for_original_file, extracted_files, current_upload):
    try:
        analysis_results_for_id = merge_json_file(
            {original_file: analysis_results_for_original_file})
        analysis_results_for_id = analysis_results_for_id[original_file]
        logger.info(f"analysis_results_for_id : {analysis_results_for_id}")
    except Exception as e:
        logger.error(
            f"Exception while trying to merge results of {original_file}")
        logger.error(
            f"analysis_results_for_original_file : {analysis_results_for_original_file}")
        logger.error(f"error : {e}")
        return analysis_results_for_original_file
analysis_results_for_id_updated = analysis_results_for_id
    try:
        # Guard against a stored None before normalising: lowercase and strip
        # non-alphanumerics (e.g. "Driving License" -> "drivinglicense")
        document_type = analysis_results_for_id.get("document_type") or "None"
        logger.info(f"document_type for {original_file}: {document_type}")
        document_type = re.sub('[^A-Za-z0-9]+', '', document_type.lower())
        logger.info(f"normalised document_type : {document_type}")
if document_type == "passport":
analysis_results_pruned = prune_passport_for_display(
analysis_results_for_id)
display_passport(extracted_files, analysis_results_pruned)
elif document_type == "drivinglicense":
analysis_results_pruned = prune_driving_license_for_display(
analysis_results_for_id)
# if original_file not in st.session_state['tab_ocr']['values_display']:
# st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
display_driving_license(extracted_files, analysis_results_pruned)
elif document_type == "bankstatement":
analysis_results_pruned = prune_bank_statement_for_display(
analysis_results_for_id)
# if original_file not in st.session_state['tab_ocr']['values_display']:
# st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
display_bank_statement(extracted_files, analysis_results_pruned)
elif document_type == "payslip":
analysis_results_pruned = prune_payslip_for_display(
analysis_results_for_id)
# if original_file not in st.session_state['tab_ocr']['values_display']:
# st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
display_payslip(extracted_files, analysis_results_pruned)
elif document_type == "p60":
analysis_results_pruned = prune_p60_for_display(
analysis_results_for_id)
# if original_file not in st.session_state['tab_ocr']['values_display']:
# st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
display_p60(extracted_files, analysis_results_pruned)
else:
analysis_results_for_id_updated["document_type"] = analysis_results_for_id.get(
"document_type", None)
# if original_file not in st.session_state['tab_ocr']['values_display']:
# st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated
display_others(extracted_files, analysis_results_for_id_updated)
    except Exception as e:
        logger.error(
            f"Exception while processing analysis results of {analysis_results_for_id}: {e}")
        analysis_results_for_id_updated = analysis_results_for_id
        display_others(extracted_files, analysis_results_for_id_updated)
if original_file not in st.session_state['uploads'][current_upload]['values_display']:
st.session_state['uploads'][current_upload]['values_display'][original_file] = analysis_results_for_id_updated
    return analysis_results_for_id_updated