|
import os |
|
import re |
|
import gradio as gr |
|
import pandas as pd |
|
import numpy as np |
|
from xml.etree.ElementTree import Element, SubElement, tostring, parse |
|
from xml.dom import minidom |
|
import uuid |
|
from typing import List, Tuple |
|
from gradio_image_annotation import image_annotator |
|
from gradio_image_annotation.image_annotator import AnnotatedImageData |
|
from pymupdf import Document, Rect |
|
import pymupdf |
|
from PIL import ImageDraw, Image |
|
|
|
from tools.config import OUTPUT_FOLDER, CUSTOM_BOX_COLOUR, MAX_IMAGE_PIXELS, INPUT_FOLDER |
|
from tools.file_conversion import is_pdf, convert_annotation_json_to_review_df, convert_review_df_to_annotation_json, process_single_page_for_image_conversion, multiply_coordinates_by_page_sizes, convert_annotation_data_to_dataframe, create_annotation_dicts_from_annotation_df, remove_duplicate_images_with_blank_boxes, fill_missing_ids, divide_coordinates_by_page_sizes |
|
from tools.helper_functions import get_file_name_without_type, detect_file_type |
|
from tools.file_redaction import redact_page_with_pymupdf |
|
|
|
# When no pixel limit is configured (falsy MAX_IMAGE_PIXELS), disable Pillow's
# decompression-bomb size check so very large page images can be opened.
# NOTE(review): a config value of 0/None/"" all disable the check here — confirm
# that is the intended reading of MAX_IMAGE_PIXELS.
if not MAX_IMAGE_PIXELS: Image.MAX_IMAGE_PIXELS = None
|
|
|
def decrease_page(number:int):
    '''
    Decrease page number for review redactions page.

    Returns the new page number twice (one value per linked Gradio output),
    never dropping below the first page.
    '''
    previous_number = max(1, number - 1)
    return previous_number, previous_number
|
|
|
def increase_page(number:int, page_image_annotator_object:AnnotatedImageData):
    '''
    Increase page number for review redactions page.

    Returns the new page number twice (one value per linked Gradio output),
    capped at the number of pages held in the annotator object; falls back to
    page 1 when no annotator object exists yet.
    '''
    if not page_image_annotator_object:
        return 1, 1

    last_page = len(page_image_annotator_object)
    next_page = min(number + 1, last_page)
    return next_page, next_page
|
|
|
def update_zoom(current_zoom_level:int, annotate_current_page:int, decrease:bool=True):
    '''
    Step the review-page zoom level by 10% and echo the current page back.

    NOTE(review): the flag reads inverted — decrease=False lowers the zoom
    (floor 60%) while the default decrease=True raises it (ceiling 110%).
    Behaviour is preserved exactly as found; confirm against the UI wiring.
    '''
    if decrease == False:
        # Step down, but never below 60%.
        new_zoom = current_zoom_level - 10 if current_zoom_level >= 70 else current_zoom_level
    else:
        # Step up, but never above 110%.
        new_zoom = current_zoom_level + 10 if current_zoom_level < 110 else current_zoom_level

    return new_zoom, annotate_current_page
|
|
|
def update_dropdown_list_based_on_dataframe(df:pd.DataFrame, column:str) -> List["str"]:
    '''
    Gather unique elements from a string pandas Series, then append 'ALL' to the start and return the list.

    Non-DataFrame input, a missing column, an empty column, or an all-NaN
    column all yield just ["ALL"]. The "page" column is sorted numerically
    (falling back to ["ALL"] if values are not integer-like); every other
    column is sorted as strings.
    '''
    if not isinstance(df, pd.DataFrame):
        return ["ALL"]

    # Missing, empty, or fully-NaN columns offer no real choices.
    if column not in df.columns or df[column].empty or df[column].isna().all():
        return ["ALL"]

    if column == "page":
        # Page numbers are sorted numerically before being rendered as strings.
        try:
            unique_pages = sorted(df[column].astype(int).unique())
        except ValueError:
            return ["ALL"]
        return ["ALL"] + [str(page) for page in unique_pages]

    return ["ALL"] + sorted(df[column].astype(str).unique().tolist())
|
|
|
def get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object:AnnotatedImageData,
                                                    recogniser_dataframe_base:pd.DataFrame,
                                                    recogniser_dropdown_value:str,
                                                    text_dropdown_value:str,
                                                    page_dropdown_value:str,
                                                    review_df:pd.DataFrame=[],
                                                    page_sizes:List[str]=[]):
    '''
    Create a filtered recogniser dataframe and associated dropdowns based on current information in the image annotator and review data frame.

    Parameters:
        page_image_annotator_object: current annotation data for all pages.
        recogniser_dataframe_base: fallback dataframe used if conversion fails.
        recogniser_dropdown_value / text_dropdown_value / page_dropdown_value:
            currently-selected values, preserved in the rebuilt dropdowns.
        review_df: accumulated review dataframe. NOTE(review): mutable default
            `[]` is shared across calls and is not a DataFrame — confirm
            callers always pass a real value.
        page_sizes: per-page size dicts used for coordinate conversion.

    Returns:
        (gr.Dataframe, pandas DataFrame of page/label/text/id, label dropdown,
        label list with 'Redaction' pinned first, text dropdown, page dropdown)
    '''

    # Defaults returned if anything below fails before reassignment.
    recogniser_entities_list = ["Redaction"]
    recogniser_dataframe_out = recogniser_dataframe_base
    recogniser_dataframe_out_gr = gr.Dataframe()
    review_dataframe = review_df

    try:
        # Rebuild the review dataframe from the live annotator state.
        review_dataframe = convert_annotation_json_to_review_df(page_image_annotator_object, review_df, page_sizes)

        recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "label")
        recogniser_entities_drop = gr.Dropdown(value=recogniser_dropdown_value, choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True)

        # 'Redaction' is pinned to the front of the label list; 'ALL' is a
        # dropdown-only pseudo-choice and is excluded here.
        recogniser_entities_list = [entity for entity in recogniser_entities_for_drop.copy() if entity != 'Redaction' and entity != 'ALL']
        recogniser_entities_list.insert(0, 'Redaction')

        text_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "text")
        text_entities_drop = gr.Dropdown(value=text_dropdown_value, choices=text_entities_for_drop, allow_custom_value=True, interactive=True)

        page_entities_for_drop = update_dropdown_list_based_on_dataframe(review_dataframe, "page")
        page_entities_drop = gr.Dropdown(value=page_dropdown_value, choices=page_entities_for_drop, allow_custom_value=True, interactive=True)

        recogniser_dataframe_out_gr = gr.Dataframe(review_dataframe[["page", "label", "text", "id"]], show_search="filter", col_count=(4, "fixed"), type="pandas", headers=["page", "label", "text", "id"], show_fullscreen_button=True, wrap=True, max_height=400, static_columns=[0,1,2,3])

        recogniser_dataframe_out = review_dataframe[["page", "label", "text", "id"]]

    except Exception as e:
        print("Could not extract recogniser information:", e)
        # Fall back to the base dataframe and rebuild dropdowns from whatever
        # review_dataframe currently holds.
        # NOTE(review): if the conversion above raised before review_dataframe
        # was reassigned, the column accesses below run against the raw
        # review_df argument — with the default `[]` they would raise; confirm
        # callers pass a populated DataFrame.
        recogniser_dataframe_out = recogniser_dataframe_base[["page", "label", "text", "id"]]

        label_choices = review_dataframe["label"].astype(str).unique().tolist()
        text_choices = review_dataframe["text"].astype(str).unique().tolist()
        page_choices = review_dataframe["page"].astype(str).unique().tolist()

        recogniser_entities_drop = gr.Dropdown(value=recogniser_dropdown_value, choices=label_choices, allow_custom_value=True, interactive=True)
        recogniser_entities_list = ["Redaction"]
        text_entities_drop = gr.Dropdown(value=text_dropdown_value, choices=text_choices, allow_custom_value=True, interactive=True)
        page_entities_drop = gr.Dropdown(value=page_dropdown_value, choices=page_choices, allow_custom_value=True, interactive=True)

    return recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list, text_entities_drop, page_entities_drop
|
|
|
def update_recogniser_dataframes(page_image_annotator_object:AnnotatedImageData, recogniser_dataframe_base:pd.DataFrame, recogniser_entities_dropdown_value:str="ALL", text_dropdown_value:str="ALL", page_dropdown_value:str="ALL", review_df:pd.DataFrame=[], page_sizes:list[str]=[]):
    '''
    Update recogniser dataframe information that appears alongside the pdf pages on the review screen.

    Returns:
        (entity label list with 'Redaction' first, gr.Dataframe for display,
        pandas DataFrame of page/label/text/id, label dropdown, text dropdown,
        page dropdown)
    '''
    recogniser_entities_list = ["Redaction"]
    recogniser_dataframe_out = pd.DataFrame()
    recogniser_dataframe_out_gr = gr.Dataframe()

    # All three branches call the same helper; they differ only in which of the
    # six returned values are kept (some names are deliberately discarded).
    if recogniser_dataframe_base.empty:
        recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, recogniser_entities_list, text_entities_drop, page_entities_drop = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes)
    elif recogniser_dataframe_base.iloc[0,0] == "":
        # NOTE(review): this branch rebinds recogniser_entities_dropdown_value
        # to a Dropdown component rather than a string — confirm intended.
        recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_dropdown_value, recogniser_entities_list, text_entities_drop, page_entities_drop = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes)
    else:
        recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_dropdown, recogniser_entities_list, text_dropdown, page_dropdown = get_filtered_recogniser_dataframe_and_dropdowns(page_image_annotator_object, recogniser_dataframe_base, recogniser_entities_dropdown_value, text_dropdown_value, page_dropdown_value, review_df, page_sizes)

        # Re-filter by the current dropdown selections.
        # (update_entities_df_recogniser_entities is defined elsewhere in this module.)
        review_dataframe, text_entities_drop, page_entities_drop = update_entities_df_recogniser_entities(recogniser_entities_dropdown_value, recogniser_dataframe_out, page_dropdown_value, text_dropdown_value)

        recogniser_dataframe_out_gr = gr.Dataframe(review_dataframe[["page", "label", "text", "id"]], show_search="filter", col_count=(4, "fixed"), type="pandas", headers=["page", "label", "text", "id"], show_fullscreen_button=True, wrap=True, max_height=400, static_columns=[0,1,2,3])

    # Rebuild the label dropdown from the (possibly refreshed) dataframe.
    recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(recogniser_dataframe_out, "label")
    recogniser_entities_drop = gr.Dropdown(value=recogniser_entities_dropdown_value, choices=recogniser_entities_for_drop, allow_custom_value=True, interactive=True)

    recogniser_entities_list_base = recogniser_dataframe_out["label"].astype(str).unique().tolist()

    # Keep 'Redaction' pinned to the front of the label list.
    recogniser_entities_list = [entity for entity in recogniser_entities_list_base if entity != 'Redaction']
    recogniser_entities_list.insert(0, 'Redaction')

    return recogniser_entities_list, recogniser_dataframe_out_gr, recogniser_dataframe_out, recogniser_entities_drop, text_entities_drop, page_entities_drop
|
|
|
def undo_last_removal(backup_review_state:pd.DataFrame, backup_image_annotations_state:list[dict], backup_recogniser_entity_dataframe_base:pd.DataFrame):
    '''
    Restore the review state, annotation state and recogniser dataframe from
    the backups captured before the last removal (undo-button handler).
    '''
    return (backup_review_state,
            backup_image_annotations_state,
            backup_recogniser_entity_dataframe_base)
|
|
|
def update_annotator_page_from_review_df(
    review_df: pd.DataFrame,
    image_file_paths:List[str],
    page_sizes:List[dict],
    current_image_annotations_state:List[str],
    current_page_annotator:object,
    selected_recogniser_entity_df_row:pd.DataFrame,
    input_folder:str,
    doc_full_file_name_textbox:str
) -> Tuple[object, List[dict], int, List[dict], pd.DataFrame, int]:
    '''
    Update the visible annotation object and related objects with the latest review file information,
    optimizing by processing only the current page's data.

    Parameters:
        review_df: full review dataframe of redaction boxes (mutated in place:
            its 'page' and 'image' columns may be rewritten).
        image_file_paths: page image file paths (not read in this body).
        page_sizes: per-page size dicts (page number, image width/height, path).
        current_image_annotations_state: annotation dicts for every page.
        current_page_annotator: annotator object currently displayed.
        selected_recogniser_entity_df_row: row selected in the entity table;
            its 'page' value decides which page to show.
        input_folder: folder used when a page image must be re-rendered.
        doc_full_file_name_textbox: path of the source document.

    Returns:
        (current page annotator dict, updated annotation state, page number,
        updated page_sizes, updated review_df, previous page number)
    '''

    # Work on a shallow copy of the list so the caller's list object is not mutated.
    out_image_annotations_state: List[dict] = list(current_image_annotations_state)
    out_current_page_annotator: dict = current_page_annotator

    # Determine the target page from the selected table row (1-based).
    gradio_annotator_current_page_number: int = 0
    annotate_previous_page: int = 0
    if not selected_recogniser_entity_df_row.empty and 'page' in selected_recogniser_entity_df_row.columns:
        try:
            gradio_annotator_current_page_number = int(selected_recogniser_entity_df_row['page'].iloc[0])
            annotate_previous_page = gradio_annotator_current_page_number
        except (IndexError, ValueError, TypeError):
            print("Warning: Could not extract valid page number from selected_recogniser_entity_df_row. Defaulting to page 0 (or 1).")
            gradio_annotator_current_page_number = 1

    # Clamp the page number into the valid 1..page_max range.
    if gradio_annotator_current_page_number <= 0:
        gradio_annotator_current_page_number = 1

    page_max_reported = len(out_image_annotations_state)
    if gradio_annotator_current_page_number > page_max_reported:
        gradio_annotator_current_page_number = page_max_reported

    page_num_reported_zero_indexed = gradio_annotator_current_page_number - 1

    # Normalise page_sizes into a dataframe with integer 'page' values.
    page_sizes_df = pd.DataFrame(page_sizes)
    if not page_sizes_df.empty:
        page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
        page_sizes_df.dropna(subset=["page"], inplace=True)
        if not page_sizes_df.empty:
            page_sizes_df["page"] = page_sizes_df["page"].astype(int)
        else:
            print("Warning: Page sizes DataFrame became empty after processing.")

    if not review_df.empty:
        if 'page' in review_df.columns:
            # Unparseable page values are coerced to -1 so they never match a real page.
            review_df['page'] = pd.to_numeric(review_df['page'], errors='coerce').fillna(-1).astype(int)

            current_image_path = out_image_annotations_state[page_num_reported_zero_indexed]['image']

            # Make sure the current page points at an image file that exists on disk.
            replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image(doc_full_file_name_textbox, current_image_path, page_sizes_df, gradio_annotator_current_page_number, input_folder)

            page_sizes = page_sizes_df.to_dict(orient='records')
            review_df.loc[review_df["page"]==gradio_annotator_current_page_number, 'image'] = replaced_image_path
            # NOTE(review): images_list is built and updated but never used below.
            images_list = list(page_sizes_df["image_path"])
            images_list[page_num_reported_zero_indexed] = replaced_image_path
            out_image_annotations_state[page_num_reported_zero_indexed]['image'] = replaced_image_path

            # Keep only this page's rows and convert relative coords to pixels.
            current_page_review_df = review_df[review_df['page'] == gradio_annotator_current_page_number].copy()
            current_page_review_df = multiply_coordinates_by_page_sizes(current_page_review_df, page_sizes_df)

        else:
            print(f"Warning: 'page' column not found in review_df. Cannot filter for page {gradio_annotator_current_page_number}. Skipping update from review_df.")
            current_page_review_df = pd.DataFrame()

        if not current_page_review_df.empty:
            current_page_annotations_list = []

            # Keys the image annotator expects on every box dict.
            expected_annotation_keys = ['label', 'color', 'xmin', 'ymin', 'xmax', 'ymax', 'text', 'id']

            for key in expected_annotation_keys:
                if key not in current_page_review_df.columns:
                    # Coordinates default to NaN, everything else to empty string.
                    default_value = np.nan if key in ['xmin', 'ymin', 'xmax', 'ymax'] else ''
                    current_page_review_df[key] = default_value

            current_page_annotations_list_raw = current_page_review_df[expected_annotation_keys].to_dict(orient='records')

            current_page_annotations_list = current_page_annotations_list_raw

            # Write the refreshed boxes back onto the matching page entry in state.
            page_state_entry_found = False
            for i, page_state_entry in enumerate(out_image_annotations_state):
                # The page number is parsed from the image filename suffix
                # ("...<number>.png"); anything else counts as page 0.
                match = re.search(r"(\d+)\.png$", page_state_entry['image'])
                if match: page_no = int(match.group(1))
                else: page_no = 0

                # NOTE(review): this compares the filename's number against the
                # ZERO-indexed page — confirm the image files are numbered from 0,
                # otherwise this matches the wrong page (or none).
                if 'image' in page_state_entry and page_no == page_num_reported_zero_indexed:
                    out_image_annotations_state[i]['boxes'] = current_page_annotations_list

                    if 'image' in current_page_review_df.columns and not current_page_review_df.empty:
                        out_image_annotations_state[i]['image'] = current_page_review_df['image'].iloc[0]
                    page_state_entry_found = True
                    break

            if not page_state_entry_found:
                print(f"Warning: Entry for page {gradio_annotator_current_page_number} not found in current_image_annotations_state. Cannot update page annotations.")

    # Re-check/repair the displayed page's image path (also covers the
    # empty-review_df case, which skipped the block above).
    current_image_path = None
    if len(out_image_annotations_state) > page_num_reported_zero_indexed and 'image' in out_image_annotations_state[page_num_reported_zero_indexed]:
        current_image_path = out_image_annotations_state[page_num_reported_zero_indexed]['image']
    else:
        print(f"Warning: Could not get image path from state for page index {page_num_reported_zero_indexed}.")

    if current_image_path and not page_sizes_df.empty:
        try:
            replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image(
                doc_full_file_name_textbox, current_image_path, page_sizes_df,
                gradio_annotator_current_page_number, input_folder
            )

            if len(out_image_annotations_state) > page_num_reported_zero_indexed:
                out_image_annotations_state[page_num_reported_zero_indexed]['image'] = replaced_image_path

            if 'page' in review_df.columns and 'image' in review_df.columns:
                review_df.loc[review_df["page"]==gradio_annotator_current_page_number, 'image'] = replaced_image_path

        except Exception as e:
            print(f"Error during image path replacement for page {gradio_annotator_current_page_number}: {e}")

    # Propagate any page-size changes back to the list-of-dicts form.
    if not page_sizes_df.empty:
        page_sizes = page_sizes_df.to_dict(orient='records')
    else:
        page_sizes = []

    # Best-effort tidy-up of duplicate page entries; failure is non-fatal.
    try:
        out_image_annotations_state = remove_duplicate_images_with_blank_boxes(out_image_annotations_state)
    except Exception as e:
        print(f"Error during duplicate removal: {e}. Proceeding without duplicate removal.")

    # Select the page object the annotator component should display.
    if len(out_image_annotations_state) > page_num_reported_zero_indexed:
        out_current_page_annotator = out_image_annotations_state[page_num_reported_zero_indexed]
    else:
        print(f"Warning: Cannot select current page annotator object for index {page_num_reported_zero_indexed}.")
        out_current_page_annotator = {}

    final_page_number_returned = gradio_annotator_current_page_number

    return (out_current_page_annotator,
            out_image_annotations_state,
            final_page_number_returned,
            page_sizes,
            review_df,
            annotate_previous_page)
|
|
|
def exclude_selected_items_from_redaction(review_df: pd.DataFrame,
                                          selected_rows_df: pd.DataFrame,
                                          image_file_paths:List[str],
                                          page_sizes:List[dict],
                                          image_annotations_state:dict,
                                          recogniser_entity_dataframe_base:pd.DataFrame):
    '''
    Remove selected items from the review dataframe from the annotation object and review dataframe.

    Returns the updated review dataframe, annotation state and recogniser base
    dataframe, followed by backups of all three inputs (for the undo button).
    '''

    # Keep references to the incoming state so the removal can be undone later.
    backup_review_state = review_df
    backup_image_annotations_state = image_annotations_state
    backup_recogniser_entity_dataframe_base = recogniser_entity_dataframe_base

    if selected_rows_df.empty or review_df.empty:
        # Nothing selected (or nothing to remove from): pass state through untouched.
        return (review_df, image_annotations_state, recogniser_entity_dataframe_base,
                backup_review_state, backup_image_annotations_state, backup_recogniser_entity_dataframe_base)

    # Match on 'id' when both frames carry usable ids; otherwise fall back to
    # the (label, page, text) triple.
    id_usable = (
        "id" in selected_rows_df.columns
        and "id" in review_df.columns
        and not selected_rows_df["id"].isnull().all()
        and not review_df["id"].isnull().all()
    )
    merge_cols = ["id"] if id_usable else ["label", "page", "text"]

    selected_subset = selected_rows_df[merge_cols].drop_duplicates(subset=merge_cols)

    # Anti-join: keep only review rows with no counterpart in the selection.
    flagged = review_df.merge(selected_subset, on=merge_cols, how='left', indicator=True)
    out_review_df = flagged[flagged['_merge'] == 'left_only'].drop(columns=['_merge'])

    # Rebuild the annotation JSON and the filter table from the surviving rows.
    out_image_annotations_state = convert_review_df_to_annotation_json(out_review_df, image_file_paths, page_sizes)
    out_recogniser_entity_dataframe_base = out_review_df[["page", "label", "text", "id"]]

    return (out_review_df, out_image_annotations_state, out_recogniser_entity_dataframe_base,
            backup_review_state, backup_image_annotations_state, backup_recogniser_entity_dataframe_base)
|
|
|
def replace_annotator_object_img_np_array_with_page_sizes_image_path(
    all_image_annotations:List[dict],
    page_image_annotator_object:AnnotatedImageData,
    page_sizes:List[dict],
    page:int):
    '''
    Check if the image value in an AnnotatedImageData dict is a placeholder or np.array. If either of these, replace the value with the file path of the image that is hopefully already loaded into the app related to this page.
    '''

    page_index = page - 1
    stored_image = all_image_annotations[page_index]["image"]

    # Short-circuit order matters: only test the stored value for the
    # placeholder substring when it is not a numpy array.
    needs_replacing = (
        isinstance(stored_image, np.ndarray)
        or "placeholder_image" in stored_image
        or isinstance(page_image_annotator_object['image'], np.ndarray)
    )

    if needs_replacing:
        sizes = pd.DataFrame(page_sizes)
        sizes[["page"]] = sizes[["page"]].apply(pd.to_numeric, errors="coerce")

        # Look up the stored file path recorded for this page number.
        candidate_paths = sizes.loc[sizes['page'] == page, "image_path"].unique()

        if candidate_paths.size > 0:
            real_path = candidate_paths[0]
            page_image_annotator_object['image'] = real_path
            all_image_annotations[page_index]["image"] = real_path
        else:
            print(f"No image path found for page {page}.")

    return page_image_annotator_object, all_image_annotations
|
|
|
def replace_placeholder_image_with_real_image(doc_full_file_name_textbox:str, current_image_path:str, page_sizes_df:pd.DataFrame, page_num_reported:int, input_folder:str):
    ''' If image path is still not valid, load in a new image an overwrite it. Then replace all items in the image annotation object for all pages based on the updated information.'''
    zero_indexed_page = page_num_reported - 1
    page_mask = page_sizes_df['page'] == page_num_reported

    if not os.path.exists(current_image_path):
        # Stale/missing path: re-render this page's image from the source document.
        page_num, replaced_image_path, width, height = process_single_page_for_image_conversion(doc_full_file_name_textbox, zero_indexed_page, input_folder=input_folder)

        page_sizes_df.loc[page_mask, "image_width"] = width
        page_sizes_df.loc[page_mask, "image_height"] = height
        page_sizes_df.loc[page_mask, "image_path"] = replaced_image_path

        return replaced_image_path, page_sizes_df

    # File exists: reuse recorded dimensions when available, otherwise read them
    # from the image file and store them for this page.
    if not page_sizes_df.loc[page_mask, "image_width"].isnull().all():
        width = page_sizes_df.loc[page_mask, "image_width"].max()
        height = page_sizes_df.loc[page_mask, "image_height"].max()
    else:
        image = Image.open(current_image_path)
        width = image.width
        height = image.height

        page_sizes_df.loc[page_mask, "image_width"] = width
        page_sizes_df.loc[page_mask, "image_height"] = height

    page_sizes_df.loc[page_mask, "image_path"] = current_image_path

    return current_image_path, page_sizes_df
|
|
|
def update_annotator_object_and_filter_df(
    all_image_annotations:List[AnnotatedImageData],
    gradio_annotator_current_page_number:int,
    recogniser_entities_dropdown_value:str="ALL",
    page_dropdown_value:str="ALL",
    text_dropdown_value:str="ALL",
    recogniser_dataframe_base:gr.Dataframe=None,
    zoom:int=100,
    review_df:pd.DataFrame=None,
    page_sizes:List[dict]=[],
    doc_full_file_name_textbox:str='',
    input_folder:str=INPUT_FOLDER
) -> Tuple[image_annotator, gr.Number, gr.Number, int, str, gr.Dataframe, pd.DataFrame, List[str], List[str], List[dict], List[AnnotatedImageData]]:
    '''
    Update a gradio_image_annotation object with new annotation data for the current page
    and update filter dataframes, optimizing by processing only the current page's data for display.

    Parameters:
        all_image_annotations: annotation dicts for every page.
        gradio_annotator_current_page_number: 1-based page to display.
        recogniser_entities_dropdown_value / page_dropdown_value /
            text_dropdown_value: current filter selections.
        recogniser_dataframe_base: existing filter table component (a default
            empty one is created when None).
        zoom: zoom percentage applied to the annotator component.
        review_df: full review dataframe (a default empty one is created when
            None or not a DataFrame).
        page_sizes: per-page size dicts; returned possibly updated.
        doc_full_file_name_textbox: source document path, used to re-render
            missing page images.
        input_folder: folder for re-rendered images.

    Returns:
        (annotator component, page-number component twice, page number, label
        dropdown value, filter gr.Dataframe, filter DataFrame, text dropdown,
        page dropdown, updated page_sizes, updated all_image_annotations)
    '''
    # Zoom is applied as a CSS percentage string on the annotator component.
    zoom_str = str(zoom) + '%'

    # Defensive defaults for the optional dataframe inputs.
    if review_df is None or not isinstance(review_df, pd.DataFrame):
        review_df = pd.DataFrame(columns=["image", "page", "label", "color", "xmin", "ymin", "xmax", "ymax", "text", "id"])
    if recogniser_dataframe_base is None:
        recogniser_dataframe_base = gr.Dataframe(pd.DataFrame(data={"page":[], "label":[], "text":[], "id":[]}))

    # With no annotation data at all, return a blank annotator and empty filters.
    if not all_image_annotations:
        print("No all_image_annotation object found")

        blank_annotator = gr.ImageAnnotator(
            value = None, boxes_alpha=0.1, box_thickness=1, label_list=[], label_colors=[],
            show_label=False, height=zoom_str, width=zoom_str, box_min_size=1,
            box_selected_thickness=2, handle_size=4, sources=None,
            show_clear_button=False, show_share_button=False, show_remove_button=False,
            handles_cursor=True, interactive=True, use_default_label=True
        )
        blank_df_out_gr = gr.Dataframe(pd.DataFrame(columns=["page", "label", "text", "id"]))
        blank_df_modified = pd.DataFrame(columns=["page", "label", "text", "id"])

        return (blank_annotator, gr.Number(value=1), gr.Number(value=1), 1,
                recogniser_entities_dropdown_value, blank_df_out_gr, blank_df_modified,
                [], [], [], [])

    # Clamp the requested page into 1..number-of-pages.
    page_num_reported = max(1, gradio_annotator_current_page_number)
    page_max_reported = len(all_image_annotations)
    if page_num_reported > page_max_reported:
        page_num_reported = page_max_reported

    page_num_reported_zero_indexed = page_num_reported - 1
    annotate_previous_page = page_num_reported

    # Normalise page_sizes into a dataframe with integer 'page' values.
    page_sizes_df = pd.DataFrame(page_sizes)
    if not page_sizes_df.empty:
        page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
        page_sizes_df.dropna(subset=["page"], inplace=True)
        if not page_sizes_df.empty:
            page_sizes_df["page"] = page_sizes_df["page"].astype(int)
        else:
            print("Warning: Page sizes DataFrame became empty after processing.")

    if len(all_image_annotations) > page_num_reported_zero_indexed:
        page_object_to_update = all_image_annotations[page_num_reported_zero_indexed]

        # Replace numpy-array / placeholder images with a stored file path.
        updated_page_object, all_image_annotations_after_img_replace = replace_annotator_object_img_np_array_with_page_sizes_image_path(
            all_image_annotations, page_object_to_update, page_sizes, page_num_reported)

        all_image_annotations = all_image_annotations_after_img_replace

        current_image_path = updated_page_object.get('image')

        # If the stored path no longer exists on disk, re-render the page image.
        if current_image_path and not page_sizes_df.empty:
            try:
                replaced_image_path, page_sizes_df = replace_placeholder_image_with_real_image(
                    doc_full_file_name_textbox, current_image_path, page_sizes_df,
                    page_num_reported, input_folder=input_folder
                )

                if len(all_image_annotations) > page_num_reported_zero_indexed:
                    all_image_annotations[page_num_reported_zero_indexed]['image'] = replaced_image_path

                if 'page' in review_df.columns and 'image' in review_df.columns:
                    # Unparseable page values are coerced to -1 so they never match.
                    review_df['page'] = pd.to_numeric(review_df['page'], errors='coerce').fillna(-1).astype(int)
                    review_df.loc[review_df["page"]==page_num_reported, 'image'] = replaced_image_path

            except Exception as e:
                print(f"Error during image path replacement for page {page_num_reported}: {e}")
    else:
        print(f"Warning: Page index {page_num_reported_zero_indexed} out of bounds for all_image_annotations list.")

    # Propagate any page-size changes back to the list-of-dicts form.
    if not page_sizes_df.empty:
        page_sizes = page_sizes_df.to_dict(orient='records')
    else:
        page_sizes = []

    # Build the annotator payload for just the displayed page.
    current_page_image_annotator_object = None
    if len(all_image_annotations) > page_num_reported_zero_indexed:
        page_data_for_display = all_image_annotations[page_num_reported_zero_indexed]

        current_page_annotations_df = convert_annotation_data_to_dataframe([page_data_for_display])

        # Convert relative coordinates to pixel coordinates for display.
        if not current_page_annotations_df.empty and not page_sizes_df.empty:
            try:
                page_size_row = page_sizes_df[page_sizes_df['page'] == page_num_reported]
                if not page_size_row.empty:
                    current_page_annotations_df = multiply_coordinates_by_page_sizes(
                        current_page_annotations_df, page_size_row,
                        xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax"
                    )

            except Exception as e:
                print(f"Warning: Error during coordinate multiplication for page {page_num_reported}: {e}. Using original coordinates.")

        # Default box colour when none was recorded.
        if "color" not in current_page_annotations_df.columns:
            current_page_annotations_df['color'] = '(0, 0, 0)'

        processed_current_page_annotations_list = current_page_annotations_df[["xmin", "xmax", "ymin", "ymax", "label", "color", "text", "id"]].to_dict(orient='records')

        current_page_image_annotator_object: AnnotatedImageData = {
            'image': page_data_for_display.get('image'),
            'boxes': processed_current_page_annotations_list
        }

    # Rebuild the filter table and dropdowns alongside the annotator.
    try:
        # NOTE(review): the fourth returned value rebinds
        # recogniser_entities_dropdown_value to a Dropdown component (not a
        # string) — confirm the downstream Gradio outputs expect that.
        recogniser_entities_list, recogniser_dataframe_out_gr, recogniser_dataframe_modified, recogniser_entities_dropdown_value, text_entities_drop, page_entities_drop = update_recogniser_dataframes(
            all_image_annotations,
            recogniser_dataframe_base,
            recogniser_entities_dropdown_value,
            text_dropdown_value,
            page_dropdown_value,
            review_df.copy(),
            page_sizes
        )

        # One colour entry per label; all black by default.
        recogniser_colour_list = [(0, 0, 0) for _ in range(len(recogniser_entities_list))]

    except Exception as e:
        print(f"Error calling update_recogniser_dataframes: {e}. Returning empty/default filter data.")
        recogniser_entities_list = []
        recogniser_colour_list = []
        recogniser_dataframe_out_gr = gr.Dataframe(pd.DataFrame(columns=["page", "label", "text", "id"]))
        recogniser_dataframe_modified = pd.DataFrame(columns=["page", "label", "text", "id"])
        text_entities_drop = []
        page_entities_drop = []

    page_number_reported_gradio_comp = gr.Number(label = "Current page", value=page_num_reported, precision=0)

    if current_page_image_annotator_object is None:
        # Shouldn't normally happen once all_image_annotations is non-empty.
        print("Warning: Could not prepare annotator object for the current page.")
        out_image_annotator = image_annotator(value=None, interactive=False)
    else:
        out_image_annotator = image_annotator(
            value = current_page_image_annotator_object,
            boxes_alpha=0.1,
            box_thickness=1,
            label_list=recogniser_entities_list,
            label_colors=recogniser_colour_list,
            show_label=False,
            height=zoom_str,
            width=zoom_str,
            box_min_size=1,
            box_selected_thickness=2,
            handle_size=4,
            sources=None,
            show_clear_button=False,
            show_share_button=False,
            show_remove_button=False,
            handles_cursor=True,
            interactive=True
        )

    return (out_image_annotator,
            page_number_reported_gradio_comp,
            page_number_reported_gradio_comp,
            page_num_reported,
            recogniser_entities_dropdown_value,
            recogniser_dataframe_out_gr,
            recogniser_dataframe_modified,
            text_entities_drop,
            page_entities_drop,
            page_sizes,
            all_image_annotations)
|
|
|
def update_all_page_annotation_object_based_on_previous_page(
    page_image_annotator_object:AnnotatedImageData,
    current_page:int,
    previous_page:int,
    all_image_annotations:List[AnnotatedImageData],
    page_sizes:List[dict]=[],
    clear_all:bool=False
):
    '''
    Overwrite image annotations on the page we are moving from with modifications.

    When clear_all is set, the previous page's boxes are emptied instead of
    being replaced with the annotator's current contents. Returns the updated
    annotation list plus the destination page number twice (one per linked
    Gradio output).
    '''

    prev_index = previous_page - 1

    # Guard against a missing/zero page value coming from the UI.
    if not current_page:
        current_page = 1

    # Make sure the page image is referenced by file path, not a numpy array
    # or placeholder, before storing it back into the state list.
    page_image_annotator_object, all_image_annotations = replace_annotator_object_img_np_array_with_page_sizes_image_path(
        all_image_annotations, page_image_annotator_object, page_sizes, previous_page)

    if clear_all == False:
        all_image_annotations[prev_index] = page_image_annotator_object
    else:
        all_image_annotations[prev_index]["boxes"] = []

    return all_image_annotations, current_page, current_page
|
|
|
def apply_redactions_to_review_df_and_files(page_image_annotator_object:AnnotatedImageData,
                                            file_paths:List[str],
                                            doc:Document,
                                            all_image_annotations:List[AnnotatedImageData],
                                            current_page:int,
                                            review_file_state:pd.DataFrame,
                                            output_folder:str = OUTPUT_FOLDER,
                                            save_pdf:bool=True,
                                            page_sizes:List[dict]=[],
                                            progress=gr.Progress(track_tqdm=True)):
    '''
    Apply modified redactions to a pymupdf and export review files.

    Parameters:
    - page_image_annotator_object: Current page's annotation state (immediately
      overwritten below from all_image_annotations).
    - file_paths: Input file path or list of paths (pdf / image / csv).
    - doc: Open document object; replaced with a one-image list for image inputs.
    - all_image_annotations: Per-page annotation list for the whole document.
    - current_page: 1-based page number currently shown in the annotator.
    - review_file_state: Existing review dataframe used as the merge base.
    - output_folder: Folder prefix for all written outputs.
    - save_pdf: When False, skip redaction output and only export review files.
    - page_sizes: List of per-page size dicts (page / image / mediabox info).
    - progress: Gradio progress tracker.

    Returns:
    - Tuple of (doc, all_image_annotations, output_files, output_log_files, review_df).

    NOTE(review): mutable default argument page_sizes=[] — safe only if nothing
    downstream mutates it; confirm.
    '''
    output_files = []
    output_log_files = []
    pdf_doc = []
    review_df = review_file_state

    # The passed-in annotator object is discarded in favour of the stored state
    # for the current page.
    page_image_annotator_object = all_image_annotations[current_page - 1]

    # Swap any numpy-array image for a file path before saving.
    page_image_annotator_object, all_image_annotations = replace_annotator_object_img_np_array_with_page_sizes_image_path(all_image_annotations, page_image_annotator_object, page_sizes, current_page)
    page_image_annotator_object['image'] = all_image_annotations[current_page - 1]["image"]

    if not page_image_annotator_object:
        print("No image annotations object found for page")
        return doc, all_image_annotations, output_files, output_log_files, review_df

    if isinstance(file_paths, str):
        file_paths = [file_paths]

    for file_path in file_paths:
        file_name_without_ext = get_file_name_without_type(file_path)
        file_name_with_ext = os.path.basename(file_path)

        file_extension = os.path.splitext(file_path)[1].lower()

        if save_pdf == True:

            # Image-only input: draw filled rectangles directly onto the image.
            # NOTE(review): `&` is a bitwise AND of two bools (works, but `and`
            # is conventional), and `file_extension not in '.csv'` is a
            # substring test, not an equality test — e.g. '.cs' would count as
            # "in". Confirm intended.
            if (is_pdf(file_path) == False) & (file_extension not in '.csv'):
                image = Image.open(file_paths[-1])

                draw = ImageDraw.Draw(image)

                for img_annotation_box in page_image_annotator_object['boxes']:
                    coords = [img_annotation_box["xmin"],
                    img_annotation_box["ymin"],
                    img_annotation_box["xmax"],
                    img_annotation_box["ymax"]]

                    fill = img_annotation_box["color"]

                    # Validate the colour: accept only a 3-tuple of 0-255 ints,
                    # otherwise fall back to black.
                    if isinstance(fill, tuple) and len(fill) == 3:

                        if all(isinstance(c, int) and 0 <= c <= 255 for c in fill):
                            pass

                        else:
                            print(f"Invalid color values: {fill}. Defaulting to black.")
                            fill = (0, 0, 0)
                    else:
                        print(f"Invalid fill format: {fill}. Defaulting to black.")
                        fill = (0, 0, 0)

                    # PIL cannot fill RGB tuples on e.g. palette-mode images;
                    # convert and rebuild the draw handle on the new image.
                    if image.mode not in ("RGB", "RGBA"):
                        image = image.convert("RGB")

                    draw = ImageDraw.Draw(image)

                    draw.rectangle(coords, fill=fill)

                output_image_path = output_folder + file_name_without_ext + "_redacted.png"
                image.save(output_folder + file_name_without_ext + "_redacted.png")

                output_files.append(output_image_path)

                # Downstream code treats the doc as a list of images for image inputs.
                doc = [image]

            elif file_extension in '.csv':
                # Review-file-only input: nothing to redact.
                pdf_doc = []

            # PDF input: redact every page via pymupdf.
            elif is_pdf(file_path) == True:
                pdf_doc = pymupdf.open(file_path)
                orig_pdf_file_path = file_path

                output_files.append(orig_pdf_file_path)

                number_of_pages = pdf_doc.page_count
                original_cropboxes = []

                page_sizes_df = pd.DataFrame(page_sizes)
                page_sizes_df[["page"]] = page_sizes_df[["page"]].apply(pd.to_numeric, errors="coerce")

                for i in progress.tqdm(range(0, number_of_pages), desc="Saving redacted pages to file", unit = "pages"):

                    image_loc = all_image_annotations[i]['image']

                    # Resolve the page image from whatever form it is stored in.
                    if isinstance(image_loc, np.ndarray):
                        image = Image.fromarray(image_loc.astype('uint8'))
                    elif isinstance(image_loc, Image.Image):
                        image = image_loc
                    elif isinstance(image_loc, str):
                        # NOTE(review): this fallback assigns a pandas Series and
                        # is immediately overwritten by the Image.open attempt
                        # below — looks like dead/ineffective code. Also compares
                        # 0-based i against 'page', which is 1-based elsewhere;
                        # confirm.
                        if not os.path.exists(image_loc):
                            image=page_sizes_df.loc[page_sizes_df['page']==i, "image_path"]
                        try:
                            image = Image.open(image_loc)
                        except Exception as e:
                            image = None

                    pymupdf_page = pdf_doc.load_page(i)
                    # Remember the original cropbox, then widen to the mediabox
                    # so redaction coordinates cover the full page.
                    original_cropboxes.append(pymupdf_page.cropbox)
                    pymupdf_page.set_cropbox(pymupdf_page.mediabox)

                    pymupdf_page = redact_page_with_pymupdf(page=pymupdf_page, page_annotations=all_image_annotations[i], image=image, original_cropbox=original_cropboxes[-1], page_sizes_df= page_sizes_df)
            else:
                print("File type not recognised.")

            progress(0.9, "Saving output files")

            if pdf_doc:
                out_pdf_file_path = output_folder + file_name_without_ext + "_redacted.pdf"
                pdf_doc.save(out_pdf_file_path, garbage=4, deflate=True, clean=True)
                output_files.append(out_pdf_file_path)

            else:
                print("PDF input not found. Outputs not saved to PDF.")

        # save_pdf is False: pass the original PDF through untouched.
        else:
            if is_pdf(file_path) == True:
                orig_pdf_file_path = file_path
                output_files.append(orig_pdf_file_path)

        # Always attempt to export the review csv, regardless of save_pdf.
        try:
            review_df = convert_annotation_json_to_review_df(all_image_annotations, review_file_state.copy(), page_sizes=page_sizes)

            page_sizes_df = pd.DataFrame(page_sizes)
            page_sizes_df .loc[:, "page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
            # Store review coordinates normalised to 0-1 page-relative values.
            review_df = divide_coordinates_by_page_sizes(review_df, page_sizes_df)

            review_df = review_df[["image", "page", "label","color", "xmin", "ymin", "xmax", "ymax", "text", "id"]]

            out_review_file_file_path = output_folder + file_name_with_ext + '_review_file.csv'

            review_df.to_csv(out_review_file_file_path, index=None)
            output_files.append(out_review_file_file_path)

        except Exception as e:
            # Best-effort export: log and continue rather than failing the run.
            print("In apply redactions function, could not save annotations to csv file:", e)

    return doc, all_image_annotations, output_files, output_log_files, review_df
|
|
|
def get_boxes_json(annotations:AnnotatedImageData):
    '''
    Return the list of bounding boxes stored under the "boxes" key of an
    annotation object. Raises KeyError if the key is absent.
    '''
    boxes = annotations["boxes"]
    return boxes
|
|
|
def update_all_entity_df_dropdowns(df:pd.DataFrame, label_dropdown_value:str, page_dropdown_value:str, text_dropdown_value:str):
    '''
    Rebuild the label, text and page dropdowns from the rows of the given
    dataframe, keeping the currently-selected values.
    '''
    # Normalise each selection to a list so [0] indexing below is uniform.
    label_values = [label_dropdown_value] if isinstance(label_dropdown_value, str) else label_dropdown_value
    page_values = [page_dropdown_value] if isinstance(page_dropdown_value, str) else page_dropdown_value
    text_values = [text_dropdown_value] if isinstance(text_dropdown_value, str) else text_dropdown_value

    filtered_df = df.copy()

    # One dropdown per column, each seeded with its current selection.
    label_choices = update_dropdown_list_based_on_dataframe(filtered_df, "label")
    recogniser_entities_drop = gr.Dropdown(value=label_values[0], choices=label_choices, allow_custom_value=True, interactive=True)

    text_choices = update_dropdown_list_based_on_dataframe(filtered_df, "text")
    text_entities_drop = gr.Dropdown(value=text_values[0], choices=text_choices, allow_custom_value=True, interactive=True)

    page_choices = update_dropdown_list_based_on_dataframe(filtered_df, "page")
    page_entities_drop = gr.Dropdown(value=page_values[0], choices=page_choices, allow_custom_value=True, interactive=True)

    return recogniser_entities_drop, text_entities_drop, page_entities_drop
|
|
|
def update_entities_df_recogniser_entities(choice:str, df:pd.DataFrame, page_dropdown_value:str, text_dropdown_value:str):
    '''
    Filter the entity dataframe by the chosen label(s), honouring the page and
    text dropdown selections, and rebuild the dependent dropdowns.
    '''
    # Normalise all selections to lists.
    if isinstance(choice, str): choice = [choice]
    if isinstance(page_dropdown_value, str): page_dropdown_value = [page_dropdown_value]
    if isinstance(text_dropdown_value, str): text_dropdown_value = [text_dropdown_value]

    filtered_df = df.copy()

    # "ALL" disables a filter; otherwise keep only matching rows (string-compared).
    for column, values in (("page", page_dropdown_value), ("text", text_dropdown_value), ("label", choice)):
        if "ALL" not in values:
            filtered_df = filtered_df[filtered_df[column].astype(str).isin(values)]

    # NOTE(review): the label dropdown is rebuilt but not returned — the event
    # source component keeps its own value in the UI.
    label_choices = update_dropdown_list_based_on_dataframe(filtered_df, "label")
    recogniser_entities_drop = gr.Dropdown(value=choice[0], choices=label_choices, allow_custom_value=True, interactive=True)

    text_choices = update_dropdown_list_based_on_dataframe(filtered_df, "text")
    text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_choices, allow_custom_value=True, interactive=True)

    page_choices = update_dropdown_list_based_on_dataframe(filtered_df, "page")
    page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_choices, allow_custom_value=True, interactive=True)

    return filtered_df, text_entities_drop, page_entities_drop
|
|
|
def update_entities_df_page(choice:str, df:pd.DataFrame, label_dropdown_value:str, text_dropdown_value:str):
    '''
    Filter the entity dataframe by the chosen page(s), honouring the label and
    text dropdown selections, and rebuild the dependent dropdowns.
    '''
    # Normalise all selections to lists.
    if isinstance(choice, str): choice = [choice]
    if isinstance(label_dropdown_value, str): label_dropdown_value = [label_dropdown_value]
    if isinstance(text_dropdown_value, str): text_dropdown_value = [text_dropdown_value]

    filtered_df = df.copy()

    # "ALL" disables a filter; otherwise keep only matching rows (string-compared).
    for column, values in (("text", text_dropdown_value), ("label", label_dropdown_value), ("page", choice)):
        if "ALL" not in values:
            filtered_df = filtered_df[filtered_df[column].astype(str).isin(values)]

    label_choices = update_dropdown_list_based_on_dataframe(filtered_df, "label")
    recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=label_choices, allow_custom_value=True, interactive=True)

    text_choices = update_dropdown_list_based_on_dataframe(filtered_df, "text")
    text_entities_drop = gr.Dropdown(value=text_dropdown_value[0], choices=text_choices, allow_custom_value=True, interactive=True)

    # NOTE(review): the page dropdown is rebuilt but not returned — the event
    # source component keeps its own value in the UI.
    page_choices = update_dropdown_list_based_on_dataframe(filtered_df, "page")
    page_entities_drop = gr.Dropdown(value=choice[0], choices=page_choices, allow_custom_value=True, interactive=True)

    return filtered_df, recogniser_entities_drop, text_entities_drop
|
|
|
def update_entities_df_text(choice:str, df:pd.DataFrame, label_dropdown_value:str, page_dropdown_value:str):
    '''
    Filter the entity dataframe by the chosen text value(s), honouring the
    label and page dropdown selections, and rebuild the dependent dropdowns.
    '''
    # Normalise all selections to lists.
    if isinstance(choice, str): choice = [choice]
    if isinstance(label_dropdown_value, str): label_dropdown_value = [label_dropdown_value]
    if isinstance(page_dropdown_value, str): page_dropdown_value = [page_dropdown_value]

    filtered_df = df.copy()

    # "ALL" disables a filter; otherwise keep only matching rows (string-compared).
    for column, values in (("page", page_dropdown_value), ("label", label_dropdown_value), ("text", choice)):
        if "ALL" not in values:
            filtered_df = filtered_df[filtered_df[column].astype(str).isin(values)]

    label_choices = update_dropdown_list_based_on_dataframe(filtered_df, "label")
    recogniser_entities_drop = gr.Dropdown(value=label_dropdown_value[0], choices=label_choices, allow_custom_value=True, interactive=True)

    # NOTE(review): the text dropdown is rebuilt but not returned — the event
    # source component keeps its own value in the UI.
    text_choices = update_dropdown_list_based_on_dataframe(filtered_df, "text")
    text_entities_drop = gr.Dropdown(value=choice[0], choices=text_choices, allow_custom_value=True, interactive=True)

    page_choices = update_dropdown_list_based_on_dataframe(filtered_df, "page")
    page_entities_drop = gr.Dropdown(value=page_dropdown_value[0], choices=page_choices, allow_custom_value=True, interactive=True)

    return filtered_df, recogniser_entities_drop, page_entities_drop
|
|
|
def reset_dropdowns(df:pd.DataFrame):
    '''
    Return Gradio dropdown objects with value 'ALL'.

    Rebuilds the label, text and page dropdown choice lists from the dataframe
    and resets each selection to the unfiltered "ALL" option.
    '''
    dropdowns = []
    for column in ("label", "text", "page"):
        choices = update_dropdown_list_based_on_dataframe(df, column)
        dropdowns.append(gr.Dropdown(value="ALL", choices=choices, allow_custom_value=True, interactive=True))

    # Order: label (recogniser entities), text, page.
    return tuple(dropdowns)
|
|
|
def df_select_callback(df: pd.DataFrame, evt: gr.SelectData):
    '''
    Turn a row selected in the redaction dataframe into a single-row dataframe
    with page, label, text and id columns.
    '''
    # Column order in the source dataframe: page, label, text, id.
    page, label, text, box_id = evt.row_value[0], evt.row_value[1], evt.row_value[2], evt.row_value[3]

    row_value_df = pd.DataFrame(data={"page": [page], "label": [label], "text": [text], "id": [box_id]})

    return row_value_df
|
|
|
def df_select_callback_textract_api(df: pd.DataFrame, evt: gr.SelectData):
    '''
    Extract the job id and job type from a selected Textract job row and return
    them alongside a one-row summary dataframe.
    '''
    job_id = evt.row_value[0]
    # Column index 1 is intentionally skipped; index 2 holds the job type.
    job_type = evt.row_value[2]

    summary_df = pd.DataFrame(data={"job_id": [job_id], "label": [job_type]})

    return job_id, job_type, summary_df
|
|
|
def df_select_callback_cost(df: pd.DataFrame, evt: gr.SelectData):
    '''
    Return the value in the first column of the selected cost-code row.
    '''
    selected_code = evt.row_value[0]
    return selected_code
|
|
|
def df_select_callback_ocr(df: pd.DataFrame, evt: gr.SelectData):
    '''
    Return the page value of a selected OCR row together with a one-row
    dataframe holding the page and text of that row.
    '''
    selected_page = evt.row_value[0]
    selected_text = evt.row_value[1]

    selection_df = pd.DataFrame(data={"page": [selected_page], "text": [selected_text]})

    return selected_page, selection_df
|
|
|
def update_selected_review_df_row_colour(
    redaction_row_selection: pd.DataFrame,
    review_df: pd.DataFrame,
    previous_id: str = "",
    previous_colour: str = '(0, 0, 0)',
    colour: str = '(1, 0, 255)'
) -> tuple[pd.DataFrame, str, str]:
    '''
    Update the colour of a single redaction box based on the values in a selection row
    (Optimized Version)

    Parameters:
    - redaction_row_selection: One-row dataframe describing the selected box
      (matched on "id" when available, else on label/page/text).
    - review_df: Full review dataframe; its "color" column is mutated so the
      selected box carries the highlight colour.
    - previous_id: id of the box highlighted on the last call, used to restore
      its original colour.
    - previous_colour: the colour that box had before it was highlighted.
    - colour: the highlight colour, as a stringified tuple.

    Returns:
    - Tuple of (review_df, previous_id, previous_colour) — the new "previous"
      values to feed back into the next call.
    '''
    # Ensure the columns the logic relies on exist before touching them.
    if "color" not in review_df.columns:
        review_df["color"] = previous_colour if previous_id else '(0, 0, 0)'

    if "id" not in review_df.columns:
        print("Warning: 'id' column not found. Calling fill_missing_ids.")
        review_df = fill_missing_ids(review_df)

    # Restore the previously highlighted box to the colour it had before.
    if previous_id and previous_id in review_df["id"].values:
        review_df.loc[review_df["id"] == previous_id, "color"] = previous_colour

    # Any remaining boxes still carrying the highlight colour are reset to black.
    # NOTE(review): this also clobbers boxes whose real colour legitimately
    # equals the highlight colour — presumably acceptable; confirm.
    review_df.loc[review_df["color"] == colour, "color"] = '(0, 0, 0)'

    if not redaction_row_selection.empty and not review_df.empty:
        # Prefer id-based matching; fall back to the label/page/text triple when
        # either frame lacks usable ids.
        use_id = (
            "id" in redaction_row_selection.columns
            and "id" in review_df.columns
            and not redaction_row_selection["id"].isnull().all()
            and not review_df["id"].isnull().all()
        )

        selected_merge_cols = ["id"] if use_id else ["label", "page", "text"]

        # Inner merge keeps only the review rows matching the selection.
        merged_reviews = review_df.merge(
            redaction_row_selection[selected_merge_cols],
            on=selected_merge_cols,
            how="inner"
        )

        if not merged_reviews.empty:
            # Remember the matched box's current colour/id so the next call can
            # undo the highlight.
            new_previous_colour = str(merged_reviews["color"].iloc[0])
            new_previous_id = merged_reviews["id"].iloc[0]

            if use_id:
                review_df.loc[review_df["id"].isin(merged_reviews["id"]), "color"] = colour
            else:
                # Without ids, build a composite string key to locate the rows.
                def create_merge_key(df, cols):
                    return df[cols].astype(str).agg('_'.join, axis=1)

                review_df_key = create_merge_key(review_df, selected_merge_cols)
                merged_reviews_key = create_merge_key(merged_reviews, selected_merge_cols)

                review_df.loc[review_df_key.isin(merged_reviews_key), "color"] = colour

            previous_colour = new_previous_colour
            previous_id = new_previous_id
        else:
            print("No reviews found matching selection criteria")
            # Nothing matched: reset the "previous" state.
            previous_colour = '(0, 0, 0)'
            previous_id = ''

    else:
        # Empty selection or empty review frame: clear any highlight and reset
        # the "previous" state.
        review_df.loc[review_df["color"] == colour, "color"] = '(0, 0, 0)'
        previous_colour = '(0, 0, 0)'
        previous_id = ''

    # Reorder to the canonical review-file column order when possible.
    if set(["image", "page", "label", "color", "xmin","ymin", "xmax", "ymax", "text", "id"]).issubset(review_df.columns):
        review_df = review_df[["image", "page", "label", "color", "xmin","ymin", "xmax", "ymax", "text", "id"]]
    else:
        print("Warning: Not all expected columns are present in review_df for reordering.")

    return review_df, previous_id, previous_colour
|
|
|
def update_boxes_color(images: list, redaction_row_selection: pd.DataFrame, colour: tuple = (0, 255, 0)):
    """
    Update the color of bounding boxes in the images list based on redaction_row_selection.

    Parameters:
    - images (list): List of dictionaries containing image paths and box metadata.
    - redaction_row_selection (pd.DataFrame): DataFrame with 'page', 'label', and optionally 'text' columns.
    - colour (tuple): RGB tuple for the new color.

    Returns:
    - Updated list with modified colors.
    """
    # Build a (page, label) lookup once so each box test is O(1).
    # NOTE(review): pages are matched here by 0-based list index, while "page"
    # values elsewhere in this file are 1-based — confirm intended.
    targets = {
        (page, label)
        for page, label in zip(redaction_row_selection["page"], redaction_row_selection["label"])
    }

    for page_idx, image_obj in enumerate(images):
        if "boxes" not in image_obj:
            continue
        for box in image_obj["boxes"]:
            if (page_idx, box["label"]) in targets:
                box["color"] = colour

    return images
|
|
|
def update_other_annotator_number_from_current(page_number_first_counter:int):
    '''
    Mirror the first annotator's page counter onto the other annotator.
    '''
    mirrored_page_number = page_number_first_counter
    return mirrored_page_number
|
|
|
def convert_image_coords_to_adobe(pdf_page_width:float, pdf_page_height:float, image_width:float, image_height:float, x1:float, y1:float, x2:float, y2:float):
    '''
    Converts coordinates from image space to Adobe PDF space.

    Parameters:
    - pdf_page_width: Width of the PDF page
    - pdf_page_height: Height of the PDF page
    - image_width: Width of the source image
    - image_height: Height of the source image
    - x1, y1, x2, y2: Coordinates in image space

    Returns:
    - Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space
    '''
    # Scale factors mapping image pixels onto PDF points.
    x_scale = pdf_page_width / image_width
    y_scale = pdf_page_height / image_height

    # Horizontal coordinates scale directly.
    pdf_x1 = x1 * x_scale
    pdf_x2 = x2 * x_scale

    # Adobe's y axis runs bottom-up while image y runs top-down, so the scaled
    # y values are flipped against the page height.
    pdf_y1 = pdf_page_height - (y1 * y_scale)
    pdf_y2 = pdf_page_height - (y2 * y_scale)

    # After flipping, guarantee y1 <= y2.
    if pdf_y1 > pdf_y2:
        pdf_y1, pdf_y2 = pdf_y2, pdf_y1

    return pdf_x1, pdf_y1, pdf_x2, pdf_y2
|
|
|
def convert_pymupdf_coords_to_adobe(x1: float, y1: float, x2: float, y2: float, pdf_page_height: float):
    """
    Converts coordinates from PyMuPDF (fitz) space to Adobe PDF space.

    Parameters:
    - x1, y1, x2, y2: Coordinates in PyMuPDF space
    - pdf_page_height: Total height of the PDF page

    Returns:
    - Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space
    """
    # PyMuPDF measures y from the top of the page, Adobe from the bottom.
    # Flipping swaps which input edge becomes the smaller output y, so the
    # original y2 produces the new y1 and vice versa. x values are unchanged.
    return x1, pdf_page_height - y2, x2, pdf_page_height - y1
|
|
|
def create_xfdf(review_file_df:pd.DataFrame, pdf_path:str, pymupdf_doc:object, image_paths:List[str], document_cropboxes:List=[], page_sizes:List[dict]=[]):
    '''
    Create an xfdf file from a review csv file and a pdf.

    Parameters:
    - review_file_df: Review dataframe with page, coordinates, label, text and
      color columns (coordinates may be 0-1 relative or absolute).
    - pdf_path: Path written into the xfdf header as the source pdf.
    - pymupdf_doc: Open PyMuPDF document used to read page dimensions.
    - image_paths: List of page image paths (accepted but not read here).
    - document_cropboxes: Optional list of stringified cropbox rects, one per page.
    - page_sizes: Optional list of per-page size dicts (mediabox / image sizes).

    Returns:
    - Pretty-printed XFDF XML string containing one <redact> annotation per row.

    NOTE(review): mutable default arguments ([]) — safe only if never mutated;
    confirm.
    '''
    pages_are_images = True

    # Root element in Adobe's XFDF namespace.
    xfdf = Element('xfdf', xmlns="http://ns.adobe.com/xfdf/", xml_space="preserve")

    header = SubElement(xfdf, 'header')
    header.set('pdf-filepath', pdf_path)

    annots = SubElement(xfdf, 'annots')

    if page_sizes:
        page_sizes_df = pd.DataFrame(page_sizes)

        pages_are_images = False

        # Attach mediabox dimensions to each review row if not already present.
        if "mediabox_width" not in review_file_df.columns:
            review_file_df = review_file_df.merge(page_sizes_df, how="left", on = "page")

        # All coordinates <= 1 implies 0-1 page-relative values: scale up to
        # absolute mediabox coordinates.
        if review_file_df["xmin"].max() <= 1 and review_file_df["xmax"].max() <= 1 and review_file_df["ymin"].max() <= 1 and review_file_df["ymax"].max() <= 1:
            review_file_df["xmin"] = review_file_df["xmin"] * review_file_df["mediabox_width"]
            review_file_df["xmax"] = review_file_df["xmax"] * review_file_df["mediabox_width"]
            review_file_df["ymin"] = review_file_df["ymin"] * review_file_df["mediabox_height"]
            review_file_df["ymax"] = review_file_df["ymax"] * review_file_df["mediabox_height"]

        # If every mediabox width is null, fall back to image-based page sizes.
        if len(page_sizes_df.loc[page_sizes_df["mediabox_width"].isnull(),"mediabox_width"]) == len(page_sizes_df["mediabox_width"]):
            pages_are_images = True

            review_file_df = multiply_coordinates_by_page_sizes(review_file_df, page_sizes_df, xmin="xmin", xmax="xmax", ymin="ymin", ymax="ymax")

    # One <redact> annotation per review row.
    for _, row in review_file_df.iterrows():
        page_num_reported = row["page"]
        page_python_format = int(row["page"])-1

        pymupdf_page = pymupdf_doc.load_page(page_python_format)

        # Restore the page's original cropbox (stored as a stringified Rect)
        # so the reported mediabox/coords match the original document.
        if document_cropboxes:
            # Pull the four numbers out of the stringified rect.
            match = re.findall(r"[-+]?\d*\.\d+|\d+", document_cropboxes[page_python_format])

            if match and len(match) == 4:
                rect_values = list(map(float, match))
                pymupdf_page.set_cropbox(Rect(*rect_values))
            else:
                raise ValueError(f"Invalid cropbox format: {document_cropboxes[page_python_format]}")
        else:
            print("Document cropboxes not found.")

        pdf_page_height = pymupdf_page.mediabox.height
        pdf_page_width = pymupdf_page.mediabox.width

        redact_annot = SubElement(annots, 'redact')

        # Unique name so Adobe treats each annotation independently.
        annot_id = str(uuid.uuid4())
        redact_annot.set('name', annot_id)

        # XFDF page indices are 0-based.
        redact_annot.set('page', str(int(row['page']) - 1))

        # Convert top-down PyMuPDF coordinates to Adobe's bottom-up space.
        x1, y1, x2, y2 = convert_pymupdf_coords_to_adobe(row['xmin'],
        row['ymin'],
        row['xmax'],
        row['ymax'], pdf_page_height)

        if CUSTOM_BOX_COLOUR == "grey":
            colour_str = "0.5,0.5,0.5"
        else:
            # "(r, g, b)" -> "r,g,b" as expected by the xfdf colour attributes.
            colour_str = row['color'].strip('()').replace(' ', '')

        redact_annot.set('rect', f"{x1:.2f},{y1:.2f},{x2:.2f},{y2:.2f}")

        redact_annot.set('title', row['label'])
        redact_annot.set('contents', row['text'])
        redact_annot.set('subject', row['label'])
        redact_annot.set('mimetype', "Form")

        redact_annot.set('border-color', colour_str)
        redact_annot.set('repeat', 'false')
        redact_annot.set('interior-color', colour_str)

        redact_annot.set('opacity', "0.5")

    # Serialise and pretty-print the assembled tree.
    xml_str = minidom.parseString(tostring(xfdf)).toprettyxml(indent=" ")

    return xml_str
|
|
|
def convert_df_to_xfdf(input_files:List[str], pdf_doc:Document, image_paths:List[str], output_folder:str = OUTPUT_FOLDER, document_cropboxes:List=[], page_sizes:List[dict]=[]):
    '''
    Load in files to convert a review file into an Adobe comment file format
    '''
    output_paths = []
    pdf_name = ""
    file_path_name = ""

    file_paths_list = [input_files] if isinstance(input_files, str) else input_files

    # Order the inputs so PDFs come first, then JSONs, then everything else —
    # this guarantees the pdf name is known before any review csv is processed.
    file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json'))

    for file in file_paths_list:
        # Accept either plain path strings or uploaded-file objects.
        file_path = file if isinstance(file, str) else file.name

        file_path_name = get_file_name_without_type(file_path)
        file_path_end = detect_file_type(file_path)

        if file_path_end == "pdf":
            pdf_name = os.path.basename(file_path)

        if file_path_end == "csv":
            # Fall back to the csv's own name when no pdf preceded it.
            if not pdf_name:
                pdf_name = file_path_name

            review_file_df = pd.read_csv(file_path)
            review_file_df.fillna('', inplace=True)

            xfdf_content = create_xfdf(review_file_df, pdf_name, pdf_doc, image_paths, document_cropboxes, page_sizes)

            output_path = output_folder + file_path_name + "_adobe.xfdf"

            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(xfdf_content)

            output_paths.append(output_path)

    return output_paths
|
|
|
|
|
|
|
|
|
def convert_adobe_coords_to_image(pdf_page_width:float, pdf_page_height:float, image_width:float, image_height:float, x1:float, y1:float, x2:float, y2:float):
    '''
    Converts coordinates from Adobe PDF space to image space.

    Parameters:
    - pdf_page_width: Width of the PDF page
    - pdf_page_height: Height of the PDF page
    - image_width: Width of the source image
    - image_height: Height of the source image
    - x1, y1, x2, y2: Coordinates in Adobe PDF space

    Returns:
    - Tuple of converted coordinates (x1, y1, x2, y2) in image space
    '''
    # Scale factors mapping PDF points onto image pixels.
    x_scale = image_width / pdf_page_width
    y_scale = image_height / pdf_page_height

    # Horizontal coordinates scale directly.
    image_x1 = x1 * x_scale
    image_x2 = x2 * x_scale

    # Adobe's y axis runs bottom-up, image y runs top-down: flip before scaling.
    image_y1 = (pdf_page_height - y1) * y_scale
    image_y2 = (pdf_page_height - y2) * y_scale

    # After flipping, guarantee y1 <= y2.
    if image_y1 > image_y2:
        image_y1, image_y2 = image_y2, image_y1

    return image_x1, image_y1, image_x2, image_y2
|
|
|
def parse_xfdf(xfdf_path:str):
    '''
    Parse the XFDF file and extract redaction annotations.

    Parameters:
    - xfdf_path: Path to the XFDF file

    Returns:
    - List of dictionaries containing redaction information
    '''
    tree = parse(xfdf_path)
    root = tree.getroot()

    # Adobe's XFDF namespace is required to locate the redact elements.
    ns = {'xfdf': 'http://ns.adobe.com/xfdf/'}

    redactions = []

    for redact in root.findall('.//xfdf:redact', namespaces=ns):
        # rect is "x1,y1,x2,y2"; split once instead of re-splitting per field.
        rect_parts = redact.get('rect').split(',')

        redactions.append({
            'image': '',
            'page': int(redact.get('page')) + 1,  # xfdf pages are 0-based
            'xmin': float(rect_parts[0]),
            'ymin': float(rect_parts[1]),
            'xmax': float(rect_parts[2]),
            'ymax': float(rect_parts[3]),
            'label': redact.get('title'),
            'text': redact.get('contents'),
            'color': redact.get('border-color', '(0, 0, 0)')
        })

    return redactions
|
|
|
def convert_xfdf_to_dataframe(file_paths_list:List[str], pymupdf_doc, image_paths:List[str], output_folder:str=OUTPUT_FOLDER):
    '''
    Convert redaction annotations from XFDF and associated images into a DataFrame.

    Parameters:
    - file_paths_list: Input paths; must include the original PDF and the .xfdf file
    - pymupdf_doc: PyMuPDF document object
    - image_paths: List of page image paths corresponding to PDF pages
    - output_folder: Folder the generated review csv is written to

    Returns:
    - List of output paths (the original pdf plus the generated review csv)

    Raises:
    - ValueError: if an .xfdf file is processed without a preceding PDF.
    '''
    output_paths = []
    xfdf_paths = []
    df = pd.DataFrame()
    # Fix: pdf_name was previously never initialized, so the `if not pdf_name`
    # guard below raised NameError (instead of the intended ValueError) when the
    # input list contained no PDF.
    pdf_name = ""

    # Order inputs so PDFs are seen (and pdf_name set) before the xfdf file.
    file_paths_list = sorted(file_paths_list, key=lambda x: (os.path.splitext(x)[1] != '.pdf', os.path.splitext(x)[1] != '.json'))

    for file in file_paths_list:
        # Accept either plain path strings or uploaded-file objects.
        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name

        file_path_name = get_file_name_without_type(file_path)
        file_path_end = detect_file_type(file_path)

        if file_path_end == "pdf":
            pdf_name = os.path.basename(file_path)

            # Pass the original PDF through in the outputs.
            output_paths.append(file_path)

        if file_path_end == "xfdf":

            if not pdf_name:
                message = "Original PDF needed to convert from .xfdf format"
                print(message)
                raise ValueError(message)
            # NOTE(review): uses the raw item (not file_path) — confirm uploaded
            # file objects are resolved to paths before reaching parse_xfdf.
            xfdf_path = file

            file_path_name = get_file_name_without_type(xfdf_path)

            redactions = parse_xfdf(xfdf_path)

            df = pd.DataFrame(redactions)

            df.fillna('', inplace=True)

            for _, row in df.iterrows():
                page_python_format = int(row["page"])-1

                pymupdf_page = pymupdf_doc.load_page(page_python_format)

                pdf_page_height = pymupdf_page.rect.height
                pdf_page_width = pymupdf_page.rect.width

                image_path = image_paths[page_python_format]

                if isinstance(image_path, str):
                    image = Image.open(image_path)

                # NOTE(review): image is only (re)assigned when image_path is a
                # str; a non-str entry for the first processed page would raise
                # NameError here — confirm image_paths always holds strings.
                image_page_width, image_page_height = image.size

                # Convert Adobe-space coordinates back into image space for the
                # review file.
                image_x1, image_y1, image_x2, image_y2 = convert_adobe_coords_to_image(pdf_page_width, pdf_page_height, image_page_width, image_page_height, row['xmin'], row['ymin'], row['xmax'], row['ymax'])

                df.loc[_, ['xmin', 'ymin', 'xmax', 'ymax']] = [image_x1, image_y1, image_x2, image_y2]

                df.loc[_, 'image'] = image_path

            out_file_path = output_folder + file_path_name + "_review_file.csv"
            df.to_csv(out_file_path, index=None)

            output_paths.append(out_file_path)

    return output_paths