# -*- coding: utf-8 -*-
# @Time : 2025/1/1
# @Author : wenshao
# @Email : wenshaoguo1026@gmail.com
# @Project : browser-use-webui
# @FileName: webui.py
"""Gradio web UI that runs a browser agent and streams a live view of the browser."""

from dotenv import load_dotenv

# Kept ahead of the remaining imports, as in the original ordering
# (presumably so .env values are visible to modules imported below -- confirm).
load_dotenv()

import argparse
import asyncio
import os
import traceback

import gradio as gr
from playwright.async_api import async_playwright
from browser_use.browser.browser import Browser, BrowserConfig
from browser_use.browser.context import (
    BrowserContextConfig,
    BrowserContextWindowSize,
)
from browser_use.agent.service import Agent

from src.browser.custom_browser import CustomBrowser
from src.controller.custom_controller import CustomController
from src.agent.custom_agent import CustomAgent
from src.agent.custom_prompts import CustomSystemPrompt
from src.utils import utils
from src.utils.file_utils import get_latest_files
from src.utils.stream_utils import stream_browser_view, capture_screenshot

# Directory used as ``trace_path`` for contexts created by the "org" agent and
# by the streaming wrapper; also the directory the "Refresh Files" button reads.
DEFAULT_TRACE_DIR = './tmp/traces'


def _history_summary(history):
    """Return (final_result, errors, model_actions, model_thoughts) from an agent history."""
    return (
        history.final_result(),
        history.errors(),
        history.model_actions(),
        history.model_thoughts(),
    )


def _latest_recordings(save_recording_path):
    """Look up the latest files in the recording directory; empty mapping if no path is set."""
    if not save_recording_path:
        return {}
    return get_latest_files(save_recording_path)


def _latest_traces(save_recording_path):
    """Look up the latest trace files in the ``traces`` directory next to the recordings.

    Falls back to ``DEFAULT_TRACE_DIR`` when no recording path is configured,
    instead of building a path from ``None`` or an empty string.
    """
    if not save_recording_path:
        return get_latest_files(DEFAULT_TRACE_DIR)
    return get_latest_files(save_recording_path + "/../traces")


async def run_browser_agent(
        agent_type,
        llm_provider,
        llm_model_name,
        llm_temperature,
        llm_base_url,
        llm_api_key,
        use_own_browser,
        headless,
        disable_security,
        window_w,
        window_h,
        save_recording_path,
        task,
        add_infos,
        max_steps,
        use_vision,
        browser_context=None  # Added optional argument
):
    """
    Runs the browser agent based on user configurations.

    Builds the LLM via ``utils.get_llm_model`` and dispatches to
    ``run_org_agent`` (``agent_type == "org"``) or ``run_custom_agent``
    (``agent_type == "custom"``), forwarding ``browser_context`` when given.

    Returns:
        The 6-tuple produced by the selected runner:
        (final_result, errors, model_actions, model_thoughts, recording, trace).

    Raises:
        ValueError: if ``agent_type`` is neither "org" nor "custom".
    """
    llm = utils.get_llm_model(
        provider=llm_provider,
        model_name=llm_model_name,
        temperature=llm_temperature,
        base_url=llm_base_url,
        api_key=llm_api_key
    )
    if agent_type == "org":
        return await run_org_agent(
            llm=llm,
            headless=headless,
            disable_security=disable_security,
            window_w=window_w,
            window_h=window_h,
            save_recording_path=save_recording_path,
            task=task,
            max_steps=max_steps,
            use_vision=use_vision,
            browser_context=browser_context  # pass context
        )
    elif agent_type == "custom":
        return await run_custom_agent(
            llm=llm,
            use_own_browser=use_own_browser,
            headless=headless,
            disable_security=disable_security,
            window_w=window_w,
            window_h=window_h,
            save_recording_path=save_recording_path,
            task=task,
            add_infos=add_infos,
            max_steps=max_steps,
            use_vision=use_vision,
            browser_context=browser_context  # pass context
        )
    else:
        raise ValueError(f"Invalid agent type: {agent_type}")


async def run_org_agent(
        llm,
        headless,
        disable_security,
        window_w,
        window_h,
        save_recording_path,
        task,
        max_steps,
        use_vision,
        browser_context=None  # receive context
):
    """Run the stock ``Agent`` and return its results plus the latest artifacts.

    When ``browser_context`` is given it is reused and left open for the
    caller. Otherwise a new ``Browser`` and context are created for this run
    and the browser is closed afterwards, even if the agent raises.

    Note: ``headless`` is accepted for interface compatibility but the
    self-launched browser is always started non-headless.

    Returns:
        (final_result, errors, model_actions, model_thoughts,
        latest ``.webm`` recording, latest ``.zip`` trace).
    """
    if browser_context is not None:
        # Reuse existing context
        agent = Agent(
            task=task,
            llm=llm,
            use_vision=use_vision,
            browser_context=browser_context
        )
        history = await agent.run(max_steps=max_steps)
        final_result, errors, model_actions, model_thoughts = _history_summary(history)
        recorded_files = _latest_recordings(save_recording_path)
        trace_file = _latest_traces(save_recording_path)
        return (
            final_result, errors, model_actions, model_thoughts,
            recorded_files.get('.webm'), trace_file.get('.zip'),
        )

    browser = Browser(
        config=BrowserConfig(
            headless=False,  # Force non-headless for streaming
            disable_security=disable_security,
            extra_chromium_args=[f'--window-size={window_w},{window_h}'],
        )
    )
    try:
        async with await browser.new_context(
                config=BrowserContextConfig(
                    trace_path=DEFAULT_TRACE_DIR,
                    save_recording_path=save_recording_path if save_recording_path else None,
                    no_viewport=False,
                    browser_window_size=BrowserContextWindowSize(width=window_w, height=window_h),
                )
        ) as browser_context_in:
            agent = Agent(
                task=task,
                llm=llm,
                use_vision=use_vision,
                browser_context=browser_context_in,
            )
            history = await agent.run(max_steps=max_steps)
            final_result, errors, model_actions, model_thoughts = _history_summary(history)
            recorded_files = _latest_recordings(save_recording_path)
            trace_file = _latest_traces(save_recording_path)
    finally:
        # Close the browser we launched even when the agent run fails.
        await browser.close()
    return (
        final_result, errors, model_actions, model_thoughts,
        recorded_files.get('.webm'), trace_file.get('.zip'),
    )


async def run_custom_agent(
        llm,
        use_own_browser,
        headless,
        disable_security,
        window_w,
        window_h,
        save_recording_path,
        task,
        add_infos,
        max_steps,
        use_vision,
        browser_context=None  # receive context
):
    """Run ``CustomAgent`` and return its results plus the latest artifacts.

    With ``use_own_browser`` a persistent Chromium context is launched from
    the ``CHROME_PATH`` / ``CHROME_USER_DATA`` environment variables. When
    ``browser_context`` is given the agent runs in it directly; otherwise a
    ``CustomBrowser`` context is created (wrapping the persistent context if
    one was launched).

    Any exception is printed and reported through the ``errors`` element of
    the result instead of being raised. Everything opened here is closed in
    the ``finally`` clause.

    Returns:
        (final_result, errors, model_actions, model_thoughts, recording, trace).
    """
    controller = CustomController()
    playwright = None
    browser = None
    # Bound before the try block so the cleanup below never sees an unbound
    # name when Playwright start-up itself fails.
    browser_context_ = None
    try:
        if use_own_browser:
            playwright = await async_playwright().start()
            chrome_exe = os.getenv("CHROME_PATH", "")
            chrome_use_data = os.getenv("CHROME_USER_DATA", "")
            browser_context_ = await playwright.chromium.launch_persistent_context(
                user_data_dir=chrome_use_data,
                executable_path=chrome_exe,
                no_viewport=False,
                # Original note: "keep the browser window visible"; the value
                # actually comes from the caller's headless setting.
                headless=headless,
                user_agent=(
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                    '(KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36'
                ),
                java_script_enabled=True,
                bypass_csp=disable_security,
                ignore_https_errors=disable_security,
                record_video_dir=save_recording_path if save_recording_path else None,
                record_video_size={'width': window_w, 'height': window_h}
            )

        if browser_context is not None:
            # Reuse context
            agent = CustomAgent(
                task=task,
                add_infos=add_infos,
                use_vision=use_vision,
                llm=llm,
                browser_context=browser_context,
                controller=controller,
                system_prompt_class=CustomSystemPrompt
            )
            history = await agent.run(max_steps=max_steps)
            final_result, errors, model_actions, model_thoughts = _history_summary(history)
            recorded_files = _latest_recordings(save_recording_path)
            trace_file = _latest_traces(save_recording_path)
            return (
                final_result, errors, model_actions, model_thoughts,
                recorded_files.get('.webm'), trace_file.get('.zip'),
            )

        browser = CustomBrowser(
            config=BrowserConfig(
                headless=headless,
                disable_security=disable_security,
                extra_chromium_args=[f'--window-size={window_w},{window_h}'],
            )
        )
        async with await browser.new_context(
                config=BrowserContextConfig(
                    trace_path='./tmp/result_processing',
                    save_recording_path=save_recording_path if save_recording_path else None,
                    no_viewport=False,
                    browser_window_size=BrowserContextWindowSize(width=window_w, height=window_h),
                ),
                context=browser_context_
        ) as browser_context_in:
            agent = CustomAgent(
                task=task,
                add_infos=add_infos,
                use_vision=use_vision,
                llm=llm,
                browser_context=browser_context_in,
                controller=controller,
                system_prompt_class=CustomSystemPrompt
            )
            history = await agent.run(max_steps=max_steps)
            final_result, errors, model_actions, model_thoughts = _history_summary(history)
            recorded_files = _latest_recordings(save_recording_path)
    except Exception as e:
        traceback.print_exc()
        final_result = ""
        errors = str(e) + "\n" + traceback.format_exc()
        model_actions = ""
        model_thoughts = ""
        recorded_files = {}
    finally:
        # Explicitly close the persistent context
        if browser_context_:
            await browser_context_.close()
        # Stop the Playwright object
        if playwright:
            await playwright.stop()
        if browser:
            await browser.close()
    # NOTE(review): the trace slot is read from the recording lookup here,
    # while this branch configures trace_path='./tmp/result_processing' --
    # confirm which directory the trace archive is expected in.
    return (
        final_result, errors, model_actions, model_thoughts,
        recorded_files.get('.webm'), recorded_files.get('.zip'),
    )


async def run_with_stream(*args):
    """Wrapper to run agent and handle streaming.

    ``args`` are the Gradio inputs in the positional order of
    ``run_browser_agent``. A browser and context are opened here, the agent
    runs as a background task in that context, and while it runs this
    generator repeatedly yields the output of ``capture_screenshot`` together
    with the (still empty) result fields. A final yield carries the agent's
    results. Errors are reported through the yielded values.

    Yields:
        [html_content, final_result, errors, model_actions, model_thoughts,
        recording, trace]
    """
    browser = None
    try:
        # Positions follow the ``inputs`` list wired to the Run button.
        disable_security = args[8]
        window_w, window_h = args[9], args[10]
        save_recording_path = args[11]

        browser = CustomBrowser(config=BrowserConfig(
            headless=False,
            disable_security=disable_security,
            extra_chromium_args=[f'--window-size={window_w},{window_h}'],
        ))
        async with await browser.new_context(
                config=BrowserContextConfig(
                    trace_path=DEFAULT_TRACE_DIR,
                    # Normalise "" to None, matching the other call sites.
                    save_recording_path=save_recording_path if save_recording_path else None,
                    no_viewport=False,
                    browser_window_size=BrowserContextWindowSize(width=window_w, height=window_h),
                )
        ) as browser_context:
            # No need to explicitly create page - context creation handles it
            # Run agent in background
            agent_task = asyncio.create_task(
                run_browser_agent(*args, browser_context=browser_context)
            )
            try:
                # Initialize values
                html_content = "\nStarting browser...\n"
                final_result = errors = model_actions = model_thoughts = ""
                recording = trace = None

                while not agent_task.done():
                    try:
                        html_content = await capture_screenshot(browser_context)
                    except Exception as e:
                        html_content = f"\nScreenshot error: {str(e)}\n"
                    yield [
                        html_content, final_result, errors,
                        model_actions, model_thoughts, recording, trace,
                    ]
                    await asyncio.sleep(0.01)

                # Get agent results when done
                try:
                    result = await agent_task
                    if isinstance(result, tuple) and len(result) == 6:
                        (final_result, errors, model_actions,
                         model_thoughts, recording, trace) = result
                    else:
                        errors = "Unexpected result format from agent"
                except Exception as e:
                    errors = f"Agent error: {str(e)}"

                yield [
                    html_content, final_result, errors,
                    model_actions, model_thoughts, recording, trace,
                ]
            finally:
                # If the stream stops early (consumer closed the generator or
                # an error escaped the loop), do not leave the agent running
                # against a context that is about to be closed.
                if not agent_task.done():
                    agent_task.cancel()
    except Exception as e:
        yield [
            f"\nBrowser error: {str(e)}\n",
            "",
            f"Error: {str(e)}\n{traceback.format_exc()}",
            "",
            "",
            None,
            None
        ]
    finally:
        if browser:
            await browser.close()


def main(ip="0.0.0.0", port=7860):
    """Build the Gradio UI and launch the server.

    Args:
        ip: address to bind to; defaults to the CLI default.
        port: port to listen on; defaults to the CLI default.

    The defaults let ``main()`` be called without parsed CLI arguments
    (the import-time path at the bottom of this file).
    """
    # Gradio UI setup
    with gr.Blocks(
            title="Browser Use WebUI",
            theme=gr.themes.Soft(font=[gr.themes.GoogleFont("Plus Jakarta Sans")])
    ) as demo:
        gr.Markdown("\n\nBrowser Use WebUI\n\n")
        with gr.Tabs():
            # Tab for LLM Settings
            with gr.Tab("LLM Settings"):
                with gr.Row():
                    llm_provider = gr.Dropdown(
                        ["anthropic", "openai", "gemini", "azure_openai", "deepseek"],
                        label="LLM Provider",
                        value="gemini"
                    )
                    llm_model_name = gr.Textbox(label="LLM Model Name", value="gemini-2.0-flash-exp")
                    llm_temperature = gr.Number(label="LLM Temperature", value=1.0)
                with gr.Row():
                    llm_base_url = gr.Textbox(label="LLM Base URL")
                    llm_api_key = gr.Textbox(label="LLM API Key", type="password")

            # Tab for Browser Settings
            with gr.Tab("Browser Settings"):
                with gr.Accordion("Browser Settings", open=True):
                    use_own_browser = gr.Checkbox(label="Use Own Browser", value=False)
                    headless = gr.Checkbox(label="Headless", value=False)
                    disable_security = gr.Checkbox(label="Disable Security", value=True)
                    with gr.Row():
                        # precision=0 makes Gradio hand over ints; these values
                        # end up in --window-size and the context window size.
                        window_w = gr.Number(label="Window Width", value=1920, precision=0)
                        window_h = gr.Number(label="Window Height", value=1080, precision=0)
                    save_recording_path = gr.Textbox(
                        label="Save Recording Path",
                        placeholder="e.g. ./tmp/record_videos",
                        value="./tmp/record_videos"
                    )

            # Tab for Task Settings
            with gr.Tab("Task Settings"):
                with gr.Accordion("Task Settings", open=True):
                    task = gr.Textbox(
                        label="Task",
                        lines=10,
                        value="go to google.com and type 'OpenAI' click search and give me the first url"
                    )
                    add_infos = gr.Textbox(
                        label="Additional Infos (Optional): Hints to help LLM complete Task",
                        lines=5
                    )
                    agent_type = gr.Radio(["org", "custom"], label="Agent Type", value="custom")
                    # The step budget is a count, so request an integer.
                    max_steps = gr.Number(label="Max Run Steps", value=100, precision=0)
                    use_vision = gr.Checkbox(label="Use Vision", value=True)

            # Tab for Stream + File Download and Agent Thoughts
            with gr.Tab("Results"):
                with gr.Column():
                    # Add live stream viewer before other components
                    browser_view = gr.HTML(
                        label="Live Browser View",
                        value="\n\nWaiting for browser session...\n\n"
                    )
                    final_result_output = gr.Textbox(label="Final Result", lines=5)
                    errors_output = gr.Textbox(label="Errors", lines=5)
                    model_actions_output = gr.Textbox(label="Model Actions", lines=5)
                    model_thoughts_output = gr.Textbox(label="Model Thoughts", lines=5)
                    with gr.Row():
                        recording_file = gr.Video(label="Recording File")  # Changed from gr.File to gr.Video
                        trace_file = gr.File(label="Trace File (ZIP)")

                    # Add a refresh button
                    refresh_button = gr.Button("Refresh Files")

                    def refresh_files():
                        """Return the latest recording and trace from the default directories."""
                        recorded_files = get_latest_files("./tmp/record_videos")
                        trace_files = get_latest_files("./tmp/traces")
                        return (
                            recorded_files.get('.webm') if recorded_files.get('.webm') else None,
                            trace_files.get('.zip') if trace_files.get('.zip') else None
                        )

                    refresh_button.click(
                        fn=refresh_files,
                        inputs=[],
                        outputs=[recording_file, trace_file]
                    )

        # Run button outside tabs for global execution
        run_button = gr.Button("Run Agent", variant="primary")
        # The order of ``inputs`` must match run_browser_agent's positional
        # parameters; run_with_stream indexes into it by position.
        run_button.click(
            fn=run_with_stream,
            inputs=[
                agent_type, llm_provider, llm_model_name, llm_temperature,
                llm_base_url, llm_api_key, use_own_browser, headless,
                disable_security, window_w, window_h, save_recording_path,
                task, add_infos, max_steps, use_vision
            ],
            outputs=[
                browser_view, final_result_output, errors_output,
                model_actions_output, model_thoughts_output,
                recording_file, trace_file
            ],
            queue=True
        )

    demo.launch(server_name=ip, server_port=port, share=True)


if __name__ == "__main__":
    # For local development
    parser = argparse.ArgumentParser(description="Gradio UI for Browser Agent")
    parser.add_argument("--ip", type=str, default="0.0.0.0", help="IP address to bind to")
    parser.add_argument("--port", type=int, default=7860, help="Port to listen on")
    args = parser.parse_args()
    main(ip=args.ip, port=args.port)
else:
    # For Vercel deployment: no CLI arguments are parsed on this path, so the
    # defaults of main() are used.
    main()