feat: deep search sandbox #178

Open · wants to merge 18 commits into base: main

2 changes: 2 additions & 0 deletions demohouse/deep_search_mcp/backend/agent/supervisor.py
@@ -126,6 +126,7 @@ async def astream(
)
if self.state_manager:
await self.state_manager.dump(global_state.custom_state)
return

while planning.get_todos():
next_todo = planning.get_next_todo()
@@ -214,6 +215,7 @@ async def _make_planning(
and chunk.choices
and chunk.choices[0].delta.content
):
buffer_response += chunk.choices[0].delta.content
yield OutputTextEvent(delta=chunk.choices[0].delta.content)
if (
isinstance(chunk, ChatCompletionChunk)
10 changes: 9 additions & 1 deletion demohouse/deep_search_mcp/backend/agent/worker.py
@@ -25,6 +25,7 @@
from models.planning import PlanningItem, Planning
from prompt.worker import DEFAULT_WORKER_PROMPT
from state.global_state import GlobalState
from tools.browser import browser_debug_stream
from utils.common import get_env_info
from utils.converter import (
convert_post_tool_call_to_event,
@@ -79,13 +80,20 @@ async def astream(
self.record_usage(chunk, global_state.custom_state.total_usage)
if isinstance(chunk, ToolChunk):
if chunk.tool_exception or chunk.tool_response:
print("tool", chunk.model_dump())
# post
yield convert_post_tool_call_to_event(
event = convert_post_tool_call_to_event(
function_name=chunk.tool_name,
function_parameter=chunk.tool_arguments,
function_result=chunk.tool_response,
exception=chunk.tool_exception,
)
print("event", event.model_dump())
yield event

if "create_browser_use_task" in chunk.tool_name:
async for browser_event in browser_debug_stream(event.task_id, event.pod_name):
yield browser_event
else:
# pre
yield convert_pre_tool_call_to_event(
10 changes: 10 additions & 0 deletions demohouse/deep_search_mcp/backend/config/config.py
@@ -21,3 +21,13 @@
SUMMARY_LLM_MODEL = os.environ.get('SUMMARY_LLM_MODEL') or 'deepseek-r1-250120'

COLLECTION_DESCRIPTION = os.environ.get('COLLECTION_DESCRIPTION') or '私域知识'

BROWSER_USE_ENDPOINT = os.environ.get('BROWSER_USE_ENDPOINT') or ''

RESUME_SLEEP_SECS = int(os.environ.get('RESUME_SLEEP_SECS') or '30')

RETRY_SLEEP_SECS = int(os.environ.get('RETRY_SLEEP_SECS') or '2')

RETRY_TIMES = int(os.environ.get('RETRY_TIMES') or '5')

BROWSER_USE_AUTH_KEY = os.environ.get('BROWSER_USE_AUTH_KEY') or ''
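
All of these settings fall back to defaults when the corresponding environment variables are unset. A minimal sketch of supplying them for a local run, assuming the backend directory is on sys.path; the endpoint and auth key below are placeholders, not real values:

import os

# Placeholder values for local testing; BROWSER_USE_ENDPOINT and BROWSER_USE_AUTH_KEY
# must point at your own deployment (the values here are illustrative assumptions).
os.environ.setdefault("BROWSER_USE_ENDPOINT", "https://browser-use.example.com")
os.environ.setdefault("BROWSER_USE_AUTH_KEY", "example-key")
os.environ.setdefault("RESUME_SLEEP_SECS", "30")
os.environ.setdefault("RETRY_SLEEP_SECS", "2")
os.environ.setdefault("RETRY_TIMES", "5")

# Import after populating the environment so the module-level reads pick the values up.
from config.config import BROWSER_USE_ENDPOINT, RETRY_TIMES

print(BROWSER_USE_ENDPOINT, RETRY_TIMES)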
3 changes: 3 additions & 0 deletions demohouse/deep_search_mcp/backend/mcp_servers_config.json
@@ -3,6 +3,9 @@
"search": {
"url": "http://localhost:7001/sse"
},
"code": {
"url": "http://localhost:7002/sse"
},
"tls": {
"url": "http://localhost:7003/sse"
},
4 changes: 4 additions & 0 deletions demohouse/deep_search_mcp/backend/models/events.py
@@ -65,13 +65,15 @@ class ReasoningEvent(MessageEvent):
class ToolCallEvent(BaseEvent):
type: str = ''
status: str = 'pending'
function_name: str = ''


class ToolCompletedEvent(BaseEvent):
type: str = ''
status: str = 'completed'
success: bool = True
error_msg: str = ''
function_name: str = ''


"""
@@ -132,12 +134,14 @@ class LinkReaderToolCompletedEvent(ToolCompletedEvent):
class PythonExecutorToolCallEvent(ToolCallEvent):
type: str = "python_executor"
code: str = ""
fetch_files: Optional[List[str]] = []


class PythonExecutorToolCompletedEvent(ToolCompletedEvent):
type: str = "python_executor"
code: str = ""
stdout: str = ""
files: Optional[Dict[str, Any]] = {}
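
A small sketch of how the extended completed event might be populated after a sandbox run, assuming BaseEvent has no required fields beyond those shown; the shape of the files mapping is an assumption, since only the Dict[str, Any] type is declared:

from models.events import PythonExecutorToolCompletedEvent

# Field values are illustrative; "run_code" and the files payload shape are assumptions.
event = PythonExecutorToolCompletedEvent(
    function_name="run_code",
    code="print('hello')",
    stdout="hello\n",
    files={"report.html": "<h1>demo</h1>"},
)
print(event.model_dump())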


"""
3 changes: 2 additions & 1 deletion demohouse/deep_search_mcp/backend/pyproject.toml
@@ -13,7 +13,8 @@ dependencies = [
"mcp_server_ark @ git+https://github.com/volcengine/ai-app-lab.git@main#subdirectory=mcp/server/mcp_server_ark",
"mcp_server_tls @ git+https://github.com/volcengine/ai-app-lab.git@main#subdirectory=mcp/server/mcp_server_tls",
"mcp_server_vefaas_browser_use @ git+https://github.com/volcengine/mcp-server.git@main#subdirectory=server/mcp_server_vefaas_browser_use",
"mcp_server_ppt @ git+https://github.com/volcengine/ai-app-lab.git@main#subdirectory=mcp/server/mcp_server_ppt"
"mcp_server_ppt @ git+https://github.com/volcengine/ai-app-lab.git@main#subdirectory=mcp/server/mcp_server_ppt",
"mcp_server_vefaas_sandbox @ git+https://github.com/volcengine/ai-app-lab.git@main#subdirectory=mcp/server/mcp_server_vefaas_sandbox"
]
[dependency-groups]
dev = ["poetry-plugin-export>=1.9.0,<2"]
3 changes: 2 additions & 1 deletion demohouse/deep_search_mcp/backend/requirements.txt
@@ -59,7 +59,8 @@ mcp-server-ark @ git+https://github.com/volcengine/ai-app-lab.git@edd7824ca863a1
mcp-server-knowledgebase @ git+https://github.com/volcengine/ai-app-lab.git@edd7824ca863a1134258ecd6ba1d7e9afedd1f6a#subdirectory=mcp/server/mcp_server_knowledgebase
mcp-server-ppt @ git+https://github.com/volcengine/ai-app-lab.git@9d1d316705c54680c0e78841eb4c14bd51d2dd25#subdirectory=mcp/server/mcp_server_ppt
mcp-server-tls @ git+https://github.com/volcengine/ai-app-lab.git@edd7824ca863a1134258ecd6ba1d7e9afedd1f6a#subdirectory=mcp/server/mcp_server_tls
mcp-server-vefaas-browser-use @ git+https://github.com/volcengine/mcp-server.git@6b0eb21383657387bec873861ee83e961c10712c#subdirectory=server/mcp_server_vefaas_browser_use
mcp-server-vefaas-browser-use @ git+https://github.com/zhilulian/ai-app-lab.git@3e4dc7bfd539ed24988d5978c41b25d92a6f3851#subdirectory=mcp/server/mcp_server_vefaas_browser_use
mcp-server-vefaas-sandbox @ git+https://github.com/zhilulian/ai-app-lab.git@01879352d0819a96dbc586fe49dcb09108c889ae#subdirectory=mcp/server/mcp_server_vefaas_sandbox
mdurl==0.1.2
more-itertools==10.6.0
msgpack==1.1.0
26 changes: 16 additions & 10 deletions demohouse/deep_search_mcp/backend/server/server.py
@@ -132,14 +132,13 @@ def get_workers(
post_tool_call_hook=SearcherPostToolCallHook(global_state=global_state),
)

# coder = Worker(
# llm_model=WORKER_LLM_MODEL, name='coder',
# instruction='编写和运行python代码',
# tools=[
# mcp_clients.get('code')
# ],
# post_tool_call_hook=PythonExecutorPostToolCallHook()
# )
coder = Worker(
llm_model=WORKER_LLM_MODEL, name='coder',
instruction="""代码生成与执行单元,提供了在一个沙盒环境中,生成并执行Python代码的能力。应当作为兜底能力使用,当且仅当其他能力无法满足任务要求、且任务可以通过代码实现时,使用此单元。当代码中包含生成一个html文件时,调用run_code的fetch_files参数必须为包含该html文件名的列表。调用时,tool_arguments必须是一个为一个合法的json字符串。
""",
tools=[mcp_clients.get('code')],
post_tool_call_hook=PythonExecutorPostToolCallHook()
)
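
The instruction above encodes two contract points for the sandbox tool: tool_arguments must be a valid JSON string, and fetch_files must list any generated HTML file. A minimal sketch of a conforming run_code argument payload; the "code" field name is an assumption, only fetch_files and the JSON-string requirement come from the instruction:

import json

# Hypothetical run_code arguments; the sandbox tool's exact schema is not shown in
# this PR, so "code" is assumed while fetch_files follows the instruction above.
tool_arguments = json.dumps({
    "code": "with open('report.html', 'w') as f:\n    f.write('<h1>demo</h1>')",
    "fetch_files": ["report.html"],
})
print(tool_arguments)  # a single valid JSON string, as required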

log_retriever = Worker(
llm_model=WORKER_LLM_MODEL,
@@ -178,15 +177,21 @@ def get_workers(
tools=[mcp_clients.get("browser")]
)

llm_generator = Worker(
llm_model=WORKER_LLM_MODEL,
name="llm_generator",
instruction="文本生成单元。提供了向用户呈现文本形式结果的能力。当你需要输出最终结果、撰写报告、或是汇报任务进展时,使用此工具。",
)

if global_state.custom_state.enabled_mcp_servers:
# add dynamic mask
if (
"web_search" in global_state.custom_state.enabled_mcp_servers
or "link_reader" in global_state.custom_state.enabled_mcp_servers
):
workers.update({"searcher": searcher})
# if 'code' in global_state.custom_state.enabled_mcp_servers:
# workers.update({'coder': coder})
if 'code' in global_state.custom_state.enabled_mcp_servers:
workers.update({'coder': coder})
if "tls" in global_state.custom_state.enabled_mcp_servers:
workers.update({"log_retriever": log_retriever})
if "knowledgebase" in global_state.custom_state.enabled_mcp_servers:
@@ -195,6 +200,7 @@ def get_workers(
workers.update({"browser_user": browser_user})
if "chatppt" in global_state.custom_state.enabled_mcp_servers:
workers.update({"ppt_generator": ppt_generator})
workers.update({"llm_generator": llm_generator})

return workers
else:
@@ -4,8 +4,8 @@ directory=. ; directory to cwd to before exec (def no cwd)
; startsecs=1 ; # of secs prog must stay up to be running (def. 1)
autorestart=true ; when to restart if exited after running (def: unexpected)
startsecs=180
stdout_logfile=/dev/stdout
stdout_logfile=/tmp/sandbox.std.out
stdout_logfile_maxbytes=32MB
stderr_logfile=/dev/stderr
stderr_logfile=/tmp/sandbox.err.out
stderr_logfile_maxbytes=32MB
environment=PORT="7002"
172 changes: 172 additions & 0 deletions demohouse/deep_search_mcp/backend/tools/browser.py
@@ -0,0 +1,172 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# Licensed under the 【火山方舟】原型应用软件自用许可协议
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.volcengine.com/docs/82379/1433703
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import re
import traceback
import aiohttp

from config.config import BROWSER_USE_ENDPOINT, SESSION_SAVE_PATH, RETRY_SLEEP_SECS, RESUME_SLEEP_SECS, RETRY_TIMES, \
BROWSER_USE_AUTH_KEY
from models.events import BrowserUseToolCompletedEvent


async def browser_debug_stream(task_id, pod_name):
browser_use_client = BrowserUseClient(task_id=task_id, pod_name=pod_name)
queue = asyncio.Queue()

async def produce_astream():
try:
async for event in browser_use_client.astream():
await queue.put(event)
finally:
await queue.put(None)  # signal that astream has finished

async def produce_url():
try:
url_event = await browser_use_client.get_url()
await queue.put(url_event)
finally:
await queue.put(None)  # signal that get_url has finished

astream_task = asyncio.create_task(produce_astream())
url_task = asyncio.create_task(produce_url())

finished = 0
while finished < 2:
item = await queue.get()
if item is None:
finished += 1
else:
yield item

await asyncio.gather(astream_task, url_task)


class BrowserUseClient:
def __init__(self, task_id, pod_name):
self.task_id = task_id
self.pod_name = pod_name
self.endpoint = re.sub(r'^https?://', '', BROWSER_USE_ENDPOINT)
self.stream_url = f"https://{self.endpoint}/tasks/{task_id}/stream"
self.version_url = f"https://{self.endpoint}/tasks/{task_id}/devtools/json/version"
self.resume_url = f"https://{self.endpoint}/tasks/{task_id}/resume"
self.headers = {
"X-Faas-Event-Type": "http",
"Content-Type": "application/json",
"x-faas-instance-name": pod_name,
"Authorization": f"Bearer {BROWSER_USE_AUTH_KEY}" if BROWSER_USE_AUTH_KEY else "",
}
print(self.headers)

async def astream(self):
print("astream...")
retries = 10
for _ in range(retries):
try:
timeout = aiohttp.ClientTimeout(
total=None,
sock_connect=10,
sock_read=900
)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(self.stream_url, headers=self.headers) as response:
print("response", response)
if response.status != 200:
continue

async for line in response.content:
line = line.decode().strip()
print(line)
if line.startswith("data: "):
result = json.loads(line.removeprefix("data: "))
task_id = result.get("task_id", "")
result_data = json.loads(result.get("data", "").removeprefix("data: "))
status = result_data.get("status")
metadata = result_data.get("metadata", {})
action = metadata.get("data", {}).get("message", "")

yield BrowserUseToolCompletedEvent(
status=status,
task_id=task_id,
metadata=result_data.get("metadata", {}),
)
if "'pause': {'reason'" in action:
# post resume_url after 30s
asyncio.create_task(self.resume_task())

if status == "completed":
with open(f"{SESSION_SAVE_PATH}/{task_id}.txt", mode="w") as result_file:
result_file.write(line)

print("astream end...")
return
except Exception as e:
traceback.print_exc()
print(e)
yield BrowserUseToolCompletedEvent(
success=False,
status="steam_failed",
task_id=task_id,
)
await asyncio.sleep(2)

async def get_url(self):
retries = 5
for attempt in range(retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.version_url, headers=self.headers) as response:
if response.status == 200:
response_json = await response.json()
print(response_json)
websocket_url = response_json.get("webSocketDebuggerUrl")
if websocket_url:
websocket_url = websocket_url.replace("None", f"{self.endpoint}/tasks/{self.task_id}")
websocket_url = websocket_url.replace("ws://", "wss://")
websocket_url += f"?faasInstanceName={self.pod_name}"

print("websocket_url", websocket_url)
return BrowserUseToolCompletedEvent(
status="url_fetched",
task_id=self.task_id,
url=websocket_url,
)
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")

await asyncio.sleep(2)

return BrowserUseToolCompletedEvent(
success=False,
status="get_url_failed",
task_id=self.task_id,
url="",
)

async def resume_task(self):
await asyncio.sleep(RESUME_SLEEP_SECS)
print("start resume")
retries = RETRY_TIMES
for _ in range(retries):
try:
async with aiohttp.ClientSession() as session:
async with session.post(self.resume_url, headers=self.headers) as resume_response:
if resume_response.status == 200:
print(f"resume {self.task_id} success")
return
else:
text = await resume_response.text()
print(f"resume {self.task_id} failed, failed text = {text}, resume_response = {resume_response}")
continue
except Exception as e:
print("resume failed", e)
await asyncio.sleep(RETRY_SLEEP_SECS)
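
A minimal sketch of consuming the new helper on its own, assuming valid BROWSER_USE_ENDPOINT and BROWSER_USE_AUTH_KEY settings and an already created browser-use task; the task id and pod name below are placeholders for the values the worker takes from the create_browser_use_task result (event.task_id, event.pod_name):

import asyncio

from tools.browser import browser_debug_stream

async def main():
    # Placeholder identifiers; in worker.py these come from the tool-call result event.
    async for event in browser_debug_stream("task-123", "pod-abc"):
        print(event.status, getattr(event, "url", ""))

asyncio.run(main())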