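"""Gradio chat front end for an OpenAI-style chat-completions endpoint.

The server URL, provider API keys, model catalogue, UI labels, and canned
status strings all come from environment variables. Replies are streamed to
the chat window line by line, and a Stop button can interrupt a response
mid-stream.
"""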
import gradio as gr
import requests
import json
import os
from dotenv import load_dotenv
import threading
import random
import time

# Load variables from a local .env file, if present, before reading them.
load_dotenv()

LINUX_SERVER_HOST = os.getenv("LINUX_SERVER_HOST")
LINUX_SERVER_PROVIDER_KEY = [key for key in json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]")) if key]
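# The code POSTs chat-completions payloads directly to LINUX_SERVER_HOST, so
# it should hold the full endpoint URL, and LINUX_SERVER_PROVIDER_KEY a JSON
# array of API keys. Illustrative values (assumed, not real):
#   LINUX_SERVER_HOST=https://api.example.com/v1/chat/completions
#   LINUX_SERVER_PROVIDER_KEY=["sk-key-one", "sk-key-two"]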

# UI labels (AI_TYPE_1..5) and canned user-facing strings (RESPONSE_1..9),
# all supplied through environment variables.
AI_TYPES = {f"AI_TYPE_{i}": os.getenv(f"AI_TYPE_{i}") for i in range(1, 6)}
RESPONSES = {f"RESPONSE_{i}": os.getenv(f"RESPONSE_{i}") for i in range(1, 10)}

MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}"))
MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}"))
MODEL_CHOICES = list(MODEL_MAPPING.values())
DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}"))
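# Illustrative shapes for the JSON-valued variables above (assumed, not taken
# from any real deployment):
#   MODEL_MAPPING  = '{"provider/model-a": "Model A"}'            # model key -> display name
#   MODEL_CONFIG   = '{"provider/model-a": {"temperature": 0.7}}' # per-model request overrides
#   DEFAULT_CONFIG = '{"temperature": 1.0}'                       # fallback when a model has no entry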

META_TAGS = os.getenv("META_TAGS")

# Shared across requests: the Stop button sets stop_event, and every
# generator checks it between chunks.
stop_event = threading.Event()
session = requests.Session()

def get_model_key(display_name):
    # Reverse lookup: display name -> model key. The fallback must be a key,
    # not a display name (MODEL_CHOICES holds display names).
    return next((k for k, v in MODEL_MAPPING.items() if v == display_name), next(iter(MODEL_MAPPING), None))

def simulate_streaming_response(text):
    # Fake token streaming: emit the reply line by line with a short delay.
    for line in text.splitlines():
        if stop_event.is_set():
            return
        yield line + "\n"
        time.sleep(0.01)

def chat_with_model(history, user_input, selected_model_display):
    if stop_event.is_set():
        yield RESPONSES["RESPONSE_1"]
        return

    if not LINUX_SERVER_PROVIDER_KEY or not LINUX_SERVER_HOST:
        yield RESPONSES["RESPONSE_3"]
        return

    selected_model = get_model_key(selected_model_display)
    model_config = MODEL_CONFIG.get(selected_model, DEFAULT_CONFIG)

    # Rebuild the conversation in turn order (user, assistant, user, ...).
    # The last history entry is the placeholder pair appended by respond(),
    # so skip it and add user_input explicitly.
    messages = []
    for user, assistant in history[:-1]:
        messages.append({"role": "user", "content": user})
        if assistant:
            messages.append({"role": "assistant", "content": assistant})
    messages.append({"role": "user", "content": user_input})

    data = {"model": selected_model, "messages": messages, **model_config}
    # Shuffle the provider keys and try at most two of them per request.
    random.shuffle(LINUX_SERVER_PROVIDER_KEY)

    for api_key in LINUX_SERVER_PROVIDER_KEY[:2]:
        if stop_event.is_set():
            yield RESPONSES["RESPONSE_1"]
            return
        try:
            # timeout=60 is an assumed safeguard against a hung request.
            response = session.post(LINUX_SERVER_HOST, json=data, headers={"Authorization": f"Bearer {api_key}"}, timeout=60)
            if stop_event.is_set():
                yield RESPONSES["RESPONSE_1"]
                return
            if response.status_code < 400:
                # Parse an OpenAI-style completion: choices[0].message.content.
                ai_text = response.json().get("choices", [{}])[0].get("message", {}).get("content", RESPONSES["RESPONSE_2"])
                yield from simulate_streaming_response(ai_text)
                return
        except requests.exceptions.RequestException:
            continue

    yield RESPONSES["RESPONSE_3"]

def respond(user_input, history, selected_model_display):
    if not user_input.strip():
        # Nothing to send: keep Send visible (but disabled) and Stop hidden.
        yield history, gr.update(value=""), gr.update(visible=True, interactive=False), gr.update(visible=False)
        return

    stop_event.clear()
    # Append a placeholder pair; RESPONSE_8 is shown while the reply streams.
    history.append([user_input, RESPONSES["RESPONSE_8"]])

    yield history, gr.update(value=""), gr.update(visible=False), gr.update(visible=True)

    ai_response = ""
    for chunk in chat_with_model(history, user_input, selected_model_display):
        if stop_event.is_set():
            session.close()
            history[-1][1] = RESPONSES["RESPONSE_1"]
            yield history, gr.update(value=""), gr.update(visible=True), gr.update(visible=False)
            return
        ai_response += chunk
        history[-1][1] = ai_response
        yield history, gr.update(value=""), gr.update(visible=False), gr.update(visible=True)

    session.close()
    yield history, gr.update(value=""), gr.update(visible=True), gr.update(visible=False)

def stop_response():
    # Signal the generators to stop and restore the Send/Stop buttons
    # (this handler's outputs are wired to send_btn and stop_btn).
    stop_event.set()
    session.close()
    return gr.update(visible=True), gr.update(visible=False)

def change_model(new_model_display):
    # Switching models clears both the visible chat and the stored history.
    return [], [], new_model_display

def check_send_button_enabled(msg):
    # Show and enable Send only when the textbox has non-whitespace content.
    has_text = bool(msg.strip())
    return gr.update(visible=has_text, interactive=has_text)

# Build the UI: chat window, model picker, input box, and Send/Stop buttons.
with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as demo:
    user_history = gr.State([])
    selected_model = gr.State(MODEL_CHOICES[0])

    chatbot = gr.Chatbot(label=AI_TYPES["AI_TYPE_1"], height=600, show_copy_button=True, show_share_button=False, elem_id=AI_TYPES["AI_TYPE_2"])
    model_dropdown = gr.Dropdown(label=AI_TYPES["AI_TYPE_3"], show_label=False, choices=MODEL_CHOICES, value=MODEL_CHOICES[0], interactive=True)
    msg = gr.Textbox(label=RESPONSES["RESPONSE_4"], show_label=False, placeholder=RESPONSES["RESPONSE_5"])

    with gr.Row():
        send_btn = gr.Button(RESPONSES["RESPONSE_6"], visible=True, interactive=False)
        # RESPONSE_9 supplies the Gradio button variant ("primary", "secondary", or "stop").
        stop_btn = gr.Button(RESPONSES["RESPONSE_7"], variant=RESPONSES["RESPONSE_9"], visible=False)

    model_dropdown.change(fn=change_model, inputs=[model_dropdown], outputs=[chatbot, user_history, selected_model])
    send_btn.click(respond, inputs=[msg, user_history, selected_model], outputs=[chatbot, msg, send_btn, stop_btn])
    msg.change(fn=check_send_button_enabled, inputs=[msg], outputs=[send_btn])
    stop_btn.click(fn=stop_response, outputs=[send_btn, stop_btn])

# share=True also requests a temporary public gradio.live link.
demo.launch(share=True, show_api=False, favicon_path=AI_TYPES["AI_TYPE_5"])