import ast
import json
import traceback
from collections import defaultdict

# Difficulty levels defined for each puzzle task.
level_dict = {
    "crossword": ["5_5", "10_10", "15_15"],
    "acrostic": ["easy", "hard"],
    "logic": ["4_4", "4_5", "4_6", "4_7"],
    "cryptogram": ["easy", "hard"],
    "sudoku": ["4_4_easy", "4_4_hard", "9_9_easy", "9_9_hard"],
    "drop": ["easy", "hard"],
}


def norm_dict(d):
    """Normalize a dict: lower-case keys; strip spaces and lower-case values.

    Falsy input (None, {}) yields an empty dict.
    """
    if not d:
        return {}
    return {str(k).lower(): str(v).replace(" ", "").lower() for k, v in d.items()}


def _parse_prediction(prediction_text):
    """Parse a model-emitted answer string into a Python object.

    Model output is untrusted, so use ast.literal_eval instead of eval():
    it accepts only Python literals and cannot execute arbitrary code.
    Raises (ValueError/SyntaxError) on malformed input; callers catch it.
    """
    return ast.literal_eval(prediction_text)


def calculate_dict_correct(gold, prediction_text):
    """Score a dict-valued answer (crossword / acrostic / cryptogram).

    Returns (correct_cnt, correct_100, correct_50, matching_dict), where
    matching_dict records per gold key the normalized gold value, the model
    value (or "MISSING"), and whether they match.  On any parse/shape error
    the sample scores 0 and matching_dict records the error per key.
    """
    try:
        prediction = norm_dict(_parse_prediction(prediction_text))
        gold_norm = norm_dict(gold)
        matching_dict = {}
        correct_cnt = 0
        for key, gold_value in gold_norm.items():
            predicted_value = prediction.get(key, "MISSING")
            is_correct = gold_value == predicted_value
            correct_cnt += is_correct
            matching_dict[key] = {
                "gold": gold_value,
                "model": predicted_value,
                "correct": is_correct,
            }
        correct_100 = correct_cnt == len(gold_norm)
        # Empty gold is vacuously satisfied; also avoids ZeroDivisionError.
        correct_50 = not gold_norm or correct_cnt / len(gold_norm) >= 0.5
    except Exception as e:
        print(prediction_text)
        print(f"Error: {e}")
        print(traceback.format_exc())
        correct_cnt = 0
        correct_100 = False
        correct_50 = False
        matching_dict = {
            key: {"gold": gold[key], "model": f"ERROR: {str(e)}", "correct": False}
            for key in gold.keys()
        }
    return correct_cnt, correct_100, correct_50, matching_dict


def calculate_logic_answer_correct(gold, prediction_text):
    """Score a logic-puzzle answer: a list of dicts, rows matched by the
    first key/value pair of each gold dict; remaining keys are sub-tasks.

    Returns (correct_cnt, all_cnt, correct_100, correct_50).
    """
    def norm(ans):
        return [{str(k).lower(): str(v).lower() for k, v in d.items()} for d in ans]

    gold = norm(gold)
    try:
        prediction = norm(_parse_prediction(prediction_text))
    except Exception as e:
        print(f"Error: {e}")
        print(traceback.format_exc())
        prediction = []
    correct_cnt = 0
    all_cnt = 0
    for d_gold in gold:
        if not d_gold:
            # Defensive: an empty gold row has no identifying pair to match on.
            continue
        first_pair = next(iter(d_gold.items()))
        matches = [d for d in prediction if first_pair in d.items()]
        d_prediction = matches[0] if matches else {}
        for key, gold_value in d_gold.items():
            if key == first_pair[0]:
                continue  # the identifying pair itself is not a sub-task
            all_cnt += 1
            if d_prediction.get(key, "") == gold_value:
                correct_cnt += 1
    correct_100 = correct_cnt == all_cnt
    # Guard all_cnt == 0 to avoid ZeroDivisionError on degenerate gold.
    correct_50 = all_cnt == 0 or correct_cnt / all_cnt >= 0.5
    return correct_cnt, all_cnt, correct_100, correct_50


def calculate_sudoku_answer_correct(grid, gold, prediction_text):
    """Score a sudoku answer: only cells that were empty in the starting
    grid (value 0) count, each compared against the gold solution.

    Returns (correct_cnt, all_cnt, correct_100, correct_50).
    """
    try:
        prediction = _parse_prediction(prediction_text)
    except Exception as e:
        print(f"Error: {e}")
        print(traceback.format_exc())
        prediction = [[]]
    all_cnt = sum(row.count(0) for row in grid)
    correct_cnt = 0
    # min() bounds tolerate predictions with missing/extra rows or columns.
    for i in range(min(len(gold), len(prediction))):
        for j in range(min(len(gold[i]), len(prediction[i]))):
            if grid[i][j] == 0 and gold[i][j] == prediction[i][j]:
                correct_cnt += 1
    if correct_cnt > all_cnt:
        print("Error: correct_cnt > all_cnt")
        correct_cnt = all_cnt
    correct_100 = correct_cnt == all_cnt
    correct_50 = all_cnt == 0 or correct_cnt / all_cnt >= 0.5
    return correct_cnt, all_cnt, correct_100, correct_50


def calculate_drop_answer_correct(gold, prediction_text):
    """Score a drop-quote answer: every non-'#' cell of the gold grid is a
    sub-task compared against the prediction grid.

    Returns (correct_cnt, all_cnt, correct_100, correct_50).
    """
    try:
        prediction = _parse_prediction(prediction_text)
    except Exception as e:
        print(f"Error: {e}")
        print(traceback.format_exc())
        prediction = [[]]
    all_cnt = len([x for row in gold for x in row if x != "#"])
    correct_cnt = 0
    # min() bounds tolerate predictions with missing/extra rows or columns.
    for i in range(min(len(gold), len(prediction))):
        for j in range(min(len(gold[i]), len(prediction[i]))):
            if gold[i][j] != "#" and gold[i][j] == prediction[i][j]:
                correct_cnt += 1
    if correct_cnt > all_cnt:
        print("Error: correct_cnt > all_cnt")
        correct_cnt = all_cnt
    correct_100 = correct_cnt == all_cnt
    correct_50 = all_cnt == 0 or correct_cnt / all_cnt >= 0.5
    return correct_cnt, all_cnt, correct_100, correct_50


def _eval_levels(data_list, golden_list, task, empty_answer, score_fn):
    """Shared per-level evaluation loop for all six tasks.

    score_fn(gold, model_answer) -> (correct_cnt, all_cnt, correct_100,
    correct_50).  Per-level metrics: CR (answer-present rate), S-Acc
    (sub-task accuracy), EM (exact-match rate), PM-0.5 (>=50% partial-match
    rate).  Levels with no data or no sub-tasks report 0.0 rather than
    raising ZeroDivisionError.
    """
    eval_dict = defaultdict(dict)
    for level in level_dict[task]:
        golden_dict = {g["tag"]: g for g in golden_list if g["level"] == level}
        data = [d for d in data_list if d["level"] == level]
        answer_exist_cnt = 0
        subtask_cnt = 0
        subtask_correct_cnt = 0
        sample_correct_100_cnt = 0
        sample_correct_50_cnt = 0
        for d in data:
            model_answer = d["answer"]
            gold = json.loads(golden_dict[str(d["tag"])]["answer"])
            if model_answer != empty_answer:
                answer_exist_cnt += 1
            correct_cnt, all_cnt, correct_100, correct_50 = score_fn(gold, model_answer)
            subtask_cnt += all_cnt
            subtask_correct_cnt += correct_cnt
            sample_correct_100_cnt += correct_100
            sample_correct_50_cnt += correct_50
        n = len(data)
        eval_dict[level] = {
            "CR": answer_exist_cnt / n if n else 0.0,
            "S-Acc": subtask_correct_cnt / subtask_cnt if subtask_cnt else 0.0,
            "EM": sample_correct_100_cnt / n if n else 0.0,
            "PM-0.5": sample_correct_50_cnt / n if n else 0.0,
        }
    return eval_dict


def _score_dict_task(gold, model_answer):
    """Adapt calculate_dict_correct to the (correct, all, 100, 50) shape."""
    correct_cnt, correct_100, correct_50, _ = calculate_dict_correct(gold, model_answer)
    return correct_cnt, len(gold), correct_100, correct_50


def eval_crossword(data_list, golden_list):
    """Evaluate crossword answers per level; see _eval_levels for metrics."""
    return _eval_levels(data_list, golden_list, "crossword", "{}", _score_dict_task)


def eval_acrostic(data_list, golden_list):
    """Evaluate acrostic answers per level; see _eval_levels for metrics."""
    return _eval_levels(data_list, golden_list, "acrostic", "{}", _score_dict_task)


def eval_logic(data_list, golden_list):
    """Evaluate logic-puzzle answers per level; see _eval_levels for metrics."""
    return _eval_levels(
        data_list, golden_list, "logic", "[]", calculate_logic_answer_correct
    )


def eval_cryptogram(data_list, golden_list):
    """Evaluate cryptogram answers per level; see _eval_levels for metrics."""
    return _eval_levels(data_list, golden_list, "cryptogram", "{}", _score_dict_task)


def eval_sudoku(data_list, golden_list):
    """Evaluate sudoku answers per level; see _eval_levels for metrics."""
    def score(gold, model_answer):
        # The gold record bundles the starting grid with the solved answer.
        return calculate_sudoku_answer_correct(gold["grid"], gold["answer"], model_answer)
    return _eval_levels(data_list, golden_list, "sudoku", "[[]]", score)


def eval_drop(data_list, golden_list):
    """Evaluate drop-quote answers per level; see _eval_levels for metrics."""
    return _eval_levels(
        data_list, golden_list, "drop", "[[]]", calculate_drop_answer_correct
    )


def evaluate(data_list, golden_list, task):
    """Dispatch evaluation to the task-specific evaluator.

    Raises ValueError for an unknown task name.
    """
    evaluators = {
        "crossword": eval_crossword,
        "acrostic": eval_acrostic,
        "logic": eval_logic,
        "cryptogram": eval_cryptogram,
        "sudoku": eval_sudoku,
        "drop": eval_drop,
    }
    if task not in evaluators:
        raise ValueError(f"Invalid task: {task}")
    return evaluators[task](data_list, golden_list)