Bahodir Nematjonov committed on
Commit 39308e2 · 1 Parent(s): 7adef4f

cancel signal handling

Files changed (3):
  1. .gitignore +3 -1
  2. main.py +14 -6
  3. utils.py +23 -2
.gitignore CHANGED
@@ -2,4 +2,6 @@
  cache
  users.db
  .env
- __pycache__
+ __pycache__
+ -d
+ -H
main.py CHANGED
@@ -4,7 +4,7 @@ from fastapi.staticfiles import StaticFiles
  from jose import JWTError
  from schemas import UserRegister, TokenResponse, RefreshTokenRequest, QueryInput
  from auth import register_user, get_db, authenticate_user, create_token, verify_token, verify_access_token, Session
- from utils import generate_stream, generate_response
+ from utils import generate_stream, generate_response, shutdown_event
  from fastapi.security import OAuth2PasswordRequestForm
  from pathlib import Path
  from datetime import timedelta
@@ -103,11 +103,19 @@ async def generate(
      username: str = Depends(verify_access_token),
      stream: bool = Query(False, description="Enable streaming response"),
  ):
-     """API endpoint that supports both streaming and non-streaming responses."""
-     if stream:
-         return StreamingResponse(generate_stream(query_input.query), media_type="text/event-stream")
-     else:
-         return JSONResponse(await generate_response(query_input.query))
+     """Handles both streaming and non-streaming responses, with shutdown detection."""
+     if shutdown_event.is_set():
+         return JSONResponse({"message": "Server shutting down..."})
+
+     try:
+         if stream:
+             return StreamingResponse(generate_stream(query_input.query), media_type="text/event-stream")
+         else:
+             return JSONResponse(await generate_response(query_input.query))
+     except Exception as e:
+         logging.error(f"Error in generate endpoint: {e}")
+         raise HTTPException(status_code=500, detail="Internal server error")
+
  # WebSocket endpoint for streaming
  @app.on_event("startup")
  async def startup_event():
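
For reference, a minimal client sketch (not part of this commit) showing how the updated handler behaves in both modes. The /generate path, base URL, and Bearer-token header are assumptions, since only the handler body appears in the diff; QueryInput is taken to carry a single query field, as suggested by query_input.query:

# Client-side sketch; route, port, and auth scheme are assumed.
import httpx

BASE_URL = "http://localhost:8000"                      # assumed dev server address
HEADERS = {"Authorization": "Bearer <access token>"}    # token from the auth endpoint

def ask_blocking(question: str) -> dict:
    """Non-streaming mode: the endpoint returns one JSONResponse body."""
    resp = httpx.post(
        f"{BASE_URL}/generate",          # path is an assumption
        params={"stream": "false"},
        json={"query": question},
        headers=HEADERS,
        timeout=60.0,
    )
    resp.raise_for_status()
    return resp.json()

def ask_streaming(question: str) -> None:
    """Streaming mode: consume the text/event-stream lines as they arrive."""
    with httpx.stream(
        "POST",
        f"{BASE_URL}/generate",
        params={"stream": "true"},
        json={"query": question},
        headers=HEADERS,
        timeout=None,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if line.startswith("data: "):
                print(line[len("data: "):])

if __name__ == "__main__":
    print(ask_blocking("Hello"))
    ask_streaming("Hello again")
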
utils.py CHANGED
@@ -1,9 +1,21 @@
  import asyncio
  import ollama
  import json
+ import signal
+
+ shutdown_event = asyncio.Event()
+
+ def shutdown_handler(sig, frame):
+     """Handles shutdown signals like Ctrl + C."""
+     print("\n⛔ Shutdown requested! Stopping API...")
+     shutdown_event.set()
+
+ # Attach signal handlers for graceful termination
+ signal.signal(signal.SIGINT, shutdown_handler)
+ signal.signal(signal.SIGTERM, shutdown_handler)

  async def generate_stream(query: str):
-     """Generates streamed responses from Ollama using LLaMA 3 in JSON format."""
+     """Generates streamed responses with cancellation support."""
      try:
          stream = ollama.chat(
              model="llama3.2",
@@ -12,9 +24,18 @@ async def generate_stream(query: str):
          )

          for chunk in stream:
+             if shutdown_event.is_set():
+                 print("⛔ Stopping content generation...")
+                 break  # Exit loop when shutdown is requested
+
              if "message" in chunk and "content" in chunk["message"]:
                  response_data = json.dumps({"content": chunk["message"]["content"]})
-                 yield f"data: {response_data}\n\n"  # SSE format
+                 yield f"data: {response_data}\n\n"
+                 await asyncio.sleep(0.1)  # Allow graceful processing
+
+     except asyncio.CancelledError:
+         print("⛔ Stream cancelled by user.")
+         raise  # Propagate cancellation

      except Exception as e:
          error_data = json.dumps({"error": str(e)})
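
The pattern introduced here (a signal handler sets an asyncio.Event, and the streaming loop checks it between chunks) can be exercised without Ollama or FastAPI. A dependency-free sketch; fake_stream and the 0.5 s delay are illustrative stand-ins, not part of the commit:

import asyncio
import signal

shutdown_event = asyncio.Event()

def shutdown_handler(sig, frame):
    """Mark shutdown; the running generator notices the flag on its next iteration."""
    print("\nShutdown requested, finishing current chunk...")
    shutdown_event.set()

# Same wiring as utils.py: Ctrl + C (SIGINT) and SIGTERM both set the event.
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

async def fake_stream():
    """Stand-in for the Ollama stream: yields SSE-style chunks until shutdown."""
    for i in range(100):
        if shutdown_event.is_set():
            print("Stopping content generation...")
            break
        yield f"data: chunk {i}\n\n"
        await asyncio.sleep(0.5)  # give the event loop room, like the 0.1 s sleep above

async def main():
    async for chunk in fake_stream():
        print(chunk, end="")

if __name__ == "__main__":
    asyncio.run(main())

One caveat to keep in mind: when the app is served by an ASGI server such as Uvicorn, the server typically installs its own SIGINT/SIGTERM handlers at startup, which may override handlers registered at import time in utils.py; setting the same shutdown_event from FastAPI's shutdown event (or a lifespan handler) is a common alternative.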