Spaces:
sierrafr
/
Runtime error

hadadrjt committed on
Commit
e864bae
·
1 Parent(s): 452f16b

ai: Switch to production task cancelation.

Browse files
Files changed (1) hide show
  1. jarvis.py +27 -44
jarvis.py CHANGED
@@ -62,6 +62,8 @@ class SessionWithID(requests.Session):
62
  def __init__(sess):
63
  super().__init__()
64
  sess.session_id = str(uuid.uuid4())
 
 
65
 
66
  def create_session():
67
  return SessionWithID()
@@ -69,6 +71,8 @@ def create_session():
69
  def ensure_stop_event(sess):
70
  if not hasattr(sess, "stop_event"):
71
  sess.stop_event = asyncio.Event()
 
 
72
 
73
  def marked_item(item, marked, attempts):
74
  marked.add(item)
@@ -197,7 +201,7 @@ def extract_file_content(fp):
197
  except Exception as e:
198
  return f"{fp}: {e}"
199
 
200
- async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_event):
201
  for t in [0.5, 1]:
202
  try:
203
  async with httpx.AsyncClient(timeout=t) as client:
@@ -206,7 +210,7 @@ async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_eve
206
  marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
207
  return
208
  async for line in response.aiter_lines():
209
- if stop_event.is_set():
210
  return
211
  if not line:
212
  continue
@@ -219,10 +223,10 @@ async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_eve
219
  if isinstance(j, dict) and j.get("choices"):
220
  for ch in j["choices"]:
221
  delta = ch.get("delta", {})
222
- if "reasoning" in delta and delta["reasoning"] is not None and delta["reasoning"] != "":
223
- decoded_reasoning = delta["reasoning"].encode('utf-8').decode('unicode_escape')
224
- yield ("reasoning", decoded_reasoning)
225
- if "content" in delta and delta["content"] is not None and delta["content"] != "":
226
  yield ("content", delta["content"])
227
  except:
228
  continue
@@ -234,57 +238,36 @@ async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_eve
234
  async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt):
235
  ensure_stop_event(sess)
236
  sess.stop_event.clear()
 
237
  if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
238
  yield ("content", RESPONSES["RESPONSE_3"])
239
  return
240
  if not hasattr(sess, "session_id") or not sess.session_id:
241
  sess.session_id = str(uuid.uuid4())
242
- sess.stop_event = asyncio.Event()
243
- if not hasattr(sess, "active_candidate"):
244
- sess.active_candidate = None
245
  model_key = get_model_key(model_display)
246
  cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
247
  msgs = [{"role": "user", "content": u} for u, _ in history] + [{"role": "assistant", "content": a} for _, a in history if a]
248
  prompt = INTERNAL_TRAINING_DATA if model_key == DEFAULT_MODEL_KEY and INTERNAL_TRAINING_DATA else (custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT))
249
  msgs.insert(0, {"role": "system", "content": prompt})
250
  msgs.append({"role": "user", "content": user_input})
251
- if sess.active_candidate:
252
- async for chunk in fetch_response_stream_async(sess.active_candidate[0], sess.active_candidate[1], model_key, msgs, cfg, sess.session_id, sess.stop_event):
253
- if sess.stop_event.is_set():
 
 
 
 
254
  return
 
255
  yield chunk
256
- return
257
- jarvis = False
258
- responses_success = False
259
- keys = list(LINUX_SERVER_PROVIDER_KEYS)
260
- hosts = list(LINUX_SERVER_HOSTS)
261
- random.shuffle(keys)
262
- random.shuffle(hosts)
263
- for k in keys:
264
- for h in hosts:
265
- jarvis = True
266
- stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event)
267
- responses = ""
268
- got_responses = False
269
- async for chunk in stream_gen:
270
- if sess.stop_event.is_set():
271
- return
272
- if not got_responses:
273
- got_responses = True
274
- sess.active_candidate = (h, k)
275
- responses += chunk[1]
276
- yield chunk
277
- if got_responses and responses.strip():
278
- responses_success = True
279
- return
280
- if not jarvis:
281
- yield ("content", RESPONSES["RESPONSE_3"])
282
- elif not responses_success:
283
- yield ("content", RESPONSES["RESPONSE_2"])
284
 
285
  async def respond_async(multi, history, model_display, sess, custom_prompt):
286
  ensure_stop_event(sess)
287
  sess.stop_event.clear()
 
288
  msg_input = {"text": multi.get("text", "").strip(), "files": multi.get("files", [])}
289
  if not msg_input["text"] and not msg_input["files"]:
290
  yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
@@ -304,7 +287,7 @@ async def respond_async(multi, history, model_display, sess, custom_prompt):
304
  content_started = False
305
  ignore_reasoning = False
306
  async for typ, chunk in chat_with_model_async(history, inp, model_display, sess, custom_prompt):
307
- if sess.stop_event.is_set():
308
  break
309
  if typ == "reasoning":
310
  if ignore_reasoning:
@@ -329,10 +312,10 @@ async def respond_async(multi, history, model_display, sess, custom_prompt):
329
  while True:
330
  done, _ = await asyncio.wait({stop_task, asyncio.create_task(queue.get())}, return_when=asyncio.FIRST_COMPLETED)
331
  if stop_task in done:
 
332
  bg_task.cancel()
333
  history[-1][1] = RESPONSES["RESPONSE_1"]
334
  yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
335
- sess.stop_event.clear()
336
  return
337
  for d in done:
338
  result = d.result()
@@ -356,10 +339,10 @@ def change_model(new):
356
  def stop_response(history, sess):
357
  ensure_stop_event(sess)
358
  sess.stop_event.set()
 
359
  if history:
360
  history[-1][1] = RESPONSES["RESPONSE_1"]
361
- new_sess = create_session()
362
- return history, None, new_sess
363
 
364
  with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as jarvis:
365
  user_history = gr.State([])
 
62
  def __init__(sess):
63
  super().__init__()
64
  sess.session_id = str(uuid.uuid4())
65
+ sess.stop_event = asyncio.Event()
66
+ sess.cancel_token = {"cancelled": False}
67
 
68
def create_session():
    """Factory returning a fresh `SessionWithID` instance."""
    fresh = SessionWithID()
    return fresh
 
71
def ensure_stop_event(sess):
    """Backfill cancellation primitives on sessions that predate them.

    Idempotent: attributes already present on *sess* are left untouched.
    """
    defaults = (
        ("stop_event", asyncio.Event),
        ("cancel_token", lambda: {"cancelled": False}),
    )
    for attr, factory in defaults:
        if not hasattr(sess, attr):
            setattr(sess, attr, factory())
76
 
77
  def marked_item(item, marked, attempts):
78
  marked.add(item)
 
201
  except Exception as e:
202
  return f"{fp}: {e}"
203
 
204
+ async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_event, cancel_token):
205
  for t in [0.5, 1]:
206
  try:
207
  async with httpx.AsyncClient(timeout=t) as client:
 
210
  marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
211
  return
212
  async for line in response.aiter_lines():
213
+ if stop_event.is_set() or cancel_token["cancelled"]:
214
  return
215
  if not line:
216
  continue
 
223
  if isinstance(j, dict) and j.get("choices"):
224
  for ch in j["choices"]:
225
  delta = ch.get("delta", {})
226
+ if "reasoning" in delta and delta["reasoning"]:
227
+ decoded = delta["reasoning"].encode('utf-8').decode('unicode_escape')
228
+ yield ("reasoning", decoded)
229
+ if "content" in delta and delta["content"]:
230
  yield ("content", delta["content"])
231
  except:
232
  continue
 
238
async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt):
    """Stream ``(type, text)`` chunks for *user_input* from the first provider that answers.

    Tries every (host, key) pair in random order until one yields output,
    honouring the session's ``stop_event`` / ``cancel_token`` for
    cancellation.  Yields ``("reasoning" | "content", str)`` tuples; emits
    the canned fallback responses when no providers are configured or none
    produced any output.
    """
    ensure_stop_event(sess)
    # Fresh request: reset both cancellation signals.
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False
    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])
        return
    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())
    model_key = get_model_key(model_display)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    # Rebuild the conversation in chronological order.  The previous code
    # emitted all user turns followed by all assistant turns, which scrambles
    # the dialogue the model sees; chat APIs expect alternating turns.
    msgs = []
    for u, a in history:
        msgs.append({"role": "user", "content": u})
        if a:
            msgs.append({"role": "assistant", "content": a})
    prompt = INTERNAL_TRAINING_DATA if model_key == DEFAULT_MODEL_KEY and INTERNAL_TRAINING_DATA else (custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT))
    msgs.insert(0, {"role": "system", "content": prompt})
    msgs.append({"role": "user", "content": user_input})
    # Randomised failover across every host/key combination.
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                return
            got_responses = True
            yield chunk
        # A candidate that produced anything counts as success; retrying
        # another provider would duplicate the partial output already yielded.
        if got_responses:
            return
    yield ("content", RESPONSES["RESPONSE_2"])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
266
 
267
  async def respond_async(multi, history, model_display, sess, custom_prompt):
268
  ensure_stop_event(sess)
269
  sess.stop_event.clear()
270
+ sess.cancel_token["cancelled"] = False
271
  msg_input = {"text": multi.get("text", "").strip(), "files": multi.get("files", [])}
272
  if not msg_input["text"] and not msg_input["files"]:
273
  yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
 
287
  content_started = False
288
  ignore_reasoning = False
289
  async for typ, chunk in chat_with_model_async(history, inp, model_display, sess, custom_prompt):
290
+ if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
291
  break
292
  if typ == "reasoning":
293
  if ignore_reasoning:
 
312
  while True:
313
  done, _ = await asyncio.wait({stop_task, asyncio.create_task(queue.get())}, return_when=asyncio.FIRST_COMPLETED)
314
  if stop_task in done:
315
+ sess.cancel_token["cancelled"] = True
316
  bg_task.cancel()
317
  history[-1][1] = RESPONSES["RESPONSE_1"]
318
  yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
 
319
  return
320
  for d in done:
321
  result = d.result()
 
339
def stop_response(history, sess):
    """Cancel the in-flight generation and hand back a brand-new session.

    Sets the session's cancellation primitives, rewrites the last history
    entry with the canned "stopped" response, and returns a replacement
    session so subsequent requests start from a clean state.
    """
    ensure_stop_event(sess)
    sess.cancel_token["cancelled"] = True
    sess.stop_event.set()
    if history:
        history[-1][1] = RESPONSES["RESPONSE_1"]
    replacement = create_session()
    return history, None, replacement
 
346
 
347
  with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as jarvis:
348
  user_history = gr.State([])