dfa32412 committed on
Commit
baf8eb5
·
verified ·
1 Parent(s): 855967f

Upload 2 files

Browse files
Files changed (2) hide show
  1. app.py +298 -0
  2. requirements.txt +3 -0
app.py ADDED
@@ -0,0 +1,298 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import httpx
3
+ import json
4
+ from fastapi import FastAPI, Request, HTTPException, Response, Depends
5
+ from fastapi.security import APIKeyHeader
6
+ from fastapi.responses import StreamingResponse, JSONResponse
7
+ import logging
8
+ from contextlib import asynccontextmanager
9
+ import typing
10
+ import itertools # For key rotation
11
+ import asyncio # For potential sleep during retry
12
+
13
# --- Configuration ---

# --- Logging ---
# Logging must be configured before the first log call. A module-level
# logging.info() issued on an unconfigured root logger implicitly installs a
# default handler at WARNING level, which both drops that message and turns any
# later basicConfig(level=INFO) into a no-op.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Client Authentication (Proxy Access) ---
# Load Allowed Client API Keys (for clients talking to this proxy)
ALLOWED_API_KEYS_STR = os.getenv("ALLOWED_API_KEYS")
if not ALLOWED_API_KEYS_STR:
    raise ValueError("REQUIRED: ALLOWED_API_KEYS environment variable (comma-separated keys for clients) not set.")
# Comma-separated list; surrounding whitespace and empty entries are discarded.
ALLOWED_KEYS = {key.strip() for key in ALLOWED_API_KEYS_STR.split(',') if key.strip()}
if not ALLOWED_KEYS:
    raise ValueError("ALLOWED_API_KEYS must contain at least one non-empty key.")
logger.info(f"Loaded {len(ALLOWED_KEYS)} allowed client API keys.")

# --- Upstream API Configuration ---
# URL to fetch upstream API keys from (one key per line)
UPSTREAM_KEYS_URL = os.getenv("UPSTREAM_KEYS_URL")

# Optional single fallback upstream key, used when the URL is not provided or
# yields no keys.
DEFAULT_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Upstream API Base URL
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE", "https://models.aixplain.com/api/v1")
OPENAI_CHAT_ENDPOINT = f"{OPENAI_API_BASE.rstrip('/')}/chat/completions"

# At least one source of upstream credentials is mandatory.
if not UPSTREAM_KEYS_URL and not DEFAULT_OPENAI_API_KEY:
    raise ValueError("REQUIRED: Either UPSTREAM_KEYS_URL or OPENAI_API_KEY environment variable must be set for upstream authentication.")
44
+
45
+ # --- Authentication Dependency (Client -> Proxy) ---
46
api_key_header_auth = APIKeyHeader(name="Authorization", auto_error=False)


def _mask_key(key: str) -> str:
    """Return a log-safe rendering of *key* that never exposes a short key in full."""
    if len(key) > 8:
        return f"{key[:4]}...{key[-4:]}"
    # Too short to truncate meaningfully: reveal only the length.
    return f"<{len(key)} chars>"


async def verify_api_key(api_key_header: typing.Optional[str] = Depends(api_key_header_auth)):
    """Dependency to verify the client's API key provided to this proxy.

    Expects an ``Authorization: Bearer <key>`` header whose key is a member of
    ``ALLOWED_KEYS``.

    Returns:
        The validated client API key.

    Raises:
        HTTPException: 401 when the header is missing or malformed, 403 when
            the key is not in the allow-list.
    """
    if not api_key_header:
        logger.warning("Missing Authorization header from client")
        raise HTTPException(status_code=401, detail="Missing Authorization header")

    parts = api_key_header.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        logger.warning("Invalid Authorization header format from client.")
        raise HTTPException(status_code=401, detail="Invalid Authorization header format. Use 'Bearer YOUR_KEY'.")

    client_api_key = parts[1]
    if client_api_key not in ALLOWED_KEYS:
        logger.warning(f"Invalid Client API Key received: {_mask_key(client_api_key)}")
        raise HTTPException(status_code=403, detail="Invalid API Key provided")

    # Only print a suffix when the key is long enough that the suffix is not the key itself.
    key_suffix = client_api_key[-4:] if len(client_api_key) > 8 else "****"
    logger.info(f"Client authenticated successfully (Key ending: ...{key_suffix})")
    return client_api_key
63
+
64
+ # --- Key Fetching and Rotation Logic ---
65
async def fetch_upstream_keys(url: str) -> list[str]:
    """Fetch upstream API keys from *url*, expecting one key per line.

    Blank lines and surrounding whitespace are ignored.

    Args:
        url: Location of the plain-text key list.

    Returns:
        The list of keys found, or an empty list when the request fails, the
        URL is malformed, the server answers with an error status, or the body
        contains no keys. Fetch problems are logged, never raised, so the
        caller can fall back to another key source.
    """
    try:
        async with httpx.AsyncClient(timeout=15.0) as client:  # Use a temporary client
            logger.info(f"Fetching upstream API keys from: {url}")
            response = await client.get(url)
            response.raise_for_status()  # Raise exception for 4xx/5xx status codes
    except httpx.HTTPStatusError as e:
        logger.error(f"Error fetching upstream keys from {url}: Status {e.response.status_code}")
        logger.error(f"Response body: {e.response.text}")
        return []  # Return empty list on bad status
    except (httpx.RequestError, httpx.InvalidURL) as e:
        # InvalidURL is not a RequestError subclass; without it a malformed URL
        # would propagate and bypass the caller's fallback-key logic.
        logger.error(f"Error fetching upstream keys from {url}: {e}")
        return []  # Return empty list on fetch error

    keys = [line.strip() for line in response.text.splitlines() if line.strip()]
    logger.info(f"Successfully fetched {len(keys)} upstream API keys.")
    if not keys:
        logger.warning(f"No valid keys found at {url}. The response was empty or contained only whitespace.")
    return keys
86
+
87
+ # --- HTTP Client and Key Iterator Management (Lifespan) ---
88
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: load upstream keys and manage the shared HTTPX client.

    Startup:
        * Fetches upstream keys from ``UPSTREAM_KEYS_URL`` when configured,
          falling back to ``DEFAULT_OPENAI_API_KEY`` if none were obtained.
        * Stores the key list and an endlessly cycling iterator over it on
          ``app.state`` (``upstream_api_keys`` / ``key_iterator``).
        * Creates the shared ``httpx.AsyncClient`` as ``app.state.http_client``.

    Shutdown:
        Closes the client and clears the key state. This runs in a ``finally``
        block so the client is released even when the context is left through
        an exception or cancellation.

    Raises:
        RuntimeError: when no upstream key could be obtained from any source.
    """
    # --- Initialize Upstream Key Iterator ---
    upstream_keys: list[str] = []
    if UPSTREAM_KEYS_URL:
        upstream_keys = await fetch_upstream_keys(UPSTREAM_KEYS_URL)

    if not upstream_keys:
        logger.warning("No upstream keys fetched from URL or URL not provided.")
        if not DEFAULT_OPENAI_API_KEY:
            # Critical failure - no keys available, refuse to start.
            logger.critical("FATAL: No upstream API keys available (URL fetch failed/empty and no fallback OPENAI_API_KEY). Exiting.")
            raise RuntimeError("Failed to load any upstream API keys. Cannot start service.")
        logger.info("Using fallback OPENAI_API_KEY for upstream authentication.")
        upstream_keys = [DEFAULT_OPENAI_API_KEY]

    # Store keys and create the cycling iterator in app.state
    app.state.upstream_api_keys = upstream_keys
    app.state.key_iterator = itertools.cycle(upstream_keys)
    logger.info(f"Initialized key rotation with {len(upstream_keys)} keys.")

    # --- Initialize HTTPX Client ---
    logger.info("Initializing main HTTPX client...")
    # Long read timeout: the upstream completion may take a while to produce.
    timeout = httpx.Timeout(5.0, read=180.0, write=5.0, connect=5.0)
    app.state.http_client = httpx.AsyncClient(timeout=timeout)  # No base_url needed if using full URLs
    logger.info("HTTPX client initialized.")

    try:
        yield  # Application runs here
    finally:
        # --- Cleanup ---
        logger.info("Closing HTTPX client...")
        await app.state.http_client.aclose()
        logger.info("HTTPX client closed.")
        app.state.upstream_api_keys = []  # Clear keys
        app.state.key_iterator = None
        logger.info("Upstream keys cleared.")


# --- FastAPI App ---
app = FastAPI(lifespan=lifespan)
132
+
133
+ # --- Streaming Helper ---
134
async def yield_openai_chunks(response_body):
    """Re-emit a complete chat-completion response as a two-event SSE stream.

    The upstream body is parsed as a single JSON document. Each entry of its
    ``choices`` list has its ``message`` field renamed to ``delta``, and the
    result is sent as one ``data:`` event followed by the ``[DONE]`` sentinel.

    Args:
        response_body: The full upstream response body as bytes.

    Yields:
        SSE-framed strings. Every event, including the final ``[DONE]``
        sentinel, ends with a blank line so that event-stream parsers
        dispatch it.

    Any error while parsing or transforming the body is logged and ends the
    stream without emitting further events.
    """
    # Same logger object as the module-level one; fetched here so the helper is self-contained.
    log = logging.getLogger(__name__)
    log.info("Starting to stream chunks from upstream...")
    try:
        resp_json = json.loads(response_body.decode())

        for choice in resp_json["choices"]:
            # Streaming clients read incremental content from "delta".
            choice["delta"] = choice.pop("message")

        yield "data: " + json.dumps(resp_json) + "\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        log.error(f"Error during streaming upstream response: {e}")
    finally:
        log.info("Upstream streaming response closed.")
151
+
152
+
153
+ # --- Proxy Endpoint ---
154
@app.post("/v1/chat/completions")
async def proxy_openai_chat(request: Request, _client_key: str = Depends(verify_api_key)):  # Use Depends for auth
    """Proxy a chat-completions request upstream after verifying the client key.

    The next key from the rotating iterator in ``app.state`` authenticates the
    upstream call. The upstream body is always read in full; when the client
    asked for ``stream``, it is re-emitted as a Server-Sent-Events stream,
    otherwise it is returned as received.

    Args:
        request: The incoming request; its body must be a JSON object.
        _client_key: Validated client key (unused; present to enforce auth).

    Returns:
        ``StreamingResponse`` (SSE) for streaming requests, otherwise a
        ``Response`` carrying the upstream body, status and content type.

    Raises:
        HTTPException: 400 for an invalid body, 503 when app state is missing,
            500 on key-rotation or unexpected failures, 502/504 on upstream
            transport errors or timeouts, or the upstream status code when
            upstream answers with an error.
    """
    client: httpx.AsyncClient = request.app.state.http_client
    key_iterator = request.app.state.key_iterator

    if not client or not key_iterator:
        logger.error("HTTPX client or Key Iterator not available (app state issue).")
        raise HTTPException(status_code=503, detail="Service temporarily unavailable")

    # --- Get Next Upstream API Key ---
    try:
        current_upstream_key = next(key_iterator)
    except StopIteration:
        # This should not happen if lifespan logic is correct and keys were loaded
        logger.error("Upstream key iterator exhausted unexpectedly.")
        raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")
    except Exception as e:
        logger.error(f"Unexpected error getting next key: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error: Key rotation failed")

    # --- Get Request Data ---
    try:
        payload = json.loads(await request.body())
    except ValueError:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError (non-UTF-8 body).
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (e.g. a list) cannot be a chat request.
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    is_streaming = payload.get("stream", False)

    # --- Prepare Upstream Request ---
    upstream_headers = {
        "Content-Type": request.headers.get("Content-Type", "application/json"),
        "Accept": request.headers.get("Accept", "application/json"),
    }

    # --- Upstream Authentication (Using Rotated Key) ---
    # Decide based on the target API (e.g., freeaichatplayground vs standard OpenAI)
    if "freeaichatplayground.com" in OPENAI_API_BASE:
        logger.debug("Using payload apiKey for upstream authentication (freeaichatplayground specific).")
        payload["apiKey"] = current_upstream_key  # Inject ROTATED key into payload
    else:
        # Default to standard Bearer token authentication for upstream
        logger.debug("Using Authorization header for upstream authentication.")
        upstream_headers["Authorization"] = f"Bearer {current_upstream_key}"  # Use ROTATED key

    if is_streaming and "text/event-stream" not in upstream_headers["Accept"]:
        logger.info("Adding 'Accept: text/event-stream' for streaming request")
        upstream_headers["Accept"] = "text/event-stream, application/json"

    logger.info(f"Forwarding request to {OPENAI_CHAT_ENDPOINT} (Streaming: {is_streaming})")

    # --- Make Request to Upstream ---
    # NOTE(review): the payload's "stream" flag is forwarded unchanged, yet the
    # response below is parsed as one JSON document - confirm the upstream
    # always answers with a complete body.
    response = None  # Defined here so the finally block can always inspect it
    try:
        upstream_request = client.build_request(
            "POST",
            OPENAI_CHAT_ENDPOINT,  # Use the full URL
            json=payload,
            headers=upstream_headers,
        )
        response = await client.send(upstream_request, stream=True)
        response_body = await response.aread()

        # Surface upstream errors to the client with the upstream status code.
        if response.status_code >= 400:
            error_text = response_body.decode(errors="replace")
            logger.error(f"Upstream API returned error: {response.status_code} Key ending: ...{current_upstream_key[-4:]} Body: {error_text}")
            try:
                detail = json.loads(response_body)
            except ValueError:
                detail = error_text
            raise HTTPException(status_code=response.status_code, detail=detail)

        # --- Handle Streaming Response ---
        if is_streaming:
            logger.info(f"Received OK streaming response from upstream (Status: {response.status_code}). Piping to client.")
            # Upstream may answer 201; streaming clients expect a plain 200.
            status_code = 200 if response.status_code == 201 else response.status_code
            return StreamingResponse(
                yield_openai_chunks(response_body),
                status_code=status_code,
                # The body is SSE synthesized by this proxy, so it must be
                # labelled as such regardless of the upstream content type.
                media_type="text/event-stream",
            )

        # --- Handle Non-Streaming Response ---
        logger.info(f"Received OK non-streaming response from upstream (Status: {response.status_code}). Reading full body.")
        return Response(  # Return raw response with the upstream content type
            content=response_body,
            status_code=response.status_code,
            media_type=response.headers.get("content-type", "application/json"),
        )

    except httpx.TimeoutException as e:
        logger.error(f"Request to upstream timed out: {e}")
        raise HTTPException(status_code=504, detail="Request to upstream API timed out.")
    except httpx.RequestError as e:
        logger.error(f"Error requesting upstream API: {e}")
        raise HTTPException(status_code=502, detail=f"Error contacting upstream API: {e}")
    except HTTPException:
        # Re-raise FastAPI HTTPExceptions (like the upstream-error check above) untouched.
        raise
    except Exception as e:
        logger.exception("An unexpected error occurred during response processing.")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
    finally:
        # Single cleanup point for every exit path.
        if response is not None and not response.is_closed:
            await response.aclose()
271
+
272
+
273
+ # --- Health Check Endpoint ---
274
@app.get("/health")
async def health_check():
    """Report liveness together with how many upstream keys are currently loaded."""
    # app.state has no key list until the lifespan startup has run.
    loaded_keys = getattr(app.state, 'upstream_api_keys', [])
    key_count = len(loaded_keys)
    return {
        "status": "ok",
        "upstream_keys_loaded": key_count > 0,
        "key_count": key_count,
    }
280
+
281
+ # --- Main Execution Guard ---
282
if __name__ == "__main__":
    import uvicorn

    # Configuration was already validated at import time; this only prints a
    # startup banner describing it before the server starts.
    if UPSTREAM_KEYS_URL:
        key_source_line = f"Fetching upstream keys from: {UPSTREAM_KEYS_URL}"
    elif DEFAULT_OPENAI_API_KEY:
        key_source_line = "Using single OPENAI_API_KEY for upstream."
    else:
        key_source_line = "ERROR: No upstream key source configured!"  # Should have failed earlier

    banner_lines = (
        "--- Starting FastAPI OpenAI Proxy with Custom Auth & Key Rotation ---",
        f"Proxying requests to: {OPENAI_CHAT_ENDPOINT}",
        key_source_line,
        "Clients must provide a valid API key in 'Authorization: Bearer <key>' header.",
        f"Number of allowed client keys configured: {len(ALLOWED_KEYS)}",
        "---",
    )
    for banner_line in banner_lines:
        print(banner_line)

    uvicorn.run(app, host="0.0.0.0", port=7860)
298
+
requirements.txt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ fastapi==0.115.12
2
+ httpx==0.28.1
3
+ uvicorn==0.34.2