import os
import gradio as gr
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
import tempfile
import gtts
from textblob import TextBlob
import json
import psutil
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from contextlib import contextmanager
import plotly.graph_objects as go
import gc

# Setup logging
logging.basicConfig(filename="meroni.log", level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")

# Configuration
MODEL_REPO = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
MODEL_FILE = "tinyllama-1.1b-chat-v1.0.Q2_K.gguf"  # Q2_K for minimal memory
MODELS_DIR = "models"
MAX_HISTORY = 5
DEFAULT_N_CTX = 512  # Low to avoid crashes
DEFAULT_N_BATCH = 64  # Low for stability

# Ensure models directory exists
os.makedirs(MODELS_DIR, exist_ok=True)

# Dependency check
def check_dependencies():
    required = ["gradio", "huggingface_hub", "llama_cpp", "gtts", "textblob", "psutil", "tenacity", "plotly"]
    missing = []
    for module in required:
        try:
            __import__(module)
        except ImportError:
            missing.append(module)
    if missing:
        return (False, f"Missing dependencies: {', '.join(missing)}. Please include them in requirements.txt.")
    return (True, "All dependencies installed.")

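# A quick usage sketch for the check above; the sample output assumes plotly
# happens to be the only missing package:
#     ok, msg = check_dependencies()
#     if not ok:
#         print(msg)  # "Missing dependencies: plotly. Please include them in requirements.txt."
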
# Check system resources
def suggest_performance_mode():
    try:
        mem = psutil.virtual_memory()
        available_gb = mem.available / (1024 ** 3)
        return available_gb > 4, available_gb
    except Exception as e:
        logging.error(f"Resource check failed: {e}")
        return False, 0

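# Sketch of how this helper could gate the defaults; this wiring is an
# assumption (nothing in the app calls suggest_performance_mode yet):
#     high_perf, available_gb = suggest_performance_mode()
#     n_ctx = 1024 if high_perf else DEFAULT_N_CTX
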
# Download model
def download_model():
    try:
        model_path = os.path.join(MODELS_DIR, MODEL_FILE)
        if not os.path.exists(model_path):
            logging.info(f"Downloading model {MODEL_FILE}...")
            model_path = hf_hub_download(
                repo_id=MODEL_REPO,
                filename=MODEL_FILE,
                local_dir=MODELS_DIR
            )
            logging.info(f"Model downloaded to {model_path}")
        return model_path
    except Exception as e:
        logging.error(f"Model download failed: {e}")
        raise Exception("Failed to download model. Check internet connection.")

# Context manager for Llama: guarantees the model is closed and memory
# reclaimed when the with-block exits
@contextmanager
def llama_context(*args, **kwargs):
    llm = None
    try:
        llm = Llama(*args, **kwargs)
        yield llm
    except MemoryError:
        logging.error("Out of memory during model loading.")
        raise Exception("Not enough memory.")
    except Exception as e:
        logging.error(f"Model loading failed: {e}")
        raise Exception("Failed to load model. Check meroni.log.")
    finally:
        if llm is not None:
            try:
                llm.close()
                del llm
                gc.collect()
            except Exception as e:
                logging.error(f"Model cleanup failed: {e}")

# Initialize model with retry (tenacity re-attempts transient failures)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def initialize_model(n_ctx=DEFAULT_N_CTX, n_batch=DEFAULT_N_BATCH):
    model_path = download_model()
    try:
        # Construct Llama directly: llama_context would close the handle as
        # soon as its with-block returned, handing the caller a dead model.
        llm = Llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=2,
            n_batch=n_batch,
            verbose=False
        )
        return llm
    except MemoryError:
        logging.error("Out of memory during model loading.")
        raise Exception("Not enough memory.")
    except Exception as e:
        logging.warning(f"Model initialization failed: {e}")
        raise

# Fallback response
def fallback_response(message):
    return f"I'm having trouble processing, but I hear you saying: '{message}'. How can I support you further?"

# System prompt
SYSTEM_PROMPT = """You are Meroni, a gentle, emotionally supportive AI companion.
Listen attentively, validate the user's feelings, and respond with warmth, empathy, and understanding.
Provide thoughtful, concise replies (2-4 sentences) that avoid generic phrases or emojis unless requested.
Use the conversation history and summary to make responses personal, and adapt to the user's emotional tone."""

# Sentiment analysis with keywords
def get_sentiment(message):
    try:
        keywords = {
            "sad": ["sad", "down", "hurt"],
            "angry": ["mad", "angry", "frustrated"],
            "anxious": ["worried", "nervous", "scared"],
        }
        message_lower = message.lower()
        for tone, words in keywords.items():
            if any(w in message_lower for w in words):
                return tone
        analysis = TextBlob(message)
        if analysis.sentiment.polarity > 0:
            return "positive"
        elif analysis.sentiment.polarity < 0:
            return "negative"
        return "neutral"
    except Exception as e:
        logging.error(f"Sentiment analysis error: {e}")
        return "neutral"

# Summarize history
def summarize_history(history):
    if not history:
        return ""
    summary = []
    negative_count = 0
    for msg in history[-3:]:
        if msg["role"] == "user":
            sentiment = get_sentiment(msg["content"])
            if sentiment in ["sad", "angry", "anxious"]:
                negative_count += 1
    if negative_count > 1:
        summary.append(f"User has expressed {negative_count} negative feelings recently.")
    return ("Summary: " + " ".join(summary) + "\n") if summary else ""

# Format prompt
def format_prompt(message, history, sentiment):
    prompt = SYSTEM_PROMPT + "\n\n"
    prompt += summarize_history(history)
    if sentiment in ["sad", "angry", "anxious"]:
        prompt += f"The user seems {sentiment}. Provide extra comfort and support.\n"
    elif sentiment == "positive":
        prompt += "The user seems happy. Reflect their positive mood.\n"
    for msg in history[-MAX_HISTORY:]:
        if msg["role"] == "user" and msg["content"]:
            prompt += f"User: {msg['content']}\n"
        if msg["role"] == "assistant" and msg["content"]:
            prompt += f"Meroni: {msg['content']}\n\n"
    prompt += f"User: {message}\nMeroni: "
    return prompt

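# Rough shape of the assembled prompt for a one-turn history and a "sad"
# reading (system prompt elided):
#     <SYSTEM_PROMPT>
#
#     The user seems sad. Provide extra comfort and support.
#     User: hi
#     Meroni: Hello!
#
#     User: I feel low
#     Meroni:
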
# Generate response
def generate_response(message, history, llm):
    if llm is None:
        logging.warning("Model unavailable, using fallback.")
        return fallback_response(message)
    try:
        sentiment = get_sentiment(message)
        prompt = format_prompt(message, history, sentiment)
        response = llm(
            prompt,
            max_tokens=128,
            temperature=0.7,
            top_p=0.9,
            stop=["User:", "\n\n"],
            echo=False
        )
        reply = response['choices'][0]['text'].strip()
        return reply or "I'm here for you. Could you share a bit more?"
    except MemoryError:
        logging.error("Memory error during generation.")
        return "I'm running low on memory. Try a shorter message."
    except Exception as e:
        logging.error(f"Response generation error: {e}")
        return fallback_response(message)

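# For reference, llama-cpp-python's completion call returns a dict shaped like
#     {"choices": [{"text": " ...", "finish_reason": "stop", ...}], ...}
# which is why the reply is read from response['choices'][0]['text'].
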
# Generate reflective prompt
def generate_reflective_prompt(history):
    try:
        if not history:
            return "What's on your mind today? How are you feeling about it?"
        last_user_msg = next((m["content"] for m in reversed(history) if m["role"] == "user"), "")
        sentiment = get_sentiment(last_user_msg)
        if sentiment in ["sad", "angry", "anxious"]:
            return f"Why do you think you're feeling {sentiment}? What might help you feel a bit better?"
        return "What's been going well for you lately? How can we explore that more?"
    except Exception as e:
        logging.error(f"Reflective prompt error: {e}")
        return "Let's reflect together. What's on your mind?"

# Text-to-speech with retry (gTTS calls a network service, so transient
# failures are retried via tenacity)
@retry(stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=5))
def text_to_speech(text):
    try:
        if not text:
            return None
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as fp:
            temp_filename = fp.name
        tts = gtts.gTTS(text)
        tts.save(temp_filename)
        return temp_filename
    except Exception as e:
        logging.error(f"Text-to-speech error: {e}")
        raise Exception("Text-to-speech failed.")

# Gradio interface
def create_interface():
    global llm
    llm = None  # Lazy loading
    with gr.Blocks(css="""
        footer {visibility: hidden}
        :root {--primary-color: #4CAF50;}
        .dark {--primary-color: #333; --background: #222;}
        .blue {--primary-color: #2196F3;}
        .yellow {--primary-color: #FFC107;}
    """) as demo:
gr.Markdown(""" | |
# 🌸 Meroni – Your Calm AI Companion | |
A gentle AI for mental wellness. Share your thoughts, feelings, or simply chat. | |
Meroni remembers your conversations privately on your device! | |
""") | |
# Dependency status | |
dep_status, dep_message = check_dependencies() | |
if not dep_status: | |
gr.Markdown(f"⚠️ {dep_message}") | |
        # Settings
        with gr.Row():
            n_ctx_slider = gr.Slider(minimum=256, maximum=1024, step=256, label="Context Size", value=DEFAULT_N_CTX)
            n_batch_slider = gr.Slider(minimum=32, maximum=128, step=32, label="Batch Size", value=DEFAULT_N_BATCH)
            theme = gr.Dropdown(choices=["light", "dark", "blue", "yellow"], label="Theme", value="light", elem_id="theme")

        # Hidden textbox holding the history JSON, with a DOM id so the
        # local-storage script below can find it (gr.State has no DOM element)
        history_loader = gr.Textbox(value="[]", visible=False, elem_id="history_loader")

        # Mood tracker
        mood_plot = gr.Plot(label="Mood Trends")

        # Chatbot
        chatbot = gr.Chatbot(
            label="Conversation with Meroni",
            elem_id="meroni-chat",
            height=400,
            type="messages"
        )

        # Audio output
        audio_output = gr.Audio(
            label="Meroni's Voice",
            autoplay=True
        )

        # File output for export
        file_output = gr.File(label="Exported History")

        with gr.Row():
            msg = gr.Textbox(
                placeholder="Type your thoughts here...",
                lines=2,
                max_lines=10,
                container=True
            )
            submit_btn = gr.Button("Send", variant="primary")

        with gr.Row():
            clear_btn = gr.Button("New Conversation", variant="secondary")
            load_btn = gr.Button("Load Previous Conversation", variant="secondary")
            reflect_btn = gr.Button("Reflect", variant="secondary")
            export_btn = gr.Button("Download History", variant="secondary")
            speak_toggle = gr.Checkbox(label="Enable Speech", value=True)

        # JavaScript for local storage, encryption, and theme
        gr.HTML("""
        <script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/4.1.1/crypto-js.min.js"></script>
        <script>
        // Encryption key
        let encryptionKey = localStorage.getItem('meroni_key');
        if (!encryptionKey) {
            encryptionKey = prompt('Set a password for your chat history:') || 'default';
            localStorage.setItem('meroni_key', encryptionKey);
        }
        // Local storage
        document.addEventListener('DOMContentLoaded', function() {
            let savedHistory = localStorage.getItem('meroni_history') || "[]";
            try {
                savedHistory = CryptoJS.AES.decrypt(savedHistory, encryptionKey).toString(CryptoJS.enc.Utf8);
            } catch (e) {
                console.error('Decryption failed:', e);
            }
            try {
                document.getElementById('history_loader').value = savedHistory;
            } catch (e) {
                console.error('History loader not found:', e);
            }
        });
        function saveHistory() {
            try {
                const history = document.querySelector('[data-testid="chatbot"]').value;
                const encrypted = CryptoJS.AES.encrypt(JSON.stringify(history), encryptionKey).toString();
                localStorage.setItem('meroni_history', encrypted);
            } catch (e) {
                console.error('Failed to save history:', e);
                alert('Could not save history. Your browser may restrict local storage.');
            }
        }
        const observer = new MutationObserver(saveHistory);
        try {
            observer.observe(document.querySelector('[data-testid="chatbot"]'), { childList: true, subtree: true });
        } catch (e) {
            console.error('Observer setup failed:', e);
        }
        // Theme switching
        document.getElementById('theme').addEventListener('change', function() {
            const theme = this.value;
            document.body.className = theme;
            localStorage.setItem('meroni_theme', theme);
        });
        // Browser TTS fallback
        function browserTTS(text) {
            try {
                const utterance = new SpeechSynthesisUtterance(text);
                utterance.lang = 'en-US';
                window.speechSynthesis.speak(utterance);
            } catch (e) {
                console.error('Browser TTS failed:', e);
            }
        }
        </script>
        """)

        # Onboarding modal
        gr.HTML("""
        <script>
        if (!localStorage.getItem('meroni_onboarded')) {
            alert('Welcome to Meroni!\\n1. Chat with your AI companion.\\n2. Save chats privately on your device.\\n3. Reflect anytime with journal prompts.');
            localStorage.setItem('meroni_onboarded', 'true');
        }
        </script>
        """)

        # Event handlers
        def user_input(message, history):
            if not message.strip():
                return "", history or []
            history = history or []
            if history and not all("role" in m and "content" in m for m in history):
                logging.error("Invalid history format")
                history = []
            return "", history + [{"role": "user", "content": message}]

        def bot_response(history, speak_enabled, n_ctx, n_batch):
            global llm
            if not history or history[-1].get("role") != "user":
                return history or [], None
            try:
                # Lazy load model on first use
                if llm is None:
                    llm = initialize_model(int(n_ctx), int(n_batch))
                user_message = history[-1]["content"]
                bot_message = generate_response(user_message, history[:-1], llm)
                history.append({"role": "assistant", "content": bot_message})
                if speak_enabled:
                    try:
                        speech_file = text_to_speech(bot_message)
                        return history, speech_file
                    except Exception:
                        # gr.Audio expects a file path, so skip audio rather
                        # than pass a sentinel string the player cannot resolve
                        logging.warning("gTTS failed; skipping audio output.")
                        return history, None
                return history, None
            except Exception as e:
                logging.error(f"Bot response error: {e}")
                history.append({"role": "assistant", "content": f"Sorry, something went wrong: {str(e)}. Try a shorter message."})
                return history, None

        def update_mood_tracker(history):
            try:
                moods = [get_sentiment(m["content"]) for m in history if m["role"] == "user"]
                data = [{"mood": m, "count": moods.count(m)} for m in set(moods) if m != "neutral"]
                if not data:
                    return None
                fig = go.Figure(data=[
                    go.Bar(x=[d["mood"] for d in data], y=[d["count"] for d in data])
                ])
                fig.update_layout(title="Mood Trends", xaxis_title="Mood", yaxis_title="Count")
                return fig
            except Exception as e:
                logging.error(f"Mood tracker error: {e}")
                return None

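        # Example: user turns reading ["sad", "sad", "positive"] produce bars
        # sad=2 and positive=1; "neutral" is filtered out, and an all-neutral
        # history returns None, which leaves the plot empty.
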
        def clear_history():
            return []

        def load_history(history_loader):
            try:
                history = json.loads(history_loader)
                if not isinstance(history, list):
                    raise ValueError("History must be a list of messages.")
                return history, history_loader
            except Exception as e:
                logging.error(f"History load error: {e}")
                return [], "[]"

        def reflect(history):
            try:
                prompt = generate_reflective_prompt(history)
                history = history or []
                history.append({"role": "assistant", "content": prompt})
                return history, None
            except Exception as e:
                logging.error(f"Reflect error: {e}")
                return history or [], None

        def welcome(speak_enabled):
            welcome_msg = "Hello! I'm Meroni, your calm AI companion. How are you feeling today?"
            history = [{"role": "assistant", "content": welcome_msg}]
            if speak_enabled:
                try:
                    speech_file = text_to_speech(welcome_msg)
                    return history, speech_file
                except Exception:
                    history[0]["content"] += " (Sorry, no audio.)"
                    return history, None
            return history, None

        def export_history(history):
            try:
                if not history:
                    return None
                with tempfile.NamedTemporaryFile(delete=False, suffix='.json', mode='w') as f:
                    json.dump(history, f)
                    return f.name
            except Exception as e:
                logging.error(f"Export history error: {e}")
                return None

        def update_theme(theme):
            # Theme switching happens client-side in the script above; this
            # server-side callback is intentionally a no-op.
            return None

        # Connect components
        msg.submit(
            user_input, [msg, chatbot], [msg, chatbot], queue=False
        ).then(
            bot_response, [chatbot, speak_toggle, n_ctx_slider, n_batch_slider], [chatbot, audio_output], queue=True
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )
        submit_btn.click(
            user_input, [msg, chatbot], [msg, chatbot], queue=False
        ).then(
            bot_response, [chatbot, speak_toggle, n_ctx_slider, n_batch_slider], [chatbot, audio_output], queue=True
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )
        clear_btn.click(clear_history, None, chatbot).then(
            welcome, speak_toggle, [chatbot, audio_output]
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )
        load_btn.click(
            load_history, history_loader, [chatbot, history_loader]
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )
        reflect_btn.click(
            reflect, chatbot, [chatbot, audio_output]
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )
        export_btn.click(
            export_history, chatbot, file_output
        )
        theme.change(
            update_theme, theme, None
        )
        demo.load(welcome, speak_toggle, [chatbot, audio_output]).then(
            update_mood_tracker, chatbot, mood_plot
        )

    return demo

if __name__ == "__main__":
    try:
        # Check dependencies
        dep_status, dep_message = check_dependencies()
        if not dep_status:
            print(dep_message)
            raise SystemExit(1)
        # Launch interface
        demo = create_interface()
        demo.launch()
    except Exception as e:
        logging.error(f"App launch failed: {e}")
        print(f"Failed to launch Meroni: {e}. Check meroni.log for details.")