import os
import re
import random
import string
import uuid
import json
import logging
import asyncio
import time
from collections import defaultdict
from typing import List, Dict, Any, Optional, Union, AsyncGenerator

from aiohttp import ClientSession, ClientResponseError, ClientTimeout
from fastapi import FastAPI, HTTPException, Request, Depends, Header
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from datetime import datetime
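
# Overview: this module implements a small OpenAI-style HTTP front end for
# Blackbox AI. It exposes /v1/chat/completions (streaming and non-streaming),
# /v1/models and /v1/health, guarded by Bearer-token API keys and a per-IP
# fixed-window rate limiter.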

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)


API_KEYS = os.getenv('API_KEYS', '').split(',')
RATE_LIMIT = int(os.getenv('RATE_LIMIT', '60'))
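# For local testing the service can be configured entirely from the
# environment, e.g. (key names and filename are only illustrative):
#   API_KEYS=sk-test-1,sk-test-2 RATE_LIMIT=60 python app.py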

if not API_KEYS or API_KEYS == ['']:
    logger.error("No API keys found. Please set the API_KEYS environment variable.")
    raise Exception("API_KEYS environment variable not set.")


rate_limit_store = defaultdict(lambda: {"count": 0, "timestamp": time.time()})


CLEANUP_INTERVAL = 60
RATE_LIMIT_WINDOW = 60


class ImageResponseModel(BaseModel):
    images: str
    alt: str


def strip_markdown(text: str) -> str:
    """
    Strips markdown syntax from the given text to ensure plain text.
    """
    # Handle images before plain links, otherwise the link pattern consumes
    # the "[alt](url)" part of an image first and leaves a stray "!".
    text = re.sub(r'!\[(.*?)\]\((.*?)\)', r'\1', text)
    # Links: keep the link text, drop the URL.
    text = re.sub(r'\[(.*?)\]\((.*?)\)', r'\1', text)
    # Bold and italic emphasis.
    text = re.sub(r'(\*\*|__)(.*?)\1', r'\2', text)
    text = re.sub(r'(\*|_)(.*?)\1', r'\2', text)
    # Inline code.
    text = re.sub(r'`(.*?)`', r'\1', text)
    # Headings.
    text = re.sub(r'#+\s+(.*)', r'\1', text)
    # Any leftover markdown control characters.
    text = re.sub(r'[*_`>#]', '', text)
    return text
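
# For example, strip_markdown("**bold** and [a link](https://example.com)")
# returns "bold and a link" (example.com is just a placeholder).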


class Blackbox:
    label = "Blackbox AI"
    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"
    working = True
    supports_gpt_4 = True
    supports_stream = True
    supports_system_message = True
    supports_message_history = True

    default_model = 'blackboxai'
    image_models = ['ImageGeneration']
    models = [
        default_model,
        'blackboxai-pro',
        *image_models,
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'gpt-4o',
        'gemini-pro',
        'gemini-1.5-flash',
        'claude-sonnet-3.5',
        'PythonAgent',
        'JavaAgent',
        'JavaScriptAgent',
        'HTMLAgent',
        'GoogleCloudAgent',
        'AndroidDeveloper',
        'SwiftDeveloper',
        'Next.jsAgent',
        'MongoDBAgent',
        'PyTorchAgent',
        'ReactAgent',
        'XcodeAgent',
        'AngularJSAgent',
    ]

    agentMode = {
        'ImageGeneration': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
    }

    trendingAgentMode = {
        "blackboxai": {},
        "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
        "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
        'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
        'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
        'blackboxai-pro': {'mode': True, 'id': "BLACKBOXAI-PRO"},
        'PythonAgent': {'mode': True, 'id': "Python Agent"},
        'JavaAgent': {'mode': True, 'id': "Java Agent"},
        'JavaScriptAgent': {'mode': True, 'id': "JavaScript Agent"},
        'HTMLAgent': {'mode': True, 'id': "HTML Agent"},
        'GoogleCloudAgent': {'mode': True, 'id': "Google Cloud Agent"},
        'AndroidDeveloper': {'mode': True, 'id': "Android Developer"},
        'SwiftDeveloper': {'mode': True, 'id': "Swift Developer"},
        'Next.jsAgent': {'mode': True, 'id': "Next.js Agent"},
        'MongoDBAgent': {'mode': True, 'id': "MongoDB Agent"},
        'PyTorchAgent': {'mode': True, 'id': "PyTorch Agent"},
        'ReactAgent': {'mode': True, 'id': "React Agent"},
        'XcodeAgent': {'mode': True, 'id': "Xcode Agent"},
        'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"},
    }

    userSelectedModel = {
        "gpt-4o": "gpt-4o",
        "gemini-pro": "gemini-pro",
        'claude-sonnet-3.5': "claude-sonnet-3.5",
    }

    model_prefixes = {
        'gpt-4o': '@GPT-4o',
        'gemini-pro': '@Gemini-PRO',
        'claude-sonnet-3.5': '@Claude-Sonnet-3.5',
        'PythonAgent': '@Python Agent',
        'JavaAgent': '@Java Agent',
        'JavaScriptAgent': '@JavaScript Agent',
        'HTMLAgent': '@HTML Agent',
        'GoogleCloudAgent': '@Google Cloud Agent',
        'AndroidDeveloper': '@Android Developer',
        'SwiftDeveloper': '@Swift Developer',
        'Next.jsAgent': '@Next.js Agent',
        'MongoDBAgent': '@MongoDB Agent',
        'PyTorchAgent': '@PyTorch Agent',
        'ReactAgent': '@React Agent',
        'XcodeAgent': '@Xcode Agent',
        'AngularJSAgent': '@AngularJS Agent',
        'blackboxai-pro': '@BLACKBOXAI-PRO',
        'ImageGeneration': '@Image Generation',
    }

    model_referers = {
        "blackboxai": "/?model=blackboxai",
        "gpt-4o": "/?model=gpt-4o",
        "gemini-pro": "/?model=gemini-pro",
        "claude-sonnet-3.5": "/?model=claude-sonnet-3.5",
        "ImageGeneration": "/?model=ImageGeneration",
        "PythonAgent": "/?model=PythonAgent",
        "JavaAgent": "/?model=JavaAgent",
        "JavaScriptAgent": "/?model=JavaScriptAgent",
        "HTMLAgent": "/?model=HTMLAgent",
        "GoogleCloudAgent": "/?model=GoogleCloudAgent",
        "AndroidDeveloper": "/?model=AndroidDeveloper",
        "SwiftDeveloper": "/?model=SwiftDeveloper",
        "Next.jsAgent": "/?model=Next.jsAgent",
        "MongoDBAgent": "/?model=MongoDBAgent",
        "PyTorchAgent": "/?model=PyTorchAgent",
        "ReactAgent": "/?model=ReactAgent",
        "XcodeAgent": "/?model=XcodeAgent",
        "AngularJSAgent": "/?model=AngularJSAgent",
    }

    model_aliases = {
        "gemini-flash": "gemini-1.5-flash",
        "claude-3.5-sonnet": "claude-sonnet-3.5",
        "flux": "ImageGeneration",
    }

    @classmethod
    def get_model(cls, model: str) -> str:
        if model in cls.models:
            return model
        elif model in cls.model_aliases:
            return cls.model_aliases[model]
        else:
            return cls.default_model
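
    # e.g. Blackbox.get_model("flux") resolves to "ImageGeneration" via
    # model_aliases, while unrecognised names fall back to the default
    # "blackboxai" model.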

    @staticmethod
    def generate_random_string(length: int = 7) -> str:
        characters = string.ascii_letters + string.digits
        return ''.join(random.choices(characters, k=length))

    @staticmethod
    def generate_next_action() -> str:
        return uuid.uuid4().hex

    @staticmethod
    def generate_next_router_state_tree() -> str:
        router_state = [
            "",
            {
                "children": [
                    "(chat)",
                    {
                        "children": [
                            "__PAGE__",
                            {}
                        ]
                    }
                ]
            },
            None,
            None,
            True
        ]
        return json.dumps(router_state)
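
    # Note: generate_next_action() and generate_next_router_state_tree() mimic
    # values used by the Blackbox web client; the request-building code below
    # currently computes them but does not send them.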

    @staticmethod
    def clean_response(text: str) -> str:
        # Drop the "$@$v=undefined-rv1$@$" marker Blackbox prepends, then strip
        # any markdown so the caller gets plain text.
        pattern = r'^\$\@\$v=undefined-rv1\$\@\$'
        cleaned_text = re.sub(pattern, '', text)
        cleaned_text = strip_markdown(cleaned_text)
        return cleaned_text
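
    # e.g. clean_response("$@$v=undefined-rv1$@$Hello **world**") returns
    # "Hello world".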

    @classmethod
    async def generate_response(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        proxy: Optional[str] = None,
        **kwargs
    ) -> Union[str, Dict[str, Any]]:
        model = cls.get_model(model)
        chat_id = cls.generate_random_string()
        next_action = cls.generate_next_action()
        next_router_state_tree = cls.generate_next_router_state_tree()

        agent_mode = cls.agentMode.get(model, {})
        trending_agent_mode = cls.trendingAgentMode.get(model, {})

        prefix = cls.model_prefixes.get(model, "")

        formatted_prompt = ""
        for message in messages:
            role = message.get('role', '').capitalize()
            content = message.get('content', '')
            if role and content:
                formatted_prompt += f"{role}: {content}\n"

        if prefix:
            formatted_prompt = f"{prefix} {formatted_prompt}".strip()

        referer_path = cls.model_referers.get(model, f"/?model={model}")
        referer_url = f"{cls.url}{referer_path}"

        common_headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
            'cache-control': 'no-cache',
            'origin': cls.url,
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'sec-ch-ua': '"Chromium";v="129", "Not=A?Brand";v="8"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Linux"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/129.0.0.0 Safari/537.36'
        }

        headers_api_chat = {
            'Content-Type': 'application/json',
            'Referer': referer_url
        }
        headers_api_chat_combined = {**common_headers, **headers_api_chat}

        payload_api_chat = {
            "messages": [
                {
                    "id": chat_id,
                    "content": formatted_prompt,
                    "role": "user"
                }
            ],
            "id": chat_id,
            "previewToken": None,
            "userId": None,
            "codeModelMode": True,
            "agentMode": agent_mode,
            "trendingAgentMode": trending_agent_mode,
            "isMicMode": False,
            "userSystemPrompt": None,
            "maxTokens": 1024,
            "playgroundTopP": 0.9,
            "playgroundTemperature": 0.5,
            "isChromeExt": False,
            "githubToken": None,
            "clickedAnswer2": False,
            "clickedAnswer3": False,
            "clickedForceWebSearch": False,
            "visitFromDelta": False,
            "mobileClient": False,
            "webSearchMode": False,
            "userSelectedModel": cls.userSelectedModel.get(model, model)
        }

        async with ClientSession(headers=common_headers, timeout=ClientTimeout(total=60)) as session:
            try:
                async with session.post(
                    cls.api_endpoint,
                    headers=headers_api_chat_combined,
                    json=payload_api_chat,
                    proxy=proxy
                ) as response_api_chat:
                    response_api_chat.raise_for_status()

                    # Collect the whole body before post-processing; returning on
                    # the first chunk would truncate longer completions.
                    raw_bytes = b""
                    async for data in response_api_chat.content.iter_chunked(1024):
                        raw_bytes += data
                    raw_text = raw_bytes.decode('utf-8', errors='replace')
                    cleaned_response = cls.clean_response(raw_text)

                    if model in cls.image_models:
                        # Match against the raw text: clean_response() already strips
                        # markdown, including the image link we are looking for.
                        match = re.search(r'!\[.*?\]\((https?://[^\)]+)\)', raw_text)
                        if match:
                            image_url = match.group(1)
                            image_response = ImageResponseModel(images=image_url, alt="Generated Image")
                            return image_response.dict()
                        else:
                            return cleaned_response
                    else:
                        if '$~~~$' in cleaned_response:
                            final_response = cleaned_response.split('$~~~$')[0].strip()
                        else:
                            final_response = cleaned_response

                        return final_response
            except ClientResponseError as e:
                # aiohttp's ClientResponseError does not carry the response body,
                # so only the status code and message are available here.
                return f"Error {e.status}: {e.message}"
            except Exception as e:
                return f"Unexpected error during /api/chat request: {str(e)}"

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        proxy: Optional[str] = None,
        websearch: bool = False,
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """
        Creates an asynchronous generator for streaming responses from Blackbox AI.

        Parameters:
            model (str): Model to use for generating responses.
            messages (List[Dict[str, str]]): Message history.
            proxy (Optional[str]): Proxy URL, if needed.
            websearch (bool): Enables or disables web search mode.
            **kwargs: Additional keyword arguments.

        Yields:
            str: Segments of the generated response.
        """
        model = cls.get_model(model)

        chat_id = cls.generate_random_string()
        next_action = cls.generate_next_action()
        next_router_state_tree = cls.generate_next_router_state_tree()

        agent_mode = cls.agentMode.get(model, {})
        trending_agent_mode = cls.trendingAgentMode.get(model, {})

        prefix = cls.model_prefixes.get(model, "")

        formatted_prompt = ""
        for message in messages:
            role = message.get('role', '').capitalize()
            content = message.get('content', '')
            if role and content:
                formatted_prompt += f"{role}: {content}\n"

        if prefix:
            formatted_prompt = f"{prefix} {formatted_prompt}".strip()

        referer_path = cls.model_referers.get(model, f"/?model={model}")
        referer_url = f"{cls.url}{referer_path}"

        common_headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
            'cache-control': 'no-cache',
            'origin': cls.url,
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'sec-ch-ua': '"Chromium";v="129", "Not=A?Brand";v="8"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Linux"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/129.0.0.0 Safari/537.36'
        }

        headers_api_chat = {
            'Content-Type': 'application/json',
            'Referer': referer_url
        }
        headers_api_chat_combined = {**common_headers, **headers_api_chat}

        payload_api_chat = {
            "messages": [
                {
                    "id": chat_id,
                    "content": formatted_prompt,
                    "role": "user"
                }
            ],
            "id": chat_id,
            "previewToken": None,
            "userId": None,
            "codeModelMode": True,
            "agentMode": agent_mode,
            "trendingAgentMode": trending_agent_mode,
            "isMicMode": False,
            "userSystemPrompt": None,
            "maxTokens": 1024,
            "playgroundTopP": 0.9,
            "playgroundTemperature": 0.5,
            "isChromeExt": False,
            "githubToken": None,
            "clickedAnswer2": False,
            "clickedAnswer3": False,
            "clickedForceWebSearch": False,
            "visitFromDelta": False,
            "mobileClient": False,
            "webSearchMode": websearch,
            "userSelectedModel": cls.userSelectedModel.get(model, model)
        }

        async with ClientSession(headers=common_headers, timeout=ClientTimeout(total=60)) as session:
            try:
                async with session.post(
                    cls.api_endpoint,
                    headers=headers_api_chat_combined,
                    json=payload_api_chat,
                    proxy=proxy
                ) as response_api_chat:
                    response_api_chat.raise_for_status()

                    async for data in response_api_chat.content.iter_any():
                        # errors='ignore' avoids UnicodeDecodeError when a chunk
                        # boundary splits a multi-byte character.
                        decoded_data = data.decode('utf-8', errors='ignore')
                        cleaned_response = cls.clean_response(decoded_data)
                        if model in cls.image_models:
                            # Match against the raw chunk: clean_response() strips
                            # markdown, including the image link we are looking for.
                            match = re.search(r'!\[.*?\]\((https?://[^\)]+)\)', decoded_data)
                            if match:
                                image_url = match.group(1)
                                image_response = ImageResponseModel(images=image_url, alt="Generated Image")
                                yield f"Image URL: {image_response.images}\n"
                            else:
                                yield cleaned_response
                        else:
                            if '$~~~$' in cleaned_response:
                                final_response = cleaned_response.split('$~~~$')[0].strip()
                            else:
                                final_response = cleaned_response

                            yield f"{final_response}\n"
            except ClientResponseError as e:
                # As above, ClientResponseError exposes no body; report status only.
                yield f"Error {e.status}: {e.message}"
            except Exception as e:
                yield f"Unexpected error during /api/chat request: {str(e)}"


class ModelNotWorkingException(Exception):
    def __init__(self, model: str):
        self.model = model
        self.message = f"The model '{model}' is currently not working. Please try another model or wait for it to be fixed."
        super().__init__(self.message)


async def cleanup_rate_limit_stores():
    """
    Periodically cleans up stale entries in the rate_limit_store to prevent memory bloat.
    """
    while True:
        current_time = time.time()
        ips_to_delete = [
            ip for ip, value in rate_limit_store.items()
            if current_time - value["timestamp"] > RATE_LIMIT_WINDOW * 2
        ]
        for ip in ips_to_delete:
            del rate_limit_store[ip]
            logger.debug(f"Cleaned up rate_limit_store for IP: {ip}")
        await asyncio.sleep(CLEANUP_INTERVAL)


async def rate_limiter_per_ip(request: Request):
    """
    Rate limiter that enforces a limit based on the client's IP address.
    """
    client_ip = request.client.host
    current_time = time.time()

    if current_time - rate_limit_store[client_ip]["timestamp"] > RATE_LIMIT_WINDOW:
        rate_limit_store[client_ip] = {"count": 1, "timestamp": current_time}
    else:
        if rate_limit_store[client_ip]["count"] >= RATE_LIMIT:
            logger.warning(f"Rate limit exceeded for IP address: {client_ip}")
            raise HTTPException(status_code=429, detail='Rate limit exceeded for IP address | NiansuhAI')
        rate_limit_store[client_ip]["count"] += 1
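
# Note: this is a simple fixed-window limiter. With the defaults it allows up
# to RATE_LIMIT (60) requests per RATE_LIMIT_WINDOW (60 s) per client IP, with
# the window resetting on the first request after it expires.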


async def get_api_key(request: Request, authorization: str = Header(None)) -> str:
    """
    Dependency to extract and validate the API key from the Authorization header.
    """
    client_ip = request.client.host
    if authorization is None or not authorization.startswith('Bearer '):
        logger.warning(f"Invalid or missing authorization header from IP: {client_ip}")
        raise HTTPException(status_code=401, detail='Invalid authorization header format')
    api_key = authorization[7:]
    if api_key not in API_KEYS:
        logger.warning(f"Invalid API key attempted: {api_key} from IP: {client_ip}")
        raise HTTPException(status_code=401, detail='Invalid API key')
    return api_key
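
# Clients therefore authenticate with a standard bearer header, e.g.
#   Authorization: Bearer <one of the keys listed in API_KEYS>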


app = FastAPI()


@app.on_event("startup")
async def startup_event():
    asyncio.create_task(cleanup_rate_limit_stores())
    logger.info("Started rate limit store cleanup task.")


@app.middleware("http")
async def security_middleware(request: Request, call_next):
    client_ip = request.client.host

    if request.method == "POST" and request.url.path == "/v1/chat/completions":
        content_type = request.headers.get("Content-Type")
        # Accept parameterised values such as "application/json; charset=utf-8".
        if not content_type or not content_type.startswith("application/json"):
            logger.warning(f"Invalid Content-Type from IP: {client_ip} for path: {request.url.path}")
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "message": "Content-Type must be application/json",
                        "type": "invalid_request_error",
                        "param": None,
                        "code": None
                    }
                },
            )
    response = await call_next(request)
    return response


class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: str
    messages: List[Message]
    temperature: Optional[float] = 1.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    max_tokens: Optional[int] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    stream: Optional[bool] = False
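
# A minimal, illustrative request body for /v1/chat/completions:
#   {"model": "blackboxai", "messages": [{"role": "user", "content": "Hello"}], "stream": false}
# The extra OpenAI-style fields are accepted but not currently forwarded to
# Blackbox (the payload uses fixed values such as maxTokens=1024).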


@app.post("/v1/chat/completions", dependencies=[Depends(rate_limiter_per_ip)])
async def chat_completions(request: ChatRequest, req: Request, api_key: str = Depends(get_api_key)):
    client_ip = req.client.host

    redacted_messages = [{"role": msg.role, "content": "[redacted]"} for msg in request.messages]

    logger.info(f"Received chat completions request from API key: {api_key} | IP: {client_ip} | Model: {request.model} | Messages: {redacted_messages} | Stream: {request.stream}")

    try:
        if request.model not in Blackbox.models and request.model not in Blackbox.model_aliases:
            logger.warning(f"Attempt to use unavailable model: {request.model} from IP: {client_ip}")
            raise HTTPException(status_code=400, detail="Requested model is not available.")

        if request.stream:
            async def content_generator():
                async for chunk in Blackbox.create_async_generator(
                    model=request.model,
                    messages=[{"role": msg.role, "content": msg.content} for msg in request.messages],
                    proxy=None,
                    websearch=False
                ):
                    yield chunk

            logger.info(f"Initiating streaming response for API key: {api_key} | IP: {client_ip}")
            return StreamingResponse(content_generator(), media_type='text/plain')
        else:
            response_content = await Blackbox.generate_response(
                model=request.model,
                messages=[{"role": msg.role, "content": msg.content} for msg in request.messages],
                temperature=request.temperature,
                max_tokens=request.max_tokens
            )

            logger.info(f"Completed response generation for API key: {api_key} | IP: {client_ip}")
            return {
                "content": response_content
            }
    except ModelNotWorkingException as e:
        logger.warning(f"Model not working: {e} | IP: {client_ip}")
        raise HTTPException(status_code=503, detail=str(e))
    except HTTPException as he:
        logger.warning(f"HTTPException: {he.detail} | IP: {client_ip}")
        raise he
    except Exception as e:
        logger.exception(f"An unexpected error occurred while processing the chat completions request from IP: {client_ip}.")
        raise HTTPException(status_code=500, detail=str(e))
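
# Example call, assuming the server is running locally on port 8000 and
# "sk-test-1" is one of the configured API keys:
#   curl -X POST http://localhost:8000/v1/chat/completions \
#        -H "Authorization: Bearer sk-test-1" \
#        -H "Content-Type: application/json" \
#        -d '{"model": "blackboxai", "messages": [{"role": "user", "content": "Hi"}]}'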


@app.get("/v1/models", dependencies=[Depends(rate_limiter_per_ip)])
async def get_models(req: Request):
    client_ip = req.client.host
    logger.info(f"Fetching available models from IP: {client_ip}")
    return {"data": [{"id": model, "object": "model"} for model in Blackbox.models]}


@app.get("/v1/health", dependencies=[Depends(rate_limiter_per_ip)])
async def health_check(req: Request):
    client_ip = req.client.host
    logger.info(f"Health check requested from IP: {client_ip}")
    return {"status": "ok"}


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    client_ip = request.client.host
    logger.error(f"HTTPException: {exc.detail} | Path: {request.url.path} | IP: {client_ip}")
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "message": exc.detail,
                "type": "invalid_request_error",
                "param": None,
                "code": None
            }
        },
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)