import os
import json
import asyncio

import httpx

# Load provider endpoints and the model -> provider mapping from the config file.
with open("llm/model_config.json", "r") as f:
    CONFIG = json.load(f)

PROVIDERS = CONFIG["providers"]
MODELS = CONFIG["models"]
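
# For reference, this module assumes model_config.json maps each provider to an
# OpenAI-compatible chat completions endpoint plus the environment variable that
# holds its API key. A minimal sketch of the expected shape (the provider name,
# URL, env var, and model name below are illustrative placeholders, not values
# taken from the actual config):
#
# {
#     "providers": {
#         "openai": {
#             "url": "https://api.openai.com/v1/chat/completions",
#             "key_env": "OPENAI_API_KEY"
#         }
#     },
#     "models": {
#         "gpt-4o-mini": "openai"
#     }
# }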


STRUCTURED_ASSISTANT_PROMPT = """You are a helpful AI assistant.

- Respond to the user's message in a structured and professional way.
- Match the length and complexity of your response to the user's input.
- If the user's input is simple (e.g., "Hi"), reply politely without overexplaining.
- If the user's input is complex, give a complete and organized answer.
- Do not repeat the user's prompt.
- Be direct, helpful, and clear.
"""

AGGREGATOR_PROMPT = """You are an AI responsible for combining the outputs of multiple AI assistants.

- Read their answers carefully.
- Identify the best parts from each.
- Write a single, coherent, and helpful reply.
- Do not simply merge texts or repeat everything.
- Match the depth and tone to the user's original input.
- Keep it natural and conversational.
"""


async def query_llm(model_name, user_input, role_prompt):
    """Send one chat completion request to the provider that hosts `model_name`."""
    provider_key = MODELS.get(model_name)
    if not provider_key:
        return f"Model '{model_name}' is not supported."

    provider = PROVIDERS.get(provider_key)
    if not provider:
        return f"Provider '{provider_key}' is not configured."

    endpoint = provider["url"]
    api_key_env = provider["key_env"]
    api_key = os.getenv(api_key_env)

    if not api_key:
        return f"API key for provider '{provider_key}' not found."

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": role_prompt},
            {"role": "user", "content": user_input}
        ]
    }

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(endpoint, headers=headers, json=payload)
            response.raise_for_status()
            data = response.json()
            # OpenAI-style response shape: {"choices": [{"message": {"content": ...}}]}
            return data["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {str(e)}"


async def query_moa_chain(user_input, settings):
    """Queries LLM-A, LLM-B, and LLM-C in parallel, then the aggregator."""
    llm_a = settings["models"].get("LLM-A")
    llm_b = settings["models"].get("LLM-B")
    llm_c = settings["models"].get("LLM-C")
    aggregator = settings.get("aggregator")

    # Fan out: the three proposer models answer the user independently and concurrently.
    results = await asyncio.gather(
        query_llm(llm_a, user_input, STRUCTURED_ASSISTANT_PROMPT),
        query_llm(llm_b, user_input, STRUCTURED_ASSISTANT_PROMPT),
        query_llm(llm_c, user_input, STRUCTURED_ASSISTANT_PROMPT)
    )

    # Label each answer so the aggregator can tell the proposers apart.
    combined_content = (
        f"[LLM-A] {results[0]}\n\n"
        f"[LLM-B] {results[1]}\n\n"
        f"[LLM-C] {results[2]}"
    )

    # Fan in: the aggregator model synthesizes the labeled answers into one reply.
    final_response = await query_llm(aggregator, combined_content, AGGREGATOR_PROMPT)
    return final_response
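

# Example usage (a minimal sketch; the model names and settings layout below are
# illustrative assumptions, not values taken from the real configuration:
# "models" assigns a concrete model to each of the LLM-A/B/C slots, and
# "aggregator" names the model that merges the three answers):
#
# if __name__ == "__main__":
#     settings = {
#         "models": {
#             "LLM-A": "gpt-4o-mini",
#             "LLM-B": "claude-3-haiku",
#             "LLM-C": "mistral-small",
#         },
#         "aggregator": "gpt-4o-mini",
#     }
#     answer = asyncio.run(
#         query_moa_chain("Explain what a mixture-of-agents setup is.", settings)
#     )
#     print(answer)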