|
|
|
import sys |
|
import re |
|
import json |
|
import os |
|
import subprocess |
|
from deep_convert import deep_convert |
|
from openai import OpenAI |
|
from dotenv import load_dotenv |
|
|
|
def _run_python(code):
    """Run *code* with the current interpreter and return the completed process.

    stdout and stderr are captured as text; the exit status is not checked.
    """
    # NOTE(review): callers pass model-generated code here. It runs unsandboxed
    # and without a timeout, so a non-terminating script blocks the caller.
    return subprocess.run(
        [sys.executable, "-c", code],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )


def _is_constraint_line(line):
    """Return True when *line* contains both ``s.add`` and ``pos``."""
    return "s.add" in line and "pos" in line


def _parse_solution_count(res):
    """Return the integer in the second ':'-separated field of *res*.

    Returns None when *res* is empty, equals ``"error"``, or does not carry a
    parsable integer in that position.
    """
    if not res or res == "error":
        return None
    try:
        return int(res.split(':')[1])
    except (IndexError, ValueError):
        return None


def solve_puzzle(index, puzzle, expected_solution, sys_content, sat_cnt_content):
    """Ask the model for Z3 Python code, run it, and compare with the expected answer.

    The model is called with ``sys_content`` plus ``puzzle`` (up to 3 attempts,
    feeding back an error description after each failed attempt). The last
    fenced code block of each reply is executed and its stdout is parsed as
    JSON; the attempt succeeds when the parsed dictionary's ``"rows"`` equals
    ``deep_convert(expected_solution)["rows"]``.

    If every attempt fails, the code of the final attempt is diagnosed:

    * Satisfaction judge: the expected assignment is asserted in front of each
      constraint line (a line containing ``s.add`` and ``pos``), one at a time,
      and the constraint is reported when the solver prints ``unsat``.
    * Redundancy judge: each constraint line is disabled in turn and the
      solution count printed by ``sat_cnt_content`` is compared with the
      count obtained with all constraints enabled.

    Args:
        index: Identifier copied verbatim into the result.
        puzzle: Puzzle text sent to the model.
        expected_solution: Mapping with ``"header"`` and ``"rows"``. The first
            element of the header and of every row is skipped; row ``i``
            (0-based) is asserted as position ``i + 1``.
        sys_content: Instruction text sent as the first message.
        sat_cnt_content: Python source appended to the generated code (cut
            after its last constraint line). Its stdout is read as either
            ``"error"`` or text whose second ':'-separated field is an integer.

    Returns:
        A dict with keys ``index``, ``success``, ``solution``, ``attempts``,
        ``generatedCode``, ``modelResponse`` and ``problematicConstraints``.
        All per-attempt fields describe the final attempt.
    """
    load_dotenv()
    client = OpenAI(
        api_key=os.getenv("GEMINI_API_KEY"),
        base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
    )

    messages = [
        {"role": "user", "content": sys_content},
        {"role": "user", "content": puzzle + f'\nCategories: {str(expected_solution["header"][1:])}\n'},
    ]
    attempts = 0
    current_solution = None
    code_to_run = None
    result = None
    content = None

    def make_result(success, problem):
        # Reads the enclosing variables at call time, so it always reports the
        # state of the most recent attempt.
        return {
            "index": index,
            "success": success,
            "solution": current_solution,
            "attempts": attempts,
            "generatedCode": code_to_run,
            "modelResponse": content,
            "problematicConstraints": problem,
        }

    while attempts < 3:
        messages.append({"role": "user", "content": "Make sure you stick to the given categories with strictly identical names otherwise there will be a critical error."})
        attempts += 1
        # Reset per-attempt state so the returned solution/code/response always
        # belong to the same attempt instead of mixing in stale values.
        current_solution = None
        code_to_run = None
        result = None

        response = client.chat.completions.create(
            model="gemini-2.0-flash",
            messages=messages,
            temperature=0.0,
            stream=False,
        )
        reply = response.choices[0].message
        # A reply without text would otherwise make re.findall raise TypeError.
        content = reply.content or ""
        messages.append(reply)

        code_blocks = re.findall(r"```(?:python)?(.*?)```", content, re.DOTALL)
        if not code_blocks:
            messages.append({"role": "user", "content": "Please write a complete Python code in your response. Try again."})
            continue

        # Only the last fenced block is executed.
        code_to_run = code_blocks[-1].strip()
        result = _run_python(code_to_run)
        output = result.stdout.strip()

        if result.stderr.strip():
            messages.append({"role": "user", "content": f"Your code has errors: {result.stderr.strip()}. Please check your code and provide the complete code in the end."})
            continue

        try:
            parsed = json.loads(output)
        except json.JSONDecodeError:
            parsed = None
        # Valid JSON that is not an object (list, number, null) cannot hold
        # "rows"; treat it like unparsable output instead of crashing.
        if not isinstance(parsed, dict):
            messages.append({"role": "user", "content": "Your output is not valid JSON. Please ensure your code prints the solution as a JSON dictionary."})
            continue
        current_solution = parsed

        if 'rows' in current_solution and current_solution['rows'] == deep_convert(expected_solution)['rows']:
            return make_result(True, None)
        messages.append({"role": "user", "content": "The solution does not match the expected answer. Please check your categories and step by step reason your constraints and provide the complete code in the end."})

    # ---- All attempts failed: explain what went wrong in the final attempt.
    if code_to_run is None:
        return make_result(False, "Issue: No code block in the model response\n")

    if result.stderr.strip():
        return make_result(False, "Code error:\n" + result.stderr.strip())

    if not isinstance(current_solution, dict):
        problematic_constraints = "Issue: Output is not a JSON dictionary\n"
    elif "error" in current_solution:
        if "Multiple" in current_solution['error']:
            problematic_constraints = "Issue: Multiple solutions\n"
        else:
            problematic_constraints = "Issue: No solution\n"
    else:
        problematic_constraints = "Issue: Wrong answer\n"

    code_by_line = code_to_run.split("\n")
    constraint_indices = [
        i for i, line in enumerate(code_by_line) if _is_constraint_line(line)
    ]
    if not constraint_indices:
        # Nothing to judge; both judges below need at least one constraint.
        problematic_constraints += "\nNo constraint lines found in the generated code.\n"
        return make_result(False, problematic_constraints)
    # Taken from the index list, so a constraint on line index 0 is handled.
    first_cons_line = constraint_indices[0]
    last_cons_line = constraint_indices[-1]

    # Code before the first constraint, followed by the expected assignment.
    experiment_prefix = code_by_line[:first_cons_line]
    categories = expected_solution['header']
    for i, houses in enumerate(expected_solution['rows']):
        for j in range(1, len(houses)):
            experiment_prefix.append(f's.add(pos("{categories[j]}", "{houses[j]}") == {i+1})')

    def satisfied(constraint):
        # Check one constraint against the expected assignment.
        checker = "\n".join(experiment_prefix + [constraint, "print(s.check())"]).strip()
        proc = _run_python(checker)
        if proc.stderr.strip():
            return "error: " + proc.stderr.strip()
        return proc.stdout.strip().lower()

    ok_satisfied = True
    problematic_constraints += "\nSatisfaction judge:\n"
    for cnt_cons, i in enumerate(constraint_indices, start=1):
        constraint = code_by_line[i].strip()
        output = satisfied(constraint)
        if "error" in output:
            problematic_constraints += f"Error when checking {cnt_cons}-th constraint: {output}.\n"
            ok_satisfied = False
            break
        if output == "unsat":
            ok_satisfied = False
            problematic_constraints += f"In line {i + 1}, the {cnt_cons}-th constraint: {constraint}. Not satisfied.\n"
    if ok_satisfied:
        problematic_constraints += "All constraints are satisfied.\n"

    problematic_constraints += "\nRedundency judge:\n"
    code_by_line_experiment = code_by_line[:last_cons_line + 1]
    code_by_line_experiment.append("\n")

    def run_result():
        # Generated code up to its last constraint, then the counting snippet.
        experiment_code = "\n".join(code_by_line_experiment) + sat_cnt_content
        return _run_python(experiment_code.strip()).stdout.strip()

    cur = _parse_solution_count(run_result())
    if cur is None:
        problematic_constraints += "Unable to judge."
    else:
        problematic_constraints += f"The number of solutions: {cur}\n"
        for cnt_cons, i in enumerate(constraint_indices, start=1):
            line = code_by_line[i]
            # Disable the constraint with an equally indented `pass` so that a
            # constraint that is the only statement of a block does not leave
            # an empty (syntactically invalid) block behind.
            indent = line[:len(line) - len(line.lstrip())]
            code_by_line_experiment[i] = indent + "pass"
            now_cnt = _parse_solution_count(run_result())
            if now_cnt is None:
                problematic_constraints += "Unable to judge further."
                break
            if now_cnt == cur:
                problematic_constraints += f"In line {i + 1}, the {cnt_cons}-th constraint: {line}. A suspect redundency.\n"
            code_by_line_experiment[i] = line

    return make_result(False, problematic_constraints)
|
|