# news_verification / src /application /content_detection.py
# pmkhanh7890's picture
# run pre-commit
# 38fd181
# raw
# history blame
# 29.6 kB
from difflib import SequenceMatcher
import pandas as pd
from src.application.image.image_detection import (
detect_image_by_ai_model,
detect_image_by_reverse_search,
detect_image_from_news_image,
)
from src.application.text.entity import (
apply_highlight,
highlight_entities,
)
from src.application.text.helper import extract_equal_text
from src.application.text.model_detection import detect_text_by_ai_model
from src.application.text.preprocessing import split_into_paragraphs
from src.application.text.search_detection import (
check_human,
detect_text_by_relative_search,
find_text_source,
)
class NewsVerification:
    """Collect HUMAN/MACHINE evidence for a news item and render it as HTML.

    Typical flow: ``load_news`` -> ``generate_analysis_report`` ->
    ``analyze_details``. The text is split into paragraphs, each paragraph is
    either aligned with a source found by search or scored by the AI-text
    model, the image is checked the same way, and three HTML tables
    (ordinary user, fact checker, governor) are produced from the results.
    """

    def __init__(self):
        self.news_text = ""
        self.news_title = ""
        self.news_content = ""
        self.news_image = ""

        self.text_prediction_label: list[str] = []
        self.text_prediction_score: list[float] = []
        self.text_referent_url: list[str] = []
        # NOTE: the three image_* attributes start as empty lists but
        # detect_image_origin() replaces them with scalars (str / number /
        # str-or-None).
        self.image_prediction_label: list[str] = []
        self.image_prediction_score: list[str] = []
        self.image_referent_url: list[str] = []
        self.news_prediction_label = ""
        self.news_prediction_score = -1

        # Image URLs collected while searching for text sources.
        self.found_img_url: list[str] = []
        # One dict per aligned paragraph group; keys mirror the DataFrame
        # columns below.
        self.aligned_sentences: list[dict] = []
        self.aligned_sentences_df: pd.DataFrame = pd.DataFrame(
            columns=[
                "input_sentence",
                "matched_sentence",
                "label",
                "similarity",
                "paraphrase",
                "url",
                "group",
                "entities",
            ],
        )
        self.is_paraphrased: list[bool] = []

        self.ordinary_user_table: list = []
        self.fact_checker_table: list = []
        self.governor_table: list = []
        self.entities_with_colors = []

    def load_news(self, news_title, news_content, news_image):
        """Store the news item; ``news_text`` is title + blank line + content."""
        self.news_text = news_title + "\n\n" + news_content
        self.news_title = news_title
        self.news_content = news_content
        self.news_image = news_image

    @staticmethod
    def _empty_ai_sentence():
        """Return a fresh accumulator for paragraphs that have no source."""
        return {
            "input_sentence": "",
            "matched_sentence": "",
            "label": "",
            "similarity": None,
            "paraphrase": False,
            "url": "",
        }

    def _flush_ai_sentence(self, ai_sentence):
        """Score the accumulated text with the AI model and record it."""
        label, score = detect_text_by_ai_model(ai_sentence["input_sentence"])
        ai_sentence["label"] = label
        ai_sentence["similarity"] = score
        self.aligned_sentences.append(ai_sentence)

    def determine_text_origin(self):
        """Fill ``self.aligned_sentences`` for every paragraph of the news.

        Paragraphs for which the search reports ``paraphrase is False`` are
        concatenated (joined by ``<br>``) and scored as one unit by
        ``detect_text_by_ai_model``. Paragraphs with a matched source are
        labelled HUMAN or MACHINE by ``check_human`` and their image URLs are
        added to ``self.found_img_url``.

        Returns nothing; results are stored on the instance.
        """
        print("CHECK TEXT:")
        print("\tFrom search engine:")
        # Classify by search engine
        input_sentences = split_into_paragraphs(self.news_text)
        sentence_total = len(input_sentences)
        last_index = sentence_total - 1

        # presumably the first paragraph index not yet covered by a search
        # match, as reported by detect_text_by_relative_search — confirm
        # against that helper.
        current_index = 0
        previous_paraphrase = None
        ai_sentence = self._empty_ai_sentence()

        for index, sentence in enumerate(input_sentences):
            print(f"-------index = {index}-------")
            print(f"current_sentence = {input_sentences[index]}")

            if current_index >= sentence_total:
                break
            # Skip paragraphs already covered, except the first and the last
            # one (the last is always processed so pending text is flushed).
            if current_index > index and index != 0 and index != last_index:
                continue

            (
                paraphrase,
                _text_url,
                searched_sentences,
                img_urls,
                current_index,
            ) = detect_text_by_relative_search(input_sentences, index)

            if paraphrase is False:
                # No source found: accumulate for the AI-model check.
                if ai_sentence["input_sentence"] != "":
                    ai_sentence["input_sentence"] += "<br>"
                ai_sentence["input_sentence"] += sentence
                if index == last_index:
                    self._flush_ai_sentence(ai_sentence)
            else:
                if previous_paraphrase is False or previous_paraphrase is None:
                    # A matched group starts: emit the pending AI text first
                    # so the output keeps the input order.
                    if (
                        ai_sentence["input_sentence"] != ""
                        or current_index >= sentence_total
                    ):
                        self._flush_ai_sentence(ai_sentence)
                        ai_sentence = self._empty_ai_sentence()

                if searched_sentences["input_sentence"] != "":
                    self.found_img_url.extend(img_urls)
                    if check_human(searched_sentences):
                        searched_sentences["label"] = "HUMAN"
                    else:
                        searched_sentences["label"] = "MACHINE"
                    self.aligned_sentences.append(searched_sentences)

            previous_paraphrase = paraphrase

    def determine_text_origin_2(self):
        """DataFrame-based variant: look up a source for each paragraph.

        Appends one placeholder row per paragraph to
        ``self.aligned_sentences_df`` and calls ``find_text_source`` for every
        paragraph whose row has no ``url`` yet. Image URLs returned by the
        search are added to ``self.found_img_url``.

        Returns nothing; results are stored on the instance.
        """
        print("CHECK TEXT:")
        print("\tFrom search engine:")
        # Classify by search engine
        input_sentences = split_into_paragraphs(self.news_text)

        # One empty (all-NaN) row per paragraph, appended after any existing
        # rows, with a clean 0..n-1 index so rows can be addressed by position.
        first_row = len(self.aligned_sentences_df)
        self.aligned_sentences_df = self.aligned_sentences_df.reset_index(
            drop=True,
        ).reindex(range(first_row + len(input_sentences)))

        for index, sentence in enumerate(input_sentences):
            print(f"-------index = {index}-------")
            print(f"current_sentence = {sentence}")

            # Skip paragraphs that already have a source. The row-count guard
            # is there because find_text_source returns a new frame whose
            # length is not known here.
            row = first_row + index
            urls = self.aligned_sentences_df["url"]
            if row < len(urls) and pd.notna(urls.iloc[row]):
                continue

            self.aligned_sentences_df, img_urls = find_text_source(
                sentence,
                self.aligned_sentences_df,
            )
            if img_urls:
                self.found_img_url.extend(img_urls)

    def _set_image_prediction(self, label, score, referent_url):
        """Record the image verdict on the instance."""
        self.image_prediction_label = label
        self.image_prediction_score = score
        self.image_referent_url = referent_url

    def detect_image_origin(self):
        """Classify the news image, trying the cheapest evidence first.

        Order: images found next to the text sources, reverse image search,
        then the AI-image model. Falls back to ``UNKNOWN`` (score 50) when no
        step yields a result, or ``UNKNOWN`` (score 0.0) when there is no
        image at all.
        """
        print("CHECK IMAGE:")
        if self.news_image is None:
            self._set_image_prediction("UNKNOWN", 0.0, None)
            return

        for image in self.found_img_url:
            print(f"\tfound_img_url: {image}")
        matched_url, similarity = detect_image_from_news_image(
            self.news_image,
            self.found_img_url,
        )
        if matched_url is not None:
            print(f"matching image: {matched_url}\nsimilarity: {similarity}\n")
            self._set_image_prediction("HUMAN", similarity, matched_url)
            return

        matched_url, similarity = detect_image_by_reverse_search(
            self.news_image,
        )
        if matched_url is not None:
            print(f"matching image: {matched_url}\nsimilarity: {similarity}\n")
            self._set_image_prediction("HUMAN", similarity, matched_url)
            return

        detected_label, score = detect_image_by_ai_model(self.news_image)
        if detected_label:
            print(f"detected_label: {detected_label} ({score})")
            self._set_image_prediction(detected_label, score, None)
            return

        self._set_image_prediction("UNKNOWN", 50, None)

    @staticmethod
    def _human_based_score(label, score):
        """Convert a (label, score) pair into a 0-100 'HUMAN' score."""
        if label == "MACHINE":
            return 100 - score
        if label == "UNKNOWN":
            return 50
        return score

    def determine_news_origin(self):
        """Average the text and image scores into one news-level verdict.

        NOTE(review): this reads ``text_prediction_label`` /
        ``text_prediction_score`` as scalars on a 0-100 scale, but nothing in
        this class assigns them after ``__init__`` (where they are empty
        lists); confirm how callers populate them.
        """
        text_prediction_score = self._human_based_score(
            self.text_prediction_label,
            self.text_prediction_score,
        )
        image_prediction_score = self._human_based_score(
            self.image_prediction_label,
            self.image_prediction_score,
        )

        news_prediction_score = (
            text_prediction_score + image_prediction_score
        ) / 2
        if news_prediction_score > 50:
            self.news_prediction_score = news_prediction_score
            self.news_prediction_label = "HUMAN"
        else:
            self.news_prediction_score = 100 - news_prediction_score
            self.news_prediction_label = "MACHINE"

    def generate_analysis_report(self):
        """Run the text analysis followed by the image analysis."""
        self.determine_text_origin()
        self.detect_image_origin()

    def analyze_details(self):
        """Attach entity highlights to each aligned sentence and build tables.

        Returns:
            Tuple ``(ordinary_user_table, fact_checker_table,
            governor_table)`` of HTML strings.
        """
        for aligned_sentence in self.aligned_sentences:
            # Get entity-words (in pair) with colors
            aligned_sentence["entities"] = highlight_entities(
                aligned_sentence["input_sentence"],
                aligned_sentence["matched_sentence"],
            )

        ordinary_user_table = self.create_ordinary_user_table()
        fact_checker_table = self.create_fact_checker_table()
        governor_table = self.create_governor_table()

        return ordinary_user_table, fact_checker_table, governor_table

    def get_text_urls(self):
        """Return the distinct text source URLs as a set."""
        return set(self.text_referent_url)

    def compare_sentences(self, sentence_1, sentence_2, position, color):
        """Find the character runs shared by two sentences.

        Args:
            sentence_1: Input sentence.
            sentence_2: Source sentence.
            position: Offset added to the ``start_1``/``end_1`` values.
            color: Value stored under ``"color"`` in every result.

        Returns:
            Tuple ``(common_phrases, new_position)`` where ``new_position`` is
            ``position`` advanced by ``len(sentence_1)``. When either sentence
            is empty the list is empty; the tuple shape is the same so callers
            can always unpack it.
        """
        if not sentence_1 or not sentence_2:  # Handle empty strings
            return [], position + len(sentence_1 or "")

        matcher = SequenceMatcher(None, sentence_1, sentence_2)
        common_phrases = [
            {
                # sentence_2[block.b:block.b + block.size] is the same text
                "phrase": sentence_1[block.a:block.a + block.size],
                "start_1": block.a + position,
                "end_1": block.a + block.size + position,
                "start_2": block.b,
                "end_2": block.b + block.size,
                "color": color,
            }
            for block in matcher.get_matching_blocks()
            if block.size > 0  # Ignore the zero-length terminator block
        ]
        return common_phrases, position + len(sentence_1)

    def _render_table(self, rows, headers, table_style):
        """Wrap already-formatted ``<tr>`` rows into the comparison table.

        NOTE(review): the markup ends with an unclosed ``<style>`` tag, kept
        as in the original output in case a caller appends CSS after it.
        """
        table = "\n".join(rows)
        header_cells = "\n".join(f"<th>{header}</th>" for header in headers)
        return (
            "\n"
            "<h5>Comparison between input news and source news:</h5>\n"
            f'<table border="1" style="{table_style}">\n'
            '<col style="width: 170px;">\n'
            '<col style="width: 170px;">\n'
            '<col style="width: 30px;">\n'
            '<col style="width: 75px;">\n'
            "<thead>\n"
            "<tr>\n"
            f"{header_cells}\n"
            "</tr>\n"
            "</thead>\n"
            "<tbody>\n"
            f"{table}\n"
            "</tbody>\n"
            "</table>\n"
            "<style>\n"
        )

    def _collect_detail_rows(self):
        """Build ``[sentence, equal_idx_input, equal_idx_source, entities]``.

        Requires ``analyze_details`` to have stored ``"entities"`` on every
        aligned sentence.
        """
        detail_rows = []
        for aligned_sentence in self.aligned_sentences:
            if "input_sentence" not in aligned_sentence:
                continue
            # Get index of equal phrases in input and source sentences
            equal_idx_1, equal_idx_2 = extract_equal_text(
                aligned_sentence["input_sentence"],
                aligned_sentence["matched_sentence"],
            )
            detail_rows.append(
                [
                    aligned_sentence,
                    equal_idx_1,
                    equal_idx_2,
                    aligned_sentence["entities"],
                ],
            )
        return detail_rows

    def _highlight_row(self, row, *highlight_args):
        """Return (input_html, source_html) with entities and overlaps marked.

        ``highlight_args`` are forwarded unchanged to ``apply_highlight``
        after the ``"input"`` / ``"source"`` argument.
        """
        aligned_sentence, equal_idx_input, equal_idx_source, entities = row

        # highlight entities
        input_sentence, highlight_idx_input = apply_highlight(
            aligned_sentence["input_sentence"],
            entities,
            "input",
            *highlight_args,
        )
        source_sentence, highlight_idx_source = apply_highlight(
            aligned_sentence["matched_sentence"],
            entities,
            "source",
            *highlight_args,
        )

        # Color overlapping words
        input_sentence = self.color_text(
            input_sentence,
            equal_idx_input,
            highlight_idx_input,
        )
        source_sentence = self.color_text(
            source_sentence,
            equal_idx_source,
            highlight_idx_source,
        )

        # color_text() splits on whitespace, so the highlight markup arrives
        # with "_" placeholders that are turned back into spaces here.
        input_sentence = input_sentence.replace(
            "span_style",
            "span style",
        ).replace("1px_4px", "1px 4px")
        source_sentence = source_sentence.replace(
            "span_style",
            "span style",
        ).replace("1px_4px", "1px 4px")
        return input_sentence, source_sentence

    def _image_source_cells(self, max_length):
        """Return (preview_html, link_html) for the matched image.

        Falls back to ``("Image not found", "")`` when no referent URL is
        known (``None``, empty string, or the empty list set in ``__init__``).
        """
        url = self.image_referent_url
        if not url:
            return "Image not found", ""
        short_url = self.shorten_url(url, max_length)
        return (
            f"""<img src="{url}" width="200" height="150">""",
            f"""<a href="{url}">{short_url}</a>""",
        )

    def create_fact_checker_table(self):
        """Build the fact-checker HTML table: one row per aligned sentence."""
        max_length = 30  # TODO: put this in configuration
        rows = [self.format_image_fact_checker_row(max_length)]

        # Rebuilt from scratch so repeated calls do not duplicate rows.
        self.fact_checker_table = self._collect_detail_rows()
        rows.extend(
            self.format_text_fact_checker_row(row, max_length)
            for row in self.fact_checker_table
        )

        return self._render_table(
            rows,
            headers=(
                "Input news",
                "Source (corresponding URL provided in Originality)",
                "Forensic",
                "Originality",
            ),
            table_style="width:100%; text-align:left;",
        )

    def format_text_fact_checker_row(self, row, max_length=30):
        """Format one ``[sentence, idx_input, idx_source, entities]`` row.

        Returns an empty string when the input sentence is empty.
        """
        aligned_sentence = row[0]
        if aligned_sentence["input_sentence"] == "":
            return ""

        entity_count = 0
        if aligned_sentence["matched_sentence"] != "":  # source is not empty
            input_sentence, source_sentence = self._highlight_row(row)
            entity_count = len(row[3])
        else:
            input_sentence = aligned_sentence["input_sentence"]
            source_sentence = aligned_sentence["matched_sentence"]

        label = aligned_sentence["label"]
        score = aligned_sentence["similarity"]
        url = aligned_sentence["url"]
        short_url = self.shorten_url(url, max_length)
        source_text_url = f"""<a href="{url}">{short_url}</a>"""
        entity_count_text = self.get_entity_count_text(entity_count)

        return (
            "\n"
            "<tr>\n"
            f"<td>{input_sentence}</td>\n"
            f"<td>{source_sentence}</td>\n"
            f"<td>{label}<br>({score * 100:.2f}%)<br><br>"
            f"{entity_count_text}</td>\n"
            f"<td>{source_text_url}</td>\n"
            "</tr>\n"
        )

    def format_image_fact_checker_row(self, max_length=30):
        """Format the image row (preview, verdict, link) for fact checkers."""
        source_image, source_image_url = self._image_source_cells(max_length)
        return (
            f"<tr><td>input image</td><td>{source_image}</td>"
            f"<td>{self.image_prediction_label}<br>"
            f"({self.image_prediction_score:.2f}%)</td>"
            f"<td>{source_image_url}</td></tr>"
        )

    def create_ordinary_user_table(self):
        """Build the ordinary-user HTML table: image row plus one text row."""
        max_length = 30  # TODO: put this in configuration
        rows = [
            self.format_image_ordinary_user_row(max_length),
            self.format_text_ordinary_user_row(max_length),
        ]
        return self._render_table(
            rows,
            headers=("Input news", "Forensic", "Originality"),
            table_style=(
                "width:100%; text-align:left; border-collapse:collapse;"
            ),
        )

    def format_text_ordinary_user_row(self, max_length=30):
        """Format all non-empty input sentences as a single summary row."""
        sentence_parts = []
        url_parts = []
        for row in self.aligned_sentences:
            if row["input_sentence"] == "":
                continue
            sentence_parts.append(row["input_sentence"] + "<br><br>")
            url = row["url"]
            short_url = self.shorten_url(url, max_length)
            url_parts.append(f"""<a href="{url}">{short_url}</a><br>""")

        input_sentences = "".join(sentence_parts)
        source_text_urls = "".join(url_parts)
        scores, label = self.calculate_score_label()

        return (
            "\n"
            "<tr>\n"
            f"<td>{input_sentences}</td>\n"
            f"<td>{label}<br>({scores * 100:.2f}%)</td>\n"
            f"<td>{source_text_urls}</td>\n"
            "</tr>\n"
        )

    def format_image_ordinary_user_row(self, max_length=30):
        """Format the image row (verdict, link) for ordinary users."""
        _, source_image_url = self._image_source_cells(max_length)
        return (
            "<tr><td>input image</td>"
            f"<td>{self.image_prediction_label}<br>"
            f"({self.image_prediction_score:.2f}%)</td>"
            f"<td>{source_image_url}</td></tr>"
        )

    def create_governor_table(self):
        """Build the governor HTML table: image row plus one merged text row."""
        max_length = 30  # TODO: put this in configuration
        rows = [self.format_image_governor_row(max_length)]

        # Rebuilt from scratch so repeated calls do not duplicate rows.
        self.governor_table = self._collect_detail_rows()
        rows.append(self.format_text_governor_row(max_length))

        return self._render_table(
            rows,
            headers=(
                "Input news",
                "Source (corresponding URL provided in Originality)",
                "Forensic",
                "Originality",
            ),
            table_style="width:100%; text-align:left;",
        )

    def format_text_governor_row(self, max_length=30):
        """Merge every entry of ``self.governor_table`` into one table row."""
        input_parts = []
        source_parts = []
        url_parts = []
        entity_count = 0

        for row in self.governor_table:
            print(f"governor_row: {row}")
            aligned_sentence = row[0]
            if aligned_sentence["input_sentence"] == "":
                continue

            if aligned_sentence["matched_sentence"] != "":
                # The running entity count is passed on to apply_highlight
                # before this row's entities are added to it.
                input_sentence, source_sentence = self._highlight_row(
                    row,
                    entity_count,
                )
                entity_count += len(row[3])
            else:
                input_sentence = aligned_sentence["input_sentence"]
                source_sentence = aligned_sentence["matched_sentence"]

            input_parts.append(input_sentence + "<br><br>")
            source_parts.append(source_sentence + "<br><br>")

            url = aligned_sentence["url"]
            short_url = self.shorten_url(url, max_length)
            url_parts.append(f"""<a href="{url}">{short_url}</a><br>""")

        input_sentences = "".join(input_parts)
        source_sentences = "".join(source_parts)
        source_text_urls = "".join(url_parts)
        score, label = self.calculate_score_label()
        entity_count_text = self.get_entity_count_text(entity_count)

        return (
            "\n"
            "<tr>\n"
            f"<td>{input_sentences}</td>\n"
            f"<td>{source_sentences}</td>\n"
            f"<td>{label}<br>({score * 100:.2f}%)<br><br>"
            f"{entity_count_text}</td>\n"
            f"<td>{source_text_urls}</td>\n"
            "</tr>\n"
        )

    def format_image_governor_row(self, max_length=30):
        """Format the image row (preview, verdict, link) for governors."""
        source_image, source_image_url = self._image_source_cells(max_length)
        return (
            f"<tr><td>input image</td><td>{source_image}</td>"
            f"<td>{self.image_prediction_label}<br>"
            f"({self.image_prediction_score:.2f}%)</td>"
            f"<td>{source_image_url}</td></tr>"
        )

    def get_entity_count_text(self, entity_count):
        """Return the 'altered entity/entities' suffix, or '' for none."""
        if entity_count <= 0:
            return ""
        if entity_count == 1:
            return "with altered entity"
        return "with altered entities"

    def shorten_url(self, url, max_length=30):
        """Truncate ``url`` to ``max_length`` characters plus '...'.

        Returns an empty string for ``None``.
        """
        if url is None:
            return ""
        if len(url) > max_length:
            return url[:max_length] + "..."
        return url

    def color_text(self, text, colored_idx, highlighted_idx):
        """Wrap word ranges of ``text`` in green ``<span>`` tags.

        Args:
            text: Text to colour; it is split on whitespace into words.
            colored_idx: Dicts with ``"start"``/``"end"`` word indices.
            highlighted_idx: Word indices excluded from colouring.

        Returns:
            The re-joined text with span markup.

        NOTE(review): every word after the last coloured range is also
        wrapped in the green span (kept from the original implementation);
        confirm this is intended rather than appending those words plain.
        """
        words = text.split()

        starts, ends = self.extract_starts_ends(colored_idx)
        starts, ends = self.filter_indices(starts, ends, highlighted_idx)

        pieces = []
        previous_end = 0
        for start, end in zip(starts, ends):
            pieces.append(" ".join(words[previous_end:start]))
            equal_words = " ".join(words[start:end])
            pieces.append(
                f" <span style='color:#00FF00;'>{equal_words}</span> ",
            )
            previous_end = end

        # Some left words due to the punctuation separated from
        # the highlighting text
        equal_words = " ".join(words[previous_end:])
        print(f"starts_2: {previous_end}")
        print(f"ends_2: {len(words) - 1}")
        print(f"equal_words: {words[previous_end:]}")
        pieces.append(f" <span style='color:#00FF00;'>{equal_words}</span> ")

        return "".join(pieces)

    def extract_starts_ends(self, colored_idx):
        """Split ``[{"start": s, "end": e}, ...]`` into two parallel lists."""
        starts = [index["start"] for index in colored_idx]
        ends = [index["end"] for index in colored_idx]
        return starts, ends

    def filter_indices(self, starts, ends, ignore_indices):
        """Remove ``ignore_indices`` from the given index ranges.

        Each ``starts[i]..ends[i]`` range is treated as inclusive, the
        ignored indices are removed, and what is left is re-grouped into
        consecutive runs.

        Args:
            starts: A list of starting indices.
            ends: A list of ending indices. Must be the same length as starts.
            ignore_indices: A list of indices to exclude.

        Returns:
            A tuple of two lists: filtered_starts and filtered_ends, where
            each end is exclusive (last kept index + 1).
            Returns empty lists if the input is invalid
            or if all ranges are filtered out.
            Prints error messages for invalid input.

        Examples:
            starts = [0, 5, 10]
            ends = [3, 7, 12]
            ignore_indices = [1, 2, 11, 17]
            # Output:
            starts = [0, 3, 5, 10, 12]
            ends = [1, 4, 8, 11, 13]
        """
        if len(starts) != len(ends):
            print(
                "Error: The 'starts' and 'ends' lists must have the same length.",  # noqa: E501
            )
            return [], []

        ignored = set(ignore_indices)
        filtered_starts = []
        filtered_ends = []

        for i, (start, end) in enumerate(zip(starts, ends)):
            if end < start:
                print(
                    f"Error: End index {end} is less than start index {start} at position {i}.",  # noqa: E501
                )
                return [], []

            kept = list(set(range(start, end + 1)) - ignored)
            new_starts, new_ends = self.extract_sequences(kept)
            filtered_starts.extend(new_starts)
            filtered_ends.extend(new_ends)

        return filtered_starts, filtered_ends

    def extract_sequences(self, numbers):
        """Group integers into runs of consecutive values.

        Args:
            numbers: Integers in any order; the list is not modified.

        Returns:
            ``(starts, ends)`` where run ``i`` covers ``starts[i]`` up to but
            not including ``ends[i]``. A single number ``n`` therefore yields
            ``([n], [n + 1])``, the same convention as for longer inputs.
        """
        ordered = sorted(numbers)
        starts = []
        ends = []
        if not ordered:
            return starts, ends

        run_start = previous = ordered[0]
        for number in ordered[1:]:
            if number - 1 != previous:
                # Gap found: close the current run and open a new one.
                starts.append(run_start)
                ends.append(previous + 1)
                run_start = number
            previous = number

        starts.append(run_start)
        ends.append(previous + 1)
        return starts, ends

    def calculate_score_label(self):
        """Aggregate per-sentence results into one ``(score, label)`` pair.

        Sentences with an empty input are ignored. If any sentence is
        labelled MACHINE the result is MACHINE with the mean of
        ``1 - similarity`` over those sentences, mirrored to ``1 - mean``
        when the mean is below 0.5. Otherwise, if any sentence is HUMAN, the
        result is HUMAN with the mean similarity. With neither, the result
        is ``(0, "UNKNOWN")``.
        """
        human_scores = []
        machine_scores = []
        for sentence in self.aligned_sentences:
            if sentence["input_sentence"] == "":
                continue
            if sentence["label"] == "HUMAN":
                human_scores.append(sentence["similarity"])
            elif sentence["label"] == "MACHINE":
                machine_scores.append(1 - sentence["similarity"])

        if machine_scores:
            machine_score_avg = sum(machine_scores) / len(machine_scores)
            if machine_score_avg < 0.5:
                machine_score_avg = 1 - machine_score_avg
            return machine_score_avg, "MACHINE"
        if human_scores:
            return sum(human_scores) / len(human_scores), "HUMAN"
        return 0, "UNKNOWN"