Update app.py
app.py
CHANGED
@@ -1,11 +1,14 @@
 # -*- coding: utf-8 -*-
 import os
 import gradio as gr
+# Corrected import: Import the main module and use an alias
 import google.generativeai as genai
-from google.generativeai import types
-from google.generativeai.types import HarmCategory, HarmBlockThreshold
-
-
+# Types will be accessed via genai.types
+
+# Removed direct type imports, will use genai.types instead
+# from google.generativeai import types # No longer needed
+# from google.generativeai.types import HarmCategory, HarmBlockThreshold # No longer needed
+
 import requests
 import markdownify
 from urllib.robotparser import RobotFileParser
@@ -17,7 +20,6 @@ import json # Although not directly used in the final code, useful for debugging
 
 def can_crawl_url(url: str, user_agent: str = "PythonGoogleGenAIAgent/1.0") -> bool:
     """Check robots.txt permissions for a URL"""
-    # Use a more specific user agent, but '*' is a fallback
     if not url:
         print("No URL provided to can_crawl_url")
         return False
@@ -25,12 +27,9 @@ def can_crawl_url(url: str, user_agent: str = "PythonGoogleGenAIAgent/1.0") -> b
     parsed_url = urlparse(url)
     if not parsed_url.scheme or not parsed_url.netloc:
         print(f"Invalid URL format for robots.txt check: {url}")
-        return False
-
+        return False
     robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
     print(f"Checking robots.txt at: {robots_url} for URL: {url}")
-
-    # Using RobotFileParser's default opener which handles redirects
     rp = RobotFileParser()
     rp.set_url(robots_url)
     rp.read()
@@ -39,7 +38,6 @@ def can_crawl_url(url: str, user_agent: str = "PythonGoogleGenAIAgent/1.0") -> b
         return can_fetch
     except Exception as e:
         print(f"Error checking robots.txt for {url}: {e}")
-        # Default to false if unsure, to be polite to servers
         return False
 
 def load_page(url: str) -> str:
@@ -56,27 +54,22 @@ def load_page(url: str) -> str:
     if not url.startswith(('http://', 'https://')):
        return f"Error: Invalid URL scheme. Please provide http or https URL. Got: {url}"
 
-    USER_AGENT = "PythonGoogleGenAIAgent/1.0 (Function Calling)"
+    USER_AGENT = "PythonGoogleGenAIAgent/1.0 (Function Calling)"
     if not can_crawl_url(url, user_agent=USER_AGENT):
         print(f"URL {url} failed robots.txt check for agent {USER_AGENT}")
         return f"Error: Access denied by robots.txt for URL {url}"
     try:
         headers = {'User-Agent': USER_AGENT}
         response = requests.get(url, timeout=15, headers=headers, allow_redirects=True)
-        response.raise_for_status()
-
-        # Check content type - try to only process HTML
+        response.raise_for_status()
         content_type = response.headers.get('content-type', '').lower()
         if 'html' not in content_type:
             print(f"Non-HTML content type '{content_type}' at {url}. Returning summary.")
-            # Return limited info for non-html types
             return f"Content at {url} is of type '{content_type}'. Size: {len(response.content)} bytes. Cannot convert to Markdown."
 
-
-        MAX_CONTENT_SIZE = 1_000_000 # 1MB limit
+        MAX_CONTENT_SIZE = 1_000_000
         if len(response.content) > MAX_CONTENT_SIZE:
             print(f"Content size {len(response.content)} exceeds limit {MAX_CONTENT_SIZE}. Truncating.")
-            # Decode potentially large content carefully
             try:
                 html_content = response.content[:MAX_CONTENT_SIZE].decode(response.apparent_encoding or 'utf-8', errors='ignore')
             except Exception as decode_err:
@@ -84,18 +77,12 @@ def load_page(url: str) -> str:
             html_content = response.content[:MAX_CONTENT_SIZE].decode('utf-8', errors='ignore')
         truncated_msg = "\n\n[Content truncated due to size limit]"
     else:
-        html_content = response.text
+        html_content = response.text
         truncated_msg = ""
 
-    # Convert to Markdown
-    # Added heading_style for potentially better formatting
     markdown_content = markdownify.markdownify(html_content, heading_style="ATX", strip=['script', 'style'], escape_underscores=False)
-
-    # Simple cleaning (optional, can be expanded)
     markdown_content = '\n'.join([line.strip() for line in markdown_content.splitlines() if line.strip()])
-
     print(f"Successfully loaded and converted {url} to markdown.")
-    # Add URL source attribution
     return f"Content from {url}:\n\n" + markdown_content + truncated_msg
 
     except requests.exceptions.Timeout:
@@ -106,7 +93,7 @@ def load_page(url: str) -> str:
         return f"Error loading page {url}: {str(e)}"
     except Exception as e:
         print(f"General error loading page {url}: {str(e)}")
-        traceback.print_exc()
+        traceback.print_exc()
         return f"Error loading page {url}: An unexpected error occurred ({type(e).__name__})."
 
 
@@ -115,50 +102,43 @@ try:
     api_key = os.environ.get("GEMINI_API_KEY")
     if not api_key:
         raise ValueError("GEMINI_API_KEY environment variable not set.")
+    # Use genai (the alias) to configure
     genai.configure(api_key=api_key)
 
-    # *** Use the requested experimental model ***
     MODEL_NAME = "gemini-2.5-pro-exp-03-25"
     print(f"Attempting to use EXPERIMENTAL model: {MODEL_NAME}")
 
-    # Define
-    browse_tool = types.Tool(
+    # Define tools using genai.types
+    browse_tool = genai.types.Tool(
         function_declarations=[
-            types.FunctionDeclaration(
+            genai.types.FunctionDeclaration(
                 name='load_page',
                 description='Fetches the content of a specific web page URL as Markdown text. Use this when the user asks for information from a specific URL they provide, or when you need to look up live information mentioned alongside a specific source URL.',
-                parameters=types.Schema(
-                    type=types.Type.OBJECT,
+                parameters=genai.types.Schema(
+                    type=genai.types.Type.OBJECT,
                     properties={
-                        'url': types.Schema(type=types.Type.STRING, description="The *full* URL of the webpage to load (must start with http:// or https://).")
+                        'url': genai.types.Schema(type=genai.types.Type.STRING, description="The *full* URL of the webpage to load (must start with http:// or https://).")
                     },
                     required=['url']
                 )
             )
         ]
     )
-
-    # Enables the model to suggest and potentially execute Python code.
-    code_execution_tool = types.Tool(code_execution=types.ToolCodeExecution())
+    code_execution_tool = genai.types.Tool(code_execution=genai.types.ToolCodeExecution()) # Note: Simplified access
 
-    # Combine tools that the model can use
     tools = [browse_tool, code_execution_tool]
 
-    # Create the model instance
+    # Create the model instance using genai alias
     model = genai.GenerativeModel(
         model_name=MODEL_NAME,
         tools=tools,
-        # Relax safety settings slightly *if needed* for code/complex generation,
-        # but be aware of the implications. BLOCK_NONE is risky. Use with caution.
-        # Consider BLOCK_LOW_AND_ABOVE or MEDIUM as safer alternatives.
         safety_settings={
-            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
-            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
-            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
-            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
-
+            # Access HarmCategory and HarmBlockThreshold via genai.types
+            genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
+            genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
+            genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
+            genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
         },
-        # System instruction (optional but recommended for setting context)
         system_instruction="You are a helpful AI assistant called Gemini-Toolkit. You can browse specific web pages provided by the user via the 'load_page' tool. You can also execute Python code using the 'code_execution' tool to perform calculations, analyze data, or demonstrate programming concepts. Explain your reasoning and the steps you take. If asked to browse, confirm the URL you are accessing. If providing code, explain what it does.",
     )
     print(f"Gemini client initialized with model: {MODEL_NAME} and tools.")
@@ -166,43 +146,37 @@ try:
 except Exception as e:
     print(f"CRITICAL ERROR: Error initializing Gemini client: {e}")
     traceback.print_exc()
-    # Provide a fallback model or handle the error gracefully in the UI
     model = None
     tools = []
-
-    # raise SystemExit("Failed to initialize core Gemini model.") from e
+
 
 # --- Gradio App Logic ---
 
 def handle_function_call(function_call):
     """Executes the function call requested by the model."""
     function_name = function_call.name
-    args = function_call.args #
+    args = function_call.args # Dict-like object
 
-    print(f"Executing Function Call: {function_name} with args: {dict(args)}")
+    print(f"Executing Function Call: {function_name} with args: {dict(args)}")
 
     try:
         if function_name == 'load_page':
             url = args.get('url')
             if url:
-                # Execute the actual function
                 function_response_content = load_page(url=url)
-
-                MAX_RESPONSE_LEN = 50000 # Limit characters sent back
+                MAX_RESPONSE_LEN = 50000
                 if len(function_response_content) > MAX_RESPONSE_LEN:
                     print(f"Tool Response truncated from {len(function_response_content)} to {MAX_RESPONSE_LEN} chars.")
                     function_response_content = function_response_content[:MAX_RESPONSE_LEN] + "\n\n[... Tool Response Truncated Due to Size Limit ...]"
             else:
                 function_response_content = "Error: URL parameter was missing in the function call. Please ensure the 'url' argument is provided."
         else:
-            # Should not happen if tools are defined correctly and model uses them
             print(f"Error: Received call for unknown function '{function_name}'")
             function_response_content = f"Error: Unknown function '{function_name}' called by the model."
 
-    #
-
-    function_response_part = types.Part(
-        function_response=types.FunctionResponse(
+    # Use genai.types for FunctionResponse and Part
+    function_response_part = genai.types.Part(
+        function_response=genai.types.FunctionResponse(
             name=function_name,
             response={'content': function_response_content}
         )
@@ -213,9 +187,9 @@ def handle_function_call(function_call):
     except Exception as e:
         print(f"Error during execution of function '{function_name}': {e}")
         traceback.print_exc()
-        #
-        return types.Part(
-            function_response=types.FunctionResponse(
+        # Use genai.types here too
+        return genai.types.Part(
+            function_response=genai.types.FunctionResponse(
                 name=function_name,
                 response={'error': f"Failed to execute function {function_name}: {str(e)}"}
             )
@@ -224,140 +198,142 @@ def handle_function_call(function_call):
 def generate_response_with_tools(user_input, history_state):
     """Handles user input, interacts with Gemini (incl. tools), and manages history."""
     if not model:
-
-        return "Error: The AI model (Gemini) could not be initialized. Please check the logs or API key configuration.", history_state or []
+        return [[None, "Error: The AI model (Gemini) could not be initialized. Please check the logs or API key configuration."]], history_state or []
 
     if not user_input.strip():
-        # Return immediately if input is empty, don't update history
-        # Let the UI handle showing this message without clearing history state
-        # For chatbot, we might just not send anything or return a specific tuple
-        # Returning just a message for the chatbot display:
         return [[None, "Please enter a valid query."]], history_state or []
 
-
     # --- History Management ---
-    # Load history from state (should be list of Content objects)
-    # Initialize if state is None or empty
     conversation_history = history_state if isinstance(history_state, list) else []
-
-
-    conversation_history.append(types.Content(role="user", parts=[types.Part.from_text(user_input)]))
+    # Use genai.types for Content and Part
+    conversation_history.append(genai.types.Content(role="user", parts=[genai.types.Part.from_text(user_input)]))
     print(f"\n--- Sending to Gemini (History length: {len(conversation_history)}) ---")
 
-    # Limit history length *before* sending to API to avoid excessive token usage/cost
-    # Keep the system instruction + last N turns. A turn = user msg + model response (potentially with tool calls/responses)
     MAX_HISTORY_TURNS = 10
-    max_history_items = MAX_HISTORY_TURNS * 2 + (1 if conversation_history and conversation_history[0].role == "system" else 0)
-
+    max_history_items = MAX_HISTORY_TURNS * 2 + (1 if conversation_history and conversation_history[0].role == "system" else 0)
     if len(conversation_history) > max_history_items:
         print(f"Trimming conversation history from {len(conversation_history)} items to ~{max_history_items}")
         if conversation_history[0].role == "system":
-            # Keep system instruction and the latest items
             conversation_history = [conversation_history[0]] + conversation_history[-(max_history_items-1):]
         else:
-            # Just keep the latest items
             conversation_history = conversation_history[-max_history_items:]
 
-
-
-    MAX_TOOL_LOOPS = 5 # Prevent infinite loops if the model keeps calling tools without finishing
+    # --- Interaction Loop ---
+    MAX_TOOL_LOOPS = 5
     loop_count = 0
-    current_history_for_api = list(conversation_history)
+    current_history_for_api = list(conversation_history)
+    final_bot_message = "" # Initialize variable to hold the final message text
 
     try:
         while loop_count < MAX_TOOL_LOOPS:
             loop_count += 1
             print(f"Generation loop {loop_count}/{MAX_TOOL_LOOPS}...")
 
-            # Send context and query to Gemini
-            # Use the potentially trimmed history for this API call
             response = model.generate_content(
                 current_history_for_api,
-                request_options={"timeout": 120},
-                # generation_config=genai.types.GenerationConfig( # If you need temperature etc.
-                #     temperature=0.7
-                # )
+                request_options={"timeout": 120},
             )
 
-            # --- Process Response Candidate ---
             if not response.candidates:
                 print("Warning: No candidates received from Gemini.")
-                # Append a message indicating no response
                 final_bot_message = "[No response generated by the model.]"
-
-
+                # Use genai.types here
+                current_history_for_api.append(genai.types.Content(role="model", parts=[genai.types.Part.from_text(final_bot_message)]))
+                break
 
             candidate = response.candidates[0]
+            # Access FinishReason via genai.types
+            finish_reason = candidate.finish_reason
 
-            #
+            # Append model's turn to history *before* potentially executing tools
+            # This includes text parts and potential function_call parts
+            if candidate.content:
+                current_history_for_api.append(candidate.content)
+            else:
+                print("Warning: Candidate content is empty.")
+                # Decide how to handle this - perhaps break or log and continue?
+                # If finish_reason indicates a stop, maybe just break.
+                # If it indicates TOOL_CALL without content, that's an error state.
+
+            # Check for safety or unexpected stops first
+            # Use genai.types for FinishReason comparison
+            if finish_reason not in (genai.types.Candidate.FinishReason.STOP, genai.types.Candidate.FinishReason.TOOL_CALL):
+                print(f"Warning: Generation stopped unexpectedly. Reason: {finish_reason.name}")
+                stop_reason_msg = f"[Model stopped generating. Reason: {finish_reason.name}]"
+                # Extract any partial text response
+                partial_text = ""
+                if candidate.content and candidate.content.parts:
+                    partial_text = "".join([p.text for p in candidate.content.parts if p.text])
+                final_bot_message = (partial_text + "\n" if partial_text else "") + stop_reason_msg
+                # We already appended the content, so the history is up-to-date with the partial model turn.
                 break # Exit loop
 
-            #
-
-            # Append the model's response (which might contain text and/or tool calls) to history *before* execution
-            # The API expects the model's turn asking for the tool first.
-            current_history_for_api.append(candidate.content)
+            # Check for Tool Call
+            # Use genai.types for FinishReason comparison
+            has_tool_call = finish_reason == genai.types.Candidate.FinishReason.TOOL_CALL
 
             if has_tool_call:
                 print("Tool call requested by model.")
+                if not candidate.content or not candidate.content.parts:
+                    print("Error: TOOL_CALL indicated but candidate content is empty.")
+                    final_bot_message = "[Model indicated tool use but provided no details.]"
+                    # Append error message as model turn?
+                    # current_history_for_api.append(genai.types.Content(role="model", parts=[genai.types.Part.from_text(final_bot_message)]))
+                    break # Exit loop
+
+                function_calls = [part.function_call for part in candidate.content.parts if hasattr(part, 'function_call')]
+
+                if not function_calls:
+                    print("Warning: TOOL_CALL finish reason but no function_call part found in content.")
+                    final_bot_message = "".join([p.text for p in candidate.content.parts if p.text]) # Capture any text
                 if not final_bot_message:
-                    final_bot_message = "[Model indicated tool use but provided no
-
+                    final_bot_message = "[Model indicated tool use but provided no callable function.]"
+                    # Model turn with text (if any) is already in history
+                    break # Exit loop
 
-                # Execute the function(s) and get responses
                 tool_responses = []
-                for
+                for func_call in function_calls:
+                    if func_call: # Ensure it's not None
+                        function_response_part = handle_function_call(func_call)
+                        tool_responses.append(function_response_part)
+                    else:
+                        print("Warning: Encountered None value where function_call was expected.")
+
+                if not tool_responses:
+                    print("Warning: No valid tool responses generated despite TOOL_CALL.")
+                    # Decide how to proceed. Maybe break?
+                    final_bot_message = "[Failed to process tool call request.]"
+                    break
+
+                # Add the tool execution results to history
+                # Use genai.types for Content
+                current_history_for_api.append(genai.types.Content(role="tool", parts=tool_responses))
                 print("Added tool response(s) to history. Continuing loop...")
-                continue # Go back to the start of the while loop
+                continue # Go back to the start of the while loop
 
-            else:
-                # No tool call, this is the final response from the model
+            else: # FinishReason == STOP
                 print("No tool call requested. Final response received.")
-
-
-                # Also check for code execution *suggestions* or *results* in the final turn
+                # Extract final text and any code suggestions/results
+                final_bot_message = ""
                 code_parts_display = []
+                if candidate.content and candidate.content.parts:
+                    for part in candidate.content.parts:
+                        if hasattr(part, 'text'):
+                            final_bot_message += part.text
+                        if hasattr(part, 'executable_code') and part.executable_code:
+                            lang = part.executable_code.language.name.lower() if part.executable_code.language else "python"
+                            code = part.executable_code.code
+                            code_parts_display.append(f"Suggested Code ({lang}):\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```")
+                        elif hasattr(part, 'code_execution_result') and part.code_execution_result:
+                            outcome_str = "Success" if part.code_execution_result.outcome == genai.types.ExecutableCodeResponse.Outcome.OK else "Failure" # Adjusted reference
+                            code_parts_display.append(f"Code Execution Result ({outcome_str}):\n```\n{part.code_execution_result.output}\n```")
 
                 if code_parts_display:
                     final_bot_message += "\n\n" + "\n\n".join(code_parts_display)
 
-                # Handle empty final message case
                 if not final_bot_message.strip():
                     final_bot_message = "[Assistant completed its turn without generating text output.]"
+                # The empty model turn is already in history
 
                 break # Exit the while loop
@@ -365,83 +341,90 @@ def generate_response_with_tools(user_input, history_state):
         if loop_count >= MAX_TOOL_LOOPS:
             print(f"Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}).")
             final_bot_message = (final_bot_message + "\n\n" if final_bot_message else "") + f"[Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}). The final response might be incomplete.]"
-            # Ensure the
-            if current_history_for_api[-1].role
-
+            # Ensure the loop warning is part of the last model message if needed
+            if current_history_for_api[-1].role == "model":
+                # Append warning to the existing last message parts (simplistic)
+                # Use genai.types for Part
+                current_history_for_api[-1].parts.append(genai.types.Part.from_text(f"\n[Warning: Max loops reached]"))
+            else:
+                # Use genai.types for Content and Part
+                current_history_for_api.append(genai.types.Content(role="model", parts=[genai.types.Part.from_text(final_bot_message)]))
 
 
         print("--- Response Generation Complete ---")
-        #
-        # We return the *final* bot message text for display, and the *full* history state
-        # The chatbot UI needs [[user, bot], [user, bot], ...] format
-        # Create the Gradio chatbot display format from our history
+        # --- Format final output for Gradio Chatbot ---
         chatbot_display_list = []
-
+        user_msg_buffer = None # To hold user message until bot reply comes
         for i, content in enumerate(current_history_for_api):
+            if content.role == "system": continue # Skip system prompt in display
+
+            # Combine parts into a single message string for display
+            display_text = ""
+            if content.parts: # Check if parts exist
+                for part in content.parts:
+                    # Check attributes safely before accessing
+                    if hasattr(part, 'text'):
+                        display_text += part.text + "\n"
+                    elif hasattr(part, 'executable_code') and part.executable_code:
+                        lang = part.executable_code.language.name.lower() if hasattr(part.executable_code, 'language') and part.executable_code.language else "python"
+                        code = part.executable_code.code if hasattr(part.executable_code, 'code') else ""
+                        display_text += f"\nSuggested Code ({lang}):\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```\n"
+                    elif hasattr(part, 'code_execution_result') and part.code_execution_result:
+                        # Use genai.types.ExecutableCodeResponse.Outcome
+                        outcome_ok = genai.types.ExecutableCodeResponse.Outcome.OK if hasattr(genai.types, 'ExecutableCodeResponse') else 1 # Fallback if type not found? Be careful
+                        outcome_str = "Success" if part.code_execution_result.outcome == outcome_ok else "Failure"
+                        output = part.code_execution_result.output if hasattr(part.code_execution_result, 'output') else ""
+                        display_text += f"\nCode Execution Result ({outcome_str}):\n```\n{output}\n```\n"
+                    # Optional: Display indications of tool use (can make chat noisy)
+                    # elif hasattr(part, 'function_call') and part.function_call: display_text += f"[Requesting tool: {part.function_call.name}...]\n"
+                    # elif hasattr(part, 'function_response') and part.function_response: display_text += f"[Tool '{part.function_response.name}' response processed.]\n"
+
+            display_text = display_text.strip()
+
+            if not display_text and content.role != 'tool': continue # Skip empty non-tool turns
+
+            if content.role == "user":
+                user_msg_buffer = display_text # Store user message
+                # Don't append to display list yet, wait for model response
+            elif content.role == "model":
+                if user_msg_buffer is not None:
+                    # We have a user message and now the model's response
+                    chatbot_display_list.append([user_msg_buffer, display_text])
+                    user_msg_buffer = None # Clear buffer
+                else:
+                    # Model message without preceding user message (e.g., initial greeting or consecutive model turns)
+                    chatbot_display_list.append([None, display_text])
+            # Ignore 'tool' role messages in the chatbot display list
+
+        # If the loop ended with a user message still in the buffer (e.g., error before model reply)
+        if user_msg_buffer is not None:
+            chatbot_display_list.append([user_msg_buffer, None]) # Show user msg, no bot reply yet
+
+
+        return chatbot_display_list, current_history_for_api
 
     except Exception as e:
         print(f"ERROR during Gemini generation or tool processing: {str(e)}")
         traceback.print_exc()
-        error_message = f"An error occurred
-        # Return error in chatbot format
-
-
-        if
-        #
+        error_message = f"An error occurred: {str(e)}"
+        # Return error in chatbot format, maintain previous history state
+        # Build display history from existing state + error
+        error_display_list = []
+        if isinstance(history_state, list):
+            # Simplified history-to-display conversion for error case
+            temp_user_msg = None
+            for content in history_state:
+                if content.role == "user": temp_user_msg = content.parts[0].text
+                elif content.role == "model" and temp_user_msg:
+                    model_text = "".join([p.text for p in content.parts if hasattr(p, 'text')])
+                    error_display_list.append([temp_user_msg, model_text])
+                    temp_user_msg = None
+            if temp_user_msg: error_display_list.append([temp_user_msg, None]) # Append dangling user message
+
+        error_display_list.append([None, error_message]) # Add the error message
+
+        # Return the state *before* the error occurred
+        return error_display_list, conversation_history[:-1] # Exclude the failed user turn
 
 
 # --- Gradio Interface ---
@@ -450,72 +433,62 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
     gr.Markdown(f"# 🚀 Gemini AI Assistant ({MODEL_NAME})")
     gr.Markdown("Ask questions, request info from specific URLs, or ask for code/calculations. Uses function calling and code execution.")
 
-    # Chatbot component to display conversation
     chatbot_display = gr.Chatbot(
         label="Conversation",
         bubble_full_width=False,
-        height=600,
+        height=600,
         show_copy_button=True,
-        render_markdown=True
+        render_markdown=True
     )
 
-    #
-    # Submit button (using default value seems fine)
-    send_btn = gr.Button("➡️ Send", variant="primary", scale=1)
-
+    with gr.Row(): # Arrange input and buttons horizontally
+        msg_input = gr.Textbox(
+            label="Your Query",
+            placeholder="Ask anything...",
+            lines=3,
+            scale=4 # Input takes more space
+        )
+        with gr.Column(scale=1, min_width=150): # Column for buttons
+            send_btn = gr.Button("➡️ Send", variant="primary")
+            clear_btn = gr.ClearButton(value="🗑️ Clear Chat")
 
     # Hidden state to store the raw conversation history (list of genai.types.Content)
     chat_history_state = gr.State([])
 
     def user_message_update(user_message, history_display_list):
         """Appends the user's message to the display list and clears the input."""
-        if not user_message.strip():
-            return gr.update(value=""), history_display_list
-
-        return gr.update(value=""), history_display_list + [[user_message, None]]
+        if not user_message.strip():
+            return gr.update(value=""), history_display_list
+        return gr.update(value=""), history_display_list + [[user_message, None]] # Add placeholder for bot response
 
     def bot_response_update(history_display_list, history_state):
         """Calls the backend Gemini function and updates display/state."""
-        if not history_display_list or history_display_list[-1][
+        if not history_display_list or history_display_list[-1][1] is not None:
+            # Only proceed if there is a pending user message (placeholder is None)
+            print("Bot update called without pending user message.")
+            # Should return current state if called incorrectly
+            return history_display_list, history_state
 
-        user_message = history_display_list[-1][0]
+        user_message = history_display_list[-1][0]
         print(f"User message being sent to backend: {user_message}")
 
         # Call the main Gemini interaction function
-        # It now returns the *entire* chat history for display, and the updated state
         updated_display_list, updated_history_state = generate_response_with_tools(user_message, history_state)
 
-        # The backend function now returns the full display list
-        # Update the state variable directly
        return updated_display_list, updated_history_state
 
-    #
+    # --- Event Listeners ---
     msg_input.submit(
-        user_message_update,
+        user_message_update,
         [msg_input, chatbot_display],
         [msg_input, chatbot_display],
-        queue=False,
+        queue=False,
     ).then(
-        bot_response_update,
-        [chatbot_display, chat_history_state],
-        [chatbot_display, chat_history_state]
+        bot_response_update,
+        [chatbot_display, chat_history_state],
+        [chatbot_display, chat_history_state] # Update display and state
    )
 
-    # Define the action for clicking the Send button
     send_btn.click(
         user_message_update,
         [msg_input, chatbot_display],
@@ -528,14 +501,17 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
     )
 
     # Setup the ClearButton to target the necessary components, including the state
-
-    #
-
+    # Use list comprehension/lambda if add doesn't accept state directly, or define custom clear fn
+    #clear_btn.add(components=[msg_input, chatbot_display, chat_history_state]) # May not work with state
+
+    # Custom clear function is safer for state
+    def clear_all():
+        return ["", None, []] # Clears Textbox, Chatbot display, State
+
+    clear_btn.click(clear_all, [], [msg_input, chatbot_display, chat_history_state], queue=False)
 
 
 if __name__ == "__main__":
     print("Starting Gradio App...")
-    # Enable queue for handling potentially long API calls/tool executions
-    # Set share=True to get a public link (remove if only running locally)
     demo.queue().launch(server_name="0.0.0.0", server_port=7860)
     print("Gradio App Stopped.")
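
Side note (not part of the commit): the pattern this diff standardizes on — importing the package once as `genai` and reaching every type through `genai.types` — can be exercised in isolation. The sketch below is illustrative only: it assumes `google-generativeai` is installed and `GEMINI_API_KEY` is set, the `echo` tool and model name are hypothetical stand-ins, and exact `genai.types` attribute availability varies across SDK versions (the diff itself guards one such lookup with `hasattr`).

# Minimal sketch of the `genai` alias + `genai.types` access pattern.
# `echo` is a hypothetical stand-in for load_page.
import os
import google.generativeai as genai

genai.configure(api_key=os.environ["GEMINI_API_KEY"])

def echo(text: str) -> str:
    """Hypothetical tool: returns its input unchanged."""
    return text

model = genai.GenerativeModel(
    model_name="gemini-1.5-flash",  # any available model works for the pattern
    tools=[echo],  # the SDK can wrap a plain Python callable as a tool
    safety_settings={
        genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT:
            genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    },
)

# start_chat(enable_automatic_function_calling=True) runs the
# request -> tool call -> tool response -> request loop that
# generate_response_with_tools() implements by hand above.
chat = model.start_chat(enable_automatic_function_calling=True)
print(chat.send_message("Echo back the word 'ping'.").text)

The hand-rolled loop in app.py trades that convenience for explicit control over response truncation, history trimming, and finish-reason handling.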
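
Likewise, the event wiring the diff keeps — a fast `submit` handler that echoes the user message immediately, chained with `.then` to the slower bot call — is a standard Gradio two-step and can be reproduced in isolation. A minimal sketch, with a hypothetical `fake_bot` in place of `bot_response_update`:

# Sketch of the same chained-event pattern with a stub backend.
import gradio as gr

def add_user(msg, history):
    # Clear the textbox and show the user turn with a placeholder reply.
    return "", history + [[msg, None]]

def fake_bot(history):
    # Hypothetical stand-in for the Gemini call: fill in the placeholder.
    history[-1][1] = f"echo: {history[-1][0]}"
    return history

with gr.Blocks() as demo:
    chat = gr.Chatbot()
    box = gr.Textbox()
    box.submit(add_user, [box, chat], [box, chat], queue=False).then(
        fake_bot, chat, chat
    )

demo.launch()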