|
import os |
|
import argparse |
|
import re |
|
import torch |
|
import pandas as pd |
|
from tqdm import tqdm |
|
from thefuzz import process |
|
from transformers.trainer_utils import set_seed |
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
from transformers.generation import GenerationConfig |
|
|
|
''' |
|
wget https://people.eecs.berkeley.edu/~hendrycks/data.tar |
|
mkdir -p data/mmlu
|
mv data.tar data/mmlu |
|
cd data/mmlu; tar xf data.tar |
|
cd ../../ |
|
|
|
pip install thefuzz |
|
python eval/evaluate_chat_mmlu.py -d data/mmlu/data/ |
|
''' |
|
|
|
def load_models_tokenizer(args):
    """Load the causal-LM checkpoint and its tokenizer for evaluation.

    Args:
        args: Namespace-like object; only ``args.checkpoint_path`` is read.

    Returns:
        A ``(model, tokenizer)`` tuple. The model has been switched to eval
        mode and carries a generation config loaded from the same checkpoint
        with ``do_sample`` set to ``False``.
    """
    checkpoint = args.checkpoint_path

    tokenizer = AutoTokenizer.from_pretrained(checkpoint, trust_remote_code=True)

    # These keyword arguments are forwarded untouched to ``from_pretrained``;
    # ``bf16`` / ``use_flash_attn`` are presumably consumed by the checkpoint's
    # remote code -- verify against the model repository.
    model_kwargs = {
        "device_map": "auto",
        "trust_remote_code": True,
        "bf16": True,
        "use_flash_attn": True,
    }
    model = AutoModelForCausalLM.from_pretrained(checkpoint, **model_kwargs)
    model = model.eval()

    generation_config = GenerationConfig.from_pretrained(
        checkpoint, trust_remote_code=True
    )
    # Turn sampling off so repeated runs use the same decoding settings.
    generation_config.do_sample = False
    model.generation_config = generation_config

    return model, tokenizer
|
|
|
|
|
def format_example(line):
    """Build the multiple-choice prompt for a single dataset row.

    Args:
        line: Mapping-like row providing ``"question"`` plus one entry per
            letter in the module-level ``choices`` list.

    Returns:
        The instruction sentence, a blank line, the question, and then one
        ``"<letter>. <option text>"`` line per choice, each newline-terminated.
    """
    instruction = (
        "The following is a multiple-choice question. Please choose the most "
        "suitable one among A, B, C and D as the answer to this question.\n\n"
    )
    prompt_head = instruction + line["question"] + "\n"
    option_lines = [f"{letter}. {line[letter]}\n" for letter in choices]
    return prompt_head + "".join(option_lines)
|
|
|
|
|
def process_before_extraction(gen, choice_dict):
    """Replace option texts quoted in a model response with their letters.

    Each option text (with trailing periods stripped) is searched for
    case-insensitively in ``gen`` and every occurrence is replaced by the
    option's key. Options are processed from the longest text to the shortest
    so that an option which is a substring of another does not clobber the
    longer one first.

    Args:
        gen: The raw response text.
        choice_dict: Mapping from option letter to option text. Non-string
            values are converted with ``str``; ``None``, NaN and texts that
            are empty after stripping periods are ignored.

    Returns:
        ``gen`` with the matched option texts replaced by their letters.
    """
    candidates = []
    for key, val in choice_dict.items():
        # Missing cells (None / NaN, e.g. from a CSV reader) carry no text.
        # ``val != val`` is true only for NaN.
        if val is None or (isinstance(val, float) and val != val):
            continue
        text = str(val)
        # An empty pattern matches between every character, so substituting
        # it would scatter the letter through the whole response.
        if not text.rstrip("."):
            continue
        candidates.append((key, text))

    # Longest option first; the sort is stable, so ties keep mapping order.
    candidates.sort(key=lambda item: len(item[1]), reverse=True)

    for key, text in candidates:
        pattern = re.compile(re.escape(text.rstrip(".")), re.IGNORECASE)
        gen = pattern.sub(key, gen)
    return gen
|
|
|
|
|
def extract_choice(gen, choice_list):
    """Pick the option letter (A-D) that a response text points at.

    A fixed sequence of regular expressions is tried in order and the first
    one that matches decides the answer. If none match, the response is
    fuzzy-matched against the option texts and the letter at the position of
    the best-matching text is returned.

    Args:
        gen: The response text to inspect.
        choice_list: Option texts, in the same order as the module-level
            ``choices`` letters.

    Returns:
        One of ``"A"``, ``"B"``, ``"C"`` or ``"D"``.
    """
    ordered_patterns = (
        # 1. A cue word ("choose", "answer", "choice") that is not negated,
        #    followed closely by a stand-alone letter.
        r"(?:(?:[Cc]hoose)|(?:(?:[Aa]nswer|[Cc]hoice)(?![^ABCD]{0,20}?(?:n't|not))[^ABCD]{0,10}?\b(?:|is|:|be))\b)[^ABCD]{0,20}?\b(A|B|C|D)\b",
        # 2. A stand-alone letter followed closely by "correct"/"right",
        #    unless a negation sits in between.
        r"\b(A|B|C|D)\b(?![^ABCD]{0,8}?(?:n't|not)[^ABCD]{0,5}?(?:correct|right))[^ABCD]{0,10}?\b(?:correct|right)\b",
        # 3. The response starts with a letter followed by ".", ",", ":" or
        #    the end of the text.
        r"^(A|B|C|D)(?:\.|,|:|$)",
        # 4. The first A-D that is not glued to another ASCII letter and is
        #    not followed by "=".
        r"(?<![a-zA-Z])(A|B|C|D)(?![a-zA-Z=])",
    )

    for pattern in ordered_patterns:
        match = re.search(pattern, gen)
        if match is not None:
            return match.group(1)

    # Nothing matched: fall back to the option text most similar to the
    # response and map its position back to a letter.
    closest_text = process.extractOne(gen, choice_list)[0]
    return choices[choice_list.index(closest_text)]
|
|
|
|
|
def extract_answer(response, row):
    """Turn a raw model response into a predicted option letter.

    The option texts of ``row`` are first substituted by their letters inside
    the response (``process_before_extraction``); the letter is then pulled
    out of the rewritten text (``extract_choice``).

    Args:
        response: The response text produced for this row.
        row: Mapping-like row with one entry per letter in ``choices``.

    Returns:
        The predicted option letter.
    """
    option_texts = [row[letter] for letter in choices]
    rewritten = process_before_extraction(
        response, dict(zip(choices, option_texts))
    )
    return extract_choice(rewritten, option_texts)
|
|
|
|
|
@torch.no_grad()
def eval_subject(
    model,
    tokenizer,
    subject_name,
    test_df,
    save_result_dir=None,
    overwrite=False,
    **kwargs
):
    """Evaluate one subject and return its per-question correctness flags.

    If a result CSV for the subject already exists in ``save_result_dir`` and
    ``overwrite`` is false, the stored predictions are re-scored against
    ``test_df`` and the model is not called. Otherwise every row is turned
    into a prompt, sent to ``model.chat`` and the extracted letter is compared
    with the row's ``"answer"``.

    Args:
        model: Object exposing ``chat(tokenizer, question, history=None)``
            that returns a ``(response, history)`` pair.
        tokenizer: Tokenizer handed through to ``model.chat``.
        subject_name: Subject identifier; used in the result file name.
        test_df: DataFrame holding the questions. When results are saved it
            is modified in place (``model_output``, ``model_response`` and,
            if anything was scored, ``correctness`` columns are added).
        save_result_dir: Directory for ``<subject_name>_result.csv``. With
            ``None`` nothing is read from or written to disk.
        overwrite: Re-run the model even if a result file already exists.
        **kwargs: ``debug`` (bool) enables an extra per-question print. When
            it is not given, the module-level ``args.debug`` is used if such
            a global exists, otherwise debugging is off.

    Returns:
        A list of ``1``/``0`` flags, one per scored question.
    """
    # Only build a path when a directory was supplied; joining ``None``
    # would raise TypeError.
    result_path = None
    if save_result_dir is not None:
        result_path = os.path.join(save_result_dir, f"{subject_name}_result.csv")

    if result_path is not None and not overwrite and os.path.exists(result_path):
        print(f"{result_path} existed, skip!")
        cached_df = pd.read_csv(result_path)
        # NOTE: ``zip`` stops at the shorter frame, so a truncated result
        # file yields fewer scores than ``test_df`` has rows.
        return [
            1 if resultrow["model_output"] == datarow["answer"] else 0
            for (_, datarow), (_, resultrow) in zip(
                test_df.iterrows(), cached_df.iterrows()
            )
        ]

    # Resolve the debug switch without requiring a module-level ``args``,
    # which only exists when the file is executed as a script.
    debug = kwargs.get("debug")
    if debug is None:
        debug = getattr(globals().get("args"), "debug", False)

    result = []
    responses = []
    score = []

    for _, row in tqdm(test_df.iterrows(), total=len(test_df)):
        question = format_example(row)

        response, _ = model.chat(
            tokenizer,
            question,
            history=None,
        )
        print(question)
        print(response)
        pred = extract_answer(response, row)
        print(pred)
        print("======================")

        if "answer" in row:
            correct = 1 if pred == row["answer"] else 0
            score.append(correct)
            if debug:
                print(f'{question} pred: {pred} ref: {row["answer"]}')
        result.append(pred)
        # Keep every response: assigning the loop variable to the column
        # would broadcast only the last response to all rows.
        responses.append(response)

    if save_result_dir:
        test_df["model_output"] = result
        test_df["model_response"] = responses
        if score:
            test_df["correctness"] = score
        os.makedirs(save_result_dir, exist_ok=True)
        test_df.to_csv(
            result_path,
            encoding="utf-8",
            index=False,
        )

    return score
|
|
|
|
|
def cal_mmlu(res):
    """Print per-category and overall accuracy, in percent.

    Categories and their subjects come from the module-level
    ``TASK_NAME_MAPPING``. A category's accuracy is the number of correct
    answers over the number of scored questions across its subjects; the
    average is computed the same way over all subjects.

    Args:
        res: Mapping from subject name to its list of ``1``/``0`` correctness
            flags. Every subject listed in ``TASK_NAME_MAPPING`` must be
            present.

    Raises:
        KeyError: If a subject from ``TASK_NAME_MAPPING`` is missing in
            ``res``.
    """
    acc_sum_dict = {}
    cnt_dict = {}
    acc_sum = 0.0
    cnt = 0

    for class_, subjects in TASK_NAME_MAPPING.items():
        class_correct = 0.0
        class_total = 0
        for tt in subjects:
            class_correct += sum(res[tt])
            class_total += len(res[tt])
        acc_sum_dict[class_] = class_correct
        cnt_dict[class_] = class_total
        acc_sum += class_correct
        cnt += class_total

    print("\n\n\n")
    for k in TASK_NAME_MAPPING:
        # A category without any scored question has no defined accuracy;
        # report that instead of dividing by zero.
        if cnt_dict[k]:
            print("%s ACC: %.2f " % (k, acc_sum_dict[k] * 100 / cnt_dict[k]))
        else:
            print("%s ACC: N/A " % k)
    if cnt:
        print("AVERAGE ACC:%.2f " % (acc_sum * 100 / cnt))
    else:
        print("AVERAGE ACC:N/A ")
|
|
|
|
|
def main(args):
    """Run the evaluation over every subject and print the accuracy summary.

    Args:
        args: Parsed command-line namespace. Reads ``checkpoint_path``,
            ``eval_data_path`` and ``overwrite`` (and is passed on to
            ``load_models_tokenizer``).
    """
    print("loading model weights")
    # Without a checkpoint no model is loaded; ``None`` placeholders are
    # passed down to ``eval_subject`` instead.
    model, tokenizer = (
        load_models_tokenizer(args)
        if args.checkpoint_path is not None
        else (None, None)
    )
    print("model loaded")

    # The test CSVs are read without a header row, so name the columns here.
    column_names = ["question", "A", "B", "C", "D", "answer"]

    dev_result = {}
    for subject_name in tqdm(SUBJECTS):
        csv_name = f"{subject_name}_test.csv"
        test_df = pd.read_csv(
            os.path.join(args.eval_data_path, "test", csv_name),
            names=column_names,
        )
        dev_result[subject_name] = eval_subject(
            model,
            tokenizer,
            subject_name,
            test_df,
            save_result_dir="outs_chat/mmlu_eval_result",
            overwrite=args.overwrite,
        )

    cal_mmlu(dev_result)
|
|
|
|
|
# Subjects grouped by reporting category. The key order is the order in which
# categories are printed, and the list order determines ``SUBJECTS``.
TASK_NAME_MAPPING = {
    "stem": [
        "abstract_algebra",
        "anatomy",
        "astronomy",
        "college_biology",
        "college_chemistry",
        "college_computer_science",
        "college_mathematics",
        "college_physics",
        "computer_security",
        "conceptual_physics",
        "electrical_engineering",
        "elementary_mathematics",
        "high_school_biology",
        "high_school_chemistry",
        "high_school_computer_science",
        "high_school_mathematics",
        "high_school_physics",
        "high_school_statistics",
        "machine_learning",
    ],
    "Humanities": [
        "formal_logic",
        "high_school_european_history",
        "high_school_us_history",
        "high_school_world_history",
        "international_law",
        "jurisprudence",
        "logical_fallacies",
        "moral_disputes",
        "moral_scenarios",
        "philosophy",
        "prehistory",
        "professional_law",
        "world_religions",
    ],
    "other": [
        "business_ethics",
        "college_medicine",
        "human_aging",
        "management",
        "marketing",
        "medical_genetics",
        "miscellaneous",
        "nutrition",
        "professional_accounting",
        "professional_medicine",
        "virology",
        "global_facts",
        "clinical_knowledge",
    ],
    "social": [
        "econometrics",
        "high_school_geography",
        "high_school_government_and_politics",
        "high_school_macroeconomics",
        "high_school_microeconomics",
        "high_school_psychology",
        "human_sexuality",
        "professional_psychology",
        "public_relations",
        "security_studies",
        "sociology",
        "us_foreign_policy",
    ],
}

# Flat list of every subject, category by category.
SUBJECTS = [
    subject
    for subject_group in TASK_NAME_MAPPING.values()
    for subject in subject_group
]

# Option letters; also the column names of the option texts in each row.
choices = list("ABCD")
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the options, seed the RNGs, run the
    # evaluation. ``args`` stays a module-level name because other code in
    # this file reads it as a global.
    parser = argparse.ArgumentParser(description="Test HF checkpoint.")
    parser.add_argument(
        "-c",
        "--checkpoint-path",
        type=str,
        help="Checkpoint path",
        default="Qwen/Qwen-7B-Chat",
    )
    parser.add_argument("-s", "--seed", type=int, default=1234, help="Random seed")

    group = parser.add_argument_group(title="Evaluation options")
    # The data path is joined into every test-file path in ``main``; require
    # it so a missing flag produces a usage error rather than a TypeError.
    group.add_argument(
        "-d",
        "--eval_data_path",
        type=str,
        required=True,
        help="Path to eval data",
    )
    group.add_argument(
        "--debug", action="store_true", default=False, help="Print infos."
    )
    group.add_argument(
        "--overwrite",
        action="store_true",
        default=False,
        help="Overwrite existed results",
    )

    args = parser.parse_args()
    set_seed(args.seed)

    main(args)
|
|