|
import os |
|
import re |
|
import random |
|
import string |
|
import uuid |
|
import json |
|
import logging |
|
import asyncio |
|
from aiohttp import ClientSession, ClientTimeout, ClientError |
|
from fastapi import FastAPI, HTTPException, Request, Depends, Header, status |
|
from fastapi.responses import StreamingResponse, JSONResponse |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from pydantic import BaseModel, Field, validator |
|
from typing import List, Dict, Any, Optional, Union, AsyncGenerator |
|
from datetime import datetime |
|
from slowapi import Limiter, _rate_limit_exceeded_handler |
|
from slowapi.util import get_remote_address |
|
from slowapi.errors import RateLimitExceeded |
|
import tiktoken |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Read variables from a .env file (python-dotenv) so the os.getenv() lookups further
# down (API_KEYS, RATE_LIMIT, MAX_TOKENS, EXTERNAL_API_ENDPOINT) can be set there.
load_dotenv()

# Process-wide logging: INFO level, timestamped format, a single StreamHandler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
|
app = FastAPI(title="OpenAI-Compatible API")

# Cross-origin policy: any origin, method and header is accepted, credentials allowed.
# NOTE(review): a "*" origin together with allow_credentials=True is maximally
# permissive; confirm this is intended outside local development.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=origins,
)
|
|
|
|
|
# Rate-limit string in slowapi syntax (e.g. "60/minute"), configurable via RATE_LIMIT.
RATE_LIMIT = os.getenv("RATE_LIMIT", "60/minute")

# Limiter keyed by slowapi's get_remote_address; registered on the app state together
# with the handler that turns RateLimitExceeded into an HTTP response.
limiter = Limiter(default_limits=[RATE_LIMIT], key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
|
|
|
|
|
# Accepted bearer tokens: the comma-separated API_KEYS env var, each entry trimmed,
# blank entries dropped. If API_KEYS is unset or blank the set is empty and every
# request that goes through get_api_key is rejected with 401.
API_KEYS = {key for key in map(str.strip, os.getenv("API_KEYS", "").split(",")) if key}
|
|
|
async def get_api_key(authorization: Optional[str] = Header(None)):
    """
    FastAPI dependency that validates the ``Authorization: Bearer <key>`` header.

    Returns:
        The bearer token, after it has been matched against ``API_KEYS``.

    Raises:
        HTTPException: 401 (with ``WWW-Authenticate: Bearer``) when the header is
            missing, is not exactly ``Bearer <token>`` (including empty or
            whitespace-only headers), or carries a key that is not in ``API_KEYS``.
    """
    import secrets

    def _unauthorized(detail: str) -> HTTPException:
        # Every rejection uses the same status and challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    if authorization is None:
        raise _unauthorized("Authorization header missing")

    parts = authorization.split()
    # Check the length before indexing: an empty/blank header splits to [] and
    # parts[0] would raise IndexError (a 500 instead of a 401).
    if len(parts) != 2 or parts[0].lower() != "bearer":
        raise _unauthorized("Invalid authorization header format")

    token = parts[1]
    # Constant-time comparison so response timing does not reveal how much of a
    # guessed key matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    candidate = token.encode("utf-8")
    if not any(secrets.compare_digest(candidate, key.encode("utf-8")) for key in API_KEYS):
        raise _unauthorized("Invalid API Key")

    return token
|
|
|
|
|
class ModelNotWorkingException(Exception):
    """Raised when a requested model is disabled or not in the provider's model list.

    Attributes:
        model: The model name that was rejected.
        message: Human-readable explanation (also the exception's sole argument).
    """

    def __init__(self, model: str):
        text = (
            f"The model '{model}' is currently not working. "
            "Please try another model or wait for it to be fixed."
        )
        super().__init__(text)
        self.model = model
        self.message = text
|
|
|
|
|
class ImageResponse:
    """Image-generation result: the generated image's URL and its alt text."""

    def __init__(self, url: str, alt: str):
        self.url, self.alt = url, alt
|
|
|
def to_data_uri(image: Any) -> str:
    """
    Placeholder for converting an image to a ``data:`` URI.

    Not implemented: ``image`` is ignored and the same fixed, truncated PNG data-URI
    string is always returned.
    """
    placeholder_uri = "data:image/png;base64,..."
    return placeholder_uri
|
|
|
|
|
def count_tokens(messages: List[Dict[str, Any]], model: str) -> int:
    """
    Count the tokens in ``messages`` with tiktoken's ``cl100k_base`` encoding.

    String content is encoded directly. For list content, ``text`` parts contribute
    their text and ``image_url`` parts contribute their URL string (a rough stand-in,
    not a real image-token estimate). Messages whose content is missing or None
    count as zero.

    Args:
        messages: Chat messages as dicts with a ``content`` key.
        model: Requested model name. Currently unused: every model is counted with
            ``cl100k_base``, so counts are approximate for other tokenizers.

    Returns:
        Total number of tokens across all messages.
    """
    encoding = tiktoken.get_encoding("cl100k_base")

    def _encoded_length(text: str) -> int:
        # disallowed_special=() counts special-token text such as "<|endoftext|>"
        # in user input as ordinary text instead of raising ValueError.
        return len(encoding.encode(text, disallowed_special=()))

    tokens = 0
    for message in messages:
        content = message.get('content')
        if content is None:
            continue
        if isinstance(content, list):
            for content_part in content:
                part_type = content_part.get('type')
                if part_type == 'text':
                    tokens += _encoded_length(content_part.get('text') or "")
                elif part_type == 'image_url':
                    url = (content_part.get('image_url') or {}).get('url') or ""
                    tokens += _encoded_length(url)
        else:
            tokens += _encoded_length(str(content))
    return tokens
|
|
|
|
|
class Blackbox:
    """
    Client for the Blackbox chat endpoint (``api_endpoint``).

    The class-level tables map the public model names exposed by this API to:
    the upstream request fields (``agentMode``, ``trendingAgentMode``,
    ``userSelectedModel``), a text prefix injected into the first message
    (``model_prefixes``), the ``referer`` header (``model_referers``) and
    alternative spellings (``model_aliases``).
    """

    url = "https://www.blackbox.ai"
    # Overridable through the EXTERNAL_API_ENDPOINT environment variable.
    api_endpoint = os.getenv("EXTERNAL_API_ENDPOINT", "https://www.blackbox.ai/api/chat")
    working = True
    supports_stream = True
    supports_system_message = True
    supports_message_history = True

    default_model = 'blackboxai'
    image_models = ['ImageGeneration']
    models = [
        default_model,
        'blackboxai-pro',
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'gpt-4o',
        'gemini-pro',
        'gemini-1.5-flash',
        'claude-sonnet-3.5',
        'PythonAgent',
        'JavaAgent',
        'JavaScriptAgent',
        'HTMLAgent',
        'GoogleCloudAgent',
        'AndroidDeveloper',
        'SwiftDeveloper',
        'Next.jsAgent',
        'MongoDBAgent',
        'PyTorchAgent',
        'ReactAgent',
        'XcodeAgent',
        'AngularJSAgent',
        *image_models,
        'Niansuh',
    ]

    agentMode = {
        'ImageGeneration': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
        'Niansuh': {'mode': True, 'id': "NiansuhAIk1HgESy", 'name': "Niansuh"},
    }
    trendingAgentMode = {
        "blackboxai": {},
        "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
        "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
        'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
        'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
        'blackboxai-pro': {'mode': True, 'id': "BLACKBOXAI-PRO"},
        'PythonAgent': {'mode': True, 'id': "Python Agent"},
        'JavaAgent': {'mode': True, 'id': "Java Agent"},
        'JavaScriptAgent': {'mode': True, 'id': "JavaScript Agent"},
        'HTMLAgent': {'mode': True, 'id': "HTML Agent"},
        'GoogleCloudAgent': {'mode': True, 'id': "Google Cloud Agent"},
        'AndroidDeveloper': {'mode': True, 'id': "Android Developer"},
        'SwiftDeveloper': {'mode': True, 'id': "Swift Developer"},
        'Next.jsAgent': {'mode': True, 'id': "Next.js Agent"},
        'MongoDBAgent': {'mode': True, 'id': "MongoDB Agent"},
        'PyTorchAgent': {'mode': True, 'id': "PyTorch Agent"},
        'ReactAgent': {'mode': True, 'id': "React Agent"},
        'XcodeAgent': {'mode': True, 'id': "Xcode Agent"},
        'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"},
    }

    userSelectedModel = {
        "gpt-4o": "gpt-4o",
        "gemini-pro": "gemini-pro",
        'claude-sonnet-3.5': "claude-sonnet-3.5",
    }

    model_prefixes = {
        'gpt-4o': '@GPT-4o',
        'gemini-pro': '@Gemini-PRO',
        'claude-sonnet-3.5': '@Claude-Sonnet-3.5',
        'PythonAgent': '@Python Agent',
        'JavaAgent': '@Java Agent',
        'JavaScriptAgent': '@JavaScript Agent',
        'HTMLAgent': '@HTML Agent',
        'GoogleCloudAgent': '@Google Cloud Agent',
        'AndroidDeveloper': '@Android Developer',
        'SwiftDeveloper': '@Swift Developer',
        'Next.jsAgent': '@Next.js Agent',
        'MongoDBAgent': '@MongoDB Agent',
        'PyTorchAgent': '@PyTorch Agent',
        'ReactAgent': '@React Agent',
        'XcodeAgent': '@Xcode Agent',
        'AngularJSAgent': '@AngularJS Agent',
        'blackboxai-pro': '@BLACKBOXAI-PRO',
        'ImageGeneration': '@Image Generation',
        'Niansuh': '@Niansuh',
    }

    model_referers = {
        "blackboxai": f"{url}/?model=blackboxai",
        "gpt-4o": f"{url}/?model=gpt-4o",
        "gemini-pro": f"{url}/?model=gemini-pro",
        "claude-sonnet-3.5": f"{url}/?model=claude-sonnet-3.5"
    }

    model_aliases = {
        "gemini-flash": "gemini-1.5-flash",
        "claude-3.5-sonnet": "claude-sonnet-3.5",
        "flux": "ImageGeneration",
        "niansuh": "Niansuh",
    }

    # Markers of the form "$@$v=...$@$" that the upstream mixes into streamed text;
    # they are removed before text is yielded.
    _version_marker = re.compile(r'\$@\$v=[^$]+\$@\$')

    @classmethod
    def get_model(cls, model: str) -> str:
        """
        Resolve a requested model name to one this provider supports.

        Listed models (and ``userSelectedModel`` keys) are returned unchanged,
        aliases map to their target, and anything else falls back to
        ``default_model``.
        """
        if model in cls.models or model in cls.userSelectedModel:
            return model
        return cls.model_aliases.get(model, cls.default_model)

    @classmethod
    def _build_headers(cls, model: str) -> Dict[str, str]:
        """Browser-like request headers; the referer depends on the model."""
        return {
            "accept": "*/*",
            "accept-language": "en-US,en;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "origin": cls.url,
            "pragma": "no-cache",
            "priority": "u=1, i",
            "referer": cls.model_referers.get(model, cls.url),
            "sec-ch-ua": '"Chromium";v="129", "Not=A?Brand";v="8"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
        }

    @classmethod
    def _prepare_messages(
        cls, messages: List[Dict[str, Any]], model: str, random_id: str
    ) -> List[Dict[str, Any]]:
        """
        Return a deep copy of ``messages`` adjusted for the upstream payload.

        The caller's list is not mutated. Changes made to the copy:
        - the model's prefix (if any) is prepended to the first text of the first
          message, unless that text already starts with it;
        - the last message is forced to role ``user`` and given ``id=random_id``
          (for list content, each text part gets ``role='user'`` instead).
        """
        import copy

        prepared = copy.deepcopy(messages)
        if not prepared:
            return prepared

        prefix = cls.model_prefixes.get(model)
        if prefix is not None:
            first = prepared[0]
            first_content = first['content']
            if isinstance(first_content, list):
                # Only the FIRST text part is considered; stop there whether or not
                # it already carries the prefix.
                for content_part in first_content:
                    if content_part.get('type') == 'text':
                        if not content_part['text'].startswith(prefix):
                            logger.debug(f"Adding prefix '{prefix}' to the first text message.")
                            content_part['text'] = f"{prefix} {content_part['text']}"
                        break
            elif isinstance(first_content, str) and not first_content.startswith(prefix):
                first['content'] = f"{prefix} {first_content}"

        last_message = prepared[-1]
        if isinstance(last_message['content'], list):
            # NOTE(review): role is set on the text parts, not on the message itself;
            # kept as-is — confirm this is what the upstream expects.
            for content_part in last_message['content']:
                if content_part.get('type') == 'text':
                    content_part['role'] = 'user'
        else:
            last_message['id'] = random_id
            last_message['role'] = 'user'
        return prepared

    @classmethod
    def _build_payload(
        cls, model: str, messages: List[Dict[str, Any]], random_id: str, webSearchMode: bool
    ) -> Dict[str, Any]:
        """JSON body for the upstream request, with the model's mode field filled in."""
        data = {
            "messages": messages,
            "id": random_id,
            "previewToken": None,
            "userId": None,
            "codeModelMode": True,
            "agentMode": {},
            "trendingAgentMode": {},
            "isMicMode": False,
            "userSystemPrompt": None,
            "maxTokens": int(os.getenv("MAX_TOKENS", "4096")),
            "playgroundTopP": 0.9,
            "playgroundTemperature": 0.5,
            "isChromeExt": False,
            "githubToken": None,
            "clickedAnswer2": False,
            "clickedAnswer3": False,
            "clickedForceWebSearch": False,
            "visitFromDelta": False,
            "mobileClient": False,
            "userSelectedModel": None,
            # Coerce so an explicit null from the client is sent as False.
            "webSearchMode": bool(webSearchMode),
        }
        if model in cls.agentMode:
            data["agentMode"] = cls.agentMode[model]
        elif model in cls.trendingAgentMode:
            data["trendingAgentMode"] = cls.trendingAgentMode[model]
        elif model in cls.userSelectedModel:
            data["userSelectedModel"] = cls.userSelectedModel[model]
        return data

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, Any]],
        proxy: Optional[str] = None,
        image: Any = None,
        image_name: Optional[str] = None,
        webSearchMode: bool = False,
        **kwargs
    ) -> AsyncGenerator[Any, None]:
        """
        Send ``messages`` to the upstream endpoint and yield the reply.

        Text models yield ``str`` fragments as they arrive. ``ImageGeneration``
        yields a single ``ImageResponse`` whose URL is taken from the response text.

        Args:
            model: Requested model name (resolved through ``get_model``).
            messages: Chat messages; not modified (a prepared copy is sent).
            proxy: Optional proxy URL passed to aiohttp.
            image, image_name: Accepted but not sent upstream (not implemented).
            webSearchMode: Forwarded as the payload's ``webSearchMode`` flag.

        Raises:
            ModelNotWorkingException: if the provider is disabled or the model is
                not listed.
            HTTPException: 502 (aiohttp client error), 504 (timeout) or 500 (any
                other error) once all attempts are used, or immediately if the
                failure happens after output was already yielded (a retry would
                send that output twice).
        """
        import codecs

        model = cls.get_model(model)
        logger.info(f"Selected model: {model}")

        if not cls.working or model not in cls.models:
            logger.error(f"Model {model} is not working or not supported.")
            raise ModelNotWorkingException(model)

        random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
        messages = cls._prepare_messages(messages, model, random_id)

        if image is not None:
            logger.warning("Image input is not supported yet and will be ignored.")

        headers = cls._build_headers(model)
        data = cls._build_payload(model, messages, random_id, webSearchMode)
        # Message contents are logged only at DEBUG to keep user data out of INFO logs.
        logger.info(f"Sending request to {cls.api_endpoint} for model {model}")
        logger.debug(f"Request payload: {data}")

        timeout = ClientTimeout(total=60)
        retry_attempts = 10
        yielded_any = False

        for attempt in range(retry_attempts):
            no_more_retries = attempt == retry_attempts - 1
            try:
                async with ClientSession(headers=headers, timeout=timeout) as session:
                    async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
                        response.raise_for_status()
                        logger.info(f"Received response with status {response.status}")
                        if model == 'ImageGeneration':
                            response_text = await response.text()
                            url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text)
                            if not url_match:
                                logger.error("Image URL not found in the response.")
                                raise Exception("Image URL not found in the response")
                            image_url = url_match.group(0)
                            logger.info(f"Image URL found: {image_url}")
                            yielded_any = True
                            yield ImageResponse(image_url, alt=messages[-1]['content'])
                        else:
                            # iter_any() yields raw bytes (iter_chunks() yields tuples).
                            # The incremental decoder keeps multi-byte UTF-8 characters
                            # that straddle two chunks intact.
                            # NOTE(review): a marker split across two chunks is not stripped.
                            decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
                            async for raw in response.content.iter_any():
                                text = cls._version_marker.sub('', decoder.decode(raw))
                                # Whitespace-only text (spaces, newlines) is content too.
                                if text:
                                    yielded_any = True
                                    yield text
                            tail = cls._version_marker.sub('', decoder.decode(b"", final=True))
                            if tail:
                                yielded_any = True
                                yield tail
                return
            except ClientError as ce:
                logger.error(f"Client error occurred: {ce}. Attempt {attempt + 1}/{retry_attempts}")
                if no_more_retries or yielded_any:
                    raise HTTPException(status_code=502, detail="Error communicating with the external API.") from ce
            except asyncio.TimeoutError as te:
                logger.error(f"Request timed out. Attempt {attempt + 1}/{retry_attempts}")
                if no_more_retries or yielded_any:
                    raise HTTPException(status_code=504, detail="External API request timed out.") from te
            except Exception as e:
                logger.error(f"Unexpected error: {e}. Attempt {attempt + 1}/{retry_attempts}")
                if no_more_retries or yielded_any:
                    raise HTTPException(status_code=500, detail=str(e)) from e
            # Short exponential backoff (capped) instead of hammering the upstream.
            await asyncio.sleep(min(0.5 * 2 ** attempt, 5.0))
|
|
|
|
|
class TextContent(BaseModel):
    # A text part of list-valued message content.
    # `type` is a free-form string here; this model does not enforce the value 'text'.
    type: str = Field(..., description="Type of content, e.g., 'text'.")
    text: str = Field(..., description="The text content.")
|
|
|
class ImageURLContent(BaseModel):
    # An image part of list-valued message content.
    # `image_url` is a str->str mapping; code in this module reads its "url" key.
    type: str = Field(..., description="Type of content, e.g., 'image_url'.")
    image_url: Dict[str, str] = Field(..., description="Dictionary containing the image URL.")
|
|
|
# One part of list-valued message content: a text part or an image part.
# This is a typing alias (used in annotations), not a constructible class.
Content = Union[TextContent, ImageURLContent]
|
|
|
class Message(BaseModel):
    # One chat message; `content` is either plain text or a list of typed parts.
    role: str = Field(..., description="The role of the message author.")
    content: Union[str, List[Content]] = Field(..., description="The content of the message. Can be a string or a list of content parts.")

    @validator('content', pre=True)
    def validate_content(cls, v):
        """
        Normalize ``content`` before field validation.

        Strings pass through unchanged. For lists, each part is built as
        ``ImageURLContent`` when its ``type`` is ``"image_url"`` and as
        ``TextContent`` otherwise. Parts that are already one of those models are
        kept as-is. (``Content`` is a ``typing.Union`` alias and cannot be
        instantiated, so the concrete model is chosen from ``type``.)

        Raises:
            ValueError: if ``content`` is neither a string nor a list, or a list
                item is not a mapping or content model.
        """
        if isinstance(v, str):
            return v
        if not isinstance(v, list):
            raise ValueError("Content must be either a string or a list of content parts.")

        parts = []
        for item in v:
            if isinstance(item, (TextContent, ImageURLContent)):
                parts.append(item)
            elif isinstance(item, dict):
                part_model = ImageURLContent if item.get("type") == "image_url" else TextContent
                parts.append(part_model(**item))
            else:
                raise ValueError("Each content part must be an object with a 'type' field.")
        return parts
|
|
|
class ChatRequest(BaseModel):
    # Request body for POST /v1/chat/completions. Only the fields below are declared;
    # other keys in the body are not read through this model.
    # `webSearchMode` is forwarded to the Blackbox provider.
    model: str = Field(..., description="ID of the model to use.")
    messages: List[Message] = Field(..., description="A list of messages comprising the conversation.")
    stream: Optional[bool] = Field(False, description="Whether to stream the response.")
    webSearchMode: Optional[bool] = Field(False, description="Whether to enable web search mode.")
|
|
|
class ChatCompletionChoice(BaseModel):
    # One choice of a ChatCompletionResponse. The non-streaming endpoint puts the full
    # reply here as delta={"content": ..., "role": "assistant"}.
    # NOTE(review): OpenAI's non-streaming schema uses `message` rather than `delta`;
    # confirm that clients of this API read `delta`.
    index: int
    delta: Dict[str, Any]
    finish_reason: Optional[str] = None
|
|
|
class ChatCompletionResponse(BaseModel):
    # Body of a non-streaming chat completion (the endpoint's declared response_model).
    # `usage`, when set, holds prompt_tokens / completion_tokens / total_tokens.
    id: str
    object: str
    created: int
    model: str
    choices: List[ChatCompletionChoice]
    usage: Optional[Dict[str, int]] = None
|
|
|
|
|
def create_response(content: str, model: str, finish_reason: Optional[str] = None) -> Dict[str, Any]:
    """
    Build one OpenAI-style ``chat.completion.chunk`` payload.

    Every call gets a fresh ``chatcmpl-<uuid4>`` id and the current Unix time (whole
    seconds). The single choice (index 0) carries ``content`` as an assistant delta;
    ``usage`` is always None.
    """
    chunk_id = "chatcmpl-" + str(uuid.uuid4())
    created_at = int(datetime.now().timestamp())
    only_choice = {
        "index": 0,
        "delta": {"content": content, "role": "assistant"},
        "finish_reason": finish_reason,
    }
    return dict(
        id=chunk_id,
        object="chat.completion.chunk",
        created=created_at,
        model=model,
        choices=[only_choice],
        usage=None,
    )
|
|
|
|
|
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
@limiter.limit(RATE_LIMIT)
async def chat_completions(
    chat_request: ChatRequest,
    request: Request,
    api_key: str = Depends(get_api_key)
):
    """
    OpenAI-style chat completions endpoint backed by ``Blackbox``.

    With ``stream=True`` the reply is sent as server-sent events: one
    ``chat.completion.chunk`` per upstream fragment (all sharing one id), a final
    chunk with ``finish_reason="stop"``, then ``data: [DONE]``. Errors during
    streaming are sent as a ``data: {"error": ...}`` event. Otherwise a single
    ``ChatCompletionResponse`` with token usage is returned.

    Image results are rendered as markdown ``![image](<url>)``.

    Raises:
        HTTPException: 503 when the model is reported as not working; provider
            HTTPExceptions are re-raised unchanged; 500 for anything unexpected.
    """
    # Message contents stay out of INFO logs; the full request is logged at DEBUG.
    logger.info(
        f"Received chat completions request: model={chat_request.model}, "
        f"messages={len(chat_request.messages)}, stream={chat_request.stream}"
    )
    logger.debug(f"Full chat completions request: {chat_request}")
    try:
        # Convert pydantic messages to the plain dicts the provider works with.
        processed_messages: List[Dict[str, Any]] = []
        for msg in chat_request.messages:
            if isinstance(msg.content, list):
                combined_content: List[Dict[str, Any]] = []
                for part in msg.content:
                    if isinstance(part, TextContent):
                        combined_content.append({"type": part.type, "text": part.text})
                    elif isinstance(part, ImageURLContent):
                        combined_content.append({"type": part.type, "image_url": part.image_url})
                processed_messages.append({"role": msg.role, "content": combined_content})
            else:
                processed_messages.append({"role": msg.role, "content": msg.content})

        async_generator = Blackbox.create_async_generator(
            model=chat_request.model,
            messages=processed_messages,
            image=None,
            image_name=None,
            webSearchMode=chat_request.webSearchMode
        )

        if chat_request.stream:
            # OpenAI streams reuse one completion id across all chunks.
            completion_id = f"chatcmpl-{uuid.uuid4()}"

            def _sse(content: str, finish_reason: Optional[str] = None) -> str:
                response_chunk = create_response(content, chat_request.model, finish_reason)
                response_chunk["id"] = completion_id
                return f"data: {json.dumps(response_chunk)}\n\n"

            async def generate():
                try:
                    async for chunk in async_generator:
                        if isinstance(chunk, ImageResponse):
                            yield _sse(f"![image]({chunk.url})")
                        else:
                            yield _sse(chunk)
                    yield _sse("", finish_reason="stop")
                    yield "data: [DONE]\n\n"
                except HTTPException as he:
                    error_response = {"error": he.detail}
                    yield f"data: {json.dumps(error_response)}\n\n"
                except Exception as e:
                    logger.exception("Error during streaming response generation.")
                    error_response = {"error": str(e)}
                    yield f"data: {json.dumps(error_response)}\n\n"

            return StreamingResponse(generate(), media_type="text/event-stream")

        pieces: List[str] = []
        async for chunk in async_generator:
            if isinstance(chunk, ImageResponse):
                pieces.append(f"![image]({chunk.url})\n")
            else:
                pieces.append(chunk)
        response_content = "".join(pieces)

        # Count prompt and completion with the same tokenizer so usage is consistent.
        prompt_tokens = count_tokens(processed_messages, chat_request.model)
        completion_tokens = count_tokens(
            [{"role": "assistant", "content": response_content}], chat_request.model
        )
        total_tokens = prompt_tokens + completion_tokens

        logger.info("Completed non-streaming response generation.")
        return ChatCompletionResponse(
            id=f"chatcmpl-{uuid.uuid4()}",
            object="chat.completion",
            created=int(datetime.now().timestamp()),
            model=chat_request.model,
            choices=[
                ChatCompletionChoice(
                    index=0,
                    delta={"content": response_content, "role": "assistant"},
                    finish_reason="stop"
                )
            ],
            usage={
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": total_tokens
            }
        )
    except ModelNotWorkingException as e:
        logger.warning(f"Model not working: {e}")
        raise HTTPException(status_code=503, detail=str(e)) from e
    except HTTPException as he:
        logger.warning(f"HTTPException: {he.detail}")
        raise
    except Exception as e:
        logger.exception("An unexpected error occurred while processing the chat completions request.")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
@app.get("/v1/models", response_model=Dict[str, List[Dict[str, str]]])
@limiter.limit(RATE_LIMIT)
async def get_models(
    request: Request,
    api_key: str = Depends(get_api_key)
):
    """
    List the model ids this API accepts, as ``{"data": [{"id": <model>}, ...]}``.

    Rate-limited with the configurable ``RATE_LIMIT`` (default "60/minute").
    """
    logger.info("Fetching available models.")
    return {"data": [{"id": model} for model in Blackbox.models]}
|
|
|
|
|
@app.get("/v1/models/{model}/status", response_model=Dict[str, str])
@limiter.limit(RATE_LIMIT)
async def model_status(
    model: str,
    request: Request,
    api_key: str = Depends(get_api_key)
):
    """
    Check if a specific model is available.

    Returns ``{"model": <name>, "status": "available"}`` for a listed model, or the
    alias target with status ``"available via alias"`` for a known alias.
    Rate-limited with the configurable ``RATE_LIMIT``.

    Raises:
        HTTPException: 404 if ``model`` is neither a listed model nor an alias.
    """
    if model in Blackbox.models:
        return {"model": model, "status": "available"}
    if model in Blackbox.model_aliases:
        return {"model": Blackbox.model_aliases[model], "status": "available via alias"}
    raise HTTPException(status_code=404, detail="Model not found")
|
|
|
|
|
@app.get("/v1/health", response_model=Dict[str, str])
@limiter.limit(RATE_LIMIT)
async def health_check(
    request: Request
):
    """
    Health check endpoint to verify the service is running.

    Unauthenticated; rate-limited with the same configurable ``RATE_LIMIT`` as the
    other routes. ``request`` is unused in the body; it is taken for the limiter
    decorator.
    """
    return {"status": "ok"}
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Bind address and port can be overridden with HOST / PORT (defaults 0.0.0.0:8000).
    uvicorn.run(
        app,
        host=os.getenv("HOST", "0.0.0.0"),
        port=int(os.getenv("PORT", "8000")),
    )
|
|