test24 / main.py
Niansuh's picture
Update main.py
e6f4968 verified
raw
history blame
17.3 kB
# Simple in-memory rate limiter based solely on IP addresses
async def rate_limiter_per_ip(request: Request):
client_ip = request.client.host
current_time = time.time()
# Initialize or update the count and timestamp
if current_time - rate_limit_store[client_ip]["timestamp"] > RATE_LIMIT_WINDOW:
rate_limit_store[client_ip] = {"count": 1, "timestamp": current_time}
else:
if rate_limit_store[client_ip]["count"] >= RATE_LIMIT:
logger.warning(f"Rate limit exceeded for IP address: {client_ip}")
raise HTTPException(status_code=429, detail='Rate limit exceeded for IP address')
rate_limit_store[client_ip]["count"] += 1
CLEANUP_INTERVAL = 60 # seconds
RATE_LIMIT_WINDOW = 60 # seconds
class Blackbox:
label = "Blackbox AI"
url = "https://www.blackbox.ai"
api_endpoint = "https://www.blackbox.ai/api/chat"
working = True
supports_gpt_4 = True
supports_stream = True
supports_system_message = True
supports_message_history = True
default_model = 'blackboxai'
image_models = ['ImageGeneration']
models = [
default_model,
'blackboxai-pro',
*image_models,
"llama-3.1-8b",
'llama-3.1-70b',
'llama-3.1-405b',
'gpt-4o',
'gemini-pro',
'gemini-1.5-flash',
'claude-sonnet-3.5',
'PythonAgent',
'JavaAgent',
'JavaScriptAgent',
'HTMLAgent',
'GoogleCloudAgent',
'AndroidDeveloper',
'SwiftDeveloper',
'Next.jsAgent',
'MongoDBAgent',
'PyTorchAgent',
'ReactAgent',
'XcodeAgent',
'AngularJSAgent',
]
agentMode = {
'ImageGeneration': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
}
trendingAgentMode = {
"blackboxai": {},
"gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
"llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
'blackboxai-pro': {'mode': True, 'id': "BLACKBOXAI-PRO"},
'PythonAgent': {'mode': True, 'id': "Python Agent"},
'JavaAgent': {'mode': True, 'id': "Java Agent"},
'JavaScriptAgent': {'mode': True, 'id': "JavaScript Agent"},
'HTMLAgent': {'mode': True, 'id': "HTML Agent"},
'GoogleCloudAgent': {'mode': True, 'id': "Google Cloud Agent"},
'AndroidDeveloper': {'mode': True, 'id': "Android Developer"},
'SwiftDeveloper': {'mode': True, 'id': "Swift Developer"},
'Next.jsAgent': {'mode': True, 'id': "Next.js Agent"},
'MongoDBAgent': {'mode': True, 'id': "MongoDB Agent"},
'PyTorchAgent': {'mode': True, 'id': "PyTorch Agent"},
'ReactAgent': {'mode': True, 'id': "React Agent"},
'XcodeAgent': {'mode': True, 'id': "Xcode Agent"},
'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"},
}
userSelectedModel = {
"gpt-4o": "gpt-4o",
"gemini-pro": "gemini-pro",
'claude-sonnet-3.5': "claude-sonnet-3.5",
}
model_prefixes = {
'gpt-4o': '@GPT-4o',
'gemini-pro': '@Gemini-PRO',
'claude-sonnet-3.5': '@Claude-Sonnet-3.5',
'PythonAgent': '@Python Agent',
'JavaAgent': '@Java Agent',
'JavaScriptAgent': '@JavaScript Agent',
'HTMLAgent': '@HTML Agent',
'GoogleCloudAgent': '@Google Cloud Agent',
'AndroidDeveloper': '@Android Developer',
'SwiftDeveloper': '@Swift Developer',
'Next.jsAgent': '@Next.js Agent',
'MongoDBAgent': '@MongoDB Agent',
'PyTorchAgent': '@PyTorch Agent',
'ReactAgent': '@React Agent',
'XcodeAgent': '@Xcode Agent',
'AngularJSAgent': '@AngularJS Agent',
'blackboxai-pro': '@BLACKBOXAI-PRO',
'ImageGeneration': '@Image Generation',
}
model_referers = {
"blackboxai": "/?model=blackboxai",
"gpt-4o": "/?model=gpt-4o",
"gemini-pro": "/?model=gemini-pro",
"claude-sonnet-3.5": "/?model=claude-sonnet-3.5"
}
model_aliases = {
"gemini-flash": "gemini-1.5-flash",
"claude-3.5-sonnet": "claude-sonnet-3.5",
"flux": "ImageGeneration",
}
@classmethod
def get_model(cls, model: str) -> str:
if model in cls.models:
return model
elif model in cls.model_aliases:
return cls.model_aliases[model]
else:
return cls.default_model
@staticmethod
def generate_random_string(length: int = 7) -> str:
characters = string.ascii_letters + string.digits
return ''.join(random.choices(characters, k=length))
@staticmethod
def generate_next_action() -> str:
return uuid.uuid4().hex
@staticmethod
def generate_next_router_state_tree() -> str:
router_state = [
"",
{
"children": [
"(chat)",
{
"children": [
"__PAGE__",
{}
]
}
]
},
None,
None,
True
]
return json.dumps(router_state)
@staticmethod
def clean_response(text: str) -> str:
pattern = r'^\$\@\$v=undefined-rv1\$\@\$'
cleaned_text = re.sub(pattern, '', text)
return cleaned_text
@classmethod
async def create_async_generator(
cls,
model: str,
messages: List[Dict[str, str]],
proxy: Optional[str] = None,
websearch: bool = False,
**kwargs
) -> AsyncGenerator[Union[str, Dict[str, Any]], None]:
"""
Creates an asynchronous generator for streaming responses from Blackbox AI.
"""
model = cls.get_model(model)
chat_id = cls.generate_random_string()
next_action = cls.generate_next_action()
next_router_state_tree = cls.generate_next_router_state_tree()
agent_mode = cls.agentMode.get(model, {})
trending_agent_mode = cls.trendingAgentMode.get(model, {})
prefix = cls.model_prefixes.get(model, "")
formatted_prompt = ""
for message in messages:
role = message.get('role', '').capitalize()
content = message.get('content', '')
if role and content:
formatted_prompt += f"{role}: {content}\n"
if prefix:
formatted_prompt = f"{prefix} {formatted_prompt}".strip()
referer_path = cls.model_referers.get(model, f"/?model={model}")
referer_url = f"{cls.url}{referer_path}"
common_headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'origin': cls.url,
'pragma': 'no-cache',
'priority': 'u=1, i',
'sec-ch-ua': '"Chromium";v="129", "Not=A?Brand";v="8"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/129.0.0.0 Safari/537.36'
}
headers_api_chat = {
'Content-Type': 'application/json',
'Referer': referer_url
}
headers_api_chat_combined = {**common_headers, **headers_api_chat}
payload_api_chat = {
"messages": [
{
"id": chat_id,
"content": formatted_prompt,
"role": "user"
}
],
"id": chat_id,
"previewToken": None,
"userId": None,
"codeModelMode": True,
"agentMode": agent_mode,
"trendingAgentMode": trending_agent_mode,
"isMicMode": False,
"userSystemPrompt": None,
"maxTokens": 1024,
"playgroundTopP": 0.9,
"playgroundTemperature": 0.5,
"isChromeExt": False,
"githubToken": None,
"clickedAnswer2": False,
"clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False,
"webSearchMode": websearch,
"userSelectedModel": cls.userSelectedModel.get(model, model)
}
async with ClientSession(headers=common_headers) as session:
try:
async with session.post(
cls.api_endpoint,
headers=headers_api_chat_combined,
json=payload_api_chat,
proxy=proxy
) as response_api_chat:
response_api_chat.raise_for_status()
text = await response_api_chat.text()
cleaned_response = cls.clean_response(text)
if model in cls.image_models:
match = re.search(r'!\[.*?\]\((https?://[^\)]+)\)', cleaned_response)
if match:
image_url = match.group(1)
yield {"type": "image", "url": image_url, "alt": "Generated Image"}
else:
yield cleaned_response
else:
if websearch:
match = re.search(r'\$~~~\$(.*?)\$~~~\$', cleaned_response, re.DOTALL)
if match:
source_part = match.group(1).strip()
answer_part = cleaned_response[match.end():].strip()
try:
sources = json.loads(source_part)
source_formatted = "**Source:**\n"
for item in sources:
title = item.get('title', 'No Title')
link = item.get('link', '#')
position = item.get('position', '')
source_formatted += f"{position}. [{title}]({link})\n"
final_response = f"{answer_part}\n\n{source_formatted}"
except json.JSONDecodeError:
final_response = f"{answer_part}\n\nSource information is unavailable."
else:
final_response = cleaned_response
else:
if '$~~~$' in cleaned_response:
final_response = cleaned_response.split('$~~~$')[0].strip()
else:
final_response = cleaned_response
yield final_response
except ClientResponseError as e:
error_text = f"Error {e.status}: {e.message}"
try:
error_response = await e.response.text()
cleaned_error = cls.clean_response(error_response)
error_text += f" - {cleaned_error}"
except Exception:
pass
yield error_text
except Exception as e:
yield f"Unexpected error during /api/chat request: {str(e)}"
# FastAPI app setup
app = FastAPI()
# Add the cleanup task when the app starts
@app.on_event("startup")
async def startup_event():
asyncio.create_task(cleanup_rate_limit_stores())
logger.info("Started rate limit store cleanup task.")
# Middleware to enhance security and enforce Content-Type for specific endpoints
@app.middleware("http")
async def security_middleware(request: Request, call_next):
client_ip = request.client.host
# Enforce that POST requests to /v1/chat/completions must have Content-Type: application/json
if request.method == "POST" and request.url.path == "/v1/chat/completions":
content_type = request.headers.get("Content-Type")
if content_type != "application/json":
logger.warning(f"Invalid Content-Type from IP: {client_ip} for path: {request.url.path}")
return JSONResponse(
status_code=400,
content={
"error": {
"message": "Content-Type must be application/json",
"type": "invalid_request_error",
"param": None,
"code": None
}
},
)
response = await call_next(request)
return response
# Request Models
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
model: str
messages: List[Message]
temperature: Optional[float] = 1.0
top_p: Optional[float] = 1.0
n: Optional[int] = 1
max_tokens: Optional[int] = None
presence_penalty: Optional[float] = 0.0
frequency_penalty: Optional[float] = 0.0
logit_bias: Optional[Dict[str, float]] = None
user: Optional[str] = None
@app.post("/v1/chat/completions", dependencies=[Depends(rate_limiter_per_ip: Request)])
async def chat_completions(request: ChatRequest, req: Request, api_key: str = Depends(get_api_key)):
client_ip = req.client.host
# Redact user messages only for logging purposes
redacted_messages = [{"role": msg.role, "content": "[redacted]"} for msg in request.messages]
logger.info(f"Received chat completions request from API key: {api_key} | IP: {client_ip} | Model: {request.model} | Messages: {redacted_messages}")
try:
# Validate that the requested model is available
if request.model not in Blackbox.models and request.model not in Blackbox.model_aliases:
logger.warning(f"Attempt to use unavailable model: {request.model} from IP: {client_ip}")
raise HTTPException(status_code=400, detail="Requested model is not available.")
# Process the request with actual message content, but don't log it
response_content = await Blackbox.create_async_generator(
model=request.model,
messages=[{"role": msg.role, "content": msg.content} for msg in request.messages],
temperature=request.temperature,
max_tokens=request.max_tokens
)
logger.info(f"Completed response generation for API key: {api_key} | IP: {client_ip}")
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": response_content
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": sum(len(msg.content.split()) for msg in request.messages),
"completion_tokens": len(response_content.split()),
"total_tokens": sum(len(msg.content.split()) for msg in request.messages) + len(response_content.split())
},
}
except ModelNotWorkingException as e:
logger.warning(f"Model not working: {e} | IP: {client_ip}")
raise HTTPException(status_code=503, detail=str(e))
except HTTPException as he:
logger.warning(f"HTTPException: {he.detail} | IP: {client_ip}")
raise he
except Exception as e:
logger.exception(f"An unexpected error occurred while processing the chat completions request from IP: {client_ip}.")
raise HTTPException(status_code=500, detail=str(e))
# Endpoint: GET /v1/models
@app.get("/v1/models", dependencies=[Depends(rate_limiter_per_ip)])
async def get_models(req: Request):
client_ip = req.client.host
logger.info(f"Fetching available models from IP: {client_ip}")
return {"data": [{"id": model, "object": "model"} for model in Blackbox.models]}
# Endpoint: GET /v1/health
@app.get("/v1/health", dependencies=[Depends(rate_limiter_per_ip)])
async def health_check(req: Request):
client_ip = req.client.host
logger.info(f"Health check requested from IP: {client_ip}")
return {"status": "ok"}
# Custom exception handler to match OpenAI's error format
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
client_ip = request.client.host
logger.error(f"HTTPException: {exc.detail} | Path: {request.url.path} | IP: {client_ip}")
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"message": exc.detail,
"type": "invalid_request_error",
"param": None,
"code": None
}
},
)