# Spaces: Running on CPU Upgrade
import json | |
import mimetypes | |
import os | |
import re | |
import shutil | |
import threading | |
from typing import Optional | |
import gradio as gr | |
from dotenv import load_dotenv | |
from huggingface_hub import login | |
from smolagents import ( | |
CodeAgent, | |
HfApiModel, | |
Tool, | |
GoogleSearchTool | |
) | |
from smolagents.agent_types import ( | |
AgentAudio, | |
AgentImage, | |
AgentText, | |
handle_agent_output_types, | |
) | |
from smolagents.gradio_ui import stream_to_gradio | |
from scripts.text_inspector_tool import TextInspectorTool | |
from scripts.text_web_browser import ( | |
ArchiveSearchTool, | |
FinderTool, | |
FindNextTool, | |
PageDownTool, | |
PageUpTool, | |
SimpleTextBrowser, | |
VisitTool, | |
) | |
from scripts.visual_qa import visualizer | |
# Load variables from a local .env file FIRST: the search tool, the Hub login
# and the browser configuration below all read API keys / tokens from the
# process environment, so the environment has to be populated before they run.
load_dotenv(override=True)

# Web search backend shared by every agent session.
# (A leftover debug query that hit the search API on every start-up was removed.)
web_search = GoogleSearchTool(provider="serper")

# Modules that agent-generated code is allowed to import
# (passed to CodeAgent as `additional_authorized_imports`).
AUTHORIZED_IMPORTS = [
    "requests",
    "zipfile",
    "pandas",
    "numpy",
    "sympy",
    "json",
    "bs4",
    "pubchempy",
    "xml",
    "yahoo_finance",
    "Bio",
    "sklearn",
    "scipy",
    "pydub",
    "PIL",
    "chess",
    "PyPDF2",
    "pptx",
    "torch",
    "datetime",
    "fractions",
    "csv",
]

# Authenticate against the Hugging Face Hub with the token from the environment.
login(os.getenv("HF_TOKEN"))

append_answer_lock = threading.Lock()

# Role remapping handed to the model wrapper: tool traffic is presented as
# ordinary assistant / user turns.
custom_role_conversions = {"tool-call": "assistant", "tool-response": "user"}

# Browser-like User-Agent sent with every request made by the text browser.
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"

# Keyword arguments for SimpleTextBrowser.
BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": user_agent},
        "timeout": 300,  # presumably seconds - confirm against SimpleTextBrowser
    },
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}

# Make sure the browser's download directory exists before it is used.
os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True)

# Model shared by the agents and by the text-inspection tool.
model = HfApiModel(
    custom_role_conversions=custom_role_conversions,
)

# Limit passed to the text inspector (unit is defined by TextInspectorTool).
text_limit = 20000
ti_tool = TextInspectorTool(model, text_limit)

# A single text browser instance backs all navigation tools below.
browser = SimpleTextBrowser(**BROWSER_CONFIG)

# Tools given to every agent on top of the visual QA tool.
WEB_TOOLS = [
    web_search,
    VisitTool(browser),
    PageUpTool(browser),
    PageDownTool(browser),
    FinderTool(browser),
    FindNextTool(browser),
    ArchiveSearchTool(browser),
    ti_tool,  # reuse the inspector built above instead of a second identical one
]
# Factory so that every UI session can own an independent agent
def create_agent():
    """Build and return a brand-new CodeAgent.

    The agent uses the module-level ``model``, the ``visualizer`` tool followed
    by every entry of ``WEB_TOOLS``, and may import the modules listed in
    ``AUTHORIZED_IMPORTS``.
    """
    agent_tools = [visualizer, *WEB_TOOLS]
    agent = CodeAgent(
        tools=agent_tools,
        model=model,
        additional_authorized_imports=AUTHORIZED_IMPORTS,
        max_steps=10,
        planning_interval=4,
        verbosity_level=1,
    )
    return agent
# Stand-alone text inspector; uses the module's shared `text_limit` rather than
# repeating the literal so the two cannot drift apart.
document_inspection_tool = TextInspectorTool(model, text_limit)
class GradioUI:
    """A one-line interface to launch your agent in Gradio.

    Each browser session gets its own agent (created lazily through
    ``create_agent`` and kept in a ``gr.State`` dict). The page layout is
    chosen per request: a sidebar layout for desktop browsers, a simple
    stacked layout otherwise.
    """

    # MIME types accepted by ``upload_file`` when the caller passes none.
    _DEFAULT_ALLOWED_FILE_TYPES = (
        "application/pdf",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "text/plain",
    )

    # Avatar shown next to the agent's messages in the chatbot.
    _MASCOT_URL = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"

    # Placeholder restored on the request box once a run has finished.
    _IDLE_PLACEHOLDER = "Enter your prompt here and press the button"

    # Introductory markdown shown in the desktop sidebar.
    _DESKTOP_INTRO = (
        "# open Deep Research - free the AI agents!\n"
        "OpenAI just published [Deep Research](https://openai.com/index/introducing-deep-research/), an amazing assistant that can perform deep searches on the web to answer user questions.\n"
        "However, their agent has a huge downside: it's not open. So we've started a 24-hour rush to replicate and open-source it. Our resulting [open-Deep-Research agent](https://github.com/huggingface/smolagents/tree/main/examples/open_deep_research) took the #1 rank of any open submission on the GAIA leaderboard! ✨\n"
        "You can try a simplified version here that uses `Qwen-Coder-32B` instead of `o1`.<br><br>"
    )

    # Introductory markdown shown at the top of the simple layout.
    _SIMPLE_INTRO = (
        "# open Deep Research - free the AI agents!\n"
        "_Built with [smolagents](https://github.com/huggingface/smolagents)_\n"
        "OpenAI just published [Deep Research](https://openai.com/index/introducing-deep-research/), a very nice assistant that can perform deep searches on the web to answer user questions.\n"
        "However, their agent has a huge downside: it's not open. So we've started a 24-hour rush to replicate and open-source it. Our resulting [open-Deep-Research agent](https://github.com/huggingface/smolagents/tree/main/examples/open_deep_research) took the #1 rank of any open submission on the GAIA leaderboard! ✨\n"
        "You can try a simplified version below (uses `Qwen-Coder-32B` instead of `o1`, so much less powerful than the original open-Deep-Research)👇"
    )

    # "Powered by" badge rendered at the bottom of the desktop sidebar.
    _POWERED_BY_HTML = (
        '<div style="display: flex; align-items: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">\n'
        '<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png" style="width: 32px; height: 32px; object-fit: contain;" alt="logo">\n'
        '<a target="_blank" href="https://github.com/huggingface/smolagents"><b>huggingface/smolagents</b></a>\n'
        "</div>"
    )

    def __init__(self, file_upload_folder: str | None = None):
        """Store the upload folder and create it if needed.

        Args:
            file_upload_folder: Directory that uploaded files are copied into.
                ``None`` (the default) disables the upload widgets.
        """
        self.file_upload_folder = file_upload_folder
        if self.file_upload_folder is not None:
            # makedirs also covers nested paths and an already-existing folder.
            os.makedirs(self.file_upload_folder, exist_ok=True)

    def interact_with_agent(self, prompt, messages, session_state):
        """Run the session's agent on ``prompt`` and stream the chat history.

        Generator used as a Gradio event handler: it appends the user message
        and every message produced by ``stream_to_gradio`` to ``messages`` and
        yields the growing list after each addition.

        Args:
            prompt: Task text for the agent.
            messages: Current chat history (mutated in place).
            session_state: Per-session dict; the agent is stored under
                ``"agent"`` and created on first use.

        Raises:
            Exception: Any error raised while running the agent is printed and
                re-raised unchanged.
        """
        # Get or create session-specific agent
        if "agent" not in session_state:
            session_state["agent"] = create_agent()
        agent = session_state["agent"]

        # Adding monitoring
        try:
            # log the existence of agent memory
            has_memory = hasattr(agent, "memory")
            print(f"Agent has memory: {has_memory}")
            if has_memory:
                print(f"Memory type: {type(agent.memory)}")

            messages.append(gr.ChatMessage(role="user", content=prompt))
            yield messages

            # reset_agent_memory=False keeps earlier turns of this session.
            for msg in stream_to_gradio(agent, task=prompt, reset_agent_memory=False):
                messages.append(msg)
                yield messages
            yield messages
        except Exception as e:
            print(f"Error in interaction: {str(e)}")
            raise

    def upload_file(self, file, file_uploads_log, allowed_file_types=None):
        """
        Handle file uploads, default allowed types are .pdf, .docx, and .txt

        Args:
            file: Uploaded file object exposing the temporary path as ``.name``,
                or ``None`` when nothing was uploaded.
            file_uploads_log: List of paths uploaded so far in this session.
            allowed_file_types: Iterable of accepted MIME types; ``None``
                selects the default PDF / DOCX / plain-text set.

        Returns:
            A ``(status_textbox, updated_log)`` tuple. On any rejection the
            log is returned unchanged; on success a new list with the saved
            path appended is returned.
        """
        if allowed_file_types is None:
            # Resolved per call so no mutable default is shared between calls.
            allowed_file_types = self._DEFAULT_ALLOWED_FILE_TYPES

        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log

        if self.file_upload_folder is None:
            # Without a target folder there is nowhere to copy the file to.
            return gr.Textbox("File upload is disabled", visible=True), file_uploads_log

        try:
            mime_type, _ = mimetypes.guess_type(file.name)
        except Exception as e:
            return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

        if mime_type not in allowed_file_types:
            return gr.Textbox("File type disallowed", visible=True), file_uploads_log

        # Sanitize file name: anything that is not a word character, dash or
        # dot becomes an underscore.
        original_name = os.path.basename(file.name)
        sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)

        # First extension registered for each MIME type.
        type_to_ext = {}
        for ext, registered_type in mimetypes.types_map.items():
            type_to_ext.setdefault(registered_type, ext)

        # Ensure the extension correlates to the mime type. Only the final
        # extension is replaced (interior dots in the name are kept), and the
        # original extension is used if the MIME type has no known mapping.
        stem, original_ext = os.path.splitext(sanitized_name)
        sanitized_name = stem + type_to_ext.get(mime_type, original_ext)

        # Save the uploaded file to the specified folder
        file_path = os.path.join(
            self.file_upload_folder, os.path.basename(sanitized_name)
        )
        shutil.copy(file.name, file_path)

        return gr.Textbox(
            f"File uploaded: {file_path}", visible=True
        ), file_uploads_log + [file_path]

    def log_user_message(self, text_input, file_uploads_log):
        """Prepare the prompt for the agent and lock the inputs during the run.

        Returns:
            A 3-tuple: the prompt (with a note listing uploaded files when
            there are any), a cleared non-interactive textbox showing a
            "please wait" placeholder, and a non-interactive button.
        """
        prompt = text_input
        if len(file_uploads_log) > 0:
            prompt += f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}"

        locked_textbox = gr.Textbox(
            value="",
            interactive=False,
            placeholder="Please wait while Steps are getting populated",
        )
        return prompt, locked_textbox, gr.Button(interactive=False)

    def detect_device(self, request: gr.Request):
        """Classify the requesting client as ``"Mobile"`` or ``"Desktop"``.

        Checks, in order, the ``sec-ch-ua-mobile`` header, keywords in the
        ``user-agent`` header, then the ``sec-ch-ua-platform`` header.

        Returns:
            ``"Unknown device"`` when ``request`` is falsy, otherwise
            ``"Mobile"`` or ``"Desktop"`` (``"Desktop"`` when nothing matches).
        """
        # Check whether the user device is a mobile or a computer
        if not request:
            return "Unknown device"

        headers = request.headers

        # Method 1: Check sec-ch-ua-mobile header
        is_mobile_header = headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            return "Mobile" if "?1" in is_mobile_header else "Desktop"

        # Method 2: Check user-agent string
        agent_string = headers.get("user-agent", "").lower()
        mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
        if any(keyword in agent_string for keyword in mobile_keywords):
            return "Mobile"

        # Method 3: Check platform (the header value includes the double quotes)
        platform = headers.get("sec-ch-ua-platform", "").lower()
        if platform:
            if platform in ['"android"', '"ios"']:
                return "Mobile"
            elif platform in ['"windows"', '"macos"', '"linux"']:
                return "Desktop"

        # Default case if no clear indicators
        return "Desktop"

    def _build_upload_widgets(self, file_uploads_log):
        """Create the file picker and status box and wire them to ``upload_file``."""
        upload_file = gr.File(label="Upload a file")
        upload_status = gr.Textbox(
            label="Upload Status",
            interactive=False,
            visible=False,
        )
        upload_file.change(
            self.upload_file,
            [upload_file, file_uploads_log],
            [upload_status, file_uploads_log],
        )

    def _bind_run_events(
        self,
        text_input,
        launch_research_btn,
        file_uploads_log,
        stored_messages,
        chatbot,
        session_state,
    ):
        """Attach the same three-step run chain to textbox submit and button click.

        Chain: lock the inputs and store the prompt, stream the agent's
        messages into the chatbot, then make the inputs interactive again.
        """

        def unlock_inputs():
            return (
                gr.Textbox(
                    interactive=True,
                    placeholder=self._IDLE_PLACEHOLDER,
                ),
                gr.Button(interactive=True),
            )

        for trigger in (text_input.submit, launch_research_btn.click):
            trigger(
                self.log_user_message,
                [text_input, file_uploads_log],
                [stored_messages, text_input, launch_research_btn],
            ).then(
                self.interact_with_agent,
                # Include session_state in function calls
                [stored_messages, chatbot, session_state],
                [chatbot],
            ).then(
                unlock_inputs,
                None,
                [text_input, launch_research_btn],
            )

    def _build_desktop_layout(self):
        """Render the desktop layout: controls in a sidebar, chatbot in the main area."""
        with gr.Blocks(
            fill_height=True,
        ):
            file_uploads_log = gr.State([])
            with gr.Sidebar():
                gr.Markdown(self._DESKTOP_INTRO)
                with gr.Group():
                    gr.Markdown("**Your request**", container=True)
                    text_input = gr.Textbox(
                        lines=3,
                        label="Your request",
                        container=False,
                        placeholder="Enter your prompt here and press Shift+Enter or press the button",
                    )
                    launch_research_btn = gr.Button("Run", variant="primary")

                # If an upload folder is provided, enable the upload feature
                if self.file_upload_folder is not None:
                    self._build_upload_widgets(file_uploads_log)

                gr.HTML("<br><br><h4><center>Powered by:</center></h4>")
                with gr.Row():
                    gr.HTML(self._POWERED_BY_HTML)

            # Add session state to store session-specific data
            session_state = gr.State({})  # Initialize empty state for each session
            stored_messages = gr.State([])
            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(
                    None,
                    self._MASCOT_URL,
                ),
                resizeable=False,
                scale=1,
                elem_id="my-chatbot",
            )

            self._bind_run_events(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

    def _build_simple_layout(self):
        """Render the simple stacked layout used for non-desktop clients."""
        with gr.Blocks(
            fill_height=True,
        ):
            gr.Markdown(self._SIMPLE_INTRO)

            # Add session state to store session-specific data
            session_state = gr.State({})  # Initialize empty state for each session
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])
            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(
                    None,
                    self._MASCOT_URL,
                ),
                resizeable=True,
                scale=1,
            )

            # If an upload folder is provided, enable the upload feature
            if self.file_upload_folder is not None:
                self._build_upload_widgets(file_uploads_log)

            text_input = gr.Textbox(
                lines=1,
                label="Your request",
                placeholder="Enter your prompt here and press the button",
            )
            launch_research_btn = gr.Button(
                "Run",
                variant="primary",
            )

            self._bind_run_events(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

    def launch(self, **kwargs):
        """Build the Blocks app and start the Gradio server.

        Args:
            **kwargs: Forwarded to ``demo.launch`` (``debug=True`` is always set).
        """
        with gr.Blocks(theme="ocean", fill_height=True) as demo:
            # Different layouts for mobile and computer devices.
            # The render decorator registers `layout` with the app; without it
            # the function would never be called and the page would be empty.
            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                if device == "Desktop":
                    # Render layout with sidebar
                    self._build_desktop_layout()
                else:
                    # Render simple layout
                    self._build_simple_layout()

        demo.launch(debug=True, **kwargs)
# Start the web UI only when this file is executed as a script, so that
# importing the module does not launch a server as a side effect.
if __name__ == "__main__":
    GradioUI().launch()