|
import json |
|
import os |
|
from typing import List, Optional, AsyncGenerator |
|
|
|
from camel.toolkits import MCPToolkit |
|
from contextlib import AsyncExitStack, asynccontextmanager |
|
|
|
|
|
class MCPToolkitManager: |
|
r"""MCPToolkitManager is a class for managing multiple MCPToolkit |
|
instances and providing unified connection management. |
|
|
|
Attributes: |
|
toolkits (List[MCPToolkit]): A list of MCPToolkit instances to be |
|
managed. |
|
""" |
|
|
|
def __init__(self, toolkits: List[MCPToolkit]): |
|
self.toolkits = toolkits |
|
self._exit_stack: Optional[AsyncExitStack] = None |
|
self._connected = False |
|
|
|
|
|
@staticmethod |
|
def from_config(config_path: str) -> "MCPToolkitManager": |
|
r"""Loads an MCPToolkit instance from a JSON configuration file and |
|
returns an MCPToolkitManager instance. |
|
|
|
Args: |
|
config_path (str): The path to the JSON configuration file. |
|
|
|
Returns: |
|
MCPToolkitManager: The MCPToolkitManager instance. |
|
""" |
|
with open(config_path, "r", encoding="utf-8") as f: |
|
data = json.load(f) |
|
|
|
all_toolkits = [] |
|
|
|
|
|
mcp_servers = data.get("mcpServers", {}) |
|
for name, cfg in mcp_servers.items(): |
|
toolkit = MCPToolkit( |
|
command_or_url=cfg["command"], |
|
args=cfg.get("args", []), |
|
env={**os.environ, **cfg.get("env", {})}, |
|
timeout=cfg.get("timeout", None), |
|
) |
|
all_toolkits.append(toolkit) |
|
|
|
|
|
mcp_web_servers = data.get("mcpWebServers", {}) |
|
for name, cfg in mcp_web_servers.items(): |
|
toolkit = MCPToolkit( |
|
command_or_url=cfg["url"], |
|
timeout=cfg.get("timeout", None), |
|
) |
|
all_toolkits.append(toolkit) |
|
|
|
return MCPToolkitManager(all_toolkits) |
|
|
|
@asynccontextmanager |
|
async def connection(self) -> AsyncGenerator["MCPToolkitManager", None]: |
|
r"""Connect multiple MCPToolkit instances and close them when |
|
leaving""" |
|
self._exit_stack = AsyncExitStack() |
|
try: |
|
for tk in self.toolkits: |
|
await self._exit_stack.enter_async_context(tk.connection()) |
|
self._connected = True |
|
yield self |
|
finally: |
|
self._connected = False |
|
await self._exit_stack.aclose() |
|
self._exit_stack = None |
|
|
|
def is_connected(self) -> bool: |
|
r"""Returns whether the MCPToolkitManager is connected.""" |
|
return self._connected |
|
|
|
def get_all_tools(self): |
|
r"""Returns all tools from all MCPToolkit instances.""" |
|
all_tools = [] |
|
for tk in self.toolkits: |
|
all_tools.extend(tk.get_tools()) |
|
return all_tools |