hadadrjt committed on
Commit
1ae1905
·
1 Parent(s): 39e6933

fixup! ai: Better handling of load balancing.

Browse files

98abcc71727c051b4892144f797138cfe8d020c9.

Files changed (1) hide show
  1. jarvis.py +36 -8
jarvis.py CHANGED
@@ -19,6 +19,7 @@ import io
19
  import uuid
20
  import concurrent.futures
21
  import itertools
 
22
 
23
  from openai import OpenAI
24
 
@@ -37,14 +38,19 @@ from pptx import Presentation
37
  os.system("apt-get update -q -y && apt-get install -q -y tesseract-ocr tesseract-ocr-eng tesseract-ocr-ind libleptonica-dev libtesseract-dev")
38
 
39
  LINUX_SERVER_HOSTS = [host for host in json.loads(os.getenv("LINUX_SERVER_HOST", "[]")) if host]
 
 
 
40
  LINUX_SERVER_PROVIDER_KEYS = [key for key in json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]")) if key]
 
 
41
 
42
  AI_TYPES = {f"AI_TYPE_{i}": os.getenv(f"AI_TYPE_{i}") for i in range(1, 7)}
43
  RESPONSES = {f"RESPONSE_{i}": os.getenv(f"RESPONSE_{i}") for i in range(1, 10)}
44
 
45
  MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}"))
46
  MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}"))
47
- MODEL_CHOICES = list(MODEL_MAPPING.values())
48
  DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}"))
49
 
50
  META_TAGS = os.getenv("META_TAGS")
@@ -53,6 +59,21 @@ ALLOWED_EXTENSIONS = json.loads(os.getenv("ALLOWED_EXTENSIONS"))
53
 
54
  ACTIVE_CANDIDATE = None
55
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  class SessionWithID(requests.Session):
57
  def __init__(self):
58
  super().__init__()
@@ -121,15 +142,20 @@ def process_ai_response(ai_text):
121
  return ai_text
122
 
123
  def fetch_response(host, provider_key, selected_model, messages, model_config, session_id):
124
- client = OpenAI(base_url=host, api_key=provider_key)
125
- data = {"model": selected_model, "messages": messages, **model_config}
126
- response = client.chat.completions.create(extra_body={"optillm_approach": "rto|re2|cot_reflection|self_consistency|plansearch|leap|z3|bon|moa|mcts|mcp|router|privacy|executecode|json", "session_id": session_id}, **data)
127
- ai_text = response.choices[0].message.content if response.choices and response.choices[0].message and response.choices[0].message.content else RESPONSES["RESPONSE_2"]
128
- return process_ai_response(ai_text)
 
 
 
 
 
129
 
130
  def chat_with_model(history, user_input, selected_model_display, sess):
131
  global ACTIVE_CANDIDATE
132
- if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
133
  return RESPONSES["RESPONSE_3"]
134
  if not hasattr(sess, "session_id"):
135
  sess.session_id = str(uuid.uuid4())
@@ -143,7 +169,9 @@ def chat_with_model(history, user_input, selected_model_display, sess):
143
  return fetch_response(ACTIVE_CANDIDATE[0], ACTIVE_CANDIDATE[1], selected_model, messages, model_config, sess.session_id)
144
  except Exception:
145
  ACTIVE_CANDIDATE = None
146
- candidates = [(host, key) for host in LINUX_SERVER_HOSTS for key in LINUX_SERVER_PROVIDER_KEYS]
 
 
147
  random.shuffle(candidates)
148
  with concurrent.futures.ThreadPoolExecutor(max_workers=len(candidates)) as executor:
149
  futures = {executor.submit(fetch_response, host, key, selected_model, messages, model_config, sess.session_id): (host, key) for (host, key) in candidates}
 
19
  import uuid
20
  import concurrent.futures
21
  import itertools
22
+ import threading
23
 
24
  from openai import OpenAI
25
 
 
38
  os.system("apt-get update -q -y && apt-get install -q -y tesseract-ocr tesseract-ocr-eng tesseract-ocr-ind libleptonica-dev libtesseract-dev")
39
 
40
  LINUX_SERVER_HOSTS = [host for host in json.loads(os.getenv("LINUX_SERVER_HOST", "[]")) if host]
41
+ LINUX_SERVER_HOSTS_MARKED = set()
42
+ LINUX_SERVER_HOSTS_ATTEMPTS = {}
43
+
44
  LINUX_SERVER_PROVIDER_KEYS = [key for key in json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]")) if key]
45
+ LINUX_SERVER_PROVIDER_KEYS_MARKED = set()
46
+ LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS = {}
47
 
48
  AI_TYPES = {f"AI_TYPE_{i}": os.getenv(f"AI_TYPE_{i}") for i in range(1, 7)}
49
  RESPONSES = {f"RESPONSE_{i}": os.getenv(f"RESPONSE_{i}") for i in range(1, 10)}
50
 
51
  MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}"))
52
  MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}"))
53
+ MODEL_CHOICES = list(MODEL_MAPPING.values()) if MODEL_MAPPING else []
54
  DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}"))
55
 
56
  META_TAGS = os.getenv("META_TAGS")
 
59
 
60
  ACTIVE_CANDIDATE = None
61
 
62
+ def get_available_items(items, marked):
63
+ available = [item for item in items if item not in marked]
64
+ random.shuffle(available)
65
+ return available
66
+
67
+ def marked_item(item, marked, attempts):
68
+ marked.add(item)
69
+ attempts[item] = attempts.get(item, 0) + 1
70
+ if attempts[item] >= 3:
71
+ def remove_fail():
72
+ marked.discard(item)
73
+ if item in attempts:
74
+ del attempts[item]
75
+ threading.Timer(300, remove_fail).start()
76
+
77
  class SessionWithID(requests.Session):
78
  def __init__(self):
79
  super().__init__()
 
142
  return ai_text
143
 
144
  def fetch_response(host, provider_key, selected_model, messages, model_config, session_id):
145
+ try:
146
+ client = OpenAI(base_url=host, api_key=provider_key)
147
+ data = {"model": selected_model, "messages": messages, **model_config}
148
+ response = client.chat.completions.create(extra_body={"optillm_approach": "rto|re2|cot_reflection|self_consistency|plansearch|leap|z3|bon|moa|mcts|mcp|router|privacy|executecode|json", "session_id": session_id}, **data)
149
+ ai_text = response.choices[0].message.content if response.choices and response.choices[0].message and response.choices[0].message.content else RESPONSES["RESPONSE_2"]
150
+ return process_ai_response(ai_text)
151
+ except Exception:
152
+ marked_item(provider_key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
153
+ marked_item(host, LINUX_SERVER_HOSTS_MARKED, LINUX_SERVER_HOSTS_ATTEMPTS)
154
+ raise
155
 
156
  def chat_with_model(history, user_input, selected_model_display, sess):
157
  global ACTIVE_CANDIDATE
158
+ if not get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED) or not get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_MARKED):
159
  return RESPONSES["RESPONSE_3"]
160
  if not hasattr(sess, "session_id"):
161
  sess.session_id = str(uuid.uuid4())
 
169
  return fetch_response(ACTIVE_CANDIDATE[0], ACTIVE_CANDIDATE[1], selected_model, messages, model_config, sess.session_id)
170
  except Exception:
171
  ACTIVE_CANDIDATE = None
172
+ available_keys = get_available_items(LINUX_SERVER_PROVIDER_KEYS, LINUX_SERVER_PROVIDER_KEYS_MARKED)
173
+ available_servers = get_available_items(LINUX_SERVER_HOSTS, LINUX_SERVER_HOSTS_MARKED)
174
+ candidates = [(host, key) for host in available_servers for key in available_keys]
175
  random.shuffle(candidates)
176
  with concurrent.futures.ThreadPoolExecutor(max_workers=len(candidates)) as executor:
177
  futures = {executor.submit(fetch_response, host, key, selected_model, messages, model_config, sess.session_id): (host, key) for (host, key) in candidates}