|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import ast |
|
import difflib |
|
import importlib |
|
import typing |
|
from typing import Any, ClassVar, Dict, List, Optional |
|
|
|
from camel.interpreters.base import BaseInterpreter |
|
from camel.interpreters.interpreter_error import InterpreterError |
|
|
|
|
|
class InternalPythonInterpreter(BaseInterpreter): |
|
r"""A customized python interpreter to control the execution of |
|
LLM-generated codes. The interpreter makes sure the code can only execute |
|
functions given in action space and import white list. It also supports |
|
fuzzy variable matching to retrieve uncertain input variable name. |
|
|
|
.. highlight:: none |
|
|
|
This class is adapted from the hugging face implementation |
|
`python_interpreter.py <https://github.com/huggingface/transformers/blob/8f |
|
093fb799246f7dd9104ff44728da0c53a9f67a/src/transformers/tools/python_interp |
|
reter.py>`_. The original license applies:: |
|
|
|
Copyright 2023 The HuggingFace Inc. team. All rights reserved. |
|
|
|
Licensed under the Apache License, Version 2.0 (the "License"); |
|
you may not use this file except in compliance with the License. |
|
You may obtain a copy of the License at |
|
|
|
http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
Unless required by applicable law or agreed to in writing, software |
|
distributed under the License is distributed on an "AS IS" BASIS, |
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
|
implied. See the License for the specific language governing |
|
permissions and limitations under the License. |
|
|
|
We have modified the original code to suit our requirements. We have |
|
encapsulated the original functions within a class and saved the |
|
interpreter state after execution. We have added support for "import" |
|
statements, "for" statements, and several binary and unary operators. We |
|
have added import white list to keep `import` statement safe. Additionally, |
|
we have modified the variable matching logic and introduced the |
|
:obj:`fuzz_state` for fuzzy matching. |
|
|
|
Modifications copyright (C) 2023 CAMEL-AI.org |
|
|
|
Args: |
|
action_space (Dict[str, Any], optional): A dictionary that maps action |
|
names to their corresponding functions or objects. The interpreter |
|
can only execute functions that are either directly listed in this |
|
dictionary or are member functions of objects listed in this |
|
dictionary. The concept of :obj:`action_space` is derived from |
|
EmbodiedAgent, representing the actions that an agent is capable of |
|
performing. If `None`, set to empty dict. (default: :obj:`None`) |
|
import_white_list (List[str], optional): A list that stores |
|
the Python modules or functions that can be imported in the code. |
|
All submodules and functions of the modules listed in this list are |
|
importable. Any other import statements will be rejected. The |
|
module and its submodule or function name are separated by a period |
|
(:obj:`.`). (default: :obj:`None`) |
|
unsafe_mode (bool, optional): If `True`, the interpreter runs the code |
|
by `eval()` without any security check. (default: :obj:`False`) |
|
raise_error (bool, optional): Raise error if the interpreter fails. |
|
(default: :obj:`False`) |
|
""" |
|
|
|
_CODE_TYPES: ClassVar[List[str]] = ["python", "py", "python3", "python2"] |
|
|
|
def __init__( |
|
self, |
|
action_space: Optional[Dict[str, Any]] = None, |
|
import_white_list: Optional[List[str]] = None, |
|
unsafe_mode: bool = False, |
|
raise_error: bool = False, |
|
) -> None: |
|
self.action_space = action_space or dict() |
|
self.state = self.action_space.copy() |
|
self.fuzz_state: Dict[str, Any] = dict() |
|
self.import_white_list = import_white_list or list() |
|
self.raise_error = raise_error |
|
self.unsafe_mode = unsafe_mode |
|
|
|
def run(self, code: str, code_type: str) -> str: |
|
r"""Executes the given code with specified code type in the |
|
interpreter. |
|
|
|
This method takes a string of code and its type, checks if the code |
|
type is supported, and then executes the code. If `unsafe_mode` is |
|
set to `False`, the code is executed in a controlled environment using |
|
the `execute` method. If `unsafe_mode` is `True`, the code is executed |
|
using `eval()` with the action space as the global context. An |
|
`InterpreterError` is raised if the code type is unsupported or if any |
|
runtime error occurs during execution. |
|
|
|
Args: |
|
code (str): The python code to be executed. |
|
code_type (str): The type of the code, which should be one of the |
|
supported code types (`python`, `py`, `python3`, `python2`). |
|
|
|
|
|
Returns: |
|
str: The string representation of the output of the executed code. |
|
|
|
Raises: |
|
InterpreterError: If the `code_type` is not supported or if any |
|
runtime error occurs during the execution of the code. |
|
""" |
|
if code_type not in self._CODE_TYPES: |
|
raise InterpreterError( |
|
f"Unsupported code type {code_type}. " |
|
f"`{self.__class__.__name__}` only supports " |
|
f"{', '.join(self._CODE_TYPES)}." |
|
) |
|
if not self.unsafe_mode: |
|
return str(self.execute(code)) |
|
else: |
|
return str(eval(code, self.action_space)) |
|
|
|
def update_action_space(self, action_space: Dict[str, Any]) -> None: |
|
r"""Updates action space for *python* interpreter.""" |
|
self.action_space.update(action_space) |
|
|
|
def supported_code_types(self) -> List[str]: |
|
r"""Provides supported code types by the interpreter.""" |
|
return self._CODE_TYPES |
|
|
|
def execute( |
|
self, |
|
code: str, |
|
state: Optional[Dict[str, Any]] = None, |
|
fuzz_state: Optional[Dict[str, Any]] = None, |
|
keep_state: bool = True, |
|
) -> Any: |
|
r"""Execute the input python codes in a security environment. |
|
|
|
Args: |
|
code (str): Generated python code to be executed. |
|
state (Optional[Dict[str, Any]], optional): External variables that |
|
may be used in the generated code. (default: :obj:`None`) |
|
fuzz_state (Optional[Dict[str, Any]], optional): External variables |
|
that do not have certain variable names. The interpreter will |
|
use fuzzy matching to access these variables. For example, if |
|
:obj:`fuzz_state` has a variable :obj:`image`, the generated |
|
code can use :obj:`input_image` to access it. (default: |
|
:obj:`None`) |
|
keep_state (bool, optional): If :obj:`True`, :obj:`state` and |
|
:obj:`fuzz_state` will be kept for later execution. Otherwise, |
|
they will be cleared. (default: :obj:`True`) |
|
|
|
Returns: |
|
Any: The value of the last statement (excluding "import") in the |
|
code. For this interpreter, the value of an expression is its |
|
value, the value of an "assign" statement is the assigned |
|
value, and the value of an "if" and "for" block statement is |
|
the value of the last statement in the block. |
|
""" |
|
if state is not None: |
|
self.state.update(state) |
|
if fuzz_state is not None: |
|
self.fuzz_state.update(fuzz_state) |
|
|
|
try: |
|
expression = ast.parse(code) |
|
except SyntaxError as e: |
|
if self.raise_error: |
|
raise InterpreterError(f"Syntax error in code: {e}") |
|
else: |
|
import traceback |
|
|
|
return traceback.format_exc() |
|
|
|
result = None |
|
for idx, node in enumerate(expression.body): |
|
try: |
|
line_result = self._execute_ast(node) |
|
except InterpreterError as e: |
|
if not keep_state: |
|
self.clear_state() |
|
msg = ( |
|
f"Evaluation of the code stopped at node {idx}. " |
|
f"See:\n{e}" |
|
) |
|
|
|
|
|
if self.raise_error: |
|
raise InterpreterError(msg) |
|
else: |
|
import traceback |
|
|
|
return traceback.format_exc() |
|
if line_result is not None: |
|
result = line_result |
|
|
|
if not keep_state: |
|
self.clear_state() |
|
|
|
return result |
|
|
|
def clear_state(self) -> None: |
|
r"""Initialize :obj:`state` and :obj:`fuzz_state`.""" |
|
self.state = self.action_space.copy() |
|
self.fuzz_state = {} |
|
|
|
|
|
|
|
@typing.no_type_check |
|
def _execute_ast(self, expression: ast.AST) -> Any: |
|
if isinstance(expression, ast.Assign): |
|
|
|
|
|
|
|
return self._execute_assign(expression) |
|
elif isinstance(expression, ast.Attribute): |
|
value = self._execute_ast(expression.value) |
|
return getattr(value, expression.attr) |
|
elif isinstance(expression, ast.BinOp): |
|
|
|
return self._execute_binop(expression) |
|
elif isinstance(expression, ast.Call): |
|
|
|
return self._execute_call(expression) |
|
elif isinstance(expression, ast.Compare): |
|
|
|
return self._execute_condition(expression) |
|
elif isinstance(expression, ast.Constant): |
|
|
|
return expression.value |
|
elif isinstance(expression, ast.Dict): |
|
|
|
result: Dict = {} |
|
for k, v in zip(expression.keys, expression.values): |
|
if k is not None: |
|
result[self._execute_ast(k)] = self._execute_ast(v) |
|
else: |
|
result.update(self._execute_ast(v)) |
|
return result |
|
elif isinstance(expression, ast.Expr): |
|
|
|
return self._execute_ast(expression.value) |
|
elif isinstance(expression, ast.For): |
|
return self._execute_for(expression) |
|
elif isinstance(expression, ast.FormattedValue): |
|
|
|
|
|
return self._execute_ast(expression.value) |
|
elif isinstance(expression, ast.If): |
|
|
|
return self._execute_if(expression) |
|
elif isinstance(expression, ast.Import): |
|
|
|
self._execute_import(expression) |
|
return None |
|
elif isinstance(expression, ast.ImportFrom): |
|
self._execute_import_from(expression) |
|
return None |
|
elif hasattr(ast, "Index") and isinstance(expression, ast.Index): |
|
|
|
return self._execute_ast(expression.value) |
|
elif isinstance(expression, ast.JoinedStr): |
|
return "".join( |
|
[str(self._execute_ast(v)) for v in expression.values] |
|
) |
|
elif isinstance(expression, ast.List): |
|
|
|
return [self._execute_ast(elt) for elt in expression.elts] |
|
elif isinstance(expression, ast.Name): |
|
|
|
return self._execute_name(expression) |
|
elif isinstance(expression, ast.Subscript): |
|
|
|
return self._execute_subscript(expression) |
|
elif isinstance(expression, ast.Tuple): |
|
return tuple([self._execute_ast(elt) for elt in expression.elts]) |
|
elif isinstance(expression, ast.UnaryOp): |
|
|
|
return self._execute_unaryop(expression) |
|
else: |
|
|
|
|
|
raise InterpreterError( |
|
f"{expression.__class__.__name__} is not supported." |
|
) |
|
|
|
def _execute_assign(self, assign: ast.Assign) -> Any: |
|
targets = assign.targets |
|
result = self._execute_ast(assign.value) |
|
|
|
for target in targets: |
|
self._assign(target, result) |
|
return result |
|
|
|
def _assign(self, target: ast.expr, value: Any): |
|
if isinstance(target, ast.Name): |
|
self.state[target.id] = value |
|
elif isinstance(target, ast.Tuple): |
|
if not isinstance(value, tuple): |
|
raise InterpreterError( |
|
f"Expected type tuple, but got" |
|
f"{value.__class__.__name__} instead." |
|
) |
|
if len(target.elts) != len(value): |
|
raise InterpreterError( |
|
f"Expected {len(target.elts)} values but got" |
|
f" {len(value)}." |
|
) |
|
for t, v in zip(target.elts, value): |
|
self.state[self._execute_ast(t)] = v |
|
else: |
|
raise InterpreterError( |
|
f"Unsupported variable type. Expected " |
|
f"ast.Name or ast.Tuple, got " |
|
f"{target.__class__.__name__} instead." |
|
) |
|
|
|
def _execute_call(self, call: ast.Call) -> Any: |
|
callable_func = self._execute_ast(call.func) |
|
|
|
|
|
args = [self._execute_ast(arg) for arg in call.args] |
|
kwargs = { |
|
keyword.arg: self._execute_ast(keyword.value) |
|
for keyword in call.keywords |
|
} |
|
return callable_func(*args, **kwargs) |
|
|
|
def _execute_subscript(self, subscript: ast.Subscript): |
|
index = self._execute_ast(subscript.slice) |
|
value = self._execute_ast(subscript.value) |
|
if not isinstance(subscript.ctx, ast.Load): |
|
raise InterpreterError( |
|
f"{subscript.ctx.__class__.__name__} is not supported for " |
|
"subscript." |
|
) |
|
if isinstance(value, (list, tuple)): |
|
return value[int(index)] |
|
if index in value: |
|
return value[index] |
|
if isinstance(index, str) and isinstance(value, dict): |
|
close_matches = difflib.get_close_matches( |
|
index, |
|
[key for key in list(value.keys()) if isinstance(key, str)], |
|
) |
|
if len(close_matches) > 0: |
|
return value[close_matches[0]] |
|
|
|
raise InterpreterError(f"Could not index {value} with '{index}'.") |
|
|
|
def _execute_name(self, name: ast.Name): |
|
if isinstance(name.ctx, ast.Store): |
|
return name.id |
|
elif isinstance(name.ctx, ast.Load): |
|
return self._get_value_from_state(name.id) |
|
else: |
|
raise InterpreterError(f"{name.ctx} is not supported.") |
|
|
|
def _execute_condition(self, condition: ast.Compare): |
|
if len(condition.ops) > 1: |
|
raise InterpreterError( |
|
"Cannot evaluate conditions with multiple operators" |
|
) |
|
|
|
left = self._execute_ast(condition.left) |
|
comparator = condition.ops[0] |
|
right = self._execute_ast(condition.comparators[0]) |
|
|
|
if isinstance(comparator, ast.Eq): |
|
return left == right |
|
elif isinstance(comparator, ast.NotEq): |
|
return left != right |
|
elif isinstance(comparator, ast.Lt): |
|
return left < right |
|
elif isinstance(comparator, ast.LtE): |
|
return left <= right |
|
elif isinstance(comparator, ast.Gt): |
|
return left > right |
|
elif isinstance(comparator, ast.GtE): |
|
return left >= right |
|
elif isinstance(comparator, ast.Is): |
|
return left is right |
|
elif isinstance(comparator, ast.IsNot): |
|
return left is not right |
|
elif isinstance(comparator, ast.In): |
|
return left in right |
|
elif isinstance(comparator, ast.NotIn): |
|
return left not in right |
|
else: |
|
raise InterpreterError(f"Unsupported operator: {comparator}") |
|
|
|
def _execute_if(self, if_statement: ast.If): |
|
result = None |
|
if not isinstance(if_statement.test, ast.Compare): |
|
raise InterpreterError( |
|
"Only Campare expr supported in if statement, get" |
|
f" {if_statement.test.__class__.__name__}" |
|
) |
|
if self._execute_condition(if_statement.test): |
|
for line in if_statement.body: |
|
line_result = self._execute_ast(line) |
|
if line_result is not None: |
|
result = line_result |
|
else: |
|
for line in if_statement.orelse: |
|
line_result = self._execute_ast(line) |
|
if line_result is not None: |
|
result = line_result |
|
return result |
|
|
|
def _execute_for(self, for_statement: ast.For): |
|
result = None |
|
for value in self._execute_ast(for_statement.iter): |
|
self._assign(for_statement.target, value) |
|
for line in for_statement.body: |
|
line_result = self._execute_ast(line) |
|
if line_result is not None: |
|
result = line_result |
|
|
|
return result |
|
|
|
def _execute_import(self, import_module: ast.Import) -> None: |
|
for module in import_module.names: |
|
self._validate_import(module.name) |
|
alias = module.asname or module.name |
|
self.state[alias] = importlib.import_module(module.name) |
|
|
|
def _execute_import_from(self, import_from: ast.ImportFrom): |
|
if import_from.module is None: |
|
raise InterpreterError("\"from . import\" is not supported.") |
|
for import_name in import_from.names: |
|
full_name = import_from.module + f".{import_name.name}" |
|
self._validate_import(full_name) |
|
imported_module = importlib.import_module(import_from.module) |
|
alias = import_name.asname or import_name.name |
|
self.state[alias] = getattr(imported_module, import_name.name) |
|
|
|
def _validate_import(self, full_name: str): |
|
tmp_name = "" |
|
found_name = False |
|
for name in full_name.split("."): |
|
tmp_name += name if tmp_name == "" else f".{name}" |
|
if tmp_name in self.import_white_list: |
|
found_name = True |
|
return |
|
|
|
if not found_name: |
|
raise InterpreterError( |
|
f"It is not permitted to import modules " |
|
f"than module white list (try to import " |
|
f"{full_name})." |
|
) |
|
|
|
def _execute_binop(self, binop: ast.BinOp): |
|
left = self._execute_ast(binop.left) |
|
operator = binop.op |
|
right = self._execute_ast(binop.right) |
|
|
|
if isinstance(operator, ast.Add): |
|
return left + right |
|
elif isinstance(operator, ast.Sub): |
|
return left - right |
|
elif isinstance(operator, ast.Mult): |
|
return left * right |
|
elif isinstance(operator, ast.Div): |
|
return left / right |
|
elif isinstance(operator, ast.FloorDiv): |
|
return left // right |
|
elif isinstance(operator, ast.Mod): |
|
return left % right |
|
elif isinstance(operator, ast.Pow): |
|
return left**right |
|
elif isinstance(operator, ast.LShift): |
|
return left << right |
|
elif isinstance(operator, ast.RShift): |
|
return left >> right |
|
elif isinstance(operator, ast.MatMult): |
|
return left @ right |
|
else: |
|
raise InterpreterError(f"Operator not supported: {operator}") |
|
|
|
def _execute_unaryop(self, unaryop: ast.UnaryOp): |
|
operand = self._execute_ast(unaryop.operand) |
|
operator = unaryop.op |
|
|
|
if isinstance(operator, ast.UAdd): |
|
return +operand |
|
elif isinstance(operator, ast.USub): |
|
return -operand |
|
elif isinstance(operator, ast.Not): |
|
return not operand |
|
else: |
|
raise InterpreterError(f"Operator not supported: {operator}") |
|
|
|
def _get_value_from_state(self, key: str) -> Any: |
|
if key in self.state: |
|
return self.state[key] |
|
else: |
|
close_matches = difflib.get_close_matches( |
|
key, list(self.fuzz_state.keys()), n=1 |
|
) |
|
if close_matches: |
|
return self.fuzz_state[close_matches[0]] |
|
else: |
|
raise InterpreterError(f"The variable `{key}` is not defined.") |
|
|