Spaces:
Runtime error
Runtime error
File size: 2,690 Bytes
ed4d993 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
import functools
import logging
import multiprocessing
import re
import sys
from io import StringIO
from typing import Dict, Optional
from langchain.pydantic_v1 import BaseModel, Field
logger = logging.getLogger(__name__)
@functools.lru_cache(maxsize=None)
def warn_once() -> None:
"""Warn once about the dangers of PythonREPL."""
logger.warning("Python REPL can execute arbitrary code. Use with caution.")
class PythonREPL(BaseModel):
"""Simulates a standalone Python REPL."""
globals: Optional[Dict] = Field(default_factory=dict, alias="_globals")
locals: Optional[Dict] = Field(default_factory=dict, alias="_locals")
@staticmethod
def sanitize_input(query: str) -> str:
"""Sanitize input to the python REPL.
Remove whitespace, backtick & python
(if llm mistakes python console as terminal)
Args:
query: The query to sanitize
Returns:
str: The sanitized query
"""
query = re.sub(r"^(\s|`)*(?i:python)?\s*", "", query)
query = re.sub(r"(\s|`)*$", "", query)
return query
@classmethod
def worker(
cls,
command: str,
globals: Optional[Dict],
locals: Optional[Dict],
queue: multiprocessing.Queue,
) -> None:
old_stdout = sys.stdout
sys.stdout = mystdout = StringIO()
try:
cleaned_command = cls.sanitize_input(command)
exec(cleaned_command, globals, locals)
sys.stdout = old_stdout
queue.put(mystdout.getvalue())
except Exception as e:
sys.stdout = old_stdout
queue.put(repr(e))
def run(self, command: str, timeout: Optional[int] = None) -> str:
"""Run command with own globals/locals and returns anything printed.
Timeout after the specified number of seconds."""
# Warn against dangers of PythonREPL
warn_once()
queue: multiprocessing.Queue = multiprocessing.Queue()
# Only use multiprocessing if we are enforcing a timeout
if timeout is not None:
# create a Process
p = multiprocessing.Process(
target=self.worker, args=(command, self.globals, self.locals, queue)
)
# start it
p.start()
# wait for the process to finish or kill it after timeout seconds
p.join(timeout)
if p.is_alive():
p.terminate()
return "Execution timed out"
else:
self.worker(command, self.globals, self.locals, queue)
# get the result from the worker function
return queue.get()
|