dfa32412 commited on
Commit
e7b1094
·
verified ·
1 Parent(s): baf8eb5

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +361 -298
app.py CHANGED
@@ -1,298 +1,361 @@
1
- import os
2
- import httpx
3
- import json
4
- from fastapi import FastAPI, Request, HTTPException, Response, Depends
5
- from fastapi.security import APIKeyHeader
6
- from fastapi.responses import StreamingResponse, JSONResponse
7
- import logging
8
- from contextlib import asynccontextmanager
9
- import typing
10
- import itertools # For key rotation
11
- import asyncio # For potential sleep during retry
12
-
13
- # --- Configuration ---
14
-
15
- # --- Client Authentication (Proxy Access) ---
16
- # Load Allowed Client API Keys (for clients talking to this proxy)
17
- ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
18
- if not ALLOWED_API_KEYS_STR:
19
- raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
20
- ALLOWED_KEYS = set(key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip())
21
- if not ALLOWED_KEYS:
22
- raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
23
- logging.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")
24
-
25
- # --- Upstream API Configuration ---
26
- # URL to fetch upstream API keys from (one key per line)
27
- UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")
28
-
29
- # Optional: A single fallback/default upstream key (used if URL fetch fails or isn't provided)
30
- # Or required if the upstream target needs a key in a different way sometimes.
31
- # Let's make it optional now.
32
- DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
33
-
34
- # Upstream API Base URL
35
- OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://models.aixplain.com/api/v1")
36
- OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"
37
-
38
- if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
39
- raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
40
-
41
- # --- Logging ---
42
- logging.basicConfig(level=logging.INFO)
43
- logger = logging.getLogger(__name__)
44
-
45
- # --- Authentication Dependency (Client -> Proxy) ---
46
- api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)
47
- async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
48
- """Dependency to verify the client's API key provided to this proxy."""
49
- if not api_key_header:
50
- logger.warning("Missing Authorization header from client")
51
- raise HTTPException(status_code=401, detail="Missing Authorization header")
52
- parts = api_key_header.split()
53
- if len(parts) != 2 or parts[0].lower() != "bearer":
54
- logger.warning(f"Invalid Authorization header format from client.")
55
- raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")
56
- client_api_key = parts[1]
57
- if client_api_key not in ALLOWED_KEYS:
58
- truncated_key = client_api_key[:4] + "..." + client_api_key[-4:] if len(client_api_key) > 8 else client_api_key
59
- logger.warning(f"Invalid Client API Key received: {truncated_key}")
60
- raise HTTPException(status_code=403, detail="Invalid API Key provided")
61
- logger.info(f"Client authenticated successfully (Key ending: ...{client_api_key[-4:]})")
62
- return client_api_key
63
-
64
- # --- Key Fetching and Rotation Logic ---
65
- async def fetch_upstream_keys(url: str) -> list[str]:
66
- """Fetches keys from the given URL, one key per line."""
67
- keys = []
68
- try:
69
- async with httpx.AsyncClient(timeout=15.0) as client: # Use a temporary client
70
- logger.info(f"Fetching upstream API keys from: {url}")
71
- response = await client.get(url)
72
- response.raise_for_status() # Raise exception for 4xx/5xx status codes
73
- content = response.text
74
- keys = [line.strip() for line in content.splitlines() if line.strip()]
75
- logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
76
- if not keys:
77
- logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
78
- return keys
79
- except httpx.RequestError as e:
80
- logger.error(f"Error fetching upstream keys from {url}: {e}")
81
- return [] # Return empty list on fetch error
82
- except httpx.HTTPStatusError as e:
83
- logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
84
- logger.error(f"Response body: {e.response.text}")
85
- return [] # Return empty list on bad status
86
-
87
- # --- HTTP Client and Key Iterator Management (Lifespan) ---
88
- @asynccontextmanager
89
- async def lifespan(app: FastAPI):
90
- # --- Initialize Upstream Key Iterator ---
91
- upstream_keys = []
92
- if UPSTREAM_KEYS_URL:
93
- upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)
94
-
95
- if not upstream_keys:
96
- logger.warning("No upstream keys fetched from URL or URL not provided.")
97
- if DEFAULT_OPENAI_API_KEY:
98
- logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
99
- upstream_keys = [DEFAULT_OPENAI_API_KEY]
100
- else:
101
- # Critical failure - no keys available
102
- logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
103
- # In a real scenario, you might want a more graceful shutdown or retry mechanism
104
- # For simplicity here, we'll let it proceed but log critically. The requests will likely fail later.
105
- # Or raise an exception here to prevent startup:
106
- raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
107
-
108
- # Store keys and create the cycling iterator in app.state
109
- app.state.upstream_api_keys = upstream_keys
110
- app.state.key_iterator = itertools.cycle(upstream_keys)
111
- logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")
112
-
113
- # --- Initialize HTTPX Client ---
114
- logger.info("Initializing main HTTPX client...")
115
- timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
116
- client = httpx.AsyncClient(timeout=timeout) # No base_url needed if using full URLs
117
- app.state.http_client = client # Store client in app.state
118
- logger.info("HTTPX client initialized.")
119
-
120
- yield # Application runs here
121
-
122
- # --- Cleanup ---
123
- logger.info("Closing HTTPX client...")
124
- await app.state.http_client.aclose()
125
- logger.info("HTTPX client closed.")
126
- app.state.upstream_api_keys = [] # Clear keys
127
- app.state.key_iterator = None
128
- logger.info("Upstream keys cleared.")
129
-
130
- # --- FastAPI App ---
131
- app = FastAPI(lifespan=lifespan)
132
-
133
- # --- Streaming Helper ---
134
- async def yield_openai_chunks(response_body):
135
- """Asynchronously yields chunks from the upstream response stream."""
136
- # (Content remains the same as before)
137
- logger.info("Starting to stream chunks from upstream...")
138
- try:
139
- resp_json = json.loads(response_body.decode())
140
-
141
- for choices in resp_json["choices"]:
142
- choices["delta"] = choices["message"]
143
- del choices["message"]
144
-
145
- yield "data:" + json.dumps(resp_json) + "\n\n"
146
- yield "data: [DONE]"
147
- except Exception as e:
148
- logger.error(f"Error during streaming upstream response: {e}")
149
- finally:
150
- logger.info("Upstream streaming response closed.")
151
-
152
-
153
- # --- Proxy Endpoint ---
154
- @app.post("/v1/chat/completions")
155
- async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)): # Use Depends for auth
156
- """
157
- Proxies requests to the configured Chat Completions endpoint AFTER verifying client API key.
158
- Uses rotated keys for upstream authentication.
159
- """
160
- client: httpx.AsyncClient = request.app.state.http_client
161
- key_iterator = request.app.state.key_iterator
162
-
163
- if not client or not key_iterator:
164
- logger.error("HTTPX client or Key Iterator not available (app state issue).")
165
- raise HTTPException(status_code=503, detail="Service temporarily unavailable")
166
-
167
- # --- Get Next Upstream API Key ---
168
- try:
169
- current_upstream_key = next(key_iterator)
170
- # Log rotation (optional, consider security of logging key info)
171
- # logger.info(f"Using upstream key ending: ...{current_upstream_key[-4:]}")
172
- except StopIteration:
173
- # This should not happen if lifespan logic is correct and keys were loaded
174
- logger.error("Upstream key iterator exhausted unexpectedly.")
175
- raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
176
- except Exception as e:
177
- logger.error(f"Unexpected error getting next key: {e}")
178
- raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
179
-
180
- # --- Get Request Data ---
181
- try:
182
- request_body = await request.body()
183
- payload = json.loads(request_body)
184
- except json.JSONDecodeError:
185
- raise HTTPException(status_code=400, detail="Invalid JSON body")
186
-
187
- is_streaming = payload.get("stream", False)
188
-
189
- # --- Prepare Upstream Request ---
190
- upstream_headers = {
191
- "Content-Type": request.headers.get("Content-Type", "application/json"),
192
- "Accept": request.headers.get("Accept", "application/json"),
193
- }
194
-
195
- # --- Upstream Authentication (Using Rotated Key) ---
196
- # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
197
- if "freeaichatplayground.com" in OPENAI_API_BASE:
198
- logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
199
- payload["apiKey"] = current_upstream_key # Inject ROTATED key into payload
200
- else:
201
- # Default to standard Bearer token authentication for upstream
202
- logger.debug("Using Authorization header for upstream authentication.")
203
- upstream_headers["Authorization"] = f"Bearer {current_upstream_key}" # Use ROTATED key
204
-
205
- if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
206
- logger.info("Adding 'Accept: text/event-stream' for streaming request")
207
- upstream_headers["Accept"] = "text/event-stream, application/json"
208
-
209
- logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")
210
-
211
- # --- Make Request to Upstream ---
212
- response = None # Define response here to ensure it's available in finally block
213
- try:
214
- req = client.build_request(
215
- "POST",
216
- OPENAI_CHAT_ENDPOINT, # Use the full URL
217
- json=payload,
218
- headers=upstream_headers,
219
- )
220
- response = await client.send(req, stream=True)
221
-
222
- # Check for immediate errors *before* processing body/stream
223
- if response.status_code >= 400:
224
- error_body = await response.aread() # Read error fully
225
- await response.aclose()
226
- logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_body.decode()}")
227
- try: detail = json.loads(error_body)
228
- except json.JSONDecodeError: detail = error_body.decode()
229
- raise HTTPException(status_code=response.status_code, detail=detail)
230
- response_body = await response.aread()
231
-
232
- # --- Handle Streaming Response ---
233
- if is_streaming:
234
- logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
235
- status_code = response.status_code
236
- if status_code == 201:
237
- status_code = 200
238
- return StreamingResponse(
239
- yield_openai_chunks(response_body), # Generator handles closing response
240
- status_code=status_code,
241
- media_type=response.headers.get("content-type", "text/event-stream"),
242
- )
243
- # --- Handle Non-Streaming Response ---
244
- else:
245
- logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
246
- response_body = await response.aread()
247
- await response.aclose() # Ensure closed
248
- content_type = response.headers.get("content-type", "application/json")
249
- return Response( # Return raw response, FastAPI handles JSON content type
250
- content=response_body,
251
- status_code=response.status_code,
252
- media_type=content_type,
253
- )
254
-
255
- except httpx.TimeoutException as e:
256
- logger.error(f"Request to upstream timed out: {e}")
257
- if response: await response.aclose()
258
- raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
259
- except httpx.RequestError as e:
260
- logger.error(f"Error requesting upstream API: {e}")
261
- if response: await response.aclose()
262
- raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
263
- except HTTPException as e:
264
- # Re-raise FastAPI HTTPExceptions (like the 4xx check above)
265
- if response and not response.is_closed: await response.aclose()
266
- raise e
267
- except Exception as e:
268
- logger.exception("An unexpected error occurred during response processing.")
269
- if response and not response.is_closed: await response.aclose()
270
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
271
-
272
-
273
- # --- Health Check Endpoint ---
274
- @app.get("/health")
275
- async def health_check():
276
- """Simple health check endpoint."""
277
- # Could add checks here, e.g., if keys were loaded
278
- key_count = len(app.state.upstream_api_keys) if hasattr(app.state, 'upstream_api_keys') else 0
279
- return {"status": "ok", "upstream_keys_loaded": key_count > 0, "key_count": key_count}
280
-
281
- # --- Main Execution Guard ---
282
- if __name__ == "__main__":
283
- import uvicorn
284
- # Startup checks are implicitly handled by config loading at the top
285
- print("--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---")
286
- print(f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}")
287
- if UPSTREAM_KEYS_URL:
288
- print(f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}")
289
- elif DEFAULT_OPENAI_API_KEY:
290
- print("Using single OPENAI_API_KEY for upstream.")
291
- else:
292
- print("ERROR: No upstream key source configured!") # Should have failed earlier
293
- print(f"Clients must provide a valid API key in 'Authorization: Bearer <key>' header.")
294
- print(f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}")
295
- print("---")
296
-
297
- uvicorn.run(app, host="0.0.0.0", port=7860)
298
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import time
3
+
4
+ import httpx
5
+ import json
6
+ from fastapi import FastAPI, Request, HTTPException, Response, Depends
7
+ from fastapi.security import APIKeyHeader
8
+ from fastapi.responses import StreamingResponse, JSONResponse
9
+ import logging
10
+ from contextlib import asynccontextmanager
11
+ import typing
12
+ import itertools # For key rotation
13
+ from typing import List, Optional, Dict, Any, Union
14
+
15
+ from trio import sleep
16
+
17
+ # --- Configuration ---
18
+
19
+ # --- Client Authentication (Proxy Access) ---
20
+ # Load Allowed Client API Keys (for clients talking to this proxy)
21
+ ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
22
+ if not ALLOWED_API_KEYS_STR:
23
+ raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
24
+ ALLOWED_KEYS = set(key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip())
25
+ if not ALLOWED_KEYS:
26
+ raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
27
+ logging.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")
28
+
29
+ # --- Upstream API Configuration ---
30
+ # URL to fetch upstream API keys from (one key per line)
31
+ UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")
32
+
33
+ # Optional: A single fallback/default upstream key (used if URL fetch fails or isn't provided)
34
+ # Or required if the upstream target needs a key in a different way sometimes.
35
+ # Let's make it optional now.
36
+ DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
37
+
38
+ # Upstream API Base URL
39
+ OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://models.aixplain.com/api/v1")
40
+ OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"
41
+
42
+ if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
43
+ raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
44
+
45
+ # --- Logging ---
46
+ logging.basicConfig(level=logging.INFO)
47
+ logger = logging.getLogger(__name__)
48
+
49
+ # --- Authentication Dependency (Client -> Proxy) ---
50
+ api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)
51
+
52
+
53
+ # OpenAI API请求模型
54
+ class ChatMessage:
55
+ role: str
56
+ content: str
57
+
58
+ async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
59
+ """Dependency to verify the client's API key provided to this proxy."""
60
+ if not api_key_header:
61
+ logger.warning("Missing Authorization header from client")
62
+ raise HTTPException(status_code=401, detail="Missing Authorization header")
63
+ parts = api_key_header.split()
64
+ if len(parts) != 2 or parts[0].lower() != "bearer":
65
+ logger.warning(f"Invalid Authorization header format from client.")
66
+ raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")
67
+ client_api_key = parts[1]
68
+ if client_api_key not in ALLOWED_KEYS:
69
+ truncated_key = client_api_key[:4] + "..." + client_api_key[-4:] if len(client_api_key) > 8 else client_api_key
70
+ logger.warning(f"Invalid Client API Key received: {truncated_key}")
71
+ raise HTTPException(status_code=403, detail="Invalid API Key provided")
72
+ logger.info(f"Client authenticated successfully (Key ending: ...{client_api_key[-4:]})")
73
+ return client_api_key
74
+
75
+ # --- Key Fetching and Rotation Logic ---
76
+ async def fetch_upstream_keys(url: str) -> list[str]:
77
+ """Fetches keys from the given URL, one key per line."""
78
+ keys = []
79
+ try:
80
+ async with httpx.AsyncClient(timeout=15.0) as client: # Use a temporary client
81
+ logger.info(f"Fetching upstream API keys from: {url}")
82
+ response = await client.get(url)
83
+ response.raise_for_status() # Raise exception for 4xx/5xx status codes
84
+ content = response.text
85
+ keys = [line.strip() for line in content.splitlines() if line.strip()]
86
+ logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
87
+ if not keys:
88
+ logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
89
+ return keys
90
+ except httpx.RequestError as e:
91
+ logger.error(f"Error fetching upstream keys from {url}: {e}")
92
+ return [] # Return empty list on fetch error
93
+ except httpx.HTTPStatusError as e:
94
+ logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
95
+ logger.error(f"Response body: {e.response.text}")
96
+ return [] # Return empty list on bad status
97
+
98
+ # --- HTTP Client and Key Iterator Management (Lifespan) ---
99
+ @asynccontextmanager
100
+ async def lifespan(app: FastAPI):
101
+ # --- Initialize Upstream Key Iterator ---
102
+ upstream_keys = []
103
+ if UPSTREAM_KEYS_URL:
104
+ upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)
105
+
106
+ if not upstream_keys:
107
+ logger.warning("No upstream keys fetched from URL or URL not provided.")
108
+ if DEFAULT_OPENAI_API_KEY:
109
+ logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
110
+ upstream_keys = [DEFAULT_OPENAI_API_KEY]
111
+ else:
112
+ # Critical failure - no keys available
113
+ logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
114
+ # In a real scenario, you might want a more graceful shutdown or retry mechanism
115
+ # For simplicity here, we'll let it proceed but log critically. The requests will likely fail later.
116
+ # Or raise an exception here to prevent startup:
117
+ raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
118
+
119
+ # Store keys and create the cycling iterator in app.state
120
+ app.state.upstream_api_keys = upstream_keys
121
+ app.state.key_iterator = itertools.cycle(upstream_keys)
122
+ logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")
123
+
124
+ # --- Initialize HTTPX Client ---
125
+ logger.info("Initializing main HTTPX client...")
126
+ timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
127
+ client = httpx.AsyncClient(timeout=timeout) # No base_url needed if using full URLs
128
+ app.state.http_client = client # Store client in app.state
129
+ logger.info("HTTPX client initialized.")
130
+
131
+ yield # Application runs here
132
+
133
+ # --- Cleanup ---
134
+ logger.info("Closing HTTPX client...")
135
+ await app.state.http_client.aclose()
136
+ logger.info("HTTPX client closed.")
137
+ app.state.upstream_api_keys = [] # Clear keys
138
+ app.state.key_iterator = None
139
+ logger.info("Upstream keys cleared.")
140
+
141
+ # --- FastAPI App ---
142
+ app = FastAPI(lifespan=lifespan)
143
+
144
+ # --- Streaming Helper ---
145
+ async def yield_openai_chunks(response_body):
146
+ """Asynchronously yields chunks from the upstream response stream."""
147
+ # (Content remains the same as before)
148
+ logger.info("Starting to stream chunks from upstream...")
149
+ try:
150
+ resp_json = json.loads(response_body)
151
+
152
+ for choices in resp_json["choices"]:
153
+ if "message" in choices:
154
+ choices["delta"] = choices["message"]
155
+ del choices["message"]
156
+
157
+ yield "data:" + json.dumps(resp_json) + "\n\n"
158
+ yield "data: [DONE]"
159
+ except Exception as e:
160
+ logger.error(f"Error during streaming upstream response: {e}")
161
+ finally:
162
+ logger.info("Upstream streaming response closed.")
163
+
164
+ def format_messages_for_xplain(messages) -> str:
165
+ """格式化消息列表为DeepSider API所需的提示格式"""
166
+ prompt = ""
167
+ for msg in messages:
168
+ role = msg["role"]
169
+ # 将OpenAI的角色映射到DeepSider能理解的格式
170
+ if role == "system":
171
+ # 系统消息放在开头 作为指导
172
+ prompt = f"{msg['content']}\n\n" + prompt
173
+ elif role == "user":
174
+ prompt += f"Human: {msg['content']}\n\n"
175
+ elif role == "assistant":
176
+ content = msg['content']
177
+ prompt += f"Assistant: {msg['content']}\n\n"
178
+
179
+ else:
180
+ # 其他角色按用户处理
181
+ prompt += f"Human ({role}): {msg['content']}\n\n"
182
+
183
+ # 如果最后一个消息不是用户的 添加一个Human前缀引导模型回答
184
+ if messages and messages[-1]["role"] != "user":
185
+ prompt += "Human: "
186
+
187
+ return prompt.strip()
188
+
189
+
190
+ # --- Proxy Endpoint ---
191
+ @app.post("/v1/chat/completions")
192
+ async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)): # Use Depends for auth
193
+ """
194
+ Proxies requests to the configured Chat Completions endpoint AFTER verifying client API key.
195
+ Uses rotated keys for upstream authentication.
196
+ """
197
+ client: httpx.AsyncClient = request.app.state.http_client
198
+ key_iterator = request.app.state.key_iterator
199
+
200
+ if not client or not key_iterator:
201
+ logger.error("HTTPX client or Key Iterator not available (app state issue).")
202
+ raise HTTPException(status_code=503, detail="Service temporarily unavailable")
203
+
204
+ # --- Get Next Upstream API Key ---
205
+ try:
206
+ current_upstream_key = next(key_iterator)
207
+ # Log rotation (optional, consider security of logging key info)
208
+ # logger.info(f"Using upstream key ending: ...{current_upstream_key[-4:]}")
209
+ except StopIteration:
210
+ # This should not happen if lifespan logic is correct and keys were loaded
211
+ logger.error("Upstream key iterator exhausted unexpectedly.")
212
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
213
+ except Exception as e:
214
+ logger.error(f"Unexpected error getting next key: {e}")
215
+ raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
216
+
217
+ # --- Get Request Data ---
218
+ try:
219
+ request_body = await request.body()
220
+ payload = json.loads(request_body)
221
+ except json.JSONDecodeError:
222
+ raise HTTPException(status_code=400, detail="Invalid JSON body")
223
+ type = payload.get("type", "claude")
224
+
225
+ is_streaming = payload.get("stream", False)
226
+
227
+ # --- Prepare Upstream Request ---
228
+ upstream_headers = {
229
+ "Content-Type": request.headers.get("Content-Type", "application/json"),
230
+ "Accept": request.headers.get("Accept", "application/json"),
231
+ }
232
+
233
+ # --- Upstream Authentication (Using Rotated Key) ---
234
+ # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
235
+ if "freeaichatplayground.com" in OPENAI_API_BASE:
236
+ logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
237
+ payload["apiKey"] = current_upstream_key # Inject ROTATED key into payload
238
+ else:
239
+ # Default to standard Bearer token authentication for upstream
240
+ logger.debug("Using Authorization header for upstream authentication.")
241
+ upstream_headers["Authorization"] = f"Bearer {current_upstream_key}" # Use ROTATED key
242
+ upstream_headers["x-api-key"] = f"{current_upstream_key}" # Use ROTATED key
243
+
244
+ if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
245
+ logger.info("Adding 'Accept: text/event-stream' for streaming request")
246
+ upstream_headers["Accept"] = "text/event-stream, application/json"
247
+
248
+ logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")
249
+
250
+
251
+
252
+ # --- Make Request to Upstream ---
253
+ response = None # Define response here to ensure it's available in finally block
254
+ response_body = None
255
+ try:
256
+ if type == "claude":
257
+ newPayload = {
258
+ "text": format_messages_for_xplain(payload["messages"]),
259
+ }
260
+ req = client.build_request("POST","https://models.aixplain.com/api/v1/execute/" + payload["model"],json=newPayload,headers=upstream_headers)
261
+ response = await client.send(req)
262
+ xreqId = response.json()["data"]
263
+
264
+ req = client.build_request("GET",xreqId)
265
+ tryCount = 0
266
+ while tryCount < 100:
267
+ response = await client.send(req)
268
+ tryCount += 1
269
+
270
+ rj = response.json()
271
+ if rj["completed"] == True:
272
+ data = {"id": "123456-456789-123456", "object": "chat.completion.chunk",
273
+ "choices": [{"delta": {"content": rj["data"]}, "index": 0, "finish_reason": None}]}
274
+ response_body = json.dumps(data)
275
+ break
276
+ time.sleep(1)
277
+ else:
278
+ req = client.build_request(
279
+ "POST",
280
+ OPENAI_CHAT_ENDPOINT, # Use the full URL
281
+ json=payload,
282
+ headers=upstream_headers,
283
+ )
284
+ response = await client.send(req, stream=True)
285
+ if response.status_code >= 400:
286
+ error_body = await response.aread() # Read error fully
287
+ await response.aclose()
288
+ logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_body.decode()}")
289
+ try: detail = json.loads(error_body)
290
+ except json.JSONDecodeError: detail = error_body.decode()
291
+ raise HTTPException(status_code=response.status_code, detail=detail)
292
+ response_body = await response.aread()
293
+ response_body = response_body.decode()
294
+
295
+ # --- Handle Streaming Response ---
296
+ if is_streaming:
297
+ logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
298
+ status_code = response.status_code
299
+ if status_code == 201:
300
+ status_code = 200
301
+ return StreamingResponse(
302
+ yield_openai_chunks(response_body), # Generator handles closing response
303
+ status_code=status_code,
304
+ media_type=response.headers.get("content-type", "text/event-stream"),
305
+ )
306
+ # --- Handle Non-Streaming Response ---
307
+ else:
308
+ logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
309
+ response_body = await response.aread()
310
+ await response.aclose() # Ensure closed
311
+ content_type = response.headers.get("content-type", "application/json")
312
+ return Response( # Return raw response, FastAPI handles JSON content type
313
+ content=response_body,
314
+ status_code=response.status_code,
315
+ media_type=content_type,
316
+ )
317
+
318
+ except httpx.TimeoutException as e:
319
+ logger.error(f"Request to upstream timed out: {e}")
320
+ if response: await response.aclose()
321
+ raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
322
+ except httpx.RequestError as e:
323
+ logger.error(f"Error requesting upstream API: {e}")
324
+ if response: await response.aclose()
325
+ raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
326
+ except HTTPException as e:
327
+ # Re-raise FastAPI HTTPExceptions (like the 4xx check above)
328
+ if response and not response.is_closed: await response.aclose()
329
+ raise e
330
+ except Exception as e:
331
+ logger.exception("An unexpected error occurred during response processing.")
332
+ if response and not response.is_closed: await response.aclose()
333
+ raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
334
+
335
+
336
+ # --- Health Check Endpoint ---
337
+ @app.get("/health")
338
+ async def health_check():
339
+ """Simple health check endpoint."""
340
+ # Could add checks here, e.g., if keys were loaded
341
+ key_count = len(app.state.upstream_api_keys) if hasattr(app.state, 'upstream_api_keys') else 0
342
+ return {"status": "ok", "upstream_keys_loaded": key_count > 0, "key_count": key_count}
343
+
344
+ # --- Main Execution Guard ---
345
+ if __name__ == "__main__":
346
+ import uvicorn
347
+ # Startup checks are implicitly handled by config loading at the top
348
+ print("--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---")
349
+ print(f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}")
350
+ if UPSTREAM_KEYS_URL:
351
+ print(f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}")
352
+ elif DEFAULT_OPENAI_API_KEY:
353
+ print("Using single OPENAI_API_KEY for upstream.")
354
+ else:
355
+ print("ERROR: No upstream key source configured!") # Should have failed earlier
356
+ print(f"Clients must provide a valid API key in 'Authorization: Bearer <key>' header.")
357
+ print(f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}")
358
+ print("---")
359
+
360
+ uvicorn.run(app, host="0.0.0.0", port=7860)
361
+