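r"""Cohere chat model backend for the CAMEL framework.

Wraps :obj:`cohere.ClientV2` behind the unified :obj:`BaseModelBackend`
interface, translating between OpenAI-style messages and Cohere's
ChatMessageV2 types.
"""
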
import ast
import json
import logging
import os
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

if TYPE_CHECKING:
    from cohere.types import ChatMessageV2, ChatResponse

from camel.configs import COHERE_API_PARAMS, CohereConfig
from camel.messages import OpenAIMessage
from camel.models import BaseModelBackend
from camel.types import ChatCompletion, ModelType
from camel.utils import (
    BaseTokenCounter,
    OpenAITokenCounter,
    api_keys_required,
)
# AgentOps is an optional tracing dependency: import it only when an API
# key is configured, and fall back to `LLMEvent = None` so tracing can be
# skipped cleanly at call time.
try:
    if os.getenv("AGENTOPS_API_KEY") is not None:
        from agentops import LLMEvent, record
    else:
        raise ImportError
except (ImportError, AttributeError):
    LLMEvent = None


class CohereModel(BaseModelBackend):
    r"""Cohere API in a unified BaseModelBackend interface.

    Args:
        model_type (Union[ModelType, str]): Model for which a backend is
            created.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            of parameters passed to the Cohere chat endpoint. If
            :obj:`None`, :obj:`CohereConfig().as_dict()` is used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating
            with the Cohere service. Falls back to the
            :obj:`COHERE_API_KEY` environment variable.
            (default: :obj:`None`)
        url (Optional[str], optional): The url to the Cohere service. Falls
            back to the :obj:`COHERE_API_BASE_URL` environment variable.
            (default: :obj:`None`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter
            to use for the model. If not provided,
            :obj:`OpenAITokenCounter(ModelType.GPT_4O_MINI)` is used.
            (default: :obj:`None`)
    """
    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
    ):
        import cohere

        if model_config_dict is None:
            model_config_dict = CohereConfig().as_dict()

        api_key = api_key or os.environ.get("COHERE_API_KEY")
        url = url or os.environ.get("COHERE_API_BASE_URL")
        super().__init__(
            model_type, model_config_dict, api_key, url, token_counter
        )
        # Honor a custom service url when one is configured; otherwise let
        # the cohere SDK use its default endpoint.
        if self._url:
            self._client = cohere.ClientV2(
                api_key=self._api_key, base_url=self._url
            )
        else:
            self._client = cohere.ClientV2(api_key=self._api_key)
    def _to_openai_response(self, response: 'ChatResponse') -> ChatCompletion:
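        r"""Convert a Cohere `ChatResponse` into an OpenAI-style
        `ChatCompletion`, mapping usage, tool calls, and finish reason onto
        the unified format.
        """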
|
        if response.usage and response.usage.tokens:
            input_tokens = response.usage.tokens.input_tokens or 0
            output_tokens = response.usage.tokens.output_tokens or 0
            usage = {
                "prompt_tokens": input_tokens,
                "completion_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens,
            }
        else:
            usage = {}

        tool_calls = response.message.tool_calls
        choices = []
        if tool_calls:
            # Aggregate every tool call into a single assistant message:
            # the OpenAI format expects one choice carrying a list of tool
            # calls, not one choice per tool call.
            openai_tool_calls = [
                dict(
                    id=tool_call.id,
                    function={
                        "name": tool_call.function.name,
                        "arguments": tool_call.function.arguments,
                    }
                    if tool_call.function
                    else {},
                    type=tool_call.type,
                )
                for tool_call in tool_calls
            ]
            message = {
                "role": "assistant",
                "content": response.message.tool_plan,
                "tool_calls": openai_tool_calls,
            }
        else:
            message = {
                "role": "assistant",
                "content": response.message.content[0].text
                if response.message.content
                else None,
                "tool_calls": None,
            }

        choices.append(
            dict(
                index=0,
                message=message,
                finish_reason=response.finish_reason
                if response.finish_reason
                else None,
            )
        )

        obj = ChatCompletion.construct(
            id=response.id,
            choices=choices,
            created=None,
            model=self.model_type,
            object="chat.completion",
            usage=usage,
        )
        return obj
    def _to_cohere_chatmessage(
        self, messages: List[OpenAIMessage]
    ) -> List["ChatMessageV2"]:
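        r"""Convert OpenAI-format messages into Cohere `ChatMessageV2`
        objects, generating tool call ids so that assistant tool calls and
        their tool results stay linked.
        """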
|
        from cohere.types import ToolCallV2Function
        from cohere.types.chat_message_v2 import (
            AssistantChatMessageV2,
            SystemChatMessageV2,
            ToolCallV2,
            ToolChatMessageV2,
            UserChatMessageV2,
        )

        tool_call_id = None
        new_messages = []
        for msg in messages:
            role = msg.get("role")
            content = msg.get("content")
            function_call = msg.get("function_call")

            if role == "user":
                new_message = UserChatMessageV2(role="user", content=content)
            elif role in {"tool", "function"}:
                # Prefer an id carried by the tool message itself; fall back
                # to the id generated for the preceding assistant tool call.
                new_message = ToolChatMessageV2(
                    role="tool",
                    tool_call_id=msg.get("tool_call_id") or tool_call_id,
                    content=content,
                )
            elif role == "assistant":
                if not function_call:
                    new_message = AssistantChatMessageV2(
                        role="assistant",
                        content=content,
                    )
                else:
                    # `arguments` may be a JSON string (OpenAI format) or a
                    # Python-literal string; normalize it to JSON either way.
                    arguments = function_call.get("arguments")
                    try:
                        arguments_dict = json.loads(arguments)
                    except (TypeError, json.JSONDecodeError):
                        arguments_dict = ast.literal_eval(arguments)
                    arguments_json = json.dumps(arguments_dict)

                    # Cohere requires explicit tool call ids; remember this
                    # one so the following tool result can reference it.
                    assis_tool_call_id = str(uuid.uuid4())
                    tool_call_id = assis_tool_call_id
                    new_message = AssistantChatMessageV2(
                        role="assistant",
                        tool_calls=[
                            ToolCallV2(
                                id=assis_tool_call_id,
                                type="function",
                                function=ToolCallV2Function(
                                    name=function_call.get("name"),
                                    arguments=arguments_json,
                                ),
                            )
                        ],
                        content=content,
                    )
            elif role == "system":
                new_message = SystemChatMessageV2(
                    role="system",
                    content=content,
                )
            else:
                raise ValueError(f"Unsupported message role: {role}")

            new_messages.append(new_message)
        return new_messages
    @property
    def token_counter(self) -> BaseTokenCounter:
        r"""Initialize the token counter for the model backend.

        Returns:
            BaseTokenCounter: The token counter following the model's
                tokenization style.
        """
        if not self._token_counter:
            # No Cohere-native counter is wired in here, so an OpenAI
            # tokenizer is used as an approximation.
            self._token_counter = OpenAITokenCounter(
                model=ModelType.GPT_4O_MINI
            )
        return self._token_counter
    @api_keys_required("COHERE_API_KEY")
    def run(self, messages: List[OpenAIMessage]) -> ChatCompletion:
        r"""Runs inference of Cohere chat completion.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat
                history in OpenAI API format.

        Returns:
            ChatCompletion: The chat completion in OpenAI format.
        """
        from cohere.core.api_error import ApiError

        cohere_messages = self._to_cohere_chatmessage(messages)

        try:
            response = self._client.chat(
                messages=cohere_messages,
                model=self.model_type,
                **self.model_config_dict,
            )
        except ApiError as e:
            logging.error(f"Cohere API Error: {e.status_code}")
            logging.error(f"Error body: {e.body}")
            raise
        except Exception as e:
            logging.error(f"Unexpected error when calling Cohere API: {e!s}")
            raise

        openai_response = self._to_openai_response(response)

        # Record the call with AgentOps when tracing is enabled; `str()`
        # guards against messages whose content is None or non-string.
        if LLMEvent:
            llm_event = LLMEvent(
                thread_id=openai_response.id,
                prompt=" ".join(
                    str(message.get("content", "")) for message in messages
                ),
                prompt_tokens=openai_response.usage.prompt_tokens,
                completion=openai_response.choices[0].message.content,
                completion_tokens=openai_response.usage.completion_tokens,
                model=self.model_type,
            )
            record(llm_event)

        return openai_response
    def check_model_config(self):
        r"""Check whether the model configuration contains any unexpected
        arguments to Cohere API.

        Raises:
            ValueError: If the model configuration dictionary contains any
                unexpected arguments to Cohere API.
        """
        for param in self.model_config_dict:
            if param not in COHERE_API_PARAMS:
                raise ValueError(
                    f"Unexpected argument `{param}` was passed "
                    "to the Cohere model backend."
                )
    @property
    def stream(self) -> bool:
        r"""Returns whether the model is in stream mode, which sends partial
        results each time. Streaming is currently not supported.

        Returns:
            bool: Whether the model is in stream mode.
        """
        return False
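

# Minimal usage sketch (assumes a valid COHERE_API_KEY in the environment
# and that "command-r" names an available Cohere model):
#
#   model = CohereModel(model_type="command-r")
#   completion = model.run(
#       [{"role": "user", "content": "Say hello in one sentence."}]
#   )
#   print(completion.choices[0].message.content)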
|
|