import gc
import importlib
import json
import logging
import os
import re
import tempfile
from contextlib import contextmanager

import gradio as gr
import gtts
import plotly.graph_objects as go
import psutil
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
from tenacity import retry, stop_after_attempt, wait_exponential
from textblob import TextBlob

# Setup logging
logging.basicConfig(
    filename="meroni.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Configuration
MODEL_REPO = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
MODEL_FILE = "tinyllama-1.1b-chat-v1.0.Q2_K.gguf"  # Q2_K for minimal memory
MODELS_DIR = "models"
MAX_HISTORY = 5
DEFAULT_N_CTX = 512  # Low to avoid crashes
DEFAULT_N_BATCH = 64  # Low for stability

# Keyword-detected tones that trigger the "extra comfort" prompt hint.
_NEGATIVE_TONES = ("sad", "angry", "anxious")

# Keyword lists per tone; checked in this order before falling back to TextBlob.
_TONE_KEYWORDS = {
    "sad": ["sad", "down", "hurt"],
    "angry": ["mad", "angry", "frustrated"],
    "anxious": ["worried", "nervous", "scared"],
}

# Whole-word patterns, compiled once. Plain substring tests misfire on common
# words ("made" contains "mad", "download" contains "down").
_TONE_PATTERNS = {
    tone: re.compile(r"\b(?:" + "|".join(map(re.escape, words)) + r")\b")
    for tone, words in _TONE_KEYWORDS.items()
}

# Ensure models directory exists
os.makedirs(MODELS_DIR, exist_ok=True)


# Dependency check
def check_dependencies():
    """Try importing every required package.

    Returns:
        A ``(ok, message)`` tuple; ``ok`` is False when at least one package
        raised ImportError, and ``message`` then lists the missing names.
    """
    required = [
        "gradio",
        "huggingface_hub",
        "llama_cpp",
        "gtts",
        "textblob",
        "psutil",
        "tenacity",
        "plotly",
    ]
    missing = []
    for module in required:
        try:
            importlib.import_module(module)
        except ImportError:
            missing.append(module)
    if missing:
        return (
            False,
            f"Missing dependencies: {', '.join(missing)}. "
            "Please include them in requirements.txt.",
        )
    return (True, "All dependencies installed.")


# Check system resources
def suggest_performance_mode():
    """Report whether more than 4 GiB of memory is currently available.

    Returns:
        ``(has_headroom, available_gb)``; ``(False, 0)`` if the psutil query
        fails for any reason.
    """
    try:
        mem = psutil.virtual_memory()
        available_gb = mem.available / (1024 ** 3)
        return available_gb > 4, available_gb
    except Exception as e:
        logging.error("Resource check failed: %s", e)
        return False, 0


# Download model
def download_model():
    """Return the local path of the model file, downloading it if absent.

    Raises:
        Exception: if the download (or the path check) fails; the original
            error is attached as ``__cause__``.
    """
    try:
        model_path = os.path.join(MODELS_DIR, MODEL_FILE)
        if not os.path.exists(model_path):
            logging.info("Downloading model %s...", MODEL_FILE)
            model_path = hf_hub_download(
                repo_id=MODEL_REPO,
                filename=MODEL_FILE,
                local_dir=MODELS_DIR,
            )
            logging.info("Model downloaded to %s", model_path)
        return model_path
    except Exception as e:
        logging.error("Model download failed: %s", e)
        raise Exception(
            "Failed to download model. Check internet connection."
        ) from e


def _load_llama(*args, **kwargs):
    """Construct a ``Llama`` and translate failures into user-facing errors."""
    try:
        return Llama(*args, **kwargs)
    except MemoryError as e:
        logging.error("Out of memory during model loading.")
        raise Exception("Not enough memory.") from e
    except Exception as e:
        logging.error("Model loading failed: %s", e)
        raise Exception("Failed to load model. Check meroni.log.") from e


# Context manager for Llama
@contextmanager
def llama_context(*args, **kwargs):
    """Yield a ``Llama`` built from the given arguments and close it on exit.

    Only construction errors are translated into the "Not enough memory." /
    "Failed to load model." exceptions. Errors raised inside the caller's
    ``with`` body propagate unchanged instead of being mislabelled as load
    failures. The model is closed when the ``with`` block ends, so it must
    not be used (or returned) past that point.
    """
    llm = _load_llama(*args, **kwargs)
    try:
        yield llm
    finally:
        try:
            llm.close()
        except Exception as e:
            logging.error("Model cleanup failed: %s", e)
        del llm
        gc.collect()


# Initialize model with retry
@retry(
    reraise=True,  # surface the real error after the last attempt, not RetryError
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=5),
)
def initialize_model(n_ctx=DEFAULT_N_CTX, n_batch=DEFAULT_N_BATCH):
    """Download (if needed) and load the model, returning an open ``Llama``.

    The instance is intentionally NOT created through ``llama_context``:
    returning from inside that ``with`` block would close the model before
    the caller could use it. The caller owns the returned object.

    Args:
        n_ctx: context window size passed to ``Llama``.
        n_batch: batch size passed to ``Llama``.

    Raises:
        Exception: the download/load error from the final retry attempt.
    """
    model_path = download_model()
    try:
        return _load_llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=2,
            n_batch=n_batch,
            verbose=False,
        )
    except Exception as e:
        logging.warning("Model initialization failed: %s", e)
        raise


# Fallback response
def fallback_response(message):
    """Return a canned reply that echoes ``message`` when the model is unusable."""
    return (
        f"I'm having trouble processing, but I hear you saying: '{message}'. "
        "How can I support you further?"
    )


# System prompt
SYSTEM_PROMPT = (
    "You are Meroni, a gentle, emotionally supportive AI companion. "
    "Listen attentively, validate the user's feelings, and respond with "
    "warmth, empathy, and understanding. "
    "Provide thoughtful, concise replies (2-4 sentences) that avoid generic "
    "phrases or emojis unless requested. "
    "Use the conversation history and summary to make responses personal, "
    "and adapt to the user's emotional tone."
)


# Sentiment analysis with keywords
def get_sentiment(message):
    """Classify ``message`` as a tone label.

    Keyword tones ("sad", "angry", "anxious") are matched first, as whole
    words and case-insensitively; otherwise TextBlob polarity decides between
    "positive", "negative" and "neutral".

    Returns:
        The tone label; "neutral" if analysis raises (e.g. non-string input).
    """
    try:
        message_lower = message.lower()
        for tone, pattern in _TONE_PATTERNS.items():
            if pattern.search(message_lower):
                return tone
        polarity = TextBlob(message).sentiment.polarity
        if polarity > 0:
            return "positive"
        if polarity < 0:
            return "negative"
        return "neutral"
    except Exception as e:
        logging.error("Sentiment analysis error: %s", e)
        return "neutral"


# Summarize history
def summarize_history(history):
    """Summarise recent negative sentiment for inclusion in the prompt.

    Looks at the user messages among the last three history entries and, when
    more than one carries a keyword-detected negative tone, returns a
    one-line "Summary: ..." string ending in a newline; otherwise "".
    """
    if not history:
        return ""
    negative_count = sum(
        1
        for msg in history[-3:]
        if msg["role"] == "user"
        and get_sentiment(msg["content"]) in _NEGATIVE_TONES
    )
    if negative_count > 1:
        return (
            f"Summary: User has expressed {negative_count} "
            "negative feelings recently.\n"
        )
    return ""


# Format prompt
def format_prompt(message, history, sentiment):
    """Assemble the full text prompt sent to the model.

    Args:
        message: the new user message.
        history: prior messages as ``{"role", "content"}`` dicts (may be
            empty or None); only the last ``MAX_HISTORY`` are included.
        sentiment: tone label for ``message`` as returned by ``get_sentiment``.

    Returns:
        System prompt, optional summary and tone hint, the recent transcript,
        and a trailing "Meroni: " cue for the model to complete.
    """
    history = history or []
    parts = [SYSTEM_PROMPT, "\n\n", summarize_history(history)]
    if sentiment in _NEGATIVE_TONES:
        parts.append(
            f"The user seems {sentiment}. Provide extra comfort and support.\n"
        )
    elif sentiment == "positive":
        parts.append("The user seems happy. Reflect their positive mood.\n")
    for msg in history[-MAX_HISTORY:]:
        role, content = msg["role"], msg["content"]
        if not content:
            continue  # skip empty turns rather than emitting bare labels
        if role == "user":
            parts.append(f"User: {content}\n")
        elif role == "assistant":
            parts.append(f"Meroni: {content}\n\n")
    parts.append(f"User: {message}\nMeroni: ")
    return "".join(parts)


# Generate response
def generate_response(message, history, llm):
    """Generate Meroni's reply to ``message``.

    Never raises: returns ``fallback_response(message)`` when ``llm`` is None
    or generation fails, and a fixed low-memory notice on MemoryError.
    """
    if llm is None:
        logging.warning("Model unavailable, using fallback.")
        return fallback_response(message)
    try:
        sentiment = get_sentiment(message)
        prompt = format_prompt(message, history, sentiment)
        response = llm(
            prompt,
            max_tokens=128,
            temperature=0.7,
            top_p=0.9,
            stop=["User:", "\n\n"],  # stop before the model invents the next turn
            echo=False,
        )
        reply = response['choices'][0]['text'].strip()
        return reply or "I'm here for you. Could you share a bit more?"
    except MemoryError:
        logging.error("Memory error during generation.")
        return "I'm running low on memory. Try a shorter message."
    except Exception as e:
        logging.error("Response generation error: %s", e)
        return fallback_response(message)


# Generate reflective prompt
def generate_reflective_prompt(history):
    """Return a reflective question based on the most recent user message.

    Uses a tone-specific question when that message carries a keyword-detected
    negative tone, a generic opener when there is no history, and a fixed
    fallback sentence if anything goes wrong.
    """
    try:
        if not history:
            return "What's on your mind today? How are you feeling about it?"
        last_user_msg = next(
            (m["content"] for m in reversed(history) if m["role"] == "user"),
            "",
        )
        sentiment = get_sentiment(last_user_msg)
        if sentiment in _NEGATIVE_TONES:
            return (
                f"Why do you think you're feeling {sentiment}? "
                "What might help you feel a bit better?"
            )
        return "What’s been going well for you lately? How can we explore that more?"
    except Exception as e:
        logging.error("Reflective prompt error: %s", e)
        return "Let’s reflect together. What’s on your mind?"

# Text-to-speech with retry
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
def text_to_speech(text):
    """Synthesise ``text`` to an MP3 temp file with gTTS and return its path.

    Returns:
        Path of the generated file, or None when ``text`` is empty.

    Raises:
        Exception: "Text-to-speech failed." if synthesis fails (the decorator
            retries first). The partially written temp file is removed so
            failed attempts do not accumulate on disk.
    """
    if not text:
        return None
    temp_filename = None
    try:
        # Reserve a unique path, then close the handle before gTTS writes to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as fp:
            temp_filename = fp.name
        gtts.gTTS(text).save(temp_filename)
        return temp_filename
    except Exception as e:
        logging.error("Text-to-speech error: %s", e)
        if temp_filename is not None:
            try:
                os.remove(temp_filename)
            except OSError as cleanup_error:
                logging.warning("Could not remove temp audio file: %s", cleanup_error)
        raise Exception("Text-to-speech failed.") from e


# Gradio interface
def create_interface():
    """Build the Gradio Blocks app and wire up all event handlers.

    Resets the module-level ``llm`` to None; the model is loaded lazily on the
    first chat message using the slider values at that moment.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    global llm
    llm = None  # Lazy loading

    css = """
    footer {visibility: hidden}
    :root {--primary-color: #4CAF50;}
    .dark {--primary-color: #333; --background: #222;}
    .blue {--primary-color: #2196F3;}
    .yellow {--primary-color: #FFC107;}
    """

    with gr.Blocks(css=css) as demo:
        gr.Markdown(
            """
            # 🌸 Meroni – Your Calm AI Companion
            A gentle AI for mental wellness. Share your thoughts, feelings, or simply chat.
            Meroni remembers your conversations privately on your device!
            """
        )

        # Dependency status
        dep_status, dep_message = check_dependencies()
        if not dep_status:
            gr.Markdown(f"⚠️ {dep_message}")

        # Settings
        with gr.Row():
            n_ctx_slider = gr.Slider(
                minimum=256, maximum=1024, step=256,
                label="Context Size", value=DEFAULT_N_CTX,
            )
            n_batch_slider = gr.Slider(
                minimum=32, maximum=128, step=32,
                label="Batch Size", value=DEFAULT_N_BATCH,
            )
            theme = gr.Dropdown(
                choices=["light", "dark", "blue", "yellow"],
                label="Theme", value="light",
            )

        # Hidden state for history (a JSON-encoded list of messages)
        history_loader = gr.State(value="[]")

        # Mood tracker
        mood_plot = gr.Plot(label="Mood Trends")

        # Chatbot
        chatbot = gr.Chatbot(
            label="Conversation with Meroni",
            elem_id="meroni-chat",
            height=400,
            type="messages",
        )

        # Audio output
        audio_output = gr.Audio(
            label="Meroni's Voice",
            autoplay=True,
        )

        # File output for export
        file_output = gr.File(label="Exported History")

        with gr.Row():
            msg = gr.Textbox(
                placeholder="Type your thoughts here...",
                lines=2,
                max_lines=10,
                container=True,
            )
            submit_btn = gr.Button("Send", variant="primary")

        with gr.Row():
            clear_btn = gr.Button("New Conversation", variant="secondary")
            load_btn = gr.Button("Load Previous Conversation", variant="secondary")
            reflect_btn = gr.Button("Reflect", variant="secondary")
            export_btn = gr.Button("Download History", variant="secondary")
            speak_toggle = gr.Checkbox(label="Enable Speech", value=True)

        # JavaScript for local storage, encryption, and theme
        # NOTE(review): the markup is empty in this file — confirm whether the
        # script was stripped or is still to be written.
        gr.HTML(""" """)

        # Onboarding modal (markup likewise empty here)
        gr.HTML(""" """)

        # Event handlers
        def user_input(message, history):
            """Append the user's message to the chat and clear the textbox."""
            history = history or []
            if not message or not message.strip():
                return "", history
            if history and not all("role" in m and "content" in m for m in history):
                logging.error("Invalid history format")
                history = []
            return "", history + [{"role": "user", "content": message}]

        def bot_response(history, speak_enabled, n_ctx, n_batch):
            """Answer the latest user message; returns (history, audio path or None)."""
            global llm
            if not history or history[-1].get("role") != "user":
                return history or [], None
            try:
                # Lazy load model
                if llm is None:
                    llm = initialize_model(int(n_ctx), int(n_batch))
                user_message = history[-1]["content"]
                bot_message = generate_response(user_message, history[:-1], llm)
                history.append({"role": "assistant", "content": bot_message})
            except Exception as e:
                logging.error("Bot response error: %s", e)
                history.append({
                    "role": "assistant",
                    "content": f"Sorry, something went wrong: {str(e)}. Try a shorter message.",
                })
                return history, None

            if not speak_enabled:
                return history, None
            try:
                return history, text_to_speech(bot_message)
            except Exception:
                # The audio output only accepts real audio data, so a marker
                # string such as "browser_tts:..." cannot be sent to it; show
                # the text reply without sound instead.
                logging.warning("gTTS failed; continuing without audio.")
                return history, None

        def update_mood_tracker(history):
            """Return a bar chart of non-neutral user moods, or None if empty."""
            from collections import Counter

            try:
                counts = Counter(
                    get_sentiment(m["content"])
                    for m in (history or [])
                    if m["role"] == "user"
                )
                counts.pop("neutral", None)
                if not counts:
                    return None
                moods = list(counts)  # first-seen order keeps the bars stable
                fig = go.Figure(data=[
                    go.Bar(x=moods, y=[counts[m] for m in moods])
                ])
                fig.update_layout(
                    title="Mood Trends", xaxis_title="Mood", yaxis_title="Count"
                )
                return fig
            except Exception as e:
                logging.error("Mood tracker error: %s", e)
                return None

        def clear_history():
            """Reset the chat to an empty conversation."""
            return []

        def load_history(history_loader):
            """Decode the stored JSON history into chat messages.

            The state keeps holding a JSON string (never the decoded list), so
            pressing "Load" repeatedly keeps working.
            """
            try:
                history = json.loads(history_loader)
                if not isinstance(history, list) or not all(
                    isinstance(m, dict) and "role" in m and "content" in m
                    for m in history
                ):
                    raise ValueError("history is not a list of role/content messages")
                return history, history_loader
            except Exception as e:
                logging.error("History load error: %s", e)
                return [], "[]"

        def reflect(history):
            """Append a reflective question from Meroni to the chat."""
            history = list(history or [])
            try:
                prompt = generate_reflective_prompt(history)
                history.append({"role": "assistant", "content": prompt})
            except Exception as e:
                logging.error("Reflect error: %s", e)
            return history, None

        def welcome(speak_enabled):
            """Start a conversation with the greeting, spoken when enabled."""
            welcome_msg = "Hello! I'm Meroni, your calm AI companion. How are you feeling today?"
            history = [{"role": "assistant", "content": welcome_msg}]
            if not speak_enabled:
                return history, None
            try:
                return history, text_to_speech(welcome_msg)
            except Exception:
                history[0]["content"] += " (Sorry, no audio.)"
                return history, None

        def export_history(history):
            """Write the chat to a temporary JSON file and return its path."""
            try:
                if not history:
                    return None
                with tempfile.NamedTemporaryFile(
                    delete=False, suffix='.json', mode='w', encoding='utf-8'
                ) as f:
                    json.dump(history, f)
                    return f.name
            except Exception as e:
                logging.error("Export history error: %s", e)
                return None

        def update_theme(theme):
            """Server-side no-op for the theme dropdown."""
            return None

        def _wire_send(trigger):
            """Attach the send -> reply -> mood-chart chain to an event trigger."""
            trigger(
                user_input, [msg, chatbot], [msg, chatbot], queue=False
            ).then(
                bot_response,
                [chatbot, speak_toggle, n_ctx_slider, n_batch_slider],
                [chatbot, audio_output],
                queue=True,
            ).then(
                update_mood_tracker, chatbot, mood_plot
            )

        # Connect components
        _wire_send(msg.submit)
        _wire_send(submit_btn.click)

        clear_btn.click(clear_history, None, chatbot).then(
            welcome, speak_toggle, [chatbot, audio_output]
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )

        load_btn.click(
            load_history, history_loader, [chatbot, history_loader]
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )

        reflect_btn.click(
            reflect, chatbot, [chatbot, audio_output]
        ).then(
            update_mood_tracker, chatbot, mood_plot
        )

        export_btn.click(export_history, chatbot, file_output)

        theme.change(update_theme, theme, None)

        demo.load(welcome, speak_toggle, [chatbot, audio_output]).then(
            update_mood_tracker, chatbot, mood_plot
        )

    return demo


if __name__ == "__main__":
    try:
        # Check dependencies
        dep_status, dep_message = check_dependencies()
        if not dep_status:
            print(dep_message)
            # SystemExit is not an Exception subclass, so the handler below
            # does not swallow it; unlike exit(), it needs no site module.
            raise SystemExit(1)

        # Launch interface
        demo = create_interface()
        demo.launch()
    except Exception as e:
        logging.error("App launch failed: %s", e)
        print(f"Failed to launch Meroni: {e}. Check meroni.log for details.")