import os
import shutil
import random
import pandas as pd
import numpy as np
import nltk
import google.generativeai as genai
import csv
from transformers import (
    AutoTokenizer,
    DataCollatorWithPadding,
    AutoModelForSequenceClassification,
    EarlyStoppingCallback,
    TrainerCallback,
    TrainingArguments,
    Trainer
)
from openai import OpenAI
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import roc_auc_score, accuracy_score
from os.path import join
from langchain.chat_models import ChatOpenAI
from datasets import load_metric, load_dataset, Dataset
from copy import deepcopy
from bart_score import BARTScorer
import argparse

# Constants
TOGETHER_API_KEY = "your_together_api_key"
OPENAI_API_KEY = "your_openai_api_key"  # Never commit a real key; load it from the environment instead
GEMINI_API_KEY = "your_gemini_key"
LOG_FILE = "data/99_log.txt"
OUTPUT_FILE = "data/result.txt"
METRIC_NAME = "roc_auc"
TRAIN_RATIO = 0.8
VAL_RATIO = 0.1
NUMBER_OF_MAX_EPOCH_WITH_EARLY_STOPPING = 10
PATIENCE = 3
BATCH_SIZE = 8
OPTIMIZED_METRIC = "roc_auc"
SEED = 0
TEMPERATURE = 0.0
IS_OUTPUT_NORMALIZATION = False
RATIO = 0.9
HUMAN_LABEL = 0
MACHINE_LABEL = 1
BART = "bart"
MULTIMODEL = "multimodel"
SINGLE_FROM_MULTIMODEL = "single_from_multimodel"

# Environment setup
os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY
os.environ['CURL_CA_BUNDLE'] = ''
os.environ['REQUESTS_CA_BUNDLE'] = ''

# Download necessary NLTK data
nltk.download('punkt')
nltk.download('punkt_tab')

# Chat model configuration
chat_model = ChatOpenAI(temperature=TEMPERATURE, model_name="gpt-3.5-turbo-0125")

# API models and paths
CHATGPT = "ChatGPT"
GEMINI = "Gemini"
# LLAMA_2_70_CHAT_TEMP_0 = "LLaMa"
API_ERROR = "API_ERROR"
IGNORE_BY_API_ERROR = "IGNORE_BY_API_ERROR"

# Initialize BARTScorer
bart_scorer = BARTScorer(device='cuda:0', checkpoint="facebook/bart-large-cnn")

# Generative AI configuration
genai.configure(api_key=GEMINI_API_KEY, transport='rest')
generation_config = {
    "temperature": TEMPERATURE,
}
GEMINI_MODEL = genai.GenerativeModel('gemini-pro', generation_config=generation_config)

# Model paths
MODEL_PATHS = {
    "LLaMa": "meta-llama/Llama-2-70b-chat-hf",
    "QWEN": "Qwen/Qwen1.5-72B-Chat",
    "Yi": "NousResearch/Nous-Hermes-2-Yi-34B",
    "Mixtral": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "OLMo": "allenai/OLMo-7B-Instruct",
    "Phi": "microsoft/phi-2",
    "OpenChat": "openchat/openchat-3.5-1210",
    "WizardLM": "WizardLM/WizardLM-13B-V1.2",
    "Vicuna": "lmsys/vicuna-13b-v1.5"
}
TOGETHER_PATH = 'https://api.together.xyz'

# RoBERTa model configurations
ROBERTA_BASE = "roberta-base"
ROBERTA_LARGE = "roberta-large"
ROBERTA_MODEL_PATHS = {
    ROBERTA_BASE: "roberta-base",
    ROBERTA_LARGE: "roberta-large"
}
LEARNING_RATES = {
    ROBERTA_BASE: 2e-5,
    ROBERTA_LARGE: 8e-6
}
MODEL_NAME = ROBERTA_BASE

# Tokenizer initialization
tokenizer = AutoTokenizer.from_pretrained(ROBERTA_MODEL_PATHS[MODEL_NAME])


# Custom callback for Trainer
class CustomCallback(TrainerCallback):
    """
    Custom callback to evaluate the training dataset at the end of each epoch.
    """

    def __init__(self, trainer) -> None:
        super().__init__()
        self._trainer = trainer

    def on_epoch_end(self, args, state, control, **kwargs):
        """
        At the end of each epoch, evaluate the training dataset.
        """
        if control.should_evaluate:
            control_copy = deepcopy(control)
            self._trainer.evaluate(eval_dataset=self._trainer.train_dataset, metric_key_prefix="train")
            return control_copy


# Metric loading
metric = load_metric(METRIC_NAME)


def compute_metrics(evaluation_predictions):
    """
    Compute evaluation metrics for model predictions.

    Parameters:
        evaluation_predictions (tuple): A tuple containing two elements:
            - predictions (array-like): The raw prediction scores from the model.
            - labels (array-like): The true labels for the evaluation data.

    Returns:
        dict: A dictionary containing the computed evaluation metrics.
    """
    # Unpack predictions and labels from the input tuple
    raw_predictions, true_labels = evaluation_predictions
    # Convert raw prediction scores to predicted class labels
    predicted_labels = np.argmax(raw_predictions, axis=1)
    # Compute and return the evaluation metrics
    return metric.compute(prediction_scores=predicted_labels, references=true_labels, average="macro")
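
# Minimal sketch of compute_metrics on toy logits (hypothetical values):
#   logits = np.array([[0.9, 0.1], [0.2, 0.8]])  # two samples, two classes
#   labels = np.array([0, 1])
#   compute_metrics((logits, labels))  # argmax gives [0, 1] -> {"roc_auc": 1.0}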


def abstract_proofread(model_path, temperature, base_url, api_key, prompt):
    """
    Proofread an abstract using an AI language model.

    Parameters:
        model_path (str): The path or identifier of the AI model to use.
        temperature (float): Sampling temperature for the model's output.
        base_url (str): The base URL for the API endpoint.
        api_key (str): The API key for authentication.
        prompt (str): The text prompt to provide to the AI for proofreading.

    Returns:
        str: The proofread abstract generated by the AI model.
    """
    # Initialize the AI client with the provided API key and base URL
    ai_client = OpenAI(api_key=api_key, base_url=base_url)
    # Create a chat completion request with the system message and user prompt
    chat_completion = ai_client.chat.completions.create(
        messages=[
            {
                "role": "system",
                "content": "You are an AI assistant",
            },
            {
                "role": "user",
                "content": prompt,
            }
        ],
        model=model_path,
        max_tokens=1024,
        temperature=temperature,
    )
    # Return the content of the first choice's message
    return chat_completion.choices[0].message.content
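
# Usage sketch (assumes a valid Together AI key; the prompt text is hypothetical):
#   result = abstract_proofread(MODEL_PATHS["LLaMa"], TEMPERATURE, TOGETHER_PATH,
#                               TOGETHER_API_KEY, "Proofreading for the text: ```Teh cat sat.```")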


def proofread_by_model_name(model_name, input_text, normalize_output):
    """
    Proofreads the given input text using the specified model.

    Args:
        model_name (str): The name of the model to use for proofreading.
        input_text (str): The text to be proofread.
        normalize_output (bool): Whether to normalize the output or not.

    Returns:
        str: The proofread text.
    """
    # Constants for API access
    base_url = TOGETHER_PATH
    api_key = TOGETHER_API_KEY
    temperature = TEMPERATURE
    # Retrieve the model path from the dictionary
    if model_name in MODEL_PATHS:
        model_path = MODEL_PATHS[model_name]
    else:
        raise ValueError("Model name not found in the dictionary.")
    # Formulate the prompt for the model
    prompt = f"Proofreading for the text: ```{input_text}```"
    # Apply output normalization if required
    if normalize_output:
        prompt = output_normalization(prompt)
    # Debugging: print the prompt
    print(f"Prompt: {prompt}")
    # Call the abstract proofreading function with the prepared parameters
    return abstract_proofread(model_path, temperature, base_url, api_key, prompt)


def gemini_proofread(input_text, normalize_output):
    """
    Proofreads the given text using the GEMINI_MODEL.

    Parameters:
        input_text (str): The text to be proofread.
        normalize_output (bool): Flag indicating whether to normalize the output.

    Returns:
        str: The proofread text.
    """
    prompt = f"Proofreading for the text: ```{input_text}```"
    if normalize_output:
        prompt = output_normalization(prompt)
    response = GEMINI_MODEL.generate_content(prompt)
    return response.text


def print_and_log(message):
    """
    Prints and logs the given message to a log file.

    Parameters:
        message (str): The message to be printed and logged.
    """
    print(message)
    with open(LOG_FILE, "a+", encoding='utf-8') as log_file:
        log_file.write(message + "\n")


def write_to_file(filename, content):
    """
    Writes the given content to a specified file.

    Parameters:
        filename (str): The name of the file to write to.
        content (str): The content to be written.
    """
    print(content)
    with open(filename, "a+", encoding='utf-8') as file:
        file.write(content)


def output_normalization(prompt):
    """
    Normalizes the output by appending a specific instruction to the prompt.

    Parameters:
        prompt (str): The initial prompt.

    Returns:
        str: The modified prompt.
    """
    return prompt + " Please only output the proofread text without any explanation."
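
# Example: output_normalization("Proofreading for the text: ```Teh cat.```") returns the
# same prompt with " Please only output the proofread text without any explanation." appended,
# nudging chat models to skip the usual preamble around the corrected text.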


def chatGPT_proofread(input_text, normalize_output):
    """
    Proofreads the given text using the chat_model.

    Parameters:
        input_text (str): The text to be proofread.
        normalize_output (bool): Flag indicating whether to normalize the output.

    Returns:
        str: The proofread text.
    """
    prompt = f"Proofreading for the text: ```{input_text}```"
    if normalize_output:
        prompt = output_normalization(prompt)
    print(f"Starting API call with prompt: {prompt}")
    result = chat_model.predict(prompt)
    print(f"Ending API call with prompt: {prompt}")
    return result


def normalize_text(input_text):
    """
    Normalizes the given text by removing certain characters and extra spaces.

    Parameters:
        input_text (str): The text to be normalized.

    Returns:
        str: The normalized text.
    """
    result = input_text.strip()
    result = result.replace("**", "")
    result = result.replace("\n", " ")
    result = result.replace("  ", " ")  # Collapse double spaces into single spaces
    return result
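
# Example: normalize_text("  **Bold**  text\nwith newline ") -> "Bold text with newline"
# (strip, drop "**", turn newlines into spaces, then collapse double spaces).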


def write_to_csv(filename, row_data):
    """
    Writes a row of data to a specified CSV file.

    Parameters:
        filename (str): The name of the CSV file.
        row_data (list): The row data to be written.
    """
    with open(filename, 'a+', encoding='UTF8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(row_data)


def number_of_csv_lines(filename):
    """
    Returns the number of data rows in a specified CSV file.

    Parameters:
        filename (str): The name of the CSV file.

    Returns:
        int: The number of data rows (excluding the header).
    """
    file_data = pd.read_csv(filename, sep=',').values
    return len(file_data)


def read_csv_data(input_file):
    """
    Reads data from a specified CSV file.

    Parameters:
        input_file (str): The name of the CSV file.

    Returns:
        numpy.ndarray: The data read from the CSV file.
    """
    file_data = pd.read_csv(input_file, dtype='string', keep_default_na=False, sep=',').values
    return file_data


def bart_score(text_1, text_2):
    """
    Computes the BART score between two texts.

    Parameters:
        text_1 (str): The first text.
        text_2 (str): The second text.

    Returns:
        list: A single-element list containing the BART score.
    """
    score = bart_scorer.score([text_1], [text_2])
    return score


def check_bart_score(input_text, raw_text):
    """
    Checks if the BART score between input_text and raw_text is above a threshold.

    Parameters:
        input_text (str): The input text.
        raw_text (str): The raw text to compare against.

    Returns:
        bool: True if the score is at or above the threshold, False otherwise.
    """
    THRESHOLD = -2.459
    normalized_text = normalize_text(raw_text)
    score = bart_score(input_text, normalized_text)[0]
    return score >= THRESHOLD
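
# Note: BARTScore is an average log-likelihood, so scores are negative; higher
# (less negative) values mean the candidate is closer to the input. A pair passes
# when its score reaches the fixed threshold of -2.459 used above.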


def get_column(input_file, column_name):
    """
    Retrieves a specific column from a CSV file.

    Parameters:
        input_file (str): The name of the CSV file.
        column_name (str): The name of the column to retrieve.

    Returns:
        numpy.ndarray: The values from the specified column.
    """
    df = pd.read_csv(input_file, dtype='string', keep_default_na=False, sep=',')
    column_data = df[column_name]
    return column_data.values


def generate_column_names(categories):
    """
    Generates a list of column names based on given categories.

    Parameters:
        categories (list): The list of categories.

    Returns:
        list: The generated list of column names.
    """
    column_names = ['human']
    for name in categories:
        column_names.append(name)
    for first in categories:
        for second in categories:
            column_names.append(f"{first}_{second}")
    return column_names
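
# Example: generate_column_names(["ChatGPT", "Gemini"]) ->
# ['human', 'ChatGPT', 'Gemini',
#  'ChatGPT_ChatGPT', 'ChatGPT_Gemini', 'Gemini_ChatGPT', 'Gemini_Gemini']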


def write_new_data(output_file, current_data, column_names):
    """
    Writes new data to a CSV file based on current data and column names.

    Parameters:
        output_file (str): The name of the output CSV file.
        current_data (dict): The current data to be written.
        column_names (list): The list of column names.
    """
    data_row = [current_data[column] for column in column_names]
    write_to_csv(output_file, data_row)


def refine(input_text, candidate):
    """
    Refines the candidate string by removing specific surrounding marks if they are present
    in the input_text with a count difference of exactly 2.

    Args:
        input_text (str): The original text.
        candidate (str): The candidate text to be refined.

    Returns:
        str: The refined candidate text.
    """
    # Create a copy of the candidate string and strip whitespace
    refined_candidate = candidate.strip()
    # List of marks to check and potentially remove
    marks = ["```", "'", '"']
    # Iterate through each mark
    for mark in marks:
        # Count occurrences of the mark in input_text and refined_candidate
        count_input_text = input_text.count(mark)
        count_refined_candidate = refined_candidate.count(mark)
        # Strip the mark from both ends if the candidate has exactly two extra
        # occurrences and is wrapped in it
        if (count_refined_candidate == count_input_text + 2 and
                refined_candidate.startswith(mark) and
                refined_candidate.endswith(mark)):
            refined_candidate = refined_candidate.strip(mark)
    return refined_candidate
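
# Example: refine("The cat sat.", '"The cat sat."') -> 'The cat sat.'
# The quotes are stripped because the candidate contains exactly two more quote
# characters than the input and both starts and ends with one.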


def extract_by_best_similarity(input_text, raw_text):
    """
    Extracts the best candidate string from the raw text based on the highest similarity score
    compared to the input text. The similarity score is calculated using the BART score.

    Args:
        input_text (str): The original text.
        raw_text (str): The raw text containing multiple candidate strings.

    Returns:
        str: The best candidate string with the highest similarity score.
             Returns the input text if no suitable candidate is found.
    """
    # Refine the raw text
    refined_raw_text = refine(input_text, raw_text)
    # Tokenize the refined raw text into sentences
    raw_candidates = nltk.sent_tokenize(refined_raw_text)
    # Split sentences further by newlines to get individual candidates
    candidate_list = []
    for sentence in raw_candidates:
        candidate_list.extend(sentence.split("\n"))
    # Initialize variables to track the best similarity score and the best candidate
    best_similarity = -9999
    best_candidate = ""
    # Iterate over each candidate to find the best one based on the BART score
    for candidate in candidate_list:
        refined_candidate = refine(input_text, candidate)
        if check_bart_score(input_text, refined_candidate):
            score = bart_score(input_text, refined_candidate)[0]
            if score > best_similarity:
                best_similarity = score
                best_candidate = refined_candidate
    # Print the best candidate found
    print(f"best_candidate = {best_candidate}")
    # Return the best candidate if found, otherwise return the input text
    if best_candidate == "":
        return input_text
    return best_candidate


def proofread_with_best_similarity(input_text, model_kind):
    """
    Proofreads the input text using the specified model and extracts the best-corrected text based on similarity.

    Args:
        input_text (str): The original text to be proofread.
        model_kind (str): The kind of model to use for proofreading (e.g., CHATGPT, GEMINI).

    Returns:
        tuple: A tuple containing the raw proofread text and the best-corrected text.
    """
    # Normalize the input text
    normalized_input_text = normalize_text(input_text)
    print_and_log(f"INPUT = {normalized_input_text}")
    result_text = ""
    raw_text = ""
    for i in range(1):  # Loop is redundant as it runs only once; consider removing if unnecessary
        # Select the proofreading model based on model_kind
        if model_kind == CHATGPT:
            raw_text = chatGPT_proofread(normalized_input_text, normalize_output=IS_OUTPUT_NORMALIZATION)
        elif model_kind == GEMINI:
            raw_text = gemini_proofread(normalized_input_text, normalize_output=IS_OUTPUT_NORMALIZATION)
        else:
            raw_text = proofread_by_model_name(model_kind, normalized_input_text, normalize_output=IS_OUTPUT_NORMALIZATION)
        # Extract the best candidate text based on similarity
        result_text = extract_by_best_similarity(normalized_input_text, raw_text)
        # Log the raw and result texts
        print_and_log(f"RAW_{i} = {raw_text}")
        print_and_log(f"RESULT_{i} = {result_text}")
        # Normalize the result text
        result_text = normalize_text(result_text)
        # If a valid result is obtained, return it
        if result_text != "":
            return raw_text, result_text
    # Return the raw and result texts
    return raw_text, result_text


def generate_file_name(existing_data_file, existing_kinds, new_kinds):
    """
    Generates a new file name based on the path of an existing data file and a combination of existing and new kinds.

    Args:
        existing_data_file (str): The path to the existing data file.
        existing_kinds (list): A list of existing kinds.
        new_kinds (list): A list of new kinds.

    Returns:
        str: The generated file name with the full path.
    """
    # Combine existing and new kinds into a single list
    combined_kinds = existing_kinds + new_kinds
    # Get the directory path of the existing data file
    directory_path = os.path.dirname(existing_data_file)
    # Create a new file name by joining the kinds with underscores and adding a suffix
    new_file_name = "_".join(combined_kinds) + "_with_best_similarity.csv"
    # Combine the directory path with the new file name to get the full output file path
    output_file_path = os.path.join(directory_path, new_file_name)
    return output_file_path


def generate_new_data_with_best_similarity(existing_data_file, existing_kinds, new_kinds):
    """
    Generates new data with the best similarity based on existing and new kinds, and writes the results to a CSV file.

    Args:
        existing_data_file (str): The path to the existing data file.
        existing_kinds (list): A list of existing kinds.
        new_kinds (list): A list of new kinds.

    Returns:
        None
    """
    # Combine existing and new kinds into a single list
    all_kinds = existing_kinds + new_kinds
    # Generate column names for the CSV file
    column_names = generate_column_names(all_kinds)
    # Generate column names for existing kinds
    existing_column_names = generate_column_names(existing_kinds)
    # Generate the output file name
    output_file = generate_file_name(existing_data_file, existing_kinds, new_kinds)
    # Create the output file with column names if it doesn't exist
    if not os.path.exists(output_file):
        write_to_csv(output_file, column_names)
    # Read existing data from the file
    existing_data = {kind: get_column(existing_data_file, kind) for kind in existing_column_names}
    # Read input data from the output file
    input_data = read_csv_data(output_file)
    start_index = len(input_data)
    print(f"start_index = {start_index}")
    num_rows = len(existing_data["human"])
    global_generate_set = []
    global_reuse = []
    for index in range(start_index, num_rows):
        # Initialize generation and reuse sets
        generate_set = []
        reuse_set = []
        # Prepare the current generation dictionary
        current_generation = {kind: existing_data[kind][index] for kind in existing_column_names}
        print(f"current_generation before generation = {current_generation}")
        human_text = current_generation["human"]
        # Generate new kinds based on human text
        for kind in new_kinds:
            _, generated_text = proofread_with_best_similarity(human_text, kind)
            current_generation[kind] = generated_text
            generate_set.append(kind)
        print(f"current_generation after generate one = {current_generation}")
        # Generate combinations of kinds
        for first_kind in all_kinds:
            for second_kind in all_kinds:
                combination_name = f"{first_kind}_{second_kind}"
                if combination_name not in current_generation:
                    if first_kind in current_generation and current_generation[first_kind] == human_text:
                        generated_text = current_generation[second_kind]
                        reuse_set.append(f"{combination_name} from {second_kind}")
                    else:
                        is_need_generation = True
                        for first_kind_2 in all_kinds:
                            if first_kind != first_kind_2 and current_generation[first_kind] == current_generation[first_kind_2]:
                                combination_name_2 = f"{first_kind_2}_{second_kind}"
                                if combination_name_2 in current_generation:
                                    generated_text = current_generation[combination_name_2]
                                    reuse_set.append(f"{combination_name} from {combination_name_2}")
                                    is_need_generation = False
                                    break
                        if is_need_generation:
                            _, generated_text = proofread_with_best_similarity(current_generation[first_kind], second_kind)
                            generate_set.append(f"{first_kind}_{second_kind}")
                    current_generation[combination_name] = generated_text
        # Write the current generation to the output file
        write_new_data(output_file, current_generation, column_names)
        # Update global sets
        global_generate_set.append(generate_set)
        global_reuse.append(reuse_set)


def shuffle(array, seed):
    """
    Shuffles the elements of each sublist in the given array using the specified seed.

    Args:
        array (list of lists): The array containing sublists to shuffle.
        seed (int): The seed value for the random number generator.

    Returns:
        None
    """
    for sublist in array:
        random.Random(seed).shuffle(sublist)
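
# Example: shuffle([[1, 2, 3], [4, 5, 6]], seed=SEED) shuffles each sublist in place;
# because a fresh Random(seed) is used per sublist, equal-length sublists receive
# the same permutation.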


def generate_human_with_shuffle(dataset_name, column_name, num_samples, output_file):
    """
    Generates a shuffled list of sentences from the dataset and writes them to a CSV file.

    Args:
        dataset_name (str): The name of the dataset to load.
        column_name (str): The column name to extract sentences from.
        num_samples (int): The number of samples to process.
        output_file (str): The path to the output CSV file.

    Returns:
        None
    """
    # Load the dataset
    dataset = load_dataset(dataset_name)
    data = dataset['train']
    lines = []
    # Tokenize sentences and add to the lines list
    for sample in data:
        nltk_tokens = nltk.sent_tokenize(sample[column_name])
        lines.extend(nltk_tokens)
    # Filter out empty lines
    lines = [line for line in lines if line != ""]
    # Shuffle the lines
    shuffle([lines], seed=SEED)
    # Ensure the output file exists and write the header if it doesn't
    if not os.path.exists(output_file):
        header = ["human"]
        write_to_csv(output_file, header)
    # Get the number of lines already processed in the output file
    number_of_processed_lines = number_of_csv_lines(output_file)
    # Print the initial lines to be processed
    print(f"Lines before processing: {lines[:num_samples]}")
    # Slice the lines list to get the unprocessed lines
    lines = lines[number_of_processed_lines:num_samples]
    # Print the lines after slicing
    print(f"Lines after slicing: {lines}")
    # Process each line and write to the output file
    for index, human in enumerate(lines):
        normalized_text = normalize_text(human)
        output_data = [normalized_text]
        write_to_csv(output_file, output_data)
        print(f"Processed {index + 1} / {len(lines)}; Total processed: {number_of_processed_lines + index + 1} / {num_samples}")


def split(data, ratio):
    """
    Splits the data into training and testing sets based on the given ratio.

    Args:
        data (list): The dataset to split.
        ratio (float): The ratio for splitting the data into training and testing sets.

    Returns:
        tuple: A tuple containing the training data and the testing data.
    """
    train_size = int(len(data) * ratio)
    train_data = data[:train_size]
    test_data = data[train_size:]
    return train_data, test_data
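
# Example: split([1, 2, 3, 4, 5], ratio=0.8) -> ([1, 2, 3, 4], [5])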


def bart_score_in_batch(text_1, text_2):
    """
    Calculates the BART score for pairs of texts in batches.

    Args:
        text_1 (list of str): The first list of texts.
        text_2 (list of str): The second list of texts.

    Returns:
        list: A list of BART scores for each pair of texts.
    """
    return bart_scorer.score(text_1, text_2, batch_size=BATCH_SIZE)


def extract_feature_in_batch(text_1, text_2, feature_kind):
    """
    Extracts features for pairs of texts using BART scores.

    Args:
        text_1 (list of str): The first list of texts.
        text_2 (list of str): The second list of texts.
        feature_kind (str): The type of feature to extract.

    Returns:
        list: A list of extracted features.
    """
    features = bart_score_in_batch(text_1, text_2)
    return features


def abstract_train(features, labels):
    """
    Trains a model using the given features and labels.

    Args:
        features (list): The input features for training.
        labels (list): The target labels for training.

    Returns:
        object: The trained model.
    """
    model = MLPClassifier()
    model.fit(features, labels)
    return model


def evaluate_model(model, features, labels):
    """
    Evaluates the model's performance using accuracy and ROC AUC scores.

    Args:
        model (object): The trained model to evaluate.
        features (list): The input features for evaluation.
        labels (list): The target labels for evaluation.

    Returns:
        None
    """
    predictions = model.predict(features)
    rounded_predictions = [round(value) for value in predictions]
    accuracy = accuracy_score(labels, rounded_predictions)
    write_to_file(OUTPUT_FILE, f"Accuracy: {accuracy * 100.0:.1f}%\n")
    roc_auc = roc_auc_score(labels, rounded_predictions)
    write_to_file(OUTPUT_FILE, f"ROC AUC: {roc_auc * 100.0:.1f}%\n")


def combine_text_with_BERT_format(text_list):
    """
    Combines a list of texts into a single string formatted for BERT input.

    Args:
        text_list (list of str): The list of texts to combine.

    Returns:
        str: The combined text string formatted for BERT input.
    """
    combined_text = f"<s>{text_list[0]}</s>"
    for i in range(1, len(text_list)):
        combined_text += f"</s>{text_list[i]}</s>"
    return combined_text
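
# Example: combine_text_with_BERT_format(["a", "b", "c"]) -> "<s>a</s></s>b</s></s>c</s>",
# mirroring RoBERTa's special-token layout for multi-segment inputs; the tokenizer is
# later called with add_special_tokens=False so these markers are not duplicated.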


def preprocess_function_multimodel(sample):
    """
    Preprocesses a given sample for a multi-model setup by calculating BART scores
    and formatting the text for BERT input.

    Args:
        sample (dict): A dictionary containing a key "text", which is a list of lists of strings.

    Returns:
        dict: A dictionary containing tokenized and preprocessed text data.
    """
    num_texts = len(sample["text"][0])  # Number of texts in each sub-sample
    texts_grouped_by_index = [[] for _ in range(num_texts)]  # Initialize empty lists for grouping texts by index
    # Group texts by their index across sub-samples
    for sub_sample in sample["text"]:
        for i in range(num_texts):
            texts_grouped_by_index[i].append(sub_sample[i])
    # Calculate BART scores for each text pair (text[0] with text[i])
    bart_scores = [bart_score_in_batch(texts_grouped_by_index[0], texts_grouped_by_index[i]) for i in range(1, num_texts)]
    combined_texts = []
    # Process each sub-sample for BERT input
    for index, sub_sample in enumerate(sample["text"]):
        text_array = [sub_sample[0]]  # Start with the input text
        score_generation_pairs = []
        # Pair scores with their corresponding generations
        for i in range(1, num_texts):
            generation_text = sub_sample[i]
            generation_score = bart_scores[i - 1][index]
            score_generation_pairs.append((generation_score, generation_text))
        # Sort pairs by score in descending order
        sorted_pairs = sorted(score_generation_pairs, reverse=True)
        # Append sorted texts to text_array
        for _, sorted_text in sorted_pairs:
            text_array.append(sorted_text)
        # Combine texts into a single BERT-formatted string
        combined_text = combine_text_with_BERT_format(text_array)
        combined_texts.append(combined_text)
    # Tokenize the combined texts for BERT
    return tokenizer(combined_texts, add_special_tokens=False, truncation=True)


def preprocess_function_single_from_multimodel(sample):
    """
    Extracts the first text from each sub-sample in a multi-model sample and tokenizes it.

    Args:
        sample (dict): A dictionary containing a key "text", which is a list of lists of strings.

    Returns:
        dict: A dictionary containing tokenized text data.
    """
    combined_texts = []
    # Iterate through each sub-sample
    for sub_sample in sample["text"]:
        input_text = sub_sample[0]  # Extract the first text from the sub-sample
        combined_texts.append(input_text)  # Append it to the list of combined texts
    # Tokenize the combined texts
    return tokenizer(combined_texts, truncation=True)


def check_api_error(data):
    """
    Checks if any item in the provided data indicates an API error.

    Args:
        data (list): A list of items to be checked for API errors.

    Returns:
        bool: True if an API error or ignore-by-API-error marker is found, otherwise False.
    """
    for item in data:
        if item == API_ERROR or item == IGNORE_BY_API_ERROR:  # Check for API error indicators
            return True  # Return True if an error indicator is found
    return False  # Return False if no error indicators are found


def train_only_by_transformer_with_test_evaluation_early_stop(train_data, test_data, input_type, num_classes=2):
    """
    Trains a transformer model using the provided training and testing datasets with early stopping.

    Args:
        train_data (Dataset): The training dataset.
        test_data (Dataset): The testing dataset.
        input_type (str): The type of input data, either MULTIMODEL or SINGLE_FROM_MULTIMODEL.
        num_classes (int, optional): The number of classes for classification. Defaults to 2.

    Returns:
        Trainer: The trained model wrapped in a Trainer object.
    """
    # Preprocess datasets based on the input type
    if input_type == MULTIMODEL:
        train_data = train_data.map(preprocess_function_multimodel, batched=True)
        test_data = test_data.map(preprocess_function_multimodel, batched=True)
    elif input_type == SINGLE_FROM_MULTIMODEL:
        train_data = train_data.map(preprocess_function_single_from_multimodel, batched=True)
        test_data = test_data.map(preprocess_function_single_from_multimodel, batched=True)
    # Data collator to pad inputs
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
    # Load the appropriate model based on the number of classes
    if num_classes == 3:
        model = AutoModelForSequenceClassification.from_pretrained(
            "pretrained_model/roberta-base_num_labels_3", num_labels=num_classes)
    else:
        model = AutoModelForSequenceClassification.from_pretrained(
            ROBERTA_MODEL_PATHS[MODEL_NAME], num_labels=num_classes)
    learning_rate = LEARNING_RATES[MODEL_NAME]
    output_folder = "training_with_callbacks"
    # Remove the output folder if it already exists
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    # Training arguments
    training_args = TrainingArguments(
        output_dir=output_folder,
        evaluation_strategy="epoch",
        logging_strategy="epoch",
        save_strategy="epoch",
        learning_rate=learning_rate,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        num_train_epochs=NUMBER_OF_MAX_EPOCH_WITH_EARLY_STOPPING,
        weight_decay=0.01,
        push_to_hub=False,
        metric_for_best_model=OPTIMIZED_METRIC,
        load_best_model_at_end=True
    )
    # Create Trainer object
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_data,
        eval_dataset=test_data,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=PATIENCE)]
    )
    # Add custom callback
    trainer.add_callback(CustomCallback(trainer))
    # Start training
    trainer.train()
    return trainer


def calculate_number_of_models(num_columns):
    """
    Calculates the number of models required based on the number of columns.

    Args:
        num_columns (int): The total number of columns.

    Returns:
        int: The number of models required.

    Raises:
        Exception: If the number of models cannot be calculated to match the number of columns.
    """
    num_models = 0
    count_human = 1  # Initial count representing human input
    while True:
        count_single = num_models  # Single model count
        count_pair = num_models * num_models  # Pair model count
        total_count = count_human + count_single + count_pair
        if total_count == num_columns:
            return num_models
        elif total_count > num_columns:
            raise Exception("Cannot calculate the number of models to match the number of columns")
        num_models += 1
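
# Sanity check: with n models the CSV holds 1 + n + n*n columns (human, n single
# rewrites, n*n pairwise rewrites), so calculate_number_of_models(13) -> 3
# because 1 + 3 + 9 = 13.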


def read_multimodel_data_from_csv(multimodel_csv_file):
    """
    Reads multimodel data from a CSV file and organizes it into a structured format.

    Args:
        multimodel_csv_file (str): Path to the CSV file containing multimodel data.

    Returns:
        list: A list of dictionaries, each containing 'human', 'single', and 'pair' data.

    Raises:
        Exception: If there is an error in reading the CSV file or processing the data.
    """
    # Read CSV data into a list of lists
    input_data = read_csv_data(multimodel_csv_file)
    # Initialize the result list
    structured_data = []
    # Calculate the number of models based on the number of columns in the first row
    num_models = calculate_number_of_models(len(input_data[0]))
    # Process each row in the input data
    for row in input_data:
        row_data = {}
        index = 0
        # Extract human data
        row_data["human"] = row[index]
        index += 1
        # Extract single model data
        single_model_data = []
        for _ in range(num_models):
            single_model_data.append(row[index])
            index += 1
        row_data["single"] = single_model_data
        # Extract pair model data
        pair_model_data = []
        for _ in range(num_models):
            sub_pair_data = []
            for _ in range(num_models):
                sub_pair_data.append(row[index])
                index += 1
            pair_model_data.append(sub_pair_data)
        row_data["pair"] = pair_model_data
        # Append the structured row data to the result list
        structured_data.append(row_data)
    return structured_data


def check_error(data_item):
    """
    Checks for errors in a data item by verifying the 'human', 'single', and 'pair' fields.

    Args:
        data_item (dict): A dictionary containing 'human', 'single', and 'pair' data.

    Returns:
        bool: True if any of the fields contain an error, otherwise False.
    """
    # Check for API error in the 'human' field
    if check_api_error(data_item["human"]):
        return True
    # Check for API error in the 'single' model data
    for single_text in data_item["single"]:
        if check_api_error(single_text):
            return True
    # Get the number of models from the 'single' model data
    num_models = len(data_item["single"])
    # Check for API error in the 'pair' model data
    for i in range(num_models):
        for j in range(num_models):
            if check_api_error(data_item["pair"][i][j]):
                return True
    # No errors found
    return False


def create_pair_sample(data_item, training_indices):
    """
    Creates pair samples for training by comparing human data with machine-generated data.

    Args:
        data_item (dict): A dictionary containing 'human', 'single', and 'pair' data.
        training_indices (list): A list of indices used for training.

    Returns:
        list: A list of dictionaries, each containing a 'text' array and a 'label'.
    """
    # Initialize the result list
    result_samples = []
    # Skip the item entirely if any of its fields contain an error
    if check_error(data_item):
        return result_samples
    print(training_indices)
    print(data_item)
    # Create machine samples
    for train_idx in training_indices:
        if data_item["human"] != data_item["single"][train_idx]:
            text_array = []
            machine_text = data_item["single"][train_idx]
            text_array.append(machine_text)
            for sub_idx in training_indices:
                text_array.append(data_item["pair"][train_idx][sub_idx])
            sample = {
                "text": text_array,
                "label": MACHINE_LABEL
            }
            result_samples.append(sample)
    # Create human samples
    text_array = [data_item["human"]]
    for train_idx in training_indices:
        text_array.append(data_item["single"][train_idx])
    human_sample = {
        "text": text_array,
        "label": HUMAN_LABEL
    }
    # Append one human sample for each machine sample to keep the classes balanced
    num_machine_samples = len(result_samples)
    for _ in range(num_machine_samples):
        result_samples.append(human_sample)
    return result_samples


def create_pair_test_sample(data_item, training_indices, testing_indices):
    """
    Creates pair test samples by comparing human data with machine-generated data.

    Args:
        data_item (dict): A dictionary containing 'human', 'single', and 'pair' data.
        training_indices (list): A list of indices used for training.
        testing_indices (list): A list of indices used for testing.

    Returns:
        list: A list of dictionaries, each containing a 'text' array and a 'label'.
    """
    # Initialize the result list
    result_samples = []
    # Skip the item entirely if any of its fields contain an error
    if check_error(data_item):
        return result_samples
    # Create machine samples based on testing indices
    for test_idx in testing_indices:
        if data_item["human"] != data_item["single"][test_idx]:
            text_array = []
            machine_text = data_item["single"][test_idx]
            text_array.append(machine_text)
            for train_idx in training_indices:
                text_array.append(data_item["pair"][test_idx][train_idx])
            sample = {
                "text": text_array,
                "label": MACHINE_LABEL
            }
            result_samples.append(sample)
    # Create human sample
    text_array = [data_item["human"]]
    for train_idx in training_indices:
        text_array.append(data_item["single"][train_idx])
    human_sample = {
        "text": text_array,
        "label": HUMAN_LABEL
    }
    # Append the human sample for each machine sample
    num_machine_samples = len(result_samples)
    for _ in range(num_machine_samples):
        result_samples.append(human_sample)
    return result_samples


def create_train_val_sample(data, training_indices):
    """
    Creates training and validation samples from the provided data.

    Args:
        data (list): A list of data items, each to be processed.
        training_indices (list): A list of indices used for training.

    Returns:
        list: A list of training and validation samples created from the data.
    """
    # Initialize the result list
    result_samples = []
    # Process each item in the data
    for data_item in data:
        # Create pair samples for the current item
        sub_samples = create_pair_sample(data_item, training_indices)
        # Extend the result list with the created sub-samples
        result_samples.extend(sub_samples)
    return result_samples


def create_test_sample(data, training_indices, testing_indices):
    """
    Creates test samples from the provided data by comparing human data with machine-generated data.

    Args:
        data (list): A list of data items, each to be processed.
        training_indices (list): A list of indices used for training.
        testing_indices (list): A list of indices used for testing.

    Returns:
        list: A list of test samples created from the data.
    """
    # Initialize the result list
    result_samples = []
    # Process each item in the data
    for data_item in data:
        # Create pair test samples for the current item
        sub_samples = create_pair_test_sample(data_item, training_indices, testing_indices)
        # Extend the result list with the created sub-samples
        result_samples.extend(sub_samples)
    return result_samples


def distribute_data(data, train_indices, test_indices, train_ratio, val_ratio):
    """
    Distributes the data into training, validation, and test samples.

    Args:
        data (list): A list of data items to be split and processed.
        train_indices (list): A list of indices used for training.
        test_indices (list): A list of indices used for testing.
        train_ratio (float): The ratio of data to be used for training.
        val_ratio (float): The ratio of data to be used for validation.

    Returns:
        tuple: A tuple containing lists of training, validation, and test samples.
    """
    # Split the data into training, validation, and test sets
    train_data, val_data, test_data = split_train_val_test(data, train_ratio, val_ratio)
    # Create training samples
    train_samples = create_train_val_sample(train_data, train_indices)
    write_to_file(OUTPUT_FILE, f"train samples = {len(train_samples)}\n")
    # Create validation samples
    val_samples = create_train_val_sample(val_data, train_indices)
    write_to_file(OUTPUT_FILE, f"val samples = {len(val_samples)}\n")
    # Create test samples
    test_samples = create_test_sample(test_data, train_indices, test_indices)
    write_to_file(OUTPUT_FILE, f"test samples = {len(test_samples)}\n")
    return train_samples, val_samples, test_samples


def convert_to_huggingface_with_multimodel(samples):
    """
    Converts a list of samples to the Hugging Face Dataset format.

    Args:
        samples (list): A list of samples to be converted.

    Returns:
        Dataset: A Hugging Face Dataset object created from the samples.
    """
    return Dataset.from_list(samples)


def train_by_transformer_with_multimodel_and_early_stop(train_samples, val_samples, input_type):
    """
    Trains a transformer model with multimodal data and early stopping.

    Args:
        train_samples (list): A list of training samples.
        val_samples (list): A list of validation samples.
        input_type (str): The type of input data (e.g., multimodal).

    Returns:
        object: The trained model with early stopping.
    """
    # Convert training and validation samples to Hugging Face Dataset format
    train_data = convert_to_huggingface_with_multimodel(train_samples)
    val_data = convert_to_huggingface_with_multimodel(val_samples)
    # Train the model with early stopping and return the trained model
    return train_only_by_transformer_with_test_evaluation_early_stop(train_data, val_data, input_type)


def test_by_transformer_with_multimodel(detector, test_samples, input_type):
    """
    Tests a trained transformer model with multimodal data.

    Args:
        detector (object): The trained model to be evaluated.
        test_samples (list): A list of test samples.
        input_type (str): The type of input data (e.g., multimodal).

    Returns:
        None
    """
    # Convert test samples to Hugging Face Dataset format
    test_data = convert_to_huggingface_with_multimodel(test_samples)
    # Apply the appropriate preprocessing function based on the input type
    if input_type == MULTIMODEL:
        test_data = test_data.map(preprocess_function_multimodel, batched=True)
    elif input_type == SINGLE_FROM_MULTIMODEL:
        test_data = test_data.map(preprocess_function_single_from_multimodel, batched=True)
    print("Test data:", test_data)
    # Evaluate the model on the test data
    result = detector.evaluate(eval_dataset=test_data)
    print("Test result:", result)
    # Extract and log the ROC AUC score
    roc_auc = result['eval_roc_auc']
    write_to_file(OUTPUT_FILE, "roc_auc: %.1f%%" % (roc_auc * 100.0) + "\n")


def extract_by_feature_kind(samples, feature_type):
    """
    Extracts features from the given samples based on the specified feature type.

    Args:
        samples (list): A list of samples where each sample is a dictionary with 'text' and 'label' keys.
        feature_type (str): The type of feature to extract.

    Returns:
        tuple: A tuple containing the extracted features and corresponding labels.
    """
    text_1_list = []
    text_2_list = []
    labels = []
    for sample in samples:
        text_1_list.append(sample["text"][0])
        text_2_list.append(sample["text"][1])
        labels.append(sample["label"])
    # Extract features in batch based on the feature type
    features = extract_feature_in_batch(text_1_list, text_2_list, feature_type)
    return features, labels


def train_by_feature_kind(train_samples, feature_type):
    """
    Trains a model using features extracted from the training samples based on the specified feature type.

    Args:
        train_samples (list): A list of training samples where each sample is a dictionary with 'text' and 'label' keys.
        feature_type (str): The type of feature to extract for training.

    Returns:
        object: The trained model.
    """
    # Extract features and labels from the training samples
    features, labels = extract_by_feature_kind(train_samples, feature_type)
    # Convert features to a numpy array and reshape for training
    features = np.array(features)
    features = features.reshape(-1, 1)
    # Train the model using the extracted features and labels
    model = abstract_train(features, labels)
    return model


def test_by_feature_kind(detector, samples, feature_type):
    """
    Tests a detector using features extracted from the provided samples based on the specified feature type.

    Args:
        detector (object): The detector model to be evaluated.
        samples (list): A list of samples where each sample is a dictionary with 'text' and 'label' keys.
        feature_type (str): The type of feature to extract for testing.

    Returns:
        None
    """
    # Extract features and labels from the samples
    features, labels = extract_by_feature_kind(samples, feature_type)
    # Convert features to a numpy array and reshape for evaluation
    features = np.array(features)
    features = features.reshape(-1, 1)
    # Evaluate the detector model using the extracted features and labels
    evaluate_model(detector, features, labels)


def general_process_multimodels_train_val_test(train_samples, val_samples, test_samples):
    """
    General process for training, validating, and testing models using multi-model and feature kind approaches.

    Args:
        train_samples (list): Training samples.
        val_samples (list): Validation samples.
        test_samples (list): Test samples.

    Returns:
        None
    """
    # Multi-model approach
    input_kind = MULTIMODEL
    write_to_file(OUTPUT_FILE, f"\nInput kind = {input_kind} \n")
    # Train detector using multi-model with early stopping
    detector = train_by_transformer_with_multimodel_and_early_stop(train_samples, val_samples, input_kind)
    detector.save_model("./models/multi_model_detector")
    # Evaluate on train set
    write_to_file(OUTPUT_FILE, "EVALUATE ON TRAIN SET \n")
    test_by_transformer_with_multimodel(detector, train_samples, input_kind)
    # Evaluate on validation set
    write_to_file(OUTPUT_FILE, "EVALUATE ON VALIDATION SET \n")
    test_by_transformer_with_multimodel(detector, val_samples, input_kind)
    # Evaluate on test set
    write_to_file(OUTPUT_FILE, "EVALUATE ON TEST SET \n")
    test_by_transformer_with_multimodel(detector, test_samples, input_kind)

    # Single from multi-model approach
    input_kind = SINGLE_FROM_MULTIMODEL
    write_to_file(OUTPUT_FILE, f"\nInput kind = {input_kind} \n")
    # Train detector using single from multi-model with early stopping
    detector = train_by_transformer_with_multimodel_and_early_stop(train_samples, val_samples, input_kind)
    detector.save_model("./models/single_model_detector_1")
    # Evaluate on train set
    write_to_file(OUTPUT_FILE, "EVALUATE ON TRAIN SET \n")
    test_by_transformer_with_multimodel(detector, train_samples, input_kind)
    # Evaluate on validation set
    write_to_file(OUTPUT_FILE, "EVALUATE ON VALIDATION SET \n")
    test_by_transformer_with_multimodel(detector, val_samples, input_kind)
    # Evaluate on test set
    write_to_file(OUTPUT_FILE, "EVALUATE ON TEST SET \n")
    test_by_transformer_with_multimodel(detector, test_samples, input_kind)

    # Feature kind approach
    sample_length = len(train_samples[0]["text"])
    if sample_length == 2:  # A sample length of 2 indicates the BART feature kind
        feature_kind = BART
        write_to_file(OUTPUT_FILE, f"\nFeature kind = {feature_kind} \n")
        # Train detector using feature kind
        detector = train_by_feature_kind(train_samples, feature_kind)
        # Evaluate on train set
        write_to_file(OUTPUT_FILE, "EVALUATE ON TRAIN SET \n")
        test_by_feature_kind(detector, train_samples, feature_kind)
        # Evaluate on validation set
        write_to_file(OUTPUT_FILE, "EVALUATE ON VALIDATION SET \n")
        test_by_feature_kind(detector, val_samples, feature_kind)
        # Evaluate on test set
        write_to_file(OUTPUT_FILE, "EVALUATE ON TEST SET \n")
        test_by_feature_kind(detector, test_samples, feature_kind)


def process_multi_models_with_validation(multimodel_csv_file, train_indices, test_indices, num_samples):
    """
    Processes multi-model data with validation, training, and testing.

    Args:
        multimodel_csv_file (str): Path to the CSV file containing multi-model data.
        train_indices (list): Indices for the training data.
        test_indices (list): Indices for the testing data.
        num_samples (int): Number of samples to process.

    Returns:
        None
    """
    # Log the details of the process
    write_to_file(OUTPUT_FILE, f"PROCESSING FILE={multimodel_csv_file} \n")
    write_to_file(OUTPUT_FILE, f"EXPERIMENT WITH {MODEL_NAME} model \n")
    write_to_file(OUTPUT_FILE, f"NUMBER OF MAX EPOCHS WITH EARLY STOPPING = {NUMBER_OF_MAX_EPOCH_WITH_EARLY_STOPPING} \n")
    write_to_file(OUTPUT_FILE, f"PATIENCE = {PATIENCE} \n")
    write_to_file(OUTPUT_FILE, f"OPTIMIZED METRIC = {OPTIMIZED_METRIC} \n")
    write_to_file(OUTPUT_FILE, f"BATCH SIZE = {BATCH_SIZE} \n")
    write_to_file(OUTPUT_FILE, f"Number of samples = {num_samples} \n")
    # Read multi-model data from the CSV file
    data = read_multimodel_data_from_csv(multimodel_csv_file)
    # Limit data to the specified number of samples
    if num_samples > 0:
        data = data[:num_samples]  # A non-positive num_samples (e.g., -1) keeps all samples
    # Distribute data into training, validation, and testing sets
    train_samples, val_samples, test_samples = distribute_data(data, train_indices, test_indices, TRAIN_RATIO, VAL_RATIO)
    # Log the training and testing indices
    write_to_file(OUTPUT_FILE, f"Multimodel training with train indices {train_indices}, test with test indices {test_indices} \n")
    # Process the multi-models for training, validation, and testing
    general_process_multimodels_train_val_test(train_samples, val_samples, test_samples)


def split_train_val_test(data, train_ratio, val_ratio):
    """
    Splits the dataset into training, validation, and test sets based on specified ratios.

    Args:
        data (list): The dataset to be split.
        train_ratio (float): The ratio of the dataset to be used for training.
        val_ratio (float): The ratio of the dataset to be used for validation.

    Returns:
        tuple: A tuple containing three lists - (train_data, val_data, test_data).
    """
    # Calculate the number of samples for the training set
    num_train_samples = int(len(data) * train_ratio)
    # Calculate the number of samples for the validation set
    num_val_samples = int(len(data) * val_ratio)
    # Split the data into training, validation, and test sets
    train_data = data[:num_train_samples]
    val_data = data[num_train_samples:(num_train_samples + num_val_samples)]
    test_data = data[(num_train_samples + num_val_samples):]
    return train_data, val_data, test_data
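
# Example: split_train_val_test(list(range(10)), TRAIN_RATIO, VAL_RATIO)
# -> ([0, 1, 2, 3, 4, 5, 6, 7], [8], [9]) with TRAIN_RATIO = 0.8 and VAL_RATIO = 0.1.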


def main():
    """
    Main function to handle argument parsing and execute the sequence of operations
    including data generation and processing with multiple models.
    """
    parser = argparse.ArgumentParser(description='SimLLM.')
    # Argument for specifying the list of large language models
    parser.add_argument('--LLMs', nargs="+", default=[CHATGPT],  # e.g., [CHATGPT, "Yi", "OpenChat"]
                        help='List of large language models')
    # Argument for specifying the list of training indexes
    parser.add_argument('--train_indexes', type=int, default=[0, 1, 2], nargs="+",
                        help='List of training indexes')
    # Argument for specifying the list of testing indexes
    parser.add_argument('--test_indexes', type=int, default=[0], nargs="+",
                        help='List of testing indexes')
    # Argument for specifying the number of samples
    parser.add_argument('--num_samples', type=int, default=5000,
                        help='Number of samples')
    # Argument for the multimodel CSV file
    parser.add_argument('--multimodel_csv_file', type=str,
                        default="data/ChatGPT_Nous_Hermes_2_Yi_34B_openchat_3_5_1210_with_best_similarity.csv",
                        help='multimodel_csv_file')
    # Parse the command-line arguments
    args = parser.parse_args()
    if args.multimodel_csv_file == "":
        # Static dataset parameters
        dataset_name = "xsum"
        column_name = "document"
        num_samples = args.num_samples
        output_file = "data/test.csv"
        # Generate human data with shuffle
        # generate_human_with_shuffle(dataset_name, column_name, num_samples, output_file)
        # Existing data parameters
        existing_data_file = output_file
        existing_kinds = []
        # New kinds of models to generate data with
        new_kinds = args.LLMs
        # Generate new data with best similarity
        generate_new_data_with_best_similarity(existing_data_file, existing_kinds, new_kinds)
        # Generate a filename for the multimodel CSV file
        multimodel_csv_file = generate_file_name(existing_data_file, existing_kinds, new_kinds)
    else:
        multimodel_csv_file = args.multimodel_csv_file
    # Number of samples to process (-1 means process all samples)
    num_samples_to_process = -1
    # Training and testing indexes from arguments
    training_indexes = args.train_indexes
    testing_indexes = args.test_indexes
    # Process multiple models with validation
    process_multi_models_with_validation(multimodel_csv_file, training_indexes, testing_indexes, num_samples_to_process)


if __name__ == "__main__":
    main()