File size: 6,214 Bytes
256a159
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import ast
import json

import networkx as nx
import pandas as pd
from datasets import Dataset

from opencompass.openicl.icl_evaluator import BaseEvaluator
from opencompass.registry import ICL_EVALUATORS, LOAD_DATASET

from ..base import BaseDataset
from .prompts import tsp_dPrompts


def q2text(adj_matrix, distance_limit, p=tsp_dPrompts):
    """Render a TSP decision instance as a textual prompt.

    The prompt is the four template sections of ``p`` ('Intro',
    'Initial_question', 'Output_content', 'Output_format') followed by one
    sentence per city pair taken from the upper triangle of ``adj_matrix``.

    :param adj_matrix: 2D array-like exposing ``.shape`` and ``[i, j]``
        indexing; its row count is reported as the number of cities.
    :param distance_limit: value substituted into the 'Initial_question'
        template as ``distance_limit``.
    :param p: mapping that provides the prompt templates.
    :return: The assembled prompt string.
    """
    city_count = adj_matrix.shape[0]
    header_sections = [
        p['Intro'],
        p['Initial_question'].format(total_cities=city_count, distance_limit=distance_limit),
        p['Output_content'],
        p['Output_format'],
        'The distances between cities are below: \n',
    ]
    pieces = ['\n'.join(header_sections)]

    for src in range(city_count):
        # Starting at src + 1 visits each unordered pair exactly once
        # (strict upper triangle of the matrix).
        for dst in range(src + 1, adj_matrix.shape[1]):
            sentence = 'The distance between City {} and City {} is {}.'.format(src, dst, adj_matrix[src, dst])
            pieces.append(sentence + '\n')
    return ''.join(pieces)


@LOAD_DATASET.register_module(force=True)
class cmp_TSP_D_Dataset(BaseDataset):
    """Dataset loader for the TSP decision instances stored as CSV files."""

    @staticmethod
    def load(path: str):
        """Read the 10 x 10 grid of instance files and build a ``Dataset``.

        Files are named ``decision_data_TSP_level_{level}_instance_{n}.csv``
        with ``level`` in 0..9 and ``n`` in 1..10. Each file is read without
        a header; the first cell of its last row is used as the distance
        threshold and all preceding rows as the distance matrix.

        :param path: directory that contains the CSV files. A trailing path
            separator is accepted but no longer required.
        :return: A ``Dataset`` whose rows hold ``prompt`` (text built by
            ``q2text``), ``q`` (the 1-based level, the separator ``####``
            plus a newline, then the JSON-encoded table) and ``level``.
        """
        # Function-scope import keeps this block self-contained.
        import os

        raw_data = []
        for level_idx in range(10):
            for file_num in range(10):
                file_name = 'decision_data_TSP_level_{}_instance_{}.csv'.format(level_idx, file_num + 1)
                # os.path.join instead of string concatenation so a ``path``
                # without a trailing separator still resolves correctly.
                table = pd.read_csv(os.path.join(path, file_name),
                                    header=None,
                                    index_col=False)

                # File names use a 0-based level; the stored level is 1-based.
                level = level_idx + 1
                threshold = table.iloc[-1, 0]  # threshold is in the last row
                distance_matrix = table.iloc[:-1].values  # remaining rows
                raw_data.append({
                    'prompt': q2text(distance_matrix, threshold),
                    # to_json() already returns JSON text; json.dumps wraps it
                    # in a JSON string again. Kept as-is because consumers of
                    # this field decode it in two steps.
                    'q': str(level) + '####\n' + json.dumps(table.to_json()),
                    'level': level
                })
        return Dataset.from_list(raw_data)


@ICL_EVALUATORS.register_module(force=True)
class cmp_TSP_D_Evaluator(BaseEvaluator):
    """Scores model answers to TSP decision questions.

    A prediction passes when its ``Feasible`` yes/no answer agrees with
    whether an approximate tour length is within the instance threshold.
    """

    def score(self, predictions, references):
        """Compute the level-weighted accuracy of ``predictions``.

        :param predictions: model output strings, expected to contain
            ``<final_answer>`` and ``<reasoning>`` sections.
        :param references: strings made of the integer level, the separator
            ``####`` plus a newline, then a JSON string holding the JSON text
            of the instance table.
        :return: ``{'Weighted Accuracy': value}`` where ``value`` is the sum
            of the levels of passing items divided by the sum of all levels,
            times 100. Returns 0.0 when there is nothing to score.
        """
        assert len(predictions) == len(references)

        weight = {'pass': 0, 'fail': 0}
        for q, llm_string in zip(references, predictions):
            output, _ = self.parse_xml_to_dict(llm_string)

            level_text, _, payload = q.partition('####\n')
            level = int(level_text)
            # The payload is a JSON string whose content is itself JSON text,
            # so it is decoded twice. json.loads replaces the former eval():
            # it accepts null/true/false and cannot execute arbitrary code.
            table = pd.DataFrame(json.loads(json.loads(payload)))
            threshold = table.iloc[-1, 0]  # threshold is in the last row
            distance_matrix = table.iloc[:-1].values  # remaining rows

            try:
                correct, _ = self.tsp_decision_check(distance_matrix, threshold, output)
            except Exception as e:
                # Best effort: a failing check counts as a wrong answer.
                print(f'Check failed: {e}')
                correct = False

            weight['pass' if correct else 'fail'] += level

        total = weight['pass'] + weight['fail']
        # Guard against ZeroDivisionError on empty input.
        accuracy = weight['pass'] / total * 100 if total else 0.0
        return {'Weighted Accuracy': accuracy}

    @staticmethod
    def _extract_tag(text, tag):
        """Return the stripped text between the first <tag> and first </tag>.

        Raises ValueError when either marker is missing.
        """
        opening = '<' + tag + '>'
        start = text.index(opening) + len(opening)
        end = text.index('</' + tag + '>')
        return text[start:end].strip()

    def parse_xml_to_dict(self, xml_string):
        """Extract the final answer and the reasoning from a model output.

        :param xml_string: model output text.
        :return: ``(final_answer, reasoning)``. Both are ``''`` when any of
            the four tags is missing or the input is not a string. The final
            answer is the Python literal parsed from the tag content, or
            ``''`` when that content is not a valid literal (the reasoning
            text is still returned in that case).
        """
        try:
            final_answer_text = self._extract_tag(xml_string, 'final_answer')
            reasoning_element = self._extract_tag(xml_string, 'reasoning')
        except (AttributeError, TypeError, ValueError):
            return '', ''

        try:
            final_answer_element = ast.literal_eval(final_answer_text)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            final_answer_element = ''
        return final_answer_element, reasoning_element

    def tsp_approx(self, distance_matrix):
        """Returns an approximate solution to the TSP problem.

        :param distance_matrix: A 2D numpy array representing the distance matrix.
        :return: The list of nodes returned by
            ``nx.approximation.traveling_salesman_problem`` for the graph
            built from ``distance_matrix``.
        """
        G = nx.from_numpy_array(distance_matrix)
        return nx.approximation.traveling_salesman_problem(G)

    def tsp_decision_check(self, distance_matrix, threshold, tour):
        """Checks whether the claimed feasibility matches the approximate
        tour length.

        :param distance_matrix: A 2D numpy array representing the distance matrix.
        :param threshold: The maximum distance allowed.
        :param tour: A dictionary whose 'Feasible' entry is 'yes' or 'no'
            (case-insensitive); a missing entry is treated as 'no'.
        :return: ``(is_correct, message)``. ``is_correct`` is False when
            ``tour`` is not in the expected format or when its answer
            disagrees with ``approximate_distance <= threshold``.
        """
        try:
            is_feasible = tour.get('Feasible', 'no').lower() == 'yes'
        except Exception:
            return False, 'Output format incorrect'

        # Length of the approximate tour, edge by edge.
        route = self.tsp_approx(distance_matrix)
        tour_distance = sum(distance_matrix[a, b] for a, b in zip(route, route[1:]))
        # NOTE(review): networkx is expected to return a closed cycle (first
        # node repeated at the end) by default - confirm for the installed
        # version. Only add the closing edge when the route is still open, so
        # a non-zero diagonal entry is never counted.
        if route and route[0] != route[-1]:
            tour_distance += distance_matrix[route[-1], route[0]]

        if is_feasible != (tour_distance <= threshold):
            return False, f'Feasibility mismatch: {is_feasible} vs {tour_distance} > {threshold}'
        return True, 'Feasible: {} <= {}'.format(tour_distance, threshold)