Spaces:
Running
Running
File size: 11,002 Bytes
69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd 897173a 69dbdbd e0dd43c 69dbdbd 897173a e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 69dbdbd 897173a 69dbdbd e0dd43c 897173a 401be78 69dbdbd 401be78 69dbdbd 401be78 897173a 401be78 69dbdbd 1b0c4df e0dd43c 69dbdbd e0dd43c 69dbdbd e0dd43c 366b8c6 6492525 401be78 69dbdbd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
import os
import re
import json
from typing import Optional
import logging
from smolagents.agent_types import AgentAudio, AgentImage, AgentText, handle_agent_output_types
from smolagents.agents import ActionStep, MultiStepAgent
from smolagents.memory import MemoryStep
from smolagents.utils import _is_package_available
# Set up logging.
# NOTE: basicConfig() runs at import time and asks for DEBUG level on the root
# logger, so importing this module can change logging output process-wide.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _strip_end_code_markers(model_output: str) -> str:
    """Return *model_output* with ``<end_code>`` markers next to code fences removed."""
    cleaned = model_output.strip()
    # Same three substitutions, in the same order: ```<end_code>, <end_code>```, ```\n<end_code>
    for pattern in (r"```\s*<end_code>", r"<end_code>\s*```", r"```\s*\n\s*<end_code>"):
        cleaned = re.sub(pattern, "```", cleaned)
    return cleaned.strip()


def _format_tool_call_content(tool_call) -> str:
    """Render a tool call's arguments as the text shown in the parent chat message."""
    args = tool_call.arguments
    if isinstance(args, dict):
        # Prefer the "answer" entry; otherwise show the whole dict.
        content = str(args.get("answer", str(args)))
    else:
        content = str(args).strip()

    if tool_call.name == "python_interpreter":
        # Drop any existing fence openers and <end_code> tags, then wrap once as python.
        content = re.sub(r"```.*?\n", "", content)
        content = re.sub(r"\s*<end_code>\s*", "", content)
        content = content.strip()
        if not content.startswith("```python"):
            content = f"```python\n{content}\n```"
    return content


def _observations_table_markdown(log_content: str) -> Optional[str]:
    """Return a markdown table if *log_content* is a JSON list of lists, else ``None``."""
    try:
        data = json.loads(log_content)
    except json.JSONDecodeError:
        return None
    # Every row must be a list; anything else is shown as plain logs instead of crashing.
    if not (isinstance(data, list) and data and all(isinstance(row, list) for row in data)):
        return None

    headers, *rows = data
    lines = [
        # str() each header: JSON headers may be numbers/null, which str.join rejects.
        "| " + " | ".join(str(header) for header in headers) + " |",
        "| " + " | ".join(["---"] * len(headers)) + " |",
    ]
    lines.extend("| " + " | ".join(str(cell) for cell in row) + " |" for row in rows)
    return "\n".join(lines) + "\n"


def _build_step_footnote(step_log, step_number: str) -> str:
    """Build the small grey HTML footnote (step label, token counts, duration)."""
    parts = [step_number]

    input_tokens = getattr(step_log, "input_token_count", None)
    output_tokens = getattr(step_log, "output_token_count", None)
    # The ":," format spec raises on None, so only report counts that are both set.
    if input_tokens is not None and output_tokens is not None:
        parts.append(f" | Input-tokens:{input_tokens:,} | Output-tokens:{output_tokens:,}")

    duration = getattr(step_log, "duration", None)
    # A missing/zero duration used to produce None and break the string concatenation.
    if duration:
        parts.append(f" | Duration: {round(float(duration), 2)}")

    footnote_text = "".join(parts)
    return f"""<span style="color: #bbbbc2; font-size: 12px;">{footnote_text}</span> """


def pull_messages_from_step(step_log: MemoryStep):
    """Extract ChatMessage objects from agent steps with proper nesting.

    Only ``ActionStep`` instances produce output; any other step yields nothing.
    For an action step the generator yields, in order: the step header, the
    cleaned model output, the tool call (with execution logs / table data and
    errors attached through ``parent_id``), a footnote, a separator, and finally
    one message per observation line (screenshots, detected-element images,
    other text).

    Args:
        step_log: The memory step to render.

    Yields:
        ``gradio.ChatMessage`` objects with role ``"assistant"``.
    """
    import gradio as gr

    if not isinstance(step_log, ActionStep):
        return

    step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
    yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")

    model_output = getattr(step_log, "model_output", None)
    if model_output is not None:
        yield gr.ChatMessage(role="assistant", content=_strip_end_code_markers(model_output))

    tool_calls = getattr(step_log, "tool_calls", None)
    observations = getattr(step_log, "observations", None)
    error = getattr(step_log, "error", None)

    # NOTE(review): the emoji prefixes in the titles below look mis-encoded in this
    # copy of the file; they are kept as found — confirm against the original source.
    if tool_calls:  # truthiness check: an empty list has no first call to display
        first_tool_call = tool_calls[0]
        parent_id = f"call_{len(tool_calls)}"

        parent_message_tool = gr.ChatMessage(
            role="assistant",
            content=_format_tool_call_content(first_tool_call),
            metadata={"title": f"π οΈ Used tool {first_tool_call.name}", "id": parent_id, "status": "pending"},
        )
        yield parent_message_tool

        if observations and observations.strip():
            log_content = re.sub(r"^Execution logs:\s*", "", observations.strip())
            if log_content:
                table_md = _observations_table_markdown(log_content)
                if table_md is not None:
                    yield gr.ChatMessage(
                        role="assistant",
                        content=table_md,
                        metadata={"title": "π Table Data", "parent_id": parent_id, "status": "done"},
                    )
                else:
                    yield gr.ChatMessage(
                        role="assistant",
                        content=log_content,
                        metadata={"title": "π Execution Logs", "parent_id": parent_id, "status": "done"},
                    )

        if error is not None:
            yield gr.ChatMessage(
                role="assistant",
                content=str(error),
                metadata={"title": "π₯ Error", "parent_id": parent_id, "status": "done"},
            )

        # Flip the already-yielded parent message to "done" in place (no new message).
        parent_message_tool.metadata["status"] = "done"
    elif error is not None:
        # Error that is not attached to any tool call.
        yield gr.ChatMessage(role="assistant", content=str(error), metadata={"title": "π₯ Error"})

    yield gr.ChatMessage(role="assistant", content=_build_step_footnote(step_log, step_number))
    yield gr.ChatMessage(role="assistant", content="-----")

    # NOTE(review): this per-line pass is assumed to belong to the ActionStep branch;
    # the original indentation was not available — confirm.
    if observations:
        screenshot_prefix = "Screenshot saved at:"
        for line in observations.split("\n"):
            if line.startswith(screenshot_prefix):
                # Slice off the prefix so the path is right with or without a space after the colon.
                screenshot_path = line[len(screenshot_prefix):].strip()
                logger.debug("Yielding screenshot: %s", screenshot_path)
                yield gr.ChatMessage(
                    role="assistant",
                    content={"path": screenshot_path, "mime_type": "image/png"},
                    metadata={"title": "πΈ Screenshot"},
                )
            elif line.endswith("_detected.png"):
                yield gr.ChatMessage(
                    role="assistant",
                    content={"path": line.strip(), "mime_type": "image/png"},
                    metadata={"title": "πΌοΈ Detected Elements"},
                )
            elif line and not line.startswith("Current url:"):
                yield gr.ChatMessage(
                    role="assistant",
                    content=line,
                    metadata={"title": "π Scraped Text"},
                )
def stream_to_gradio(initialize_agent, task: str, api_key: Optional[str] = None, reset_agent_memory: bool = False, additional_args: Optional[dict] = None):
    """Run the agent on *task* and stream its progress as gradio ChatMessages.

    Args:
        initialize_agent: Callable invoked as ``initialize_agent(api_key)``; it must
            return an agent exposing ``run(...)`` and a ``model`` attribute.
        task: The task passed to ``agent.run``.
        api_key: Optional key forwarded to ``initialize_agent``. Only its presence
            is logged, never its value.
        reset_agent_memory: Forwarded to ``agent.run`` as ``reset``.
        additional_args: Forwarded to ``agent.run`` unchanged.

    Yields:
        ``gradio.ChatMessage`` objects: the messages for each step, then one
        message for the final answer (the last item produced by the run).

    Raises:
        ModuleNotFoundError: If gradio is not installed.
    """
    if not _is_package_available("gradio"):
        raise ModuleNotFoundError("Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`")
    import gradio as gr

    logger.debug("Received api_key: %s", "****" if api_key else "None")
    agent = initialize_agent(api_key)

    produced_step = False
    step_log = None
    for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args):
        produced_step = True
        # getattr: not every model object exposes the last_*_token_count attributes.
        input_tokens = getattr(agent.model, "last_input_token_count", None)
        output_tokens = getattr(agent.model, "last_output_token_count", None)
        logger.debug("Input tokens: %s, Output tokens: %s", input_tokens, output_tokens)

        if isinstance(step_log, ActionStep):
            # Attach counts to the step so the per-step footnote can show them.
            step_log.input_token_count = input_tokens if input_tokens is not None else 0
            step_log.output_token_count = output_tokens if output_tokens is not None else 0

        yield from pull_messages_from_step(step_log)

    if not produced_step:
        # Nothing was streamed, so there is no last item to present as the answer.
        logger.warning("Agent run produced no steps; no final answer to display.")
        return

    # The last item produced by the run is treated as the final answer.
    final_answer = handle_agent_output_types(step_log)

    if isinstance(final_answer, AgentText):
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:**\n{final_answer.to_string()}\n")
    elif isinstance(final_answer, AgentImage):
        yield gr.ChatMessage(role="assistant", content={"path": final_answer.to_string(), "mime_type": "image/png"})
    elif isinstance(final_answer, AgentAudio):
        yield gr.ChatMessage(role="assistant", content={"path": final_answer.to_string(), "mime_type": "audio/wav"})
    else:
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}")
class GradioUI:
    """Gradio chat front-end that creates an agent per request and streams its steps.

    Args:
        initialize_agent: Callable taking an API key (possibly empty) and returning
            the agent to run; it is passed through to ``stream_to_gradio``.

    Raises:
        ModuleNotFoundError: If gradio is not installed.
    """

    def __init__(self, initialize_agent):
        if not _is_package_available("gradio"):
            raise ModuleNotFoundError("Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`")
        self.initialize_agent = initialize_agent
        # Chat history stored on this instance (an instance attribute, not a class one).
        # NOTE(review): every request handled by this instance appends to the same
        # list, so history appears to be shared between browser sessions — consider
        # per-session state (gr.State) if that is not intended.
        self.messages = []

    def interact_with_agent(self, prompt, api_key):
        """Append *prompt* to the history, run the agent, and yield the growing history.

        Args:
            prompt: The user's message, used as the agent task.
            api_key: Key forwarded to ``stream_to_gradio``.

        Yields:
            ``self.messages`` (the same list object) after each appended message.
        """
        import gradio as gr

        self.messages.append(gr.ChatMessage(role="user", content=prompt))
        yield self.messages
        for msg in stream_to_gradio(self.initialize_agent, task=prompt, api_key=api_key, reset_agent_memory=False):
            self.messages.append(msg)
            yield self.messages
        yield self.messages

    def launch(self, **kwargs):
        """Build the Blocks UI and launch it.

        Args:
            **kwargs: Forwarded to ``demo.launch``. ``debug`` defaults to ``True``
                but may be overridden by the caller.
        """
        import gradio as gr

        css = """
        .chatbot .avatar-container {
            display: flex !important;
            justify-content: center !important;
            align-items: center !important;
            width: 40px !important;
            height: 40px !important;
            overflow: hidden !important;
        }
        .chatbot .avatar-container img {
            width: 100% !important;
            height: 100% !important;
            object-fit: cover !important;
            border-radius: 50% !important;
        }
        """
        with gr.Blocks(fill_height=True, css=css) as demo:
            gr.Markdown("**Note**: Please provide your own Gemini API key below. The default key may run out of quota.")
            api_key_input = gr.Textbox(
                lines=1, label="Gemini API Key (optional)", placeholder="Enter your Gemini API key here", type="password"
            )
            chatbot = gr.Chatbot(
                label="Web Navigation Agent", type="messages",
                avatar_images=(None, "./icon.png"), scale=1, height=600
            )
            text_input = gr.Textbox(
                lines=1, label="Enter URL and request (e.g., navigate to https://en.wikipedia.org/wiki/Nvidia, and provide me info on its history)"
            )
            text_input.submit(self.interact_with_agent, [text_input, api_key_input], [chatbot])

        # setdefault instead of a hard-coded debug=True: passing both debug=True and a
        # caller-supplied debug in **kwargs would raise "multiple values for keyword".
        kwargs.setdefault("debug", True)
        demo.launch(**kwargs)
# Public API of this module.
__all__ = ["stream_to_gradio", "GradioUI"]