import os
import json  # For debug printing
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

try:
    # Try importing llama-cpp-python for GGUF support
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    print("⚠️ WARNING: llama-cpp-python library not found. Local GGUF execution will not be available.")
    print("   To enable local GGUF, run: pip install llama-cpp-python")
    Llama = None  # Define as None if import fails
    LLAMA_CPP_AVAILABLE = False

# --- Configuration ---
# HF Repo ID for the standard model (used in Space and for tokenizer)
HF_CHECKPOINT = "ibm-granite/granite-3.3-2b-instruct"

# GGUF Settings for Local Execution (Using llama-cpp-python)
GGUF_REPO_ID = "ibm-granite/granite-3.3-2b-instruct-gguf"  # Official IBM v3.3 GGUF repo
GGUF_FILENAME = "granite-3.3-2b-instruct-Q2_K.gguf"  # Smallest Q2_K quantization
# GGUF_FILENAME = "granite-3.3-2b-instruct-Q4_K_M.gguf"  # Fallback if Q2_K fails

# Template Filename (Use v3.3 template for both paths now)
TEMPLATE_FILENAME = "granite3.3_2b_chat_template.jinja"
# --- End Configuration ---

# Detect Space environment
env = os.environ
is_space = env.get("SPACE_ID") is not None
print(f"RUNNING IN SPACE? {is_space}")

# Device setup (primarily for HF model in Space)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")


# --- Load model function ---
def load_model():
    primary_checkpoint = HF_CHECKPOINT
    model_name_display = primary_checkpoint

    # --- Function to load and apply template ---
    def apply_template_from_file(tokenizer, template_filename):
        applied_template = False
        try:
            print(f"Attempting to load chat template from: {template_filename}")
            script_dir = os.path.dirname(os.path.abspath(__file__))
            template_path = os.path.join(script_dir, template_filename)
            if not os.path.exists(template_path):
                print(f"⚠️ WARNING: Template file not found at: {template_path}")
                return False
            with open(template_path, "r", encoding="utf-8") as f:
                custom_chat_template_content = f.read()
            if hasattr(tokenizer, 'chat_template'):
                tokenizer.chat_template = custom_chat_template_content
                applied_template = True
                print(f"✅ Loaded and applied chat template from: {template_filename}")
            else:
                print("⚠️ WARNING: Tokenizer object does not support setting 'chat_template'.")
        except Exception as e:
            print(f"❌ ERROR reading or applying template file '{template_filename}': {e}")

        if not applied_template:
            print("Falling back to tokenizer's default built-in template (if any).")

        print("--- Final Chat Template Being Used (by HF Tokenizer) ---")
        print(tokenizer.chat_template if hasattr(tokenizer, 'chat_template') and tokenizer.chat_template else "No template found or template empty/default.")
        print("-------------------------------------------------------")
        return applied_template
    # --- End function ---

    # --- Load Tokenizer (Common for both paths now) ---
    try:
        print(f"Loading HF Tokenizer: {primary_checkpoint}")
        tokenizer = AutoTokenizer.from_pretrained(primary_checkpoint, use_fast=True)
        print("✅ Loaded HF Tokenizer.")
        # Apply the v3.3 template UNCONDITIONALLY
        apply_template_from_file(tokenizer, TEMPLATE_FILENAME)
    except Exception as e:
        print(f"❌ Failed to load tokenizer {primary_checkpoint}: {e}")
        raise RuntimeError("Failed to load the necessary tokenizer.") from e
    # --- End Tokenizer Loading ---

    if is_space:
        print(f"🚀 Running in Space. Loading HF model: {primary_checkpoint}")
        try:
            # Load HF Model for Space
            model = AutoModelForCausalLM.from_pretrained(
                primary_checkpoint,
                torch_dtype=torch.float16,
                low_cpu_mem_usage=True,
                device_map="auto"
            )
            print(f"✅ Loaded HF {primary_checkpoint}")
            model_name_display = primary_checkpoint
            # Tokenizer already loaded and template applied
            return tokenizer, model, model_name_display
        except Exception as e:
            print(f"❌ HF Primary load failed: {e}")
            raise RuntimeError(f"Failed to load primary HF model {primary_checkpoint} in Space.") from e
    else:
        # Running Locally - Load GGUF using llama-cpp-python
        print("💻 Running Locally. Attempting GGUF setup via llama-cpp-python.")
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("llama-cpp-python library is required but not installed/found.")

        print(f"   GGUF Repo ID: {GGUF_REPO_ID}")
        print(f"   GGUF Filename: {GGUF_FILENAME}")

        try:
            # Load GGUF Model using llama-cpp-python
            print("Attempting to load GGUF model using Llama.from_pretrained...")
            model = Llama.from_pretrained(
                repo_id=GGUF_REPO_ID,
                filename=GGUF_FILENAME,
                n_gpu_layers=0,  # Force CPU execution
                verbose=True,
                n_ctx=4096  # Increased context window
            )
            print(f"✅ Loaded GGUF model {GGUF_FILENAME} using llama-cpp-python")
            model_name_display = f"GGUF (llama-cpp): {GGUF_FILENAME}"
            # Return tokenizer loaded earlier and the Llama model object
            return tokenizer, model, model_name_display
        except Exception as e:
            print(f"❌ Local GGUF load failed using llama-cpp-python: {e}")
            if "Not Found" in str(e) or "404" in str(e):
                print(f"   File not found. Please ensure Repo ID '{GGUF_REPO_ID}' and Filename '{GGUF_FILENAME}' are correct and the file exists on Hugging Face Hub.")
            elif "invalid GGUF file" in str(e) or "failed to load model" in str(e):
                print(f"   Model loading failed. The GGUF file '{GGUF_FILENAME}' might be corrupted, incompatible with this version of llama-cpp-python, or the quantization level is unsupported.")
                print("   Consider trying a different quantization like 'Q4_K_M'.")
            # Add other potential error checks based on llama-cpp-python exceptions
            raise RuntimeError(f"Failed to load local GGUF model '{GGUF_FILENAME}' using llama-cpp-python.") from e


# --- Call load_model ---
try:
    # Tokenizer should now be loaded for both paths
    tokenizer, model, model_name = load_model()
    if tokenizer is None:  # Should not happen now
        raise RuntimeError("Tokenizer failed to load.")
except Exception as load_err:
    print(f"🚨 CRITICAL ERROR DURING MODEL LOADING: {load_err}")
    # For UI testing, you might want to create dummy objects instead of raising
    # tokenizer = None
    # model = None
    # model_name = "LOAD FAILED"
    raise  # Re-raise for now


# --- Load hotel docs function ---
def load_hotel_docs(hotel_id):
    knowledge_dir = "knowledge"
    path = os.path.join(knowledge_dir, f"{hotel_id}.txt")
    if not os.path.exists(path):
        print(f"⚠️ Knowledge file not found: {path}")
        return []
    try:
        with open(path, encoding="utf-8") as f:
            content = f.read().strip()
        print(f"DEBUG [load_hotel_docs]: Read {len(content)} chars from {path}.")
        if not content:
            print(f"⚠️ WARNING [load_hotel_docs]: File {path} is empty.")
            return []
        return [(hotel_id, content)]  # Return list with tuple: [(id, content)]
    except Exception as e:
        print(f"❌ Error reading knowledge file {path}: {e}")
        return []


# --- Dynamic Hotel ID Detection ---
available_hotels = []
knowledge_dir = "knowledge"
if os.path.exists(knowledge_dir):
    print("🔍 Scanning for available hotels...")
    files = os.listdir(knowledge_dir)
    potential_ids = set()
    for f in files:
        if f.endswith(".txt") and not f.endswith("-system.txt"):
            potential_ids.add(f[:-4])  # Add ID without .txt

    for hotel_id in sorted(list(potential_ids)):
        doc_file = os.path.join(knowledge_dir, f"{hotel_id}.txt")
        sys_file = os.path.join(knowledge_dir, f"{hotel_id}-system.txt")
        if os.path.exists(doc_file) and os.path.exists(sys_file):
            available_hotels.append(hotel_id)
            print(f"   ✅ Found valid hotel pair: {hotel_id}")
        else:
            print(f"   ⚠️ Skipping '{hotel_id}': Missing either '{hotel_id}.txt' or '{hotel_id}-system.txt'")
    print("Hotel scan complete.")
else:
    print(f"⚠️ Knowledge directory '{knowledge_dir}' not found. No hotels loaded.")
# --- End Hotel Scanning ---


# --- Chat function ---
def chat(message, history, hotel_id):
    if history is None:
        history = []

    # Convert Gradio history to HF message format
    history_hf_format = []
    for user_msg, assistant_msg in history:
        if user_msg:
            history_hf_format.append({"role": "user", "content": user_msg})
        if assistant_msg:
            history_hf_format.append({"role": "assistant", "content": assistant_msg})

    current_turn = {"role": "user", "content": message}

    ui_history = history + [[message, None]]
    yield ui_history, ""

    response = "Sorry, an error occurred."
    input_text = ""  # Initialize input_text

    try:
        # --- System Prompt Loading ---
        default_system_prompt = "You are a helpful hotel assistant."
        system_prompt_filename = f"{hotel_id}-system.txt"
        system_prompt_path = os.path.join("knowledge", system_prompt_filename)
        system_prompt_content = default_system_prompt
        if os.path.exists(system_prompt_path):
            try:
                with open(system_prompt_path, "r", encoding="utf-8") as f:
                    loaded_prompt = f.read().strip()
                if loaded_prompt:
                    system_prompt_content = loaded_prompt
                else:
                    print(f"⚠️ System prompt file '{system_prompt_path}' is empty. Using default.")
            except Exception as e:
                print(f"❌ Error reading system prompt file '{system_prompt_path}': {e}. Using default.")
        else:
            print(f"⚠️ System prompt file not found: '{system_prompt_path}'. Using default.")

        # --- Document Loading ---
        hotel_docs_list = load_hotel_docs(hotel_id)

        # --- Message List Construction (Base: System, History, User) ---
        messages = [{"role": "system", "content": system_prompt_content}]
        messages.extend(history_hf_format)
        messages.append(current_turn)
        print(f"DEBUG [chat]: Base messages list:\n{json.dumps(messages, indent=2)}")

        # --- Prepare documents kwarg (Used by apply_chat_template in BOTH paths) ---
        documents_for_kwarg = []
        if hotel_docs_list:
            # Use 'doc_id' and 'text' keys for v3.3 template
            documents_for_kwarg = [{"doc_id": doc_id, "text": doc_content} for doc_id, doc_content in hotel_docs_list]
        print(f"DEBUG [chat]: Preparing documents kwarg: {len(documents_for_kwarg)} docs")

        # --- Template Application (Now UNCONDITIONAL - uses tokenizer) ---
        input_text = tokenizer.apply_chat_template(
            messages,
            documents=documents_for_kwarg,  # Use kwarg for v3.3 template
            tokenize=False,
            add_generation_prompt=True
        )

        # --- DEBUG: print the final formatted prompt string ---
        print("\n" + "=" * 40 + " FINAL PROMPT STRING " + "=" * 40)
        print(input_text)
        print("=" * 99 + "\n")
        # --- END DEBUG PRINT ---

    except Exception as e:
        print(f"❌ Error during prompt preparation: {e}")
        ui_history[-1][1] = "Sorry, an error occurred while preparing the prompt."
        yield ui_history, ""
        return

    # --- Generation ---
    try:
        if is_space:
            # --- HF Space Generation (model.generate) ---
            print("🚀 Generating response using HF model...")
            inputs = tokenizer(input_text, return_tensors="pt").to(device)
            input_length = inputs.input_ids.shape[1]
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    attention_mask=inputs.attention_mask,
                    max_new_tokens=1024,
                    do_sample=False,
                    eos_token_id=tokenizer.eos_token_id
                )
            new_token_ids = outputs[0][input_length:]
            response = tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()
            print("✅ HF Generation complete.")
        else:
            # Local GGUF Generation using llama-cpp-python's lower-level call
            print("💻 Generating response using GGUF model (llama-cpp-python)...")
            # --- Use model(prompt_string, ...) ---
            output = model(  # Call the Llama object directly with the formatted string
                input_text,
                max_tokens=512,  # Max tokens to generate
                stop=["<|end_of_text|>"],  # Use model's stop token(s)
                temperature=0.1,
                # echo=False  # Usually default, don't echo the prompt
            )
            # Extract response content
            if output and 'choices' in output and output['choices'] and 'text' in output['choices'][0]:
                response = output['choices'][0]['text'].strip()
            else:
                print(f"⚠️ Unexpected output format from model call: {output}")
                response = "Sorry, received an unexpected response structure."
            # --- End model(prompt_string, ...) ---
            print("✅ GGUF Generation complete (llama-cpp-python).")

        if not response:
            response = "Sorry, I encountered an issue generating a response (empty)."

    except Exception as e:
        print(f"❌ Error during model generation or processing: {e}")
        response = f"Sorry, an error occurred: {e}"

    print(f"DEBUG: Final response variable before UI append = {repr(response)}")
    ui_history[-1][1] = response
    yield ui_history, ""


# --- Gradio UI ---
with gr.Blocks() as demo:
    with gr.Column(variant="panel"):
        gr.Markdown("### 🏨 Multi‑Hotel Chatbot Demo")
        gr.Markdown(f"**Running:** {model_name}")  # Displays HF name or GGUF info

        hotel_selector = gr.Dropdown(
            choices=available_hotels,
            label="Hotel",
            value=available_hotels[0] if available_hotels else None,
            interactive=bool(available_hotels)
        )

        with gr.Row():
            chatbot = gr.Chatbot(label="Chat History", height=500)

        msg = gr.Textbox(
            show_label=False,
            placeholder="Ask about the hotel..."
        )
        clear_btn = gr.Button("Clear")
        clear_btn.click(lambda: ([], ""), None, [chatbot, msg])

        msg.submit(
            fn=chat,
            inputs=[msg, chatbot, hotel_selector],
            outputs=[chatbot, msg]
        )

        if is_space:
            gr.Markdown("⚠️ Pause the Space when done to avoid charges.")

# Enable streaming queue
demo.queue(default_concurrency_limit=2, max_size=32)

if __name__ == "__main__":
    print("Launching Gradio Interface...")
    demo.launch()
    print("Gradio Interface closed.")