|
import gradio as gr |
|
from datasets import load_dataset |
|
import numpy as np |
|
from model2vec import StaticModel |
|
from reach import Reach |
|
from difflib import ndiff |
|
|
|
|
|
model = StaticModel.from_pretrained("minishlab/M2V_base_output") |
|
|
|
|
|
default_dataset_name = "sst2" |
|
default_dataset_split = "train" |
|
default_text_column = "sentence" |
|
default_threshold = 0.9 |
|
|
|
def deduplicate_embeddings( |
|
embeddings_a: np.ndarray, |
|
embeddings_b: np.ndarray = None, |
|
threshold: float = 0.9, |
|
batch_size: int = 1024, |
|
progress=None |
|
): |
|
"""Deduplicate within one dataset or across two datasets.""" |
|
if embeddings_b is None: |
|
reach = Reach(vectors=embeddings_a, items=[str(i) for i in range(len(embeddings_a))]) |
|
duplicate_to_original = {} |
|
results = reach.nearest_neighbor_threshold( |
|
embeddings_a, threshold=threshold, batch_size=batch_size, show_progressbar=False |
|
) |
|
for i, similar_items in enumerate(progress.tqdm(results, desc="Processing duplicates", total=len(embeddings_a))): |
|
for sim_idx, _ in similar_items: |
|
sim_idx = int(sim_idx) |
|
if sim_idx != i and sim_idx not in duplicate_to_original: |
|
duplicate_to_original[sim_idx] = i |
|
deduplicated_indices = set(range(len(embeddings_a))) - set(duplicate_to_original.keys()) |
|
return deduplicated_indices, duplicate_to_original |
|
else: |
|
reach = Reach(vectors=embeddings_a, items=[str(i) for i in range(len(embeddings_a))]) |
|
duplicate_indices_in_b = [] |
|
duplicate_to_original = {} |
|
results = reach.nearest_neighbor_threshold( |
|
embeddings_b, threshold=threshold, batch_size=batch_size, show_progressbar=False |
|
) |
|
for i, similar_items in enumerate(progress.tqdm(results, desc="Processing duplicates", total=len(embeddings_b))): |
|
if similar_items: |
|
duplicate_indices_in_b.append(i) |
|
duplicate_to_original[i] = int(similar_items[0][0]) |
|
return duplicate_indices_in_b, duplicate_to_original |
|
|
|
def display_word_differences(x: str, y: str) -> str: |
|
"""Display differences between two texts.""" |
|
diff = ndiff(x.split(), y.split()) |
|
return " ".join(word for word in diff if word.startswith(("+", "-"))) |
|
|
|
def load_dataset_texts(dataset_name, dataset_split, text_column): |
|
"""Load texts from a specified dataset.""" |
|
ds = load_dataset(dataset_name, split=dataset_split) |
|
return [example[text_column] for example in ds] |
|
|
|
def perform_deduplication( |
|
deduplication_type, |
|
dataset1_name, |
|
dataset1_split, |
|
dataset1_text_column, |
|
dataset2_name="", |
|
dataset2_split="", |
|
dataset2_text_column="", |
|
threshold=default_threshold, |
|
progress=gr.Progress(track_tqdm=True), |
|
): |
|
try: |
|
threshold = float(threshold) |
|
|
|
|
|
yield "Loading Dataset 1...", "" |
|
texts1 = load_dataset_texts(dataset1_name, dataset1_split, dataset1_text_column) |
|
yield "Computing embeddings for Dataset 1...", "" |
|
|
|
embeddings1 = model.encode(texts1, show_progressbar=True) |
|
if deduplication_type == "Single dataset": |
|
|
|
yield "Deduplicating within Dataset 1...", "" |
|
deduplicated_indices, duplicate_mapping = deduplicate_embeddings( |
|
embeddings1, threshold=threshold, progress=progress |
|
) |
|
|
|
num_duplicates = len(duplicate_mapping) |
|
result_text = ( |
|
f"**Total documents:** {len(texts1)}\n\n" |
|
f"**Duplicates found:** {num_duplicates}\n\n" |
|
f"**Unique documents after deduplication:** {len(deduplicated_indices)}\n\n" |
|
) |
|
|
|
if num_duplicates > 0: |
|
result_text += "**Sample duplicates:**\n\n" |
|
for dup_idx, orig_idx in list(duplicate_mapping.items())[:5]: |
|
orig_text = texts1[orig_idx] |
|
dup_text = texts1[dup_idx] |
|
differences = display_word_differences(orig_text, dup_text) |
|
result_text += ( |
|
f"**Original:**\n{orig_text}\n\n" |
|
f"**Duplicate:**\n{dup_text}\n\n" |
|
f"**Differences:**\n{differences}\n" |
|
+ "-" * 50 + "\n\n" |
|
) |
|
else: |
|
result_text += "No duplicates found." |
|
|
|
yield "Deduplication completed.", result_text |
|
|
|
else: |
|
|
|
yield "Loading Dataset 2...", "" |
|
texts2 = load_dataset_texts(dataset2_name, dataset2_split, dataset2_text_column) |
|
yield "Computing embeddings for Dataset 2...", "" |
|
|
|
embeddings2 = model.encode(texts2, show_progressbar=True) |
|
|
|
yield "Deduplicating Dataset 2 against Dataset 1...", "" |
|
duplicate_indices, duplicate_mapping = deduplicate_embeddings( |
|
embeddings1, embeddings_b=embeddings2, threshold=threshold, progress=progress |
|
) |
|
|
|
num_duplicates = len(duplicate_indices) |
|
result_text = ( |
|
f"**Total documents in {dataset2_name}/{dataset2_split}:** {len(texts2)}\n\n" |
|
f"**Duplicates found in Dataset 2:** {num_duplicates}\n\n" |
|
f"**Unique documents after deduplication:** {len(texts2) - num_duplicates}\n\n" |
|
) |
|
|
|
if num_duplicates > 0: |
|
result_text += "**Sample duplicates from Dataset 2:**\n\n" |
|
for idx in duplicate_indices[:5]: |
|
orig_text = texts1[duplicate_mapping[idx]] |
|
dup_text = texts2[idx] |
|
differences = display_word_differences(orig_text, dup_text) |
|
result_text += ( |
|
f"**Original (Dataset 1):**\n{orig_text}\n\n" |
|
f"**Duplicate (Dataset 2):**\n{dup_text}\n\n" |
|
f"**Differences:**\n{differences}\n" |
|
+ "-" * 50 + "\n\n" |
|
) |
|
else: |
|
result_text += "No duplicates found." |
|
|
|
yield "Deduplication completed.", result_text |
|
|
|
except Exception as e: |
|
yield f"An error occurred: {e}", "" |
|
raise e |
|
|
|
with gr.Blocks(css="#status_output { height: 150px; overflow: auto; }") as demo: |
|
gr.Markdown("# Semantic Deduplication") |
|
|
|
deduplication_type = gr.Radio( |
|
choices=["Single dataset", "Cross-dataset"], |
|
label="Deduplication Type", |
|
value="Single dataset", |
|
) |
|
|
|
with gr.Row(): |
|
dataset1_name = gr.Textbox(value=default_dataset_name, label="Dataset 1 Name") |
|
dataset1_split = gr.Textbox(value=default_dataset_split, label="Dataset 1 Split") |
|
dataset1_text_column = gr.Textbox(value=default_text_column, label="Text Column Name") |
|
|
|
dataset2_inputs = gr.Column(visible=False) |
|
with dataset2_inputs: |
|
gr.Markdown("### Dataset 2") |
|
with gr.Row(): |
|
dataset2_name = gr.Textbox(value=default_dataset_name, label="Dataset 2 Name") |
|
dataset2_split = gr.Textbox(value=default_dataset_split, label="Dataset 2 Split") |
|
dataset2_text_column = gr.Textbox(value=default_text_column, label="Text Column Name") |
|
|
|
threshold = gr.Slider(0.0, 1.0, value=default_threshold, label="Similarity Threshold") |
|
compute_button = gr.Button("Compute") |
|
status_output = gr.Markdown(elem_id="status_output") |
|
result_output = gr.Markdown() |
|
|
|
def update_visibility(choice): |
|
return gr.update(visible=choice == "Cross-dataset") |
|
|
|
deduplication_type.change(update_visibility, inputs=deduplication_type, outputs=dataset2_inputs) |
|
|
|
compute_button.click( |
|
fn=perform_deduplication, |
|
inputs=[ |
|
deduplication_type, |
|
dataset1_name, |
|
dataset1_split, |
|
dataset1_text_column, |
|
dataset2_name, |
|
dataset2_split, |
|
dataset2_text_column, |
|
threshold, |
|
], |
|
outputs=[status_output, result_output], |
|
) |
|
|
|
demo.launch() |
|
|