import streamlit as st
from llama_index.core.agent import ReActAgent
from llama_index.llms.groq import Groq
from llama_index.core.tools import FunctionTool
from llama_index.tools.tavily_research.base import TavilyToolSpec
import os
import json
from datetime import datetime
from dotenv import load_dotenv
import time
import base64
import plotly.graph_objects as go
import re
# Load environment variables
load_dotenv()
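# A .env file in the project root can supply the keys this app reads, e.g.:
#   GROQ_API_KEY=your-groq-key
#   TAVILY_API_KEY=your-tavily-key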
# Model rate limits information
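# (rpm/rpd = requests per minute/day; tpm/tpd = tokens per minute/day)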
MODEL_LIMITS = {
"allam-2-7b": {
"rpm": 30,
"rpd": 7000,
"tpm": 6000,
"tpd": "No limit"
},
"deepseek-r1-distill-llama-70b": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": "No limit"
},
"deepseek-r1-distill-qwen-32b": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": "No limit"
},
"gemma2-9b-it": {
"rpm": 30,
"rpd": 14400,
"tpm": 15000,
"tpd": 500000
},
"llama-3.1-8b-instant": {
"rpm": 30,
"rpd": 14400,
"tpm": 6000,
"tpd": 500000
},
"llama-3.2-11b-vision-preview": {
"rpm": 30,
"rpd": 7000,
"tpm": 7000,
"tpd": 500000
},
"llama-3.2-1b-preview": {
"rpm": 30,
"rpd": 7000,
"tpm": 7000,
"tpd": 500000
},
"llama-3.2-3b-preview": {
"rpm": 30,
"rpd": 7000,
"tpm": 7000,
"tpd": 500000
},
"llama-3.2-90b-vision-preview": {
"rpm": 15,
"rpd": 3500,
"tpm": 7000,
"tpd": 250000
},
"llama-3.3-70b-specdec": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": 100000
},
"llama-3.3-70b-versatile": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": 100000
},
"llama-guard-3-8b": {
"rpm": 30,
"rpd": 14400,
"tpm": 15000,
"tpd": 500000
},
"llama3-70b-8192": {
"rpm": 30,
"rpd": 14400,
"tpm": 6000,
"tpd": 500000
},
"llama3-8b-8192": {
"rpm": 30,
"rpd": 14400,
"tpm": 6000,
"tpd": 500000
},
"mistral-saba-24b": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": 500000
},
"qwen-2.5-32b": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": "No limit"
},
"qwen-2.5-coder-32b": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": "No limit"
},
"qwen-qwq-32b": {
"rpm": 30,
"rpd": 1000,
"tpm": 6000,
"tpd": "No limit"
},
"claude-3-5-sonnet-20240620": {
"rpm": 30,
"rpd": 14400,
"tpm": 15000,
"tpd": 500000
},
"mixtral-8x7b-32768": {
"rpm": 30,
"rpd": 14400,
"tpm": 15000,
"tpd": 500000
}
}
# Initialize session state if not already done
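# Streamlit re-runs this script on every interaction; st.session_state keeps these values across reruns.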
if 'conversation_history' not in st.session_state:
st.session_state.conversation_history = []
if 'api_key' not in st.session_state:
st.session_state.api_key = ""
if 'current_response' not in st.session_state:
st.session_state.current_response = None
if 'feedback_data' not in st.session_state:
st.session_state.feedback_data = []
if 'current_sources' not in st.session_state:
st.session_state.current_sources = []
if 'thinking_process' not in st.session_state:
st.session_state.thinking_process = ""
# Main title and description
st.markdown('<h1>TechMatrix AI Web Search Agent</h1>', unsafe_allow_html=True)
st.markdown('''
This intelligent agent uses state-of-the-art LLM technology to search the web and provide comprehensive answers to your questions.
Simply enter your query, and let our AI handle the rest!
''')
# Sidebar for team information
with st.sidebar:
    st.markdown('### Team')
    st.markdown('👑 Abhay Gupta (Team Leader)')
    st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/abhay-gupta-197b17264/)')
    st.markdown('🧠 Mayank Das Bairagi')
    st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/mayank-das-bairagi-18639525a/)')
    st.markdown('💻 Kripanshu Gupta')
    st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/kripanshu-gupta-a66349261/)')
    st.markdown('🔍 Bhumika Patel')
    st.markdown('[LinkedIn Profile](https://www.linkedin.com/in/bhumika-patel-ml/)')
st.markdown('---')
# Advanced Settings
    st.markdown('### Advanced Settings')
# Available models
available_models = [
'gemma2-9b-it',
'llama3-8b-8192',
'mixtral-8x7b-32768',
'llama3-70b-8192',
'claude-3-5-sonnet-20240620',
'llama-3.1-8b-instant',
'llama-3.2-3b-preview',
'llama-3.3-70b-versatile',
'qwen-2.5-32b',
'mistral-saba-24b'
]
model_option = st.selectbox(
'LLM Model',
available_models,
index=0,
help="Select from available Groq models"
)
# Display the rate limits for the selected model
if model_option in MODEL_LIMITS:
limits = MODEL_LIMITS[model_option]
st.markdown(f"#### Rate Limits for {model_option}")
# Create a table for the limits
st.markdown("""
Limit Type |
Value |
Requests per Minute |
{rpm} |
Requests per Day |
{rpd} |
Tokens per Minute |
{tpm} |
Tokens per Day |
{tpd} |
""".format(
rpm=limits['rpm'],
rpd=limits['rpd'],
tpm=limits['tpm'],
tpd=limits['tpd']
), unsafe_allow_html=True)
st.markdown('
', unsafe_allow_html=True)
search_depth = st.slider('Search Depth', min_value=1, max_value=8, value=5,
help="Higher values will search more thoroughly but take longer")
show_thinking = st.checkbox('Show AI Thinking Process', value=True,
help="Display the step-by-step reasoning process of the AI")
# Clear history button
if st.button('Clear Conversation History'):
st.session_state.conversation_history = []
st.success('Conversation history cleared!')
# Analytics section in sidebar
if st.session_state.feedback_data:
st.markdown('---')
        st.markdown('### Analytics')
# Calculate average rating
ratings = [item['rating'] for item in st.session_state.feedback_data if 'rating' in item]
avg_rating = sum(ratings) / len(ratings) if ratings else 0
# Create a chart
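        # Gauge bands: red below 2, yellow 2-3.5, green 3.5-5, on a 0-5 axis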
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=avg_rating,
title={'text': "Average Rating"},
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 5]},
'bar': {'color': "#6200EA"},
'steps': [
{'range': [0, 2], 'color': "#FFD0D0"},
{'range': [2, 3.5], 'color': "#FFFFCC"},
{'range': [3.5, 5], 'color': "#D0FFD0"}
]
}
))
fig.update_layout(height=250, margin=dict(l=20, r=20, t=30, b=20))
st.plotly_chart(fig, use_container_width=True)
# Show feedback counts
feedback_counts = {"👍 Helpful": 0, "👎 Not Helpful": 0}
for item in st.session_state.feedback_data:
if 'feedback' in item:
if item['feedback'] == 'helpful':
feedback_counts["👍 Helpful"] += 1
elif item['feedback'] == 'not_helpful':
feedback_counts["👎 Not Helpful"] += 1
st.markdown("### Feedback Summary")
for key, value in feedback_counts.items():
st.markdown(f"**{key}:** {value}")
# API key input section
with st.expander("Configure API Keys"):
api_key = st.text_input("Enter your Groq API key:",
type="password",
value=st.session_state.api_key,
help="Get your API key from https://console.groq.com/keys")
tavily_key = st.text_input("Enter your Tavily API key (optional):",
type="password",
help="Get your Tavily API key from https://tavily.com/#api")
    if api_key:
        st.session_state.api_key = api_key
        os.environ['GROQ_API_KEY'] = api_key
    if tavily_key:
        os.environ['TAVILY_API_KEY'] = tavily_key
# Function to create download link for text data
def get_download_link(text, filename, link_text):
b64 = base64.b64encode(text.encode()).decode()
    href = f'<a href="data:file/txt;base64,{b64}" download="{filename}">{link_text}</a>'
return href
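# Example: get_download_link("hello", "out.txt", "Download") returns
# '<a href="data:file/txt;base64,aGVsbG8=" download="out.txt">Download</a>',
# which st.markdown renders as a clickable download when unsafe_allow_html=True.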
# Function to handle feedback submission
def submit_feedback(feedback_type, query, response):
feedback_entry = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"query": query,
"response": response,
"feedback": feedback_type
}
st.session_state.feedback_data.append(feedback_entry)
return True
# Function to submit rating
def submit_rating(rating, query, response):
# Find if there's an existing entry for this query/response
for entry in st.session_state.feedback_data:
if entry.get('query') == query and entry.get('response') == response:
entry['rating'] = rating
return True
# If not found, create a new entry
feedback_entry = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"query": query,
"response": response,
"rating": rating
}
st.session_state.feedback_data.append(feedback_entry)
return True
# Function to extract URLs from text
def extract_urls(text):
    url_pattern = r'https?://(?:[-\w./?=&#]|(?:%[\da-fA-F]{2}))+'  # include path and query characters
return re.findall(url_pattern, text)
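# Example: extract_urls("see https://tavily.com/docs for details") -> ['https://tavily.com/docs']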
# Custom callback to capture agent's thinking process
class ThinkingCapture:
def __init__(self):
self.thinking_steps = []
def on_agent_step(self, agent_step):
# Capture the thought process
if hasattr(agent_step, 'thought') and agent_step.thought:
self.thinking_steps.append(f"Thought: {agent_step.thought}")
if hasattr(agent_step, 'action') and agent_step.action:
self.thinking_steps.append(f"Action: {agent_step.action}")
if hasattr(agent_step, 'observation') and agent_step.observation:
self.thinking_steps.append(f"Observation: {agent_step.observation}")
return agent_step
def get_thinking_process(self):
return "\n".join(self.thinking_steps)
# Setup search tools
try:
if 'TAVILY_API_KEY' in os.environ and os.environ['TAVILY_API_KEY']:
search = TavilyToolSpec(api_key=os.environ['TAVILY_API_KEY'])
    else:
        # No Tavily key was found in the environment; searches will fail until one is provided
        st.warning("No Tavily API key found. Please add your own key in the API settings to enable web search.")
        search = TavilyToolSpec(api_key=os.getenv('TAVILY_API_KEY', ''))
def search_tool(prompt: str) -> list:
"""Search the web for information about the given prompt."""
try:
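            # The sidebar's `search_depth` slider caps how many results Tavily returns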
search_results = search.search(prompt, max_results=search_depth)
            # Collect source URLs; TavilyToolSpec returns Documents whose metadata carries the URL
            sources = []
            for result in search_results:
                metadata = getattr(result, 'metadata', None) or {}
                url = metadata.get('url')
                if url:
                    sources.append({
                        'title': metadata.get('title', 'Unknown Source'),
                        'url': url
                    })
            # Store in session state for later display
            st.session_state.current_sources = sources
return [result.text for result in search_results]
except Exception as e:
return [f"Error during search: {str(e)}"]
search_toolkit = FunctionTool.from_defaults(fn=search_tool)
except Exception as e:
st.error(f"Error setting up search tools: {str(e)}")
search_toolkit = None
# Query input
query = st.text_input("What would you like to know?",
placeholder="Enter your question here...",
help="Ask any question, and our AI will search the web for answers")
# Search button
search_button = st.button("🔍 Search")
# Process the search when button is clicked
if search_button and query:
# Check if API key is provided
if not st.session_state.api_key:
st.error("Please enter your Groq API key first!")
else:
try:
with st.spinner("🧠 Searching the web and analyzing results..."):
# Initialize the LLM and agent
llm = Groq(model=model_option)
# Initialize the thinking capture
thinking_capture = ThinkingCapture()
# Create the agent with step callbacks
agent = ReActAgent.from_tools(
[search_toolkit],
llm=llm,
verbose=True,
step_callbacks=[thinking_capture.on_agent_step]
)
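                # The ReAct loop alternates Thought -> Action (a tool call) -> Observation
                # until the model decides it has enough information to answer.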
# Clear current sources before the new search
st.session_state.current_sources = []
# Get the response
start_time = time.time()
response = agent.chat(query)
end_time = time.time()
# Store the thinking process
st.session_state.thinking_process = thinking_capture.get_thinking_process()
# Extract any additional URLs from the response
additional_urls = extract_urls(response.response)
for url in additional_urls:
if not any(source['url'] == url for source in st.session_state.current_sources):
st.session_state.current_sources.append({
'title': "Referenced Source",
'url': url
})
# Store the response in session state
st.session_state.current_response = {
"query": query,
"response": response.response,
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"duration": round(end_time - start_time, 2),
"sources": st.session_state.current_sources,
"thinking": st.session_state.thinking_process
}
# Add to conversation history
st.session_state.conversation_history.append(st.session_state.current_response)
# Display success message
st.success(f"Found results in {round(end_time - start_time, 2)} seconds!")
except Exception as e:
st.error(f"An error occurred: {str(e)}")
# Display quick source links if available
if st.session_state.current_sources:
st.markdown("### Source Websites:")
    links_html = " | ".join(
        f'<a href="{source["url"]}" target="_blank">{source["title"]}</a>'
        for source in st.session_state.current_sources
    )
    st.markdown(links_html, unsafe_allow_html=True)
# Display current response if available
if st.session_state.current_response:
with st.container():
st.markdown("### Response:")
st.write(st.session_state.current_response["response"])
# Export options
col1, col2 = st.columns(2)
with col1:
st.markdown(
get_download_link(
st.session_state.current_response["response"],
f"search_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
"Download as Text"
),
unsafe_allow_html=True
)
with col2:
# Create JSON with metadata
json_data = json.dumps({
"query": st.session_state.current_response["query"],
"response": st.session_state.current_response["response"],
"timestamp": st.session_state.current_response["time"],
"processing_time": st.session_state.current_response["duration"],
"sources": st.session_state.current_sources if "sources" in st.session_state.current_response else [],
"thinking_process": st.session_state.thinking_process if "thinking" in st.session_state.current_response else ""
}, indent=4)
st.markdown(
get_download_link(
json_data,
f"search_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
"Download as JSON with Sources"
),
unsafe_allow_html=True
)
# Display thinking process if enabled
if show_thinking and "thinking" in st.session_state.current_response:
with st.expander("View AI Thinking Process", expanded=True):
# Create a formatted display of the thinking steps
thinking_text = st.session_state.current_response["thinking"]
steps = thinking_text.split('\n')
for step in steps:
if step.strip():
step_type = ""
if step.startswith("Thought:"):
step_type = "💭"
elif step.startswith("Action:"):
step_type = "🔍"
elif step.startswith("Observation:"):
step_type = "📊"
                    st.markdown(f'{step_type} {step}')
# Display sources if available
if "sources" in st.session_state.current_response and st.session_state.current_response["sources"]:
with st.expander("View Detailed Sources", expanded=True):
for i, source in enumerate(st.session_state.current_response["sources"]):
                st.markdown(f"**Source {i+1}:** {source.get('title', 'Unknown Source')}")
                st.markdown(f"[{source.get('url', '')}]({source.get('url', '')})")
# Feedback section
st.markdown("### Was this response helpful?")
col1, col2 = st.columns(2)
with col1:
if st.button("👍 Helpful", key="helpful_btn"):
if submit_feedback("helpful", st.session_state.current_response["query"], st.session_state.current_response["response"]):
st.success("Thank you for your feedback!")
with col2:
if st.button("👎 Not Helpful", key="not_helpful_btn"):
if submit_feedback("not_helpful", st.session_state.current_response["query"], st.session_state.current_response["response"]):
st.success("Thank you for your feedback! We'll work to improve our responses.")
st.markdown("### Rate this response:")
    rating = st.slider("Rating", min_value=1, max_value=5, value=4,
                       label_visibility="collapsed",
                       help="Rate the quality of this response from 1 (poor) to 5 (excellent)")
if st.button("Submit Rating"):
if submit_rating(rating, st.session_state.current_response["query"], st.session_state.current_response["response"]):
st.success("Rating submitted! Thank you for helping us improve.")
# Display conversation history
if st.session_state.conversation_history:
with st.expander("View Conversation History"):
for i, item in enumerate(reversed(st.session_state.conversation_history)):
            st.markdown(f'Q: {item["query"]} ({item["time"]})')
            st.markdown(f'A: {item["response"][:200]}{"..." if len(item["response"]) > 200 else ""}')
if i < len(st.session_state.conversation_history) - 1:
st.markdown('---')
# Footer with attribution
st.markdown('---')
st.markdown('<p style="text-align: center;">Built by the TechMatrix team</p>', unsafe_allow_html=True)