File size: 4,464 Bytes
256a159 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# flake8: noqa: E501
import csv
import os
import os.path as osp
import re
from collections import defaultdict
from datetime import datetime
import numpy as np
from mmengine import ConfigDict
try:
from prettytable import from_csv
except ImportError:
from_csv = None
from opencompass.utils import model_abbr_from_cfg
from .subjective_post_process import post_process_autoj
from .utils import get_judgeanswer_and_reference, get_outdir
def post_process_ir(judgement: str):
"""Input a string like below:
Conclusion: [[Correct]]\nReasoning: xxx
and extract the score
"""
matches = re.findall(r'\[\[(.*?)\]\]', judgement)
if matches:
matches = matches[0]
if matches in ['Correct', 'Wrong', '对', '错']:
if matches == 'Correct' or matches == '对':
return {'score': 1}
else:
return {'score': 0}
else:
return None
else:
return None
def get_results(
judged_answers,
references,
fout,
fout_flag,
model,
):
capability_ratings = defaultdict(int)
capability_counts = defaultdict(int)
for ans, ref in zip(judged_answers, references):
lan = ref['others']['lan']
capability_ratings['total'] += ans['score']
capability_counts['total'] += 1
capability_ratings[lan] += ans['score']
capability_counts[lan] += 1
capability_avg_ratings = defaultdict(float)
for capability, total_score in capability_ratings.items():
capability_avg_ratings[
capability] = total_score / capability_counts[capability]
scores = {model: capability_avg_ratings}
with open(fout, 'a+', newline='') as csvfile:
writer = csv.writer(csvfile)
if fout_flag == 0:
num_header = [str(i) for i in range(4)]
writer.writerow(num_header)
header = ['模型']
for category in capability_avg_ratings:
header.append(category)
writer.writerow(header)
row = [model]
for category in capability_avg_ratings:
row.append(scores[model][category])
writer.writerow(row)
class IRSummarizer:
"""Do the subjectivity analyze based on evaluation results.
Args:
config (ConfigDict): The configuration object of the evaluation task.
It's expected to be filled out at runtime.
"""
def __init__(self, config: ConfigDict, judge_type='autoj') -> None:
self.tasks = []
self.cfg = config
self.eval_model_cfgs = self.cfg['eval']['partitioner']['models']
self.eval_model_abbrs = [
model_abbr_from_cfg(model) for model in self.eval_model_cfgs
]
self.judge_abbr = model_abbr_from_cfg(self.cfg['judge_model'])
self.judge_type = judge_type
assert self.judge_type in ['general', 'autoj']
self.judge_map = {
'general': post_process_ir,
'autoj': post_process_autoj,
}
self.judge_function = self.judge_map[self.judge_type]
def summarize(self,
time_str: str = datetime.now().strftime('%Y%m%d_%H%M%S')):
"""Summarize the subjectivity analysis based on evaluation results.
Args:
time_str (str): Timestamp for file naming.
Returns:
pd.DataFrame: The summary results.
"""
dataset_cfgs = self.cfg['datasets']
output_dir, results_folder = get_outdir(self.cfg, time_str)
fout_flag = 0
for eval_model_abbr in self.eval_model_abbrs:
subdir = eval_model_abbr + '_judged-by--' + self.judge_abbr
subdir_path = os.path.join(results_folder, subdir)
if os.path.isdir(subdir_path):
model, judge_model = eval_model_abbr, self.judge_abbr
fout = osp.join(output_dir,
'judged-by--' + judge_model + '.csv')
for dataset in dataset_cfgs:
judged_answers, references = get_judgeanswer_and_reference(
dataset, subdir_path, self.judge_function)
get_results(judged_answers, references, fout, fout_flag,
model)
fout_flag += 1
else:
print(subdir_path + ' is not exist! please check!')
with open(fout, 'r') as f:
x = from_csv(f)
print(x)
|