test24 / main.py
Niansuh's picture
Update main.py
9cf0d3b verified
raw
history blame
9.54 kB
import os
import uuid
import logging
import json
import re
import random
import string
from datetime import datetime
from typing import Any, Dict, List, Optional
import httpx
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FastAPI app setup
app = FastAPI()
# CORS middleware setup
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mock implementations for ImageResponse and to_data_uri
class ImageResponse:
def __init__(self, url: str, alt: str):
self.url = url
self.alt = alt
def to_data_uri(image: Any) -> str:
# Placeholder for actual image encoding
return "data:image/png;base64,..." # Replace with actual base64 data
# Define models and providers
class AsyncGeneratorProvider:
pass
class ProviderModelMixin:
pass
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
url = "https://www.blackbox.ai"
api_endpoint = "https://www.blackbox.ai/api/chat"
working = True
supports_stream = True
supports_system_message = True
supports_message_history = True
default_model = 'blackbox'
models = [
'blackbox',
'gemini-1.5-flash',
"llama-3.1-8b",
'llama-3.1-70b',
'llama-3.1-405b',
'ImageGenerationLV45LJp',
'gpt-4o',
'gemini-pro',
'claude-sonnet-3.5',
]
agentMode = {
'ImageGenerationLV45LJp': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
}
trendingAgentMode = {
"blackbox": {},
"gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
"llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
}
userSelectedModel = {
"gpt-4o": "gpt-4o",
"gemini-pro": "gemini-pro",
'claude-sonnet-3.5': "claude-sonnet-3.5",
}
model_aliases = {
"gemini-flash": "gemini-1.5-flash",
"flux": "ImageGenerationLV45LJp",
}
@classmethod
def get_model(cls, model: str) -> str:
if model in cls.models:
return model
elif model in cls.userSelectedModel:
return model
elif model in cls.model_aliases:
return cls.model_aliases[model]
else:
return cls.default_model
@classmethod
async def create_async_generator(
cls,
model: str,
messages: List[Dict[str, str]],
proxy: Optional[str] = None,
image: Optional[Any] = None,
image_name: Optional[str] = None,
**kwargs
) -> Any:
model = cls.get_model(model)
headers = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": cls.url,
"pragma": "no-cache",
"referer": f"{cls.url}/",
"sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Linux"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
if model in cls.userSelectedModel:
prefix = f"@{cls.userSelectedModel[model]}"
if not messages[0]['content'].startswith(prefix):
messages[0]['content'] = f"{prefix} {messages[0]['content']}"
async with httpx.AsyncClient(headers=headers) as session:
if image is not None:
messages[-1]["data"] = {
"fileText": image_name,
"imageBase64": to_data_uri(image)
}
random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
data = {
"messages": messages,
"id": random_id,
"previewToken": None,
"userId": None,
"codeModelMode": True,
"agentMode": {},
"trendingAgentMode": {},
"userSelectedModel": None,
"userSystemPrompt": None,
"isMicMode": False,
"maxTokens": 1024,
"playgroundTopP": 0.9,
"playgroundTemperature": 0.5,
"isChromeExt": False,
"githubToken": None,
"clickedAnswer2": False,
"clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False,
"webSearchMode": False,
}
if model in cls.agentMode:
data["agentMode"] = cls.agentMode[model]
elif model in cls.trendingAgentMode:
data["trendingAgentMode"] = cls.trendingAgentMode[model]
elif model in cls.userSelectedModel:
data["userSelectedModel"] = cls.userSelectedModel[model]
async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
response.raise_for_status()
if model == 'ImageGenerationLV45LJp':
response_text = await response.text()
url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text)
if url_match:
image_url = url_match.group(0)
yield ImageResponse(image_url, alt=messages[-1]['content'])
else:
raise Exception("Image URL not found in the response")
else:
async for chunk in response.aiter_bytes():
if chunk:
decoded_chunk = chunk.decode()
decoded_chunk = re.sub(r'\$@\$v=[^$]+\$@\$', '', decoded_chunk)
if decoded_chunk.strip():
yield decoded_chunk
# Message and chat request models
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
model: str
messages: List[Message]
stream: Optional[bool] = False
# Verify app secret (placeholder)
async def verify_app_secret(app_secret: str):
if app_secret != os.getenv("APP_SECRET"):
raise HTTPException(status_code=403, detail="Forbidden")
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest, app_secret: str = Depends(verify_app_secret)):
logger.info(f"Received chat completion request for model: {request.model}")
# Validate model
if request.model not in Blackbox.models:
raise HTTPException(
status_code=400,
detail=f"Model {request.model} is not allowed. Allowed models are: {', '.join(Blackbox.models)}",
)
# Generate a UUID for the conversation
conversation_id = str(uuid.uuid4()).replace("-", "")
json_data = {
"attachments": [],
"conversationId": conversation_id,
"prompt": "\n".join(
[f"{msg.role}: {msg.content}" for msg in request.messages]
),
}
headers["uniqueid"] = conversation_id
async def generate():
async with httpx.AsyncClient() as client:
try:
async with client.stream('POST', f'{Blackbox.api_endpoint}', headers=headers, json=json_data, timeout=120.0) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line and line != "[DONE]":
content = json.loads(line)["data"]
yield f"data: {json.dumps(content)}\n\n"
yield "data: [DONE]\n\n"
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
raise HTTPException(status_code=e.response.status_code, detail=str(e))
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting: {e}")
raise HTTPException(status_code=500, detail=str(e))
if request.stream:
return StreamingResponse(generate(), media_type="text/event-stream")
else:
full_response = ""
async for chunk in generate():
if chunk.startswith("data: ") and not chunk[6:].startswith("[DONE]"):
data = json.loads(chunk[6:])
full_response += data.get("choices", [{}])[0].get("delta", {}).get("content", "")
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": full_response},
"finish_reason": "stop",
}
],
"usage": None,
}