|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
from openai import OpenAI, Stream |
|
|
|
from camel.configs import DEEPSEEK_API_PARAMS, DeepSeekConfig |
|
from camel.logger import get_logger |
|
from camel.messages import OpenAIMessage |
|
from camel.models.base_model import BaseModelBackend |
|
from camel.types import ( |
|
ChatCompletion, |
|
ChatCompletionChunk, |
|
ModelType, |
|
) |
|
from camel.utils import BaseTokenCounter, OpenAITokenCounter, api_keys_required |
|
from retry import retry |
|
import json |
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
class DeepSeekModel(BaseModelBackend): |
|
r"""DeepSeek API in a unified BaseModelBackend interface. |
|
|
|
Args: |
|
model_type (Union[ModelType, str]): Model for which a backend is |
|
created. |
|
model_config_dict (Optional[Dict[str, Any]], optional): A dictionary |
|
that will be fed into:obj:`openai.ChatCompletion.create()`. If |
|
:obj:`None`, :obj:`DeepSeekConfig().as_dict()` will be used. |
|
(default: :obj:`None`) |
|
api_key (Optional[str], optional): The API key for authenticating with |
|
the DeepSeek service. (default: :obj:`None`) |
|
url (Optional[str], optional): The url to the DeepSeek service. |
|
(default: :obj:`https://api.deepseek.com`) |
|
token_counter (Optional[BaseTokenCounter], optional): Token counter to |
|
use for the model. If not provided, :obj:`OpenAITokenCounter` |
|
will be used. (default: :obj:`None`) |
|
|
|
References: |
|
https://api-docs.deepseek.com/ |
|
""" |
|
|
|
def __init__( |
|
self, |
|
model_type: Union[ModelType, str], |
|
model_config_dict: Optional[Dict[str, Any]] = None, |
|
api_key: Optional[str] = None, |
|
url: Optional[str] = None, |
|
token_counter: Optional[BaseTokenCounter] = None, |
|
) -> None: |
|
if model_config_dict is None: |
|
model_config_dict = DeepSeekConfig().as_dict() |
|
api_key = api_key or os.environ.get("DEEPSEEK_API_KEY") |
|
url = url or os.environ.get( |
|
"DEEPSEEK_API_BASE_URL", |
|
"https://api.deepseek.com", |
|
) |
|
super().__init__( |
|
model_type, model_config_dict, api_key, url, token_counter |
|
) |
|
|
|
self._client = OpenAI( |
|
timeout=180, |
|
max_retries=3, |
|
api_key=self._api_key, |
|
base_url=self._url, |
|
) |
|
|
|
@property |
|
def token_counter(self) -> BaseTokenCounter: |
|
r"""Initialize the token counter for the model backend. |
|
|
|
Returns: |
|
BaseTokenCounter: The token counter following the model's |
|
tokenization style. |
|
""" |
|
if not self._token_counter: |
|
self._token_counter = OpenAITokenCounter( |
|
model=ModelType.GPT_4O_MINI |
|
) |
|
return self._token_counter |
|
|
|
@retry((ValueError, TypeError, json.decoder.JSONDecodeError), delay=10, logger=logger) |
|
def run( |
|
self, |
|
messages: List[OpenAIMessage], |
|
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: |
|
r"""Runs inference of DeepSeek chat completion. |
|
|
|
Args: |
|
messages (List[OpenAIMessage]): Message list with the chat history |
|
in OpenAI API format. |
|
|
|
Returns: |
|
Union[ChatCompletion, Stream[ChatCompletionChunk]]: |
|
`ChatCompletion` in the non-stream mode, or |
|
`Stream[ChatCompletionChunk]` in the stream mode. |
|
""" |
|
|
|
|
|
if self.model_type in [ |
|
ModelType.DEEPSEEK_REASONER, |
|
]: |
|
import re |
|
|
|
logger.warning( |
|
"You are using a DeepSeek Reasoner model, " |
|
"which has certain limitations, reference: " |
|
"`https://api-docs.deepseek.com/guides/reasoning_model#api-parameters`" |
|
) |
|
|
|
|
|
|
|
unsupported_keys = [ |
|
"temperature", |
|
"top_p", |
|
"presence_penalty", |
|
"frequency_penalty", |
|
"logprobs", |
|
"top_logprobs", |
|
"tools", |
|
] |
|
for key in unsupported_keys: |
|
if key in self.model_config_dict: |
|
del self.model_config_dict[key] |
|
|
|
|
|
|
|
messages = [ |
|
{ |
|
**msg, |
|
'content': re.sub( |
|
r'<think>.*?</think>', |
|
'', |
|
msg['content'], |
|
flags=re.DOTALL, |
|
).strip(), |
|
} |
|
for msg in messages |
|
] |
|
|
|
response = self._client.chat.completions.create( |
|
messages=messages, |
|
model=self.model_type, |
|
**self.model_config_dict, |
|
) |
|
|
|
|
|
if ( |
|
self.model_type |
|
in [ |
|
ModelType.DEEPSEEK_REASONER, |
|
] |
|
and os.environ.get("GET_REASONING_CONTENT", "false").lower() |
|
== "true" |
|
): |
|
reasoning_content = response.choices[0].message.reasoning_content |
|
combined_content = ( |
|
f"<think>\n{reasoning_content}\n</think>\n" |
|
if reasoning_content |
|
else "" |
|
) + response.choices[0].message.content |
|
|
|
response = ChatCompletion.construct( |
|
id=response.id, |
|
choices=[ |
|
dict( |
|
index=response.choices[0].index, |
|
message={ |
|
"role": response.choices[0].message.role, |
|
"content": combined_content, |
|
"tool_calls": None, |
|
}, |
|
finish_reason=response.choices[0].finish_reason |
|
if response.choices[0].finish_reason |
|
else None, |
|
) |
|
], |
|
created=response.created, |
|
model=response.model, |
|
object="chat.completion", |
|
usage=response.usage, |
|
) |
|
|
|
return response |
|
|
|
def check_model_config(self): |
|
r"""Check whether the model configuration contains any |
|
unexpected arguments to DeepSeek API. |
|
|
|
Raises: |
|
ValueError: If the model configuration dictionary contains any |
|
unexpected arguments to DeepSeek API. |
|
""" |
|
for param in self.model_config_dict: |
|
if param not in DEEPSEEK_API_PARAMS: |
|
raise ValueError( |
|
f"Unexpected argument `{param}` is " |
|
"input into DeepSeek model backend." |
|
) |
|
|
|
@property |
|
def stream(self) -> bool: |
|
r"""Returns whether the model is in stream mode, which sends partial |
|
results each time. |
|
|
|
Returns: |
|
bool: Whether the model is in stream mode. |
|
""" |
|
return self.model_config_dict.get("stream", False) |