Update app.py
Browse files
app.py
CHANGED
@@ -1,159 +1,540 @@
|
|
|
|
1 |
import os
|
2 |
import gradio as gr
|
3 |
from google import genai
|
4 |
-
|
|
|
|
|
5 |
import requests
|
6 |
import markdownify
|
7 |
from urllib.robotparser import RobotFileParser
|
8 |
from urllib.parse import urlparse
|
|
|
|
|
9 |
|
10 |
-
#
|
11 |
-
client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))
|
12 |
-
MODEL = "gemini-2.5-pro-exp-03-25"
|
13 |
|
14 |
-
|
15 |
-
def can_crawl_url(url: str, user_agent: str = "*") -> bool:
|
16 |
"""Check robots.txt permissions for a URL"""
|
|
|
|
|
|
|
|
|
17 |
try:
|
18 |
parsed_url = urlparse(url)
|
|
|
|
|
|
|
|
|
19 |
robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
|
20 |
-
|
|
|
|
|
|
|
|
|
21 |
rp.read()
|
22 |
-
|
|
|
|
|
23 |
except Exception as e:
|
24 |
-
print(f"Error checking robots.txt: {e}")
|
|
|
25 |
return False
|
26 |
|
27 |
def load_page(url: str) -> str:
|
28 |
-
"""
|
29 |
-
|
30 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
31 |
try:
|
32 |
-
|
33 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
34 |
except Exception as e:
|
35 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
36 |
|
37 |
-
def execute_code(code: str) -> str:
|
38 |
-
"""Execute Python code safely"""
|
39 |
try:
|
40 |
-
|
41 |
-
|
42 |
-
|
43 |
-
|
44 |
-
|
45 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
46 |
except Exception as e:
|
47 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
48 |
|
49 |
-
def generate_response(user_input, mode):
|
50 |
try:
|
51 |
-
|
52 |
-
|
53 |
-
|
54 |
-
|
55 |
-
#
|
56 |
-
|
57 |
-
|
58 |
-
|
59 |
-
|
60 |
-
|
61 |
-
|
62 |
-
|
63 |
-
response = client.models.generate_content(
|
64 |
-
model=MODEL,
|
65 |
-
contents=[code_prompt]
|
66 |
-
)
|
67 |
-
|
68 |
-
if response.text:
|
69 |
-
code = response.text.strip()
|
70 |
-
# Execute the generated code
|
71 |
-
execution_result = execute_code(code)
|
72 |
-
return f"Generated Python code:\n```python\n{code}\n```\n\nExecution result:\n{execution_result}"
|
73 |
-
return "No code was generated for this request."
|
74 |
-
|
75 |
-
elif mode == "search":
|
76 |
-
# Simulate search functionality
|
77 |
-
search_prompt = f"""You are an AI assistant with web search capabilities.
|
78 |
-
For the query: "{user_input}"
|
79 |
-
1. Determine if this requires current/live information
|
80 |
-
2. If yes, suggest specific URLs to visit
|
81 |
-
3. If no, answer directly"""
|
82 |
-
|
83 |
-
response = client.models.generate_content(
|
84 |
-
model=MODEL,
|
85 |
-
contents=[search_prompt]
|
86 |
-
)
|
87 |
-
|
88 |
-
if "http" in response.text:
|
89 |
-
# Extract URL if mentioned
|
90 |
-
url = response.text.split("http")[1].split()[0]
|
91 |
-
url = "http" + url.split('"')[0].split("'")[0].split()[0]
|
92 |
-
page_content = load_page(url)
|
93 |
-
return f"Information from {url}:\n\n{page_content[:2000]}..."
|
94 |
-
return response.text
|
95 |
-
|
96 |
-
else: # default mode
|
97 |
-
response = client.models.generate_content(
|
98 |
-
model=MODEL,
|
99 |
-
contents=[user_input]
|
100 |
)
|
101 |
-
|
102 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
103 |
except Exception as e:
|
104 |
-
|
105 |
-
|
106 |
-
|
107 |
-
|
108 |
-
|
109 |
-
|
110 |
-
|
111 |
-
|
112 |
-
|
113 |
-
|
114 |
-
|
115 |
-
|
116 |
-
|
117 |
-
|
118 |
-
|
119 |
-
|
120 |
-
|
121 |
-
|
122 |
-
|
123 |
-
|
124 |
-
|
125 |
-
|
126 |
-
|
127 |
-
|
128 |
-
|
129 |
-
|
130 |
-
|
131 |
-
|
132 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
133 |
)
|
134 |
-
|
135 |
-
|
136 |
-
|
137 |
-
|
138 |
-
|
139 |
-
|
140 |
-
|
141 |
-
"Code Execution": "code",
|
142 |
-
"Search Mode": "search"
|
143 |
-
}
|
144 |
-
return generate_response(user_input, mode_map.get(mode, "default"))
|
145 |
-
|
146 |
-
submit_btn.click(
|
147 |
-
fn=process_input,
|
148 |
-
inputs=[input_box, mode_radio],
|
149 |
-
outputs=output_box
|
150 |
)
|
151 |
-
|
152 |
-
|
153 |
-
|
154 |
-
|
155 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
156 |
)
|
157 |
|
|
|
|
|
|
|
|
|
|
|
|
|
158 |
if __name__ == "__main__":
|
159 |
-
|
|
|
|
|
|
|
|
|
|
1 |
+
# -*- coding: utf-8 -*-
|
2 |
import os
|
3 |
import gradio as gr
|
4 |
from google import genai
|
5 |
+
# Make sure to import necessary types from the SDK
|
6 |
+
from google.generative_ai import types
|
7 |
+
from google.generative_ai.types import HarmCategory, HarmBlockThreshold # For safety settings
|
8 |
import requests
|
9 |
import markdownify
|
10 |
from urllib.robotparser import RobotFileParser
|
11 |
from urllib.parse import urlparse
|
12 |
+
import traceback
|
13 |
+
import json # Although not directly used in the final code, useful for debugging args
|
14 |
|
15 |
+
# --- Browser/Web Tool Functions ---
|
|
|
|
|
16 |
|
17 |
+
def can_crawl_url(url: str, user_agent: str = "PythonGoogleGenAIAgent/1.0") -> bool:
|
|
|
18 |
"""Check robots.txt permissions for a URL"""
|
19 |
+
# Use a more specific user agent, but '*' is a fallback
|
20 |
+
if not url:
|
21 |
+
print("No URL provided to can_crawl_url")
|
22 |
+
return False
|
23 |
try:
|
24 |
parsed_url = urlparse(url)
|
25 |
+
if not parsed_url.scheme or not parsed_url.netloc:
|
26 |
+
print(f"Invalid URL format for robots.txt check: {url}")
|
27 |
+
return False # Cannot determine robots.txt location
|
28 |
+
|
29 |
robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
|
30 |
+
print(f"Checking robots.txt at: {robots_url} for URL: {url}")
|
31 |
+
|
32 |
+
# Using RobotFileParser's default opener which handles redirects
|
33 |
+
rp = RobotFileParser()
|
34 |
+
rp.set_url(robots_url)
|
35 |
rp.read()
|
36 |
+
can_fetch = rp.can_fetch(user_agent, url)
|
37 |
+
print(f"Can fetch {url} with agent '{user_agent}': {can_fetch}")
|
38 |
+
return can_fetch
|
39 |
except Exception as e:
|
40 |
+
print(f"Error checking robots.txt for {url}: {e}")
|
41 |
+
# Default to false if unsure, to be polite to servers
|
42 |
return False
|
43 |
|
44 |
def load_page(url: str) -> str:
|
45 |
+
"""
|
46 |
+
Load webpage content as markdown. Designed to be used as a Gemini Function.
|
47 |
+
Args:
|
48 |
+
url: The URL of the webpage to load.
|
49 |
+
Returns:
|
50 |
+
Markdown content of the page or an error message.
|
51 |
+
"""
|
52 |
+
print(f"Attempting to load page: {url}")
|
53 |
+
if not url:
|
54 |
+
return "Error: No URL provided."
|
55 |
+
if not url.startswith(('http://', 'https://')):
|
56 |
+
return f"Error: Invalid URL scheme. Please provide http or https URL. Got: {url}"
|
57 |
+
|
58 |
+
USER_AGENT = "PythonGoogleGenAIAgent/1.0 (Function Calling)" # Be identifiable
|
59 |
+
if not can_crawl_url(url, user_agent=USER_AGENT):
|
60 |
+
print(f"URL {url} failed robots.txt check for agent {USER_AGENT}")
|
61 |
+
return f"Error: Access denied by robots.txt for URL {url}"
|
62 |
try:
|
63 |
+
headers = {'User-Agent': USER_AGENT}
|
64 |
+
response = requests.get(url, timeout=15, headers=headers, allow_redirects=True)
|
65 |
+
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
|
66 |
+
|
67 |
+
# Check content type - try to only process HTML
|
68 |
+
content_type = response.headers.get('content-type', '').lower()
|
69 |
+
if 'html' not in content_type:
|
70 |
+
print(f"Non-HTML content type '{content_type}' at {url}. Returning summary.")
|
71 |
+
# Return limited info for non-html types
|
72 |
+
return f"Content at {url} is of type '{content_type}'. Size: {len(response.content)} bytes. Cannot convert to Markdown."
|
73 |
+
|
74 |
+
# Limit content size before markdown conversion to avoid excessive memory/CPU
|
75 |
+
MAX_CONTENT_SIZE = 1_000_000 # 1MB limit
|
76 |
+
if len(response.content) > MAX_CONTENT_SIZE:
|
77 |
+
print(f"Content size {len(response.content)} exceeds limit {MAX_CONTENT_SIZE}. Truncating.")
|
78 |
+
# Decode potentially large content carefully
|
79 |
+
try:
|
80 |
+
html_content = response.content[:MAX_CONTENT_SIZE].decode(response.apparent_encoding or 'utf-8', errors='ignore')
|
81 |
+
except Exception as decode_err:
|
82 |
+
print(f"Decoding error after truncation: {decode_err}. Falling back to utf-8 ignore.")
|
83 |
+
html_content = response.content[:MAX_CONTENT_SIZE].decode('utf-8', errors='ignore')
|
84 |
+
truncated_msg = "\n\n[Content truncated due to size limit]"
|
85 |
+
else:
|
86 |
+
html_content = response.text # Use response.text which handles encoding better for smaller content
|
87 |
+
truncated_msg = ""
|
88 |
+
|
89 |
+
# Convert to Markdown
|
90 |
+
# Added heading_style for potentially better formatting
|
91 |
+
markdown_content = markdownify.markdownify(html_content, heading_style="ATX", strip=['script', 'style'], escape_underscores=False)
|
92 |
+
|
93 |
+
# Simple cleaning (optional, can be expanded)
|
94 |
+
markdown_content = '\n'.join([line.strip() for line in markdown_content.splitlines() if line.strip()])
|
95 |
+
|
96 |
+
print(f"Successfully loaded and converted {url} to markdown.")
|
97 |
+
# Add URL source attribution
|
98 |
+
return f"Content from {url}:\n\n" + markdown_content + truncated_msg
|
99 |
+
|
100 |
+
except requests.exceptions.Timeout:
|
101 |
+
print(f"Timeout error loading page: {url}")
|
102 |
+
return f"Error: Timeout while trying to load {url}"
|
103 |
+
except requests.exceptions.RequestException as e:
|
104 |
+
print(f"Request error loading page {url}: {str(e)}")
|
105 |
+
return f"Error loading page {url}: {str(e)}"
|
106 |
except Exception as e:
|
107 |
+
print(f"General error loading page {url}: {str(e)}")
|
108 |
+
traceback.print_exc() # Print full traceback for debugging
|
109 |
+
return f"Error loading page {url}: An unexpected error occurred ({type(e).__name__})."
|
110 |
+
|
111 |
+
|
112 |
+
# --- Gemini Client Initialization and Configuration ---
|
113 |
+
try:
|
114 |
+
api_key = os.environ.get("GEMINI_API_KEY")
|
115 |
+
if not api_key:
|
116 |
+
raise ValueError("GEMINI_API_KEY environment variable not set.")
|
117 |
+
genai.configure(api_key=api_key)
|
118 |
+
|
119 |
+
# *** Use the requested experimental model ***
|
120 |
+
MODEL_NAME = "gemini-2.5-pro-exp-03-25"
|
121 |
+
print(f"Attempting to use EXPERIMENTAL model: {MODEL_NAME}")
|
122 |
+
|
123 |
+
# Define the browse tool using FunctionDeclaration
|
124 |
+
browse_tool = types.Tool(
|
125 |
+
function_declarations=[
|
126 |
+
types.FunctionDeclaration(
|
127 |
+
name='load_page',
|
128 |
+
description='Fetches the content of a specific web page URL as Markdown text. Use this when the user asks for information from a specific URL they provide, or when you need to look up live information mentioned alongside a specific source URL.',
|
129 |
+
parameters=types.Schema(
|
130 |
+
type=types.Type.OBJECT,
|
131 |
+
properties={
|
132 |
+
'url': types.Schema(type=types.Type.STRING, description="The *full* URL of the webpage to load (must start with http:// or https://).")
|
133 |
+
},
|
134 |
+
required=['url']
|
135 |
+
)
|
136 |
+
)
|
137 |
+
]
|
138 |
+
)
|
139 |
+
# Define the code execution tool
|
140 |
+
# Enables the model to suggest and potentially execute Python code.
|
141 |
+
code_execution_tool = types.Tool(code_execution=types.ToolCodeExecution())
|
142 |
+
|
143 |
+
# Combine tools that the model can use
|
144 |
+
tools = [browse_tool, code_execution_tool]
|
145 |
+
|
146 |
+
# Create the model instance
|
147 |
+
model = genai.GenerativeModel(
|
148 |
+
model_name=MODEL_NAME,
|
149 |
+
tools=tools,
|
150 |
+
# Relax safety settings slightly *if needed* for code/complex generation,
|
151 |
+
# but be aware of the implications. BLOCK_NONE is risky. Use with caution.
|
152 |
+
# Consider BLOCK_LOW_AND_ABOVE or MEDIUM as safer alternatives.
|
153 |
+
safety_settings={
|
154 |
+
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
155 |
+
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
156 |
+
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
157 |
+
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
158 |
+
# Adjust specific categories if you face frequent blocking for safe content.
|
159 |
+
},
|
160 |
+
# System instruction (optional but recommended for setting context)
|
161 |
+
system_instruction="You are a helpful AI assistant called Gemini-Toolkit. You can browse specific web pages provided by the user via the 'load_page' tool. You can also execute Python code using the 'code_execution' tool to perform calculations, analyze data, or demonstrate programming concepts. Explain your reasoning and the steps you take. If asked to browse, confirm the URL you are accessing. If providing code, explain what it does.",
|
162 |
+
)
|
163 |
+
print(f"Gemini client initialized with model: {MODEL_NAME} and tools.")
|
164 |
+
|
165 |
+
except Exception as e:
|
166 |
+
print(f"CRITICAL ERROR: Error initializing Gemini client: {e}")
|
167 |
+
traceback.print_exc()
|
168 |
+
# Provide a fallback model or handle the error gracefully in the UI
|
169 |
+
model = None
|
170 |
+
tools = []
|
171 |
+
# Consider exiting if the core functionality is unavailable
|
172 |
+
# raise SystemExit("Failed to initialize core Gemini model.") from e
|
173 |
+
|
174 |
+
# --- Gradio App Logic ---
|
175 |
+
|
176 |
+
def handle_function_call(function_call):
|
177 |
+
"""Executes the function call requested by the model."""
|
178 |
+
function_name = function_call.name
|
179 |
+
args = function_call.args # This is now a dict-like object
|
180 |
+
|
181 |
+
print(f"Executing Function Call: {function_name} with args: {dict(args)}") # Log args
|
182 |
|
|
|
|
|
183 |
try:
|
184 |
+
if function_name == 'load_page':
|
185 |
+
url = args.get('url')
|
186 |
+
if url:
|
187 |
+
# Execute the actual function
|
188 |
+
function_response_content = load_page(url=url)
|
189 |
+
# Limit response size to send back to Gemini
|
190 |
+
MAX_RESPONSE_LEN = 50000 # Limit characters sent back
|
191 |
+
if len(function_response_content) > MAX_RESPONSE_LEN:
|
192 |
+
print(f"Tool Response truncated from {len(function_response_content)} to {MAX_RESPONSE_LEN} chars.")
|
193 |
+
function_response_content = function_response_content[:MAX_RESPONSE_LEN] + "\n\n[... Tool Response Truncated Due to Size Limit ...]"
|
194 |
+
else:
|
195 |
+
function_response_content = "Error: URL parameter was missing in the function call. Please ensure the 'url' argument is provided."
|
196 |
+
else:
|
197 |
+
# Should not happen if tools are defined correctly and model uses them
|
198 |
+
print(f"Error: Received call for unknown function '{function_name}'")
|
199 |
+
function_response_content = f"Error: Unknown function '{function_name}' called by the model."
|
200 |
+
|
201 |
+
# Create the FunctionResponse part to send back to the model
|
202 |
+
# API expects the response arg to be a dict, typically {'content': <result>}
|
203 |
+
function_response_part = types.Part(
|
204 |
+
function_response=types.FunctionResponse(
|
205 |
+
name=function_name,
|
206 |
+
response={'content': function_response_content}
|
207 |
+
)
|
208 |
+
)
|
209 |
+
print(f"Function Response generated for {function_name}")
|
210 |
+
return function_response_part
|
211 |
+
|
212 |
except Exception as e:
|
213 |
+
print(f"Error during execution of function '{function_name}': {e}")
|
214 |
+
traceback.print_exc()
|
215 |
+
# Return an error message back to the model
|
216 |
+
return types.Part(
|
217 |
+
function_response=types.FunctionResponse(
|
218 |
+
name=function_name,
|
219 |
+
response={'error': f"Failed to execute function {function_name}: {str(e)}"}
|
220 |
+
)
|
221 |
+
)
|
222 |
+
|
223 |
+
def generate_response_with_tools(user_input, history_state):
|
224 |
+
"""Handles user input, interacts with Gemini (incl. tools), and manages history."""
|
225 |
+
if not model:
|
226 |
+
# Handle case where model initialization failed
|
227 |
+
return "Error: The AI model (Gemini) could not be initialized. Please check the logs or API key configuration.", history_state or []
|
228 |
+
|
229 |
+
if not user_input.strip():
|
230 |
+
# Return immediately if input is empty, don't update history
|
231 |
+
# Let the UI handle showing this message without clearing history state
|
232 |
+
# For chatbot, we might just not send anything or return a specific tuple
|
233 |
+
# Returning just a message for the chatbot display:
|
234 |
+
return [[None, "Please enter a valid query."]], history_state or []
|
235 |
+
|
236 |
+
|
237 |
+
# --- History Management ---
|
238 |
+
# Load history from state (should be list of Content objects)
|
239 |
+
# Initialize if state is None or empty
|
240 |
+
conversation_history = history_state if isinstance(history_state, list) else []
|
241 |
+
|
242 |
+
# Append the user's new message to the history
|
243 |
+
conversation_history.append(types.Content(role="user", parts=[types.Part.from_text(user_input)]))
|
244 |
+
print(f"\n--- Sending to Gemini (History length: {len(conversation_history)}) ---")
|
245 |
+
|
246 |
+
# Limit history length *before* sending to API to avoid excessive token usage/cost
|
247 |
+
# Keep the system instruction + last N turns. A turn = user msg + model response (potentially with tool calls/responses)
|
248 |
+
MAX_HISTORY_TURNS = 10
|
249 |
+
max_history_items = MAX_HISTORY_TURNS * 2 + (1 if conversation_history and conversation_history[0].role == "system" else 0) # Approx items to keep
|
250 |
+
|
251 |
+
if len(conversation_history) > max_history_items:
|
252 |
+
print(f"Trimming conversation history from {len(conversation_history)} items to ~{max_history_items}")
|
253 |
+
if conversation_history[0].role == "system":
|
254 |
+
# Keep system instruction and the latest items
|
255 |
+
conversation_history = [conversation_history[0]] + conversation_history[-(max_history_items-1):]
|
256 |
+
else:
|
257 |
+
# Just keep the latest items
|
258 |
+
conversation_history = conversation_history[-max_history_items:]
|
259 |
+
|
260 |
+
|
261 |
+
# --- Interaction Loop (for potential tool calls) ---
|
262 |
+
MAX_TOOL_LOOPS = 5 # Prevent infinite loops if the model keeps calling tools without finishing
|
263 |
+
loop_count = 0
|
264 |
+
current_history_for_api = list(conversation_history) # Work with a copy in the loop
|
265 |
|
|
|
266 |
try:
|
267 |
+
while loop_count < MAX_TOOL_LOOPS:
|
268 |
+
loop_count += 1
|
269 |
+
print(f"Generation loop {loop_count}/{MAX_TOOL_LOOPS}...")
|
270 |
+
|
271 |
+
# Send context and query to Gemini
|
272 |
+
# Use the potentially trimmed history for this API call
|
273 |
+
response = model.generate_content(
|
274 |
+
current_history_for_api,
|
275 |
+
request_options={"timeout": 120}, # Increase timeout for complex/tool calls
|
276 |
+
# generation_config=genai.types.GenerationConfig( # If you need temperature etc.
|
277 |
+
# temperature=0.7
|
278 |
+
# )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
279 |
)
|
280 |
+
|
281 |
+
# --- Process Response Candidate ---
|
282 |
+
if not response.candidates:
|
283 |
+
print("Warning: No candidates received from Gemini.")
|
284 |
+
# Append a message indicating no response
|
285 |
+
final_bot_message = "[No response generated by the model.]"
|
286 |
+
current_history_for_api.append(types.Content(role="model", parts=[types.Part.from_text(final_bot_message)]))
|
287 |
+
break # Exit loop
|
288 |
+
|
289 |
+
candidate = response.candidates[0]
|
290 |
+
|
291 |
+
# Check for safety blocks or finish reasons other than STOP or TOOL use
|
292 |
+
if candidate.finish_reason not in (types.Candidate.FinishReason.STOP, types.Candidate.FinishReason.TOOL_CALL):
|
293 |
+
print(f"Warning: Generation stopped unexpectedly. Reason: {candidate.finish_reason.name}")
|
294 |
+
# Append the reason to the conversation for context, if desired
|
295 |
+
stop_reason_msg = f"[Model stopped generating. Reason: {candidate.finish_reason.name}]"
|
296 |
+
# Check if there's any text content before adding the stop reason
|
297 |
+
if candidate.content and candidate.content.parts and any(p.text for p in candidate.content.parts):
|
298 |
+
current_history_for_api.append(candidate.content) # Add what content there was
|
299 |
+
# Extract text to display if needed, before adding stop reason
|
300 |
+
final_bot_message = "".join([p.text for p in candidate.content.parts if p.text]) + f"\n{stop_reason_msg}"
|
301 |
+
else:
|
302 |
+
# No text, just add the stop reason message as the model turn
|
303 |
+
final_bot_message = stop_reason_msg
|
304 |
+
current_history_for_api.append(types.Content(role="model", parts=[types.Part.from_text(final_bot_message)]))
|
305 |
+
break # Exit loop
|
306 |
+
|
307 |
+
# --- Handle Potential Tool Call ---
|
308 |
+
has_tool_call = candidate.finish_reason == types.Candidate.FinishReason.TOOL_CALL
|
309 |
+
|
310 |
+
# Append the model's response (which might contain text and/or tool calls) to history *before* execution
|
311 |
+
# The API expects the model's turn asking for the tool first.
|
312 |
+
current_history_for_api.append(candidate.content)
|
313 |
+
|
314 |
+
if has_tool_call:
|
315 |
+
print("Tool call requested by model.")
|
316 |
+
tool_calls_to_process = [part.function_call for part in candidate.content.parts if part.function_call]
|
317 |
+
|
318 |
+
if not tool_calls_to_process:
|
319 |
+
print("Warning: Model indicated TOOL_CALL finish reason but no function_call part found.")
|
320 |
+
# Maybe append an error message? Or just break?
|
321 |
+
# Let's try to continue, maybe there's text output.
|
322 |
+
final_bot_message = "".join([p.text for p in candidate.content.parts if p.text])
|
323 |
+
if not final_bot_message:
|
324 |
+
final_bot_message = "[Model indicated tool use but provided no details or text.]"
|
325 |
+
break # Exit loop as we can't proceed with tool call
|
326 |
+
|
327 |
+
# Execute the function(s) and get responses
|
328 |
+
tool_responses = []
|
329 |
+
for function_call in tool_calls_to_process:
|
330 |
+
function_response_part = handle_function_call(function_call)
|
331 |
+
tool_responses.append(function_response_part)
|
332 |
+
|
333 |
+
# Add the tool execution results to history for the *next* API call
|
334 |
+
current_history_for_api.append(types.Content(role="tool", parts=tool_responses)) # Use role="tool"
|
335 |
+
print("Added tool response(s) to history. Continuing loop...")
|
336 |
+
continue # Go back to the start of the while loop to call the API again
|
337 |
+
|
338 |
+
else:
|
339 |
+
# No tool call, this is the final response from the model
|
340 |
+
print("No tool call requested. Final response received.")
|
341 |
+
final_bot_message = "".join([part.text for part in candidate.content.parts if part.text])
|
342 |
+
|
343 |
+
# Also check for code execution *suggestions* or *results* in the final turn
|
344 |
+
code_parts_display = []
|
345 |
+
for part in candidate.content.parts:
|
346 |
+
if part.executable_code:
|
347 |
+
lang = part.executable_code.language.name.lower() if part.executable_code.language else "python"
|
348 |
+
code = part.executable_code.code
|
349 |
+
code_parts_display.append(f"Suggested Code ({lang}):\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```")
|
350 |
+
elif part.code_execution_result:
|
351 |
+
outcome_str = "Success" if part.code_execution_result.outcome == part.code_execution_result.Outcome.OK else "Failure"
|
352 |
+
code_parts_display.append(f"Code Execution Result ({outcome_str}):\n```\n{part.code_execution_result.output}\n```")
|
353 |
+
|
354 |
+
if code_parts_display:
|
355 |
+
final_bot_message += "\n\n" + "\n\n".join(code_parts_display)
|
356 |
+
|
357 |
+
# Handle empty final message case
|
358 |
+
if not final_bot_message.strip():
|
359 |
+
final_bot_message = "[Assistant completed its turn without generating text output.]"
|
360 |
+
|
361 |
+
break # Exit the while loop
|
362 |
+
|
363 |
+
# End of while loop
|
364 |
+
if loop_count >= MAX_TOOL_LOOPS:
|
365 |
+
print(f"Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}).")
|
366 |
+
final_bot_message = (final_bot_message + "\n\n" if final_bot_message else "") + f"[Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}). The final response might be incomplete.]"
|
367 |
+
# Ensure the last model message is added even if loop limit reached
|
368 |
+
if current_history_for_api[-1].role != "model":
|
369 |
+
current_history_for_api.append(types.Content(role="model", parts=[types.Part.from_text(final_bot_message)]))
|
370 |
+
|
371 |
+
|
372 |
+
print("--- Response Generation Complete ---")
|
373 |
+
# Update the main history state with the final state of the conversation
|
374 |
+
# We return the *final* bot message text for display, and the *full* history state
|
375 |
+
# The chatbot UI needs [[user, bot], [user, bot], ...] format
|
376 |
+
# Create the Gradio chatbot display format from our history
|
377 |
+
chatbot_display_list = []
|
378 |
+
user_msg = None
|
379 |
+
for i, content in enumerate(current_history_for_api):
|
380 |
+
# Skip system instruction for display
|
381 |
+
if content.role == "system": continue
|
382 |
+
# Combine multi-part messages for display
|
383 |
+
msg_text = ""
|
384 |
+
for part in content.parts:
|
385 |
+
if part.text:
|
386 |
+
msg_text += part.text + "\n"
|
387 |
+
# Display code suggestions nicely
|
388 |
+
elif part.executable_code:
|
389 |
+
lang = part.executable_code.language.name.lower() if part.executable_code.language else "python"
|
390 |
+
code = part.executable_code.code
|
391 |
+
msg_text += f"\nSuggested Code ({lang}):\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```\n"
|
392 |
+
# We don't display tool calls/responses directly in chat bubbles usually
|
393 |
+
# elif part.function_call: msg_text += f"[Requesting tool: {part.function_call.name}]\n"
|
394 |
+
# elif part.function_response: msg_text += f"[Tool response received for {part.function_response.name}]\n"
|
395 |
+
elif part.code_execution_result:
|
396 |
+
outcome_str = "Success" if part.code_execution_result.outcome == part.code_execution_result.Outcome.OK else "Failure"
|
397 |
+
msg_text += f"\nCode Execution Result ({outcome_str}):\n```\n{part.code_execution_result.output}\n```\n"
|
398 |
+
|
399 |
+
msg_text = msg_text.strip()
|
400 |
+
if not msg_text: continue # Skip empty parts/turns
|
401 |
+
|
402 |
+
if content.role == "user":
|
403 |
+
# If there was a pending user message, start a new pair
|
404 |
+
user_msg = msg_text
|
405 |
+
# Append None temporarily for the bot response, it will be filled if available
|
406 |
+
chatbot_display_list.append([user_msg, None])
|
407 |
+
elif content.role == "model":
|
408 |
+
if chatbot_display_list and chatbot_display_list[-1][1] is None:
|
409 |
+
# Fill in the bot response for the last user message
|
410 |
+
chatbot_display_list[-1][1] = msg_text
|
411 |
+
else:
|
412 |
+
# Model message without a preceding user message (unlikely here, but handle)
|
413 |
+
# Or potentially consecutive model messages after tool use. Append as separate bot message.
|
414 |
+
chatbot_display_list.append([None, msg_text])
|
415 |
+
user_msg = None # Reset pending user message
|
416 |
+
|
417 |
+
# Ensure the very last bot message is captured if the loop ended correctly
|
418 |
+
# This logic might be redundant if the history appending handles it correctly
|
419 |
+
# Let's rely on history build up and the formatting loop above.
|
420 |
+
|
421 |
+
return chatbot_display_list, current_history_for_api # Return display list and history state
|
422 |
+
|
423 |
except Exception as e:
|
424 |
+
print(f"ERROR during Gemini generation or tool processing: {str(e)}")
|
425 |
+
traceback.print_exc()
|
426 |
+
error_message = f"An error occurred while processing your request: {str(e)}"
|
427 |
+
# Return error in chatbot format and the history state *before* the error
|
428 |
+
chatbot_error_display = [[None, error_message]]
|
429 |
+
# Try to get the display history before error if possible
|
430 |
+
if 'current_history_for_api' in locals():
|
431 |
+
# Rebuild display list up to the point before error for continuity
|
432 |
+
# (This is simplified, full rebuild might be complex)
|
433 |
+
existing_display = []
|
434 |
+
for c in current_history_for_api[:-1]: # Exclude potentially problematic last addition
|
435 |
+
if c.role == "user": existing_display.append([c.parts[0].text, None])
|
436 |
+
elif c.role == "model" and existing_display and existing_display[-1][1] is None:
|
437 |
+
existing_display[-1][1] = "".join([p.text for p in c.parts if p.text])
|
438 |
+
existing_display.append([None, error_message]) # Add error message at end
|
439 |
+
chatbot_error_display = existing_display
|
440 |
+
|
441 |
+
|
442 |
+
# Return the history *before* this failed turn started
|
443 |
+
return chatbot_error_display, conversation_history # Revert state to before this turn
|
444 |
+
|
445 |
+
|
446 |
+
# --- Gradio Interface ---
|
447 |
+
|
448 |
+
with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as demo:
|
449 |
+
gr.Markdown(f"# 🚀 Gemini AI Assistant ({MODEL_NAME})")
|
450 |
+
gr.Markdown("Ask questions, request info from specific URLs, or ask for code/calculations. Uses function calling and code execution.")
|
451 |
+
|
452 |
+
# Chatbot component to display conversation
|
453 |
+
chatbot_display = gr.Chatbot(
|
454 |
+
label="Conversation",
|
455 |
+
bubble_full_width=False,
|
456 |
+
height=600, # Increased height
|
457 |
+
show_copy_button=True,
|
458 |
+
render_markdown=True # Ensure markdown inc code blocks is rendered
|
459 |
)
|
460 |
+
|
461 |
+
# Textbox for user input
|
462 |
+
msg_input = gr.Textbox(
|
463 |
+
label="Your Query",
|
464 |
+
placeholder="Ask anything... (e.g., 'Summarize example.com', 'Calculate 2^64', 'Write python code to list files')",
|
465 |
+
lines=3, # Start with more lines
|
466 |
+
scale=4 # Take more horizontal space
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
467 |
)
|
468 |
+
|
469 |
+
# Use ClearButton which handles multiple components
|
470 |
+
clear_btn = gr.ClearButton(value="🗑️ Clear Chat")
|
471 |
+
|
472 |
+
# Submit button (using default value seems fine)
|
473 |
+
send_btn = gr.Button("➡️ Send", variant="primary", scale=1)
|
474 |
+
|
475 |
+
|
476 |
+
# Hidden state to store the raw conversation history (list of genai.types.Content)
|
477 |
+
chat_history_state = gr.State([])
|
478 |
+
|
479 |
+
def user_message_update(user_message, history_display_list):
|
480 |
+
"""Appends the user's message to the display list and clears the input."""
|
481 |
+
if not user_message.strip(): # Avoid adding empty messages
|
482 |
+
return gr.update(value=""), history_display_list # Clear input, return unchanged history display
|
483 |
+
# Append user message with None placeholder for bot response
|
484 |
+
return gr.update(value=""), history_display_list + [[user_message, None]]
|
485 |
+
|
486 |
+
def bot_response_update(history_display_list, history_state):
|
487 |
+
"""Calls the backend Gemini function and updates display/state."""
|
488 |
+
if not history_display_list or history_display_list[-1][0] is None:
|
489 |
+
# Should not happen if user_message_update ran first, but safeguard
|
490 |
+
print("Warning: bot_response_update called without preceding user message in display.")
|
491 |
+
# Return unchanged display, maybe signal error? For now, just return current state.
|
492 |
+
return history_display_list, history_state
|
493 |
+
|
494 |
+
user_message = history_display_list[-1][0] # Get the last user message from display list
|
495 |
+
print(f"User message being sent to backend: {user_message}")
|
496 |
+
|
497 |
+
# Call the main Gemini interaction function
|
498 |
+
# It now returns the *entire* chat history for display, and the updated state
|
499 |
+
updated_display_list, updated_history_state = generate_response_with_tools(user_message, history_state)
|
500 |
+
|
501 |
+
# The backend function now returns the full display list
|
502 |
+
# Update the state variable directly
|
503 |
+
return updated_display_list, updated_history_state
|
504 |
+
|
505 |
+
# Define the action for sending a message (Enter key in Textbox)
|
506 |
+
msg_input.submit(
|
507 |
+
user_message_update, # 1. Update display with user msg, clear input
|
508 |
+
[msg_input, chatbot_display],
|
509 |
+
[msg_input, chatbot_display],
|
510 |
+
queue=False, # Run immediately UI update
|
511 |
+
).then(
|
512 |
+
bot_response_update, # 2. Call backend, get full display list & new state
|
513 |
+
[chatbot_display, chat_history_state], # Pass current display (for last msg) & state
|
514 |
+
[chatbot_display, chat_history_state] # Update display & state from backend return
|
515 |
+
)
|
516 |
+
|
517 |
+
# Define the action for clicking the Send button
|
518 |
+
send_btn.click(
|
519 |
+
user_message_update,
|
520 |
+
[msg_input, chatbot_display],
|
521 |
+
[msg_input, chatbot_display],
|
522 |
+
queue=False,
|
523 |
+
).then(
|
524 |
+
bot_response_update,
|
525 |
+
[chatbot_display, chat_history_state],
|
526 |
+
[chatbot_display, chat_history_state]
|
527 |
)
|
528 |
|
529 |
+
# Setup the ClearButton to target the necessary components, including the state
|
530 |
+
clear_btn.add(components=[msg_input, chatbot_display, chat_history_state])
|
531 |
+
# The ClearButton itself doesn't need a custom function when using .add()
|
532 |
+
# It will set components to their default/initial values (Textbox="", Chatbot=None, State=[])
|
533 |
+
|
534 |
+
|
535 |
if __name__ == "__main__":
|
536 |
+
print("Starting Gradio App...")
|
537 |
+
# Enable queue for handling potentially long API calls/tool executions
|
538 |
+
# Set share=True to get a public link (remove if only running locally)
|
539 |
+
demo.queue().launch(server_name="0.0.0.0", server_port=7860)
|
540 |
+
print("Gradio App Stopped.")
|