Spaces:
Running
Running
# # # import os | |
# # # import time | |
# # # import pandas as pd | |
# # # import gradio as gr | |
# # # from langchain_groq import ChatGroq | |
# # # from langchain_huggingface import HuggingFaceEmbeddings | |
# # # from langchain_community.vectorstores import Chroma | |
# # # from langchain_core.prompts import PromptTemplate | |
# # # from langchain_core.output_parsers import StrOutputParser | |
# # # from langchain_core.runnables import RunnablePassthrough | |
# # # from PyPDF2 import PdfReader | |
# # # # Configuration constants | |
# # # COLLECTION_NAME = "GBVRS" | |
# # # DATA_FOLDER = "./" | |
# # # APP_VERSION = "v1.0.0" | |
# # # APP_NAME = "Ijwi ry'Ubufasha" | |
# # # MAX_HISTORY_MESSAGES = 8 # Limit history to avoid token limits | |
# # # # Global variables for application state | |
# # # llm = None | |
# # # embed_model = None | |
# # # vectorstore = None | |
# # # retriever = None | |
# # # rag_chain = None | |
# # # # User session management | |
# # # class UserSession: | |
# # # def __init__(self, session_id, llm): | |
# # # """Initialize a user session with unique ID and language model.""" | |
# # # self.session_id = session_id | |
# # # self.user_info = {"Nickname": "Guest"} | |
# # # self.conversation_history = [] | |
# # # self.llm = llm | |
# # # self.welcome_message = None | |
# # # self.last_activity = time.time() | |
# # # def set_user(self, user_info): | |
# # # """Set user information and generate welcome message.""" | |
# # # self.user_info = user_info | |
# # # self.generate_welcome_message() | |
# # # # Initialize conversation history with welcome message | |
# # # welcome = self.get_welcome_message() | |
# # # self.conversation_history = [ | |
# # # {"role": "assistant", "content": welcome}, | |
# # # ] | |
# # # def get_user(self): | |
# # # """Get current user information.""" | |
# # # return self.user_info | |
# # # def generate_welcome_message(self): | |
# # # """Generate a dynamic welcome message using the LLM.""" | |
# # # try: | |
# # # nickname = self.user_info.get("Nickname", "Guest") | |
# # # # Use the LLM to generate the message | |
# # # prompt = ( | |
# # # f"Create a brief and warm welcome message for {nickname} that's about 1-2 sentences. " | |
# # # f"Emphasize this is a safe space for discussing gender-based violence issues " | |
# # # f"and that we provide support and resources. Keep it warm and reassuring." | |
# # # ) | |
# # # response = self.llm.invoke(prompt) | |
# # # welcome = response.content.strip() | |
# # # # Format the message with HTML styling | |
# # # self.welcome_message = ( | |
# # # f"<div style='font-size: 18px; color: #4E6BBF;'>" | |
# # # f"{welcome}" | |
# # # f"</div>" | |
# # # ) | |
# # # except Exception as e: | |
# # # # Fallback welcome message | |
# # # nickname = self.user_info.get("Nickname", "Guest") | |
# # # self.welcome_message = ( | |
# # # f"<div style='font-size: 18px; color: #4E6BBF;'>" | |
# # # f"Welcome, {nickname}! You're in a safe space. We're here to provide support with " | |
# # # f"gender-based violence issues and connect you with resources that can help." | |
# # # f"</div>" | |
# # # ) | |
# # # def get_welcome_message(self): | |
# # # """Get the formatted welcome message.""" | |
# # # if not self.welcome_message: | |
# # # self.generate_welcome_message() | |
# # # return self.welcome_message | |
# # # def add_to_history(self, role, message): | |
# # # """Add a message to the conversation history.""" | |
# # # self.conversation_history.append({"role": role, "content": message}) | |
# # # self.last_activity = time.time() | |
# # # # Trim history if it gets too long | |
# # # if len(self.conversation_history) > MAX_HISTORY_MESSAGES * 2: # Keep pairs of messages | |
# # # # Keep the first message (welcome) and the most recent messages | |
# # # self.conversation_history = [self.conversation_history[0]] + self.conversation_history[-MAX_HISTORY_MESSAGES*2+1:] | |
# # # def get_conversation_history(self): | |
# # # """Get the full conversation history.""" | |
# # # return self.conversation_history | |
# # # def get_formatted_history(self): | |
# # # """Get conversation history formatted as a string for the LLM.""" | |
# # # # Skip the welcome message and only include the last few exchanges | |
# # # recent_history = self.conversation_history[1:] if len(self.conversation_history) > 1 else [] | |
# # # # Limit to last MAX_HISTORY_MESSAGES exchanges | |
# # # if len(recent_history) > MAX_HISTORY_MESSAGES * 2: | |
# # # recent_history = recent_history[-MAX_HISTORY_MESSAGES*2:] | |
# # # formatted_history = "" | |
# # # for entry in recent_history: | |
# # # role = "User" if entry["role"] == "user" else "Assistant" | |
# # # # Truncate very long messages to avoid token limits | |
# # # content = entry["content"] | |
# # # if len(content) > 500: # Limit message length | |
# # # content = content[:500] + "..." | |
# # # formatted_history += f"{role}: {content}\n\n" | |
# # # return formatted_history | |
# # # def is_expired(self, timeout_seconds=3600): | |
# # # """Check if the session has been inactive for too long.""" | |
# # # return (time.time() - self.last_activity) > timeout_seconds | |
# # # # Session manager to handle multiple users | |
# # # class SessionManager: | |
# # # def __init__(self): | |
# # # """Initialize the session manager.""" | |
# # # self.sessions = {} | |
# # # self.session_timeout = 3600 # 1 hour timeout | |
# # # def get_session(self, session_id): | |
# # # """Get an existing session or create a new one.""" | |
# # # # Clean expired sessions first | |
# # # self._clean_expired_sessions() | |
# # # # Create new session if needed | |
# # # if session_id not in self.sessions: | |
# # # self.sessions[session_id] = UserSession(session_id, llm) | |
# # # return self.sessions[session_id] | |
# # # def _clean_expired_sessions(self): | |
# # # """Remove expired sessions to free up memory.""" | |
# # # expired_keys = [] | |
# # # for key, session in self.sessions.items(): | |
# # # if session.is_expired(self.session_timeout): | |
# # # expired_keys.append(key) | |
# # # for key in expired_keys: | |
# # # del self.sessions[key] | |
# # # # Initialize the session manager | |
# # # session_manager = SessionManager() | |
# # # def initialize_assistant(): | |
# # # """Initialize the assistant with necessary components and configurations.""" | |
# # # global llm, embed_model, vectorstore, retriever, rag_chain | |
# # # # Initialize API key - try both possible key names | |
# # # groq_api_key = os.environ.get('GBV') or os.environ.get('GBV') | |
# # # if not groq_api_key: | |
# # # print("WARNING: No GROQ API key found in userdata.") | |
# # # # Initialize LLM - Default to Llama model which is more widely available | |
# # # llm = ChatGroq( | |
# # # model="llama-3.3-70b-versatile", # More reliable than whisper model | |
# # # api_key=groq_api_key | |
# # # ) | |
# # # # Set up embedding model | |
# # # try: | |
# # # embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1") | |
# # # except Exception as e: | |
# # # # Fallback to smaller model | |
# # # embed_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
# # # # Process data and create vector store | |
# # # print("Processing data files...") | |
# # # data = process_data_files() | |
# # # print("Creating vector store...") | |
# # # vectorstore = create_vectorstore(data) | |
# # # retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) | |
# # # # Create RAG chain | |
# # # print("Setting up RAG chain...") | |
# # # rag_chain = create_rag_chain() | |
# # # print(f"✅ {APP_NAME} initialized successfully") | |
# # # def process_data_files(): | |
# # # """Process all data files from the specified folder.""" | |
# # # context_data = [] | |
# # # try: | |
# # # if not os.path.exists(DATA_FOLDER): | |
# # # print(f"WARNING: Data folder does not exist: {DATA_FOLDER}") | |
# # # return context_data | |
# # # # Get list of data files | |
# # # all_files = os.listdir(DATA_FOLDER) | |
# # # data_files = [f for f in all_files if f.lower().endswith(('.csv', '.xlsx', '.xls'))] | |
# # # if not data_files: | |
# # # print(f"WARNING: No data files found in: {DATA_FOLDER}") | |
# # # return context_data | |
# # # # Process each file | |
# # # for index, file_name in enumerate(data_files, 1): | |
# # # print(f"Processing file {index}/{len(data_files)}: {file_name}") | |
# # # file_path = os.path.join(DATA_FOLDER, file_name) | |
# # # try: | |
# # # # Read file based on extension | |
# # # if file_name.lower().endswith('.csv'): | |
# # # df = pd.read_csv(file_path) | |
# # # else: | |
# # # df = pd.read_excel(file_path) | |
# # # # Check if column 3 exists (source data is in third column) | |
# # # if df.shape[1] > 2: | |
# # # column_data = df.iloc[:, 2].dropna().astype(str).tolist() | |
# # # # Each row becomes one chunk with metadata | |
# # # for i, text in enumerate(column_data): | |
# # # if text and len(text.strip()) > 0: | |
# # # context_data.append({ | |
# # # "page_content": text, | |
# # # "metadata": { | |
# # # "source": file_name, | |
# # # "row": i+1 | |
# # # } | |
# # # }) | |
# # # else: | |
# # # print(f"WARNING: File {file_name} has fewer than 3 columns.") | |
# # # except Exception as e: | |
# # # print(f"ERROR processing file {file_name}: {e}") | |
# # # print(f"✅ Created {len(context_data)} chunks from {len(data_files)} files.") | |
# # # except Exception as e: | |
# # # print(f"ERROR accessing data folder: {e}") | |
# # # return context_data | |
# # # def create_vectorstore(data): | |
# # # """ | |
# # # Creates and returns a Chroma vector store populated with the provided data. | |
# # # Parameters: | |
# # # data (list): A list of dictionaries, each containing 'page_content' and 'metadata'. | |
# # # Returns: | |
# # # Chroma: The populated Chroma vector store instance. | |
# # # """ | |
# # # # Initialize the vector store | |
# # # vectorstore = Chroma( | |
# # # collection_name=COLLECTION_NAME, | |
# # # embedding_function=embed_model, | |
# # # persist_directory="./" | |
# # # ) | |
# # # if not data: | |
# # # print("⚠️ No data provided. Returning an empty vector store.") | |
# # # return vectorstore | |
# # # try: | |
# # # # Extract text and metadata from the data | |
# # # texts = [doc["page_content"] for doc in data] | |
# # # # Add the texts and metadata to the vector store | |
# # # vectorstore.add_texts(texts) | |
# # # except Exception as e: | |
# # # print(f"❌ Failed to add documents to vector store: {e}") | |
# # # # Fix: Return vectorstore instead of vs | |
# # # return vectorstore # Changed from 'return vs' to 'return vectorstore' | |
# # # def create_rag_chain(): | |
# # # """Create the RAG chain for processing user queries.""" | |
# # # # Define the prompt template | |
# # # template = """ | |
# # # You are a compassionate and supportive AI assistant specializing in helping individuals affected by Gender-Based Violence (GBV). Your responses must be based EXCLUSIVELY on the information provided in the context. Your primary goal is to provide emotionally intelligent support while maintaining appropriate boundaries. | |
# # # **Previous conversation:** {conversation_history} | |
# # # **Context information:** {context} | |
# # # **User's Question:** {question} | |
# # # When responding follow these guidelines: | |
# # # 1. **Strict Context Adherence** | |
# # # - Only use information that appears in the provided {context} | |
# # # - If the answer is not found in the context, state "I don't have that information in my available resources" rather than generating a response | |
# # # 2. **Personalized Communication** | |
# # # - Avoid contractions (e.g., use I am instead of I'm) | |
# # # - Incorporate thoughtful pauses or reflective questions when the conversation involves difficult topics | |
# # # - Use selective emojis (😊, 🤗, ❤️) only when tone-appropriate and not during crisis discussions | |
# # # - Balance warmth with professionalism | |
# # # 3. **Emotional Intelligence** | |
# # # - Validate feelings without judgment | |
# # # - Offer reassurance when appropriate, always centered on empowerment | |
# # # - Adjust your tone based on the emotional state conveyed | |
# # # 4. **Conversation Management** | |
# # # - Refer to {conversation_history} to maintain continuity and avoid repetition | |
# # # - Use clear paragraph breaks for readability | |
# # # 5. **Information Delivery** | |
# # # - Extract only relevant information from {context} that directly addresses the question | |
# # # - Present information in accessible, non-technical language | |
# # # - When information is unavailable, respond with: "I don't have that specific information right now, {first_name}. Would it be helpful if I focus on [alternative support option]?" | |
# # # 6. **Safety and Ethics** | |
# # # - Do not generate any speculative content or advice not supported by the context | |
# # # - If the context contains safety information, prioritize sharing that information | |
# # # Your response must come entirely from the provided context, maintaining the supportive tone while never introducing information from outside the provided materials. | |
# # # **Context:** {context} | |
# # # **User's Question:** {question} | |
# # # **Your Response:** | |
# # # """ | |
# # # rag_prompt = PromptTemplate.from_template(template) | |
# # # def get_context_and_question(query_with_session): | |
# # # # Extract query and session_id | |
# # # query = query_with_session["query"] | |
# # # session_id = query_with_session["session_id"] | |
# # # # Get the user session | |
# # # session = session_manager.get_session(session_id) | |
# # # user_info = session.get_user() | |
# # # first_name = user_info.get("Nickname", "User") | |
# # # conversation_hist = session.get_formatted_history() | |
# # # try: | |
# # # # Retrieve relevant documents | |
# # # retrieved_docs = retriever.invoke(query) | |
# # # context_str = format_context(retrieved_docs) | |
# # # except Exception as e: | |
# # # print(f"ERROR retrieving documents: {e}") | |
# # # context_str = "No relevant information found." | |
# # # # Return the combined inputs for the prompt | |
# # # return { | |
# # # "context": context_str, | |
# # # "question": query, | |
# # # "first_name": first_name, | |
# # # "conversation_history": conversation_hist | |
# # # } | |
# # # # Build the chain | |
# # # try: | |
# # # chain = ( | |
# # # RunnablePassthrough() | |
# # # | get_context_and_question | |
# # # | rag_prompt | |
# # # | llm | |
# # # | StrOutputParser() | |
# # # ) | |
# # # return chain | |
# # # except Exception as e: | |
# # # print(f"ERROR creating RAG chain: {e}") | |
# # # # Return a simple function as fallback | |
# # # def fallback_chain(query_with_session): | |
# # # session_id = query_with_session["session_id"] | |
# # # session = session_manager.get_session(session_id) | |
# # # nickname = session.get_user().get("Nickname", "there") | |
# # # return f"I'm here to help you, {nickname}, but I'm experiencing some technical difficulties right now. Please try again shortly." | |
# # # return fallback_chain | |
# # # def format_context(retrieved_docs): | |
# # # """Format retrieved documents into a string context.""" | |
# # # if not retrieved_docs: | |
# # # return "No relevant information available." | |
# # # return "\n\n".join([doc.page_content for doc in retrieved_docs]) | |
# # # def rag_memory_stream(message, history, session_id): | |
# # # """Process user message and generate response with memory.""" | |
# # # # Get the user session | |
# # # session = session_manager.get_session(session_id) | |
# # # # Add user message to history | |
# # # session.add_to_history("user", message) | |
# # # try: | |
# # # # Get response from RAG chain | |
# # # print(f"Processing message for session {session_id}: {message[:50]}...") | |
# # # # Pass both query and session_id to the chain | |
# # # response = rag_chain.invoke({ | |
# # # "query": message, | |
# # # "session_id": session_id | |
# # # }) | |
# # # print(f"Generated response: {response[:50]}...") | |
# # # # Add assistant response to history | |
# # # session.add_to_history("assistant", response) | |
# # # # Yield the response | |
# # # yield response | |
# # # except Exception as e: | |
# # # import traceback | |
# # # print(f"ERROR in rag_memory_stream: {e}") | |
# # # print(f"Detailed error: {traceback.format_exc()}") | |
# # # nickname = session.get_user().get("Nickname", "there") | |
# # # error_msg = f"I'm sorry, {nickname}. I encountered an error processing your request. Let's try a different question." | |
# # # session.add_to_history("assistant", error_msg) | |
# # # yield error_msg | |
# # # def collect_user_info(nickname, session_id): | |
# # # """Store user details and initialize session.""" | |
# # # if not nickname or nickname.strip() == "": | |
# # # return "Nickname is required to proceed.", gr.update(visible=False), gr.update(visible=True), [] | |
# # # # Store user info for chat session | |
# # # user_info = { | |
# # # "Nickname": nickname.strip(), | |
# # # "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") | |
# # # } | |
# # # # Get the session and set user info | |
# # # session = session_manager.get_session(session_id) | |
# # # session.set_user(user_info) | |
# # # # Generate welcome message | |
# # # welcome_message = session.get_welcome_message() | |
# # # # Return welcome message and update UI | |
# # # return welcome_message, gr.update(visible=True), gr.update(visible=False), [(None, welcome_message)] | |
# # # def get_css(): | |
# # # """Define CSS for the UI.""" | |
# # # return """ | |
# # # :root { | |
# # # --primary: #4E6BBF; | |
# # # --primary-light: #697BBF; | |
# # # --text-primary: #333333; | |
# # # --text-secondary: #666666; | |
# # # --background: #F9FAFC; | |
# # # --card-bg: #FFFFFF; | |
# # # --border: #E1E5F0; | |
# # # --shadow: rgba(0, 0, 0, 0.05); | |
# # # } | |
# # # body, .gradio-container { | |
# # # margin: 0; | |
# # # padding: 0; | |
# # # width: 100vw; | |
# # # height: 100vh; | |
# # # display: flex; | |
# # # flex-direction: column; | |
# # # justify-content: center; | |
# # # align-items: center; | |
# # # background: var(--background); | |
# # # color: var(--text-primary); | |
# # # font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; | |
# # # } | |
# # # .gradio-container { | |
# # # max-width: 100%; | |
# # # max-height: 100%; | |
# # # } | |
# # # .gr-box { | |
# # # background: var(--card-bg); | |
# # # color: var(--text-primary); | |
# # # border-radius: 12px; | |
# # # padding: 2rem; | |
# # # border: 1px solid var(--border); | |
# # # box-shadow: 0 4px 12px var(--shadow); | |
# # # } | |
# # # .gr-button-primary { | |
# # # background: var(--primary); | |
# # # color: white; | |
# # # padding: 12px 24px; | |
# # # border-radius: 8px; | |
# # # transition: all 0.3s ease; | |
# # # border: none; | |
# # # font-weight: bold; | |
# # # } | |
# # # .gr-button-primary:hover { | |
# # # transform: translateY(-1px); | |
# # # box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1); | |
# # # background: var(--primary-light); | |
# # # } | |
# # # footer { | |
# # # text-align: center; | |
# # # color: var(--text-secondary); | |
# # # padding: 1rem; | |
# # # font-size: 0.9em; | |
# # # } | |
# # # .gr-markdown h2 { | |
# # # color: var(--primary); | |
# # # margin-bottom: 0.5rem; | |
# # # font-size: 1.8em; | |
# # # } | |
# # # .gr-markdown h3 { | |
# # # color: var(--text-secondary); | |
# # # margin-bottom: 1.5rem; | |
# # # font-weight: normal; | |
# # # } | |
# # # #chatbot_container .chat-title h1, | |
# # # #chatbot_container .empty-chatbot { | |
# # # color: var(--primary); | |
# # # } | |
# # # #input_nickname { | |
# # # padding: 12px; | |
# # # border-radius: 8px; | |
# # # border: 1px solid var(--border); | |
# # # background: var(--card-bg); | |
# # # transition: all 0.3s ease; | |
# # # } | |
# # # #input_nickname:focus { | |
# # # border-color: var(--primary); | |
# # # box-shadow: 0 0 0 2px rgba(78, 107, 191, 0.2); | |
# # # outline: none; | |
# # # } | |
# # # .chatbot-container .message.user { | |
# # # background: #E8F0FE; | |
# # # border-radius: 12px 12px 0 12px; | |
# # # } | |
# # # .chatbot-container .message.bot { | |
# # # background: #F5F7FF; | |
# # # border-radius: 12px 12px 12px 0; | |
# # # } | |
# # # """ | |
# # # def create_ui(): | |
# # # """Create and configure the Gradio UI.""" | |
# # # with gr.Blocks(css=get_css(), theme=gr.themes.Soft()) as demo: | |
# # # # Create a unique session ID for this browser tab | |
# # # session_id = gr.State(value=f"session_{int(time.time())}_{os.urandom(4).hex()}") | |
# # # # Registration section | |
# # # with gr.Column(visible=True, elem_id="registration_container") as registration_container: | |
# # # gr.Markdown(f"## Welcome to {APP_NAME}") | |
# # # gr.Markdown("### Your privacy is important to us. Please provide a nickname to continue.") | |
# # # with gr.Row(): | |
# # # first_name = gr.Textbox( | |
# # # label="Nickname", | |
# # # placeholder="Enter your nickname", | |
# # # scale=1, | |
# # # elem_id="input_nickname" | |
# # # ) | |
# # # with gr.Row(): | |
# # # submit_btn = gr.Button("Start Chatting", variant="primary", scale=2) | |
# # # response_message = gr.Markdown() | |
# # # # Chatbot section (initially hidden) | |
# # # with gr.Column(visible=False, elem_id="chatbot_container") as chatbot_container: | |
# # # # Create a custom chat interface to pass session_id to our function | |
# # # chatbot = gr.Chatbot( | |
# # # elem_id="chatbot", | |
# # # height=500, | |
# # # show_label=False | |
# # # ) | |
# # # with gr.Row(): | |
# # # msg = gr.Textbox( | |
# # # placeholder="Type your message here...", | |
# # # show_label=False, | |
# # # container=False, | |
# # # scale=9 | |
# # # ) | |
# # # submit = gr.Button("Send", scale=1, variant="primary") | |
# # # examples = gr.Examples( | |
# # # examples=[ | |
# # # "What resources are available for GBV victims?", | |
# # # "How can I report an incident?", | |
# # # "What are my legal rights?", | |
# # # "I need help, what should I do first?" | |
# # # ], | |
# # # inputs=msg | |
# # # ) | |
# # # # Footer with version info | |
# # # gr.Markdown(f"{APP_NAME} {APP_VERSION} © 2025") | |
# # # # Handle chat message submission | |
# # # def respond(message, chat_history, session_id): | |
# # # bot_message = "" | |
# # # for chunk in rag_memory_stream(message, chat_history, session_id): | |
# # # bot_message += chunk | |
# # # chat_history.append((message, bot_message)) | |
# # # return "", chat_history | |
# # # msg.submit(respond, [msg, chatbot, session_id], [msg, chatbot]) | |
# # # submit.click(respond, [msg, chatbot, session_id], [msg, chatbot]) | |
# # # # Handle user registration | |
# # # submit_btn.click( | |
# # # collect_user_info, | |
# # # inputs=[first_name, session_id], | |
# # # outputs=[response_message, chatbot_container, registration_container, chatbot] | |
# # # ) | |
# # # return demo | |
# # # def launch_app(): | |
# # # """Launch the Gradio interface.""" | |
# # # ui = create_ui() | |
# # # ui.launch(share=True) | |
# # # # Main execution | |
# # # if __name__ == "__main__": | |
# # # try: | |
# # # # Initialize and launch the assistant | |
# # # initialize_assistant() | |
# # # launch_app() | |
# # # except Exception as e: | |
# # # import traceback | |
# # # print(f"❌ Fatal error initializing GBV Assistant: {e}") | |
# # # print(traceback.format_exc()) | |
# # # # Create a minimal emergency UI to display the error | |
# # # with gr.Blocks() as error_demo: | |
# # # gr.Markdown("## System Error") | |
# # # gr.Markdown(f"An error occurred while initializing the application: {str(e)}") | |
# # # gr.Markdown("Please check your configuration and try again.") | |
# # # error_demo.launch(share=True, inbrowser=True, debug=True) | |
# # ############################################################################################################ | |
# # import os | |
# # from langchain_groq import ChatGroq | |
# # from langchain.prompts import ChatPromptTemplate, PromptTemplate | |
# # from langchain.output_parsers import ResponseSchema, StructuredOutputParser | |
# # from urllib.parse import urljoin, urlparse | |
# # import requests | |
# # from io import BytesIO | |
# # from langchain_chroma import Chroma | |
# # import requests | |
# # from bs4 import BeautifulSoup | |
# # from langchain_core.prompts import ChatPromptTemplate | |
# # import gradio as gr | |
# # from PyPDF2 import PdfReader | |
# # from langchain_huggingface import HuggingFaceEmbeddings | |
# # groq_api_key= os.environ.get('GBV') | |
# # embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1") | |
# # def scrape_websites(base_urls): | |
# # try: | |
# # visited_links = set() # To avoid revisiting the same link | |
# # content_by_url = {} # Store content from each URL | |
# # for base_url in base_urls: | |
# # if not base_url.strip(): | |
# # continue # Skip empty or invalid URLs | |
# # print(f"Scraping base URL: {base_url}") | |
# # html_content = fetch_page_content(base_url) | |
# # if html_content: | |
# # cleaned_content = clean_body_content(html_content) | |
# # content_by_url[base_url] = cleaned_content | |
# # visited_links.add(base_url) | |
# # # Extract and process all internal links | |
# # soup = BeautifulSoup(html_content, "html.parser") | |
# # links = extract_internal_links(base_url, soup) | |
# # for link in links: | |
# # if link not in visited_links: | |
# # print(f"Scraping link: {link}") | |
# # page_content = fetch_page_content(link) | |
# # if page_content: | |
# # cleaned_content = clean_body_content(page_content) | |
# # content_by_url[link] = cleaned_content | |
# # visited_links.add(link) | |
# # # If the link is a PDF file, extract its content | |
# # if link.lower().endswith('.pdf'): | |
# # print(f"Extracting PDF content from: {link}") | |
# # pdf_content = extract_pdf_text(link) | |
# # if pdf_content: | |
# # content_by_url[link] = pdf_content | |
# # return content_by_url | |
# # except Exception as e: | |
# # print(f"Error during scraping: {e}") | |
# # return {} | |
# # def fetch_page_content(url): | |
# # try: | |
# # response = requests.get(url, timeout=10) | |
# # response.raise_for_status() | |
# # return response.text | |
# # except requests.exceptions.RequestException as e: | |
# # print(f"Error fetching {url}: {e}") | |
# # return None | |
# # def extract_internal_links(base_url, soup): | |
# # links = set() | |
# # for anchor in soup.find_all("a", href=True): | |
# # href = anchor["href"] | |
# # full_url = urljoin(base_url, href) | |
# # if is_internal_link(base_url, full_url): | |
# # links.add(full_url) | |
# # return links | |
# # def is_internal_link(base_url, link_url): | |
# # base_netloc = urlparse(base_url).netloc | |
# # link_netloc = urlparse(link_url).netloc | |
# # return base_netloc == link_netloc | |
# # def extract_pdf_text(pdf_url): | |
# # try: | |
# # response = requests.get(pdf_url) | |
# # response.raise_for_status() | |
# # with BytesIO(response.content) as file: | |
# # reader = PdfReader(file) | |
# # pdf_text = "" | |
# # for page in reader.pages: | |
# # pdf_text += page.extract_text() | |
# # return pdf_text if pdf_text else None | |
# # except requests.exceptions.RequestException as e: | |
# # print(f"Error fetching PDF {pdf_url}: {e}") | |
# # return None | |
# # except Exception as e: | |
# # print(f"Error reading PDF {pdf_url}: {e}") | |
# # return None | |
# # def clean_body_content(html_content): | |
# # soup = BeautifulSoup(html_content, "html.parser") | |
# # for script_or_style in soup(["script", "style"]): | |
# # script_or_style.extract() | |
# # cleaned_content = soup.get_text(separator="\n") | |
# # cleaned_content = "\n".join( | |
# # line.strip() for line in cleaned_content.splitlines() if line.strip() | |
# # ) | |
# # return cleaned_content | |
# # if __name__ == "__main__": | |
# # website = ["https://haguruka.org.rw/" | |
# # ] | |
# # all_content = scrape_websites(website) | |
# # temp_list = [] | |
# # for url, content in all_content.items(): | |
# # temp_list.append((url, content)) | |
# # processed_texts = [] | |
# # for element in temp_list: | |
# # if isinstance(element, tuple): | |
# # url, content = element | |
# # processed_texts.append(f"url: {url}, content: {content}") | |
# # elif isinstance(element, str): | |
# # processed_texts.append(element) | |
# # else: | |
# # processed_texts.append(str(element)) | |
# # def chunk_string(s, chunk_size=1000): | |
# # return [s[i:i+chunk_size] for i in range(0, len(s), chunk_size)] | |
# # chunked_texts = [] | |
# # for text in processed_texts: | |
# # chunked_texts.extend(chunk_string(text)) | |
# # vectorstore = Chroma( | |
# # collection_name="GBVR_Dataset", | |
# # embedding_function=embed_model, | |
# # persist_directory="./", | |
# # ) | |
# # vectorstore.get().keys() | |
# # vectorstore.add_texts(chunked_texts) | |
# # template = (""" | |
# # You are a friendly, intelligent, and conversational AI assistant designed to provide accurate, engaging, and human-like responses based on the given context. Your goal is to extract relevant details from the provided context: {context} and assist the user effectively. Follow these guidelines: | |
# # 1. **Warm & Natural Interaction** | |
# # - If the user greets you (e.g., "Hello," "Hi," "Good morning"), respond warmly and acknowledge them. | |
# # - Example responses: | |
# # - "😊 Good morning! How can I assist you today?" | |
# # - "Hello! What can I do for you? 🚀" | |
# # 2. **Precise Information Extraction** | |
# # - Provide only the relevant details from the given context: {context}. | |
# # - Do not generate extra content or assumptions beyond the provided information. | |
# # 3. **Conversational & Engaging Tone** | |
# # - Keep responses friendly, natural, and engaging. | |
# # - Use occasional emojis (e.g., 😊, 🚀) to make interactions more lively. | |
# # 4. **Awareness of Real-Time Context** | |
# # - If necessary, acknowledge the current date and time to show awareness of real-world updates. | |
# # 5. **Handling Missing Information** | |
# # - If no relevant information exists in the context, respond politely: | |
# # - "I don't have that information at the moment, but I'm happy to help with something else! 😊" | |
# # 6. **Personalized Interaction** | |
# # - If user history is available, tailor responses based on their previous interactions for a more natural and engaging conversation. | |
# # 7. **Direct, Concise Responses** | |
# # - If the user requests specific data, provide only the requested details without unnecessary explanations unless asked. | |
# # 8. **Extracting Relevant Links** | |
# # - If the user asks for a link related to their request `{question}`, extract the most relevant URL from `{context}` and provide it directly. | |
# # - Example response: | |
# # - "Here is the link you requested: [URL]" | |
# # **Context:** {context} | |
# # **User's Question:** {question} | |
# # **Your Response:** | |
# # """) | |
# # rag_prompt = PromptTemplate.from_template(template) | |
# # retriever = vectorstore.as_retriever() | |
# # from langchain_core.output_parsers import StrOutputParser | |
# # from langchain_core.runnables import RunnablePassthrough | |
# # llm = ChatGroq(model="llama-3.3-70b-versatile", api_key=groq_api_key ) | |
# # rag_chain = ( | |
# # {"context": retriever, "question": RunnablePassthrough()} | |
# # | rag_prompt | |
# # | llm | |
# # | StrOutputParser() | |
# # ) | |
# # # Define the RAG memory stream function | |
# # def rag_memory_stream(message, history): | |
# # partial_text = "" | |
# # for new_text in rag_chain.stream(message): # Replace with actual streaming logic | |
# # partial_text += new_text | |
# # yield partial_text | |
# # # Title with emojis | |
# # title = "GBVR Chatbot" | |
# # # Custom CSS for styling the interface | |
# # custom_css = """ | |
# # body { | |
# # font-family: "Arial", serif; | |
# # } | |
# # .gradio-container { | |
# # font-family: "Times New Roman", serif; | |
# # } | |
# # .gr-button { | |
# # background-color: #007bff; /* Blue button */ | |
# # color: white; | |
# # border: none; | |
# # border-radius: 5px; | |
# # font-size: 16px; | |
# # padding: 10px 20px; | |
# # cursor: pointer; | |
# # } | |
# # .gr-textbox:focus, .gr-button:focus { | |
# # outline: none; /* Remove outline focus for a cleaner look */ | |
# # } | |
# # """ | |
# # # Create the Chat Interface | |
# # demo = gr.ChatInterface( | |
# # fn=rag_memory_stream, | |
# # title=title, | |
# # fill_height=True, | |
# # theme="soft", | |
# # css=custom_css, # Apply the custom CSS | |
# # ) | |
# # # Launch the app | |
# # if __name__ == "__main__": | |
# # demo.launch(share=True, inbrowser=True, debug=True) | |
# import os | |
# from langchain_groq import ChatGroq | |
# from langchain.prompts import ChatPromptTemplate, PromptTemplate | |
# from langchain.output_parsers import ResponseSchema, StructuredOutputParser | |
# from urllib.parse import urljoin, urlparse | |
# import requests | |
# from io import BytesIO | |
# from langchain_chroma import Chroma | |
# import requests | |
# from bs4 import BeautifulSoup | |
# from langchain_core.prompts import ChatPromptTemplate | |
# import gradio as gr | |
# from PyPDF2 import PdfReader | |
# from langchain_huggingface import HuggingFaceEmbeddings | |
# from langchain_core.output_parsers import StrOutputParser | |
# from langchain_core.runnables import RunnablePassthrough | |
# # Simple session management | |
# class SessionManager: | |
# def __init__(self): | |
# self.sessions = {} | |
# def get_or_create_session(self, session_id): | |
# if session_id not in self.sessions: | |
# self.sessions[session_id] = [] | |
# return self.sessions[session_id] | |
# def add_interaction(self, session_id, user_message, ai_response): | |
# session = self.get_or_create_session(session_id) | |
# session.append({"user": user_message, "ai": ai_response}) | |
# def get_history(self, session_id, max_turns=5): | |
# session = self.get_or_create_session(session_id) | |
# recent_history = session[-max_turns:] if len(session) > max_turns else session | |
# history_text = "" | |
# for interaction in recent_history: | |
# history_text += f"User: {interaction['user']}\n" | |
# history_text += f"Assistant: {interaction['ai']}\n\n" | |
# return history_text.strip() | |
# # Initialize session manager | |
# session_manager = SessionManager() | |
# groq_api_key= os.environ.get('GBV') | |
# embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1") | |
# def scrape_websites(base_urls): | |
# try: | |
# visited_links = set() # To avoid revisiting the same link | |
# content_by_url = {} # Store content from each URL | |
# for base_url in base_urls: | |
# if not base_url.strip(): | |
# continue # Skip empty or invalid URLs | |
# print(f"Scraping base URL: {base_url}") | |
# html_content = fetch_page_content(base_url) | |
# if html_content: | |
# cleaned_content = clean_body_content(html_content) | |
# content_by_url[base_url] = cleaned_content | |
# visited_links.add(base_url) | |
# # Extract and process all internal links | |
# soup = BeautifulSoup(html_content, "html.parser") | |
# links = extract_internal_links(base_url, soup) | |
# for link in links: | |
# if link not in visited_links: | |
# print(f"Scraping link: {link}") | |
# page_content = fetch_page_content(link) | |
# if page_content: | |
# cleaned_content = clean_body_content(page_content) | |
# content_by_url[link] = cleaned_content | |
# visited_links.add(link) | |
# # If the link is a PDF file, extract its content | |
# if link.lower().endswith('.pdf'): | |
# print(f"Extracting PDF content from: {link}") | |
# pdf_content = extract_pdf_text(link) | |
# if pdf_content: | |
# content_by_url[link] = pdf_content | |
# return content_by_url | |
# except Exception as e: | |
# print(f"Error during scraping: {e}") | |
# return {} | |
# def fetch_page_content(url): | |
# try: | |
# response = requests.get(url, timeout=10) | |
# response.raise_for_status() | |
# return response.text | |
# except requests.exceptions.RequestException as e: | |
# print(f"Error fetching {url}: {e}") | |
# return None | |
# def extract_internal_links(base_url, soup): | |
# links = set() | |
# for anchor in soup.find_all("a", href=True): | |
# href = anchor["href"] | |
# full_url = urljoin(base_url, href) | |
# if is_internal_link(base_url, full_url): | |
# links.add(full_url) | |
# return links | |
# def is_internal_link(base_url, link_url): | |
# base_netloc = urlparse(base_url).netloc | |
# link_netloc = urlparse(link_url).netloc | |
# return base_netloc == link_netloc | |
# def extract_pdf_text(pdf_url): | |
# try: | |
# response = requests.get(pdf_url) | |
# response.raise_for_status() | |
# with BytesIO(response.content) as file: | |
# reader = PdfReader(file) | |
# pdf_text = "" | |
# for page in reader.pages: | |
# pdf_text += page.extract_text() | |
# return pdf_text if pdf_text else None | |
# except requests.exceptions.RequestException as e: | |
# print(f"Error fetching PDF {pdf_url}: {e}") | |
# return None | |
# except Exception as e: | |
# print(f"Error reading PDF {pdf_url}: {e}") | |
# return None | |
# def clean_body_content(html_content): | |
# soup = BeautifulSoup(html_content, "html.parser") | |
# for script_or_style in soup(["script", "style"]): | |
# script_or_style.extract() | |
# cleaned_content = soup.get_text(separator="\n") | |
# cleaned_content = "\n".join( | |
# line.strip() for line in cleaned_content.splitlines() if line.strip() | |
# ) | |
# return cleaned_content | |
# if __name__ == "__main__": | |
# website = ["https://haguruka.org.rw/" | |
# ] | |
# all_content = scrape_websites(website) | |
# temp_list = [] | |
# for url, content in all_content.items(): | |
# temp_list.append((url, content)) | |
# processed_texts = [] | |
# for element in temp_list: | |
# if isinstance(element, tuple): | |
# url, content = element | |
# processed_texts.append(f"url: {url}, content: {content}") | |
# elif isinstance(element, str): | |
# processed_texts.append(element) | |
# else: | |
# processed_texts.append(str(element)) | |
# def chunk_string(s, chunk_size=1000): | |
# return [s[i:i+chunk_size] for i in range(0, len(s), chunk_size)] | |
# chunked_texts = [] | |
# for text in processed_texts: | |
# chunked_texts.extend(chunk_string(text)) | |
# vectorstore = Chroma( | |
# collection_name="GBVR_Dataset", | |
# embedding_function=embed_model, | |
# persist_directory="./", | |
# ) | |
# vectorstore.get().keys() | |
# vectorstore.add_texts(chunked_texts) | |
# # Updated template to include conversation history | |
# template = (""" | |
# You are a friendly, intelligent, and conversational AI assistant designed to provide accurate, engaging, and human-like responses based on the given context. Your goal is to extract relevant details from the provided context: {context} and assist the user effectively. Follow these guidelines: | |
# 1. **Warm & Natural Interaction** | |
# - If the user greets you (e.g., "Hello," "Hi," "Good morning"), respond warmly and acknowledge them. | |
# - Example responses: | |
# - "😊 Good morning! How can I assist you today?" | |
# - "Hello! What can I do for you? 🚀" | |
# 2. **Precise Information Extraction** | |
# - Provide only the relevant details from the given context: {context}. | |
# - Do not generate extra content or assumptions beyond the provided information. | |
# 3. **Conversational & Engaging Tone** | |
# - Keep responses friendly, natural, and engaging. | |
# - Use occasional emojis (e.g., 😊, 🚀) to make interactions more lively. | |
# 4. **Awareness of Real-Time Context** | |
# - If necessary, acknowledge the current date and time to show awareness of real-world updates. | |
# 5. **Handling Missing Information** | |
# - If no relevant information exists in the context, respond politely: | |
# - "I don't have that information at the moment, but I'm happy to help with something else! 😊" | |
# 6. **Personalized Interaction** | |
# - Use the conversation history to provide more personalized and contextually relevant responses. | |
# - Previous conversation history: {conversation_history} | |
# 7. **Direct, Concise Responses** | |
# - If the user requests specific data, provide only the requested details without unnecessary explanations unless asked. | |
# 8. **Extracting Relevant Links** | |
# - If the user asks for a link related to their request `{question}`, extract the most relevant URL from `{context}` and provide it directly. | |
# - Example response: | |
# - "Here is the link you requested: [URL]" | |
# **Context:** {context} | |
# **User's Question:** {question} | |
# **Your Response:** | |
# """) | |
# rag_prompt = PromptTemplate.from_template(template) | |
# retriever = vectorstore.as_retriever() | |
# llm = ChatGroq(model="llama-3.3-70b-versatile", api_key=groq_api_key) | |
# # Dictionary to store user sessions with session IDs | |
# user_sessions = {} | |
# # Define the RAG chain with session history | |
# def rag_chain(question, session_id="default"): | |
# # Get conversation history if available | |
# conversation_history = session_manager.get_history(session_id) | |
# # Get context from retriever | |
# context_docs = retriever.invoke(question) | |
# context = "\n".join(doc.page_content for doc in context_docs) | |
# # Create prompt with history | |
# prompt = rag_prompt.format( | |
# context=context, | |
# question=question, | |
# conversation_history=conversation_history | |
# ) | |
# # Generate response | |
# response = llm.invoke(prompt).content | |
# # Store the interaction | |
# session_manager.add_interaction(session_id, question, response) | |
# return response | |
# # Define the RAG memory stream function | |
# def rag_memory_stream(message, history): | |
# # Generate a session ID based on the first message if not exists | |
# session_id = None | |
# for msg in history: | |
# if msg[0]: # If there's a user message | |
# # Use first few characters of first message as simple session ID | |
# session_id = hash(msg[0][:20]) if session_id is None else session_id | |
# break | |
# # Default session ID if history is empty | |
# if session_id is None: | |
# session_id = "default_session" | |
# # Process the message and get response | |
# response = rag_chain(message, str(session_id)) | |
# # Stream the response word by word | |
# partial_text = "" | |
# words = response.split(' ') | |
# for word in words: | |
# partial_text += word + " " | |
# yield partial_text.strip() | |
# # Title with emojis | |
# title = "GBVR Chatbot" | |
# # Custom CSS for styling the interface | |
# custom_css = """ | |
# body { | |
# font-family: "Arial", serif; | |
# } | |
# .gradio-container { | |
# font-family: "Times New Roman", serif; | |
# } | |
# .gr-button { | |
# background-color: #007bff; /* Blue button */ | |
# color: white; | |
# border: none; | |
# border-radius: 5px; | |
# font-size: 16px; | |
# padding: 10px 20px; | |
# cursor: pointer; | |
# } | |
# .gr-textbox:focus, .gr-button:focus { | |
# outline: none; /* Remove outline focus for a cleaner look */ | |
# } | |
# """ | |
# # Create the Chat Interface | |
# demo = gr.ChatInterface( | |
# fn=rag_memory_stream, | |
# title=title, | |
# fill_height=True, | |
# theme="soft", | |
# css=custom_css, # Apply the custom CSS | |
# ) | |
# # Launch the app | |
# if __name__ == "__main__": | |
# demo.launch(share=True, inbrowser=True, debug=True) | |
import os | |
from langchain_groq import ChatGroq | |
from langchain.prompts import ChatPromptTemplate, PromptTemplate | |
from langchain.output_parsers import ResponseSchema, StructuredOutputParser | |
from urllib.parse import urljoin, urlparse | |
import requests | |
from io import BytesIO | |
from langchain_chroma import Chroma | |
import requests | |
from bs4 import BeautifulSoup | |
from langchain_core.prompts import ChatPromptTemplate | |
import gradio as gr | |
from PyPDF2 import PdfReader | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_core.runnables import RunnablePassthrough | |
# Simple session management | |
class SessionManager: | |
def __init__(self): | |
self.sessions = {} | |
def get_or_create_session(self, session_id): | |
if session_id not in self.sessions: | |
self.sessions[session_id] = [] | |
return self.sessions[session_id] | |
def add_interaction(self, session_id, user_message, ai_response): | |
session = self.get_or_create_session(session_id) | |
session.append({"user": user_message, "ai": ai_response}) | |
def get_history(self, session_id, max_turns=5): | |
session = self.get_or_create_session(session_id) | |
recent_history = session[-max_turns:] if len(session) > max_turns else session | |
history_text = "" | |
for interaction in recent_history: | |
history_text += f"User: {interaction['user']}\n" | |
history_text += f"Assistant: {interaction['ai']}\n\n" | |
return history_text.strip() | |
# Initialize session manager | |
session_manager = SessionManager() | |
groq_api_key= os.environ.get('GBV') | |
embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1") | |
def scrape_websites(base_urls): | |
try: | |
visited_links = set() # To avoid revisiting the same link | |
content_by_url = {} # Store content from each URL | |
for base_url in base_urls: | |
if not base_url.strip(): | |
continue # Skip empty or invalid URLs | |
print(f"Scraping base URL: {base_url}") | |
html_content = fetch_page_content(base_url) | |
if html_content: | |
cleaned_content = clean_body_content(html_content) | |
content_by_url[base_url] = cleaned_content | |
visited_links.add(base_url) | |
# Extract and process all internal links | |
soup = BeautifulSoup(html_content, "html.parser") | |
links = extract_internal_links(base_url, soup) | |
for link in links: | |
if link not in visited_links: | |
print(f"Scraping link: {link}") | |
page_content = fetch_page_content(link) | |
if page_content: | |
cleaned_content = clean_body_content(page_content) | |
content_by_url[link] = cleaned_content | |
visited_links.add(link) | |
# If the link is a PDF file, extract its content | |
if link.lower().endswith('.pdf'): | |
print(f"Extracting PDF content from: {link}") | |
pdf_content = extract_pdf_text(link) | |
if pdf_content: | |
content_by_url[link] = pdf_content | |
return content_by_url | |
except Exception as e: | |
print(f"Error during scraping: {e}") | |
return {} | |
def fetch_page_content(url): | |
try: | |
response = requests.get(url, timeout=10) | |
response.raise_for_status() | |
return response.text | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching {url}: {e}") | |
return None | |
def extract_internal_links(base_url, soup): | |
links = set() | |
for anchor in soup.find_all("a", href=True): | |
href = anchor["href"] | |
full_url = urljoin(base_url, href) | |
if is_internal_link(base_url, full_url): | |
links.add(full_url) | |
return links | |
def is_internal_link(base_url, link_url): | |
base_netloc = urlparse(base_url).netloc | |
link_netloc = urlparse(link_url).netloc | |
return base_netloc == link_netloc | |
def extract_pdf_text(pdf_url): | |
try: | |
response = requests.get(pdf_url) | |
response.raise_for_status() | |
with BytesIO(response.content) as file: | |
reader = PdfReader(file) | |
pdf_text = "" | |
for page in reader.pages: | |
pdf_text += page.extract_text() | |
return pdf_text if pdf_text else None | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching PDF {pdf_url}: {e}") | |
return None | |
except Exception as e: | |
print(f"Error reading PDF {pdf_url}: {e}") | |
return None | |
def clean_body_content(html_content): | |
soup = BeautifulSoup(html_content, "html.parser") | |
for script_or_style in soup(["script", "style"]): | |
script_or_style.extract() | |
cleaned_content = soup.get_text(separator="\n") | |
cleaned_content = "\n".join( | |
line.strip() for line in cleaned_content.splitlines() if line.strip() | |
) | |
return cleaned_content | |
if __name__ == "__main__": | |
website = ["https://haguruka.org.rw/" | |
] | |
all_content = scrape_websites(website) | |
temp_list = [] | |
for url, content in all_content.items(): | |
temp_list.append((url, content)) | |
processed_texts = [] | |
for element in temp_list: | |
if isinstance(element, tuple): | |
url, content = element | |
processed_texts.append(f"url: {url}, content: {content}") | |
elif isinstance(element, str): | |
processed_texts.append(element) | |
else: | |
processed_texts.append(str(element)) | |
def chunk_string(s, chunk_size=1000): | |
return [s[i:i+chunk_size] for i in range(0, len(s), chunk_size)] | |
chunked_texts = [] | |
for text in processed_texts: | |
chunked_texts.extend(chunk_string(text)) | |
vectorstore = Chroma( | |
collection_name="GBVR_Datast", | |
embedding_function=embed_model, | |
persist_directory="./", | |
) | |
vectorstore.get().keys() | |
vectorstore.add_texts(chunked_texts) | |
# Updated template to include conversation history | |
template = (""" | |
You are a friendly, intelligent, and conversational AI assistant designed to provide accurate, engaging, and human-like responses based on the given context. Your goal is to extract relevant details from the provided context: {context} and assist the user effectively. Follow these guidelines: | |
1. **Warm & Natural Interaction** | |
- If the user greets you (e.g., "Hello," "Hi," "Good morning"), respond warmly and acknowledge them. | |
- Example responses: | |
- "😊 Good morning! How can I assist you today?" | |
- "Hello! What can I do for you? 🚀" | |
2. **Precise Information Extraction** | |
- Provide only the relevant details from the given context: {context}. | |
- Do not generate extra content or assumptions beyond the provided information. | |
3. **Conversational & Engaging Tone** | |
- Keep responses friendly, natural, and engaging. | |
- Use occasional emojis (e.g., 😊, 🚀) to make interactions more lively. | |
4. **Awareness of Real-Time Context** | |
- If necessary, acknowledge the current date and time to show awareness of real-world updates. | |
5. **Handling Missing Information** | |
- If no relevant information exists in the context, respond politely: | |
- "I don't have that information at the moment, but I'm happy to help with something else! 😊" | |
6. **Personalized Interaction** | |
- Use the conversation history to provide more personalized and contextually relevant responses. | |
- Previous conversation history: {conversation_history} | |
7. **Direct, Concise Responses** | |
- If the user requests specific data, provide only the requested details without unnecessary explanations unless asked. | |
8. **Extracting Relevant Links** | |
- If the user asks for a link related to their request `{question}`, extract the most relevant URL from `{context}` and provide it directly. | |
- Example response: | |
- "Here is the link you requested: [URL]" | |
**Context:** {context} | |
**User's Question:** {question} | |
**Your Response:** | |
""") | |
rag_prompt = PromptTemplate.from_template(template) | |
retriever = vectorstore.as_retriever() | |
llm = ChatGroq(model="llama-3.3-70b-versatile", api_key=groq_api_key) | |
# Dictionary to store user sessions with session IDs | |
user_sessions = {} | |
# Define the RAG chain with session history | |
def rag_chain(question, session_id="default"): | |
# Get conversation history if available | |
conversation_history = session_manager.get_history(session_id) | |
# Get context from retriever | |
context_docs = retriever.invoke(question) | |
context = "\n".join(doc.page_content for doc in context_docs) | |
# Create prompt with history | |
prompt = rag_prompt.format( | |
context=context, | |
question=question, | |
conversation_history=conversation_history | |
) | |
# Generate response | |
response = llm.invoke(prompt).content | |
# Store the interaction | |
session_manager.add_interaction(session_id, question, response) | |
return response | |
# Define the RAG memory stream function | |
def rag_memory_stream(message, history): | |
# Generate a session ID based on the first message if not exists | |
session_id = None | |
for msg in history: | |
if msg[0]: # If there's a user message | |
# Use first few characters of first message as simple session ID | |
session_id = hash(msg[0][:20]) if session_id is None else session_id | |
break | |
# Default session ID if history is empty | |
if session_id is None: | |
session_id = "default_session" | |
# Process the message and get response | |
response = rag_chain(message, str(session_id)) | |
# Stream the response word by word | |
partial_text = "" | |
words = response.split(' ') | |
for word in words: | |
partial_text += word + " " | |
yield partial_text.strip() | |
# Title with emojis | |
title = "GBVR Chatbot" | |
# Custom CSS for styling the interface | |
custom_css = """ | |
/* Custom CSS for styling the interface */ | |
body { | |
font-family: "Arial", serif; | |
} | |
.gradio-container { | |
font-family: "Times New Roman", serif; | |
} | |
.gr-button { | |
background-color: #007bff; /* Blue button */ | |
color: white; | |
border: none; | |
border-radius: 5px; | |
font-size: 16px; | |
padding: 10px 20px; | |
cursor: pointer; | |
} | |
.gr-textbox:focus, .gr-button:focus { | |
outline: none; /* Remove outline focus for a cleaner look */ | |
} | |
/* Specific CSS for the welcome message */ | |
.gradio-description { | |
font-size: 30px; /* Set font size for the welcome message */ | |
font-family: "Arial", sans-serif; | |
text-align: center; /* Optional: Center-align the text */ | |
padding: 20px; /* Optional: Add padding around the welcome message */ | |
} | |
""" | |
# Generate a simple welcome message using the LLM | |
def generate_welcome_message(): | |
welcome_prompt = """ | |
Generate a short, simple welcome message for a chatbot about Gender-Based Violence Resources in Rwanda. | |
Keep it under 3 sentences, and use simple language. | |
Make it warm and supportive but direct and easy to read. | |
""" | |
# Get the welcome message from the LLM | |
welcome_message = llm.invoke(welcome_prompt).content | |
return welcome_message | |
# Create simple welcome message | |
welcome_msg = generate_welcome_message() | |
# Create the Chat Interface with welcome message | |
demo = gr.ChatInterface( | |
fn=rag_memory_stream, | |
title=title, | |
fill_height=True, | |
theme="soft", | |
css=custom_css, # Apply the custom CSS | |
description=welcome_msg | |
) | |
# Launch the app | |
if __name__ == "__main__": | |
demo.launch(share=True, inbrowser=True, debug=True) |