import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
from src.application.text.preprocessing import split_into_sentences
from src.application.text.search import generate_search_phrases, search_by_google
from src.application.url_reader import URLReader
import numpy as np
import nltk
import torch
from nltk.corpus import stopwords
from sentence_transformers import SentenceTransformer, util
import math
from difflib import SequenceMatcher
# Download necessary NLTK data files
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
nltk.download('stopwords', quiet=True)
# Load the sentence-embedding model onto GPU if available
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
PARAPHRASE_MODEL = SentenceTransformer('paraphrase-MiniLM-L6-v2')
PARAPHRASE_MODEL.to(DEVICE)
BATCH_SIZE = 8
PARAPHRASE_THRESHOLD = 0.8  # cosine similarity for a sentence-level match
PARAPHRASE_THRESHOLD_FOR_OPPOSITE = 0.7
MIN_SAME_SENTENCE_LEN = 6    # words required for an exact-sentence match
MIN_PHRASE_SENTENCE_LEN = 10  # words required for a contiguous-overlap match
MIN_RATIO_PARAPHRASE_NUM = 0.7  # fraction of input sentences that must match
MAX_CHAR_SIZE = 30000  # skip pages longer than this
def detect_text_by_relative_search(input_text, is_support_opposite=False):
    """
    Generates search phrases from the input text, queries Google, and checks
    the top results for paraphrased content.

    Args:
        input_text: The text to verify.
        is_support_opposite: Reserved; currently unused.

    Returns:
        A tuple (is_paraphrase, url, aligned_sentences, images).
    """
checked_urls = set()
searched_phrases = generate_search_phrases(input_text)
for candidate in searched_phrases:
search_results = search_by_google(candidate)
urls = [item['link'] for item in search_results.get("items", [])]
for url in urls[:3]:
if url in checked_urls: # visited url
continue
checked_urls.add(url)
print(f"\t\tChecking URL: {url}")
content = URLReader(url)
            if content.is_extracted:
                if content.title is None or content.text is None:
                    print("\t\t\t--- Title or text not found")
                    continue
page_text = content.title + "\n" + content.text
if len(page_text) > MAX_CHAR_SIZE:
print(f"\t\t\tβββ More than {MAX_CHAR_SIZE} characters")
continue
                is_paraphrase, aligned_sentences = check_paraphrase(input_text, page_text, url)
                # Return after the first successfully analyzed page, whether or
                # not it is judged a paraphrase, so callers still get alignments
                return is_paraphrase, url, aligned_sentences, content.images
return False, None, [], []
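
# Usage sketch (hypothetical input; search_by_google must be configured with
# valid API credentials for this to return results):
# >>> found, url, aligned, images = detect_text_by_relative_search(
# ...     "Some article text to verify.")
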
def longest_common_subsequence(arr1, arr2):
    """
    Finds the length of the longest common contiguous subsequence between
    two arrays (i.e., the longest common substring over array elements).

    Args:
        arr1: The first array.
        arr2: The second array.

    Returns:
        The length of the longest common contiguous subsequence.
        Returns 0 if either input is invalid or empty.
    """
if not isinstance(arr1, list) or not isinstance(arr2, list):
return 0
n = len(arr1)
m = len(arr2)
if n == 0 or m == 0: #handle empty list
return 0
# Create table dp with size (n+1) x (m+1)
dp = [[0] * (m + 1) for _ in range(n + 1)]
max_length = 0
for i in range(1, n + 1):
for j in range(1, m + 1):
if arr1[i - 1] == arr2[j - 1]:
dp[i][j] = dp[i - 1][j - 1] + 1
max_length = max(max_length, dp[i][j])
else:
                dp[i][j] = 0  # reset: matched elements must be contiguous
return max_length
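
# A minimal sanity check (hypothetical word lists): only contiguous runs
# count, so the shared run "quick brown" gives a length of 2.
# >>> longest_common_subsequence(["the", "quick", "brown"],
# ...                            ["a", "quick", "brown", "fox"])
# 2
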
def check_sentence(input_sentence, source_sentence, min_same_sentence_len,
                   min_phrase_sentence_len, verbose=False):
    """
    Checks if two sentences are similar based on an exact match or a long
    common contiguous subsequence.

    Args:
        input_sentence: The input sentence.
        source_sentence: The source sentence.
        min_same_sentence_len: Minimum word count for an exact sentence match.
        min_phrase_sentence_len: Minimum word count for a common contiguous
            subsequence match.
        verbose: If True, print debug information.

    Returns:
        True if the sentences are considered similar, False otherwise.
        Returns False if either input is not a valid string.
    """
if not isinstance(input_sentence, str) or not isinstance(source_sentence, str):
return False
input_sentence = input_sentence.strip()
source_sentence = source_sentence.strip()
if not input_sentence or not source_sentence: # handle empty string
return False
    input_words = input_sentence.split()  # split on any whitespace
    source_words = source_sentence.split()
if input_sentence == source_sentence and len(input_words) >= min_same_sentence_len:
if verbose:
print("Exact match found.")
return True
max_overlap_len = longest_common_subsequence(input_words, source_words)
if verbose:
print(f"Max overlap length: {max_overlap_len}") # print overlap length
if max_overlap_len >= min_phrase_sentence_len:
return True
return False
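
# A minimal sanity check (hypothetical sentences): an identical six-word
# sentence satisfies the exact-match branch under the module defaults.
# >>> check_sentence("the cat sat on the mat",
# ...                "the cat sat on the mat",
# ...                MIN_SAME_SENTENCE_LEN, MIN_PHRASE_SENTENCE_LEN)
# True
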
def check_paraphrase(input_text, page_text, url, verbose=False):
    """
    Checks whether the input text is paraphrased in the given page text.

    Args:
        input_text: The text to check for paraphrase.
        page_text: The text of the web page to compare against.
        url: The URL of the web page, recorded in each alignment entry.
        verbose: If True, print debug information.

    Returns:
        A tuple containing:
        - is_paraphrase: True if the input text is considered a paraphrase,
          False otherwise.
        - alignments: A list of dictionaries, each containing:
            - input_sentence: The sentence from the input text.
            - matched_sentence: The corresponding page sentence (if found).
            - similarity: The cosine similarity score between the sentences.
            - is_paraphrase_sentence: True if the sentence pair meets the
              paraphrase threshold.
            - url: The source URL ("" when no match was kept).
    """
is_paraphrase_text = False
if not isinstance(input_text, str) or not isinstance(page_text, str):
return False, []
# Extract sentences from input text and web page
#input_text = remove_punctuation(input_text)
input_sentences = split_into_sentences(input_text)
if not page_text:
return is_paraphrase_text, []
#page_text = remove_punctuation(page_text)
page_sentences = split_into_sentences(page_text)
if not input_sentences or not page_sentences:
return is_paraphrase_text, []
    # Some scraped pages mark outbound links with a ", external" suffix;
    # add cleaned copies of those sentences so they can still be matched
    additional_sentences = []
for sentence in page_sentences:
if ", external" in sentence:
additional_sentences.append(sentence.replace(", external", ""))
page_sentences.extend(additional_sentences)
min_matching_sentences = math.ceil(len(input_sentences) * MIN_RATIO_PARAPHRASE_NUM)
# Encode sentences into embeddings
    embeddings1 = PARAPHRASE_MODEL.encode(input_sentences, convert_to_tensor=True, device=DEVICE)
    embeddings2 = PARAPHRASE_MODEL.encode(page_sentences, convert_to_tensor=True, device=DEVICE)
# Compute cosine similarity matrix
similarity_matrix = util.cos_sim(embeddings1, embeddings2).cpu().numpy()
# Find sentence alignments
    alignments = []
paraphrased_sentence_count = 0
    for i, sentence1 in enumerate(input_sentences):
        max_sim_index = np.argmax(similarity_matrix[i])
        max_similarity = similarity_matrix[i][max_sim_index]
        is_paraphrase_sentence = max_similarity > PARAPHRASE_THRESHOLD
        if not is_paraphrase_sentence:
            alignment = {
                "input_sentence": sentence1,
                "matched_sentence": "",
                "similarity": max_similarity,
                "is_paraphrase_sentence": is_paraphrase_sentence,
                "url": "",
            }
        else:
            alignment = {
                "input_sentence": sentence1,
                "matched_sentence": page_sentences[max_sim_index],
                "similarity": max_similarity,
                "is_paraphrase_sentence": is_paraphrase_sentence,
                "url": url,
            }
        alignments.append(alignment)

        # Fall back to an exact/contiguous-overlap check if no paraphrase
        # has been found yet
        if not is_paraphrase_text and check_sentence(
            sentence1, page_sentences[max_sim_index],
            MIN_SAME_SENTENCE_LEN, MIN_PHRASE_SENTENCE_LEN,
        ):
            is_paraphrase_text = True
            if verbose:
                print(f"Paraphrase found for individual sentence: {sentence1}")
                print(f"Matched sentence: {page_sentences[max_sim_index]}")

        paraphrased_sentence_count += 1 if is_paraphrase_sentence else 0
    # The text is a paraphrase if enough sentence pairs cleared the threshold,
    # or an exact/contiguous match was found above
    is_paraphrase_text = is_paraphrase_text or (
        paraphrased_sentence_count >= min_matching_sentences
    )
    if verbose:
        print(f"Minimum matching sentences required: {min_matching_sentences}")
        print(f"Total input sentences: {len(input_sentences)}")
        print(f"Number of matching sentences: {paraphrased_sentence_count}")
        print(f"Is paraphrase: {is_paraphrase_text}")
        for item in alignments:
            print(item)
    return is_paraphrase_text, alignments
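
# Usage sketch (hypothetical strings; runs the sentence-embedding model):
# >>> verdict, alignments = check_paraphrase(
# ...     "The quick brown fox jumps over the lazy dog.",
# ...     "A quick brown fox leaped over a lazy dog.",
# ...     url="https://example.com/article")
# Each entry in alignments holds input_sentence, matched_sentence,
# similarity, is_paraphrase_sentence, and url.
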
def similarity_ratio(a, b):
"""
Calculates the similarity ratio between two strings using SequenceMatcher.
Args:
a: The first string.
b: The second string.
Returns:
A float representing the similarity ratio between 0.0 and 1.0.
Returns 0.0 if either input is None or not a string.
"""
    if not isinstance(a, str) or not isinstance(b, str):
        return 0.0  # non-string inputs (including None)
return SequenceMatcher(None, a, b).ratio()
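
# A minimal sanity check: "abcd" and "abcf" share 3 of 8 characters in
# matching blocks, so the ratio is 2 * 3 / 8 = 0.75.
# >>> similarity_ratio("abcd", "abcf")
# 0.75
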
def check_human(aligned_sentences):
    """
    Checks whether an aligned sentence pair is a near-verbatim copy of its
    source sentence.

    Returns:
        bool: True if the pair's similarity is at least 0.99, False otherwise.
    """
    if not aligned_sentences:  # handle empty data case
        return False
    return aligned_sentences["similarity"] >= 0.99
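
# A minimal sanity check (hypothetical alignment entry): a near-verbatim
# match clears the 0.99 cutoff.
# >>> check_human({"similarity": 0.995})
# True
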
if __name__ == '__main__':
    pass