import os import shutil import random import pandas as pd import numpy as np import nltk import google.generativeai as genai import csv from transformers import ( AutoTokenizer, DataCollatorWithPadding, AutoModelForSequenceClassification, EarlyStoppingCallback, TrainerCallback, TrainingArguments, Trainer ) from openai import OpenAI from sklearn.neural_network import MLPClassifier from sklearn.metrics import roc_auc_score, accuracy_score from os.path import join from langchain.chat_models import ChatOpenAI from datasets import load_metric, load_dataset, Dataset from copy import deepcopy from bart_score import BARTScorer import argparse # Constants TOGETHER_API_KEY = "your_together_api_key" OPENAI_API_KEY = "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" GEMINI_API_KEY = "your_gemini_key" LOG_FILE = "data/99_log.txt" OUTPUT_FILE = "data/result.txt" METRIC_NAME = "roc_auc" TRAIN_RATIO = 0.8 VAL_RATIO = 0.1 NUMBER_OF_MAX_EPOCH_WITH_EARLY_STOPPING = 10 PATIENCE = 3 BATCH_SIZE = 8 OPTIMIZED_METRIC = "roc_auc" SEED = 0 TEMPERATURE = 0.0 IS_OUTPUT_NORMALIZATION = False RATIO = 0.9 HUMAN_LABEL = 0 MACHINE_LABEL = 1 BART = "bart" MULTIMODEL = "multimodel" SINGLE_FROM_MULTIMODEL = "single_from_multimodel" # Environment setup os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY os.environ['CURL_CA_BUNDLE'] = '' os.environ['REQUESTS_CA_BUNDLE'] = '' # Download necessary NLTK data nltk.download('punkt') nltk.download('punkt_tab') # Chat model configurations chat_model = ChatOpenAI(temperature=TEMPERATURE, model_name="gpt-3.5-turbo-0125") # API Models and Paths CHATGPT = "ChatGPT" GEMINI = "Gemini" # LLAMA_2_70_CHAT_TEMP_0 = "LLaMa" API_ERROR = "API_ERROR" IGNORE_BY_API_ERROR = "IGNORE_BY_API_ERROR" # Initialize BARTScorer bart_scorer = BARTScorer(device='cuda:0', checkpoint="facebook/bart-large-cnn") # Generative AI configuration genai.configure(api_key=GEMINI_API_KEY, transport='rest') generation_config = { "temperature": TEMPERATURE, } GEMINI_MODEL = genai.GenerativeModel('gemini-pro', generation_config=generation_config) # Model paths MODEL_PATHS = { "LLaMa": "meta-llama/Llama-2-70b-chat-hf", "QWEN": "Qwen/Qwen1.5-72B-Chat", "Yi": "NousResearch/Nous-Hermes-2-Yi-34B", "Mixtral": "mistralai/Mixtral-8x7B-Instruct-v0.1", "OLMo": "allenai/OLMo-7B-Instruct", "Phi": "microsoft/phi-2", "OpenChat": "openchat/openchat-3.5-1210", "WizardLM": "WizardLM/WizardLM-13B-V1.2", "Vicuna": "lmsys/vicuna-13b-v1.5" } TOGETHER_PATH ='https://api.together.xyz' # Roberta model configurations ROBERTA_BASE = "roberta-base" ROBERTA_LARGE = "roberta-large" ROBERTA_MODEL_PATHS = { ROBERTA_BASE: "roberta-base", ROBERTA_LARGE: "roberta-large" } LEARNING_RATES = { ROBERTA_BASE: 2e-5, ROBERTA_LARGE: 8e-6 } MODEL_NAME = ROBERTA_BASE # Tokenizer initialization tokenizer = AutoTokenizer.from_pretrained(ROBERTA_MODEL_PATHS[MODEL_NAME]) # Custom callback for Trainer class CustomCallback(TrainerCallback): """ Custom callback to evaluate the training dataset at the end of each epoch. """ def __init__(self, trainer) -> None: super().__init__() self._trainer = trainer def on_epoch_end(self, args, state, control, **kwargs): """ At the end of each epoch, evaluate the training dataset. """ if control.should_evaluate: control_copy = deepcopy(control) self._trainer.evaluate(eval_dataset=self._trainer.train_dataset, metric_key_prefix="train") return control_copy # Metric loading metric = load_metric(METRIC_NAME) def compute_metrics(evaluation_predictions): """ Function to compute evaluation metrics for model predictions. Parameters: evaluation_predictions (tuple): A tuple containing two elements: - predictions (array-like): The raw prediction scores from the model. - labels (array-like): The true labels for the evaluation data. Returns: dict: A dictionary containing the computed evaluation metrics. """ # Unpack predictions and labels from the input tuple raw_predictions, true_labels = evaluation_predictions # Convert raw prediction scores to predicted class labels predicted_labels = np.argmax(raw_predictions, axis=1) # Compute and return the evaluation metrics return metric.compute(prediction_scores=predicted_labels, references=true_labels, average="macro") def abstract_proofread(model_path, temperature, base_url, api_key, prompt): """ Function to proofread an abstract using an AI language model. Parameters: model_path (str): The path or identifier of the AI model to use. temperature (float): Sampling temperature for the model's output. base_url (str): The base URL for the API endpoint. api_key (str): The API key for authentication. prompt (str): The text prompt to provide to the AI for proofreading. Returns: str: The proofread abstract generated by the AI model. """ # Initialize the AI client with the provided API key and base URL ai_client = OpenAI(api_key=api_key, base_url=base_url) # Create a chat completion request with the system message and user prompt chat_completion = ai_client.chat.completions.create( messages=[ { "role": "system", "content": "You are an AI assistant", }, { "role": "user", "content": prompt, } ], model=model_path, max_tokens=1024, temperature=temperature, ) # Return the content of the first choice's message return chat_completion.choices[0].message.content def proofread_by_model_name(model_name, input_text, normalize_output): """ Proofreads the given input text using the specified model. Args: model_name (str): The name of the model to use for proofreading. input_text (str): The text to be proofread. normalize_output (bool): Whether to normalize the output or not. Returns: str: The proofread text. """ # Constants for API access base_url = TOGETHER_PATH api_key = TOGETHER_API_KEY temperature = TEMPERATURE # Retrieve the model path from the dictionary if model_name in MODEL_PATHS: model_path = MODEL_PATHS[model_name] else: raise ValueError("Model name not found in the dictionary.") # Formulate the prompt for the model prompt = f"Proofreading for the text: ```{input_text}```" # Apply output normalization if required if normalize_output: prompt = output_normalization(prompt) # Debugging: Print the prompt print(f"Prompt: {prompt}") # Call the abstract proofreading function with the prepared parameters return abstract_proofread(model_path, temperature, base_url, api_key, prompt) def gemini_proofread(input_text, normalize_output): """ Proofreads the given text using the GEMINI_MODEL. Parameters: input_text (str): The text to be proofread. normalize_output (bool): Flag indicating whether to normalize the output. Returns: str: The proofread text. """ prompt = f"Proofreading for the text: ```{input_text}```" if normalize_output: prompt = output_normalization(prompt) response = GEMINI_MODEL.generate_content(prompt) return response.text def print_and_log(message): """ Prints and logs the given message to a log file. Parameters: message (str): The message to be printed and logged. """ print(message) with open(LOG_FILE, "a+", encoding='utf-8') as log_file: log_file.write(message + "\n") def write_to_file(filename, content): """ Writes the given content to a specified file. Parameters: filename (str): The name of the file to write to. content (str): The content to be written. """ print(content) with open(filename, "a+", encoding='utf-8') as file: file.write(content) def output_normalization(prompt): """ Normalizes the output by appending a specific instruction to the prompt. Parameters: prompt (str): The initial prompt. Returns: str: The modified prompt. """ return prompt + " Please only output the proofread text without any explanation." def chatGPT_proofread(input_text, normalize_output): """ Proofreads the given text using the chat_model. Parameters: input_text (str): The text to be proofread. normalize_output (bool): Flag indicating whether to normalize the output. Returns: str: The proofread text. """ prompt = f"Proofreading for the text: ```{input_text}```" if normalize_output: prompt = output_normalization(prompt) print(f"Starting API call with prompt: {prompt}") result = chat_model.predict(prompt) print(f"Ending API call with prompt: {prompt}") return result def normalize_text(input_text): """ Normalizes the given text by removing certain characters and extra spaces. Parameters: input_text (str): The text to be normalized. Returns: str: The normalized text. """ result = input_text.strip() result = result.replace("**", "") result = result.replace("\n", " ") result = result.replace(" ", " ") # Remove extra spaces return result def write_to_csv(filename, row_data): """ Writes a row of data to a specified CSV file. Parameters: filename (str): The name of the CSV file. row_data (list): The row data to be written. """ with open(filename, 'a+', encoding='UTF8', newline='') as file: writer = csv.writer(file) writer.writerow(row_data) def number_of_csv_lines(filename): """ Returns the number of lines in a specified CSV file. Parameters: filename (str): The name of the CSV file. Returns: int: The number of lines in the CSV file. """ file_data = pd.read_csv(filename, sep=',').values return len(file_data) def read_csv_data(input_file): """ Reads data from a specified CSV file. Parameters: input_file (str): The name of the CSV file. Returns: numpy.ndarray: The data read from the CSV file. """ file_data = pd.read_csv(input_file, dtype='string', keep_default_na=False, sep=',').values return file_data def bart_score(text_1, text_2): """ Computes the BART score between two texts. Parameters: text_1 (str): The first text. text_2 (str): The second text. Returns: float: The BART score. """ score = bart_scorer.score([text_1], [text_2]) return score def check_bart_score(input_text, raw_text): """ Checks if the BART score between input_text and raw_text is above a threshold. Parameters: input_text (str): The input text. raw_text (str): The raw text to compare against. Returns: bool: True if the score is above the threshold, False otherwise. """ THRESHOLD = -2.459 normalized_text = normalize_text(raw_text) score = bart_score(input_text, normalized_text)[0] return score >= THRESHOLD def get_column(input_file, column_name): """ Retrieves a specific column from a CSV file. Parameters: input_file (str): The name of the CSV file. column_name (str): The name of the column to retrieve. Returns: numpy.ndarray: The values from the specified column. """ df = pd.read_csv(input_file, dtype='string', keep_default_na=False, sep=',') column_data = df[column_name] return column_data.values def generate_column_names(categories): """ Generates a list of column names based on given categories. Parameters: categories (list): The list of categories. Returns: list: The generated list of column names. """ column_names = ['human'] for name in categories: column_names.append(name) for first in categories: for second in categories: column_names.append(f"{first}_{second}") return column_names def write_new_data(output_file, current_data, column_names): """ Writes new data to a CSV file based on current data and column names. Parameters: output_file (str): The name of the output CSV file. current_data (dict): The current data to be written. column_names (list): The list of column names. """ data_row = [current_data[column] for column in column_names] write_to_csv(output_file, data_row) def refine(input_text, candidate): """ Refines the candidate string by removing specific surrounding marks if they are present in the input_text with a count difference of exactly 2. Args: input_text (str): The original text. candidate (str): The candidate text to be refined. Returns: str: The refined candidate text. """ # Create a copy of the candidate string and strip whitespace refined_candidate = candidate.strip() # List of marks to check and potentially remove marks = ["```", "'", '"'] # Iterate through each mark for mark in marks: # Count occurrences of the mark in input_text and refined_candidate count_input_text = input_text.count(mark) count_refined_candidate = refined_candidate.count(mark) # Check if the mark should be stripped if (count_refined_candidate == count_input_text + 2 and refined_candidate.startswith(mark) and refined_candidate.endswith(mark)): # Strip the mark from both ends of the refined_candidate refined_candidate = refined_candidate.strip(mark) return refined_candidate def extract_by_best_similarity(input_text, raw_text): """ Extracts the best candidate string from the raw text based on the highest similarity score compared to the input text. The similarity score is calculated using the BART score. Args: input_text (str): The original text. raw_text (str): The raw text containing multiple candidate strings. Returns: str: The best candidate string with the highest similarity score. Returns the input text if no suitable candidate is found. """ # Refine the raw text refined_raw_text = refine(input_text, raw_text) # Tokenize the refined raw text into sentences raw_candidates = nltk.sent_tokenize(refined_raw_text) # Split sentences further by newlines to get individual candidates candidate_list = [] for sentence in raw_candidates: candidate_list.extend(sentence.split("\n")) # Initialize variables to track the best similarity score and the best candidate best_similarity = -9999 best_candidate = "" # Iterate over each candidate to find the best one based on the BART score for candidate in candidate_list: refined_candidate = refine(input_text, candidate) if check_bart_score(input_text, refined_candidate): score = bart_score(input_text, refined_candidate)[0] if score > best_similarity: best_similarity = score best_candidate = refined_candidate # Print the best candidate found print(f"best_candidate = {best_candidate}") # Return the best candidate if found, otherwise return the input text if best_candidate == "": return input_text return best_candidate def proofread_with_best_similarity(input_text, model_kind): """ Proofreads the input text using the specified model and extracts the best-corrected text based on similarity. Args: input_text (str): The original text to be proofread. model_kind (str): The kind of model to use for proofreading (e.g., CHATGPT, GEMINI). Returns: tuple: A tuple containing the raw proofread text and the best-corrected text. """ # Normalize the input text normalized_input_text = normalize_text(input_text) print_and_log(f"INPUT = {normalized_input_text}") result_text = "" raw_text = "" for i in range(1): # Loop is redundant as it runs only once; consider removing if unnecessary # Select the proofreading model based on model_kind if model_kind == CHATGPT: raw_text = chatGPT_proofread(normalized_input_text, normalize_output=IS_OUTPUT_NORMALIZATION) elif model_kind == GEMINI: raw_text = gemini_proofread(normalized_input_text, normalize_output=IS_OUTPUT_NORMALIZATION) else: raw_text = proofread_by_model_name(model_kind, normalized_input_text, normalize_output=IS_OUTPUT_NORMALIZATION) # Extract the best candidate text based on similarity result_text = extract_by_best_similarity(normalized_input_text, raw_text) # Log the raw and result texts print_and_log(f"RAW_{i} = {raw_text}") print_and_log(f"RESULT_{i} = {result_text}") # Normalize the result text result_text = normalize_text(result_text) # If a valid result is obtained, return it if result_text != "": return raw_text, result_text # Return the raw and result texts return raw_text, result_text def generate_file_name(existing_data_file, existing_kinds, new_kinds): """ Generates a new file name based on the path of an existing data file and a combination of existing and new kinds. Args: existing_data_file (str): The path to the existing data file. existing_kinds (list): A list of existing kinds. new_kinds (list): A list of new kinds. Returns: str: The generated file name with the full path. """ # Combine existing and new kinds into a single list combined_kinds = existing_kinds + new_kinds # Get the directory path of the existing data file directory_path = os.path.dirname(existing_data_file) # Create a new file name by joining the kinds with underscores and adding a suffix new_file_name = "_".join(combined_kinds) + "_with_best_similarity.csv" # Combine the directory path with the new file name to get the full output file path output_file_path = os.path.join(directory_path, new_file_name) return output_file_path def generate_new_data_with_best_similarity(existing_data_file, existing_kinds, new_kinds): """ Generates new data with the best similarity based on existing and new kinds, and writes the results to a CSV file. Args: existing_data_file (str): The path to the existing data file. existing_kinds (list): A list of existing kinds. new_kinds (list): A list of new kinds. Returns: None """ # Combine existing and new kinds into a single list all_kinds = existing_kinds + new_kinds # Generate column names for the CSV file column_names = generate_column_names(all_kinds) # Generate column names for existing kinds existing_column_names = generate_column_names(existing_kinds) # Generate the output file name output_file = generate_file_name(existing_data_file, existing_kinds, new_kinds) # Create the output file with column names if it doesn't exist if not os.path.exists(output_file): write_to_csv(output_file, column_names) # Read existing data from the file existing_data = {kind: get_column(existing_data_file, kind) for kind in existing_column_names} # Read input data from the output file input_data = read_csv_data(output_file) start_index = len(input_data) print(f"start_index = {start_index}") num_rows = len(existing_data["human"]) global_generate_set = [] global_reuse = [] for index in range(start_index, num_rows): # Initialize generation and reuse sets generate_set = [] reuse_set = [] # Prepare the current generation dictionary current_generation = {kind: existing_data[kind][index] for kind in existing_column_names} print(f"current_generation before generation = {current_generation}") human_text = current_generation["human"] # Generate new kinds based on human text for kind in new_kinds: _, generated_text = proofread_with_best_similarity(human_text, kind) current_generation[kind] = generated_text generate_set.append(kind) print(f"current_generation after generate one = {current_generation}") # Generate combinations of kinds for first_kind in all_kinds: for second_kind in all_kinds: combination_name = f"{first_kind}_{second_kind}" if combination_name not in current_generation: if first_kind in current_generation and current_generation[first_kind] == human_text: generated_text = current_generation[second_kind] reuse_set.append(f"{combination_name} from {second_kind}") else: is_need_generation = True for first_kind_2 in all_kinds: if first_kind != first_kind_2 and current_generation[first_kind] == current_generation[first_kind_2]: combination_name_2 = f"{first_kind_2}_{second_kind}" if combination_name_2 in current_generation: generated_text = current_generation[combination_name_2] reuse_set.append(f"{combination_name} from {combination_name_2}") is_need_generation = False break if is_need_generation: _, generated_text = proofread_with_best_similarity(current_generation[first_kind], second_kind) generate_set.append(f"{first_kind}_{second_kind}") current_generation[combination_name] = generated_text # Write the current generation to the output file write_new_data(output_file, current_generation, column_names) # Update global sets global_generate_set.append(generate_set) global_reuse def shuffle(array, seed): """ Shuffles the elements of each sublist in the given array using the specified seed. Args: array (list of lists): The array containing sublists to shuffle. seed (int): The seed value for the random number generator. Returns: None """ for sublist in array: random.Random(seed).shuffle(sublist) def generate_human_with_shuffle(dataset_name, column_name, num_samples, output_file): """ Generates a shuffled list of sentences from the dataset and writes them to a CSV file. Args: dataset_name (str): The name of the dataset to load. column_name (str): The column name to extract sentences from. num_samples (int): The number of samples to process. output_file (str): The path to the output CSV file. Returns: None """ # Load the dataset dataset = load_dataset(dataset_name) data = dataset['train'] lines = [] # Tokenize sentences and add to the lines list for sample in data: nltk_tokens = nltk.sent_tokenize(sample[column_name]) lines.extend(nltk_tokens) # Filter out empty lines filtered_lines = [line for line in lines if line != ""] lines = filtered_lines # Shuffle the lines shuffle([lines], seed=SEED) # Ensure the output file exists and write the header if it doesn't if not os.path.exists(output_file): header = ["human"] write_to_csv(output_file, header) # Get the number of lines already processed in the output file number_of_processed_lines = number_of_csv_lines(output_file) # Print the initial lines to be processed print(f"Lines before processing: {lines[:num_samples]}") # Slice the lines list to get the unprocessed lines lines = lines[number_of_processed_lines:num_samples] # Print the lines after slicing print(f"Lines after slicing: {lines}") # Process each line and write to the output file for index, human in enumerate(lines): normalized_text = normalize_text(human) output_data = [normalized_text] write_to_csv(output_file, output_data) print(f"Processed {index + 1} / {len(lines)}; Total processed: {number_of_processed_lines + index + 1} / {num_samples}") def split(data, ratio): """ Splits the data into training and testing sets based on the given ratio. Args: data (list): The dataset to split. ratio (float): The ratio for splitting the data into training and testing sets. Returns: tuple: A tuple containing the training data and the testing data. """ train_size = int(len(data) * ratio) train_data = data[:train_size] test_data = data[train_size:] return train_data, test_data def bart_score_in_batch(text_1, text_2): """ Calculates the BART score for pairs of texts in batches. Args: text_1 (list of str): The first list of texts. text_2 (list of str): The second list of texts. Returns: list: A list of BART scores for each pair of texts. """ return bart_scorer.score(text_1, text_2, batch_size=BATCH_SIZE) def extract_feature_in_batch(text_1, text_2, feature_kind): """ Extracts features for pairs of texts using BART scores. Args: text_1 (list of str): The first list of texts. text_2 (list of str): The second list of texts. feature_kind (str): The type of feature to extract. Returns: list: A list of extracted features. """ features = bart_score_in_batch(text_1, text_2) return features def abstract_train(features, labels): """ Trains a model using the given features and labels. Args: features (list): The input features for training. labels (list): The target labels for training. Returns: object: The trained model. """ model = MLPClassifier() model.fit(features, labels) return model def evaluate_model(model, features, labels): """ Evaluates the model's performance using accuracy and ROC AUC scores. Args: model (object): The trained model to evaluate. features (list): The input features for evaluation. labels (list): The target labels for evaluation. Returns: None """ predictions = model.predict(features) rounded_predictions = [round(value) for value in predictions] accuracy = accuracy_score(labels, rounded_predictions) write_to_file(OUTPUT_FILE, f"Accuracy: {accuracy * 100.0:.1f}%\n") roc_auc = roc_auc_score(labels, rounded_predictions) write_to_file(OUTPUT_FILE, f"ROC AUC: {roc_auc * 100.0:.1f}%\n") def combine_text_with_BERT_format(text_list): """ Combines a list of texts into a single string formatted for BERT input. Args: text_list (list of str): The list of texts to combine. Returns: str: The combined text string formatted for BERT input. """ combined_text = f"{text_list[0]}" for i in range(1, len(text_list)): combined_text += f"{text_list[i]}" return combined_text def preprocess_function_multimodel(sample): """ Preprocesses a given sample for a multi-model setup by calculating BART scores and formatting the text for BERT input. Args: sample (dict): A dictionary containing a key "text", which is a list of lists of strings. Returns: dict: A dictionary containing tokenized and preprocessed text data. """ num_texts = len(sample["text"][0]) # Number of texts in each sub-sample texts_grouped_by_index = [[] for _ in range(num_texts)] # Initialize empty lists for grouping texts by index # Group texts by their index across sub-samples for sub_sample in sample["text"]: for i in range(num_texts): texts_grouped_by_index[i].append(sub_sample[i]) # Calculate BART scores for each text pair (text[0] with text[i]) bart_scores = [bart_score_in_batch(texts_grouped_by_index[0], texts_grouped_by_index[i]) for i in range(1, num_texts)] combined_texts = [] # Process each sub-sample for BERT input for index, sub_sample in enumerate(sample["text"]): text_array = [sub_sample[0]] # Start with the input text score_generation_pairs = [] # Pair scores with their corresponding generations for i in range(1, num_texts): generation_text = sub_sample[i] generation_score = bart_scores[i-1][index] score_generation_pairs.append((generation_score, generation_text)) # Sort pairs by score in descending order sorted_pairs = sorted(score_generation_pairs, reverse=True) # Append sorted texts to text_array for _, sorted_text in sorted_pairs: text_array.append(sorted_text) # Combine texts into a single BERT-formatted string combined_text = combine_text_with_BERT_format(text_array) combined_texts.append(combined_text) # Tokenize the combined texts for BERT return tokenizer(combined_texts, add_special_tokens=False, truncation=True) def preprocess_function_single_from_multimodel(sample): """ Extracts the first text from each sub-sample in a multi-model sample and tokenizes it. Args: sample (dict): A dictionary containing a key "text", which is a list of lists of strings. Returns: dict: A dictionary containing tokenized text data. """ combined_texts = [] # Iterate through each sub-sample for sub_sample in sample["text"]: input_text = sub_sample[0] # Extract the first text from the sub-sample combined_texts.append(input_text) # Append it to the list of combined texts # Tokenize the combined texts return tokenizer(combined_texts, truncation=True) def check_api_error(data): """ Checks if any item in the provided data indicates an API error. Args: data (list): A list of items to be checked for API errors. Returns: bool: True if an API error or ignore by API error is found, otherwise False. """ for item in data: if item == API_ERROR or item == IGNORE_BY_API_ERROR: # Check for API error indicators return True # Return True if an error indicator is found return False # Return False if no error indicators are found def train_only_by_transformer_with_test_evaluation_early_stop(train_data, test_data, input_type, num_classes=2): """ Trains a transformer model using the provided training and testing datasets with early stopping. Args: train_data (Dataset): The training dataset. test_data (Dataset): The testing dataset. input_type (str): The type of input data, either MULTIMODEL or SINGLE_FROM_MULTIMODEL. num_classes (int, optional): The number of classes for classification. Defaults to 2. Returns: Trainer: The trained model wrapped in a Trainer object. """ # Preprocess datasets based on the input type if input_type == MULTIMODEL: train_data = train_data.map(preprocess_function_multimodel, batched=True) test_data = test_data.map(preprocess_function_multimodel, batched=True) elif input_type == SINGLE_FROM_MULTIMODEL: train_data = train_data.map(preprocess_function_single_from_multimodel, batched=True) test_data = test_data.map(preprocess_function_single_from_multimodel, batched=True) # Data collator to pad inputs data_collator = DataCollatorWithPadding(tokenizer=tokenizer) # Load appropriate model based on number of classes if num_classes == 3: model = AutoModelForSequenceClassification.from_pretrained( "pretrained_model/roberta-base_num_labels_3", num_labels=num_classes) else: model = AutoModelForSequenceClassification.from_pretrained( ROBERTA_MODEL_PATHS[MODEL_NAME], num_labels=num_classes) learning_rate = LEARNING_RATES[MODEL_NAME] output_folder = "training_with_callbacks" # Remove the output folder if it already exists if os.path.exists(output_folder): shutil.rmtree(output_folder) # Training arguments training_args = TrainingArguments( output_dir=output_folder, evaluation_strategy="epoch", logging_strategy="epoch", save_strategy="epoch", learning_rate=learning_rate, per_device_train_batch_size=BATCH_SIZE, per_device_eval_batch_size=BATCH_SIZE, num_train_epochs=NUMBER_OF_MAX_EPOCH_WITH_EARLY_STOPPING, weight_decay=0.01, push_to_hub=False, metric_for_best_model=OPTIMIZED_METRIC, load_best_model_at_end=True ) # Create Trainer object trainer = Trainer( model=model, args=training_args, train_dataset=train_data, eval_dataset=test_data, tokenizer=tokenizer, data_collator=data_collator, compute_metrics=compute_metrics, callbacks=[EarlyStoppingCallback(early_stopping_patience=PATIENCE)] ) # Add custom callback trainer.add_callback(CustomCallback(trainer)) # Start training trainer.train() return trainer def calculate_number_of_models(num_columns): """ Calculates the number of models required based on the number of columns. Args: num_columns (int): The total number of columns. Returns: int: The number of models required. Raises: Exception: If the number of models cannot be calculated to match the number of columns. """ num_models = 0 count_human = 1 # Initial count representing human input while True: count_single = num_models # Single model count count_pair = num_models * num_models # Pair model count total_count = count_human + count_single + count_pair if total_count == num_columns: return num_models elif total_count > num_columns: raise Exception("Cannot calculate the number of models to match the number of columns") num_models += 1 def read_multimodel_data_from_csv(multimodel_csv_file): """ Reads multimodel data from a CSV file and organizes it into a structured format. Args: multimodel_csv_file (str): Path to the CSV file containing multimodel data. Returns: list: A list of dictionaries, each containing 'human', 'single', and 'pair' data. Raises: Exception: If there is an error in reading the CSV file or processing the data. """ # Read CSV data into a list of lists input_data = read_csv_data(multimodel_csv_file) # Initialize the result list structured_data = [] # Calculate the number of models based on the number of columns in the first row num_models = calculate_number_of_models(len(input_data[0])) # Process each row in the input data for row in input_data: row_data = {} index = 0 # Extract human data row_data["human"] = row[index] index += 1 # Extract single model data single_model_data = [] for _ in range(num_models): single_model_data.append(row[index]) index += 1 row_data["single"] = single_model_data # Extract pair model data pair_model_data = [] for _ in range(num_models): sub_pair_data = [] for _ in range(num_models): sub_pair_data.append(row[index]) index += 1 pair_model_data.append(sub_pair_data) row_data["pair"] = pair_model_data # Append the structured row data to the result list structured_data.append(row_data) return structured_data def check_error(data_item): """ Checks for errors in a data item by verifying the 'human', 'single', and 'pair' fields. Args: data_item (dict): A dictionary containing 'human', 'single', and 'pair' data. Returns: bool: True if any of the fields contain an error, otherwise False. """ # Check for API error in the 'human' field if check_api_error(data_item["human"]): return True # Check for API error in the 'single' model data for single_text in data_item["single"]: if check_api_error(single_text): return True # Get the number of models from the 'single' model data num_models = len(data_item["single"]) # Check for API error in the 'pair' model data for i in range(num_models): for j in range(num_models): if check_api_error(data_item["pair"][i][j]): return True # No errors found return False def create_pair_sample(data_item, training_indices): """ Creates pair samples for training by comparing human data with machine-generated data. Args: data_item (dict): A dictionary containing 'human', 'single', and 'pair' data. training_indices (list): A list of indices used for training. Returns: list: A list of dictionaries, each containing a 'text' array and a 'label'. """ # Initialize the result list result_samples = [] # Check if there is any error in the data_item if check_error(data_item): return result_samples print(training_indices) print(data_item) # Create machine samples for train_idx in training_indices: if data_item["human"] != data_item["single"][train_idx]: text_array = [] machine_text = data_item["single"][train_idx] text_array.append(machine_text) for sub_idx in training_indices: text_array.append(data_item["pair"][train_idx][sub_idx]) sample = { "text": text_array, "label": MACHINE_LABEL } result_samples.append(sample) # Create human samples text_array = [data_item["human"]] for train_idx in training_indices: text_array.append(data_item["single"][train_idx]) human_sample = { "text": text_array, "label": HUMAN_LABEL } # Append human samples for each machine sample num_machine_samples = len(result_samples) for _ in range(num_machine_samples): result_samples.append(human_sample) return result_samples def create_pair_test_sample(data_item, training_indices, testing_indices): """ Creates pair test samples by comparing human data with machine-generated data. Args: data_item (dict): A dictionary containing 'human', 'single', and 'pair' data. training_indices (list): A list of indices used for training. testing_indices (list): A list of indices used for testing. Returns: list: A list of dictionaries, each containing a 'text' array and a 'label'. """ # Initialize the result list result_samples = [] # Check if there is any error in the data_item if check_error(data_item): return result_samples # Create machine samples based on testing indices for test_idx in testing_indices: if data_item["human"] != data_item["single"][test_idx]: text_array = [] machine_text = data_item["single"][test_idx] text_array.append(machine_text) for train_idx in training_indices: text_array.append(data_item["pair"][test_idx][train_idx]) sample = { "text": text_array, "label": MACHINE_LABEL } result_samples.append(sample) # Create human sample text_array = [data_item["human"]] for train_idx in training_indices: text_array.append(data_item["single"][train_idx]) human_sample = { "text": text_array, "label": HUMAN_LABEL } # Append the human sample for each machine sample num_machine_samples = len(result_samples) for _ in range(num_machine_samples): result_samples.append(human_sample) return result_samples def create_train_val_sample(data, training_indices): """ Creates training and validation samples from the provided data. Args: data (list): A list of data items, each to be processed. training_indices (list): A list of indices used for training. Returns: list: A list of training and validation samples created from the data. """ # Initialize the result list result_samples = [] # Process each item in the data for data_item in data: # Create pair samples for the current item sub_samples = create_pair_sample(data_item, training_indices) # Extend the result list with the created sub-samples result_samples.extend(sub_samples) return result_samples def create_test_sample(data, training_indices, testing_indices): """ Creates test samples from the provided data by comparing human data with machine-generated data. Args: data (list): A list of data items, each to be processed. training_indices (list): A list of indices used for training. testing_indices (list): A list of indices used for testing. Returns: list: A list of test samples created from the data. """ # Initialize the result list result_samples = [] # Process each item in the data for data_item in data: # Create pair test samples for the current item sub_samples = create_pair_test_sample(data_item, training_indices, testing_indices) # Extend the result list with the created sub-samples result_samples.extend(sub_samples) return result_samples def distribute_data(data, train_indices, test_indices, train_ratio, val_ratio): """ Distributes the data into training, validation, and test samples. Args: data (list): A list of data items to be split and processed. train_indices (list): A list of indices used for training. test_indices (list): A list of indices used for testing. train_ratio (float): The ratio of data to be used for training. val_ratio (float): The ratio of data to be used for validation. Returns: tuple: A tuple containing lists of training, validation, and test samples. """ # Split the data into training, validation, and test sets train_data, val_data, test_data = split_train_val_test(data, train_ratio, val_ratio) # Create training samples train_samples = create_train_val_sample(train_data, train_indices) write_to_file(OUTPUT_FILE, f"train samples = {len(train_samples)}\n") # Create validation samples val_samples = create_train_val_sample(val_data, train_indices) write_to_file(OUTPUT_FILE, f"val samples = {len(val_samples)}\n") # Create test samples test_samples = create_test_sample(test_data, train_indices, test_indices) write_to_file(OUTPUT_FILE, f"test samples = {len(test_samples)}\n") return train_samples, val_samples, test_samples def convert_to_huggingface_with_multimodel(samples): """ Converts a list of samples to the Hugging Face Dataset format. Args: samples (list): A list of samples to be converted. Returns: Dataset: A Hugging Face Dataset object created from the samples. """ return Dataset.from_list(samples) def train_by_transformer_with_multimodel_and_early_stop(train_samples, val_samples, input_type): """ Trains a transformer model with multimodal data and early stopping. Args: train_samples (list): A list of training samples. val_samples (list): A list of validation samples. input_type (str): The type of input data (e.g., multimodal). Returns: object: The trained model with early stopping. """ # Convert training and validation samples to Hugging Face Dataset format train_data = convert_to_huggingface_with_multimodel(train_samples) val_data = convert_to_huggingface_with_multimodel(val_samples) # Train the model with early stopping and return the trained model return train_only_by_transformer_with_test_evaluation_early_stop(train_data, val_data, input_type) def test_by_transformer_with_multimodel(detector, test_samples, input_type): """ Tests a trained transformer model with multimodal data. Args: detector (object): The trained model to be evaluated. test_samples (list): A list of test samples. input_type (str): The type of input data (e.g., multimodal). Returns: None """ # Convert test samples to Hugging Face Dataset format test_data = convert_to_huggingface_with_multimodel(test_samples) # Apply the appropriate preprocessing function based on the input type if input_type == MULTIMODEL: test_data = test_data.map(preprocess_function_multimodel, batched=True) elif input_type == SINGLE_FROM_MULTIMODEL: test_data = test_data.map(preprocess_function_single_from_multimodel, batched=True) print("Test data:", test_data) # Evaluate the model on the test data result = detector.evaluate(eval_dataset=test_data) print("Test result:", result) # Extract and log the ROC AUC score roc_auc = result['eval_roc_auc'] write_to_file(OUTPUT_FILE, "roc_auc: %.1f%%" % (roc_auc * 100.0) + "\n") def extract_by_feature_kind(samples, feature_type): """ Extracts features from the given samples based on the specified feature type. Args: samples (list): A list of samples where each sample is a dictionary with 'text' and 'label' keys. feature_type (str): The type of feature to extract. Returns: tuple: A tuple containing the extracted features and corresponding labels. """ text_1_list = [] text_2_list = [] labels = [] for sample in samples: text_1_list.append(sample["text"][0]) text_2_list.append(sample["text"][1]) labels.append(sample["label"]) # Extract features in batch based on the feature type features = extract_feature_in_batch(text_1_list, text_2_list, feature_type) return features, labels def train_by_feature_kind(train_samples, feature_type): """ Trains a model using features extracted from the training samples based on the specified feature type. Args: train_samples (list): A list of training samples where each sample is a dictionary with 'text' and 'label' keys. feature_type (str): The type of feature to extract for training. Returns: object: The trained model. """ # Extract features and labels from the training samples features, labels = extract_by_feature_kind(train_samples, feature_type) # Convert features to a numpy array and reshape for training features = np.array(features) features = features.reshape(-1, 1) # Train the model using the extracted features and labels model = abstract_train(features, labels) return model def test_by_feature_kind(detector, samples, feature_type): """ Tests a detector using features extracted from the provided samples based on the specified feature type. Args: detector (object): The detector model to be evaluated. samples (list): A list of samples where each sample is a dictionary with 'text' and 'label' keys. feature_type (str): The type of feature to extract for testing. Returns: None """ # Extract features and labels from the samples features, labels = extract_by_feature_kind(samples, feature_type) # Convert features to a numpy array and reshape for evaluation features = np.array(features) features = features.reshape(-1, 1) # Evaluate the detector model using the extracted features and labels evaluate_model(detector, features, labels) def general_process_multimodels_train_val_test(train_samples, val_samples, test_samples): """ General process for training, validating, and testing models using multi-model and feature kind approaches. Args: train_samples (list): Training samples. val_samples (list): Validation samples. test_samples (list): Test samples. Returns: None """ # Multi-model approach input_kind = MULTIMODEL write_to_file(OUTPUT_FILE, f"\nInput kind = {input_kind} \n") # Train detector using multi-model with early stopping detector = train_by_transformer_with_multimodel_and_early_stop(train_samples, val_samples, input_kind) detector.save_model("./models/multi_model_detector") # Evaluate on train set write_to_file(OUTPUT_FILE, f"EVALUATE ON TRAIN SET \n") test_by_transformer_with_multimodel(detector, train_samples, input_kind) # Evaluate on validation set write_to_file(OUTPUT_FILE, f"EVALUATE ON VALIDATION SET \n") test_by_transformer_with_multimodel(detector, val_samples, input_kind) # Evaluate on test set write_to_file(OUTPUT_FILE, f"EVALUATE ON TEST SET \n") test_by_transformer_with_multimodel(detector, test_samples, input_kind) # Single from multi-model approach input_kind = SINGLE_FROM_MULTIMODEL write_to_file(OUTPUT_FILE, f"\nInput kind = {input_kind} \n") # Train detector using single from multi-model with early stopping detector = train_by_transformer_with_multimodel_and_early_stop(train_samples, val_samples, input_kind) detector.save_model("./models/single_model_detector_1") # Evaluate on train set write_to_file(OUTPUT_FILE, f"EVALUATE ON TRAIN SET \n") test_by_transformer_with_multimodel(detector, train_samples, input_kind) # Evaluate on validation set write_to_file(OUTPUT_FILE, f"EVALUATE ON VALIDATION SET \n") test_by_transformer_with_multimodel(detector, val_samples, input_kind) # Evaluate on test set write_to_file(OUTPUT_FILE, f"EVALUATE ON TEST SET \n") test_by_transformer_with_multimodel(detector, test_samples, input_kind) # Feature kind approach sample_length = len(train_samples[0]["text"]) if sample_length == 2: # Check if the sample length is 2, indicating BART feature kind feature_kind = BART write_to_file(OUTPUT_FILE, f"\nFeature kind = {feature_kind} \n") # Train detector using feature kind detector = train_by_feature_kind(train_samples, feature_kind) # Evaluate on train set write_to_file(OUTPUT_FILE, f"EVALUATE ON TRAIN SET \n") test_by_feature_kind(detector, train_samples, feature_kind) # Evaluate on validation set write_to_file(OUTPUT_FILE, f"EVALUATE ON VALIDATION SET \n") test_by_feature_kind(detector, val_samples, feature_kind) # Evaluate on test set write_to_file(OUTPUT_FILE, f"EVALUATE ON TEST SET \n") test_by_feature_kind(detector, test_samples, feature_kind) def process_multi_models_with_validation(multimodel_csv_file, train_indices, test_indices, num_samples): """ Processes multi-model data with validation, training, and testing. Args: multimodel_csv_file (str): Path to the CSV file containing multi-model data. train_indices (list): Indices for the training data. test_indices (list): Indices for the testing data. num_samples (int): Number of samples to process. Returns: None """ # Log the details of the process write_to_file(OUTPUT_FILE, f"PROCESSING FILE={multimodel_csv_file} \n") write_to_file(OUTPUT_FILE, f"EXPERIMENT WITH {MODEL_NAME} model \n") write_to_file(OUTPUT_FILE, f"NUMBER OF MAX EPOCHS WITH EARLY STOPPING = {NUMBER_OF_MAX_EPOCH_WITH_EARLY_STOPPING} \n") write_to_file(OUTPUT_FILE, f"PATIENCE = {PATIENCE} \n") write_to_file(OUTPUT_FILE, f"OPTIMIZED METRIC = {OPTIMIZED_METRIC} \n") write_to_file(OUTPUT_FILE, f"BATCH SIZE = {BATCH_SIZE} \n") write_to_file(OUTPUT_FILE, f"Number of samples = {num_samples} \n") # Read multi-model data from the CSV file data = read_multimodel_data_from_csv(multimodel_csv_file) # Limit data to the specified number of samples data = data[:num_samples] # Distribute data into training, validation, and testing sets train_samples, val_samples, test_samples = distribute_data(data, train_indices, test_indices, TRAIN_RATIO, VAL_RATIO) # Log the training and testing indices write_to_file(OUTPUT_FILE, f"Multimodel training with train indices {train_indices}, test with test indices {test_indices} \n") # Process the multi-models for training, validation, and testing general_process_multimodels_train_val_test(train_samples, val_samples, test_samples) def split_train_val_test(data, train_ratio, val_ratio): """ Splits the dataset into training, validation, and test sets based on specified ratios. Args: data (list): The dataset to be split. train_ratio (float): The ratio of the dataset to be used for training. val_ratio (float): The ratio of the dataset to be used for validation. Returns: tuple: A tuple containing three lists - (train_data, val_data, test_data). """ # Calculate the number of samples for the training set num_train_samples = int(len(data) * train_ratio) # Calculate the number of samples for the validation set num_val_samples = int(len(data) * val_ratio) # Split the data into training, validation, and test sets train_data = data[:num_train_samples] val_data = data[num_train_samples:(num_train_samples + num_val_samples)] test_data = data[(num_train_samples + num_val_samples):] return train_data, val_data, test_data def main(): """ Main function to handle argument parsing and execute the sequence of operations including data generation and processing with multiple models. """ parser = argparse.ArgumentParser(description='SimLLM.') # Argument for specifying the list of large language models parser.add_argument('--LLMs', nargs="+", default=[CHATGPT],#, "Yi", "OpenChat"], help='List of large language models') # Argument for specifying the list of training indexes parser.add_argument('--train_indexes', type=int, default=[0,1,2], nargs="+", help='List of training indexes') # Argument for specifying the list of testing indexes parser.add_argument('--test_indexes', type=int, default=[0], nargs="+", help='List of testing indexes') # Argument for specifying the number of samples parser.add_argument('--num_samples', type=int, default=5000, help='Number of samples') # Argument for multimodel_csv_file parser.add_argument('--multimodel_csv_file', type=str, default="data/ChatGPT_Nous_Hermes_2_Yi_34B_openchat_3_5_1210_with_best_similarity.csv", help='multimodel_csv_file') # Parse the command-line arguments args = parser.parse_args() if args.multimodel_csv_file == "": # Static dataset parameters dataset_name = "xsum" column_name = "document" num_samples = args.num_samples output_file = "data/test.csv" # Generate human data with shuffle # generate_human_with_shuffle(dataset_name, column_name, num_samples, output_file) # Existing data parameters existing_data_file = output_file existing_kinds = [] # New kinds of models to generate data with new_kinds = args.LLMs # Generate new data with best similarity generate_new_data_with_best_similarity(existing_data_file, existing_kinds, new_kinds) # Generate a filename for the multimodel CSV file multimodel_csv_file = generate_file_name(existing_data_file, existing_kinds, new_kinds) else: multimodel_csv_file = args.multimodel_csv_file # Number of samples to process (-1 means process all samples) num_samples_to_process = -1 # Training and testing indexes from arguments training_indexes = args.train_indexes testing_indexes = args.test_indexes # Process multiple models with validation process_multi_models_with_validation(multimodel_csv_file, training_indexes, testing_indexes, num_samples_to_process) if __name__ == "__main__": main()