import json
import os
import re
import time
import uuid
import asyncio
import requests
import logging
from typing import Optional, List
from pydantic import BaseModel
from fastapi import FastAPI, Request, Response, Depends, HTTPException, status
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# API key (read from the environment, default "linux.do")
API_KEY = os.getenv("API_KEY", "linux.do")

# Create the FastAPI app
app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Request models
class ChatMessage(BaseModel):
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    temperature: Optional[float] = 0.6
    top_p: Optional[float] = 0.95
    stream: Optional[bool] = True
    system_message: Optional[str] = "You are a helpful assistant."


# Fetch a session cookie from the Akash chat site
async def get_session_cookie():
    try:
        response = requests.get(
            "https://chat.akash.network/api/auth/session",
            headers={"Content-Type": "application/json"},
        )
        if response.cookies:
            return "; ".join(f"{cookie.name}={cookie.value}" for cookie in response.cookies)
        return ""
    except Exception as e:
        logger.error(f"Failed to get session cookie: {e}")
        return ""


# API key verification dependency
async def verify_api_key(request: Request):
    auth_header = request.headers.get("Authorization")
    if auth_header != "Bearer " + API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail={"success": False, "message": "Unauthorized: Invalid API key"},
        )
    return True


# Model list endpoint
@app.get("/v1/models")
async def get_models(authorized: bool = Depends(verify_api_key)):
    # Get a fresh session cookie
    cookie = await get_session_cookie()
    # Build request headers
    headers = {"Content-Type": "application/json", "Cookie": cookie}
    # Forward the request to the Akash API
    response = requests.get("https://chat.akash.network/api/models", headers=headers)
    models_data = response.json()
    current_timestamp = int(time.time())
    # Convert to the OpenAI model-list format
    converted_data = {
        "object": "list",
        "data": [
            {
                "id": model["id"],
                "object": "model",
                "created": current_timestamp,
                "owned_by": "openai" if "Meta" in model["id"] else "third_party",
                "permissions": [],
                "root": model["id"],
                "parent": None,
                "capabilities": {
                    "temperature": model["temperature"],
                    "top_p": model["top_p"],
                },
                "name": model["name"],
                "description": model["description"],
                "available": model["available"],
            }
            for model in models_data.get("models", [])
        ],
    }
    return converted_data


# Chat completions endpoint
@app.post("/v1/chat/completions")
async def chat_completions(request: Request, authorized: bool = Depends(verify_api_key)):
    logger.info("Handling chat completion request")
    try:
        # Read the request body
        body = await request.json()
        # Get a fresh session cookie
        cookie = await get_session_cookie()

        # Validate the model parameter
        if not body.get("model"):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"error": "Missing model parameter"},
            )

        # Fetch the list of available models
        models_response = requests.get(
            "https://chat.akash.network/api/models",
            headers={"Content-Type": "application/json", "Cookie": cookie},
        )
        models_data = models_response.json()
        available_models = [model["id"] for model in models_data.get("models", [])]

        # Normalize the model name (case-insensitive match against the upstream list)
        requested_model = body.get("model")
        matched_model = next(
            (model for model in available_models if model.lower() == requested_model.lower()),
            None,
        )
        if not matched_model:
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"error": f"Model '{requested_model}' not found"},
            )

        # Generate a unique chat ID
        chat_id = str(uuid.uuid4()).replace("-", "")[:16]

        # Build the request payload in Akash's format
        akash_data = {
            "id": chat_id,
            "messages": body.get("messages", []),
            "model": matched_model,  # use the normalized model name
            "system": body.get("system_message", "You are a helpful assistant."),
            "temperature": body.get("temperature", 0.6),
            "topP": body.get("top_p", 0.95),
        }

        # Build request headers
        headers = {"Content-Type": "application/json", "Cookie": cookie}

        _stream = body.get("stream", True)
        # Special handling for the AkashGen (image generation) model: never stream
        if matched_model == "AkashGen":
            _stream = False

        # Forward the request to the Akash API
        response = requests.post(
            "https://chat.akash.network/api/chat",
            json=akash_data,
            headers=headers,
            stream=_stream,
        )
        if not _stream:
            # Reading response.text on a streamed response would consume the body,
            # so only log it for non-streaming requests
            logger.debug(f"Akash API response: {response.text}")

        # Handle streaming responses
        if _stream:

            async def generate():
                content_buffer = ""
                for line in response.iter_lines():
                    if not line:
                        continue
                    try:
                        # Each line has the format "type:json_data"
                        line_str = line.decode("utf-8")
                        msg_type, msg_data = line_str.split(":", 1)

                        # Content messages
                        if msg_type == "0":
                            # Strip only the surrounding double quotes
                            if msg_data.startswith('"') and msg_data.endswith('"'):
                                msg_data = msg_data.replace('\\"', '"')
                                msg_data = msg_data[1:-1]
                            msg_data = msg_data.replace("\\n", "\n")
                            content_buffer += msg_data

                            # Build an OpenAI-style response chunk
                            chunk = {
                                "id": f"chatcmpl-{chat_id}",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": body.get("model", "DeepSeek-R1"),
                                "choices": [
                                    {
                                        "delta": {"content": msg_data},
                                        "index": 0,
                                        "finish_reason": None,
                                    }
                                ],
                            }
                            yield f"data: {json.dumps(chunk)}\n\n"

                        # End-of-stream messages
                        elif msg_type in ["e", "d"]:
                            chunk = {
                                "id": f"chatcmpl-{chat_id}",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": body.get("model", "DeepSeek-R1"),
                                "choices": [
                                    {"delta": {}, "index": 0, "finish_reason": "stop"}
                                ],
                            }
                            yield f"data: {json.dumps(chunk)}\n\n"
                            yield "data: [DONE]\n\n"
                            break
                    except Exception as e:
                        logger.error(f"Error while processing response line: {e}")
                        continue

            return StreamingResponse(
                generate(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "Content-Type": "text/event-stream",
                },
            )
        else:
            # Handle non-streaming responses
            if matched_model != "AkashGen":
                # Concatenate all content fragments of the form 0:"..."
                text_matches = re.findall(r'0:"(.*?)"', response.text)
                parsed_text = "".join(text_matches)
                response_data = {
                    "id": f"chatcmpl-{chat_id}",
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": body.get("model", "DeepSeek-R1"),
                    "choices": [
                        {
                            "index": 0,
                            "message": {"role": "assistant", "content": parsed_text},
                            "finish_reason": "stop",
                        }
                    ],
                }
                logger.debug(json.dumps(response_data, ensure_ascii=False))
                return Response(
                    content=json.dumps(response_data, ensure_ascii=False),
                    status_code=response.status_code,
                    headers={
                        "Cache-Control": "no-cache",
                        "Connection": "keep-alive",
                        "Content-Type": "application/json",
                    },
                )
            else:
                # Handle the AkashGen model (image generation)
                match = re.search(r"jobId='([^']+)'", response.text)
                job_id = None
                if match:
                    job_id = match.group(1)
                    logger.info(f"Got image generation job ID: {job_id}")

                    # Poll the image generation status
                    async def check_image_status():
                        while True:
                            try:
                                _img_response = requests.get(
                                    "https://chat.akash.network/api/image-status?ids=" + job_id,
                                    headers=headers,
                                )
                                _data = _img_response.json()
                                # Check whether the image has finished generating
                                if _data[0]["status"] == "completed":
                                    logger.info(f"Image generation completed: {job_id}")
                                    return _data[0]["result"]
                                else:
                                    logger.debug(f"Image still generating: {job_id}")
                            except Exception as e:
                                logger.error(f"Error while polling image status: {e}")
                            # Poll every 5 seconds
                            await asyncio.sleep(5)

                    # Wait for the image to finish
                    image_url = await check_image_status()

                    # Return a response containing the image URL
                    response_data = {
                        "id": f"chatcmpl-{chat_id}",
                        "object": "chat.completion",
                        "created": int(time.time()),
                        "model": body.get("model", "DeepSeek-R1"),
                        "choices": [
                            {
                                "index": 0,
                                "message": {
                                    "role": "assistant",
                                    "content": f"Based on your description, here is the generated image:\n\n![Generated image]({image_url})",
                                },
                                "finish_reason": "stop",
                            }
                        ],
                    }
                    return Response(
                        content=json.dumps(response_data, ensure_ascii=False),
                        status_code=response.status_code,
                        headers={
                            "Cache-Control": "no-cache",
                            "Connection": "keep-alive",
                            "Content-Type": "application/json",
                        },
                    )
                else:
                    return JSONResponse(
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                        content={"error": "The upstream Akash service returned an unexpected response"},
                    )
    except Exception as e:
        logger.error(f"Error while handling chat completion request: {e}")
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(e)},
        )


# Start the server
if __name__ == "__main__":
    import uvicorn

    logger.info("Starting FastAPI server on port 5200")
    uvicorn.run("app:app", host="0.0.0.0", port=5200, reload=True)