|
from flask import Flask, request, jsonify, send_file |
|
from flask_cors import CORS |
|
import tempfile |
|
import os |
|
import time |
|
import random |
|
import base64 |
|
|
|
app = Flask(__name__) |
|
CORS(app) |
|
|
|
|
|
response_cache = {} |
|
|
|
|
|
TEMP_DIR = "/tmp/ai_responses" |
|
os.makedirs(TEMP_DIR, exist_ok=True) |
|
|
|
|
|
QUICK_RESPONSES = [ |
|
"I understand what you're saying.", |
|
"I'm following your thoughts.", |
|
"I hear you loud and clear.", |
|
"That makes sense to me.", |
|
"I'm processing that information.", |
|
"I hear what you're saying.", |
|
"Interesting point.", |
|
"I see where you're coming from.", |
|
"That's a good perspective.", |
|
"I'm with you on that.", |
|
"Tell me more about that.", |
|
"I'm listening carefully.", |
|
"I appreciate your thoughts on this.", |
|
"That's an interesting way to look at it.", |
|
"I'm taking that into consideration." |
|
] |
|
|
|
|
|
QUESTION_RESPONSES = [ |
|
"That's a good question. Let me think about it.", |
|
"I'm considering different perspectives on that question.", |
|
"That's something I've been thinking about as well.", |
|
"That's an interesting question to explore.", |
|
"I'm processing your question and considering how to respond." |
|
] |
|
|
|
def get_quick_response(user_input): |
|
"""Generate a fast response based on simple rules""" |
|
|
|
cache_key = user_input.strip().lower() |
|
if cache_key in response_cache: |
|
return response_cache[cache_key] |
|
|
|
|
|
if not user_input or len(user_input.strip()) < 3: |
|
response = "I'm listening. Please tell me more." |
|
elif "?" in user_input: |
|
response = random.choice(QUESTION_RESPONSES) |
|
else: |
|
response = random.choice(QUICK_RESPONSES) |
|
|
|
|
|
response_cache[cache_key] = response |
|
|
|
|
|
if len(response_cache) > 100: |
|
keys_to_remove = list(response_cache.keys())[:-50] |
|
for k in keys_to_remove: |
|
response_cache.pop(k, None) |
|
|
|
return response |
|
|
|
@app.route("/chat", methods=["POST"]) |
|
def chat(): |
|
data = request.get_json() |
|
if not data or "text" not in data: |
|
return jsonify({"error": "Missing 'text' in request body"}), 400 |
|
|
|
try: |
|
user_input = data["text"] |
|
print(f"Text input: {user_input}") |
|
|
|
|
|
time.sleep(random.uniform(0.05, 0.15)) |
|
|
|
|
|
final_response = get_quick_response(user_input) |
|
print(f"Text response: {final_response}") |
|
|
|
return jsonify({"response": final_response}) |
|
except Exception as e: |
|
print(f"Error in chat endpoint: {str(e)}") |
|
return jsonify({"response": "I'm listening."}) |
|
|
|
@app.route("/talk", methods=["POST"]) |
|
def talk(): |
|
if "audio" not in request.files: |
|
return jsonify({"error": "No audio file"}), 400 |
|
|
|
audio_file = request.files["audio"] |
|
|
|
try: |
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav", dir=TEMP_DIR) as tmp: |
|
audio_path = tmp.name |
|
audio_file.save(audio_path) |
|
|
|
|
|
|
|
|
|
|
|
final_response = get_quick_response("Hello") |
|
|
|
|
|
|
|
tts_audio_path = audio_path.replace(".wav", "_reply.wav") |
|
|
|
|
|
time.sleep(random.uniform(0.1, 0.3)) |
|
|
|
|
|
import shutil |
|
shutil.copyfile(audio_path, tts_audio_path) |
|
|
|
|
|
try: |
|
response = send_file(tts_audio_path, mimetype="audio/wav") |
|
encoded_response = base64.b64encode(final_response.encode('utf-8')).decode('ascii') |
|
response.headers["X-Response-Text-Base64"] = encoded_response |
|
response.headers["Access-Control-Expose-Headers"] = "X-Response-Text-Base64" |
|
return response |
|
except Exception as e: |
|
print(f"Error sending file: {str(e)}") |
|
return jsonify({ |
|
"error": "Could not send audio response", |
|
"text_response": final_response |
|
}), 500 |
|
except Exception as e: |
|
print(f"Error in talk endpoint: {str(e)}") |
|
return jsonify({"error": str(e)}), 500 |
|
finally: |
|
|
|
try: |
|
if 'audio_path' in locals() and os.path.exists(audio_path): |
|
os.unlink(audio_path) |
|
if 'tts_audio_path' in locals() and os.path.exists(tts_audio_path) and tts_audio_path != audio_path: |
|
os.unlink(tts_audio_path) |
|
except Exception as cleanup_error: |
|
print(f"Error cleaning up files: {str(cleanup_error)}") |
|
|
|
@app.route("/quick_chat", methods=["POST"]) |
|
def quick_chat(): |
|
"""Alias for chat endpoint for compatibility""" |
|
return chat() |
|
|
|
@app.route("/status", methods=["GET"]) |
|
def status(): |
|
"""Simple status endpoint""" |
|
return jsonify({ |
|
"status": "ready", |
|
"message": "Simple response system running and ready" |
|
}) |
|
|
|
@app.route("/") |
|
def index(): |
|
return "Metaverse AI Character API running. Ultra-fast version." |
|
|
|
if __name__ == "__main__": |
|
print("Starting ultra-fast response API...") |
|
|
|
app.run(host="0.0.0.0", port=7860, threaded=True) |