import streamlit as st import os from PIL import Image import pymupdf from datetime import datetime import re from utils import setup_logger from .passport import prune_passport_for_display, display_passport from .driving_license import prune_driving_license_for_display, display_driving_license from .bank_statement import prune_bank_statement_for_display, display_bank_statement from .payslip import prune_payslip_for_display, display_payslip from .p60 import prune_p60_for_display, display_p60 from .others import display_others logger = setup_logger(__name__) def load_pdf_as_image(file_path): # Open PDF file doc = pymupdf.Document(file_path) # Get the first page page = doc[0] # Convert to image pix = page.get_pixmap() # Convert to PIL Image img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) return img def generate_metadata(file_path): """Generate metadata dictionary from file path and properties""" file_stat = os.stat(file_path) file_name = os.path.basename(file_path) parent_dir = os.path.basename(os.path.dirname(file_path)) metadata = { "File Name": file_name, "Directory": parent_dir, "File Size": f"{file_stat.st_size / 1024:.2f} KB", "Last Modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), "Created": datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'), "File Extension": os.path.splitext(file_name)[1], "Full Path": file_path } # Add image-specific metadata if it's an image if file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')): try: with Image.open(file_path) as img: metadata.update({ "Image Size": f"{img.size[0]}x{img.size[1]}", "Image Mode": img.mode, "Image Format": img.format }) except Exception as e: st.error(f"Error reading image metadata: {str(e)}") # Add PDF-specific metadata if it's a PDF elif file_name.lower().endswith('.pdf'): try: doc = pymupdf.Document(file_path) metadata.update({ "Page Count": len(doc), "PDF Version": doc.pdf_version, "Document Info": doc.metadata if doc.metadata else "No PDF metadata available" }) except Exception as e: st.error(f"Error reading PDF metadata: {str(e)}") return metadata def merge_dict_values(dict1, dict2): """ Merge two dictionaries based on the following rules: 1. If key is unique, keep the value 2. If key exists in both dicts, keep non-None value; if both have values, keep in a list 3. If value is None in both, keep the key with None value """ result = {} # Get all unique keys from both dictionaries all_keys = set(dict1.keys()).union(set(dict2.keys())) for key in all_keys: # Case 1: Key only in dict1 if key in dict1 and key not in dict2: result[key] = dict1[key] # Case 2: Key only in dict2 elif key in dict2 and key not in dict1: result[key] = dict2[key] # Case 3: Key in both dictionaries else: value1 = dict1[key] value2 = dict2[key] # If both are dictionaries, recursively merge them if isinstance(value1, dict) and isinstance(value2, dict): result[key] = merge_dict_values(value1, value2) # If one is None, use the non-None value elif value1 is None and value2 is not None: result[key] = value2 elif value2 is None and value1 is not None: result[key] = value1 # If both have values, store in a list (unless they're the same) elif value1 is not None and value2 is not None and value1 != value2: result[key] = [value1, value2] # If both are None or identical, keep one else: result[key] = value1 return result def merge_json_file(json_data): """ Process a JSON file with nested dictionaries to create a consolidated result. """ result = {} # Process each top-level key for file_path, pages in json_data.items(): # Initialize an empty dictionary for this file path result[file_path] = {} # Get all page dictionaries for this file page_dicts = list(pages.values()) # Start with the first page as the base merged_dict = page_dicts[0] # Merge with each subsequent page for page_dict in page_dicts[1:]: merged_dict = merge_dict_values(merged_dict, page_dict) result[file_path] = merged_dict return result def display_based_on_card(original_file, analysis_results_for_original_file, extracted_files, current_upload): try: analysis_results_for_id = merge_json_file( {original_file: analysis_results_for_original_file}) analysis_results_for_id = analysis_results_for_id[original_file] logger.info(f"analysis_results_for_id : {analysis_results_for_id}") except Exception as e: logger.info( f"Exception while trying to merge results of {original_file}") logger.info( f"analysis_results_for_original_file : {analysis_results_for_original_file}") logger.info(f"error : {e}") return analysis_results_for_original_file analysis_results_for_id_updated = analysis_results_for_id try: document_type = analysis_results_for_id.get( "document_type", "None") logger.info(f"document_type for {original_file}: {document_type}") document_type = document_type.lower() document_type = re.sub('[^A-Za-z0-9]+', '', document_type) print(f"document_type : {document_type}") # analysis_results_pruned = {} if document_type == "passport": analysis_results_pruned = prune_passport_for_display( analysis_results_for_id) display_passport(extracted_files, analysis_results_pruned) elif document_type == "drivinglicense": analysis_results_pruned = prune_driving_license_for_display( analysis_results_for_id) # if original_file not in st.session_state['tab_ocr']['values_display']: # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated display_driving_license(extracted_files, analysis_results_pruned) elif document_type == "bankstatement": analysis_results_pruned = prune_bank_statement_for_display( analysis_results_for_id) # if original_file not in st.session_state['tab_ocr']['values_display']: # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated display_bank_statement(extracted_files, analysis_results_pruned) elif document_type == "payslip": analysis_results_pruned = prune_payslip_for_display( analysis_results_for_id) # if original_file not in st.session_state['tab_ocr']['values_display']: # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated display_payslip(extracted_files, analysis_results_pruned) elif document_type == "p60": analysis_results_pruned = prune_p60_for_display( analysis_results_for_id) # if original_file not in st.session_state['tab_ocr']['values_display']: # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated display_p60(extracted_files, analysis_results_pruned) else: analysis_results_for_id_updated["document_type"] = analysis_results_for_id.get( "document_type", None) # if original_file not in st.session_state['tab_ocr']['values_display']: # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated display_others(extracted_files, analysis_results_for_id_updated) except Exception as e: logger.info( f"Exception for processing analysis results of {analysis_results_for_id}: {e}") analysis_results_for_id_updated = analysis_results_for_id # if original_file not in st.session_state['tab_ocr']['values_display']: # st.session_state['tab_ocr']['values_display'][original_file] = analysis_results_for_id_updated display_others(extracted_files, analysis_results_for_id_updated) if original_file not in st.session_state['uploads'][current_upload]['values_display']: st.session_state['uploads'][current_upload]['values_display'][original_file] = analysis_results_for_id_updated # return analysis_results_for_id_updated