|
import os |
|
import uuid |
|
import logging |
|
from datetime import datetime |
|
from typing import Any, Dict, List, Optional |
|
|
|
import httpx |
|
from fastapi import FastAPI, HTTPException, Depends |
|
from pydantic import BaseModel |
|
from starlette.middleware.cors import CORSMiddleware |
|
from starlette.responses import StreamingResponse, Response |
|
|
|
|
|
class ImageResponse: |
|
def __init__(self, url: str, alt: str): |
|
self.url = url |
|
self.alt = alt |
|
|
|
def to_data_uri(image: Any) -> str: |
|
|
|
return "data:image/png;base64,..." |
|
|
|
class AsyncGeneratorProvider: |
|
pass |
|
|
|
class ProviderModelMixin: |
|
pass |
|
|
|
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin): |
|
url = "https://www.blackbox.ai" |
|
api_endpoint = "https://www.blackbox.ai/api/chat" |
|
working = True |
|
supports_stream = True |
|
supports_system_message = True |
|
supports_message_history = True |
|
|
|
default_model = 'blackbox' |
|
models = [ |
|
'blackbox', |
|
'gemini-1.5-flash', |
|
"llama-3.1-8b", |
|
'llama-3.1-70b', |
|
'llama-3.1-405b', |
|
'ImageGenerationLV45LJp', |
|
'gpt-4o', |
|
'gemini-pro', |
|
'claude-sonnet-3.5', |
|
] |
|
|
|
agentMode = { |
|
'ImageGenerationLV45LJp': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"}, |
|
} |
|
|
|
trendingAgentMode = { |
|
"blackbox": {}, |
|
"gemini-1.5-flash": {'mode': True, 'id': 'Gemini'}, |
|
"llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"}, |
|
'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"}, |
|
'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"}, |
|
} |
|
|
|
userSelectedModel = { |
|
"gpt-4o": "gpt-4o", |
|
"gemini-pro": "gemini-pro", |
|
'claude-sonnet-3.5': "claude-sonnet-3.5", |
|
} |
|
|
|
model_aliases = { |
|
"gemini-flash": "gemini-1.5-flash", |
|
"flux": "ImageGenerationLV45LJp", |
|
} |
|
|
|
@classmethod |
|
def get_model(cls, model: str) -> str: |
|
if model in cls.models: |
|
return model |
|
elif model in cls.userSelectedModel: |
|
return model |
|
elif model in cls.model_aliases: |
|
return cls.model_aliases[model] |
|
else: |
|
return cls.default_model |
|
|
|
@classmethod |
|
async def create_async_generator( |
|
cls, |
|
model: str, |
|
messages: List[Dict[str, str]], |
|
proxy: Optional[str] = None, |
|
image: Optional[Any] = None, |
|
image_name: Optional[str] = None, |
|
**kwargs |
|
) -> Any: |
|
model = cls.get_model(model) |
|
|
|
headers = { |
|
"accept": "*/*", |
|
"accept-language": "en-US,en;q=0.9", |
|
"cache-control": "no-cache", |
|
"content-type": "application/json", |
|
"origin": cls.url, |
|
"pragma": "no-cache", |
|
"referer": f"{cls.url}/", |
|
"sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"', |
|
"sec-ch-ua-mobile": "?0", |
|
"sec-ch-ua-platform": '"Linux"', |
|
"sec-fetch-dest": "empty", |
|
"sec-fetch-mode": "cors", |
|
"sec-fetch-site": "same-origin", |
|
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36" |
|
} |
|
|
|
if model in cls.userSelectedModel: |
|
prefix = f"@{cls.userSelectedModel[model]}" |
|
if not messages[0]['content'].startswith(prefix): |
|
messages[0]['content'] = f"{prefix} {messages[0]['content']}" |
|
|
|
async with ClientSession(headers=headers) as session: |
|
if image is not None: |
|
messages[-1]["data"] = { |
|
"fileText": image_name, |
|
"imageBase64": to_data_uri(image) |
|
} |
|
|
|
random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7)) |
|
|
|
data = { |
|
"messages": messages, |
|
"id": random_id, |
|
"previewToken": None, |
|
"userId": None, |
|
"codeModelMode": True, |
|
"agentMode": {}, |
|
"trendingAgentMode": {}, |
|
"userSelectedModel": None, |
|
"userSystemPrompt": None, |
|
"isMicMode": False, |
|
"maxTokens": 1024, |
|
"playgroundTopP": 0.9, |
|
"playgroundTemperature": 0.5, |
|
"isChromeExt": False, |
|
"githubToken": None, |
|
"clickedAnswer2": False, |
|
"clickedAnswer3": False, |
|
"clickedForceWebSearch": False, |
|
"visitFromDelta": False, |
|
"mobileClient": False, |
|
"webSearchMode": False, |
|
} |
|
|
|
if model in cls.agentMode: |
|
data["agentMode"] = cls.agentMode[model] |
|
elif model in cls.trendingAgentMode: |
|
data["trendingAgentMode"] = cls.trendingAgentMode[model] |
|
elif model in cls.userSelectedModel: |
|
data["userSelectedModel"] = cls.userSelectedModel[model] |
|
|
|
async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response: |
|
response.raise_for_status() |
|
if model == 'ImageGenerationLV45LJp': |
|
response_text = await response.text() |
|
url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text) |
|
if url_match: |
|
image_url = url_match.group(0) |
|
yield ImageResponse(image_url, alt=messages[-1]['content']) |
|
else: |
|
raise Exception("Image URL not found in the response") |
|
else: |
|
async for chunk in response.content.iter_any(): |
|
if chunk: |
|
decoded_chunk = chunk.decode() |
|
decoded_chunk = re.sub(r'\$@\$v=[^$]+\$@\$', '', decoded_chunk) |
|
if decoded_chunk.strip(): |
|
yield decoded_chunk |
|
|
|
|
|
app = FastAPI() |
|
|
|
class Message(BaseModel): |
|
role: str |
|
content: str |
|
|
|
class ChatRequest(BaseModel): |
|
model: str |
|
messages: List[Message] |
|
|
|
from fastapi.responses import Response |
|
|
|
@app.post("/v1/chat/completions") |
|
async def chat_completions( |
|
request: ChatRequest, app_secret: str = Depends(verify_app_secret) |
|
): |
|
logger.info(f"Received chat completion request for model: {request.model}") |
|
|
|
if request.model not in [model['id'] for model in ALLOWED_MODELS]: |
|
raise HTTPException( |
|
status_code=400, |
|
detail=f"Model {request.model} is not allowed. Allowed models are: {', '.join(model['id'] for model in ALLOWED_MODELS)}", |
|
) |
|
|
|
|
|
conversation_id = str(uuid.uuid4()).replace("-", "") |
|
|
|
json_data = { |
|
"attachments": [], |
|
"conversationId": conversation_id, |
|
"prompt": "\n".join( |
|
[ |
|
f"{'User' if msg.role == 'user' else 'Assistant'}: {msg.content}" |
|
for msg in request.messages |
|
] |
|
), |
|
} |
|
|
|
headers["uniqueid"] = conversation_id |
|
|
|
async def generate(): |
|
async with httpx.AsyncClient() as client: |
|
try: |
|
async with client.stream('POST', f'{BASE_URL}/api/chat/gpt4o/chat', headers=headers, json=json_data, timeout=120.0) as response: |
|
response.raise_for_status() |
|
async for line in response.aiter_lines(): |
|
if line and line != "[DONE]": |
|
content = json.loads(line)["data"] |
|
yield f"data: {json.dumps(create_chat_completion_data(content['message'], request.model))}\n\n" |
|
yield f"data: {json.dumps(create_chat_completion_data('', request.model, 'stop'))}\n\n" |
|
yield "data: [DONE]\n\n" |
|
except httpx.HTTPStatusError as e: |
|
logger.error(f"HTTP error occurred: {e}") |
|
raise HTTPException(status_code=e.response.status_code, detail=str(e)) |
|
except httpx.RequestError as e: |
|
logger.error(f"An error occurred while requesting: {e}") |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
if request.stream: |
|
logger.info("Streaming response") |
|
return StreamingResponse(generate(), media_type="text/event-stream") |
|
else: |
|
logger.info("Non-streaming response") |
|
full_response = "" |
|
async for chunk in generate(): |
|
if chunk.startswith("data: ") and not chunk[6:].startswith("[DONE]"): |
|
data = json.loads(chunk[6:]) |
|
if data["choices"][0]["delta"].get("content"): |
|
full_response += data["choices"][0]["delta"]["content"] |
|
|
|
return { |
|
"id": f"chatcmpl-{uuid.uuid4()}", |
|
"object": "chat.completion", |
|
"created": int(datetime.now().timestamp()), |
|
"model": request.model, |
|
"choices": [ |
|
{ |
|
"index": 0, |
|
"message": {"role": "assistant", "content": full_response}, |
|
"finish_reason": "stop", |
|
} |
|
], |
|
"usage": None, |
|
} |
|
|