import asyncio
import json
import signal

import ollama

shutdown_event = asyncio.Event()

def shutdown_handler(sig, frame):
    """Handles shutdown signals such as Ctrl+C."""
    print("\nShutdown requested! Stopping API...")
    shutdown_event.set()

# Attach signal handlers for graceful termination
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

async def generate_stream(query: str):
    """Generates streamed responses with cancellation support."""
    try:
        stream = ollama.chat(
            model="llama3.2",
            messages=[{"role": "user", "content": query}],
            stream=True
        )
        for chunk in stream:
            if shutdown_event.is_set():
                print("Stopping content generation...")
                break  # Exit the loop when shutdown is requested
            if "message" in chunk and "content" in chunk["message"]:
                response_data = json.dumps({"content": chunk["message"]["content"]})
                yield f"data: {response_data}\n\n"  # Server-Sent Events framing
            await asyncio.sleep(0.1)  # Yield to the event loop so shutdown/cancellation is noticed
    except asyncio.CancelledError:
        print("Stream cancelled by user.")
        raise  # Propagate cancellation so the caller can clean up
    except Exception as e:
        error_data = json.dumps({"error": str(e)})
        yield f"data: {error_data}\n\n"
async def generate_response(query: str):
    """Returns a non-streamed response."""
    try:
        response = ollama.chat(
            model="llama3.2",
            messages=[{"role": "user", "content": query}]
        )
        return {"content": response["message"]["content"]}
    except Exception as e:
        return {"error": str(e)}