news_verification / src /application /content_detection.py
pmkhanh7890's picture
Add comments to text module
0827f9d
raw
history blame
31.6 kB
from difflib import SequenceMatcher
import pandas as pd
from src.application.image.image_detection import (
detect_image_by_ai_model,
detect_image_by_reverse_search,
detect_image_from_news_image,
)
from src.application.text.entity import (
apply_highlight,
highlight_entities,
)
from src.application.text.helper import extract_equal_text
from src.application.text.model_detection import (
detect_text_by_ai_model,
predict_generation_model,
)
from src.application.text.preprocessing import split_into_paragraphs
from src.application.text.search_detection import (
PARAPHRASE_THRESHOLD_MACHINE,
find_sentence_source,
)
class NewsVerification:
    def __init__(self):
        """Initialize empty analysis state for one news item."""
        # Raw inputs, populated by load_news().
        self.news_text = ""
        self.news_title = ""
        self.news_content = ""
        self.news_image = ""
        # Text forensic result (single-element lists, mutated in place).
        self.text_prediction_label: list[str] = ["UNKNOWN"]
        self.text_prediction_score: list[float] = [0.0]
        # Image forensic result.
        self.image_prediction_label: list[str] = ["UNKNOWN"]
        self.image_prediction_score: list[float] = [0.0]
        self.image_referent_url: list[str] = []
        # Overall verdict (not set by the code visible in this file).
        self.news_prediction_label = ""
        self.news_prediction_score = -1
        # news' urls to find img
        self.found_img_url: list[str] = []
        # Analyzed results: one row per input paragraph, filled in by
        # find_text_source() / find_sentence_source().
        self.aligned_sentences_df: pd.DataFrame = pd.DataFrame(
            columns=[
                "input",
                "source",
                "label",
                "similarity",
                "paraphrase",
                "url",
                "group",
                "entities",
            ],
        )
        # Paragraphs grouped by source URL (built in determine_text_origin).
        self.grouped_url_df: pd.DataFrame = pd.DataFrame()
        # For formatting output tables
        self.ordinary_user_table: list = []
        self.fact_checker_table: list = []
        self.governor_table: list = []
def load_news(self, news_title, news_content, news_image):
self.news_text = (news_title + "\n\n" + news_content).strip()
self.news_title = news_title
self.news_content = news_content
self.news_image = news_image
def determine_text_origin(self):
self.find_text_source()
# Group inout and source by url
def concat_text(series):
return " ".join(
series.astype(str).tolist(),
) # Handle mixed data types and NaNs
self.grouped_url_df = self.aligned_sentences_df.groupby("url").agg(
{
"input": concat_text,
"source": concat_text,
},
)
self.grouped_url_df = self.grouped_url_df.reset_index()
# Add new columns for label and score
self.grouped_url_df["label"] = None
self.grouped_url_df["score"] = None
print(f"aligned_sentences_df:\n {self.aligned_sentences_df}")
for index, row in self.grouped_url_df.iterrows():
label, score = self.verify_text(row["url"])
if label == "UNKNOWN":
# Concatenate text from "input" in sentence_df
text = " ".join(row["input"])
# detect by baseline model
label, score = detect_text_by_ai_model(text)
self.grouped_url_df.at[index, "label"] = label
self.grouped_url_df.at[index, "score"] = score
# Overall label or score for the whole input text
if len(self.grouped_url_df) > 0:
machine_label = self.grouped_url_df[
self.grouped_url_df["label"].str.contains(
"MACHINE",
case=False,
na=False,
)
]
if len(machine_label) > 0:
label = " ".join(machine_label["label"].tolist())
self.text_prediction_label[0] = label
self.text_prediction_score[0] = machine_label["score"].mean()
else:
machine_label = self.aligned_sentences_df[
self.aligned_sentences_df["label"] == "HUMAN"
]
self.text_prediction_label[0] = "HUMAN"
self.text_prediction_score[0] = machine_label["score"].mean()
else: # no source found in the input text
print("No source found in the input text")
text = " ".join(self.aligned_sentences_df["input"].tolist())
# detect by baseline model
label, score = detect_text_by_ai_model(text)
self.text_prediction_label[0] = label
self.text_prediction_score[0] = score
def find_text_source(self):
"""
Determines the origin of the given text based on paraphrasing detection
and human authorship analysis.
Args:
text: The input text to be analyzed.
Returns:
str: The predicted origin of the text:
- "HUMAN": If the text is likely written by a human.
- "MACHINE": If the text is likely generated by a machine.
"""
print("CHECK TEXT:")
print("\tFrom search engine:")
# Classify by search engine
# input_sentences = split_into_sentences(self.news_text)
input_paragraphs = split_into_paragraphs(self.news_text)
# Setup df for input_sentences
for _ in range(len(input_paragraphs)):
self.aligned_sentences_df = pd.concat(
[
self.aligned_sentences_df,
pd.DataFrame(
[
{
"input": None,
"source": None,
"label": None,
"similarity": None,
"paraphrase": None,
"url": None,
"entities": None,
},
],
),
],
ignore_index=True,
)
# find a source for each sentence
for index, _ in enumerate(input_paragraphs):
similarity = self.aligned_sentences_df.loc[index, "similarity"]
if similarity is not None:
if similarity > PARAPHRASE_THRESHOLD_MACHINE:
continue
print(f"\n-------index = {index}-------")
print(f"current_text = {input_paragraphs[index]}\n")
self.aligned_sentences_df, img_urls = find_sentence_source(
input_paragraphs,
index,
self.aligned_sentences_df,
)
self.found_img_url.extend(img_urls)
# determine if the whole source is from a news or not
def verify_text(self, url):
label = "UNKNOWN"
score = 0
# calculate the average similarity when the similary score
# in each row of sentences_df is higher than 0.8
filtered_by_url = self.aligned_sentences_df[
self.aligned_sentences_df["url"] == url
]
filtered_by_similarity = filtered_by_url[
filtered_by_url["similarity"] > 0.8
]
if len(filtered_by_similarity) / len(self.aligned_sentences_df) > 0.5:
# check if "MACHINE" is in self.aligned_sentences_df["label"]:
contains_machine = (
filtered_by_similarity["label"]
.str.contains(
"MACHINE",
case=False,
na=False,
)
.any()
)
if contains_machine:
label = "MACHINE"
machine_rows = filtered_by_similarity[
filtered_by_similarity["label"].str.contains(
"MACHINE",
case=False,
na=False,
)
]
generated_model, _ = predict_generation_model(self.news_text)
label += f"<br>({generated_model})"
score = machine_rows["similarity"].mean()
else:
label = "HUMAN"
human_rows = filtered_by_similarity[
filtered_by_similarity["label"].str.contains(
"HUMAN",
case=False,
na=False,
)
]
score = human_rows["similarity"].mean()
return label, score
def determine_image_origin(self):
print("CHECK IMAGE:")
if self.news_image is None:
self.image_prediction_label = "UNKNOWN"
self.image_prediction_score = 0.0
self.image_referent_url = None
return
matched_url, similarity = detect_image_from_news_image(
self.news_image,
self.found_img_url,
)
if matched_url is not None:
print(f"matched image: {matched_url}\nsimilarity: {similarity}\n")
self.image_prediction_label = "HUMAN"
self.image_prediction_score = similarity
self.image_referent_url = matched_url
return
matched_url, similarity = detect_image_by_reverse_search(
self.news_image,
)
if matched_url is not None:
print(f"matched image: {matched_url}\tScore: {similarity}%\n")
self.image_prediction_label = "HUMAN"
self.image_prediction_score = similarity
self.image_referent_url = matched_url
return
detected_label, score = detect_image_by_ai_model(self.news_image)
if detected_label:
print(f"detected_label: {detected_label} ({score})")
self.image_prediction_label = detected_label
self.image_prediction_score = score
self.image_referent_url = None
return
self.image_prediction_label = "UNKNOWN"
self.image_prediction_score = 50
self.image_referent_url = None
def generate_analysis_report(self):
if self.news_text != "":
self.determine_text_origin()
if self.news_image != "":
self.determine_image_origin()
def analyze_details(self):
self.handle_entities()
ordinary_user_table = self.create_ordinary_user_table()
fact_checker_table = self.create_fact_checker_table()
governor_table = self.create_governor_table()
return ordinary_user_table, fact_checker_table, governor_table
def handle_entities(self):
entities_with_colors = []
for index, row in self.grouped_url_df.iterrows():
# Get entity-words (in pair) with colors
entities_with_colors = highlight_entities(
row["input"],
row["source"],
)
for index, sentence in self.aligned_sentences_df.iterrows():
if sentence["url"] == row["url"]:
self.aligned_sentences_df.at[index, "entities"] = (
entities_with_colors # must use at
)
    def get_text_urls(self):
        """Return the set of distinct text source URLs.

        NOTE(review): self.text_referent_url is never initialized in
        __init__ nor assigned anywhere in this file, so this raises
        AttributeError unless it is set externally — confirm callers.
        """
        return set(self.text_referent_url)
def compare_sentences(self, sentence_1, sentence_2, position, color):
"""
Compares two sentences and identifies common phrases,
outputting their start and end positions.
"""
if not sentence_1 or not sentence_2: # Handle empty strings
return []
s = SequenceMatcher(None, sentence_1, sentence_2)
common_phrases = []
for block in s.get_matching_blocks():
if block.size > 0: # Ignore zero-length matches
start_1 = block.a
end_1 = block.a + block.size
start_2 = block.b
end_2 = block.b + block.size
phrase = sentence_1[
start_1:end_1
] # Or sentence_2[start_2:end_2], they are the same
common_phrases.append(
{
"phrase": phrase,
"start_1": start_1 + position,
"end_1": end_1 + position,
"start_2": start_2,
"end_2": end_2,
"color": color,
},
)
position += len(sentence_1)
return common_phrases, position
    def create_fact_checker_table(self):
        """Build the fact-checker HTML comparison table.

        Collects every aligned sentence (with its equal-phrase indices
        and entities) into self.fact_checker_table, then renders one
        table row per sentence, merging the Forensic/Originality cells
        via rowspan for consecutive sentences sharing the same URL.

        NOTE(review): rows accumulate on the instance-level
        self.fact_checker_table, so a second call would duplicate rows —
        confirm this is only called once per instance.

        Returns:
            str: HTML markup for the table.
        """
        rows = []
        rows.append(self.format_image_fact_checker_row())

        for _, row in self.aligned_sentences_df.iterrows():
            if row["input"] is None:
                continue

            if row["source"] is None:
                equal_idx_1 = equal_idx_2 = []
            else:  # Get index of equal phrases in input and source sentences
                equal_idx_1, equal_idx_2 = extract_equal_text(
                    row["input"],
                    row["source"],
                )

            self.fact_checker_table.append(
                [
                    row,
                    equal_idx_1,
                    equal_idx_2,
                    row["entities"],
                    row["url"],
                ],
            )

        previous_url = None
        span_row = 1
        for index, row in enumerate(self.fact_checker_table):
            current_url = row[4]
            last_url_row = False

            # First row or URL change
            if index == 0 or current_url != previous_url:
                first_url_row = True
                previous_url = current_url
                # Increase counter "span_row" when the next url is the same
                while (
                    index + span_row < len(self.fact_checker_table)
                    and self.fact_checker_table[index + span_row][4]
                    == current_url
                ):
                    span_row += 1
            else:
                first_url_row = False
                span_row -= 1

            # Last row of a URL group is reached when the rowspan counter
            # has been consumed back down to 1.
            if span_row == 1:
                last_url_row = True

            formatted_row = self.format_text_fact_checker_row(
                row,
                first_url_row,
                last_url_row,
                span_row,
            )
            rows.append(formatted_row)

        table = "\n".join(rows)
        # NOTE(review): the trailing "<style>" tag below is unclosed —
        # confirm whether downstream rendering relies on it.
        return f"""
    <h5>Comparison between input news and source news:</h5>
    <table border="1" style="width:100%; text-align:left;">
        <col style="width: 170px;">
        <col style="width: 170px;">
        <col style="width: 30px;">
        <col style="width: 75px;">
        <thead>
            <tr>
                <th>Input news</th>
                <th>Source (URL in Originality)</th>
                <th>Forensic</th>
                <th>Originality</th>
            </tr>
        </thead>
        <tbody>
            {table}
        </tbody>
    </table>

    <style>
    """
    def format_text_fact_checker_row(
        self,
        row,
        first_url_row=True,
        last_url_row=True,
        span_row=1,
    ):
        """Render one sentence row of the fact-checker table.

        Args:
            row: list of [sentence_series, equal_idx_input,
                equal_idx_source, entities, url] as built by
                create_fact_checker_table().
            first_url_row: True when this sentence is the first of its
                URL group (carries the merged label/URL cells).
            last_url_row: True when this sentence is the last of its
                URL group.
            span_row: rowspan for the merged Forensic/Originality cells.

        Returns:
            str: one HTML <tr>, or "" when the input sentence is missing.
        """
        entity_count = 0
        if row[0]["input"] is None:
            return ""

        if row[0]["source"] is not None:  # source is not empty
            if row[3] is not None:
                # highlight entities
                input_sentence, highlight_idx_input = apply_highlight(
                    row[0]["input"],
                    row[3],
                    "input",
                )
                source_sentence, highlight_idx_source = apply_highlight(
                    row[0]["source"],
                    row[3],
                    "source",
                )
            else:
                # No entities: pass the text through unhighlighted.
                input_sentence = row[0]["input"]
                source_sentence = row[0]["source"]
                highlight_idx_input = []
                highlight_idx_source = []

            if row[3] is not None:
                entity_count = len(row[3])

            # Color overlapping words
            input_sentence = self.color_text(
                input_sentence,
                row[1],
                highlight_idx_input,
            )  # text, index of highlight words

            source_sentence = self.color_text(
                source_sentence,
                row[2],
                highlight_idx_source,
            )  # text, index of highlight words

            # Replace _ to get correct formatting
            # Original one having _ for correct word counting
            input_sentence = input_sentence.replace(
                "span_style",
                "span style",
            ).replace("1px_4px", "1px 4px")
            source_sentence = source_sentence.replace(
                "span_style",
                "span style",
            ).replace("1px_4px", "1px 4px")
        else:
            # No source matched: show the raw input (source renders as
            # its raw value).
            input_sentence = row[0]["input"]
            source_sentence = row[0]["source"]

        url = row[0]["url"]

        # Displayed label and score by url
        filterby_url = self.grouped_url_df[self.grouped_url_df["url"] == url]
        if len(filterby_url) > 0:
            label = filterby_url["label"].values[0]
            score = filterby_url["score"].values[0]
        else:
            # Fall back to the overall text verdict.
            label = self.text_prediction_label[0]
            score = self.text_prediction_score[0]

        # Format displayed url
        source_text_url = f"""<a href="{url}">{url}</a>"""

        # Format displayed entity count
        entity_count_text = self.get_entity_count_text(entity_count)

        # Transparent borders visually merge consecutive cells of the
        # same URL group.
        border_top = "border-top: 1px solid transparent;"
        border_bottom = "border-bottom: 1px solid transparent;"
        word_break = "word-break: break-all;"

        if first_url_row is True:
            # First & Last the group: no transparent
            if last_url_row is True:
                return f"""
                <tr>
                    <td>{input_sentence}</td>
                    <td>{source_sentence}</td>
                    <td rowspan="{span_row}">{label}<br>
                        ({score * 100:.2f}%)<br><br>
                        {entity_count_text}</td>
                    <td rowspan="{span_row}"; style="{word_break}";>{source_text_url}</td>
                </tr>
                """
            # First row of the group: transparent bottom border
            return f"""
            <tr>
                <td style="{border_bottom}";>{input_sentence}</td>
                <td style="{border_bottom}";>{source_sentence}</td>
                <td rowspan="{span_row}">{label}<br>
                    ({score * 100:.2f}%)<br><br>
                    {entity_count_text}</td>
                <td rowspan="{span_row}"; style="{word_break}";>{source_text_url}</td>
            </tr>
            """
        else:
            if last_url_row is True:
                # NOT First row, Last row: transparent top border
                return f"""
                <tr>
                    <td style="{border_top}";>{input_sentence}</td>
                    <td style="{border_top}";>{source_sentence}</td>
                </tr>
                """
            else:
                # NOT First & NOT Last row: transparent top & bottom borders
                return f"""
                <tr>
                    <td style="{border_top} {border_bottom}";>{input_sentence}</td>
                    <td style="{border_top} {border_bottom}";>{source_sentence}</td>
                </tr>
                """
def format_image_fact_checker_row(self):
if (
self.image_referent_url is not None
or self.image_referent_url != ""
):
source_image = f"""<img src="{self.image_referent_url}" width="100" height="150">""" # noqa: E501
source_image_url = f"""<a href="{self.image_referent_url}">{self.image_referent_url}</a>""" # noqa: E501
else:
source_image = "Image not found"
source_image_url = ""
word_break = "word-break: break-all;"
return f"""
<tr>
<td>input image</td>
<td>{source_image}</td>
<td>{self.image_prediction_label}<br>({self.image_prediction_score:.2f}%)</td>
<td style="{word_break}";>{source_image_url}</td></tr>"""
def create_ordinary_user_table(self):
rows = []
rows.append(self.format_image_ordinary_user_row())
rows.append(self.format_text_ordinary_user_row())
table = "\n".join(rows)
return f"""
<h5>Comparison between input news and source news:</h5>
<table border="1" style="width:100%; text-align:left;">
<col style="width: 340px;">
<col style="width: 30px;">
<col style="width: 75px;">
<thead>
<tr>
<th>Input news</th>
<th>Forensic</th>
<th>Originality</th>
</tr>
</thead>
<tbody>
{table}
</tbody>
</table>
<style>
"""
def format_text_ordinary_user_row(self, max_length=30):
input_sentences = ""
source_text_urls = ""
urls = []
for _, row in self.aligned_sentences_df.iterrows():
if row["input"] is None:
continue
input_sentences += row["input"] + "<br><br>"
url = row["url"]
if url not in urls:
urls.append(url)
source_text_urls += f"""<a href="{url}">{url}</a><br>"""
word_break = "word-break: break-all;"
return f"""
<tr>
<td>{input_sentences}</td>
<td>{self.text_prediction_label[0]}<br>
({self.text_prediction_score[0] * 100:.2f}%)</td>
<td style="{word_break}";>{source_text_urls}</td>
</tr>
"""
def format_image_ordinary_user_row(self, max_length=30):
if (
self.image_referent_url is not None
or self.image_referent_url != ""
):
source_image_url = f"""<a href="{self.image_referent_url}">{self.image_referent_url}</a>""" # noqa: E501
else:
source_image_url = ""
word_break = "word-break: break-all;"
return f"""<tr><td>input image</td><td>{self.image_prediction_label}<br>({self.image_prediction_score:.2f}%)</td><td style="{word_break}";>{source_image_url}</td></tr>""" # noqa: E501
def create_governor_table(self):
rows = []
rows.append(self.format_image_governor_row())
for _, row in self.aligned_sentences_df.iterrows():
if row["input"] is None:
continue
if row["source"] is None:
equal_idx_1 = equal_idx_2 = []
else:
# Get index of equal phrases in input and source sentences
equal_idx_1, equal_idx_2 = extract_equal_text(
row["input"],
row["source"],
)
self.governor_table.append(
[
row,
equal_idx_1,
equal_idx_2,
row["entities"],
],
)
formatted_row = self.format_text_governor_row()
rows.append(formatted_row)
table = "\n".join(rows)
return f"""
<h5>Comparison between input news and source news:</h5>
<table border="1" style="width:100%; text-align:left;">
<col style="width: 170px;">
<col style="width: 170px;">
<col style="width: 30px;">
<col style="width: 75px;">
<thead>
<tr>
<th>Input news</th>
<th>Source (URL in Originality)</th>
<th>Forensic</th>
<th>Originality</th>
</tr>
</thead>
<tbody>
{table}
</tbody>
</table>
<style>
"""
    def format_text_governor_row(self):
        """Render the single merged text row of the governor table.

        Concatenates every collected sentence (input and source, with
        entity highlighting and overlap coloring), lists each distinct
        URL once, and shows the overall text verdict plus the total
        altered-entity count.
        """
        input_sentences = ""
        source_sentences = ""
        source_text_urls = ""
        urls = []
        sentence_count = 0
        # NOTE(review): seeded with two zeros so entity_count[-2] below
        # reads the count from an earlier row ("the last one is for
        # current counting") — the exact offset semantics depend on
        # apply_highlight; confirm against its implementation.
        entity_count = [0, 0]  # to get index of [-2]
        for row in self.governor_table:
            if row[0]["input"] is None:
                continue

            if row[0]["source"] is not None:  # source is not empty
                # highlight entities
                input_sentence, highlight_idx_input = apply_highlight(
                    row[0]["input"],
                    row[3],  # entities_with_colors
                    "input",  # key
                    entity_count[
                        -2
                    ],  # since the last one is for current counting
                )
                source_sentence, highlight_idx_source = apply_highlight(
                    row[0]["source"],
                    row[3],  # entities_with_colors
                    "source",  # key
                    entity_count[
                        -2
                    ],  # since the last one is for current counting
                )

                # Color overlapping words
                input_sentence = self.color_text(
                    input_sentence,
                    row[1],
                    highlight_idx_input,
                )  # text, index of highlight words

                source_sentence = self.color_text(
                    source_sentence,
                    row[2],
                    highlight_idx_source,
                )  # text, index of highlight words

                # Undo the placeholder underscores used for word counting.
                input_sentence = input_sentence.replace(
                    "span_style",
                    "span style",
                ).replace("1px_4px", "1px 4px")
                source_sentence = source_sentence.replace(
                    "span_style",
                    "span style",
                ).replace("1px_4px", "1px 4px")
            else:
                if row[0]["source"] is None:
                    source_sentence = ""
                else:
                    source_sentence = row[0]["source"]

                input_sentence = row[0]["input"]

            # convert score to HUMAN-based score:
            input_sentences += input_sentence + "<br><br>"
            source_sentences += source_sentence + "<br><br>"

            url = row[0]["url"]
            if url not in urls:
                urls.append(url)
                source_text_urls += f"""<a href="{url}">{url}</a><br><br>"""
            sentence_count += 1
            if row[3] is not None:
                entity_count.append(len(row[3]))

        entity_count_text = self.get_entity_count_text(sum(entity_count))

        word_break = "word-break: break-all;"
        return f"""
        <tr>
            <td>{input_sentences}</td>
            <td>{source_sentences}</td>
            <td>{self.text_prediction_label[0]}<br>
                ({self.text_prediction_score[0] * 100:.2f}%)<br><br>
                {entity_count_text}</td>
            <td style="{word_break}";>{source_text_urls}</td>
        </tr>
        """
def format_image_governor_row(self):
if (
self.image_referent_url is not None
or self.image_referent_url != ""
):
source_image = f"""<img src="{self.image_referent_url}" width="100" height="150">""" # noqa: E501
source_image_url = f"""<a href="{self.image_referent_url}">{self.image_referent_url}</a>""" # noqa: E501
else:
source_image = "Image not found"
source_image_url = ""
word_break = "word-break: break-all;"
return f"""<tr><td>input image</td><td>{source_image}</td><td>{self.image_prediction_label}<br>({self.image_prediction_score:.2f}%)</td><td style="{word_break}";>{source_image_url}</td></tr>""" # noqa: E501
def get_entity_count_text(self, entity_count):
if entity_count <= 0:
entity_count_text = ""
elif entity_count == 1:
entity_count_text = "with 1 altered entity"
else:
entity_count_text = "with altered entities"
return entity_count_text
def color_text(self, text, colored_idx, highlighted_idx):
sentence = ""
words = text.split()
starts, ends = self.extract_starts_ends(colored_idx)
starts, ends = self.filter_indices(starts, ends, highlighted_idx)
previous_end = 0
for start, end in zip(starts, ends):
sentence += " ".join(words[previous_end:start])
equal_words = " ".join(words[start:end])
sentence += f" <span style='color:#00FF00;'>{equal_words}</span> "
previous_end = end
sentence += " ".join(words[previous_end:])
return sentence
def extract_starts_ends(self, colored_idx):
starts = []
ends = []
for index in colored_idx:
starts.append(index["start"])
ends.append(index["end"])
return starts, ends
def filter_indices(self, starts, ends, ignore_indices):
"""
Filters start and end indices to exclude any indices present in the
ignore_indices list.
Args:
starts: A list of starting indices.
ends: A list of ending indices. Must be the same length as starts.
ignore_indices: A list of indices to exclude.
Returns:
A tuple of two lists: filtered_starts and filtered_ends.
Returns empty lists if the input is invalid
or if all ranges are filtered out.
Prints error messages for invalid input.
Examples:
starts = [0, 5, 10]
ends = [3, 7, 12] # words at the end will not be colored.
ignore_indices = [1, 2, 12, 17]
# Output:
starts = [0, 3, 5, 10]
ends = [1, 4, 7, 12]
"""
if len(starts) != len(ends):
print(
"Error: The 'starts' and 'ends' lists must have the same length.", # noqa: E501
)
return [], []
filtered_starts = []
filtered_ends = []
for i in range(len(starts)):
start = starts[i]
end = ends[i]
if end < start:
print(
f"Error: End index {end} is less than start index {start} at position {i}.", # noqa: E501
)
return [], []
start_end = list(range(start, end + 1, 1))
start_end = list(set(start_end) - set(ignore_indices))
# new_start, new_end = self.extract_sequences(start_end)
new_start, new_end = self.extract_new_startend(
start,
end,
ignore_indices,
)
filtered_starts.extend(new_start)
filtered_ends.extend(new_end)
return filtered_starts, filtered_ends
def extract_new_startend(self, start, end, ignore_indices):
# sort a set of ignore_indices
indexes = list(set(ignore_indices))
indexes.sort()
new_starts = []
new_ends = []
new_start = start
if indexes is None or len(indexes) < 1:
new_starts.append(start)
new_ends.append(end)
return new_starts, new_ends
for index in indexes:
if index < start:
continue
elif index >= end:
continue
new_starts.append(new_start)
new_ends.append(index)
new_start = index + 1
new_starts.append(new_start)
new_ends.append(end)
return new_starts, new_ends
def extract_sequences(self, numbers):
if len(numbers) == 1:
return [numbers[0]], [numbers[0]]
numbers.sort()
starts = []
ends = []
for i, number in enumerate(numbers):
if i == 0:
start = number
end = number
continue
if number - 1 == numbers[i - 1]:
end = number
else:
starts.append(start)
ends.append(end)
start = number
end = number
if i == len(numbers) - 1:
starts.append(start)
ends.append(end)
return starts, ends