Spaces:
Running
Running
import json | |
import mimetypes | |
import os | |
import re | |
import shutil | |
import threading | |
from typing import Optional | |
from loguru import logger | |
import gradio as gr | |
from dotenv import load_dotenv | |
# from huggingface_hub import login | |
from smolagents import ( | |
CodeAgent, | |
# HfApiModel, | |
# LiteLLMModel, | |
OpenAIServerModel, | |
Tool, | |
# GoogleSearchTool, | |
DuckDuckGoSearchTool, | |
) | |
from smolagents.agent_types import ( | |
AgentAudio, | |
AgentImage, | |
AgentText, | |
handle_agent_output_types, | |
) | |
from smolagents.gradio_ui import stream_to_gradio | |
from scripts.text_inspector_tool import TextInspectorTool | |
from scripts.text_web_browser import ( | |
ArchiveSearchTool, | |
FinderTool, | |
FindNextTool, | |
PageDownTool, | |
PageUpTool, | |
SimpleTextBrowser, | |
VisitTool, | |
) | |
from scripts.visual_qa import visualizer | |
# Web-search tool handed to the agent (first entry of WEB_TOOLS).
# The Serper-backed Google variant is kept below, disabled, for reference.
# web_search = GoogleSearchTool(provider="serper")
web_search = DuckDuckGoSearchTool()
# Disabled manual smoke test of the search tool:
# print(web_search(query="Donald Trump news"))
# TODO fix ValueError: {'message': 'Unauthorized.', 'statusCode': 403}
# quit()
# Module names that agent-generated code is allowed to import; this list is
# passed to CodeAgent as `additional_authorized_imports`.
AUTHORIZED_IMPORTS = [
    "requests", "zipfile", "pandas", "numpy",
    "sympy", "json", "bs4", "pubchempy",
    "xml", "yahoo_finance", "Bio", "sklearn",
    "scipy", "pydub", "PIL", "chess",
    "PyPDF2", "pptx", "torch", "datetime",
    "fractions", "csv",
]
# Read settings from a .env file; override=True is passed so that values from
# .env take precedence over variables already present in the environment.
load_dotenv(override=True)
# login(os.getenv("HF_TOKEN"))  # this is not necessary if env var HF_TOKEN is set

append_answer_lock = threading.Lock()

# Role remapping handed to the model wrapper.
custom_role_conversions = {"tool-call": "assistant", "tool-response": "user"}

# User-Agent header sent with the text browser's HTTP requests.
user_agent = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)

# Keyword arguments for SimpleTextBrowser.
BROWSER_CONFIG = dict(
    viewport_size=1024 * 5,
    downloads_folder="downloads_folder",
    request_kwargs={
        "headers": {"User-Agent": user_agent},
        "timeout": 300,
    },
    serpapi_key=os.getenv("SERPAPI_API_KEY"),
)

# Make sure the browser's download directory exists before it is used.
os.makedirs("./" + BROWSER_CONFIG["downloads_folder"], exist_ok=True)
# --- Model selection -------------------------------------------------------
# When both MODEL_ID and OPENAI_API_BASE are set, talk to that
# OpenAI-compatible endpoint; otherwise fall back to HfApiModel with its
# default model.
model_id = os.getenv("MODEL_ID", "deepseek-ai/DeepSeek-V3")
_api_base = os.getenv("OPENAI_API_BASE")
_api_key = os.getenv("OPENAI_API_KEY")
# Only the first 8 characters of the key are ever written to the log.
_masked_api_key = "" if _api_key is None else _api_key[:8] + "..."

if os.getenv("MODEL_ID") and _api_base:
    logger.debug(
        f"using OpenAIServerModel: {model_id=}, "
        f"OPENAI_API_BASE={_api_base!r}, OPENAI_API_KEY={_masked_api_key!r}"
    )
    model = OpenAIServerModel(
        model_id,
        custom_role_conversions=custom_role_conversions,
        api_base=_api_base,
        api_key=_api_key,
    )
else:
    # HfApiModel is commented out in the file-level smolagents import, so the
    # fallback used to die with NameError; import it where it is needed.
    from smolagents import HfApiModel

    logger.debug("using HfApiModel: default model_id=Qwen/Qwen2.5-Coder-32B-Instruct")
    model = HfApiModel(
        custom_role_conversions=custom_role_conversions,
    )
# Text limit passed to every TextInspectorTool created below.
text_limit = 20000
ti_tool = TextInspectorTool(model, text_limit)

# One browser instance is shared by all the navigation tools.
browser = SimpleTextBrowser(**BROWSER_CONFIG)

# Tool list given to each agent: search first, then the browser-backed tools
# in their original order, then a text inspector.
WEB_TOOLS = [web_search]  # duckduckgo
WEB_TOOLS.extend(
    tool_cls(browser)
    for tool_cls in (
        VisitTool,
        PageUpTool,
        PageDownTool,
        FinderTool,
        FindNextTool,
        ArchiveSearchTool,
    )
)
WEB_TOOLS.append(TextInspectorTool(model, text_limit))
# Factory so that every session can own its own agent object.
def create_agent():
    """Build and return a new CodeAgent wired to the shared model and tools."""
    agent_tools = [visualizer, *WEB_TOOLS]
    agent_options = {
        "model": model,
        "tools": agent_tools,
        "max_steps": 10,
        "verbosity_level": 1,
        "additional_authorized_imports": AUTHORIZED_IMPORTS,
        "planning_interval": 4,
    }
    return CodeAgent(**agent_options)
# Separate TextInspectorTool instance built from the shared model.
# NOTE(review): nothing in the visible part of this file reads this name -
# confirm it is still needed before removing it.
document_inspection_tool = TextInspectorTool(model, 20000)
class GradioUI:
    """A one-line interface to launch your agent in Gradio.

    The UI is rendered per request: `launch` registers a render function that
    inspects the incoming request headers and builds either a sidebar layout
    (desktop) or a single-column layout (anything else). Each browser session
    keeps its own agent inside a `gr.State` dictionary.
    """

    # MIME types accepted by `upload_file` when no explicit list is given
    # (.pdf, .docx and .txt).
    DEFAULT_ALLOWED_FILE_TYPES = (
        "application/pdf",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "text/plain",
    )

    # Avatar image shown next to the assistant's chat messages.
    _AVATAR_URL = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"

    def __init__(self, file_upload_folder: str | None = None):
        """Remember the upload folder and create it when it does not exist.

        Args:
            file_upload_folder: Directory where uploaded files are copied.
                `None` leaves the upload widgets out of the UI.
        """
        self.file_upload_folder = file_upload_folder
        if file_upload_folder is not None and not os.path.exists(file_upload_folder):
            # makedirs also creates missing parent directories, and
            # exist_ok=True tolerates another process creating the folder
            # between the check above and this call.
            os.makedirs(file_upload_folder, exist_ok=True)

    def interact_with_agent(self, prompt, messages, session_state):
        """Run the session's agent on `prompt`, streaming chat messages.

        Args:
            prompt: Task text handed to the agent.
            messages: Chat history list; it is appended to in place.
            session_state: Per-session dict; the agent is stored under the
                "agent" key and created on first use.

        Yields:
            The `messages` list after each new message has been appended.

        Raises:
            Exception: Any error raised while streaming is printed and then
                re-raised unchanged.
        """
        # Get or create the session-specific agent
        if "agent" not in session_state:
            session_state["agent"] = create_agent()
        agent = session_state["agent"]

        try:
            # Diagnostic output: does the agent expose a memory attribute?
            has_memory = hasattr(agent, "memory")
            print(f"Agent has memory: {has_memory}")
            if has_memory:
                print(f"Memory type: {type(agent.memory)}")

            messages.append(gr.ChatMessage(role="user", content=prompt))
            yield messages

            # reset_agent_memory=False is passed so the agent is not reset
            # between successive prompts of the same session.
            for msg in stream_to_gradio(agent, task=prompt, reset_agent_memory=False):
                messages.append(msg)
                yield messages
            yield messages
        except Exception as e:
            print(f"Error in interaction: {str(e)}")
            raise

    @staticmethod
    def _sanitized_upload_name(path, mime_type):
        """Return a safe file name whose extension matches `mime_type`."""
        original_name = os.path.basename(path)
        # Replace anything that is not a word character, dash or dot.
        safe_name = re.sub(r"[^\w\-.]", "_", original_name)
        # As before, the stem is everything before the last dot with the
        # remaining dots dropped.
        stem = "".join(safe_name.split(".")[:-1])
        # First extension registered for this MIME type. If the type is not
        # in the table, keep the file's own extension instead of raising
        # KeyError.
        extension = next(
            (ext for ext, kind in mimetypes.types_map.items() if kind == mime_type),
            os.path.splitext(safe_name)[1],
        )
        return stem + extension

    def upload_file(self, file, file_uploads_log, allowed_file_types=None):
        """Handle a file upload; default allowed types are .pdf, .docx and .txt.

        Args:
            file: Uploaded file object exposing a `.name` path, or `None`.
            file_uploads_log: List of paths uploaded so far in this session.
            allowed_file_types: Iterable of accepted MIME types. `None`
                selects `DEFAULT_ALLOWED_FILE_TYPES`.

        Returns:
            A `(status_textbox, uploads_log)` pair. The log is returned
            unchanged on failure and as a new list with the stored path
            appended on success.
        """
        # A None default avoids sharing one mutable list across all calls.
        if allowed_file_types is None:
            allowed_file_types = self.DEFAULT_ALLOWED_FILE_TYPES

        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log

        try:
            mime_type, _ = mimetypes.guess_type(file.name)
        except Exception as e:
            return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

        if mime_type not in allowed_file_types:
            return gr.Textbox("File type disallowed", visible=True), file_uploads_log

        # Save the uploaded file to the configured folder
        sanitized_name = self._sanitized_upload_name(file.name, mime_type)
        file_path = os.path.join(
            self.file_upload_folder, os.path.basename(sanitized_name)
        )
        shutil.copy(file.name, file_path)

        return (
            gr.Textbox(f"File uploaded: {file_path}", visible=True),
            file_uploads_log + [file_path],
        )

    def log_user_message(self, text_input, file_uploads_log):
        """Build the agent prompt and lock the input widgets while it runs.

        Returns:
            A 3-tuple: the prompt text (with a note listing uploaded files
            when there are any), a cleared non-interactive textbox update,
            and a non-interactive button update.
        """
        files_note = (
            f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}"
            if file_uploads_log
            else ""
        )
        return (
            text_input + files_note,
            gr.Textbox(
                value="",
                interactive=False,
                placeholder="Please wait while Steps are getting populated",
            ),
            gr.Button(interactive=False),
        )

    def detect_device(self, request: gr.Request):
        """Classify the requesting device from its HTTP headers.

        Returns:
            "Unknown device" when there is no request, otherwise "Mobile" or
            "Desktop" ("Desktop" is also the answer when no header is
            conclusive).
        """
        if not request:
            return "Unknown device"

        # Method 1: the sec-ch-ua-mobile client hint
        is_mobile_header = request.headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            return "Mobile" if "?1" in is_mobile_header else "Desktop"

        # Method 2: keywords in the user-agent string
        user_agent = request.headers.get("user-agent", "").lower()
        mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
        if any(keyword in user_agent for keyword in mobile_keywords):
            return "Mobile"

        # Method 3: the sec-ch-ua-platform client hint (values arrive quoted)
        platform = request.headers.get("sec-ch-ua-platform", "").lower()
        if platform:
            if platform in ['"android"', '"ios"']:
                return "Mobile"
            elif platform in ['"windows"', '"macos"', '"linux"']:
                return "Desktop"

        # Default case if no clear indicators
        return "Desktop"

    def _add_upload_widgets(self, file_uploads_log):
        """Create the upload controls when an upload folder is configured."""
        if self.file_upload_folder is None:
            return
        file_picker = gr.File(label="Upload a file")
        upload_status = gr.Textbox(
            label="Upload Status",
            interactive=False,
            visible=False,
        )
        file_picker.change(
            self.upload_file,
            [file_picker, file_uploads_log],
            [upload_status, file_uploads_log],
        )

    def _wire_run_events(
        self,
        text_input,
        launch_research_btn,
        file_uploads_log,
        stored_messages,
        chatbot,
        session_state,
    ):
        """Attach the log -> run agent -> unlock chain to both run triggers."""

        def unlock_inputs():
            # Re-enable the textbox and the button once the agent is done.
            return (
                gr.Textbox(
                    interactive=True,
                    placeholder="Enter your prompt here and press the button",
                ),
                gr.Button(interactive=True),
            )

        # Submitting the textbox and clicking the button behave identically.
        for register in (text_input.submit, launch_research_btn.click):
            register(
                self.log_user_message,
                [text_input, file_uploads_log],
                [stored_messages, text_input, launch_research_btn],
            ).then(
                self.interact_with_agent,
                # session_state carries the per-session agent
                [stored_messages, chatbot, session_state],
                [chatbot],
            ).then(
                unlock_inputs,
                None,
                [text_input, launch_research_btn],
            )

    def _render_desktop_layout(self):
        """Build the sidebar layout used for desktop clients."""
        # NOTE(review): component nesting is reconstructed from statement
        # order - confirm that the accordion belongs inside the sidebar.
        with gr.Blocks(
            fill_height=True,
        ):
            file_uploads_log = gr.State([])
            with gr.Sidebar():
                with gr.Group():
                    gr.Markdown("**Your request**", container=True)
                    text_input = gr.Textbox(
                        lines=3,
                        label="Your request",
                        container=False,
                        placeholder="Enter your prompt here and press Shift+Enter or press the button",
                    )
                    launch_research_btn = gr.Button("Run", variant="primary")

                # If an upload folder is provided, enable the upload feature
                self._add_upload_widgets(file_uploads_log)

                with gr.Row():
                    gr.HTML("""<div style="display: flex; align-items: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">Powered by
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png" style="width: 32px; height: 32px; object-fit: contain;" alt="logo">
<a target="_blank" href="https://github.com/huggingface/smolagents"><b>hf/smolagents</b></a>
</div>""")

                with gr.Accordion("🎈 Info", open=False):
                    gr.Markdown("""### open Deep Research - free the AI agents!
OpenAI just (February 2, 2025) published [Deep Research](https://openai.com/index/introducing-deep-research/), an amazing assistant that can perform deep searches on the web to answer user questions.
However, their agent has a huge downside: it's not open. So we've started a 24-hour rush to replicate and open-source it. Our (Huggingface's) resulting [open-Deep-Research agent](https://github.com/huggingface/smolagents/tree/main/examples/open_deep_research) took the #1 rank of any open submission on the GAIA leaderboard! ✨
You can try a simplified version here that uses `Qwen-Coder-32B` (via smolagnet.HfApiModel) instead of `o1`. Modified: if you set MODEL_ID, OPENAI_API_BASE and OPENAI_API_KEY in the .env or env vars (in hf space these can be set in settings, .env will override env vars), the correspoding model will be used. N.B. if you see errors, it might be because whatever quota is exceeded, clone/duplicate this space and plug in your own resources and run your own deep-research.<br><br>""")

            # Session state stores session-specific data (the agent)
            session_state = gr.State({})
            stored_messages = gr.State([])
            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(None, self._AVATAR_URL),
                resizeable=False,
                scale=1,
                elem_id="my-chatbot",
            )

            self._wire_run_events(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

    def _render_mobile_layout(self):
        """Build the single-column layout used for non-desktop clients."""
        with gr.Blocks(
            fill_height=True,
        ):
            gr.Markdown("""# open Deep Research - free the AI agents!
_Built with [smolagents](https://github.com/huggingface/smolagents)_
OpenAI just published [Deep Research](https://openai.com/index/introducing-deep-research/), a very nice assistant that can perform deep searches on the web to answer user questions.
However, their agent has a huge downside: it's not open. So we've started a 24-hour rush to replicate and open-source it. Our resulting [open-Deep-Research agent](https://github.com/huggingface/smolagents/tree/main/examples/open_deep_research) took the #1 rank of any open submission on the GAIA leaderboard! ✨
You can try a simplified version below (uses `Qwen-Coder-32B` instead of `o1`, so much less powerful than the original open-Deep-Research)👇""")

            # Session state stores session-specific data (the agent)
            session_state = gr.State({})
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])
            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(None, self._AVATAR_URL),
                resizeable=True,
                scale=1,
            )

            # If an upload folder is provided, enable the upload feature
            self._add_upload_widgets(file_uploads_log)

            text_input = gr.Textbox(
                lines=1,
                label="Your request",
                placeholder="Enter your prompt here and press the button",
            )
            launch_research_btn = gr.Button(
                "Run",
                variant="primary",
            )

            self._wire_run_events(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

    def launch(self, **kwargs):
        """Build the Blocks app and start serving it.

        Args:
            **kwargs: Forwarded to `demo.launch` (in addition to debug=True).
        """
        with gr.Blocks(theme="ocean", fill_height=True) as demo:
            # Different layouts for mobile and computer devices. The function
            # has to be registered with gr.render: merely defining it inside
            # the Blocks context never runs it, leaving the page empty.
            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                if device == "Desktop":
                    self._render_desktop_layout()
                else:
                    self._render_mobile_layout()

        demo.launch(debug=True, **kwargs)
# Start the UI. Catching KeyboardInterrupt here was tried as a remedy for
# Ctrl-C getting no response; per the original note, it did not help.
try:
    GradioUI().launch()
except KeyboardInterrupt:
    pass