import warnings
from difflib import SequenceMatcher

import nltk
import numpy as np
import torch
from sentence_transformers import (
    SentenceTransformer,
    util,
)

from src.application.text.preprocessing import split_into_paragraphs
from src.application.text.search import (
    generate_search_phrases,
    search_by_google,
)
from src.application.url_reader import URLReader

warnings.simplefilter(action="ignore", category=FutureWarning)

# Download necessary NLTK data files
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
nltk.download("stopwords", quiet=True)

# Load the sentence-embedding model used for paraphrase detection
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PARAPHRASE_MODEL = SentenceTransformer("paraphrase-MiniLM-L6-v2")
PARAPHRASE_MODEL.to(DEVICE)

PARAPHRASE_THRESHOLD_HUMAN = 0.963
PARAPHRASE_THRESHOLD_MACHINE = 0.8
PARAPHRASE_THRESHOLD = 0.8
MIN_SAME_SENTENCE_LEN = 6
MIN_PHRASE_SENTENCE_LEN = 10
MIN_RATIO_PARAPHRASE_NUM = 0.5
MAX_CHAR_SIZE = 30000

def find_paragraph_source(text, text_index, sentences_df):
    """
    Searches the web for a source of the paragraph at `text_index` and
    updates `sentences_df` with the best alignment found.

    Returns:
        A tuple of (updated sentences_df, list of images from the source page).
    """
    checked_urls = set()
    searched_phrases = generate_search_phrases(text[text_index])

    for candidate in searched_phrases:
        search_results = search_by_google(candidate)
        urls = [item["link"] for item in search_results.get("items", [])]

        for url in urls[:3]:
            if url in checked_urls:  # skip already-visited URLs
                continue
            if "bbc.com" not in url:
                continue
            checked_urls.add(url)
            print(f"\t\tChecking URL: {url}")

            content = URLReader(url)
            if content.is_extracted is True:
                if content.title is None or content.text is None:
                    print("\t\t\t└── Title or text not found")
                    continue

                page_text = content.title + "\n" + content.text
                if len(page_text) > MAX_CHAR_SIZE:
                    print(f"\t\t\t└── More than {MAX_CHAR_SIZE} characters")
                    continue
                print(f"\t\t\t└── Title: {content.title}")

                aligned_sentence = check_paraphrase(
                    text[text_index],
                    page_text,
                    url,
                )

                if aligned_sentence["paraphrase"] is False:
                    sentences_df.loc[text_index, "input"] = aligned_sentence[
                        "input"
                    ]
                    sentences_df.loc[text_index, "paraphrase"] = (
                        aligned_sentence["paraphrase"]
                    )
                    return sentences_df, []

                # Assign the aligned values to the current row
                columns = [
                    "input",
                    "source",
                    "label",
                    "similarity",
                    "paraphrase",
                    "url",
                ]
                for c in columns:
                    if c in sentences_df.columns:
                        sentences_df.loc[text_index, c] = aligned_sentence[c]

                # Re-check earlier rows against the newly found page
                for idx, _ in sentences_df.iterrows():
                    similarity = sentences_df.loc[idx, "similarity"]
                    if similarity is not None:
                        if similarity > PARAPHRASE_THRESHOLD_MACHINE:
                            continue

                    # Look for matching content in the new URL
                    aligned_sentence = check_paraphrase(
                        text[idx],
                        page_text,
                        url,
                    )

                    if (
                        similarity is None
                        or aligned_sentence["similarity"] > similarity
                    ):
                        columns = [
                            "input",
                            "source",
                            "label",
                            "similarity",
                            "url",
                        ]
                        for c in columns:
                            if c in sentences_df.columns:
                                sentences_df.loc[idx, c] = aligned_sentence[c]

                return sentences_df, content.images

    sentences_df.loc[text_index, "input"] = text[text_index]
    return sentences_df, []
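
# Illustrative usage sketch (an addition, not from the original module). It
# assumes `text` is a list of paragraph strings and `sentences_df` a pandas
# DataFrame pre-seeded with the columns assigned above; it is left commented
# out because it needs Google search credentials and network access:
#
#   import pandas as pd
#   paragraphs = ["First paragraph of the article...", "Second paragraph..."]
#   df = pd.DataFrame(
#       index=range(len(paragraphs)),
#       columns=["input", "source", "label", "similarity", "paraphrase", "url"],
#   )
#   df, images = find_paragraph_source(paragraphs, 0, df)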

def longest_common_subsequence(arr1, arr2):
    """
    Finds the length of the longest common contiguous subsequence (i.e., the
    longest common substring) between two arrays.

    Args:
        arr1: The first array.
        arr2: The second array.

    Returns:
        The length of the longest common contiguous subsequence.
        Returns 0 if either input is invalid.
    """
    if not isinstance(arr1, list) or not isinstance(arr2, list):
        return 0

    n = len(arr1)
    m = len(arr2)
    if n == 0 or m == 0:  # handle empty lists
        return 0

    # Create a DP table of size (n+1) x (m+1)
    dp = [[0] * (m + 1) for _ in range(n + 1)]
    max_length = 0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            if arr1[i - 1] == arr2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1] + 1
                max_length = max(max_length, dp[i][j])
            else:
                dp[i][j] = 0  # reset: the matched run must be consecutive

    return max_length
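
# Example (illustrative, not in the original): despite the name, the function
# measures the longest *contiguous* overlap, so only the unbroken run
# ["brown", "fox"] counts here:
#
#   longest_common_subsequence(
#       ["the", "quick", "brown", "fox"],
#       ["a", "brown", "fox", "leaps"],
#   )  # -> 2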

def check_sentence(
    input_sentence,
    source_sentence,
    min_same_sentence_len,
    min_phrase_sentence_len,
    verbose=False,
):
    """
    Checks if two sentences are similar based on an exact match or a
    longest common contiguous subsequence.

    Args:
        input_sentence: The input sentence.
        source_sentence: The source sentence.
        min_same_sentence_len: Minimum length for an exact sentence match.
        min_phrase_sentence_len: Minimum length for a common subsequence match.
        verbose: If True, print debug information.

    Returns:
        True if the sentences are considered similar, False otherwise.
        Returns False if the input is not valid.
    """
    if not isinstance(input_sentence, str) or not isinstance(
        source_sentence,
        str,
    ):
        return False

    input_sentence = input_sentence.strip()
    source_sentence = source_sentence.strip()
    if not input_sentence or not source_sentence:  # handle empty strings
        return False

    input_words = input_sentence.split()  # split on any whitespace
    source_words = source_sentence.split()

    if (
        input_sentence == source_sentence
        and len(input_words) >= min_same_sentence_len
    ):
        if verbose:
            print("Exact match found.")
        return True

    max_overlap_len = longest_common_subsequence(input_words, source_words)
    if verbose:
        print(f"Max overlap length: {max_overlap_len}")

    return max_overlap_len >= min_phrase_sentence_len
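
# Example (illustrative): an exact match passes once the sentence has at least
# min_same_sentence_len words; otherwise a shared contiguous run of
# min_phrase_sentence_len words is required:
#
#   check_sentence(
#       "the cat sat on the mat today",
#       "the cat sat on the mat today",
#       min_same_sentence_len=6,
#       min_phrase_sentence_len=10,
#   )  # -> True (exact match with 7 words >= 6)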

def check_paraphrase(input_text, page_text, url):
    """
    Checks if the input text is paraphrased in the content of a web page.

    Args:
        input_text: The text to check for paraphrase.
        page_text: The text of the web page to compare with.
        url: The URL of the web page, recorded in the result.

    Returns:
        A dict with the input text, the best-matching source paragraph, their
        similarity score, the assigned label, a paraphrase flag, and the URL.
        Returns an empty dict if either text is missing.
    """
    # Extract paragraphs from the input text and the web page
    input_paragraphs = [input_text]
    if not page_text:
        return {}
    page_paragraphs = split_into_paragraphs(page_text)
    if not input_paragraphs or not page_paragraphs:
        return {}

    # Also keep copies of paragraphs with the ", external" link marker removed
    additional_sentences = []
    for sentence in page_paragraphs:
        if ", external" in sentence:
            additional_sentences.append(sentence.replace(", external", ""))
    page_paragraphs.extend(additional_sentences)

    # Encode paragraphs into embeddings
    embeddings1 = PARAPHRASE_MODEL.encode(
        input_paragraphs,
        convert_to_tensor=True,
        device=DEVICE,
        show_progress_bar=False,
    )
    embeddings2 = PARAPHRASE_MODEL.encode(
        page_paragraphs,
        convert_to_tensor=True,
        device=DEVICE,
        show_progress_bar=False,
    )

    # Compute the cosine similarity matrix
    similarity_matrix = util.cos_sim(embeddings1, embeddings2).cpu().numpy()

    # Find the best-matching page paragraph for each input paragraph
    alignment = {}
    for i, paragraph in enumerate(input_paragraphs):
        max_sim_index = np.argmax(similarity_matrix[i])
        max_similarity = similarity_matrix[i][max_sim_index]
        label, is_paraphrased = determine_label(max_similarity)
        best_matched_paragraph = page_paragraphs[max_sim_index]
        alignment = {
            "input": paragraph,
            "source": best_matched_paragraph,
            "similarity": max_similarity,
            "label": label,
            "paraphrase": is_paraphrased,
            "url": url,
        }
        print(f"Result: [{alignment['similarity']}] {alignment['source']}")
    return alignment
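
# The alignment returned above is a flat dict; a made-up illustration of its
# shape (values here are fabricated for clarity, not real output):
#
#   {
#       "input": "The storm hit the coast overnight.",
#       "source": "A storm battered the coastline during the night.",
#       "similarity": 0.87,
#       "label": "MACHINE",
#       "paraphrase": True,
#       "url": "https://www.bbc.com/news/example",
#   }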

def similarity_ratio(a, b):
    """
    Calculates the similarity ratio between two strings using SequenceMatcher.

    Args:
        a: The first string.
        b: The second string.

    Returns:
        A float representing the similarity ratio between 0.0 and 1.0.
        Returns 0.0 if either input is None or not a string.
    """
    if not isinstance(a, str) or not isinstance(b, str):
        # Covers None as well, since None is not a str
        return 0.0
    return SequenceMatcher(None, a, b).ratio()
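
# Example (illustrative): SequenceMatcher's ratio is 2*M / (len(a) + len(b)),
# where M is the number of matching characters:
#
#   similarity_ratio("paraphrase", "paraphrases")  # -> 20 / 21 ≈ 0.952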

def check_human(aligned_sentences):
    """
    Checks whether an aligned sentence is close enough to its source to be
    treated as human-written (near-verbatim) text.

    Returns:
        bool: True if the similarity is at least 0.99, False otherwise.
    """
    if not aligned_sentences:  # handle empty data
        return False
    return aligned_sentences["similarity"] >= 0.99
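
# Example (illustrative): only a near-verbatim alignment counts as human-written.
#
#   check_human({"similarity": 0.995})  # -> True
#   check_human({"similarity": 0.90})   # -> False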

def determine_label(similarity):
    """Maps a similarity score to a (label, is_paraphrased) pair."""
    if similarity >= PARAPHRASE_THRESHOLD_HUMAN:
        return "HUMAN", True
    elif similarity >= PARAPHRASE_THRESHOLD_MACHINE:
        return "MACHINE", True
    else:
        return None, False

if __name__ == "__main__":
    pass
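    # Illustrative smoke test (an addition, not part of the original module):
    # it exercises only the pure helpers, since check_paraphrase and
    # find_paragraph_source need the embedding model and live search access.
    print(longest_common_subsequence(["a", "brown", "fox"], ["brown", "fox"]))  # 2
    print(check_sentence("the quick brown fox", "the quick brown fox", 4, 10))  # True
    print(f"{similarity_ratio('paraphrase', 'paraphrases'):.3f}")  # ~0.952
    print(determine_label(0.97))  # ('HUMAN', True)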