Meroni-v.0.0.1 / app.py
aaronzwe's picture
Create app.py
3d624b6 verified
import os
import gradio as gr
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
import tempfile
import gtts
from textblob import TextBlob
import json
import psutil
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from contextlib import contextmanager
import plotly.graph_objects as go
import gc
# Setup logging
logging.basicConfig(filename="meroni.log", level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s")
# Configuration
MODEL_REPO = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
MODEL_FILE = "tinyllama-1.1b-chat-v1.0.Q2_K.gguf" # Q2_K for minimal memory
MODELS_DIR = "models"
MAX_HISTORY = 5
DEFAULT_N_CTX = 512 # Low to avoid crashes
DEFAULT_N_BATCH = 64 # Low for stability
# Ensure models directory exists
os.makedirs(MODELS_DIR, exist_ok=True)
# Dependency check
def check_dependencies():
required = ["gradio", "huggingface_hub", "llama_cpp", "gtts", "textblob", "psutil", "tenacity", "plotly"]
missing = []
for module in required:
try:
__import__(module)
except ImportError:
missing.append(module)
if missing:
return (False, f"Missing dependencies: {', '.join(missing)}. Please include them in requirements.txt.")
return (True, "All dependencies installed.")
# Check system resources
def suggest_performance_mode():
try:
mem = psutil.virtual_memory()
available_gb = mem.available / (1024 ** 3)
return available_gb > 4, available_gb
except Exception as e:
logging.error(f"Resource check failed: {e}")
return False, 0
# Download model
def download_model():
try:
model_path = os.path.join(MODELS_DIR, MODEL_FILE)
if not os.path.exists(model_path):
logging.info(f"Downloading model {MODEL_FILE}...")
model_path = hf_hub_download(
repo_id=MODEL_REPO,
filename=MODEL_FILE,
local_dir=MODELS_DIR
)
logging.info(f"Model downloaded to {model_path}")
return model_path
except Exception as e:
logging.error(f"Model download failed: {e}")
raise Exception("Failed to download model. Check internet connection.")
# Context manager for Llama
@contextmanager
def llama_context(*args, **kwargs):
llm = None
try:
llm = Llama(*args, **kwargs)
yield llm
except MemoryError:
logging.error("Out of memory during model loading.")
raise Exception("Not enough memory.")
except Exception as e:
logging.error(f"Model loading failed: {e}")
raise Exception("Failed to load model. Check meroni.log.")
finally:
if llm is not None:
try:
llm.close()
del llm
gc.collect()
except Exception as e:
logging.error(f"Model cleanup failed: {e}")
# Initialize model with retry
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=5))
def initialize_model(n_ctx=DEFAULT_N_CTX, n_batch=DEFAULT_N_BATCH):
model_path = download_model()
try:
with llama_context(
model_path=model_path,
n_ctx=n_ctx,
n_threads=2,
n_batch=n_batch,
verbose=False
) as llm:
return llm
except Exception as e:
logging.warning(f"Model initialization failed: {e}")
raise
# Fallback response
def fallback_response(message):
return f"I'm having trouble processing, but I hear you saying: '{message}'. How can I support you further?"
# System prompt
SYSTEM_PROMPT = """You are Meroni, a gentle, emotionally supportive AI companion.
Listen attentively, validate the user's feelings, and respond with warmth, empathy, and understanding.
Provide thoughtful, concise replies (2-4 sentences) that avoid generic phrases or emojis unless requested.
Use the conversation history and summary to make responses personal, and adapt to the user's emotional tone."""
# Sentiment analysis with keywords
def get_sentiment(message):
try:
keywords = {
"sad": ["sad", "down", "hurt"],
"angry": ["mad", "angry", "frustrated"],
"anxious": ["worried", "nervous", "scared"],
}
message_lower = message.lower()
for tone, words in keywords.items():
if any(w in message_lower for w in words):
return tone
analysis = TextBlob(message)
if analysis.sentiment.polarity > 0:
return "positive"
elif analysis.sentiment.polarity < 0:
return "negative"
return "neutral"
except Exception as e:
logging.error(f"Sentiment analysis error: {e}")
return "neutral"
# Summarize history
def summarize_history(history):
if not history:
return ""
summary = []
negative_count = 0
for msg in history[-3:]:
if msg["role"] == "user":
sentiment = get_sentiment(msg["content"])
if sentiment in ["sad", "angry", "anxious"]:
negative_count += 1
if negative_count > 1:
summary.append(f"User has expressed {negative_count} negative feelings recently.")
return "Summary: " + " ".join(summary) + "\n" if summary else ""
# Format prompt
def format_prompt(message, history, sentiment):
prompt = SYSTEM_PROMPT + "\n\n"
prompt += summarize_history(history)
if sentiment in ["sad", "angry", "anxious"]:
prompt += f"The user seems {sentiment}. Provide extra comfort and support.\n"
elif sentiment == "positive":
prompt += "The user seems happy. Reflect their positive mood.\n"
for msg in history[-MAX_HISTORY:]:
if msg["role"] == "user" and msg["content"]:
prompt += f"User: {msg['content']}\n"
if msg["role"] == "assistant" and msg["content"]:
prompt += f"Meroni: {msg['content']}\n\n"
prompt += f"User: {message}\nMeroni: "
return prompt
# Generate response
def generate_response(message, history, llm):
if llm is None:
logging.warning("Model unavailable, using fallback.")
return fallback_response(message)
try:
sentiment = get_sentiment(message)
prompt = format_prompt(message, history, sentiment)
response = llm(
prompt,
max_tokens=128,
temperature=0.7,
top_p=0.9,
stop=["User:", "\n\n"],
echo=False
)
reply = response['choices'][0]['text'].strip()
return reply or "I'm here for you. Could you share a bit more?"
except MemoryError:
logging.error("Memory error during generation.")
return "I'm running low on memory. Try a shorter message."
except Exception as e:
logging.error(f"Response generation error: {e}")
return fallback_response(message)
# Generate reflective prompt
def generate_reflective_prompt(history):
try:
if not history:
return "What's on your mind today? How are you feeling about it?"
last_user_msg = next((m["content"] for m in reversed(history) if m["role"] == "user"), "")
sentiment = get_sentiment(last_user_msg)
if sentiment in ["sad", "angry", "anxious"]:
return f"Why do you think you're feeling {sentiment}? What might help you feel a bit better?"
return f"What’s been going well for you lately? How can we explore that more?"
except Exception as e:
logging.error(f"Reflective prompt error: {e}")
return "Let’s reflect together. What’s on your mind?"
# Text-to-speech with retry
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
def text_to_speech(text):
try:
if not text:
return None
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as fp:
temp_filename = fp.name
tts = gtts.gTTS(text)
tts.save(temp_filename)
return temp_filename
except Exception as e:
logging.error(f"Text-to-speech error: {e}")
raise Exception("Text-to-speech failed.")
# Gradio interface
def create_interface():
global llm
llm = None # Lazy loading
with gr.Blocks(css="""
footer {visibility: hidden}
:root {--primary-color: #4CAF50;}
.dark {--primary-color: #333; --background: #222;}
.blue {--primary-color: #2196F3;}
.yellow {--primary-color: #FFC107;}
""") as demo:
gr.Markdown("""
# 🌸 Meroni – Your Calm AI Companion
A gentle AI for mental wellness. Share your thoughts, feelings, or simply chat.
Meroni remembers your conversations privately on your device!
""")
# Dependency status
dep_status, dep_message = check_dependencies()
if not dep_status:
gr.Markdown(f"⚠️ {dep_message}")
# Settings
with gr.Row():
n_ctx_slider = gr.Slider(minimum=256, maximum=1024, step=256, label="Context Size", value=DEFAULT_N_CTX)
n_batch_slider = gr.Slider(minimum=32, maximum=128, step=32, label="Batch Size", value=DEFAULT_N_BATCH)
theme = gr.Dropdown(choices=["light", "dark", "blue", "yellow"], label="Theme", value="light")
# Hidden state for history
history_loader = gr.State(value="[]")
# Mood tracker
mood_plot = gr.Plot(label="Mood Trends")
# Chatbot
chatbot = gr.Chatbot(
label="Conversation with Meroni",
elem_id="meroni-chat",
height=400,
type="messages"
)
# Audio output
audio_output = gr.Audio(
label="Meroni's Voice",
autoplay=True
)
# File output for export
file_output = gr.File(label="Exported History")
with gr.Row():
msg = gr.Textbox(
placeholder="Type your thoughts here...",
lines=2,
max_lines=10,
container=True
)
submit_btn = gr.Button("Send", variant="primary")
with gr.Row():
clear_btn = gr.Button("New Conversation", variant="secondary")
load_btn = gr.Button("Load Previous Conversation", variant="secondary")
reflect_btn = gr.Button("Reflect", variant="secondary")
export_btn = gr.Button("Download History", variant="secondary")
speak_toggle = gr.Checkbox(label="Enable Speech", value=True)
# JavaScript for local storage, encryption, and theme
gr.HTML("""
<script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/4.1.1/crypto-js.min.js"></script>
<script>
// Encryption key
let encryptionKey = localStorage.getItem('meroni_key');
if (!encryptionKey) {
encryptionKey = prompt('Set a password for your chat history:') || 'default';
localStorage.setItem('meroni_key', encryptionKey);
}
// Local storage
document.addEventListener('DOMContentLoaded', function() {
let savedHistory = localStorage.getItem('meroni_history') || "[]";
try {
savedHistory = CryptoJS.AES.decrypt(savedHistory, encryptionKey).toString(CryptoJS.enc.Utf8);
} catch (e) {
console.error('Decryption failed:', e);
}
try {
document.getElementById('history_loader').value = savedHistory;
} catch (e) {
console.error('History loader not found:', e);
}
});
function saveHistory() {
try {
const history = document.querySelector('[data-testid="chatbot"]').value;
const encrypted = CryptoJS.AES.encrypt(JSON.stringify(history), encryptionKey).toString();
localStorage.setItem('meroni_history', encrypted);
} catch (e) {
console.error('Failed to save history:', e);
alert('Could not save history. Your browser may restrict local storage.');
}
}
const observer = new MutationObserver(saveHistory);
try {
observer.observe(document.querySelector('[data-testid="chatbot"]'), { childList: true, subtree: true });
} catch (e) {
console.error('Observer setup failed:', e);
}
// Theme switching
document.getElementById('theme').addEventListener('change', function() {
const theme = this.value;
document.body.className = theme;
localStorage.setItem('meroni_theme', theme);
});
// Browser TTS fallback
function browserTTS(text) {
try {
const utterance = new SpeechSynthesisUtterance(text);
utterance.lang = 'en-US';
window.speechSynthesis.speak(utterance);
} catch (e) {
console.error('Browser TTS failed:', e);
}
}
</script>
""")
# Onboarding modal
gr.HTML("""
<script>
if (!localStorage.getItem('meroni_onboarded')) {
alert('Welcome to Meroni!\\n1. Chat with your AI companion.\\n2. Save chats privately on your device.\\n3. Reflect anytime with journal prompts.');
localStorage.setItem('meroni_onboarded', 'true');
}
</script>
""")
# Event handlers
def user_input(message, history):
if not message.strip():
return "", history or []
history = history or []
if history and not all("role" in m and "content" in m for m in history):
logging.error("Invalid history format")
history = []
return "", history + [{"role": "user", "content": message}]
def bot_response(history, speak_enabled, n_ctx, n_batch):
global llm
if not history or history[-1].get("role") != "user":
return history or [], None
try:
# Lazy load model
if llm is None:
llm = initialize_model(int(n_ctx), int(n_batch))
user_message = history[-1]["content"]
bot_message = generate_response(user_message, history[:-1], llm)
history.append({"role": "assistant", "content": bot_message})
if speak_enabled:
try:
speech_file = text_to_speech(bot_message)
return history, speech_file
except Exception:
logging.warning("gTTS failed, trying browser TTS.")
return history, "browser_tts:" + bot_message
return history, None
except Exception as e:
logging.error(f"Bot response error: {e}")
history.append({"role": "assistant", "content": f"Sorry, something went wrong: {str(e)}. Try a shorter message."})
return history, None
def update_mood_tracker(history):
try:
moods = [get_sentiment(m["content"]) for m in history if m["role"] == "user"]
data = [{"mood": m, "count": moods.count(m)} for m in set(moods) if m != "neutral"]
if not data:
return None
fig = go.Figure(data=[
go.Bar(x=[d["mood"] for d in data], y=[d["count"] for d in data])
])
fig.update_layout(title="Mood Trends", xaxis_title="Mood", yaxis_title="Count")
return fig
except Exception as e:
logging.error(f"Mood tracker error: {e}")
return None
def clear_history():
return []
def load_history(history_loader):
try:
history = json.loads(history_loader)
return history, history
except Exception as e:
logging.error(f"History load error: {e}")
return [], []
def reflect(history):
try:
prompt = generate_reflective_prompt(history)
history = history or []
history.append({"role": "assistant", "content": prompt})
return history, None
except Exception as e:
logging.error(f"Reflect error: {e}")
return history or [], None
def welcome(speak_enabled):
welcome_msg = "Hello! I'm Meroni, your calm AI companion. How are you feeling today?"
history = [{"role": "assistant", "content": welcome_msg}]
if speak_enabled:
try:
speech_file = text_to_speech(welcome_msg)
return history, speech_file
except Exception:
history[0]["content"] += " (Sorry, no audio.)"
return history, None
return history, None
def export_history(history):
try:
if not history:
return None
with tempfile.NamedTemporaryFile(delete=False, suffix='.json', mode='w') as f:
json.dump(history, f)
return f.name
except Exception as e:
logging.error(f"Export history error: {e}")
return None
def update_theme(theme):
return None
# Connect components
submit_event = msg.submit(
user_input, [msg, chatbot], [msg, chatbot], queue=False
).then(
bot_response, [chatbot, speak_toggle, n_ctx_slider, n_batch_slider], [chatbot, audio_output], queue=True
).then(
update_mood_tracker, chatbot, mood_plot
)
submit_btn.click(
user_input, [msg, chatbot], [msg, chatbot], queue=False
).then(
bot_response, [chatbot, speak_toggle, n_ctx_slider, n_batch_slider], [chatbot, audio_output], queue=True
).then(
update_mood_tracker, chatbot, mood_plot
)
clear_btn.click(clear_history, None, chatbot).then(
welcome, speak_toggle, [chatbot, audio_output]
).then(
update_mood_tracker, chatbot, mood_plot
)
load_btn.click(
load_history, history_loader, [chatbot, history_loader]
).then(
update_mood_tracker, chatbot, mood_plot
)
reflect_btn.click(
reflect, chatbot, [chatbot, audio_output]
).then(
update_mood_tracker, chatbot, mood_plot
)
export_btn.click(
export_history, chatbot, file_output
)
theme.change(
update_theme, theme, None
)
demo.load(welcome, speak_toggle, [chatbot, audio_output]).then(
update_mood_tracker, chatbot, mood_plot
)
return demo
if __name__ == "__main__":
try:
# Check dependencies
dep_status, dep_message = check_dependencies()
if not dep_status:
print(dep_message)
exit(1)
# Launch interface
demo = create_interface()
demo.launch()
except Exception as e:
logging.error(f"App launch failed: {e}")
print(f"Failed to launch Meroni: {e}. Check meroni.log for details.")