# src/application/text/search_detection.py
import math
import warnings
from difflib import SequenceMatcher

warnings.simplefilter(action='ignore', category=FutureWarning)

import nltk
import numpy as np
import torch
from sentence_transformers import SentenceTransformer, util

from src.application.text.preprocessing import split_into_sentences
from src.application.text.search import generate_search_phrases, search_by_google
from src.application.url_reader import URLReader

# Download necessary NLTK data files
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
nltk.download('stopwords', quiet=True)
# Load the sentence-embedding model used for paraphrase detection.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
PARAPHRASE_MODEL = SentenceTransformer('paraphrase-MiniLM-L6-v2')
PARAPHRASE_MODEL.to(DEVICE)

BATCH_SIZE = 8
PARAPHRASE_THRESHOLD = 0.8  # cosine similarity above this counts as a paraphrase
PARAPHRASE_THRESHOLD_FOR_OPPOSITE = 0.7
MIN_SAME_SENTENCE_LEN = 6  # minimum word count for an exact-sentence match
MIN_PHRASE_SENTENCE_LEN = 10  # minimum word count for a common-phrase match
MIN_RATIO_PARAPHRASE_NUM = 0.7  # fraction of input sentences that must match
MAX_CHAR_SIZE = 30000  # skip pages longer than this many characters

def detect_text_by_relative_search(input_text, is_support_opposite=False):
    """
    Searches the web for pages related to the input text and checks each
    candidate page for paraphrased content.

    Args:
        input_text: The text to verify.
        is_support_opposite: Reserved flag for opposite-stance matching.

    Returns:
        A tuple of (is_paraphrase, url, aligned_sentences, images).
    """
    checked_urls = set()
    searched_phrases = generate_search_phrases(input_text)

    for candidate in searched_phrases:
        search_results = search_by_google(candidate)
        urls = [item['link'] for item in search_results.get("items", [])]

        for url in urls[:3]:  # only check the top 3 results per query
            if url in checked_urls:  # skip already-visited URLs
                continue
            checked_urls.add(url)
            print(f"\t\tChecking URL: {url}")

            content = URLReader(url)
            if content.is_extracted is True:
                if content.title is None or content.text is None:
                    print("\t\t\t↑↑↑ Title or text not found")
                    continue

                page_text = content.title + "\n" + content.text
                if len(page_text) > MAX_CHAR_SIZE:
                    print(f"\t\t\t↑↑↑ More than {MAX_CHAR_SIZE} characters")
                    continue

                is_paraphrase, aligned_sentences = check_paraphrase(
                    input_text, page_text, url)
                if is_paraphrase:
                    return is_paraphrase, url, aligned_sentences, content.images

    return False, None, [], []

def longest_common_subsequence(arr1, arr2):
    """
    Finds the length of the longest contiguous common subsequence
    (i.e., the longest common substring of tokens) between two lists.

    Args:
        arr1: The first list of tokens.
        arr2: The second list of tokens.

    Returns:
        The length of the longest contiguous common subsequence,
        or 0 if either input is invalid or empty.
    """
    if not isinstance(arr1, list) or not isinstance(arr2, list):
        return 0

    n = len(arr1)
    m = len(arr2)
    if n == 0 or m == 0:  # handle empty lists
        return 0

    # dp[i][j] holds the length of the common run ending at
    # arr1[i - 1] and arr2[j - 1].
    dp = [[0] * (m + 1) for _ in range(n + 1)]
    max_length = 0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            if arr1[i - 1] == arr2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1] + 1
                max_length = max(max_length, dp[i][j])
            else:
                dp[i][j] = 0  # reset: the match must be contiguous

    return max_length
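
# A quick illustration of the contiguous-match behaviour (hypothetical token
# lists, not from the pipeline):
#
#   longest_common_subsequence(["the", "cat", "sat"], ["a", "cat", "sat"])
#   # -> 2, for the shared run ["cat", "sat"]
#   longest_common_subsequence(["the", "cat"], ["cat", "the"])
#   # -> 1, since no two-token run is shared in the same order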

def check_sentence(input_sentence, source_sentence, min_same_sentence_len,
                   min_phrase_sentence_len, verbose=False):
    """
    Checks if two sentences are similar based on an exact match or a
    long-enough common contiguous phrase.

    Args:
        input_sentence: The input sentence.
        source_sentence: The source sentence.
        min_same_sentence_len: Minimum word count for an exact sentence match.
        min_phrase_sentence_len: Minimum word count for a common-phrase match.
        verbose: If True, print debug information.

    Returns:
        True if the sentences are considered similar, False otherwise.
        Returns False if either input is invalid.
    """
    if not isinstance(input_sentence, str) or not isinstance(source_sentence, str):
        return False

    input_sentence = input_sentence.strip()
    source_sentence = source_sentence.strip()
    if not input_sentence or not source_sentence:  # handle empty strings
        return False

    input_words = input_sentence.split()
    source_words = source_sentence.split()

    # Exact match: identical sentences that are long enough to be meaningful.
    if input_sentence == source_sentence and len(input_words) >= min_same_sentence_len:
        if verbose:
            print("Exact match found.")
        return True

    # Phrase match: a shared contiguous word run that is long enough.
    max_overlap_len = longest_common_subsequence(input_words, source_words)
    if verbose:
        print(f"Max overlap length: {max_overlap_len}")

    return max_overlap_len >= min_phrase_sentence_len
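
# Example (hypothetical sentences): an exact six-word match passes, while a
# loose paraphrase with only a short shared phrase does not:
#
#   check_sentence("the quick brown fox jumps high",
#                  "the quick brown fox jumps high", 6, 10)  # -> True
#   check_sentence("the quick brown fox jumps high",
#                  "a slow brown fox walks low", 6, 10)      # -> False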

def check_paraphrase(input_text, page_text, url, verbose=False):
    """
    Checks if the input text is paraphrased in the given web page text.

    Args:
        input_text: The text to check for paraphrase.
        page_text: The text of the web page to compare with.
        url: The URL of the web page, recorded in the alignment results.
        verbose: If True, print debug information.

    Returns:
        A tuple containing:
        - is_paraphrase: True if the input text is considered a paraphrase,
            False otherwise.
        - alignments: A list of dictionaries, each containing:
            - input_sentence: The sentence from the input text.
            - matched_sentence: The best-matching sentence from the web page
                (empty if below the threshold).
            - similarity: The cosine similarity score between the sentences.
            - is_paraphrase_sentence: True if the sentence pair meets the
                paraphrase criteria, False otherwise.
            - url: The source URL (empty if below the threshold).
    """
    is_paraphrase_text = False
    if not isinstance(input_text, str) or not isinstance(page_text, str):
        return False, []

    # Extract sentences from the input text and the web page.
    input_sentences = split_into_sentences(input_text)
    if not page_text:
        return is_paraphrase_text, []
    page_sentences = split_into_sentences(page_text)
    if not input_sentences or not page_sentences:
        return is_paraphrase_text, []

    # Some scraped pages mark outbound links with ", external"; also index a
    # cleaned copy of those sentences so the marker does not break matching.
    additional_sentences = []
    for sentence in page_sentences:
        if ", external" in sentence:
            additional_sentences.append(sentence.replace(", external", ""))
    page_sentences.extend(additional_sentences)

    min_matching_sentences = math.ceil(
        len(input_sentences) * MIN_RATIO_PARAPHRASE_NUM)

    # Encode sentences into embeddings.
    embeddings1 = PARAPHRASE_MODEL.encode(
        input_sentences, batch_size=BATCH_SIZE,
        convert_to_tensor=True, device=DEVICE)
    embeddings2 = PARAPHRASE_MODEL.encode(
        page_sentences, batch_size=BATCH_SIZE,
        convert_to_tensor=True, device=DEVICE)

    # Compute the cosine similarity matrix.
    similarity_matrix = util.cos_sim(embeddings1, embeddings2).cpu().numpy()

    # Align each input sentence with its most similar page sentence.
    alignments = []
    paraphrased_sentence_count = 0
    for i, sentence1 in enumerate(input_sentences):
        max_sim_index = np.argmax(similarity_matrix[i])
        max_similarity = similarity_matrix[i][max_sim_index]
        is_paraphrase_sentence = max_similarity > PARAPHRASE_THRESHOLD

        if max_similarity < PARAPHRASE_THRESHOLD:
            alignments.append({
                "input_sentence": sentence1,
                "matched_sentence": "",
                "similarity": max_similarity,
                "is_paraphrase_sentence": is_paraphrase_sentence,
                "url": "",
            })
        else:
            alignments.append({
                "input_sentence": sentence1,
                "matched_sentence": page_sentences[max_sim_index],
                "similarity": max_similarity,
                "is_paraphrase_sentence": is_paraphrase_sentence,
                "url": url,
            })

        # Check for an individual-sentence match if no overall paraphrase
        # has been found yet.
        if not is_paraphrase_text and check_sentence(
            sentence1, page_sentences[max_sim_index],
            MIN_SAME_SENTENCE_LEN, MIN_PHRASE_SENTENCE_LEN,
        ):
            is_paraphrase_text = True
            if verbose:
                print(f"Paraphrase found for individual sentence: {sentence1}")
                print(f"Matched sentence: {page_sentences[max_sim_index]}")

        paraphrased_sentence_count += 1 if is_paraphrase_sentence else 0

    # The text counts as a paraphrase if enough sentences match overall.
    is_paraphrase_text = (is_paraphrase_text
                          or paraphrased_sentence_count >= min_matching_sentences)

    if verbose:
        print(f"Minimum matching sentences required: {min_matching_sentences}")
        print(f"Total input sentences: {len(input_sentences)}")
        print(f"Number of matching sentences: {paraphrased_sentence_count}")
        print(f"Is paraphrase: {is_paraphrase_text}")
        for item in alignments:
            print(item)

    return is_paraphrase_text, alignments
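
# A minimal offline sketch of check_paraphrase (hypothetical strings; the URL
# is illustrative only and is never fetched here):
#
#   matched, alignments = check_paraphrase(
#       "The mayor announced a new budget on Monday.",
#       "On Monday the mayor announced a new budget for the city.",
#       "https://example.com/article",
#   )
#   # `alignments` holds one dict per input sentence with its best match,
#   # similarity score, and source URL.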

def similarity_ratio(a, b):
    """
    Calculates the similarity ratio between two strings using SequenceMatcher.

    Args:
        a: The first string.
        b: The second string.

    Returns:
        A float between 0.0 and 1.0 representing the similarity ratio.
        Returns 0.0 if either input is not a string (including None).
    """
    if not isinstance(a, str) or not isinstance(b, str):
        return 0.0  # handle None and other non-string inputs
    return SequenceMatcher(None, a, b).ratio()
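
# For instance (hypothetical strings):
#
#   similarity_ratio("paraphrase", "paraphase")  # -> ~0.95
#   similarity_ratio("abc", None)                # -> 0.0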

def check_human(aligned_sentence):
    """
    Checks whether an aligned sentence pair is a near-exact match to its
    source sentence.

    Args:
        aligned_sentence: An alignment dictionary produced by
            check_paraphrase, containing a "similarity" score.

    Returns:
        bool: True if the similarity is at least 0.99, False otherwise.
    """
    if not aligned_sentence:  # handle empty input
        return False
    return aligned_sentence["similarity"] >= 0.99
if __name__ == '__main__':
pass
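    # A minimal usage sketch (hypothetical input text; running it requires the
    # Google search credentials used by src.application.text.search plus
    # network access, so it is left commented out):
    #
    # sample_text = (
    #     "The city council approved the new transit plan on Tuesday. "
    #     "Construction is expected to begin next spring."
    # )
    # is_paraphrase, url, aligned_sentences, images = \
    #     detect_text_by_relative_search(sample_text)
    # print(f"Paraphrase detected: {is_paraphrase} (source: {url})")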