document_redaction / tools /find_duplicate_pages.py
seanpedrickcase's picture
Major update. General code revision. Improved config variables. Dataframe based review frame now includes text, items can be searched and excluded. Costs now estimated. Option for adding cost codes added. Option to extract text only.
0ea8b9e
raw
history blame contribute delete
9.87 kB
import pandas as pd
#import argparse
#import glob
import os
import re
from tools.helper_functions import OUTPUT_FOLDER
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# import nltk
# from nltk.corpus import stopwords
# from nltk.tokenize import word_tokenize
# from nltk.stem import PorterStemmer
#import spacy
import numpy as np
import random
import string
from typing import List
from gradio import Progress
import en_core_web_lg #en_core_web_sm
# Load the en_core_web_lg language pipeline once, at import time, so that
# process_data() can reuse the same `nlp` object for every page it lemmatises.
nlp = en_core_web_lg.load()
#from tqdm import tqdm
# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('punkt_tab')
# Module-level similarity cut-off.
# NOTE(review): identify_similar_pages() takes its own `similarity_threshold`
# parameter (default 0.9) which shadows this name, and nothing else in this
# file reads the module-level value - confirm whether external code imports it.
similarity_threshold = 0.9
def combine_ocr_output_text(input_files:List[str], output_folder:str=OUTPUT_FOLDER):
    """
    Combine OCR text from one or more CSV files into a single page-level table.

    Every input CSV must contain 'page' and 'text' columns; files missing either
    column are skipped with a printed warning. Within each file the text rows are
    grouped by page and joined with single spaces. The combined table is also
    written to 'combined_ocr_output_files.csv' inside `output_folder`.

    Args:
        input_files (list | str): Paths to CSV files. Each item may be a path
            string or an object exposing the path through a `.name` attribute.
            A single path string is accepted as well.
        output_folder (str): Folder in which the combined CSV is saved.

    Returns:
        tuple: `(combined_df, output_files)` where `combined_df` is a
            pd.DataFrame with columns [file, page, text] and `output_files`
            is a list holding the path of the CSV that was written.

    Raises:
        ValueError: If none of the input files had the required columns.
    """
    # Accept a bare path string as well as a list of paths / file objects
    if isinstance(input_files, str):
        file_paths_list = [input_files]
    else:
        file_paths_list = input_files

    all_data = []

    for file in file_paths_list:
        # Items are either plain paths or objects carrying the path in `.name`
        file_path = file if isinstance(file, str) else file.name

        df = pd.read_csv(file_path)

        if 'page' not in df.columns or 'text' not in df.columns:
            print(f"Warning: Skipping {file_path} - missing required columns 'page' and 'text'")
            continue

        # Missing text becomes an empty string so that ' '.join never sees NaN
        df['text'] = df['text'].fillna('').astype(str)

        # One row per page, with all of that page's text joined together
        grouped = df.groupby('page')['text'].apply(' '.join).reset_index()
        grouped['file'] = os.path.basename(file_path)

        all_data.append(grouped)

    if not all_data:
        raise ValueError("No valid CSV files were processed")

    combined_df = pd.concat(all_data, ignore_index=True)[['file', 'page', 'text']]

    # os.path.join gives a valid path whether or not output_folder ends with a separator
    output_combined_file_path = os.path.join(output_folder, "combined_ocr_output_files.csv")
    combined_df.to_csv(output_combined_file_path, index=False)

    return combined_df, [output_combined_file_path]
def process_data(df:pd.DataFrame, column:str):
    '''
    Add a cleaned and lemmatised 'text_clean' column to a data frame.

    The text in `column` first has HTML-style tags removed and whitespace
    collapsed, then is reduced to the lemmas of its alphabetic, non-stopword
    tokens using the module-level `nlp` pipeline. The input frame is modified
    in place and also returned.
    '''
    def _strip_markup(raw_text):
        # Drop anything shaped like <...>, then squeeze whitespace runs to one space
        without_tags = re.sub(r'<.*?>', '', raw_text)
        pieces = without_tags.split()
        return ' '.join(pieces)

    def _lemmatise(text):
        # Keep the lemma of every token that is purely alphabetic and not a stopword
        kept_lemmas = []
        for token in nlp(text):
            if not token.is_alpha:
                continue
            if token.is_stop:
                continue
            kept_lemmas.append(token.lemma_)
        return ' '.join(kept_lemmas)

    # Stage 1: markup/whitespace clean-up; stage 2: lemmatisation of the result
    df['text_clean'] = df[column].apply(_strip_markup)
    df['text_clean'] = df['text_clean'].apply(_lemmatise)

    return df
def identify_similar_pages(input_files: List[str], similarity_threshold: float = 0.9, output_folder:str=OUTPUT_FOLDER, progress=Progress(track_tqdm=True)):
    """
    Find pairs of pages whose text is near-identical and save the results.

    The input OCR CSV files are combined into one page-level table, the text is
    cleaned and lemmatised, and every page is compared with every other page
    using cosine similarity over TF-IDF vectors. Pairs scoring strictly above
    `similarity_threshold` are reported.

    Args:
        input_files (List[str]): OCR output CSV files, passed straight to
            combine_ocr_output_text().
        similarity_threshold (float): Pairs must score above this value to be
            reported.
        output_folder (str): Folder in which all output CSV files are written.
        progress: Progress callback, called as `progress(fraction, desc=...)`.

    Returns:
        tuple: `(similarity_df_out, output_paths)`. `similarity_df_out` has the
            columns Page1_File, Page1_Page, Page2_File, Page2_Page,
            Similarity_Score, Page1_Text and Page2_Text (text truncated to the
            first 100 characters) and is empty when no pair exceeds the
            threshold. `output_paths` lists every file written: the combined
            text CSV, and - when matches exist - 'page_similarity_results.csv'
            plus one '<file>_whole_page.csv' per file that appears as Page2,
            listing its duplicate page numbers.
    """
    output_columns = ['Page1_File', 'Page1_Page', 'Page2_File', 'Page2_Page', 'Similarity_Score', 'Page1_Text', 'Page2_Text']
    output_paths = []

    progress(0.1, desc="Cleaning input texts")

    # Load and clean data. output_folder is forwarded so the combined CSV lands
    # in the same folder as the other outputs.
    df, output_files = combine_ocr_output_text(input_files, output_folder)
    output_paths.extend(output_files)
    df = process_data(df, 'text')  # Adds the 'text_clean' column

    # Vectorize text
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(df['text_clean'])

    progress(0.3, desc="Calculating text similarity")

    # Compute cosine similarity, keeping the result sparse
    similarity_matrix = cosine_similarity(tfidf_matrix, dense_output=False)
    coo_matrix = similarity_matrix.tocoo()

    # Keep entries above the threshold, one direction only (row < col). This
    # drops mirrored pairs and the diagonal self-matches *before* the emptiness
    # check, so a page that only matches itself is not treated as a duplicate.
    keep = (coo_matrix.data > similarity_threshold) & (coo_matrix.row < coo_matrix.col)
    page1_idx = coo_matrix.row[keep]
    page2_idx = coo_matrix.col[keep]

    if page1_idx.size == 0:
        return pd.DataFrame(columns=output_columns), output_paths  # No matches

    progress(0.8, desc="Mapping back results")

    # Rows of the TF-IDF matrix are in the same order as the rows of df, so the
    # matrix indices can be used positionally to look up page metadata.
    files = df['file'].to_numpy()
    pages = df['page'].to_numpy()
    # Truncate each page's text to its first 100 characters for the report
    text_previews = df['text'].astype(str).str[:100].to_numpy()

    similarity_df_out = pd.DataFrame({
        'Page1_File': files[page1_idx],
        'Page1_Page': pages[page1_idx],
        'Page2_File': files[page2_idx],
        'Page2_Page': pages[page2_idx],
        'Similarity_Score': coo_matrix.data[keep],
        'Page1_Text': text_previews[page1_idx],
        'Page2_Text': text_previews[page2_idx],
    }, columns=output_columns)

    similarity_df_out["Similarity_Score"] = similarity_df_out["Similarity_Score"].round(3)

    # Sort results
    similarity_df_out = similarity_df_out.sort_values(["Page1_File", "Page1_Page", "Page2_File", "Page2_Page", "Similarity_Score"], ascending=[True, True, True, True, False])

    progress(0.9, desc="Saving output files")

    # Save results
    similarity_file_output_path = os.path.join(output_folder, 'page_similarity_results.csv')
    similarity_df_out.to_csv(similarity_file_output_path, index=False)
    output_paths.append(similarity_file_output_path)

    # Save per-file redaction lists: the distinct Page2 page numbers of each file
    for redact_file in similarity_df_out['Page2_File'].unique():
        output_file_name = os.path.join(output_folder, redact_file + "_whole_page.csv")
        in_this_file = similarity_df_out['Page2_File'] == redact_file
        whole_pages_to_redact_df = similarity_df_out.loc[in_this_file, ['Page2_Page']].drop_duplicates(['Page2_Page']).sort_values('Page2_Page')
        whole_pages_to_redact_df.to_csv(output_file_name, header=False, index=False)
        output_paths.append(output_file_name)

    return similarity_df_out, output_paths
# Perturb text
# Apply the perturbation function with a 10% error probability
def perturb_text_with_errors(series:pd.Series, error_probability:float=0.1):
    """
    Inject random OCR-style noise into every string of a series.

    Each whitespace-separated word is, with probability `error_probability`,
    altered in one of three ways chosen at random: a random lowercase letter
    is inserted, the word is padded with a space on both sides, or a random
    punctuation character is inserted. Words are re-joined with single
    spaces, so runs of whitespace in the input are collapsed even when no
    word is altered.

    Args:
        series (pd.Series): Series of strings to perturb.
        error_probability (float): Chance, between 0 and 1, that any given
            word is altered. Defaults to 0.1.

    Returns:
        pd.Series: New series with the same index holding the perturbed text.

    Raises:
        ValueError: If `error_probability` is outside the range [0, 1].
    """
    if not 0.0 <= error_probability <= 1.0:
        raise ValueError("error_probability must be between 0 and 1")

    def _perturb_text(text, error_probability=0.1):
        perturbed_words = []

        for word in text.split():
            if random.random() < error_probability:
                perturbation_type = random.choice(['char_error', 'extra_space', 'extra_punctuation'])

                if perturbation_type == 'char_error':
                    # Insert a random lowercase letter somewhere before the last position
                    idx = random.randint(0, len(word) - 1)
                    char = random.choice(string.ascii_lowercase)
                    word = word[:idx] + char + word[idx:]
                elif perturbation_type == 'extra_space':
                    # Pad the word with a space on each side
                    word = ' ' + word + ' '
                elif perturbation_type == 'extra_punctuation':
                    # Insert a punctuation character anywhere, including the very end
                    punctuation = random.choice(string.punctuation)
                    idx = random.randint(0, len(word))
                    word = word[:idx] + punctuation + word[idx:]

            perturbed_words.append(word)

        return ' '.join(perturbed_words)

    return series.apply(lambda x: _perturb_text(x, error_probability=error_probability))