pmkhanh7890's picture
1st
22e1b62
raw
history blame
27.2 kB
import warnings
from bs4 import BeautifulSoup
from identity import extract_entities
warnings.simplefilter(action='ignore', category=FutureWarning)
import time
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
import re
from collections import Counter
import string
import nltk
import torch
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.util import ngrams
from sentence_transformers import SentenceTransformer, util
import math
from dotenv import load_dotenv
from difflib import SequenceMatcher
import os
import requests
import csv
from newspaper import article, ArticleException, ArticleBinaryDataException
# Google Cloud Console credentials used by search_by_google().
# load_dotenv() loads variables from a local .env file into the environment;
# both values are None when the corresponding variable is not set.
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
SEARCH_ENGINE_ID = os.getenv("SEARCH_ENGINE_ID")
# Download necessary NLTK data files:
# tokenizer data for sent_tokenize/word_tokenize and the English stop-word list.
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
nltk.download('stopwords', quiet=True)
# Load the sentence-embedding model used by check_paraphrase() and move it to
# the GPU when CUDA is available, otherwise keep it on the CPU.
# NOTE: the "PARAPHASE" spelling is part of the public name; do not rename here.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
PARAPHASE_MODEL = SentenceTransformer('paraphrase-MiniLM-L6-v2')
PARAPHASE_MODEL.to(DEVICE)
# Not referenced in the visible part of this file.
BATCH_SIZE = 8
# Largest Content-Length (bytes) a page may report and still be fetched by
# detect_by_relative_search().
MAX_URL_SIZE = 2000000 # ~2MB
# Cosine similarity above which a sentence pair counts as a paraphrase.
PARAPHRASE_THRESHOLD = 0.8
# Not referenced in the visible part of this file.
PARAPHRASE_THRESHOLD_FOR_OPPOSITE = 0.7
# Minimum word count for an identical sentence to count as a match (check_sentence).
MIN_SAME_SENTENCE_LEN = 6
# Minimum number of consecutive shared words for a partial match (check_sentence).
MIN_PHRASE_SENTENCE_LEN = 10
# Fraction of input sentences that must be paraphrased for the whole text to be
# flagged by check_paraphrase().
MIN_RATIO_PARAPHRASE_NUM = 0.7
# Pages whose extracted text is longer than this many characters are skipped.
MAX_CHAR_SIZE = 30000
def clean_text(text, max_words=18):
    """
    Normalise a text snippet so it can be used as a short search query.

    The text is lower-cased, ASCII punctuation other than "," and "." is
    removed, "£" is replaced by a standalone "*", whitespace is collapsed and
    only the first ``max_words`` words are kept.

    Args:
        text (str): The text to clean.
        max_words (int | None): Maximum number of words to keep (default 18).
            ``None`` keeps every word.
    Returns:
        str: The cleaned, truncated text.
    """
    # "," and "." are deliberately kept because they occur inside numbers.
    punctuations = r"""!"#$%&'()*+-/:;<=>?@[\]^_`{|}~"""
    text = text.lower()
    text = "".join(c for c in text if c not in punctuations)
    # str.replace returns a new string; the original code discarded the result,
    # so the substitution never happened. It must run after punctuation removal
    # because "*" itself is in the punctuation set.
    # NOTE(review): presumably "*" is meant as a search wildcard - confirm.
    text = text.replace("£", " * ")
    # split() with no argument collapses every run of whitespace/newlines.
    words = text.split()
    return " ".join(words[:max_words])
def remove_punctuation(text):
    """Return ``text`` with all ASCII punctuation removed except the full stop."""
    # Every character of string.punctuation is dropped, "." being the one exception.
    droppable = {symbol for symbol in string.punctuation if symbol != "."}
    return "".join(ch for ch in text if ch not in droppable)
def get_keywords(text, num_keywords=5):
    """
    Return the top ``num_keywords`` keywords of a document using TF-IDF scores.

    The vectorizer is fitted on this single document only, so the IDF factor is
    the same for every term and the ranking is effectively by term frequency
    (English stop words excluded).

    Args:
        text (str): The document to analyse.
        num_keywords (int): Maximum number of keywords to return (default 5).
    Returns:
        list[str]: Keywords ordered from highest to lowest score. An empty list
        is returned when ``text`` is not a string, is blank, or contains no
        usable terms (e.g. only stop words).
    """
    if not isinstance(text, str) or not text.strip():
        return []
    vectorizer = TfidfVectorizer(stop_words='english')
    try:
        tfidf_matrix = vectorizer.fit_transform([text])
    except ValueError:
        # scikit-learn raises ValueError ("empty vocabulary") when nothing is
        # left after tokenisation and stop-word removal.
        return []
    feature_names = vectorizer.get_feature_names_out()
    # Only one document was fitted, so row 0 holds all the scores.
    tfidf_scores = tfidf_matrix.toarray()[0]
    ranked = sorted(
        zip(feature_names, tfidf_scores),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [word for word, _ in ranked[:num_keywords]]
"""
# Example usage
text = "Artificial intelligence (AI) is intelligence demonstrated by machines, as opposed to natural intelligence displayed by animals including humans. Leading AI textbooks define the field as the study of "intelligent agents": any system that perceives its environment and takes actions that maximize its chance of achieving its goals. Some popular accounts use the term "artificial intelligence" to describe machines that mimic "cognitive" functions that humans associate with the human mind, such as "learning" and "problem solving", however this definition is rejected by major AI researchers."
print(f"\n# Input text:\n'{text}'")
print("\n----------------------\n")
keywords = get_keywords(text)
print("# Top keywords:", keywords)
print("\n----------------------\n")
"""
def get_important_sentences(paragraph: str, keywords: list[str], num_sentences: int = 3) -> list[str]:
    """
    Pick the sentences of a paragraph that mention the given keywords most often.

    Args:
        paragraph (str): Text to split into sentences.
        keywords (list[str]): Keywords to look for (matched case-insensitively
            against whole whitespace-separated words).
        num_sentences (int): How many sentences to return at most (default 3).
    Returns:
        list[str]: The highest-scoring sentences, best first. Sentences with
        equal scores keep their original order.
    """
    # A sentence ends at ".", "!" or "?" followed by whitespace.
    fragments = (piece.strip() for piece in re.split(r'(?<=[.!?])\s+', paragraph))
    candidates = [fragment for fragment in fragments if fragment]

    def keyword_hits(sentence):
        # clean_text() keeps only the first words of the sentence, so hits
        # beyond that cut-off are not counted.
        occurrences = Counter(clean_text(sentence).lower().split())
        return sum(occurrences[keyword.lower()] for keyword in keywords)

    scored = [(candidate, keyword_hits(candidate)) for candidate in candidates]
    # sorted() is stable, so ties stay in document order.
    ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
    return [candidate for candidate, _ in ranked[:num_sentences]]
"""# Example usage
keywords = get_keywords(paragraph)
important_sentences = get_important_sentences(paragraph, keywords)
print("# Important sentences:")
for i, sentence in enumerate(important_sentences, 1):
print(f"{i}. {sentence}")
print("\n----------------------\n")
"""
def extract_important_phrases(paragraph: str, keywords: list[str], phrase_length: int = 5) -> list[str]:
    """
    Extract keyword-bearing phrases (word n-grams) from a paragraph.

    The n-gram size is derived from the paragraph length: one tenth of the
    token count, clamped to the range 5-7. Two selected phrases never start
    closer together than 80% of that size, so they overlap by at most ~20%.

    Args:
        paragraph (str): The input paragraph.
        keywords (list[str]): Keywords; a phrase is kept when one of its tokens
            equals a keyword (case-insensitive).
        phrase_length (int): Accepted for backward compatibility only. The
            phrase length has always been auto-determined and this value is
            not used.
    Returns:
        list[str]: The selected phrases, each passed through clean_text().
    """
    keyword_set = {keyword.lower() for keyword in keywords}
    # Without text or keywords nothing can match.
    if not paragraph or not keyword_set:
        return []
    words = word_tokenize(paragraph.lower())
    # Auto-determined phrase length: between 5 and 7 tokens.
    window = min(max(len(words) // 10, 5), 7)
    min_gap = window * 0.8
    important_phrases = []
    # Start indices only grow, so the closest previously selected phrase is
    # always the last one; no need to scan every earlier index.
    last_start = None
    for start, phrase in enumerate(ngrams(words, window)):
        if keyword_set.isdisjoint(phrase):
            continue
        if last_start is not None and start - last_start < min_gap:
            continue
        important_phrases.append(clean_text(" ".join(phrase)))
        last_start = start
    return important_phrases
"""# Example usage
keywords = get_keywords(paragraph)
important_phrases = extract_important_phrases(paragraph, keywords)
print("# Important phrases:")
for i, phrase in enumerate(important_phrases[:5], 1): # Print top 5 phrases
print(f"{i}. {phrase}")"""
def search_by_google(
    query,
    num_results=10,
    is_exact_terms=False,
    timeout=10,
) -> "dict | None":
    """
    Query the Google Custom Search JSON API.

    No date restriction is applied to the search.

    Args:
        query: The search query.
        num_results: The number of results to request (default: 10).
        is_exact_terms: When True the query is sent as ``exactTerms``;
            otherwise it is sent as a regular ``q`` query with double quotes
            removed.
        timeout: Seconds to wait for the HTTP request (default: 10).
    Returns:
        The decoded JSON response as a dictionary, or None when the request
        fails, the API answers with a non-200 status, or the body is not JSON.
    """
    url = "https://www.googleapis.com/customsearch/v1"
    params = {
        "key": GOOGLE_API_KEY,
        "cx": SEARCH_ENGINE_ID,
        "num": num_results,
    }
    if is_exact_terms:
        params["exactTerms"] = query
    else:
        params["q"] = query.replace('"', "")
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(url, params=params, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Error: request failed, {e}")
        return None
    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return None
    try:
        return response.json()
    except ValueError as e:
        # requests raises a ValueError subclass when the body is not valid JSON.
        print(f"Error: invalid JSON in response, {e}")
        return None
def display_Google_results(results):
    """
    Print title, link and snippet of every search result.

    Args:
        results: Iterable of mappings that each provide the keys 'title',
            'link' and 'snippet'. A separator line follows every result.
    """
    labelled_keys = (("Title", "title"), ("Link", "link"), ("Snippet", "snippet"))
    for entry in results:
        for label, key in labelled_keys:
            print(f"{label}: {entry[key]}")
        print(" ------- ")
def detect_by_relative_search(input_text, is_support_opposite = False):
    """
    Search the web for pages that the input text may have been paraphrased from.

    Several search phrases are derived from the text. For each phrase the top
    three result URLs are fetched (each URL at most once) and compared with the
    input via check_paraphrase(). The search stops at the first page that is
    judged to be a source.

    Args:
        input_text: The text to check.
        is_support_opposite: Currently unused by this function.
    Returns:
        A tuple ``(is_paraphrase, url, data)``: the verdict, the URL of the
        matching page (None when nothing matched) and the sentence alignment
        returned by check_paraphrase() (empty list when nothing matched).
    """
    checked_urls = set()
    for candidate in generate_search_phrases(input_text):
        search_results = search_by_google(candidate)
        if not search_results:
            # search_by_google() returns None on any API/network failure.
            continue
        urls = [item['link'] for item in search_results.get("items", [])]
        for url in urls[:3]:
            if url in checked_urls:  # already checked
                continue
            checked_urls.add(url)
            print(f"\n\tURL: {url}")
            size = get_url_size(url)
            if size is None or size > MAX_URL_SIZE:
                continue
            try:
                page_text = extract_text(url)
            except requests.exceptions.RequestException as e:
                # One unreachable page must not abort the whole search.
                print(f"\t\t↑↑↑ Error fetching page: {e}")
                continue
            if page_text is None:
                print("\t\t↑↑↑ No text could be extracted")
                continue
            if len(page_text) > MAX_CHAR_SIZE:
                print(f"\t\t↑↑↑ More than {MAX_CHAR_SIZE} characters")
                continue
            is_paraphrase, data = check_paraphrase(input_text, page_text)
            if is_paraphrase:
                return is_paraphrase, url, data
    return False, None, []
def get_url_size(url):
    """
    Retrieves the size of a URL's content using a HEAD request.

    Args:
        url: The URL to check.
    Returns:
        The size of the content in bytes, or None if the size cannot be
        determined (network/HTTP error, missing Content-Length header, or a
        Content-Length value that is not an integer).
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=5)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
    except requests.exceptions.RequestException as e:
        print(f"\t\t↑↑↑ Error getting URL size: {e}")
        return None
    content_length = response.headers.get('Content-Length')
    if content_length is None:
        print("\t\t↑↑↑ Content-Length header not found")
        return None
    try:
        return int(content_length)
    except ValueError:
        # The header comes from a remote server and may be malformed.
        print(f"\t\t↑↑↑ Invalid Content-Length header: {content_length!r}")
        return None
def get_most_frequent_words(input_text, number_word=32):
    """
    Builds a search phrase from the most frequent words of the input text.

    Stop words and non-alphanumeric tokens are ignored. Words are ordered from
    most to least frequent.

    Args:
        input_text: The input text as a string.
        number_word: The number of top words to use. Capped at 32, the limit
            hard-coded by the original implementation.
            NOTE(review): presumably the search engine's query-length limit -
            confirm.
    Returns:
        A string with the top words separated by single spaces.
        Returns an empty list if input is not a string or is empty (kept for
        backward compatibility).
    """
    if not isinstance(input_text, str) or not input_text:
        return []
    max_query_words = 32
    stop_words = set(stopwords.words('english'))
    # isalnum() already rejects pure punctuation tokens.
    candidates = [
        token for token in word_tokenize(input_text.lower())
        if token.isalnum() and token not in stop_words
    ]
    top_words = Counter(candidates).most_common(min(number_word, max_query_words))
    # The phrase is built from the frequent words themselves. The previous code
    # appended them to the raw token list and then returned the first 32 raw
    # tokens, so the frequent words were never used for longer texts.
    return " ".join(word for word, _ in top_words)
def get_chunk(input_text, chunk_length=32, num_chunk=3):
    """
    Cut the beginning of a text into consecutive word chunks.

    Args:
        input_text: The text to split (on any whitespace).
        chunk_length: Number of words per chunk.
        num_chunk: Maximum number of chunks to produce.
    Returns:
        A list of at most ``num_chunk`` strings, each holding up to
        ``chunk_length`` words joined by single spaces. Empty chunks are
        omitted. An empty list is returned for non-string input.
    """
    if not isinstance(input_text, str):
        return []
    tokens = input_text.split()
    pieces = (
        " ".join(tokens[k * chunk_length:(k + 1) * chunk_length])
        for k in range(num_chunk)
    )
    # Windows that fall beyond the end of the text produce "" and are skipped.
    return [piece for piece in pieces if piece]
def generate_search_phrases(input_text):
    """
    Generates different types of phrases for search purposes.

    Args:
        input_text: The input text.
    Returns:
        A flat list of non-empty query strings, in this order:
            - the most frequent words of the text,
            - the original input text,
            - up to three consecutive word chunks of the text,
            - the extracted entities followed by the top keywords.
        Phrases that turn out empty are left out. An empty list is returned
        when the input is not a string or is blank.
    """
    if not isinstance(input_text, str) or not input_text.strip():
        return []
    # Method 1: most frequent words; Method 2: the whole text
    candidates = [get_most_frequent_words(input_text), input_text]
    # Method 3: Split text by chunks
    candidates.extend(get_chunk(input_text))
    # Method 4: Get most identities and key words
    entities = extract_entities(input_text)
    try:
        keywords = get_keywords(input_text, 16)
    except ValueError:
        # Raised by the TF-IDF vectorizer when the text has no usable terms.
        keywords = []
    candidates.append((" ".join(entities) + " " + " ".join(keywords)).strip())
    # Drop anything that is not a usable query: an empty query wastes an API
    # call, and a non-string would crash search_by_google().
    return [phrase for phrase in candidates if isinstance(phrase, str) and phrase.strip()]
def split_into_sentences(input_text):
    """
    Break a text into sentences, treating every line as a separate paragraph.

    Args:
        input_text: The text to split.
    Returns:
        A list of sentences in document order. Blank lines are ignored.
        An empty list is returned when the input is not a string.
    """
    if not isinstance(input_text, str):
        return []
    stripped_lines = (line.strip() for line in input_text.splitlines())
    # A sentence never spans a line break: each non-blank line is tokenised
    # on its own.
    return [
        sentence
        for line in stripped_lines
        if line
        for sentence in sent_tokenize(line)
    ]
def longest_common_subsequence(arr1, arr2):
    """
    Return the length of the longest run of consecutive elements that occurs
    in both lists.

    Despite the name, the match must be contiguous.

    Args:
        arr1: The first list.
        arr2: The second list.
    Returns:
        The length of the longest shared contiguous run.
        0 if either argument is not a list or is empty.
    """
    if not (isinstance(arr1, list) and isinstance(arr2, list)):
        return 0
    if not arr1 or not arr2:
        return 0
    best = 0
    # previous[c] = length of the common run ending at the previous element of
    # arr1 and at arr2[c - 1]; only one earlier row is ever needed.
    previous = [0] * (len(arr2) + 1)
    for left in arr1:
        current = [0]
        for col, right in enumerate(arr2, start=1):
            # A mismatch resets the run because the match must be consecutive.
            run = previous[col - 1] + 1 if left == right else 0
            current.append(run)
            if run > best:
                best = run
        previous = current
    return best
def check_sentence(input_sentence, source_sentence, min_same_sentence_len,
                   min_phrase_sentence_len, verbose=False):
    """
    Decide whether two sentences share enough literal text to count as similar.

    Two sentences are similar when they are identical (after stripping) and
    have at least ``min_same_sentence_len`` words, or when they share a run of
    at least ``min_phrase_sentence_len`` consecutive words.

    Args:
        input_sentence: The input sentence.
        source_sentence: The source sentence.
        min_same_sentence_len: Minimum word count for an identical sentence.
        min_phrase_sentence_len: Minimum length of the shared word run.
        verbose: If True, print debug information.
    Returns:
        True if the sentences are considered similar, False otherwise
        (including when either argument is not a string or is blank).
    """
    if not (isinstance(input_sentence, str) and isinstance(source_sentence, str)):
        return False
    candidate = input_sentence.strip()
    reference = source_sentence.strip()
    if not (candidate and reference):
        return False
    candidate_tokens = candidate.split()
    reference_tokens = reference.split()
    is_identical = candidate == reference
    if is_identical and len(candidate_tokens) >= min_same_sentence_len:
        if verbose:
            print("Exact match found.")
        return True
    shared_run = longest_common_subsequence(candidate_tokens, reference_tokens)
    if verbose:
        print(f"Max overlap length: {shared_run}")
    return shared_run >= min_phrase_sentence_len
def extract_text(url, newspapers = False, timeout=10):
    """
    Extracts text from a URL, handling HTML and potential errors.

    Args:
        url: The URL of the web page to extract text from.
        newspapers: When exactly True, the article text is extracted with the
            ``newspaper`` library; otherwise the text of all <p> elements of
            the HTML page is returned.
        timeout: Seconds to wait for the HTTP request (default: 10).
    Returns:
        The extracted text content from the web page, or None if the page
        cannot be fetched or parsed.
    """
    # Both extraction modes first make sure the URL is reachable.
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raise exception for unsuccessful requests
    except requests.exceptions.RequestException as e:
        print(f"Error fetching URL: {e}")
        return None

    if newspapers is True:
        try:
            news = article(url=url, fetch_images=False)
        except Exception:
            # Best effort: any download/parse failure means "no text".
            # ArticleException and ArticleBinaryDataException are the expected ones.
            print("\t\t↑↑↑ Error downloading article.")
            return None
        return news.text

    # Extract text from a plain HTML page. The raw bytes are handed to
    # BeautifulSoup, which detects the encoding itself.
    try:
        soup = BeautifulSoup(response.content, "html.parser")
    except Exception:
        print(f"Error parsing HTML content from {url}")
        return None
    # Exclude text within specific elements
    for element in soup(["img", "figcaption", "table", "script", "style"]):
        element.extract()
    paragraphs = soup.find_all('p')
    return ' '.join(p.get_text() for p in paragraphs)
def check_paraphrase(input_text, page_text, verbose=False):
    """
    Checks if the input text is paraphrased from the given page text.

    Every input sentence is aligned with its most similar page sentence
    (cosine similarity of sentence embeddings). The text is reported as a
    paraphrase when enough input sentences exceed PARAPHRASE_THRESHOLD, or
    when at least one aligned pair shares verbatim text according to
    check_sentence().

    Args:
        input_text: The text to check for paraphrase.
        page_text: The text of the candidate source page.
        verbose: If True, print debug information.
    Returns:
        A tuple containing:
            - is_paraphrase: True if the input text is considered a paraphrase, False otherwise.
            - paraphrase_results: A list of dictionaries, one per input sentence, each containing:
                - input_sentence: The sentence from the input text.
                - matched_sentence: The most similar sentence from the page.
                - similarity: The cosine similarity score between the sentences.
                - is_paraphrase_sentence: True if the similarity exceeds PARAPHRASE_THRESHOLD.
        ``(False, [])`` is returned when either argument is not a string or
        yields no sentences.
    """
    if not isinstance(input_text, str) or not isinstance(page_text, str):
        return False, []
    if not page_text:
        return False, []
    # Extract sentences from input text and web page
    input_sentences = split_into_sentences(input_text)
    page_sentences = split_into_sentences(page_text)
    if not input_sentences or not page_sentences:
        return False, []
    # Some page sentences carry a ", external" marker; add a copy without it so
    # either form can be matched.
    cleaned_variants = [
        sentence.replace(", external", "")
        for sentence in page_sentences
        if ", external" in sentence
    ]
    page_sentences.extend(cleaned_variants)
    min_matching_sentences = math.ceil(len(input_sentences) * MIN_RATIO_PARAPHRASE_NUM)
    # Encode sentences into embeddings
    embeddings1 = PARAPHASE_MODEL.encode(input_sentences, convert_to_tensor=True, device=DEVICE)
    embeddings2 = PARAPHASE_MODEL.encode(page_sentences, convert_to_tensor=True, device=DEVICE)
    # Compute cosine similarity matrix (rows: input sentences, columns: page sentences)
    similarity_matrix = util.cos_sim(embeddings1, embeddings2).cpu().numpy()
    # Find sentence alignments
    alignment = []
    paraphrased_sentence_count = 0
    has_verbatim_match = False
    for i, sentence1 in enumerate(input_sentences):
        max_sim_index = np.argmax(similarity_matrix[i])
        max_similarity = similarity_matrix[i][max_sim_index]
        matched_sentence = page_sentences[max_sim_index]
        is_paraphrase_sentence = max_similarity > PARAPHRASE_THRESHOLD
        if 0.80 < max_similarity < 0.99:
            print(f"\t\tinput_sentence : {sentence1}")
            print(f"\t\tmatched_sentence: {matched_sentence}")
            print(f"\t\t--> similarity: {max_similarity}\n")
        # One verbatim match is enough, so stop checking once found.
        if not has_verbatim_match and check_sentence(
            sentence1, matched_sentence, MIN_SAME_SENTENCE_LEN, MIN_PHRASE_SENTENCE_LEN
        ):
            has_verbatim_match = True
            if verbose:
                print(f"Paraphrase found for individual sentence: {sentence1}")
                print(f"Matched sentence: {matched_sentence}")
        alignment.append({
            "input_sentence": sentence1,
            "matched_sentence": matched_sentence,
            "similarity": max_similarity,
            "is_paraphrase_sentence": is_paraphrase_sentence,
        })
        paraphrased_sentence_count += 1 if is_paraphrase_sentence else 0
    # Check if enough sentences are paraphrases
    print (f"\t\tparaphrased_sentence_count: {paraphrased_sentence_count}, min_matching_sentences: {min_matching_sentences}, total_sentence_count: {len(input_sentences)}")
    # The verbatim-match verdict used to be overwritten by the ratio test and
    # was therefore always lost; both criteria now count.
    is_paraphrase_text = (
        has_verbatim_match or paraphrased_sentence_count >= min_matching_sentences
    )
    if verbose:
        print(f"Minimum matching sentences required: {min_matching_sentences}")
        print(f"Total input sentences: {len(input_sentences)}")
        print(f"Number of matching sentences: {paraphrased_sentence_count}")
        print(f"Is paraphrase: {is_paraphrase_text}")
        for item in alignment:
            print(item)
    return is_paraphrase_text, alignment
def similarity_ratio(a, b):
    """
    Measure how alike two strings are with difflib's SequenceMatcher.

    Args:
        a: The first string.
        b: The second string.
    Returns:
        A float between 0.0 (nothing in common) and 1.0 (identical).
        0.0 when either argument is not a string (None included).
    """
    if isinstance(a, str) and isinstance(b, str):
        matcher = SequenceMatcher(None, a, b)
        return matcher.ratio()
    # None is not a str, so it is covered by the isinstance test as well.
    return 0.0
def is_human_written(sentence):
    """
    Estimate whether a sentence already exists on the web.

    Args:
        sentence: The sentence to look up.
    Returns:
        -1 when an exact-phrase search returns at least one result;
        the highest snippet similarity (a float above 0.8) when a regular
        search finds a very similar snippet;
        1 when neither search gives evidence of an existing source.
    """
    # 1. Search for exact matches. The phrase is sent as exact terms because
    #    search_by_google() strips double quotes from regular queries.
    exact = search_by_google(sentence, is_exact_terms=True)
    # The response dict is non-empty even for zero hits, so look at its "items".
    if exact and exact.get("items"):
        # Exact match found, likely human-written
        return -1
    # 2. If no exact match, find similar sentences
    related = search_by_google(sentence)
    items = related.get("items", []) if related else []
    similarities = [
        similarity_ratio(sentence, item.get("snippet", "")) for item in items
    ]
    if similarities:
        max_similarity = max(similarities)
        # You can adjust this threshold as needed
        if max_similarity > 0.8:
            return max_similarity
    # No strong evidence of human authorship
    return 1
# # Example usage
# sentence = important_sentences[0]
# result = is_human_written(sentence)
# print("\n----------------------\n")
# print(f"# Result:\nThe sentence is {result}")
def get_text_from_csv(filename):
    """
    Reads a CSV file and returns the values of its second column.

    The first row is treated as a header and skipped. Rows with fewer than two
    columns are ignored.

    Args:
        filename: The path to the CSV file.
    Returns:
        A list of strings containing the text from the second column.
    """
    # newline='' lets the csv module do its own newline handling, so line
    # breaks embedded in quoted fields are preserved unchanged.
    with open(filename, 'r', newline='') as file:
        reader = csv.reader(file)
        next(reader, None)  # skip the headers
        return [row[1] for row in reader if len(row) >= 2]
if __name__ == '__main__':
    # Batch check: look up every text of the input CSV on the web and record
    # whether an exact match was found.
    # paragraph = """
    # Artificial intelligence (AI) is intelligence demonstrated by machines, as opposed to natural intelligence displayed by animals including humans. Leading AI textbooks define the field as the study of "intelligent agents": any system that perceives its environment and takes actions that maximize its chance of achieving its goals. Some popular accounts use the term "artificial intelligence" to describe machines that mimic "cognitive" functions that humans associate with the human mind, such as "learning" and "problem solving", however this definition is rejected by major AI researchers.
    # """
    # keywords = get_keywords(paragraph)
    # important_sentences = get_important_sentences(paragraph, keywords)
    # print("# Important sentences:")
    # for i, sentence in enumerate(important_sentences, 1):
    # print(f"{i}. {sentence}")
    # print("\n----------------------\n")
    # sentence = important_sentences[0]
    filename = "data/results/[res]unchanged_words.csv"  # Replace with the actual filename
    text_list = get_text_from_csv(filename)
    match_count = 0
    unmatch_count = 0
    initial_delay = 1  # second
    data = []
    for count, text in enumerate(text_list, start=1):
        cleaned_text = clean_text(text)
        result = is_human_written(cleaned_text)
        # is_human_written() returns -1 only for an exact match.
        is_match = result == -1
        match = "match" if is_match else "unmatch"
        print(f"{count}: [{match}] {text}")
        # Row order must follow the column labels used below ("Text", "Match").
        data.append([text, match])
        if is_match:
            match_count += 1
        else:
            unmatch_count += 1
        time.sleep(initial_delay)  # avoid 100? queries per minute limit
    print(f"Match count: {match_count}")
    print(f"Unmatch count: {unmatch_count}")
    df = pd.DataFrame(data, columns=["Text", "Match"])
    output_filename = "data/results/[res]unchanged_words_processed_data.csv"  # Specify the output filename
    df.to_csv(output_filename, index=False)
# # Bing search
# subscription_key = os.getenv("BING_SUBSCRIPTION_KEY")  # credential redacted: never commit API keys; revoke the one previously committed here
# assert subscription_key
# search_url = "https://api.bing.microsoft.com/v7.0/search"
# headers = {"Ocp-Apim-Subscription-Key": subscription_key}
# params = {"q": '"Artificial intelligence (AI) is intelligence demonstrated by machines"', 'freshness': '2000-02-01..2020-02-01', 'answerCount': 2, 'mkt': 'en-US' }
# response = requests.get(search_url, headers=headers, params=params)
# response.raise_for_status()
# search_results = response.json()
# print("\nHeaders:\n")
# print(response.headers)
# print("\nJSON Response:\n")
# pprint(response.json())