|
|
|
|
|
import asyncio
import json
import logging
import os
import random
import re
import secrets
import string
import time
import uuid
from collections import defaultdict
from datetime import datetime
from typing import List, Dict, Any, Optional, AsyncGenerator, Union

from aiohttp import ClientSession, ClientTimeout, ClientError
from fastapi import FastAPI, HTTPException, Request, Depends, Header
from fastapi.responses import StreamingResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel

from .blackbox import Blackbox, ImageResponse
from .image import to_data_uri, ImageType
|
|
|
|
|
# Configure the root logger once at import time: INFO and above, written by a
# single StreamHandler (stderr by default) with a timestamped line format.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)

# Logger used by every function in this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
# Accepted bearer tokens, from the comma-separated API_KEYS variable.
# Whitespace around each entry is stripped and empty entries (for example
# from a trailing comma) are dropped, so an empty bearer token can never
# match a configured key.
API_KEYS = [key.strip() for key in os.getenv('API_KEYS', '').split(',') if key.strip()]

# Maximum number of requests a single client IP may make per rate-limit window.
RATE_LIMIT = int(os.getenv('RATE_LIMIT', '60'))

# Optional comma-separated model list, parsed into a list of non-empty names.
# NOTE(review): nothing visible in this module reads AVAILABLE_MODELS; confirm
# whether it was meant to restrict the models served.
AVAILABLE_MODELS = os.getenv('AVAILABLE_MODELS', '')

# Refuse to start without at least one usable key.
if not API_KEYS:
    logger.error("No API keys found. Please set the API_KEYS environment variable.")
    raise Exception("API_KEYS environment variable not set.")

AVAILABLE_MODELS = [model.strip() for model in AVAILABLE_MODELS.split(',') if model.strip()]
|
|
|
|
|
# Per-IP rate-limit state: ip -> {"count": requests counted in the current
# window, "timestamp": when that window started}. Unknown IPs get a zeroed
# entry stamped with the current time on first access.
rate_limit_store = defaultdict(lambda: dict(count=0, timestamp=time.time()))

# Seconds between sweeps of stale entries by the background cleanup task.
CLEANUP_INTERVAL = 60
# Length, in seconds, of one rate-limit window.
RATE_LIMIT_WINDOW = 60
|
|
|
async def cleanup_rate_limit_stores():
    """
    Background loop that evicts rate-limit entries untouched for over two windows.

    Runs forever: each pass removes every IP whose window started more than
    ``2 * RATE_LIMIT_WINDOW`` seconds ago, then sleeps ``CLEANUP_INTERVAL``
    seconds, keeping ``rate_limit_store`` from growing without bound.
    """
    while True:
        now = time.time()
        # Snapshot the stale keys first; deleting while iterating the dict would fail.
        stale = [
            ip
            for ip, entry in rate_limit_store.items()
            if now - entry["timestamp"] > RATE_LIMIT_WINDOW * 2
        ]
        for ip in stale:
            del rate_limit_store[ip]
            logger.debug(f"Cleaned up rate_limit_store for IP: {ip}")
        await asyncio.sleep(CLEANUP_INTERVAL)
|
|
|
async def rate_limiter_per_ip(request: Request):
    """
    FastAPI dependency that caps how many requests each client IP may make.

    Each IP is allowed up to ``RATE_LIMIT`` requests per window. A window
    restarts on the first request that arrives more than
    ``RATE_LIMIT_WINDOW`` seconds after the current one began.

    Raises:
        HTTPException: 429 when the IP has exhausted its quota for the window.
    """
    ip = request.client.host
    now = time.time()
    bucket = rate_limit_store[ip]

    # The previous window has elapsed: open a new one with this request counted.
    # (Entries just created by the defaultdict carry a current timestamp, so
    # they fall through to the counting path below instead.)
    if now - bucket["timestamp"] > RATE_LIMIT_WINDOW:
        rate_limit_store[ip] = {"count": 1, "timestamp": now}
        return

    if bucket["count"] >= RATE_LIMIT:
        logger.warning(f"Rate limit exceeded for IP address: {ip}")
        raise HTTPException(status_code=429, detail='Rate limit exceeded for IP address | NiansuhAI')

    bucket["count"] += 1
|
|
|
async def get_api_key(request: Request, authorization: str = Header(None)) -> str:
    """
    Dependency that validates the ``Authorization: Bearer <key>`` header.

    Returns:
        The bearer key, once it matches one of ``API_KEYS``.

    Raises:
        HTTPException: 401 when the header is missing or not a Bearer
            credential, when the key is empty, or when it matches no
            configured key.
    """
    client_ip = request.client.host
    if authorization is None or not authorization.startswith('Bearer '):
        logger.warning(f"Invalid or missing authorization header from IP: {client_ip}")
        raise HTTPException(status_code=401, detail='Invalid authorization header format')

    api_key = authorization[7:]
    candidate = api_key.encode()

    # Compare against every configured key with secrets.compare_digest and
    # without stopping at the first hit. The response time then does not
    # reveal how many leading characters of a guess were correct. Bytes are
    # used because compare_digest rejects non-ASCII str arguments.
    matched = False
    for valid_key in API_KEYS:
        matched |= secrets.compare_digest(candidate, valid_key.encode())

    # An empty key is never valid, even if an empty entry slipped into API_KEYS.
    if not api_key or not matched:
        # Do not log the rejected key itself: it may be a near-miss of a real secret.
        logger.warning(f"Invalid API key attempted from IP: {client_ip}")
        raise HTTPException(status_code=401, detail='Invalid API key')

    return api_key
|
|
|
|
|
class ModelNotWorkingException(Exception):
    """Signals that a model is currently not working.

    ``chat_completions`` translates this exception into an HTTP 503 response.

    Attributes:
        model: Name of the model that is not working.
        message: Human-readable explanation, also used as ``str(exc)``.
    """

    def __init__(self, model: str):
        text = (
            f"The model '{model}' is currently not working. "
            "Please try another model or wait for it to be fixed."
        )
        self.model = model
        self.message = text
        super().__init__(text)
|
|
|
|
|
# The ASGI application every route, middleware and handler below attaches to.
app = FastAPI()


@app.on_event("startup")
async def startup_event():
    """
    Start the background task that prunes stale rate-limit entries.

    The event loop only keeps weak references to tasks, so a task whose handle
    is discarded can be garbage-collected before it finishes. The handle is
    therefore stored on ``app.state`` for the lifetime of the application.
    """
    app.state.rate_limit_cleanup_task = asyncio.create_task(cleanup_rate_limit_stores())
    logger.info("Started rate limit store cleanup task.")
|
|
|
|
|
@app.middleware("http")
async def security_middleware(request: Request, call_next):
    """
    Reject chat-completion POSTs whose body is not declared as JSON.

    Only the media type of the Content-Type header is compared, and
    case-insensitively. Parameters such as ``; charset=utf-8`` and variants
    like ``Application/JSON`` are therefore accepted, while a missing or
    non-JSON type still yields an OpenAI-style 400 error. All other requests
    pass straight through.
    """
    client_ip = request.client.host

    if request.method == "POST" and request.url.path == "/v1/chat/completions":
        content_type = request.headers.get("Content-Type") or ""
        # "application/json; charset=utf-8" -> "application/json"
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type != "application/json":
            logger.warning(f"Invalid Content-Type from IP: {client_ip} for path: {request.url.path}")
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "message": "Content-Type must be application/json",
                        "type": "invalid_request_error",
                        "param": None,
                        "code": None,
                    }
                },
            )

    return await call_next(request)
|
|
|
|
|
class Message(BaseModel):
    # One chat turn from the request body. chat_completions forwards role and
    # content unchanged to Blackbox.create_async_generator.
    role: str  # who produced the turn; no allowed-values validation is done here
    content: str  # the turn's text; only a plain string is accepted by this model
|
|
|
class ChatRequest(BaseModel):
    # Body of POST /v1/chat/completions, shaped after the OpenAI chat API.
    # NOTE(review): in the visible handler only model, messages, stream,
    # webSearchMode and image are used; the sampling/limit fields below are
    # accepted but not forwarded to the provider.
    model: str  # must appear in Blackbox.models or Blackbox.model_aliases, else 400
    messages: List[Message]  # conversation history, forwarded in order
    temperature: Optional[float] = 1.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    stream: Optional[bool] = False  # True -> server-sent events response
    stop: Optional[Union[str, List[str]]] = None
    max_tokens: Optional[int] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    webSearchMode: Optional[bool] = False  # passed through to Blackbox.create_async_generator
    image: Optional[str] = None  # converted with to_data_uri; conversion failure -> 400
|
|
|
class TokenizerRequest(BaseModel):
    # Body of POST /v1/tokenizer.
    text: str  # text whose whitespace-separated words are counted as "tokens"
|
|
|
def calculate_estimated_cost(
    prompt_tokens: int,
    completion_tokens: int,
    *,
    cost_per_token: float = 0.00000268,
) -> float:
    """
    Estimate the price of a request from its token counts.

    Args:
        prompt_tokens: Tokens attributed to the input messages.
        completion_tokens: Tokens attributed to the generated reply.
        cost_per_token: Flat price charged per token, applied equally to
            prompt and completion tokens. The default is a placeholder rate;
            pass your actual pricing instead of editing this function.

    Returns:
        The total cost, rounded to 8 decimal places.
    """
    return round((prompt_tokens + completion_tokens) * cost_per_token, 8)
|
|
|
def create_response(content: str, model: str, finish_reason: Optional[str] = None) -> Dict[str, Any]:
    """
    Wrap *content* in a single-choice, non-streaming ``chat.completion`` object.

    Args:
        content: Assistant message text.
        model: Model name echoed back in the payload.
        finish_reason: Value for the choice's ``finish_reason`` (``None`` by default).

    Returns:
        A JSON-serialisable dict with a fresh ``chatcmpl-<uuid4>`` id, the
        current Unix timestamp, and ``usage`` set to ``None``.
    """
    assistant_message = {"role": "assistant", "content": content}
    only_choice = {
        "index": 0,
        "message": assistant_message,
        "finish_reason": finish_reason,
    }
    completion_id = "chatcmpl-" + str(uuid.uuid4())
    created_at = int(datetime.now().timestamp())
    return dict(
        id=completion_id,
        object="chat.completion",
        created=created_at,
        model=model,
        choices=[only_choice],
        usage=None,
    )
|
|
|
def _mask_api_key(api_key: str) -> str:
    """Return a log-safe rendering of *api_key* that reveals at most its last 4 characters."""
    # Short keys are hidden entirely so masking never exposes most of a key.
    if len(api_key) <= 8:
        return "****"
    return f"****{api_key[-4:]}"


def _render_chunk(chunk: Union[str, ImageResponse]) -> str:
    """Convert one item yielded by the provider's async generator into response text."""
    if isinstance(chunk, ImageResponse):
        # NOTE(review): image results are rendered as this bare newline. The
        # image reference itself (URL / alt text) appears to be missing from
        # the original format string; confirm which ImageResponse attribute
        # should be interpolated here.
        return "\n"
    return chunk


def _usage_for(messages: List[Message], completion: str) -> Dict[str, Any]:
    """
    Build the ``usage`` block for a request's *messages* and its *completion* text.

    "Tokens" are whitespace-separated words, a rough stand-in for real
    tokenizer counts.
    """
    prompt_tokens = sum(len(msg.content.split()) for msg in messages)
    completion_tokens = len(completion.split())
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
        "estimated_cost": calculate_estimated_cost(prompt_tokens, completion_tokens),
    }


def _final_completion(completion_id: str, model: str, content: str, usage: Dict[str, Any]) -> Dict[str, Any]:
    """Build a finished ``chat.completion`` object carrying the full *content* and *usage*."""
    return {
        "id": completion_id,
        "object": "chat.completion",
        "created": int(datetime.now().timestamp()),
        "model": model,
        "choices": [
            {
                "message": {
                    "role": "assistant",
                    "content": content,
                },
                "finish_reason": "stop",
                "index": 0,
            }
        ],
        "usage": usage,
    }


def _stream_chunk(completion_id: str, model: str, content: str) -> Dict[str, Any]:
    """Build one ``chat.completion.chunk`` event that carries *content* as a delta."""
    return {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": int(datetime.now().timestamp()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": {"content": content, "role": "assistant"},
                "finish_reason": None,
            }
        ],
        "usage": None,
    }


@app.post("/v1/chat/completions", dependencies=[Depends(rate_limiter_per_ip)])
async def chat_completions(request: ChatRequest, req: Request, api_key: str = Depends(get_api_key)):
    """
    OpenAI-compatible chat completion endpoint backed by ``Blackbox``.

    With ``stream=True`` the reply is sent as server-sent events. The events
    are ``chat.completion.chunk`` deltas (all sharing one completion id),
    then a final ``chat.completion`` object with the full text and usage,
    then ``[DONE]``. Otherwise a single ``chat.completion`` object is
    returned.

    Raises:
        HTTPException: 400 for an unknown model or undecodable image,
            503 when the model reports it is not working, 500 for any other
            failure before streaming starts.
    """
    client_ip = req.client.host
    masked_key = _mask_api_key(api_key)

    # Message bodies are never logged; only their roles.
    redacted_messages = [{"role": msg.role, "content": "[redacted]"} for msg in request.messages]
    logger.info(
        f"Received chat completions request from API key: {masked_key} | IP: {client_ip} "
        f"| Model: {request.model} | Messages: {redacted_messages}"
    )

    try:
        if request.model not in Blackbox.models and request.model not in Blackbox.model_aliases:
            logger.warning(f"Attempt to use unavailable model: {request.model} from IP: {client_ip}")
            raise HTTPException(status_code=400, detail="Requested model is not available.")

        image_data = None
        image_name = None
        if request.image:
            try:
                image_data = to_data_uri(request.image)
                image_name = "uploaded_image"
                logger.info(f"Image data received and processed from IP: {client_ip}")
            except Exception as e:
                logger.error(f"Image processing failed: {e}")
                raise HTTPException(status_code=400, detail="Invalid image data provided.") from e

        async_generator = Blackbox.create_async_generator(
            model=request.model,
            messages=[{"role": msg.role, "content": msg.content} for msg in request.messages],
            proxy=None,
            image=image_data,
            image_name=image_name,
            webSearchMode=request.webSearchMode,
        )

        # One id for the whole completion: OpenAI-compatible clients expect
        # every streamed chunk of a response to share the same id.
        completion_id = f"chatcmpl-{uuid.uuid4()}"

        if request.stream:
            async def generate():
                try:
                    parts: List[str] = []
                    async for chunk in async_generator:
                        text = _render_chunk(chunk)
                        parts.append(text)
                        # Text and image results use the same chunk/delta format.
                        yield f"data: {json.dumps(_stream_chunk(completion_id, request.model, text))}\n\n"

                    assistant_content = "".join(parts)
                    final_response = _final_completion(
                        completion_id,
                        request.model,
                        assistant_content,
                        _usage_for(request.messages, assistant_content),
                    )
                    yield f"data: {json.dumps(final_response)}\n\n"
                    yield "data: [DONE]\n\n"
                except HTTPException as he:
                    # The response has already started, so errors are reported in-band.
                    yield f"data: {json.dumps({'error': he.detail})}\n\n"
                except Exception as e:
                    logger.exception(f"Error during streaming response generation from IP: {client_ip}.")
                    yield f"data: {json.dumps({'error': str(e)})}\n\n"

            return StreamingResponse(generate(), media_type="text/event-stream")

        response_content = "".join([_render_chunk(chunk) async for chunk in async_generator])
        usage = _usage_for(request.messages, response_content)

        logger.info(f"Completed non-streaming response generation for API key: {masked_key} | IP: {client_ip}")

        return _final_completion(completion_id, request.model, response_content, usage)
    except ModelNotWorkingException as e:
        logger.warning(f"Model not working: {e} | IP: {client_ip}")
        raise HTTPException(status_code=503, detail=str(e)) from e
    except HTTPException as he:
        logger.warning(f"HTTPException: {he.detail} | IP: {client_ip}")
        raise
    except Exception as e:
        logger.exception(f"An unexpected error occurred while processing the chat completions request from IP: {client_ip}.")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
@app.post("/v1/tokenizer", dependencies=[Depends(rate_limiter_per_ip)])
async def tokenizer(request: TokenizerRequest, req: Request):
    """
    Echo the submitted text together with its word count.

    The "tokens" value is the number of whitespace-separated words, not a
    real tokenizer count.
    """
    ip = req.client.host
    body_text = request.text
    words = body_text.split()
    logger.info(f"Tokenizer requested from IP: {ip} | Text length: {len(body_text)}")
    return dict(text=body_text, tokens=len(words))
|
|
|
|
|
@app.get("/v1/models", dependencies=[Depends(rate_limiter_per_ip)])
async def get_models(req: Request):
    """List every model in ``Blackbox.models`` in an OpenAI-style ``data`` envelope."""
    logger.info(f"Fetching available models from IP: {req.client.host}")
    entries = []
    for name in Blackbox.models:
        entries.append({"id": name, "object": "model"})
    return {"data": entries}
|
|
|
|
|
@app.get("/v1/models/{model}/status", dependencies=[Depends(rate_limiter_per_ip)])
async def model_status(model: str, req: Request):
    """
    Report whether *model* can be served, either directly or through an alias.

    An alias counts only if its target is itself in ``Blackbox.models``; the
    response then names the target model.

    Raises:
        HTTPException: 404 when the name is neither a model nor a usable alias.
    """
    ip = req.client.host
    logger.info(f"Model status requested for '{model}' from IP: {ip}")

    if model in Blackbox.models:
        return {"model": model, "status": "available"}

    if model in Blackbox.model_aliases:
        target = Blackbox.model_aliases[model]
        if target in Blackbox.models:
            return {"model": target, "status": "available via alias"}

    logger.warning(f"Model not found: {model} from IP: {ip}")
    raise HTTPException(status_code=404, detail="Model not found")
|
|
|
|
|
@app.get("/v1/health", dependencies=[Depends(rate_limiter_per_ip)])
async def health_check(req: Request):
    """Liveness endpoint: always answers ``{"status": "ok"}`` (subject to rate limiting)."""
    logger.info(f"Health check requested from IP: {req.client.host}")
    return dict(status="ok")
|
|
|
|
|
@app.get("/v1/chat/completions")
async def chat_completions_get(req: Request):
    """Answer a GET on the completions route with a redirect to ``about:blank``."""
    ip = req.client.host
    logger.info(f"GET request made to /v1/chat/completions from IP: {ip}, redirecting to 'about:blank'")
    redirect = RedirectResponse(url='about:blank')
    return redirect
|
|
|
|
|
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """
    Render a raised ``fastapi.HTTPException`` as an OpenAI-style error envelope.

    The HTTP status is taken from the exception, and its ``detail`` becomes
    ``error.message``. Every error is labelled ``invalid_request_error``.
    """
    ip = request.client.host
    logger.error(f"HTTPException: {exc.detail} | Path: {request.url.path} | IP: {ip}")
    error_body = {
        "message": exc.detail,
        "type": "invalid_request_error",
        "param": None,
        "code": None,
    }
    return JSONResponse(status_code=exc.status_code, content={"error": error_body})
|
|
|
|
|
if __name__ == "__main__":
    # Local launcher: serve the app by import string on all interfaces, port
    # 8000, with uvicorn's auto-reload enabled (development use).
    import uvicorn

    uvicorn.run(
        "app.main:app",
        reload=True,
        host="0.0.0.0",
        port=8000,
    )
|
|