Zebra / backend /solver.py
guoj5's picture
Fix some issues
52d50ce
# solver.py
import sys
import re
import json
import os
import subprocess
from deep_convert import deep_convert
from openai import OpenAI
from dotenv import load_dotenv
def solve_puzzle(index, puzzle, expected_solution, sys_content, sat_cnt_content):
"""
使用 sys_content + puzzle 调用模型,生成 Z3 Python 代码,
运行并比对输出是否与 expected_solution 相同。
"""
load_dotenv()
client = OpenAI(
api_key=os.getenv("GEMINI_API_KEY"),
base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
)
messages = [
{"role": "user", "content": sys_content},
{"role": "user", "content": puzzle + f'\nCategories: {str(expected_solution["header"][1:])}\n'},
]
attempts = 0
current_solution = None
while attempts < 3:
messages.append({"role": "user", "content": "Make sure you stick to the given categories with strictly identical names otherwise there will be a critical error."})
attempts += 1
response = client.chat.completions.create(
model="gemini-2.0-flash",
messages=messages,
temperature=0.0,
stream=False
)
content = response.choices[0].message.content
messages.append(response.choices[0].message)
# 提取 code
code_blocks = re.findall(r"```(?:python)?(.*?)```", content, re.DOTALL)
if not code_blocks:
messages.append({"role": "user", "content": "Please write a complete Python code in your response. Try again."})
continue
code_to_run = code_blocks[-1].strip()
result = subprocess.run(
[sys.executable, "-c", code_to_run],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
output = result.stdout.strip()
# print(output)
if result.stderr.strip():
messages.append({"role": "user", "content": f"Your code has errors: {result.stderr.strip()}. Please check your code and provide the complete code in the end."})
continue
try:
current_solution = json.loads(output)
except json.JSONDecodeError:
messages.append({"role": "user", "content": "Your output is not valid JSON. Please ensure your code prints the solution as a JSON dictionary."})
continue
if 'rows' in current_solution.keys() and current_solution['rows'] == deep_convert(expected_solution)['rows']:
return {
"index": index,
"success": True,
"solution": current_solution,
"attempts": attempts,
"generatedCode": code_to_run,
"modelResponse": content,
"problematicConstraints": None
}
else:
messages.append({"role": "user", "content": "The solution does not match the expected answer. Please check your categories and step by step reason your constraints and provide the complete code in the end."})
if result.stderr.strip():
return {
"index": index,
"success": False,
"solution": current_solution,
"attempts": attempts,
"generatedCode": code_to_run,
"modelResponse": content,
"problematicConstraints": "Code error:\n" + result.stderr.strip()
}
# if 'rows' not in current_solution.keys():
# return {
# "index": index,
# "success": False,
# "solution": current_solution,
# "attempts": attempts,
# "generatedCode": code_to_run,
# "modelResponse": content,
# "problematicConstraints": str(current_solution)
# }
code_by_line = code_to_run.split("\n")
fisrt_cons_line = None
last_cons_line = None
for i, line in enumerate(code_by_line):
if "s.add" in line and "pos" in line:
if not fisrt_cons_line:
fisrt_cons_line = i
last_cons_line = i
experiment_code_line = code_by_line[:fisrt_cons_line]
categories = expected_solution['header']
for i, houses in enumerate(expected_solution['rows']):
for j in range(1, len(houses)):
experiment_code_line.append(f"s.add(pos(\"{categories[j]}\", \"{houses[j]}\") == {i+1})")
experiment_code_line.append("")
experiment_code_line.append("print(s.check())")
def satisfied(constraint):
experiment_code_line[-2] = constraint
experiment_code = "\n".join(experiment_code_line)
sat_checker = experiment_code.strip()
result = subprocess.run(
[sys.executable, "-c", sat_checker],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if result.stderr.strip():
return "error: " + result.stderr.strip()
output = result.stdout.strip()
return output.lower()
if "error" in current_solution.keys():
if "Multiple" in current_solution['error']:
problematic_constraints = "Issue: Multiple solutions\n"
else:
problematic_constraints = "Issue: No solution\n"
else:
problematic_constraints = "Issue: Wrong answer\n"
cnt_cons = 0
ok_satisfied = True
problematic_constraints += "\nSatisfaction judge:\n"
for i, line in enumerate(code_by_line):
if "s.add" in line and "pos" in line:
constraint = line.strip()
cnt_cons += 1
output = satisfied(constraint)
if "error" in output:
problematic_constraints += f"Error when checking {cnt_cons}-th constraint: {output}.\n"
ok_satisfied = False
break
if output == "unsat":
ok_satisfied = False
problematic_constraints += f"In line {i + 1}, the {cnt_cons}-th constraint: {constraint}. Not satisfied.\n"
if ok_satisfied:
problematic_constraints += "All constraints are satisfied.\n"
# with open("Sat_cnt.txt", "r") as f:
# sat_cnt = f.read()
# Cannot directly load because the server strucutre
cnt_cons = 0
problematic_constraints += "\nRedundency judge:\n"
code_by_line_experiment = code_to_run.split("\n")[:last_cons_line + 1]
code_by_line_experiment.append("\n")
def run_result():
experiment_code = "\n".join(code_by_line_experiment) + sat_cnt_content
sat_checker = experiment_code.strip()
result = subprocess.run(
[sys.executable, "-c", sat_checker],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.stdout.strip()
res = run_result()
# print("\n".join(code_by_line_experiment) + sat_cnt_content)
# print(res)
if not res or res == "error":
problematic_constraints += f"Unable to judge."
else:
cur = int(res.split(':')[1])
problematic_constraints += f"The number of solutions: {cur}\n"
for i, line in enumerate(code_by_line):
if "s.add" in line and "pos" in line:
cnt_cons += 1
code_by_line_experiment[i] = ""
res = run_result()
if not res or res == "error":
problematic_constraints += f"Unable to judge further."
break
now_cnt = int(res.split(':')[1])
if now_cnt == cur:
problematic_constraints += f"In line {i + 1}, the {cnt_cons}-th constraint: {line}. A suspect redundency.\n"
code_by_line_experiment[i] = line
return {
"index": index,
"success": False,
"solution": current_solution,
"attempts": attempts,
"generatedCode": code_to_run,
"modelResponse": content,
"problematicConstraints": problematic_constraints
}