test24 / main.py
Niansuh's picture
Update main.py
7070320 verified
raw
history blame
9.41 kB
import base64
import codecs
import json
import logging
import os
import random
import re
import string
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, Response
# Mock implementations for ImageResponse and to_data_uri
class ImageResponse:
    """Value holder pairing an image URL with its alternative text."""

    def __init__(self, url: str, alt: str):
        """Store *url* and *alt* unchanged on the instance."""
        self.url, self.alt = url, alt
def _sniff_image_mime(raw: bytes) -> str:
    """Guess an image MIME type from the leading magic bytes of *raw*."""
    if raw.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if raw.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if raw.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    if raw[:4] == b"RIFF" and raw[8:12] == b"WEBP":
        return "image/webp"
    # Unknown signature: keep the MIME type the original placeholder used.
    return "image/png"


def to_data_uri(image: Any) -> str:
    """Encode *image* as a ``data:`` URI.

    Args:
        image: The image to encode. Supported inputs:
            * ``str`` that already starts with ``"data:"`` - returned as is;
            * bytes-like (``bytes``, ``bytearray``, ``memoryview``) - base64
              encoded, with the MIME type sniffed from the magic bytes.

    Returns:
        A data URI string. For any unsupported input type the historical
        placeholder ``"data:image/png;base64,..."`` is returned so that
        existing callers keep their previous behaviour.
    """
    if isinstance(image, str) and image.startswith("data:"):
        return image
    if isinstance(image, (bytes, bytearray, memoryview)):
        raw = bytes(image)
        encoded = base64.b64encode(raw).decode("ascii")
        return f"data:{_sniff_image_mime(raw)};base64,{encoded}"
    # Fallback kept from the original stub; not a decodable image payload.
    return "data:image/png;base64,..."
class AsyncGeneratorProvider:
    """Empty placeholder base class; it defines no behaviour of its own."""


class ProviderModelMixin:
    """Empty placeholder mixin; it defines no behaviour of its own."""


class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
    """Provider that posts chat requests to ``api_endpoint`` and streams the reply.

    The class-level tables decide how a model name is turned into request
    fields: ``agentMode`` and ``trendingAgentMode`` entries are copied into the
    payload under the same key, ``userSelectedModel`` entries are sent as
    ``userSelectedModel`` and additionally prefixed to the first message as
    ``@<model>``.
    """

    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"
    working = True
    supports_stream = True
    supports_system_message = True
    supports_message_history = True

    default_model = 'blackbox'
    models = [
        'blackbox',
        'gemini-1.5-flash',
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'ImageGenerationLV45LJp',
        'gpt-4o',
        'gemini-pro',
        'claude-sonnet-3.5',
    ]

    agentMode = {
        'ImageGenerationLV45LJp': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
    }

    trendingAgentMode = {
        "blackbox": {},
        "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
        "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
        'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
        'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
    }

    userSelectedModel = {
        "gpt-4o": "gpt-4o",
        "gemini-pro": "gemini-pro",
        'claude-sonnet-3.5': "claude-sonnet-3.5",
    }

    model_aliases = {
        "gemini-flash": "gemini-1.5-flash",
        "flux": "ImageGenerationLV45LJp",
    }

    @classmethod
    def get_model(cls, model: str) -> str:
        """Resolve a requested model name.

        Names listed in ``models`` or ``userSelectedModel`` are returned
        unchanged, names in ``model_aliases`` are translated, and anything
        else falls back to ``default_model``.
        """
        if model in cls.models or model in cls.userSelectedModel:
            return model
        return cls.model_aliases.get(model, cls.default_model)

    @classmethod
    def _prepare_messages(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        image: Optional[Any],
        image_name: Optional[str],
    ) -> List[Dict[str, Any]]:
        """Return a per-request copy of *messages* with prefix/image applied."""
        # Copy each message so the caller's list and dicts are never modified.
        prepared: List[Dict[str, Any]] = [dict(message) for message in messages]
        if not prepared:
            # Nothing to prefix or attach to; avoids IndexError on [0] / [-1].
            return prepared

        if model in cls.userSelectedModel:
            prefix = f"@{cls.userSelectedModel[model]}"
            if not prepared[0]['content'].startswith(prefix):
                prepared[0]['content'] = f"{prefix} {prepared[0]['content']}"

        if image is not None:
            prepared[-1]["data"] = {
                "fileText": image_name,
                "imageBase64": to_data_uri(image)
            }
        return prepared

    @classmethod
    def _build_payload(cls, model: str, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the JSON body for one request to ``api_endpoint``."""
        random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
        data: Dict[str, Any] = {
            "messages": messages,
            "id": random_id,
            "previewToken": None,
            "userId": None,
            "codeModelMode": True,
            "agentMode": {},
            "trendingAgentMode": {},
            "userSelectedModel": None,
            "userSystemPrompt": None,
            "isMicMode": False,
            "maxTokens": 1024,
            "playgroundTopP": 0.9,
            "playgroundTemperature": 0.5,
            "isChromeExt": False,
            "githubToken": None,
            "clickedAnswer2": False,
            "clickedAnswer3": False,
            "clickedForceWebSearch": False,
            "visitFromDelta": False,
            "mobileClient": False,
            "webSearchMode": False,
        }
        # Exactly one of the three tables contributes, checked in this order.
        if model in cls.agentMode:
            data["agentMode"] = cls.agentMode[model]
        elif model in cls.trendingAgentMode:
            data["trendingAgentMode"] = cls.trendingAgentMode[model]
        elif model in cls.userSelectedModel:
            data["userSelectedModel"] = cls.userSelectedModel[model]
        return data

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        proxy: Optional[str] = None,
        image: Optional[Any] = None,
        image_name: Optional[str] = None,
        **kwargs
    ) -> Any:
        """Send *messages* to the endpoint and yield the reply.

        Args:
            model: Requested model name; resolved through ``get_model``.
            messages: Chat messages as dicts with at least a ``content`` key.
                The list and its dicts are not modified.
            proxy: Optional proxy URL forwarded to the HTTP session's ``post``.
            image: Optional image attached to the last message.
            image_name: File name sent along with *image*.
            **kwargs: Accepted for interface compatibility and ignored.

        Yields:
            For the image-generation model a single ``ImageResponse``;
            otherwise non-blank text chunks with ``$@$v=...$@$`` markers
            removed.

        Raises:
            Exception: If the image model's response contains no
                ``storage.googleapis.com`` URL.
        """
        model = cls.get_model(model)
        prepared = cls._prepare_messages(model, messages, image, image_name)

        headers = {
            "accept": "*/*",
            "accept-language": "en-US,en;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "origin": cls.url,
            "pragma": "no-cache",
            "referer": f"{cls.url}/",
            "sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
        }

        # NOTE(review): `ClientSession` is not imported in the visible part of
        # this file. The call pattern (`session.post(..., proxy=...)`,
        # `response.content.iter_any()`) looks like aiohttp's - confirm and
        # import it, otherwise this line raises NameError.
        async with ClientSession(headers=headers) as session:
            data = cls._build_payload(model, prepared)
            async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
                response.raise_for_status()

                if model == 'ImageGenerationLV45LJp':
                    response_text = await response.text()
                    url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text)
                    if url_match is None:
                        raise Exception("Image URL not found in the response")
                    alt_text = prepared[-1]['content'] if prepared else ""
                    yield ImageResponse(url_match.group(0), alt=alt_text)
                else:
                    # Incremental decoding: a multi-byte UTF-8 character may be
                    # split across two network chunks, which a per-chunk
                    # `bytes.decode()` would reject.
                    decoder = codecs.getincrementaldecoder("utf-8")()
                    async for chunk in response.content.iter_any():
                        if not chunk:
                            continue
                        decoded_chunk = decoder.decode(chunk)
                        decoded_chunk = re.sub(r'\$@\$v=[^$]+\$@\$', '', decoded_chunk)
                        if decoded_chunk.strip():
                            yield decoded_chunk
                    # Surfaces a truncated trailing byte sequence as an error,
                    # as strict per-chunk decoding would have done.
                    decoder.decode(b"", final=True)
# FastAPI app setup: the application instance that the route decorators
# in this module (e.g. `@app.post`) register their handlers on.
app = FastAPI()
class Message(BaseModel):
    """One chat message of a request body."""

    # Speaker of the message; the completions handler treats "user"
    # specially and labels every other role as "Assistant".
    role: str
    # Text of the message.
    content: str
class ChatRequest(BaseModel):
    """Request body accepted by the ``/v1/chat/completions`` endpoint."""

    # Identifier of the model the caller wants to use.
    model: str
    # Conversation so far, in order.
    messages: List[Message]
    # The completions handler reads `request.stream`; without this field that
    # attribute access raises AttributeError. Defaults to a non-streamed
    # reply so request bodies that omit it stay valid.
    stream: Optional[bool] = False
from fastapi.responses import Response
@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatRequest, app_secret: str = Depends(verify_app_secret)
):
    """Proxy a chat request to ``{BASE_URL}/api/chat/gpt4o/chat``.

    The conversation is flattened into a single ``User:`` / ``Assistant:``
    prompt and posted upstream. Each non-empty upstream line other than
    ``[DONE]`` is parsed as JSON and its ``data.message`` is re-emitted as a
    ``data: ...`` server-sent event built by ``create_chat_completion_data``.

    Args:
        request: Parsed request body.
        app_secret: Result of the ``verify_app_secret`` dependency; not used
            in the body.

    Returns:
        A ``StreamingResponse`` of server-sent events when the request asks
        for streaming, otherwise a ``chat.completion`` dict holding the
        concatenated text.

    Raises:
        HTTPException: 400 if the model is not in ``ALLOWED_MODELS``; the
            upstream status code on an upstream HTTP error; 500 on a
            transport error.
    """
    logger.info("Received chat completion request for model: %s", request.model)

    allowed_ids = [model['id'] for model in ALLOWED_MODELS]
    if request.model not in allowed_ids:
        raise HTTPException(
            status_code=400,
            detail=f"Model {request.model} is not allowed. Allowed models are: {', '.join(allowed_ids)}",
        )

    # Generate a UUID for the conversation (32 hex digits, no dashes).
    conversation_id = uuid.uuid4().hex

    json_data = {
        "attachments": [],
        "conversationId": conversation_id,
        "prompt": "\n".join(
            f"{'User' if msg.role == 'user' else 'Assistant'}: {msg.content}"
            for msg in request.messages
        ),
    }

    # Per-request copy: writing "uniqueid" into the shared `headers` mapping
    # would let concurrent requests overwrite each other's conversation id.
    # NOTE(review): assumes the module-level `headers` is a mapping - confirm.
    request_headers = {**headers, "uniqueid": conversation_id}

    async def generate():
        """Yield the upstream reply as server-sent-event strings."""
        async with httpx.AsyncClient() as client:
            try:
                async with client.stream(
                    'POST',
                    f'{BASE_URL}/api/chat/gpt4o/chat',
                    headers=request_headers,
                    json=json_data,
                    timeout=120.0,
                ) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        if line and line != "[DONE]":
                            content = json.loads(line)["data"]
                            yield f"data: {json.dumps(create_chat_completion_data(content['message'], request.model))}\n\n"
                    yield f"data: {json.dumps(create_chat_completion_data('', request.model, 'stop'))}\n\n"
                    yield "data: [DONE]\n\n"
            # NOTE(review): in streaming mode these are raised while the
            # response body is being sent - verify the client sees a useful
            # error in that case.
            except httpx.HTTPStatusError as e:
                logger.error(f"HTTP error occurred: {e}")
                raise HTTPException(status_code=e.response.status_code, detail=str(e)) from e
            except httpx.RequestError as e:
                logger.error(f"An error occurred while requesting: {e}")
                raise HTTPException(status_code=500, detail=str(e)) from e

    # `getattr` keeps the handler working when the request model declares no
    # `stream` field; a missing field means a non-streamed reply.
    if getattr(request, "stream", False):
        logger.info("Streaming response")
        return StreamingResponse(generate(), media_type="text/event-stream")

    logger.info("Non-streaming response")
    pieces = []
    async for chunk in generate():
        if chunk.startswith("data: ") and not chunk[6:].startswith("[DONE]"):
            data = json.loads(chunk[6:])
            delta_text = data["choices"][0]["delta"].get("content")
            if delta_text:
                pieces.append(delta_text)

    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(datetime.now().timestamp()),
        "model": request.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": "".join(pieces)},
                "finish_reason": "stop",
            }
        ],
        "usage": None,
    }