test24 / main.py
Niansuh's picture
Update main.py
45670a8 verified
raw
history blame
9.39 kB
from __future__ import annotations
import re
import random
import string
import uuid # Already added
from datetime import datetime # Add this line
from aiohttp import ClientSession
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
# Mock implementations for ImageResponse and to_data_uri
class ImageResponse:
def __init__(self, url: str, alt: str):
self.url = url
self.alt = alt
def to_data_uri(image: Any) -> str:
# Placeholder for actual image encoding
return "data:image/png;base64,..." # Replace with actual base64 data
class AsyncGeneratorProvider:
pass
class ProviderModelMixin:
pass
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
url = "https://www.blackbox.ai"
api_endpoint = "https://www.blackbox.ai/api/chat"
working = True
supports_stream = True
supports_system_message = True
supports_message_history = True
default_model = 'blackbox'
models = [
'blackbox',
'gemini-1.5-flash',
"llama-3.1-8b",
'llama-3.1-70b',
'llama-3.1-405b',
'ImageGenerationLV45LJp',
'gpt-4o',
'gemini-pro',
'claude-sonnet-3.5',
]
agentMode = {
'ImageGenerationLV45LJp': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
}
trendingAgentMode = {
"blackbox": {},
"gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
"llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
}
userSelectedModel = {
"gpt-4o": "gpt-4o",
"gemini-pro": "gemini-pro",
'claude-sonnet-3.5': "claude-sonnet-3.5",
}
model_aliases = {
"gemini-flash": "gemini-1.5-flash",
"flux": "ImageGenerationLV45LJp",
}
@classmethod
def get_model(cls, model: str) -> str:
if model in cls.models:
return model
elif model in cls.userSelectedModel:
return model
elif model in cls.model_aliases:
return cls.model_aliases[model]
else:
return cls.default_model
@classmethod
async def create_async_generator(
cls,
model: str,
messages: List[Dict[str, str]],
proxy: Optional[str] = None,
image: Optional[Any] = None,
image_name: Optional[str] = None,
**kwargs
) -> Any:
model = cls.get_model(model)
headers = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": cls.url,
"pragma": "no-cache",
"referer": f"{cls.url}/",
"sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Linux"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
if model in cls.userSelectedModel:
prefix = f"@{cls.userSelectedModel[model]}"
if not messages[0]['content'].startswith(prefix):
messages[0]['content'] = f"{prefix} {messages[0]['content']}"
async with ClientSession(headers=headers) as session:
if image is not None:
messages[-1]["data"] = {
"fileText": image_name,
"imageBase64": to_data_uri(image)
}
random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
data = {
"messages": messages,
"id": random_id,
"previewToken": None,
"userId": None,
"codeModelMode": True,
"agentMode": {},
"trendingAgentMode": {},
"userSelectedModel": None,
"userSystemPrompt": None,
"isMicMode": False,
"maxTokens": 1024,
"playgroundTopP": 0.9,
"playgroundTemperature": 0.5,
"isChromeExt": False,
"githubToken": None,
"clickedAnswer2": False,
"clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False,
"webSearchMode": False,
}
if model in cls.agentMode:
data["agentMode"] = cls.agentMode[model]
elif model in cls.trendingAgentMode:
data["trendingAgentMode"] = cls.trendingAgentMode[model]
elif model in cls.userSelectedModel:
data["userSelectedModel"] = cls.userSelectedModel[model]
async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
response.raise_for_status()
if model == 'ImageGenerationLV45LJp':
response_text = await response.text()
url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text)
if url_match:
image_url = url_match.group(0)
yield ImageResponse(image_url, alt=messages[-1]['content'])
else:
raise Exception("Image URL not found in the response")
else:
async for chunk in response.content.iter_any():
if chunk:
decoded_chunk = chunk.decode()
decoded_chunk = re.sub(r'\$@\$v=[^$]+\$@\$', '', decoded_chunk)
if decoded_chunk.strip():
yield decoded_chunk
# FastAPI app setup
app = FastAPI()
class Message(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
model: str
messages: List[Message]
from fastapi.responses import Response
@app.post("/v1/chat/completions")
async def chat_completions(
request: ChatRequest, app_secret: str = Depends(verify_app_secret)
):
logger.info(f"Received chat completion request for model: {request.model}")
if request.model not in [model['id'] for model in ALLOWED_MODELS]:
raise HTTPException(
status_code=400,
detail=f"Model {request.model} is not allowed. Allowed models are: {', '.join(model['id'] for model in ALLOWED_MODELS)}",
)
# Generate a UUID for the conversation
conversation_id = str(uuid.uuid4()).replace("-", "")
json_data = {
"attachments": [],
"conversationId": conversation_id,
"prompt": "\n".join(
[
f"{'User' if msg.role == 'user' else 'Assistant'}: {msg.content}"
for msg in request.messages
]
),
}
headers["uniqueid"] = conversation_id
async def generate():
async with httpx.AsyncClient() as client:
try:
async with client.stream('POST', f'{BASE_URL}/api/chat/gpt4o/chat', headers=headers, json=json_data, timeout=120.0) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line and line != "[DONE]":
content = json.loads(line)["data"]
yield f"data: {json.dumps(create_chat_completion_data(content['message'], request.model))}\n\n"
yield f"data: {json.dumps(create_chat_completion_data('', request.model, 'stop'))}\n\n"
yield "data: [DONE]\n\n"
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
raise HTTPException(status_code=e.response.status_code, detail=str(e))
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting: {e}")
raise HTTPException(status_code=500, detail=str(e))
if request.stream:
logger.info("Streaming response")
return StreamingResponse(generate(), media_type="text/event-stream")
else:
logger.info("Non-streaming response")
full_response = ""
async for chunk in generate():
if chunk.startswith("data: ") and not chunk[6:].startswith("[DONE]"):
data = json.loads(chunk[6:])
if data["choices"][0]["delta"].get("content"):
full_response += data["choices"][0]["delta"]["content"]
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": full_response},
"finish_reason": "stop",
}
],
"usage": None,
}