import streamlit as st from llama_index.core.agent import ReActAgent from llama_index.llms.groq import Groq from llama_index.core.tools import FunctionTool from llama_index.tools.tavily_research.base import TavilyToolSpec import os import json import pandas as pd from datetime import datetime from dotenv import load_dotenv import time import base64 import plotly.graph_objects as go import re from io import StringIO import sys # Load environment variables load_dotenv() # Model rate limits information MODEL_LIMITS = { "allam-2-7b": { "rpm": 30, "rpd": 7000, "tpm": 6000, "tpd": "No limit" }, "deepseek-r1-distill-llama-70b": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": "No limit" }, "deepseek-r1-distill-qwen-32b": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": "No limit" }, "gemma2-9b-it": { "rpm": 30, "rpd": 14400, "tpm": 15000, "tpd": 500000 }, "llama-3.1-8b-instant": { "rpm": 30, "rpd": 14400, "tpm": 6000, "tpd": 500000 }, "llama-3.2-11b-vision-preview": { "rpm": 30, "rpd": 7000, "tpm": 7000, "tpd": 500000 }, "llama-3.2-1b-preview": { "rpm": 30, "rpd": 7000, "tpm": 7000, "tpd": 500000 }, "llama-3.2-3b-preview": { "rpm": 30, "rpd": 7000, "tpm": 7000, "tpd": 500000 }, "llama-3.2-90b-vision-preview": { "rpm": 15, "rpd": 3500, "tpm": 7000, "tpd": 250000 }, "llama-3.3-70b-specdec": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": 100000 }, "llama-3.3-70b-versatile": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": 100000 }, "llama-guard-3-8b": { "rpm": 30, "rpd": 14400, "tpm": 15000, "tpd": 500000 }, "llama3-70b-8192": { "rpm": 30, "rpd": 14400, "tpm": 6000, "tpd": 500000 }, "llama3-8b-8192": { "rpm": 30, "rpd": 14400, "tpm": 6000, "tpd": 500000 }, "mistral-saba-24b": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": 500000 }, "qwen-2.5-32b": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": "No limit" }, "qwen-2.5-coder-32b": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": "No limit" }, "qwen-qwq-32b": { "rpm": 30, "rpd": 1000, "tpm": 6000, "tpd": "No limit" }, "claude-3-5-sonnet-20240620": { "rpm": 30, "rpd": 14400, "tpm": 15000, "tpd": 500000 }, "mixtral-8x7b-32768": { "rpm": 30, "rpd": 14400, "tpm": 15000, "tpd": 500000 } } # Initialize session state if not already done if 'conversation_history' not in st.session_state: st.session_state.conversation_history = [] if 'api_key' not in st.session_state: st.session_state.api_key = "" if 'current_response' not in st.session_state: st.session_state.current_response = None if 'feedback_data' not in st.session_state: st.session_state.feedback_data = [] if 'current_sources' not in st.session_state: st.session_state.current_sources = [] if 'thinking_process' not in st.session_state: st.session_state.thinking_process = "" # Custom CSS for better UI st.markdown(""" """, unsafe_allow_html=True) # Main title and description st.markdown('
TechMatrix AI Web Search Agent
', unsafe_allow_html=True) st.markdown(''' This intelligent agent uses state-of-the-art LLM technology to search the web and provide comprehensive answers to your questions. Simply enter your query, and let our AI handle the rest! ''') # Sidebar for team information with st.sidebar: st.markdown('
TechMatrix Solvers
', unsafe_allow_html=True) st.markdown('
👑 Abhay Gupta (Team Leader)
', unsafe_allow_html=True) st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/abhay-gupta-197b17264/)') st.markdown('
🧠 Mayank Das Bairagi
', unsafe_allow_html=True) st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/mayank-das-bairagi-18639525a/)') st.markdown('
💻 Kripanshu Gupta
', unsafe_allow_html=True) st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/kripanshu-gupta-a66349261/)') st.markdown('
🔍 Bhumika Patel
', unsafe_allow_html=True) st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/bhumika-patel-ml/)') st.markdown('---') # Advanced Settings st.markdown('
Advanced Settings
', unsafe_allow_html=True) # Available models available_models = [ 'gemma2-9b-it', 'llama3-8b-8192', 'mixtral-8x7b-32768', 'llama3-70b-8192', 'claude-3-5-sonnet-20240620', 'llama-3.1-8b-instant', 'llama-3.2-3b-preview', 'llama-3.3-70b-versatile', 'qwen-2.5-32b', 'mistral-saba-24b' ] model_option = st.selectbox( 'LLM Model', available_models, index=0, help="Select from available Groq models" ) # Display the rate limits for the selected model if model_option in MODEL_LIMITS: limits = MODEL_LIMITS[model_option] st.markdown('
', unsafe_allow_html=True) st.markdown(f"#### Rate Limits for {model_option}") # Create a table for the limits st.markdown("""
Limit Type Value
Requests per Minute {rpm}
Requests per Day {rpd}
Tokens per Minute {tpm}
Tokens per Day {tpd}
""".format( rpm=limits['rpm'], rpd=limits['rpd'], tpm=limits['tpm'], tpd=limits['tpd'] ), unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) search_depth = st.slider('Search Depth', min_value=1, max_value=8, value=5, help="Higher values will search more thoroughly but take longer") show_thinking = st.checkbox('Show AI Thinking Process', value=True, help="Display the step-by-step reasoning process of the AI") # Clear history button if st.button('Clear Conversation History'): st.session_state.conversation_history = [] st.success('Conversation history cleared!') # Analytics section in sidebar if st.session_state.feedback_data: st.markdown('---') st.markdown('
Response Analytics
', unsafe_allow_html=True) # Calculate average rating ratings = [item['rating'] for item in st.session_state.feedback_data if 'rating' in item] avg_rating = sum(ratings) / len(ratings) if ratings else 0 # Create a chart fig = go.Figure(go.Indicator( mode="gauge+number", value=avg_rating, title={'text': "Average Rating"}, domain={'x': [0, 1], 'y': [0, 1]}, gauge={ 'axis': {'range': [0, 5]}, 'bar': {'color': "#6200EA"}, 'steps': [ {'range': [0, 2], 'color': "#FFD0D0"}, {'range': [2, 3.5], 'color': "#FFFFCC"}, {'range': [3.5, 5], 'color': "#D0FFD0"} ] } )) fig.update_layout(height=250, margin=dict(l=20, r=20, t=30, b=20)) st.plotly_chart(fig, use_container_width=True) # Show feedback counts feedback_counts = {"👍 Helpful": 0, "👎 Not Helpful": 0} for item in st.session_state.feedback_data: if 'feedback' in item: if item['feedback'] == 'helpful': feedback_counts["👍 Helpful"] += 1 elif item['feedback'] == 'not_helpful': feedback_counts["👎 Not Helpful"] += 1 st.markdown("### Feedback Summary") for key, value in feedback_counts.items(): st.markdown(f"**{key}:** {value}") # API key input section st.markdown('
API Credentials
', unsafe_allow_html=True) with st.expander("Configure API Keys"): st.markdown('
', unsafe_allow_html=True) api_key = st.text_input("Enter your Groq API key:", type="password", value=st.session_state.api_key, help="Get your API key from https://console.groq.com/keys") tavily_key = st.text_input("Enter your Tavily API key (optional):", type="password", help="Get your Tavily API key from https://tavily.com/#api") if api_key: st.session_state.api_key = api_key os.environ['GROQ_API_KEY'] = api_key if tavily_key: os.environ['TAVILY_API_KEY'] = tavily_key st.markdown('
', unsafe_allow_html=True) # Function to create download link for text data def get_download_link(text, filename, link_text): b64 = base64.b64encode(text.encode()).decode() href = f'{link_text}' return href # Function to handle feedback submission def submit_feedback(feedback_type, query, response): feedback_entry = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "query": query, "response": response, "feedback": feedback_type } st.session_state.feedback_data.append(feedback_entry) return True # Function to submit rating def submit_rating(rating, query, response): # Find if there's an existing entry for this query/response for entry in st.session_state.feedback_data: if entry.get('query') == query and entry.get('response') == response: entry['rating'] = rating return True # If not found, create a new entry feedback_entry = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "query": query, "response": response, "rating": rating } st.session_state.feedback_data.append(feedback_entry) return True # Function to extract URLs from text def extract_urls(text): url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+' return re.findall(url_pattern, text) # Custom callback to capture agent's thinking process class ThinkingCapture: def __init__(self): self.thinking_steps = [] def on_agent_step(self, agent_step): # Capture the thought process if hasattr(agent_step, 'thought') and agent_step.thought: self.thinking_steps.append(f"Thought: {agent_step.thought}") if hasattr(agent_step, 'action') and agent_step.action: self.thinking_steps.append(f"Action: {agent_step.action}") if hasattr(agent_step, 'observation') and agent_step.observation: self.thinking_steps.append(f"Observation: {agent_step.observation}") return agent_step def get_thinking_process(self): return "\n".join(self.thinking_steps) # Setup search tools try: if 'TAVILY_API_KEY' in os.environ and os.environ['TAVILY_API_KEY']: search = TavilyToolSpec(api_key=os.environ['TAVILY_API_KEY']) else: # Fallback to a default key or inform the user st.warning("Using default Tavily API key with limited quota. For better results, please provide your own key.") search = TavilyToolSpec(api_key=os.getenv('TAVILY_API_KEY')) def search_tool(prompt: str) -> list: """Search the web for information about the given prompt.""" try: search_results = search.search(prompt, max_results=search_depth) # Store source URLs sources = [] for result in search_results: if hasattr(result, 'url') and result.url: sources.append({ 'title': result.title if hasattr(result, 'title') else "Unknown Source", 'url': result.url }) # Store in session state for later display st.session_state.current_sources = sources return [result.text for result in search_results] except Exception as e: return [f"Error during search: {str(e)}"] search_toolkit = FunctionTool.from_defaults(fn=search_tool) except Exception as e: st.error(f"Error setting up search tools: {str(e)}") search_toolkit = None # Query input query = st.text_input("What would you like to know?", placeholder="Enter your question here...", help="Ask any question, and our AI will search the web for answers") # Search button search_button = st.button("🔍 Search") # Process the search when button is clicked if search_button and query: # Check if API key is provided if not st.session_state.api_key: st.error("Please enter your Groq API key first!") else: try: with st.spinner("🧠 Searching the web and analyzing results..."): # Initialize the LLM and agent llm = Groq(model=model_option) # Initialize the thinking capture thinking_capture = ThinkingCapture() # Create the agent with step callbacks agent = ReActAgent.from_tools( [search_toolkit], llm=llm, verbose=True, step_callbacks=[thinking_capture.on_agent_step] ) # Clear current sources before the new search st.session_state.current_sources = [] # Get the response start_time = time.time() response = agent.chat(query) end_time = time.time() # Store the thinking process st.session_state.thinking_process = thinking_capture.get_thinking_process() # Extract any additional URLs from the response additional_urls = extract_urls(response.response) for url in additional_urls: if not any(source['url'] == url for source in st.session_state.current_sources): st.session_state.current_sources.append({ 'title': "Referenced Source", 'url': url }) # Store the response in session state st.session_state.current_response = { "query": query, "response": response.response, "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "duration": round(end_time - start_time, 2), "sources": st.session_state.current_sources, "thinking": st.session_state.thinking_process } # Add to conversation history st.session_state.conversation_history.append(st.session_state.current_response) # Display success message st.success(f"Found results in {round(end_time - start_time, 2)} seconds!") except Exception as e: st.error(f"An error occurred: {str(e)}") # Display quick source links if available if st.session_state.current_sources: st.markdown("### Source Websites:") st.markdown('', unsafe_allow_html=True) # Display current response if available if st.session_state.current_response: with st.container(): st.markdown('
', unsafe_allow_html=True) st.markdown("### Response:") st.write(st.session_state.current_response["response"]) # Export options col1, col2 = st.columns(2) with col1: st.markdown( get_download_link( st.session_state.current_response["response"], f"search_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", "Download as Text" ), unsafe_allow_html=True ) with col2: # Create JSON with metadata json_data = json.dumps({ "query": st.session_state.current_response["query"], "response": st.session_state.current_response["response"], "timestamp": st.session_state.current_response["time"], "processing_time": st.session_state.current_response["duration"], "sources": st.session_state.current_sources if "sources" in st.session_state.current_response else [], "thinking_process": st.session_state.thinking_process if "thinking" in st.session_state.current_response else "" }, indent=4) st.markdown( get_download_link( json_data, f"search_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", "Download as JSON with Sources" ), unsafe_allow_html=True ) st.markdown('
', unsafe_allow_html=True) # Display thinking process if enabled if show_thinking and "thinking" in st.session_state.current_response: with st.expander("View AI Thinking Process", expanded=True): st.markdown('
', unsafe_allow_html=True) # Create a formatted display of the thinking steps thinking_text = st.session_state.current_response["thinking"] steps = thinking_text.split('\n') for step in steps: if step.strip(): step_type = "" if step.startswith("Thought:"): step_type = "💭" elif step.startswith("Action:"): step_type = "🔍" elif step.startswith("Observation:"): step_type = "📊" st.markdown(f'
{step_type} {step}
', unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) # Display sources if available if "sources" in st.session_state.current_response and st.session_state.current_response["sources"]: with st.expander("View Detailed Sources", expanded=True): st.markdown('
', unsafe_allow_html=True) for i, source in enumerate(st.session_state.current_response["sources"]): st.markdown(f'
', unsafe_allow_html=True) st.markdown(f"**Source {i+1}:** {source.get('title', 'Unknown Source')}") st.markdown(f'', unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) # Feedback section st.markdown('
', unsafe_allow_html=True) st.markdown("### Was this response helpful?") col1, col2 = st.columns(2) with col1: if st.button("👍 Helpful", key="helpful_btn"): if submit_feedback("helpful", st.session_state.current_response["query"], st.session_state.current_response["response"]): st.success("Thank you for your feedback!") with col2: if st.button("👎 Not Helpful", key="not_helpful_btn"): if submit_feedback("not_helpful", st.session_state.current_response["query"], st.session_state.current_response["response"]): st.success("Thank you for your feedback! We'll work to improve our responses.") st.markdown("### Rate this response:") rating = st.slider("", min_value=1, max_value=5, value=4, help="Rate the quality of this response from 1 (poor) to 5 (excellent)") if st.button("Submit Rating"): if submit_rating(rating, st.session_state.current_response["query"], st.session_state.current_response["response"]): st.success("Rating submitted! Thank you for helping us improve.") st.markdown('
', unsafe_allow_html=True) # Display conversation history if st.session_state.conversation_history: with st.expander("View Conversation History"): for i, item in enumerate(reversed(st.session_state.conversation_history)): st.markdown(f'
', unsafe_allow_html=True) st.markdown(f'Q: {item["query"]} ({item["time"]})', unsafe_allow_html=True) st.markdown(f'
A: {item["response"][:200]}{"..." if len(item["response"]) > 200 else ""}
', unsafe_allow_html=True) st.markdown('
', unsafe_allow_html=True) if i < len(st.session_state.conversation_history) - 1: st.markdown('---') # Footer with attribution st.markdown(''' ''', unsafe_allow_html=True)