# Simple in-memory rate limiter based solely on IP addresses
async def rate_limiter_per_ip(request: Request):
    """Enforce a fixed-window request limit per client IP.

    Each IP gets a ``{"count", "timestamp"}`` entry in ``rate_limit_store``.
    The entry is reset once ``RATE_LIMIT_WINDOW`` seconds have passed since
    its timestamp; otherwise the count is incremented until it reaches
    ``RATE_LIMIT``.

    Raises:
        HTTPException: 429 when the IP already made ``RATE_LIMIT`` requests
            in the current window.
    """
    client_ip = request.client.host
    current_time = time.time()

    # Use .get() so a first-time IP does not raise KeyError when the store
    # is a plain dict (and no throwaway entry is created in a defaultdict).
    entry = rate_limit_store.get(client_ip)

    # Initialize or update the count and timestamp
    if entry is None or current_time - entry["timestamp"] > RATE_LIMIT_WINDOW:
        rate_limit_store[client_ip] = {"count": 1, "timestamp": current_time}
        return

    if entry["count"] >= RATE_LIMIT:
        logger.warning(f"Rate limit exceeded for IP address: {client_ip}")
        raise HTTPException(status_code=429, detail='Rate limit exceeded for IP address')
    entry["count"] += 1


CLEANUP_INTERVAL = 60  # seconds
RATE_LIMIT_WINDOW = 60  # seconds


class Blackbox:
    """Client for the Blackbox AI chat endpoint.

    Holds the static model catalogue (names, aliases, per-model request
    options) and exposes ``create_async_generator`` to send a conversation
    to ``api_endpoint`` and yield the processed reply.
    """

    label = "Blackbox AI"
    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"
    working = True
    supports_gpt_4 = True
    supports_stream = True
    supports_system_message = True
    supports_message_history = True

    default_model = 'blackboxai'
    image_models = ['ImageGeneration']
    models = [
        default_model,
        'blackboxai-pro',
        *image_models,
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'gpt-4o',
        'gemini-pro',
        'gemini-1.5-flash',
        'claude-sonnet-3.5',
        'PythonAgent',
        'JavaAgent',
        'JavaScriptAgent',
        'HTMLAgent',
        'GoogleCloudAgent',
        'AndroidDeveloper',
        'SwiftDeveloper',
        'Next.jsAgent',
        'MongoDBAgent',
        'PyTorchAgent',
        'ReactAgent',
        'XcodeAgent',
        'AngularJSAgent',
    ]

    # Sent as the "agentMode" payload field for the matching model.
    agentMode = {
        'ImageGeneration': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
    }

    # Sent as the "trendingAgentMode" payload field for the matching model.
    trendingAgentMode = {
        "blackboxai": {},
        "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
        "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
        'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
        'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
        'blackboxai-pro': {'mode': True, 'id': "BLACKBOXAI-PRO"},
        'PythonAgent': {'mode': True, 'id': "Python Agent"},
        'JavaAgent': {'mode': True, 'id': "Java Agent"},
        'JavaScriptAgent': {'mode': True, 'id': "JavaScript Agent"},
        'HTMLAgent': {'mode': True, 'id': "HTML Agent"},
        'GoogleCloudAgent': {'mode': True, 'id': "Google Cloud Agent"},
        'AndroidDeveloper': {'mode': True, 'id': "Android Developer"},
        'SwiftDeveloper': {'mode': True, 'id': "Swift Developer"},
        'Next.jsAgent': {'mode': True, 'id': "Next.js Agent"},
        'MongoDBAgent': {'mode': True, 'id': "MongoDB Agent"},
        'PyTorchAgent': {'mode': True, 'id': "PyTorch Agent"},
        'ReactAgent': {'mode': True, 'id': "React Agent"},
        'XcodeAgent': {'mode': True, 'id': "Xcode Agent"},
        'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"},
    }

    # Sent as the "userSelectedModel" payload field (falls back to the model name).
    userSelectedModel = {
        "gpt-4o": "gpt-4o",
        "gemini-pro": "gemini-pro",
        'claude-sonnet-3.5': "claude-sonnet-3.5",
    }

    # Prefix prepended to the formatted prompt for the matching model.
    model_prefixes = {
        'gpt-4o': '@GPT-4o',
        'gemini-pro': '@Gemini-PRO',
        'claude-sonnet-3.5': '@Claude-Sonnet-3.5',
        'PythonAgent': '@Python Agent',
        'JavaAgent': '@Java Agent',
        'JavaScriptAgent': '@JavaScript Agent',
        'HTMLAgent': '@HTML Agent',
        'GoogleCloudAgent': '@Google Cloud Agent',
        'AndroidDeveloper': '@Android Developer',
        'SwiftDeveloper': '@Swift Developer',
        'Next.jsAgent': '@Next.js Agent',
        'MongoDBAgent': '@MongoDB Agent',
        'PyTorchAgent': '@PyTorch Agent',
        'ReactAgent': '@React Agent',
        'XcodeAgent': '@Xcode Agent',
        'AngularJSAgent': '@AngularJS Agent',
        'blackboxai-pro': '@BLACKBOXAI-PRO',
        'ImageGeneration': '@Image Generation',
    }

    # Path appended to ``url`` to build the Referer header for the model.
    model_referers = {
        "blackboxai": "/?model=blackboxai",
        "gpt-4o": "/?model=gpt-4o",
        "gemini-pro": "/?model=gemini-pro",
        "claude-sonnet-3.5": "/?model=claude-sonnet-3.5"
    }

    # Alternative names accepted by ``get_model``.
    model_aliases = {
        "gemini-flash": "gemini-1.5-flash",
        "claude-3.5-sonnet": "claude-sonnet-3.5",
        "flux": "ImageGeneration",
    }

    @classmethod
    def get_model(cls, model: str) -> str:
        """Resolve *model* to a known model name.

        Returns *model* itself when listed in ``models``, the alias target
        when listed in ``model_aliases``, and ``default_model`` otherwise.
        """
        if model in cls.models:
            return model
        if model in cls.model_aliases:
            return cls.model_aliases[model]
        return cls.default_model

    @staticmethod
    def generate_random_string(length: int = 7) -> str:
        """Return *length* random ASCII letters and digits."""
        characters = string.ascii_letters + string.digits
        return ''.join(random.choices(characters, k=length))

    @staticmethod
    def generate_next_action() -> str:
        """Return a random UUID4 as a 32-character hex string."""
        return uuid.uuid4().hex

    @staticmethod
    def generate_next_router_state_tree() -> str:
        """Return the JSON-encoded, fixed router state tree."""
        router_state = [
            "",
            {
                "children": [
                    "(chat)",
                    {
                        "children": [
                            "__PAGE__",
                            {}
                        ]
                    }
                ]
            },
            None,
            None,
            True
        ]
        return json.dumps(router_state)

    @staticmethod
    def clean_response(text: str) -> str:
        """Strip the leading ``$@$v=undefined-rv1$@$`` marker from *text*."""
        pattern = r'^\$\@\$v=undefined-rv1\$\@\$'
        cleaned_text = re.sub(pattern, '', text)
        return cleaned_text

    @staticmethod
    def _format_text_response(cleaned_response: str, websearch: bool) -> str:
        """Post-process a text reply around its ``$~~~$`` delimited section.

        Without web search, everything from the first ``$~~~$`` on is
        dropped. With web search, the JSON between the first pair of
        ``$~~~$`` markers is rendered as a Markdown source list appended
        after the answer text that follows the closing marker.
        """
        if not websearch:
            if '$~~~$' in cleaned_response:
                return cleaned_response.split('$~~~$')[0].strip()
            return cleaned_response

        match = re.search(r'\$~~~\$(.*?)\$~~~\$', cleaned_response, re.DOTALL)
        if not match:
            return cleaned_response

        source_part = match.group(1).strip()
        answer_part = cleaned_response[match.end():].strip()
        try:
            sources = json.loads(source_part)
            source_lines = [
                f"{item.get('position', '')}. "
                f"[{item.get('title', 'No Title')}]({item.get('link', '#')})\n"
                for item in sources
            ]
        except (json.JSONDecodeError, AttributeError, TypeError):
            # Invalid JSON, or valid JSON that is not a list of objects:
            # degrade to the answer alone instead of failing the request.
            return f"{answer_part}\n\nSource information is unavailable."

        source_formatted = "**Source:**\n" + "".join(source_lines)
        return f"{answer_part}\n\n{source_formatted}"

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        proxy: Optional[str] = None,
        websearch: bool = False,
        **kwargs
    ) -> AsyncGenerator[Union[str, Dict[str, Any]], None]:
        """
        Creates an asynchronous generator for streaming responses from Blackbox AI.

        Args:
            model: Requested model name or alias; resolved via ``get_model``.
            messages: Conversation as dicts with ``role`` and ``content``;
                entries missing either are skipped.
            proxy: Optional proxy URL passed to the HTTP request.
            websearch: Sent as ``webSearchMode`` and selects how the
                ``$~~~$`` section of the reply is processed.
            **kwargs: Accepted for call compatibility; not used here.

        Yields:
            A single item: an ``{"type": "image", ...}`` dict when an image
            model returns a Markdown image link, otherwise the reply text.
            Request failures are yielded as an error string, not raised.
        """
        model = cls.get_model(model)

        chat_id = cls.generate_random_string()

        agent_mode = cls.agentMode.get(model, {})
        trending_agent_mode = cls.trendingAgentMode.get(model, {})

        prefix = cls.model_prefixes.get(model, "")

        # Flatten the conversation into one "Role: content" line per message.
        prompt_lines = []
        for message in messages:
            role = message.get('role', '').capitalize()
            content = message.get('content', '')
            if role and content:
                prompt_lines.append(f"{role}: {content}\n")
        formatted_prompt = "".join(prompt_lines)

        if prefix:
            formatted_prompt = f"{prefix} {formatted_prompt}".strip()

        referer_path = cls.model_referers.get(model, f"/?model={model}")
        referer_url = f"{cls.url}{referer_path}"

        common_headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
            'cache-control': 'no-cache',
            'origin': cls.url,
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'sec-ch-ua': '"Chromium";v="129", "Not=A?Brand";v="8"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Linux"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/129.0.0.0 Safari/537.36'
        }

        headers_api_chat = {
            'Content-Type': 'application/json',
            'Referer': referer_url
        }
        headers_api_chat_combined = {**common_headers, **headers_api_chat}

        payload_api_chat = {
            "messages": [
                {
                    "id": chat_id,
                    "content": formatted_prompt,
                    "role": "user"
                }
            ],
            "id": chat_id,
            "previewToken": None,
            "userId": None,
            "codeModelMode": True,
            "agentMode": agent_mode,
            "trendingAgentMode": trending_agent_mode,
            "isMicMode": False,
            "userSystemPrompt": None,
            "maxTokens": 1024,
            "playgroundTopP": 0.9,
            "playgroundTemperature": 0.5,
            "isChromeExt": False,
            "githubToken": None,
            "clickedAnswer2": False,
            "clickedAnswer3": False,
            "clickedForceWebSearch": False,
            "visitFromDelta": False,
            "mobileClient": False,
            "webSearchMode": websearch,
            "userSelectedModel": cls.userSelectedModel.get(model, model)
        }

        async with ClientSession(headers=common_headers) as session:
            try:
                async with session.post(
                    cls.api_endpoint,
                    headers=headers_api_chat_combined,
                    json=payload_api_chat,
                    proxy=proxy
                ) as response_api_chat:
                    if response_api_chat.status >= 400:
                        # Read the error body here, while the response is
                        # still open: ClientResponseError carries no
                        # ``response`` attribute to read it from later.
                        error_text = f"Error {response_api_chat.status}: {response_api_chat.reason}"
                        try:
                            error_response = await response_api_chat.text()
                        except Exception:
                            # Best effort: the body is only extra detail.
                            logger.debug("Could not read error response body", exc_info=True)
                            error_response = ""
                        cleaned_error = cls.clean_response(error_response)
                        if cleaned_error:
                            error_text += f" - {cleaned_error}"
                        yield error_text
                        return

                    text = await response_api_chat.text()
                    cleaned_response = cls.clean_response(text)

                    if model in cls.image_models:
                        match = re.search(r'!\[.*?\]\((https?://[^\)]+)\)', cleaned_response)
                        if match:
                            image_url = match.group(1)
                            yield {"type": "image", "url": image_url, "alt": "Generated Image"}
                        else:
                            yield cleaned_response
                    else:
                        yield cls._format_text_response(cleaned_response, websearch)
            except ClientResponseError as e:
                yield f"Error {e.status}: {e.message}"
            except Exception as e:
                yield f"Unexpected error during /api/chat request: {str(e)}"


# FastAPI app setup
app = FastAPI()


# Add the cleanup task when the app starts
@app.on_event("startup")
async def startup_event():
    """Start the background task that cleans up the rate limit stores."""
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    app.state.rate_limit_cleanup_task = asyncio.create_task(cleanup_rate_limit_stores())
    logger.info("Started rate limit store cleanup task.")


# Middleware to enhance security and enforce Content-Type for specific endpoints
@app.middleware("http")
async def security_middleware(request: Request, call_next):
    """Reject POSTs to /v1/chat/completions whose media type is not JSON.

    Returns a 400 JSON error for a non-JSON Content-Type; every other
    request is passed on to ``call_next`` unchanged.
    """
    client_ip = request.client.host
    # Enforce that POST requests to /v1/chat/completions must have Content-Type: application/json
    if request.method == "POST" and request.url.path == "/v1/chat/completions":
        content_type = request.headers.get("Content-Type", "")
        # Compare only the media type so that parameters such as
        # "; charset=utf-8" and differences in letter case are accepted.
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type != "application/json":
            logger.warning(f"Invalid Content-Type from IP: {client_ip} for path: {request.url.path}")
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "message": "Content-Type must be application/json",
                        "type": "invalid_request_error",
                        "param": None,
                        "code": None
                    }
                },
            )
    response = await call_next(request)
    return response


# Request Models
class Message(BaseModel):
    """A single chat message: the speaker's role and the message text."""

    role: str
    content: str


class ChatRequest(BaseModel):
    """Request body of POST /v1/chat/completions."""

    model: str
    messages: List[Message]
    temperature: Optional[float] = 1.0
    top_p: Optional[float] = 1.0
    n: Optional[int] = 1
    max_tokens: Optional[int] = None
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None


@app.post("/v1/chat/completions", dependencies=[Depends(rate_limiter_per_ip)])
async def chat_completions(request: ChatRequest, req: Request, api_key: str = Depends(get_api_key)):
    """Generate a chat completion through Blackbox and return it as one response.

    Raises:
        HTTPException: 400 when the model is neither in ``Blackbox.models``
            nor in ``Blackbox.model_aliases``; 503 on
            ``ModelNotWorkingException``; 500 on any other error.
    """
    client_ip = req.client.host
    # Redact user messages only for logging purposes
    redacted_messages = [{"role": msg.role, "content": "[redacted]"} for msg in request.messages]

    # NOTE(review): this writes the caller's API key to the log in plaintext;
    # consider masking it.
    logger.info(f"Received chat completions request from API key: {api_key} | IP: {client_ip} | Model: {request.model} | Messages: {redacted_messages}")

    try:
        # Validate that the requested model is available
        if request.model not in Blackbox.models and request.model not in Blackbox.model_aliases:
            logger.warning(f"Attempt to use unavailable model: {request.model} from IP: {client_ip}")
            raise HTTPException(status_code=400, detail="Requested model is not available.")

        # Process the request with actual message content, but don't log it.
        # create_async_generator returns an async generator, so it has to be
        # iterated rather than awaited. temperature/max_tokens are forwarded
        # as extra keyword arguments, which the generator does not use.
        chunks = []
        async for chunk in Blackbox.create_async_generator(
            model=request.model,
            messages=[{"role": msg.role, "content": msg.content} for msg in request.messages],
            temperature=request.temperature,
            max_tokens=request.max_tokens
        ):
            if isinstance(chunk, dict):
                # Image results arrive as a dict; render them as a Markdown
                # image so the message content is always a string.
                chunks.append(f"![{chunk.get('alt', '')}]({chunk.get('url', '')})")
            else:
                chunks.append(chunk)
        response_content = "".join(chunks)

        logger.info(f"Completed response generation for API key: {api_key} | IP: {client_ip}")

        # Token usage is approximated by whitespace-separated word counts.
        prompt_tokens = sum(len(msg.content.split()) for msg in request.messages)
        completion_tokens = len(response_content.split())

        return {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(datetime.now().timestamp()),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": response_content
                    },
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens
            },
        }
    except ModelNotWorkingException as e:
        logger.warning(f"Model not working: {e} | IP: {client_ip}")
        raise HTTPException(status_code=503, detail=str(e)) from e
    except HTTPException as he:
        logger.warning(f"HTTPException: {he.detail} | IP: {client_ip}")
        raise
    except Exception as e:
        logger.exception(f"An unexpected error occurred while processing the chat completions request from IP: {client_ip}.")
        raise HTTPException(status_code=500, detail=str(e)) from e


# Endpoint: GET /v1/models
@app.get("/v1/models", dependencies=[Depends(rate_limiter_per_ip)])
async def get_models(req: Request):
    """Return every entry of ``Blackbox.models`` as a model object."""
    client_ip = req.client.host
    logger.info(f"Fetching available models from IP: {client_ip}")
    return {"data": [{"id": model, "object": "model"} for model in Blackbox.models]}


# Endpoint: GET /v1/health
@app.get("/v1/health", dependencies=[Depends(rate_limiter_per_ip)])
async def health_check(req: Request):
    """Return a static ``{"status": "ok"}`` payload."""
    client_ip = req.client.host
    logger.info(f"Health check requested from IP: {client_ip}")
    return {"status": "ok"}


# Custom exception handler to match OpenAI's error format
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Log the HTTPException and return it as an ``{"error": {...}}`` JSON body."""
    client_ip = request.client.host
    logger.error(f"HTTPException: {exc.detail} | Path: {request.url.path} | IP: {client_ip}")
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "message": exc.detail,
                "type": "invalid_request_error",
                "param": None,
                "code": None
            }
        },
    )