Major update. General code revision. Improved config variables. The dataframe-based review frame now includes text, and items can be searched and excluded. Costs are now estimated, with an option to add cost codes. Added an option to extract text only.
0ea8b9e
import pandas as pd
#import argparse
#import glob
import os
import re
from tools.helper_functions import OUTPUT_FOLDER
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# import nltk
# from nltk.corpus import stopwords
# from nltk.tokenize import word_tokenize
# from nltk.stem import PorterStemmer
#import spacy
import numpy as np
import random
import string
from typing import List
from gradio import Progress
import en_core_web_lg #en_core_web_sm

nlp = en_core_web_lg.load()

#from tqdm import tqdm
# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('punkt_tab')
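
# Default cosine similarity score above which two pages are treated as near-duplicates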
similarity_threshold = 0.9

def combine_ocr_output_text(input_files:List[str], output_folder:str=OUTPUT_FOLDER):
    """
    Combines text from multiple CSV files containing page and text columns.
    Groups text by file and page number, concatenating text within these groups.

    Args:
        input_files (list): List of paths to CSV files
        output_folder (str): Folder in which to save the combined output CSV

    Returns:
        tuple: (combined dataframe with columns [file, page, text], list of output file paths)
    """
    all_data = []
    output_files = []

    if isinstance(input_files, str):
        file_paths_list = [input_files]
    else:
        file_paths_list = input_files

    for file in file_paths_list:
        if isinstance(file, str):
            file_path = file
        else:
            file_path = file.name

        # Read CSV file
        df = pd.read_csv(file_path)

        # Ensure required columns exist
        if 'page' not in df.columns or 'text' not in df.columns:
            print(f"Warning: Skipping {file_path} - missing required columns 'page' and 'text'")
            continue

        df['text'] = df['text'].fillna('').astype(str)

        # Group by page and concatenate text
        grouped = df.groupby('page')['text'].apply(' '.join).reset_index()

        # Add filename column
        grouped['file'] = os.path.basename(file_path)

        all_data.append(grouped)

    if not all_data:
        raise ValueError("No valid CSV files were processed")

    # Combine all dataframes
    combined_df = pd.concat(all_data, ignore_index=True)

    # Reorder columns
    combined_df = combined_df[['file', 'page', 'text']]

    output_combined_file_path = output_folder + "combined_ocr_output_files.csv"
    combined_df.to_csv(output_combined_file_path, index=False)
    output_files.append(output_combined_file_path)

    return combined_df, output_files
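
# Example (illustrative; 'example_ocr_output.csv' is a hypothetical file containing
# 'page' and 'text' columns):
#   combined_df, combined_files = combine_ocr_output_text(["example_ocr_output.csv"])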

def process_data(df:pd.DataFrame, column:str):
    '''
    Clean and lemmatise the text in a given data frame column, storing the result in a
    new 'text_clean' column.
    '''

    def _clean_text(raw_text):
        # Remove HTML tags
        clean = re.sub(r'<.*?>', '', raw_text)
        # clean = re.sub(r'&nbsp;', ' ', clean)
        # clean = re.sub(r'\r\n', ' ', clean)
        # clean = re.sub(r'&lt;', ' ', clean)
        # clean = re.sub(r'&gt;', ' ', clean)
        # clean = re.sub(r'<strong>', ' ', clean)
        # clean = re.sub(r'</strong>', ' ', clean)
        # Replace non-breaking space \xa0 with a space
        # clean = clean.replace(u'\xa0', u' ')

        # Remove extra whitespace
        clean = ' '.join(clean.split())

        # # Tokenize the text
        # words = word_tokenize(clean.lower())
        # # Remove punctuation and numbers
        # words = [word for word in words if word.isalpha()]
        # # Remove stopwords
        # words = [word for word in words if word not in stop_words]
        # # Join the cleaned words back into a string
        return clean

    # Function to apply lemmatization and remove stopwords
    def _apply_lemmatization(text):
        doc = nlp(text)
        # Keep only alphabetic tokens and remove stopwords
        lemmatized_words = [token.lemma_ for token in doc if token.is_alpha and not token.is_stop]
        return ' '.join(lemmatized_words)

    df['text_clean'] = df[column].apply(_clean_text)
    df['text_clean'] = df['text_clean'].apply(_apply_lemmatization)

    return df
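
# Example (illustrative): lemmatise the combined OCR text before vectorisation
#   cleaned_df = process_data(combined_df, 'text')  # adds a 'text_clean' column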

def identify_similar_pages(input_files: List[str], similarity_threshold: float = 0.9, output_folder:str=OUTPUT_FOLDER, progress=Progress(track_tqdm=True)):
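    """
    Identify pairs of pages whose text is highly similar across the input OCR output files.

    The combined page text is cleaned and lemmatised, converted to a TF-IDF matrix, and
    pairwise cosine similarity is computed; pairs scoring above similarity_threshold are
    reported, and per-file lists of whole pages to redact are written out.

    Args:
        input_files (list): Paths to OCR output CSV files containing 'page' and 'text' columns.
        similarity_threshold (float): Minimum cosine similarity for a pair of pages to be reported.
        output_folder (str): Folder in which to save the result CSV files.
        progress (Progress): Gradio progress tracker.

    Returns:
        tuple: (dataframe of similar page pairs, list of output file paths)
    """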
    output_paths = []

    progress(0.1, desc="Cleaning input texts")

    # Load and clean data
    df, output_files = combine_ocr_output_text(input_files)
    output_paths.extend(output_files)

    df = process_data(df, 'text') # Assume this returns 'text_clean', 'file', and 'page' columns

    # Vectorize text
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(df['text_clean'])

    progress(0.3, desc="Calculating text similarity")

    # Compute sparse cosine similarity
    similarity_matrix = cosine_similarity(tfidf_matrix, dense_output=False) # Keep sparse format

    # Extract indices of similar pages above threshold
    coo_matrix = similarity_matrix.tocoo()
    similar_pages = np.array([(i, j, v) for i, j, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data) if v > similarity_threshold])

    if similar_pages.size == 0:
        return pd.DataFrame(), output_paths # Return empty if no matches

    # Create a DataFrame for similar pairs, casting the indices back to integers for the merges below
    similarity_df = pd.DataFrame(similar_pages, columns=['Page1_Index', 'Page2_Index', 'Similarity_Score'])
    similarity_df[['Page1_Index', 'Page2_Index']] = similarity_df[['Page1_Index', 'Page2_Index']].astype(int)

    # Remove duplicate pairs (keep one direction)
    similarity_df = similarity_df[similarity_df['Page1_Index'] < similarity_df['Page2_Index']]
progress(0.8, desc="Mapping back results") | |
# Map indices to metadata | |
# index_map = df[['file', 'page', 'text']].to_dict(orient='index') | |
# similarity_df['Page1_File'] = similarity_df['Page1_Index'].map(lambda x: index_map[x]['file']) | |
# similarity_df['Page2_File'] = similarity_df['Page2_Index'].map(lambda x: index_map[x]['file']) | |
# similarity_df['Page1_Page'] = similarity_df['Page1_Index'].map(lambda x: index_map[x]['page']) | |
# similarity_df['Page2_Page'] = similarity_df['Page2_Index'].map(lambda x: index_map[x]['page']) | |
# similarity_df['Page1_Text'] = similarity_df['Page1_Index'].map(lambda x: index_map[x]['text'][0:200]) | |
# similarity_df['Page2_Text'] = similarity_df['Page2_Index'].map(lambda x: index_map[x]['text'][0:200]) | |
# Create a DataFrame with the metadata | |
metadata_df = df[['file', 'page', 'text']].reset_index() | |
# Merge to get the metadata for Page1 | |
similarity_df = similarity_df.merge(metadata_df, left_on='Page1_Index', right_on='index', suffixes=('', '_Page1')) | |
similarity_df = similarity_df.rename(columns={'file': 'Page1_File', 'page': 'Page1_Page', 'text': 'Page1_Text'}) | |
# Merge to get the metadata for Page2 | |
similarity_df = similarity_df.merge(metadata_df, left_on='Page2_Index', right_on='index', suffixes=('', '_Page2')) | |
similarity_df = similarity_df.rename(columns={'file': 'Page2_File', 'page': 'Page2_Page', 'text': 'Page2_Text'}) | |
# Optionally, drop the index columns if not needed | |
#similarity_df = similarity_df.drop(columns=['index_Page1', 'index_Page2']) | |
similarity_df["Similarity_Score"] = similarity_df["Similarity_Score"].round(3) | |
# Sort results | |
similarity_df_out = similarity_df[['Page1_File', 'Page1_Page', 'Page2_File', 'Page2_Page', 'Similarity_Score', 'Page1_Text', 'Page2_Text']] | |
similarity_df_out = similarity_df_out.sort_values(["Page1_File", "Page1_Page", "Page2_File", "Page2_Page", "Similarity_Score"], ascending=[True, True, True, True, False]) | |

    # Truncate the example text to the first 100 characters of each page
    similarity_df_out['Page1_Text'] = similarity_df_out['Page1_Text'].str[:100]
    similarity_df_out['Page2_Text'] = similarity_df_out['Page2_Text'].str[:100]

    progress(0.8, desc="Saving output files")

    # Save results
    similarity_file_output_path = output_folder + 'page_similarity_results.csv'
    similarity_df_out.to_csv(similarity_file_output_path, index=False)

    output_paths.append(similarity_file_output_path)

    # Save per-file redaction lists
    for redact_file in similarity_df_out['Page2_File'].unique():
        output_file_name = output_folder + redact_file + "_whole_page.csv"
        whole_pages_to_redact_df = similarity_df_out.loc[similarity_df_out['Page2_File'] == redact_file, ['Page2_Page']].drop_duplicates(['Page2_Page']).sort_values('Page2_Page')
        whole_pages_to_redact_df.to_csv(output_file_name, header=False, index=False)
        output_paths.append(output_file_name)

    return similarity_df_out, output_paths

# Perturb text
# Apply the perturbation function with a 10% error probability
def perturb_text_with_errors(series:pd.Series):
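    """
    Return the series with roughly 10% of words in each text entry randomly perturbed
    by a character insertion, extra surrounding spaces, or an inserted punctuation mark.
    """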

    def _perturb_text(text, error_probability=0.1):
        words = text.split() # Split text into words
        perturbed_words = []

        for word in words:
            if random.random() < error_probability: # Add a random error
                perturbation_type = random.choice(['char_error', 'extra_space', 'extra_punctuation'])

                if perturbation_type == 'char_error': # Introduce a character error
                    idx = random.randint(0, len(word) - 1)
                    char = random.choice(string.ascii_lowercase) # Add a random letter
                    word = word[:idx] + char + word[idx:]

                elif perturbation_type == 'extra_space': # Add extra space around a word
                    word = ' ' + word + ' '

                elif perturbation_type == 'extra_punctuation': # Add punctuation to the word
                    punctuation = random.choice(string.punctuation)
                    idx = random.randint(0, len(word)) # Insert punctuation randomly
                    word = word[:idx] + punctuation + word[idx:]

            perturbed_words.append(word)

        return ' '.join(perturbed_words)

    series = series.apply(lambda x: _perturb_text(x, error_probability=0.1))

    return series
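

# Example usage (illustrative sketch; the file names below are hypothetical OCR output
# CSVs containing 'page' and 'text' columns):
#
#   results_df, result_files = identify_similar_pages(
#       ["example_doc_1_ocr_output.csv", "example_doc_2_ocr_output.csv"],
#       similarity_threshold=0.9,
#   )
#   print(results_df.head())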