from flask import Flask, request, jsonify, Response import requests import uuid import json import time import os import re _COOKIES = os.environ.get("COOKIES", "") API_KEY = os.getenv("API_KEY", "linux.do") app = Flask(__name__) @app.before_request def check_api_key(): key = request.headers.get("Authorization") if key != "Bearer "+API_KEY: return jsonify({"success": False, "message": "Unauthorized: Invalid API key"}), 403 @app.route('/v1/models', methods=['GET']) def get_models(): # 构建请求头 headers = { "Content-Type": "application/json", "Cookie": _COOKIES } # 发送请求到Akash API response = requests.get( 'https://chat.akash.network/api/models', headers=headers ) models_data = response.json() current_timestamp = int(time.time()) converted_data = { "object": "list", "data": [ { "id": model["id"], "object": "model", "created": current_timestamp, "owned_by": model["name"].lower().replace(" ", "_") } for model in models_data["models"] ] } return converted_data @app.route('/v1/chat/completions', methods=['POST']) def chat_completions(): print("start") try: # 获取OpenAI格式的请求数据 data = request.json # 生成唯一ID chat_id = str(uuid.uuid4()).replace('-', '')[:16] # 构建Akash格式的请求数据 akash_data = { "id": chat_id, "messages": data.get('messages', []), "model": data.get('model', "DeepSeek-R1"), "system": data.get('system_message', "You are a helpful assistant."), "temperature": data.get('temperature', 0.6), "topP": data.get('top_p', 0.95) } # 构建请求头 headers = { "Content-Type": "application/json", "Cookie": _COOKIES } _stream=data.get('stream', True) if data.get('model', "DeepSeek-R1") == "AkashGen": _stream = False # 发送请求到Akash API response = requests.post( 'https://chat.akash.network/api/chat', json=akash_data, headers=headers, stream=_stream ) def generate(): content_buffer = "" for line in response.iter_lines(): if not line: continue try: # 解析行数据,格式为 "type:json_data" line_str = line.decode('utf-8') msg_type, msg_data = line_str.split(':', 1) # 处理内容类型的消息 if msg_type == '0': # 只去掉两边的双引号 if msg_data.startswith('"') and msg_data.endswith('"'): msg_data = msg_data.replace('\\"', '"') msg_data = msg_data[1:-1] msg_data = msg_data.replace("\\n", "\n") content_buffer += msg_data # 构建 OpenAI 格式的响应块 chunk = { "id": f"chatcmpl-{chat_id}", "object": "chat.completion.chunk", "created": int(time.time()), "model": data.get('model', "DeepSeek-R1"), "choices": [{ "delta": {"content": msg_data}, "index": 0, "finish_reason": None }] } yield f"data: {json.dumps(chunk)}\n\n" # 处理结束消息 elif msg_type in ['e', 'd']: chunk = { "id": f"chatcmpl-{chat_id}", "object": "chat.completion.chunk", "created": int(time.time()), "model": data.get('model', "DeepSeek-R1"), "choices": [{ "delta": {}, "index": 0, "finish_reason": "stop" }] } yield f"data: {json.dumps(chunk)}\n\n" yield "data: [DONE]\n\n" break except Exception as e: print(f"Error processing line: {e}") continue if _stream == True: return Response( generate(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Content-Type': 'text/event-stream' } ) else: if data.get('model', "DeepSeek-R1") != "AkashGen": text_matches = re.findall(r'0:"(.*?)"', response.text) parsed_text = "".join(text_matches) return Response( json.dumps({ "object": "chat.completion", "created": int(time.time() * 1000), "model": data.get('model', "DeepSeek-R1"), "choices": [ { "index": 0, "message": { "role": "user", "content": parsed_text }, "finish_reason": "stop" } ] },ensure_ascii=False), status=response.status_code, headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Content-Type': 'application/json' } ) else: match = re.search(r"jobId='([^']+)'", response.text) job_id = None if match: job_id = match.group(1) print("jobId:", job_id) while True: try: _img_response = requests.get( 'https://chat.akash.network/api/image-status?ids='+job_id, headers=headers ) _data = _img_response.json() # 检查响应状态码是否为200 if _data[0]["status"] == "completed": print("图片生成完成:", job_id) return Response( json.dumps({ "object": "chat.completion", "created": int(time.time() * 1000), "model": data.get('model', "DeepSeek-R1"), "choices": [ { "index": 0, "message": { "role": "user", "content": "根据您的描述,这里是一张生成的图片:\n\n![生成的图片]("+_data[0]['result']+")" }, "finish_reason": "stop" } ] },ensure_ascii=False), status=response.status_code, headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Content-Type': 'application/json' } ) else: print("图片生成中:", job_id) except Exception as e: print("请求过程中出现异常:", e) # 每隔5秒请求一次 time.sleep(5) else: return jsonify({"error": "当前官方服务异常"}), 500 except Exception as e: print(e) return jsonify({"error": str(e)}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=5200)