from difflib import SequenceMatcher import pandas as pd from src.application.image.image_detection import ( detect_image_by_ai_model, detect_image_by_reverse_search, detect_image_from_news_image, ) from src.application.text.entity import ( apply_highlight, highlight_entities, ) from src.application.text.helper import extract_equal_text from src.application.text.model_detection import ( detect_text_by_ai_model, predict_generation_model, ) from src.application.text.preprocessing import split_into_paragraphs from src.application.text.search_detection import ( PARAPHRASE_THRESHOLD_MACHINE, find_sentence_source, ) class NewsVerification: def __init__(self): self.news_text = "" self.news_title = "" self.news_content = "" self.news_image = "" self.text_prediction_label: list[str] = ["UNKNOWN"] self.text_prediction_score: list[float] = [0.0] self.image_prediction_label: list[str] = ["UNKNOWN"] self.image_prediction_score: list[str] = [0.0] self.image_referent_url: list[str] = [] self.news_prediction_label = "" self.news_prediction_score = -1 # news' urls to find img self.found_img_url: list[str] = [] # Analyzed results self.aligned_sentences_df: pd.DataFrame = pd.DataFrame( columns=[ "input", "source", "label", "similarity", "paraphrase", "url", "group", "entities", ], ) self.grouped_url_df: pd.DataFrame = pd.DataFrame() # For formatting ouput tables self.ordinary_user_table: list = [] self.fact_checker_table: list = [] self.governor_table: list = [] def load_news(self, news_title, news_content, news_image): self.news_text = (news_title + "\n\n" + news_content).strip() self.news_title = news_title self.news_content = news_content self.news_image = news_image def determine_text_origin(self): self.find_text_source() # Group inout and source by url def concat_text(series): return " ".join( series.astype(str).tolist(), ) # Handle mixed data types and NaNs self.grouped_url_df = self.aligned_sentences_df.groupby("url").agg( { "input": concat_text, "source": concat_text, }, ) self.grouped_url_df = self.grouped_url_df.reset_index() # Add new columns for label and score self.grouped_url_df["label"] = None self.grouped_url_df["score"] = None print(f"aligned_sentences_df:\n {self.aligned_sentences_df}") for index, row in self.grouped_url_df.iterrows(): label, score = self.verify_text(row["url"]) if label == "UNKNOWN": # Concatenate text from "input" in sentence_df text = " ".join(row["input"]) # detect by baseline model label, score = detect_text_by_ai_model(text) self.grouped_url_df.at[index, "label"] = label self.grouped_url_df.at[index, "score"] = score # Overall label or score for the whole input text if len(self.grouped_url_df) > 0: machine_label = self.grouped_url_df[ self.grouped_url_df["label"].str.contains( "MACHINE", case=False, na=False, ) ] if len(machine_label) > 0: label = " ".join(machine_label["label"].tolist()) self.text_prediction_label[0] = label self.text_prediction_score[0] = machine_label["score"].mean() else: machine_label = self.aligned_sentences_df[ self.aligned_sentences_df["label"] == "HUMAN" ] self.text_prediction_label[0] = "HUMAN" self.text_prediction_score[0] = machine_label["score"].mean() else: # no source found in the input text print("No source found in the input text") text = " ".join(self.aligned_sentences_df["input"].tolist()) # detect by baseline model label, score = detect_text_by_ai_model(text) self.text_prediction_label[0] = label self.text_prediction_score[0] = score def find_text_source(self): """ Determines the origin of the given text based on paraphrasing detection and human authorship analysis. Args: text: The input text to be analyzed. Returns: str: The predicted origin of the text: - "HUMAN": If the text is likely written by a human. - "MACHINE": If the text is likely generated by a machine. """ print("CHECK TEXT:") print("\tFrom search engine:") # Classify by search engine # input_sentences = split_into_sentences(self.news_text) input_paragraphs = split_into_paragraphs(self.news_text) # Setup df for input_sentences for _ in range(len(input_paragraphs)): self.aligned_sentences_df = pd.concat( [ self.aligned_sentences_df, pd.DataFrame( [ { "input": None, "source": None, "label": None, "similarity": None, "paraphrase": None, "url": None, "entities": None, }, ], ), ], ignore_index=True, ) # find a source for each sentence for index, _ in enumerate(input_paragraphs): similarity = self.aligned_sentences_df.loc[index, "similarity"] if similarity is not None: if similarity > PARAPHRASE_THRESHOLD_MACHINE: continue print(f"\n-------index = {index}-------") print(f"current_text = {input_paragraphs[index]}\n") self.aligned_sentences_df, img_urls = find_sentence_source( input_paragraphs, index, self.aligned_sentences_df, ) self.found_img_url.extend(img_urls) # determine if the whole source is from a news or not def verify_text(self, url): label = "UNKNOWN" score = 0 # calculate the average similarity when the similary score # in each row of sentences_df is higher than 0.8 filtered_by_url = self.aligned_sentences_df[ self.aligned_sentences_df["url"] == url ] filtered_by_similarity = filtered_by_url[ filtered_by_url["similarity"] > 0.8 ] if len(filtered_by_similarity) / len(self.aligned_sentences_df) > 0.5: # check if "MACHINE" is in self.aligned_sentences_df["label"]: contains_machine = ( filtered_by_similarity["label"] .str.contains( "MACHINE", case=False, na=False, ) .any() ) if contains_machine: label = "MACHINE" machine_rows = filtered_by_similarity[ filtered_by_similarity["label"].str.contains( "MACHINE", case=False, na=False, ) ] generated_model, _ = predict_generation_model(self.news_text) label += f"
({generated_model})" score = machine_rows["similarity"].mean() else: label = "HUMAN" human_rows = filtered_by_similarity[ filtered_by_similarity["label"].str.contains( "HUMAN", case=False, na=False, ) ] score = human_rows["similarity"].mean() return label, score def determine_image_origin(self): print("CHECK IMAGE:") if self.news_image is None: self.image_prediction_label = "UNKNOWN" self.image_prediction_score = 0.0 self.image_referent_url = None return matched_url, similarity = detect_image_from_news_image( self.news_image, self.found_img_url, ) if matched_url is not None: print(f"matched image: {matched_url}\nsimilarity: {similarity}\n") self.image_prediction_label = "HUMAN" self.image_prediction_score = similarity self.image_referent_url = matched_url return matched_url, similarity = detect_image_by_reverse_search( self.news_image, ) if matched_url is not None: print(f"matched image: {matched_url}\tScore: {similarity}%\n") self.image_prediction_label = "HUMAN" self.image_prediction_score = similarity self.image_referent_url = matched_url return detected_label, score = detect_image_by_ai_model(self.news_image) if detected_label: print(f"detected_label: {detected_label} ({score})") self.image_prediction_label = detected_label self.image_prediction_score = score self.image_referent_url = None return self.image_prediction_label = "UNKNOWN" self.image_prediction_score = 50 self.image_referent_url = None def generate_analysis_report(self): if self.news_text != "": self.determine_text_origin() if self.news_image != "": self.determine_image_origin() def analyze_details(self): self.handle_entities() ordinary_user_table = self.create_ordinary_user_table() fact_checker_table = self.create_fact_checker_table() governor_table = self.create_governor_table() return ordinary_user_table, fact_checker_table, governor_table def handle_entities(self): entities_with_colors = [] for index, row in self.grouped_url_df.iterrows(): # Get entity-words (in pair) with colors entities_with_colors = highlight_entities( row["input"], row["source"], ) for index, sentence in self.aligned_sentences_df.iterrows(): if sentence["url"] == row["url"]: self.aligned_sentences_df.at[index, "entities"] = ( entities_with_colors # must use at ) def get_text_urls(self): return set(self.text_referent_url) def compare_sentences(self, sentence_1, sentence_2, position, color): """ Compares two sentences and identifies common phrases, outputting their start and end positions. """ if not sentence_1 or not sentence_2: # Handle empty strings return [] s = SequenceMatcher(None, sentence_1, sentence_2) common_phrases = [] for block in s.get_matching_blocks(): if block.size > 0: # Ignore zero-length matches start_1 = block.a end_1 = block.a + block.size start_2 = block.b end_2 = block.b + block.size phrase = sentence_1[ start_1:end_1 ] # Or sentence_2[start_2:end_2], they are the same common_phrases.append( { "phrase": phrase, "start_1": start_1 + position, "end_1": end_1 + position, "start_2": start_2, "end_2": end_2, "color": color, }, ) position += len(sentence_1) return common_phrases, position def create_fact_checker_table(self): rows = [] rows.append(self.format_image_fact_checker_row()) for _, row in self.aligned_sentences_df.iterrows(): if row["input"] is None: continue if row["source"] is None: equal_idx_1 = equal_idx_2 = [] else: # Get index of equal phrases in input and source sentences equal_idx_1, equal_idx_2 = extract_equal_text( row["input"], row["source"], ) self.fact_checker_table.append( [ row, equal_idx_1, equal_idx_2, row["entities"], row["url"], ], ) previous_url = None span_row = 1 for index, row in enumerate(self.fact_checker_table): current_url = row[4] last_url_row = False # First row or URL change if index == 0 or current_url != previous_url: first_url_row = True previous_url = current_url # Increase counter "span_row" when the next url is the same while ( index + span_row < len(self.fact_checker_table) and self.fact_checker_table[index + span_row][4] == current_url ): span_row += 1 else: first_url_row = False span_row -= 1 if span_row == 1: last_url_row = True formatted_row = self.format_text_fact_checker_row( row, first_url_row, last_url_row, span_row, ) rows.append(formatted_row) table = "\n".join(rows) return f"""
Comparison between input news and source news:
{table}
Input news Source (URL in Originality) Forensic Originality