"""FastAPI app with a ``/v1/chat/completions`` route and a Blackbox provider.

The module defines:

* ``Blackbox`` - an async-generator provider that posts chat messages to
  ``https://www.blackbox.ai/api/chat`` and yields text chunks (or an
  ``ImageResponse`` for the image-generation model).
* ``chat_completions`` - a POST handler that forwards the conversation to
  ``{BASE_URL}/api/chat/gpt4o/chat`` and returns either a server-sent-event
  stream or a single ``chat.completion`` JSON object.
"""

import codecs
import json
import logging
import os
import random
import re
import string
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

import httpx
from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import Response
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, Response

# NOTE(review): `ClientSession` is used by `Blackbox.create_async_generator`
# but is not imported anywhere in this file. Its usage (`session.post(...,
# proxy=...)`, `response.content.iter_any()`) looks like aiohttp's
# `ClientSession` - confirm and add the import.
#
# NOTE(review): the following names are used by `chat_completions` but are not
# defined in this file; they must be provided before the route is declared:
#   - verify_app_secret: FastAPI dependency (used via `Depends`)
#   - ALLOWED_MODELS: iterable of dicts that each have an "id" key
#   - BASE_URL: string prefix of the upstream chat endpoint
#   - headers: mapping of HTTP headers sent to the upstream endpoint
#   - create_chat_completion_data(content, model, finish_reason=...): returns
#     a JSON-serialisable dict with `choices[0]["delta"]`

logger = logging.getLogger(__name__)

# Matches `$@$v=...$@$` markers that are removed from streamed text chunks.
_CHUNK_MARKER_RE = re.compile(r'\$@\$v=[^$]+\$@\$')
# Matches the generated-image URL inside the image model's response text.
_IMAGE_URL_RE = re.compile(r'https://storage\.googleapis\.com/[^\s\)]+')


# Mock implementations for ImageResponse and to_data_uri
class ImageResponse:
    """Simple holder for a generated image URL and its alt text."""

    def __init__(self, url: str, alt: str):
        self.url = url
        self.alt = alt


def to_data_uri(image: Any) -> str:
    """Return a data URI for *image*.

    Placeholder: the argument is ignored and a fixed stub string is returned.
    """
    # Placeholder for actual image encoding
    return "data:image/png;base64,..."  # Replace with actual base64 data


class AsyncGeneratorProvider:
    """Empty base class used as a marker for `Blackbox`."""


class ProviderModelMixin:
    """Empty mixin used as a marker for `Blackbox`."""


class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
    """Provider that streams chat completions from the Blackbox chat API."""

    url = "https://www.blackbox.ai"
    api_endpoint = "https://www.blackbox.ai/api/chat"
    working = True
    supports_stream = True
    supports_system_message = True
    supports_message_history = True

    default_model = 'blackbox'
    models = [
        'blackbox',
        'gemini-1.5-flash',
        "llama-3.1-8b",
        'llama-3.1-70b',
        'llama-3.1-405b',
        'ImageGenerationLV45LJp',
        'gpt-4o',
        'gemini-pro',
        'claude-sonnet-3.5',
    ]

    # Payload sent as "agentMode" for the matching model.
    agentMode = {
        'ImageGenerationLV45LJp': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
    }

    # Payload sent as "trendingAgentMode" for the matching model.
    trendingAgentMode = {
        "blackbox": {},
        "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
        "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
        'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
        'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
    }

    # Models that are selected by prefixing the first message with "@<name>".
    userSelectedModel = {
        "gpt-4o": "gpt-4o",
        "gemini-pro": "gemini-pro",
        'claude-sonnet-3.5': "claude-sonnet-3.5",
    }

    model_aliases = {
        "gemini-flash": "gemini-1.5-flash",
        "flux": "ImageGenerationLV45LJp",
    }

    @classmethod
    def get_model(cls, model: str) -> str:
        """Resolve *model* to a supported model name.

        Known models and user-selectable models are returned unchanged,
        aliases are translated, and anything else falls back to
        ``default_model``.
        """
        if model in cls.models:
            return model
        elif model in cls.userSelectedModel:
            return model
        elif model in cls.model_aliases:
            return cls.model_aliases[model]
        else:
            return cls.default_model

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: List[Dict[str, str]],
        proxy: Optional[str] = None,
        image: Optional[Any] = None,
        image_name: Optional[str] = None,
        **kwargs
    ) -> Any:
        """Post *messages* to the chat endpoint and yield the reply.

        Args:
            model: Requested model name or alias; resolved via ``get_model``.
            messages: Chat messages as dicts with at least a ``content`` key.
                The caller's list and dicts are not modified.
            proxy: Optional proxy passed through to ``session.post``.
            image: Optional image; when given, it is attached to the last
                message as a data URI.
            image_name: File name sent alongside *image*.
            **kwargs: Accepted for interface compatibility; unused.

        Yields:
            ``ImageResponse`` for the image-generation model, otherwise
            non-blank text chunks with ``$@$v=...$@$`` markers removed.

        Raises:
            Exception: If the image model's response contains no image URL.
        """
        model = cls.get_model(model)

        headers = {
            "accept": "*/*",
            "accept-language": "en-US,en;q=0.9",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "origin": cls.url,
            "pragma": "no-cache",
            "referer": f"{cls.url}/",
            "sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
        }

        # Work on copies so the prefix / image attachment added below does not
        # leak back into the caller's message dicts.
        messages = [dict(message) for message in messages]

        if model in cls.userSelectedModel:
            prefix = f"@{cls.userSelectedModel[model]}"
            if not messages[0]['content'].startswith(prefix):
                messages[0]['content'] = f"{prefix} {messages[0]['content']}"

        async with ClientSession(headers=headers) as session:
            if image is not None:
                messages[-1]["data"] = {
                    "fileText": image_name,
                    "imageBase64": to_data_uri(image)
                }

            random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))

            data = {
                "messages": messages,
                "id": random_id,
                "previewToken": None,
                "userId": None,
                "codeModelMode": True,
                "agentMode": {},
                "trendingAgentMode": {},
                "userSelectedModel": None,
                "userSystemPrompt": None,
                "isMicMode": False,
                "maxTokens": 1024,
                "playgroundTopP": 0.9,
                "playgroundTemperature": 0.5,
                "isChromeExt": False,
                "githubToken": None,
                "clickedAnswer2": False,
                "clickedAnswer3": False,
                "clickedForceWebSearch": False,
                "visitFromDelta": False,
                "mobileClient": False,
                "webSearchMode": False,
            }

            # Exactly one of the three model-selection fields is filled in.
            if model in cls.agentMode:
                data["agentMode"] = cls.agentMode[model]
            elif model in cls.trendingAgentMode:
                data["trendingAgentMode"] = cls.trendingAgentMode[model]
            elif model in cls.userSelectedModel:
                data["userSelectedModel"] = cls.userSelectedModel[model]

            async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
                response.raise_for_status()

                if model == 'ImageGenerationLV45LJp':
                    response_text = await response.text()
                    url_match = _IMAGE_URL_RE.search(response_text)
                    if url_match:
                        image_url = url_match.group(0)
                        yield ImageResponse(image_url, alt=messages[-1]['content'])
                    else:
                        raise Exception("Image URL not found in the response")
                else:
                    # Decode incrementally: a multi-byte UTF-8 character can be
                    # split across two network chunks, which would make a
                    # per-chunk `bytes.decode()` raise UnicodeDecodeError.
                    decoder = codecs.getincrementaldecoder("utf-8")()
                    async for chunk in response.content.iter_any():
                        if not chunk:
                            continue
                        decoded_chunk = _CHUNK_MARKER_RE.sub('', decoder.decode(chunk))
                        if decoded_chunk.strip():
                            yield decoded_chunk
                    # Surfaces a truncated trailing byte sequence, if any.
                    decoder.decode(b"", final=True)


# FastAPI app setup
app = FastAPI()


class Message(BaseModel):
    """One chat message in a request."""

    role: str
    content: str


class ChatRequest(BaseModel):
    """Request body for ``/v1/chat/completions``."""

    model: str
    messages: List[Message]
    # Read by `chat_completions`; defaults to a non-streaming response.
    stream: Optional[bool] = False


@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatRequest, app_secret: str = Depends(verify_app_secret)
):
    """Forward a chat request upstream and return the completion.

    Args:
        request: Model name, messages and the optional ``stream`` flag.
        app_secret: Value produced by the ``verify_app_secret`` dependency;
            not used in the body.

    Returns:
        A ``StreamingResponse`` of ``text/event-stream`` data when
        ``request.stream`` is truthy, otherwise a ``chat.completion`` dict
        holding the concatenated content.

    Raises:
        HTTPException: 400 if the model is not in ``ALLOWED_MODELS``; the
            upstream status code on an HTTP error; 500 on a request error.
    """
    logger.info("Received chat completion request for model: %s", request.model)

    allowed_ids = [model['id'] for model in ALLOWED_MODELS]
    if request.model not in allowed_ids:
        raise HTTPException(
            status_code=400,
            detail=f"Model {request.model} is not allowed. Allowed models are: {', '.join(allowed_ids)}",
        )

    # Generate a UUID for the conversation
    conversation_id = str(uuid.uuid4()).replace("-", "")

    json_data = {
        "attachments": [],
        "conversationId": conversation_id,
        "prompt": "\n".join(
            f"{'User' if msg.role == 'user' else 'Assistant'}: {msg.content}"
            for msg in request.messages
        ),
    }

    # Build per-request headers instead of writing into the shared `headers`
    # mapping, so concurrent requests cannot overwrite each other's id.
    request_headers = {**headers, "uniqueid": conversation_id}

    async def generate():
        """Yield server-sent-event lines built from the upstream stream."""
        async with httpx.AsyncClient() as client:
            try:
                async with client.stream(
                    'POST',
                    f'{BASE_URL}/api/chat/gpt4o/chat',
                    headers=request_headers,
                    json=json_data,
                    timeout=120.0,
                ) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        if line and line != "[DONE]":
                            content = json.loads(line)["data"]
                            yield f"data: {json.dumps(create_chat_completion_data(content['message'], request.model))}\n\n"
                    yield f"data: {json.dumps(create_chat_completion_data('', request.model, 'stop'))}\n\n"
                    yield "data: [DONE]\n\n"
            # NOTE(review): when streaming, these are raised after the response
            # has started; confirm the client-visible behaviour is acceptable.
            except httpx.HTTPStatusError as e:
                logger.error("HTTP error occurred: %s", e)
                raise HTTPException(status_code=e.response.status_code, detail=str(e)) from e
            except httpx.RequestError as e:
                logger.error("An error occurred while requesting: %s", e)
                raise HTTPException(status_code=500, detail=str(e)) from e

    if request.stream:
        logger.info("Streaming response")
        return StreamingResponse(generate(), media_type="text/event-stream")

    logger.info("Non-streaming response")
    # Re-parse the SSE lines and concatenate every delta's content.
    parts = []
    async for chunk in generate():
        if chunk.startswith("data: ") and not chunk[6:].startswith("[DONE]"):
            data = json.loads(chunk[6:])
            delta_content = data["choices"][0]["delta"].get("content")
            if delta_content:
                parts.append(delta_content)
    full_response = "".join(parts)

    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(datetime.now().timestamp()),
        "model": request.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": full_response},
                "finish_reason": "stop",
            }
        ],
        "usage": None,
    }