File size: 6,676 Bytes
62da328
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import atexit
import json
import logging
import subprocess
import time
from functools import wraps
from pathlib import Path
from subprocess import Popen
from typing import Any, Dict, List, Optional, Union

import requests
from pydantic import BaseModel

from camel.runtime import BaseRuntime
from camel.toolkits.function_tool import FunctionTool

logger = logging.getLogger(__name__)


class RemoteHttpRuntime(BaseRuntime):
    r"""A runtime that runs functions in a remote HTTP server.
    You need to run the API server in the remote server first.

    Every tool registered through :meth:`add` has its callable replaced by a
    wrapper that POSTs the call's arguments to
    ``http://{host}:{port}/{function name}`` and returns the decoded reply.
    :meth:`build` starts the ``api.py`` script located next to this module
    as a subprocess, using ``python_exec``.

    Args:
        host (str): The host of the remote server.
        port (int): The port of the remote server. (default: :obj:`8000`)
        python_exec (str): The python executable to run the API server.
            (default: :obj:`python3`)
    """

    def __init__(
        self, host: str, port: int = 8000, python_exec: str = "python3"
    ):
        super().__init__()
        self.host = host
        self.port = port
        self.python_exec = python_exec
        # Server script launched by `build`; it sits next to this module.
        self.api_path = Path(__file__).parent / "api.py"
        # Entrypoint strings passed to the server script on its command line.
        self.entrypoint: Dict[str, str] = dict()
        # Handle of the server subprocess; None while none is tracked.
        self.process: Optional[Popen] = None

    def build(self) -> "RemoteHttpRuntime":
        r"""Build the API server.

        Starts ``api.py`` with the port and every registered entrypoint as
        command-line arguments. A server started by an earlier call that is
        still running is terminated first.

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        # Overwriting the handle of a live process would leave that process
        # running with nothing left to terminate it.
        self._cleanup()
        self.process = subprocess.Popen(
            [
                self.python_exec,
                str(self.api_path),
                str(self.port),
                *self.entrypoint.values(),
            ]
        )
        # Keep exactly one exit hook no matter how often build() is called
        # (e.g. through reset()).
        atexit.unregister(self._cleanup)
        atexit.register(self._cleanup)
        return self

    def _cleanup(self) -> None:
        r"""Clean up the API server when exiting.

        Terminates the tracked subprocess and waits for it to exit. Does
        nothing when no process is tracked or it has already exited.
        """
        # `getattr` keeps this safe when called from `__del__` on an
        # instance whose `__init__` did not get as far as setting `process`.
        process = getattr(self, "process", None)
        if process is not None and process.poll() is None:
            process.terminate()
            process.wait()
            self.process = None

    def add(  # type: ignore[override]
        self,
        funcs: Union[FunctionTool, List[FunctionTool]],
        entrypoint: str,
        redirect_stdout: bool = False,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> "RemoteHttpRuntime":
        r"""Add a function or list of functions to the runtime.

        Each tool's callable is replaced by a wrapper that forwards the call
        to the API server over HTTP. An empty list is accepted and leaves
        the runtime unchanged.

        Args:
            funcs (Union[FunctionTool, List[FunctionTool]]): The function or
                list of functions to add.
            entrypoint (str): The entrypoint for the function.
            redirect_stdout (bool): Whether to return the stdout of
                the function. (default: :obj:`False`)
            arguments (Optional[Dict[str, Any]]): The arguments for the
                function. When given, they are JSON-encoded and appended to
                ``entrypoint``. (default: :obj:`None`)

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        if not isinstance(funcs, list):
            funcs = [funcs]
        if not funcs:
            # Nothing to wrap and no function name to key the entrypoint on.
            return self
        if arguments is not None:
            entrypoint += json.dumps(arguments)

        for func in funcs:
            inner_func = func.func

            # Bind `func` and `redirect_stdout` as defaults so every wrapper
            # keeps its own values instead of the loop's last ones.
            @wraps(inner_func)
            def wrapper(
                *args, func=func, redirect_stdout=redirect_stdout, **kwargs
            ):
                # Pydantic models are not JSON serializable; send them as
                # plain dicts.
                kwargs = {
                    key: value.model_dump()
                    if isinstance(value, BaseModel)
                    else value
                    for key, value in kwargs.items()
                }

                name = func.get_function_name()
                resp = requests.post(
                    f"http://{self.host}:{self.port}/{name}",
                    json=dict(
                        args=args,
                        kwargs=kwargs,
                        redirect_stdout=redirect_stdout,
                    ),
                )
                if resp.status_code != 200:
                    # The line breaks and indentation inside these literals
                    # are part of the emitted text.
                    logger.error(
                        f"""Failed to execute function: 
                        {func.get_function_name()}, 
                        status code: {resp.status_code}, 
                        response: {resp.text}"""
                    )
                    return {
                        "error": f"""Failed to execute function:
                        {func.get_function_name()}, 
                        response: {resp.text}"""
                    }
                data = resp.json()
                if redirect_stdout:
                    print(data["stdout"])
                # The server sends the function's result as a JSON string.
                return json.loads(data["output"])

            func.func = wrapper
            self.tools_map[func.get_function_name()] = func

        # One entry per add() call, keyed by the last function's name, so
        # the entrypoint appears once on the server's command line.
        # NOTE(review): presumably a single entrypoint covers every function
        # in `funcs` -- confirm against api.py.
        self.entrypoint[funcs[-1].get_function_name()] = entrypoint

        return self

    @property
    def ok(self) -> bool:
        r"""Check if the API Server is running.

        Any HTTP response counts as running, whatever its status code. A
        refused connection or no reply within five seconds counts as not
        running.

        Returns:
            bool: Whether the API Server is running.
        """
        try:
            # Without a timeout, a host that accepts the connection but
            # never answers would block this check (and `wait`) forever.
            requests.get(f"http://{self.host}:{self.port}", timeout=5)
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        ):
            return False
        return True

    def wait(self, timeout: int = 10) -> bool:
        r"""Wait for the API Server to be ready.

        Polls :attr:`ok` up to ``timeout`` times, sleeping one second after
        each failed attempt.

        Args:
            timeout (int): The number of seconds to wait. (default: :obj:`10`)

        Returns:
            bool: Whether the API Server is ready.
        """
        for _ in range(timeout):
            if self.ok:
                return True
            time.sleep(1)
        return False

    def __del__(self) -> None:
        r"""Clean up the API server when the object is deleted."""
        self._cleanup()

    def stop(self) -> "RemoteHttpRuntime":
        r"""Stop the API server.

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        self._cleanup()
        return self

    def reset(self) -> "RemoteHttpRuntime":
        r"""Reset the API server.

        Stops the running server, if any, and builds a new one.

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        return self.stop().build()

    @property
    def docs(self) -> str:
        r"""Get the URL for the API documentation.

        Returns:
            str: The URL for the API documentation.
        """
        return f"http://{self.host}:{self.port}/docs"