import streamlit as st
import os
from PIL import Image
import pymupdf
from datetime import datetime
import re
from utils import setup_logger
from .passport import prune_passport_for_display, display_passport
from .driving_license import prune_driving_license_for_display, display_driving_license
from .bank_statement import prune_bank_statement_for_display, display_bank_statement
from .payslip import prune_payslip_for_display, display_payslip
from .p60 import prune_p60_for_display, display_p60
from .others import display_others
logger = setup_logger(__name__)
def load_pdf_as_image(file_path):
    """Render the first page of a PDF file as a PIL Image."""
    doc = pymupdf.Document(file_path)
    # Render the first page to a pixmap and convert it to a PIL Image
    page = doc[0]
    pix = page.get_pixmap()
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    # Close the document to release the file handle
    doc.close()
    return img
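# Example usage (illustrative only; "sample.pdf" is a hypothetical path):
#
#   img = load_pdf_as_image("sample.pdf")
#   img.save("sample_page1.png")
#
# get_pixmap() renders at PyMuPDF's default resolution; passing e.g.
# get_pixmap(dpi=150) inside the function gives a sharper preview if needed.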
def generate_metadata(file_path):
"""Generate metadata dictionary from file path and properties"""
file_stat = os.stat(file_path)
file_name = os.path.basename(file_path)
parent_dir = os.path.basename(os.path.dirname(file_path))
metadata = {
"File Name": file_name,
"Directory": parent_dir,
"File Size": f"{file_stat.st_size / 1024:.2f} KB",
"Last Modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
"Created": datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
"File Extension": os.path.splitext(file_name)[1],
"Full Path": file_path
}
# Add image-specific metadata if it's an image
if file_name.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')):
try:
with Image.open(file_path) as img:
metadata.update({
"Image Size": f"{img.size[0]}x{img.size[1]}",
"Image Mode": img.mode,
"Image Format": img.format
})
except Exception as e:
st.error(f"Error reading image metadata: {str(e)}")
# Add PDF-specific metadata if it's a PDF
elif file_name.lower().endswith('.pdf'):
    try:
        # Use a context manager so the document is closed after reading
        with pymupdf.Document(file_path) as doc:
            metadata.update({
                "Page Count": len(doc),
                "PDF Version": doc.pdf_version,
                "Document Info": doc.metadata if doc.metadata else "No PDF metadata available"
            })
    except Exception as e:
        st.error(f"Error reading PDF metadata: {str(e)}")
return metadata
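# Example of the returned shape (values are illustrative, not real output):
#
#   generate_metadata("docs/passport.png")
#   # -> {"File Name": "passport.png", "Directory": "docs",
#   #     "File Size": "154.20 KB", ..., "Image Size": "600x800",
#   #     "Image Mode": "RGB", "Image Format": "PNG"}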
def merge_dict_values(dict1, dict2):
"""
Merge two dictionaries based on the following rules:
1. If key is unique, keep the value
2. If key exists in both dicts, keep non-None value; if both have values, keep in a list
3. If value is None in both, keep the key with None value
"""
result = {}
# Get all unique keys from both dictionaries
    all_keys = set(dict1) | set(dict2)
for key in all_keys:
# Case 1: Key only in dict1
if key in dict1 and key not in dict2:
result[key] = dict1[key]
# Case 2: Key only in dict2
elif key in dict2 and key not in dict1:
result[key] = dict2[key]
# Case 3: Key in both dictionaries
else:
value1 = dict1[key]
value2 = dict2[key]
# If both are dictionaries, recursively merge them
if isinstance(value1, dict) and isinstance(value2, dict):
result[key] = merge_dict_values(value1, value2)
# If one is None, use the non-None value
elif value1 is None and value2 is not None:
result[key] = value2
elif value2 is None and value1 is not None:
result[key] = value1
# If both have values, store in a list (unless they're the same)
elif value1 is not None and value2 is not None and value1 != value2:
result[key] = [value1, value2]
# If both are None or identical, keep one
else:
result[key] = value1
return result
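# Worked example of the merge rules above (field names are hypothetical,
# and key order in the result may vary):
#
#   merge_dict_values({"name": "A", "dob": None, "id": 1},
#                     {"name": "A", "dob": "1990-01-01", "id": 2})
#   # -> {"name": "A",            # identical values collapse to one
#   #     "dob": "1990-01-01",    # non-None value wins over None
#   #     "id": [1, 2]}           # differing values are kept in a list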
def merge_json_file(json_data):
    """
    Collapse per-page analysis results into a single merged dictionary per file.
    """
    result = {}
    for file_path, pages in json_data.items():
        page_dicts = list(pages.values())
        if not page_dicts:
            # No page-level results for this file; keep an empty record
            result[file_path] = {}
            continue
        # Start with the first page and fold in each subsequent page
        merged_dict = page_dicts[0]
        for page_dict in page_dicts[1:]:
            merged_dict = merge_dict_values(merged_dict, page_dict)
        result[file_path] = merged_dict
    return result
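# Example: two pages of one document folded into a single record
# (the file path and field names are hypothetical):
#
#   merge_json_file({"doc.pdf": {"page_1": {"name": "A", "dob": None},
#                                "page_2": {"name": "A", "dob": "1990-01-01"}}})
#   # -> {"doc.pdf": {"name": "A", "dob": "1990-01-01"}}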
def display_based_on_card(original_file, analysis_results_for_original_file, extracted_files, current_upload):
    """Merge per-page results for a file and render them with the matching document-type display."""
    try:
        analysis_results_for_id = merge_json_file(
            {original_file: analysis_results_for_original_file})[original_file]
        logger.info(f"analysis_results_for_id : {analysis_results_for_id}")
    except Exception as e:
        logger.error(
            f"Exception while trying to merge results of {original_file}")
        logger.error(
            f"analysis_results_for_original_file : {analysis_results_for_original_file}")
        logger.error(f"error : {e}")
        return analysis_results_for_original_file
analysis_results_for_id_updated = analysis_results_for_id
try:
        document_type = analysis_results_for_id.get("document_type") or "None"
        logger.info(f"document_type for {original_file}: {document_type}")
        # Normalise: lowercase, then strip non-alphanumeric characters
        document_type = re.sub('[^A-Za-z0-9]+', '', document_type.lower())
        logger.info(f"document_type : {document_type}")
if document_type == "passport":
analysis_results_pruned = prune_passport_for_display(
analysis_results_for_id)
display_passport(extracted_files, analysis_results_pruned)
elif document_type == "drivinglicense":
analysis_results_pruned = prune_driving_license_for_display(
analysis_results_for_id)
display_driving_license(extracted_files, analysis_results_pruned)
elif document_type == "bankstatement":
analysis_results_pruned = prune_bank_statement_for_display(
analysis_results_for_id)
display_bank_statement(extracted_files, analysis_results_pruned)
elif document_type == "payslip":
analysis_results_pruned = prune_payslip_for_display(
analysis_results_for_id)
display_payslip(extracted_files, analysis_results_pruned)
elif document_type == "p60":
analysis_results_pruned = prune_p60_for_display(
analysis_results_for_id)
display_p60(extracted_files, analysis_results_pruned)
        else:
            # Unknown document type: keep whatever was extracted (possibly None)
            analysis_results_for_id_updated["document_type"] = analysis_results_for_id.get(
                "document_type", None)
            display_others(extracted_files, analysis_results_for_id_updated)
    except Exception as e:
        logger.error(
            f"Exception for processing analysis results of {analysis_results_for_id}: {e}")
        analysis_results_for_id_updated = analysis_results_for_id
        display_others(extracted_files, analysis_results_for_id_updated)
if original_file not in st.session_state['uploads'][current_upload]['values_display']:
st.session_state['uploads'][current_upload]['values_display'][original_file] = analysis_results_for_id_updated
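# Expected caller setup (a sketch, assuming the Streamlit app initialises the
# session state before calling display_based_on_card; the keys shown are the
# ones referenced above, while upload_id and "doc.pdf" are hypothetical):
#
#   st.session_state.setdefault('uploads', {})
#   st.session_state['uploads'].setdefault(upload_id, {'values_display': {}})
#   display_based_on_card("doc.pdf", results_by_page, extracted_files, upload_id)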