import warnings from difflib import SequenceMatcher import nltk import numpy as np import torch from sentence_transformers import ( SentenceTransformer, util, ) from src.application.text.preprocessing import split_into_paragraphs from src.application.text.search import ( generate_search_phrases, search_by_google, ) from src.application.url_reader import URLReader warnings.simplefilter(action="ignore", category=FutureWarning) # Download necessary NLTK data files nltk.download("punkt", quiet=True) nltk.download("punkt_tab", quiet=True) nltk.download("stopwords", quiet=True) # load the model DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") PARAPHASE_MODEL = SentenceTransformer("paraphrase-MiniLM-L6-v2") PARAPHASE_MODEL.to(DEVICE) PARAPHRASE_THRESHOLD_HUMAN = 0.963 PARAPHRASE_THRESHOLD_MACHINE = 0.8 PARAPHRASE_THRESHOLD = 0.8 MIN_SAME_SENTENCE_LEN = 6 MIN_PHRASE_SENTENCE_LEN = 10 MIN_RATIO_PARAPHRASE_NUM = 0.5 MAX_CHAR_SIZE = 30000 def find_paragraph_source(text, text_index, sentences_df): checked_urls = set() searched_phrases = generate_search_phrases(text[text_index]) for candidate in searched_phrases: search_results = search_by_google(candidate) urls = [item["link"] for item in search_results.get("items", [])] for url in urls[:3]: if url in checked_urls: # visited url continue if "bbc.com" not in url: continue checked_urls.add(url) print(f"\t\tChecking URL: {url}") content = URLReader(url) if content.is_extracted is True: if content.title is None or content.text is None: print("\t\t\t↑↑↑ Title or text not found") continue page_text = content.title + "\n" + content.text if len(page_text) > MAX_CHAR_SIZE: print(f"\t\t\t↑↑↑ More than {MAX_CHAR_SIZE} characters") continue print(f"\t\t\t↑↑↑ Title: {content.title}") aligned_sentence = check_paraphrase( text[text_index], page_text, url, ) if aligned_sentence["paraphrase"] is False: sentences_df.loc[text_index, "input"] = aligned_sentence[ "input" ] sentences_df.loc[text_index, "paraphrase"] = ( aligned_sentence["paraphrase"] ) return sentences_df, [] # assign values columns = [ "input", "source", "label", "similarity", "paraphrase", "url", ] for c in columns: if c in sentences_df.columns: sentences_df.loc[text_index, c] = aligned_sentence[c] for idx, _ in sentences_df.iterrows(): similarity = sentences_df.loc[idx, "similarity"] if similarity is not None: if similarity > PARAPHRASE_THRESHOLD_MACHINE: continue # find matched content in new url aligned_sentence = check_paraphrase( text[idx], page_text, url, ) if ( similarity is None or aligned_sentence["similarity"] > similarity ): columns = [ "input", "source", "label", "similarity", "url", ] for c in columns: if c in sentences_df.columns: sentences_df.loc[idx, c] = aligned_sentence[c] return sentences_df, content.images sentences_df.loc[text_index, "input"] = text[text_index] return sentences_df, [] def longest_common_subsequence(arr1, arr2): """ Finds the length of the longest common subsequence (contiguous) between two arrays. Args: arr1: The first array. arr2: The second array. Returns: The length of the longest common subsequence. Returns 0 if either input is invalid. """ if not isinstance(arr1, list) or not isinstance(arr2, list): return 0 n = len(arr1) m = len(arr2) if n == 0 or m == 0: # handle empty list return 0 # Create table dp with size (n+1) x (m+1) dp = [[0] * (m + 1) for _ in range(n + 1)] max_length = 0 for i in range(1, n + 1): for j in range(1, m + 1): if arr1[i - 1] == arr2[j - 1]: dp[i][j] = dp[i - 1][j - 1] + 1 max_length = max(max_length, dp[i][j]) else: dp[i][j] = 0 # set 0 since the array must be consecutive return max_length def check_sentence( input_sentence, source_sentence, min_same_sentence_len, min_phrase_sentence_len, verbose=False, ): """ Checks if two sentences are similar based on exact match or longest common subsequence. Args: input_sentence: The input sentence. source_sentence: The source sentence. min_same_sentence_len: Minimum length for exact sentence match. min_phrase_sentence_len: Minimum length for common subsequence match. verbose: If True, print debug information. Returns: True if the sentences are considered similar, False otherwise. Returns False if input is not valid. """ if not isinstance(input_sentence, str) or not isinstance( source_sentence, str, ): return False input_sentence = input_sentence.strip() source_sentence = source_sentence.strip() if not input_sentence or not source_sentence: # handle empty string return False input_words = input_sentence.split() # split without arguments source_words = source_sentence.split() # split without arguments if ( input_sentence == source_sentence and len(input_words) >= min_same_sentence_len ): if verbose: print("Exact match found.") return True max_overlap_len = longest_common_subsequence(input_words, source_words) if verbose: print(f"Max overlap length: {max_overlap_len}") # print overlap length if max_overlap_len >= min_phrase_sentence_len: return True return False def check_paraphrase(input_text, page_text, url): """ Checks if the input text is paraphrased in the content at the given URL. Args: input_text: The text to check for paraphrase. page_text: The text of the web page to compare with. url Returns: A tuple containing: """ # Extract sentences from input text and web page input_paragraphs = [input_text] if not page_text: return {} page_paragraphs = split_into_paragraphs(page_text) if not input_paragraphs or not page_paragraphs: return {} additional_sentences = [] for sentence in page_paragraphs: if ", external" in sentence: additional_sentences.append(sentence.replace(", external", "")) page_paragraphs.extend(additional_sentences) # Encode sentences into embeddings embeddings1 = PARAPHASE_MODEL.encode( input_paragraphs, convert_to_tensor=True, device=DEVICE, show_progress_bar=False, ) embeddings2 = PARAPHASE_MODEL.encode( page_paragraphs, convert_to_tensor=True, device=DEVICE, show_progress_bar=False, ) # Compute cosine similarity matrix similarity_matrix = util.cos_sim(embeddings1, embeddings2).cpu().numpy() # Find sentence alignments alignment = {} for i, paragraph in enumerate(input_paragraphs): max_sim_index = np.argmax(similarity_matrix[i]) max_similarity = similarity_matrix[i][max_sim_index] label, is_paraphrased = determine_label(max_similarity) best_matched_paragraph = page_paragraphs[max_sim_index] alignment = { "input": paragraph, "source": best_matched_paragraph, "similarity": max_similarity, "label": label, "paraphrase": is_paraphrased, "url": url, } print(f"Result: [{alignment["similarity"]}] {alignment["source"]}") return alignment def similarity_ratio(a, b): """ Calculates the similarity ratio between two strings using SequenceMatcher. Args: a: The first string. b: The second string. Returns: A float representing the similarity ratio between 0.0 and 1.0. Returns 0.0 if either input is None or not a string. """ if ( not isinstance(a, str) or not isinstance(b, str) or a is None or b is None ): return 0.0 # Handle cases where inputs are not strings or None return SequenceMatcher(None, a, b).ratio() def check_human(alligned_sentences): """ Checks if a sufficient number of input sentences are found within source sentences. Returns: bool: True if the condition is met, False otherwise. """ if not alligned_sentences: # Handle empty data case return False if alligned_sentences["similarity"] >= 0.99: return True return False def determine_label(similarity): if similarity >= PARAPHRASE_THRESHOLD_HUMAN: return "HUMAN", True elif similarity >= PARAPHRASE_THRESHOLD_MACHINE: return "MACHINE", True else: return None, False if __name__ == "__main__": pass