abacus_chat_proxy / openai_service.py
malt666's picture
Upload 12 files
ad9a66f verified
raw
history blame
3.98 kB
from flask import Flask, request, jsonify
import tiktoken
import os
app = Flask(__name__)
# OpenAI模型映射
MODEL_MAPPINGS = {
# GPT-4系列
"gpt-4o": "o200k_base",
"gpt-4-turbo": "cl100k_base",
"gpt-4": "cl100k_base",
# GPT-3.5系列
"gpt-3.5-turbo": "cl100k_base",
"gpt-35-turbo": "cl100k_base",
# 旧模型
"text-davinci-003": "p50k_base",
"text-davinci-002": "p50k_base",
"davinci": "r50k_base",
# 嵌入模型
"text-embedding-ada-002": "cl100k_base",
}
@app.route('/count_tokens', methods=['POST'])
def count_tokens():
try:
data = request.json
messages = data.get('messages', [])
system = data.get('system')
model = data.get('model', 'gpt-3.5-turbo')
# 根据模型名称选择合适的编码器
model_key = model.lower()
encoding_name = None
# 查找完全匹配
if model_key in MODEL_MAPPINGS:
encoding_name = MODEL_MAPPINGS[model_key]
else:
# 查找部分匹配
for key in MODEL_MAPPINGS:
if key in model_key:
encoding_name = MODEL_MAPPINGS[key]
break
# 如果没有找到匹配,使用默认的cl100k_base编码器
if not encoding_name:
encoding_name = "cl100k_base" # 最常用的编码器
# 获取编码器
try:
encoding = tiktoken.get_encoding(encoding_name)
except KeyError:
# 如果找不到编码器,使用gpt-3.5-turbo的编码器
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
# 计算tokens
total_tokens = 0
# 按照OpenAI的格式计算tokens
# 参考: https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
# 对于ChatGPT模型,每个请求都有3个隐藏tokens
if encoding_name in ["cl100k_base", "o200k_base"]:
# 每条消息开头有3个token,结尾有1个token
total_tokens += 3 # 每个请求的起始tokens
# 计算每条消息的tokens
for message in messages:
total_tokens += 4 # 每条消息增加4个token (包括角色)
for key, value in message.items():
total_tokens += len(encoding.encode(value))
# 名称字段比较少见,但也计入
if key == "name":
total_tokens -= 1 # 角色名称单独token计算减免
# 计算system消息的token
if system:
total_tokens += 4 # system消息也视为一条消息
total_tokens += len(encoding.encode(system))
else:
# 对于旧模型,只计算文本的token数量
all_text = ""
if system:
all_text += system + "\n\n"
for message in messages:
role = message.get('role', '')
content = message.get('content', '')
all_text += f"{role}: {content}\n"
total_tokens = len(encoding.encode(all_text))
return jsonify({
'input_tokens': total_tokens,
'model': model,
'encoding': encoding_name
})
except Exception as e:
return jsonify({
'error': str(e)
}), 400
@app.route('/health', methods=['GET'])
def health():
return jsonify({
'status': 'healthy',
'tokenizer': 'openai-tiktoken',
'supported_models': list(MODEL_MAPPINGS.keys())
})
if __name__ == '__main__':
app.run(host='127.0.0.1', port=7862)