From f438c5e7cf14b4f122c98f032e6c6bd4252999b7 Mon Sep 17 00:00:00 2001 From: Pablo Toledo Date: Wed, 11 Jun 2025 00:55:27 +0200 Subject: [PATCH 1/5] doc: clarify SSE patch rationale --- src/mcp_agent/mcp_server/agent_server.py | 113 ++++++++++++++++++++++- 1 file changed, 111 insertions(+), 2 deletions(-) diff --git a/src/mcp_agent/mcp_server/agent_server.py b/src/mcp_agent/mcp_server/agent_server.py index a0e1f972..2821763d 100644 --- a/src/mcp_agent/mcp_server/agent_server.py +++ b/src/mcp_agent/mcp_server/agent_server.py @@ -1,5 +1,9 @@ -""" -Enhanced AgentMCPServer with robust shutdown handling for SSE transport. +"""Agent MCP server utilities with improved SSE handling. + +This module patches :meth:`FastMCP.sse_app` at runtime to avoid returning a +second :class:`starlette.responses.Response` after the SSE connection completes. +The extra response triggered ``"Unexpected ASGI message 'http.response.start'"`` +errors and caused timeouts when chaining agents in server mode. """ import asyncio @@ -8,8 +12,20 @@ from contextlib import AsyncExitStack, asynccontextmanager from typing import Set +from mcp.server.auth.middleware.auth_context import AuthContextMiddleware +from mcp.server.auth.middleware.bearer_auth import ( + BearerAuthBackend, + RequireAuthMiddleware, +) from mcp.server.fastmcp import Context as MCPContext from mcp.server.fastmcp import FastMCP +from mcp.server.sse import SseServerTransport +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.requests import Request +from starlette.responses import Response +from starlette.routing import Mount, Route import mcp_agent import mcp_agent.core @@ -36,6 +52,99 @@ def __init__( instructions=server_description or f"This server provides access to {len(agent_app._agents)} agents", ) + + # Patch FastMCP.sse_app to avoid returning an extra Response that + # triggers "Unexpected ASGI message 'http.response.start'" errors when + # running in server mode with SSE transport. The upstream implementation + # returns a Response after the EventSourceResponse has already completed + # which causes Starlette to attempt to start a new response on a closed + # connection. Removing that return fixes timeouts when chaining agents. + def patched_sse_app(self_mcp, mount_path: str | None = None): + if mount_path is not None: + self_mcp.settings.mount_path = mount_path + + normalized_message_endpoint = self_mcp._normalize_path( + self_mcp.settings.mount_path, self_mcp.settings.message_path + ) + + sse = SseServerTransport(normalized_message_endpoint) + + async def handle_sse(scope, receive, send): + async with sse.connect_sse(scope, receive, send) as streams: + await self_mcp._mcp_server.run( + streams[0], + streams[1], + self_mcp._mcp_server.create_initialization_options(), + ) + + routes = [] + middleware = [] + required_scopes = [] + + if self_mcp._auth_server_provider: + assert self_mcp.settings.auth + from mcp.server.auth.routes import create_auth_routes + + required_scopes = self_mcp.settings.auth.required_scopes or [] + + middleware = [ + Middleware( + AuthenticationMiddleware, + backend=BearerAuthBackend( + provider=self_mcp._auth_server_provider, + ), + ), + Middleware(AuthContextMiddleware), + ] + routes.extend( + create_auth_routes( + provider=self_mcp._auth_server_provider, + issuer_url=self_mcp.settings.auth.issuer_url, + service_documentation_url=self_mcp.settings.auth.service_documentation_url, + client_registration_options=self_mcp.settings.auth.client_registration_options, + revocation_options=self_mcp.settings.auth.revocation_options, + ) + ) + + if self_mcp._auth_server_provider: + routes.append( + Route( + self_mcp.settings.sse_path, + endpoint=RequireAuthMiddleware(handle_sse, required_scopes), + methods=["GET"], + ) + ) + routes.append( + Mount( + self_mcp.settings.message_path, + app=RequireAuthMiddleware(sse.handle_post_message, required_scopes), + ) + ) + else: + + async def sse_endpoint(request: Request) -> Response: + return await handle_sse(request.scope, request.receive, request._send) # type: ignore[arg-type] + + routes.append( + Route( + self_mcp.settings.sse_path, + endpoint=sse_endpoint, + methods=["GET"], + ) + ) + routes.append( + Mount( + self_mcp.settings.message_path, + app=sse.handle_post_message, + ) + ) + + routes.extend(self_mcp._custom_starlette_routes) + + return Starlette(debug=self_mcp.settings.debug, routes=routes, middleware=middleware) + + # Bind the patched method to the FastMCP instance + self.mcp_server.sse_app = patched_sse_app.__get__(self.mcp_server, FastMCP) # Shutdown coordination self._graceful_shutdown_event = asyncio.Event() self._force_shutdown_event = asyncio.Event() From aaac9ee4ff2eef02c8d9b88dc43106afd6f17307 Mon Sep 17 00:00:00 2001 From: Pablo Toledo Date: Wed, 11 Jun 2025 21:30:15 +0200 Subject: [PATCH 2/5] Revert "Fix SSE timeout when chaining agents" --- src/mcp_agent/mcp_server/agent_server.py | 113 +---------------------- 1 file changed, 2 insertions(+), 111 deletions(-) diff --git a/src/mcp_agent/mcp_server/agent_server.py b/src/mcp_agent/mcp_server/agent_server.py index 2821763d..a0e1f972 100644 --- a/src/mcp_agent/mcp_server/agent_server.py +++ b/src/mcp_agent/mcp_server/agent_server.py @@ -1,9 +1,5 @@ -"""Agent MCP server utilities with improved SSE handling. - -This module patches :meth:`FastMCP.sse_app` at runtime to avoid returning a -second :class:`starlette.responses.Response` after the SSE connection completes. -The extra response triggered ``"Unexpected ASGI message 'http.response.start'"`` -errors and caused timeouts when chaining agents in server mode. +""" +Enhanced AgentMCPServer with robust shutdown handling for SSE transport. """ import asyncio @@ -12,20 +8,8 @@ from contextlib import AsyncExitStack, asynccontextmanager from typing import Set -from mcp.server.auth.middleware.auth_context import AuthContextMiddleware -from mcp.server.auth.middleware.bearer_auth import ( - BearerAuthBackend, - RequireAuthMiddleware, -) from mcp.server.fastmcp import Context as MCPContext from mcp.server.fastmcp import FastMCP -from mcp.server.sse import SseServerTransport -from starlette.applications import Starlette -from starlette.middleware import Middleware -from starlette.middleware.authentication import AuthenticationMiddleware -from starlette.requests import Request -from starlette.responses import Response -from starlette.routing import Mount, Route import mcp_agent import mcp_agent.core @@ -52,99 +36,6 @@ def __init__( instructions=server_description or f"This server provides access to {len(agent_app._agents)} agents", ) - - # Patch FastMCP.sse_app to avoid returning an extra Response that - # triggers "Unexpected ASGI message 'http.response.start'" errors when - # running in server mode with SSE transport. The upstream implementation - # returns a Response after the EventSourceResponse has already completed - # which causes Starlette to attempt to start a new response on a closed - # connection. Removing that return fixes timeouts when chaining agents. - def patched_sse_app(self_mcp, mount_path: str | None = None): - if mount_path is not None: - self_mcp.settings.mount_path = mount_path - - normalized_message_endpoint = self_mcp._normalize_path( - self_mcp.settings.mount_path, self_mcp.settings.message_path - ) - - sse = SseServerTransport(normalized_message_endpoint) - - async def handle_sse(scope, receive, send): - async with sse.connect_sse(scope, receive, send) as streams: - await self_mcp._mcp_server.run( - streams[0], - streams[1], - self_mcp._mcp_server.create_initialization_options(), - ) - - routes = [] - middleware = [] - required_scopes = [] - - if self_mcp._auth_server_provider: - assert self_mcp.settings.auth - from mcp.server.auth.routes import create_auth_routes - - required_scopes = self_mcp.settings.auth.required_scopes or [] - - middleware = [ - Middleware( - AuthenticationMiddleware, - backend=BearerAuthBackend( - provider=self_mcp._auth_server_provider, - ), - ), - Middleware(AuthContextMiddleware), - ] - routes.extend( - create_auth_routes( - provider=self_mcp._auth_server_provider, - issuer_url=self_mcp.settings.auth.issuer_url, - service_documentation_url=self_mcp.settings.auth.service_documentation_url, - client_registration_options=self_mcp.settings.auth.client_registration_options, - revocation_options=self_mcp.settings.auth.revocation_options, - ) - ) - - if self_mcp._auth_server_provider: - routes.append( - Route( - self_mcp.settings.sse_path, - endpoint=RequireAuthMiddleware(handle_sse, required_scopes), - methods=["GET"], - ) - ) - routes.append( - Mount( - self_mcp.settings.message_path, - app=RequireAuthMiddleware(sse.handle_post_message, required_scopes), - ) - ) - else: - - async def sse_endpoint(request: Request) -> Response: - return await handle_sse(request.scope, request.receive, request._send) # type: ignore[arg-type] - - routes.append( - Route( - self_mcp.settings.sse_path, - endpoint=sse_endpoint, - methods=["GET"], - ) - ) - routes.append( - Mount( - self_mcp.settings.message_path, - app=sse.handle_post_message, - ) - ) - - routes.extend(self_mcp._custom_starlette_routes) - - return Starlette(debug=self_mcp.settings.debug, routes=routes, middleware=middleware) - - # Bind the patched method to the FastMCP instance - self.mcp_server.sse_app = patched_sse_app.__get__(self.mcp_server, FastMCP) # Shutdown coordination self._graceful_shutdown_event = asyncio.Event() self._force_shutdown_event = asyncio.Event() From 14e32bb2d455a58ef1abc0fd5a4a567637d6e173 Mon Sep 17 00:00:00 2001 From: Pablo Toledo Date: Wed, 11 Jun 2025 21:40:33 +0200 Subject: [PATCH 3/5] Add chain response aggregator and server integration --- src/mcp_agent/agents/workflow/chain_agent.py | 19 +++++++- src/mcp_agent/mcp_server/agent_server.py | 32 +++++++++++++- src/mcp_agent/server/response_aggregator.py | 43 +++++++++++++++++++ .../server/test_response_aggregator.py | 43 +++++++++++++++++++ 4 files changed, 134 insertions(+), 3 deletions(-) create mode 100644 src/mcp_agent/server/response_aggregator.py create mode 100644 tests/unit/mcp_agent/server/test_response_aggregator.py diff --git a/src/mcp_agent/agents/workflow/chain_agent.py b/src/mcp_agent/agents/workflow/chain_agent.py index 70a5604b..d8551de9 100644 --- a/src/mcp_agent/agents/workflow/chain_agent.py +++ b/src/mcp_agent/agents/workflow/chain_agent.py @@ -71,12 +71,21 @@ async def generate( # # Get the original user message (last message in the list) user_message = multipart_messages[-1] if multipart_messages else None + aggregator = getattr(self.context, "response_aggregator", None) + if not self.cumulative: response: PromptMessageMultipart = await self.agents[0].generate(multipart_messages) + if aggregator: + await aggregator.add_agent_response(self.agents[0].name, response.all_text()) # Process the rest of the agents in the chain for agent in self.agents[1:]: next_message = Prompt.user(*response.content) response = await agent.generate([next_message]) + if aggregator: + await aggregator.add_agent_response(agent.name, response.all_text()) + + if aggregator and await aggregator.should_send_response(): + await aggregator.get_aggregated_response() return response @@ -96,6 +105,8 @@ async def generate( chain_messages = multipart_messages.copy() chain_messages.extend(all_responses) current_response = await agent.generate(chain_messages, request_params) + if aggregator: + await aggregator.add_agent_response(agent.name, current_response.all_text()) # Store the response all_responses.append(current_response) @@ -111,10 +122,16 @@ async def generate( # For cumulative mode, return the properly formatted output with XML tags response_text = "\n\n".join(final_results) - return PromptMessageMultipart( + final_message = PromptMessageMultipart( role="assistant", content=[TextContent(type="text", text=response_text)], ) + if aggregator: + await aggregator.add_agent_response(self.name, response_text) + if await aggregator.should_send_response(): + await aggregator.get_aggregated_response() + + return final_message async def structured( self, diff --git a/src/mcp_agent/mcp_server/agent_server.py b/src/mcp_agent/mcp_server/agent_server.py index a0e1f972..3080b165 100644 --- a/src/mcp_agent/mcp_server/agent_server.py +++ b/src/mcp_agent/mcp_server/agent_server.py @@ -16,6 +16,7 @@ import mcp_agent.core.prompt from mcp_agent.core.agent_app import AgentApp from mcp_agent.logging.logger import get_logger +from mcp_agent.server.response_aggregator import ChainResponseAggregator logger = get_logger(__name__) @@ -77,7 +78,21 @@ async def execute_send(): # Execute with bridged context if agent_context and ctx: - return await self.with_bridged_context(agent_context, ctx, execute_send) + aggregator = None + try: + from mcp_agent.agents.workflow.chain_agent import ChainAgent + + if isinstance(agent, ChainAgent): + aggregator = ChainResponseAggregator( + chain_name=agent_name, + total_agents=len(agent.agents), + ) + except Exception: + aggregator = None + + return await self.with_bridged_context( + agent_context, ctx, execute_send, aggregator=aggregator + ) else: return await execute_send() @@ -368,7 +383,15 @@ async def _close_sse_connections(self): except Exception as e: logger.error(f"Error during ASGI lifespan shutdown: {e}") - async def with_bridged_context(self, agent_context, mcp_context, func, *args, **kwargs): + async def with_bridged_context( + self, + agent_context, + mcp_context, + func, + *args, + aggregator=None, + **kwargs, + ): """ Execute a function with bridged context between MCP and agent @@ -397,6 +420,9 @@ async def bridged_progress(progress, total=None) -> None: if hasattr(agent_context, "progress_reporter"): agent_context.progress_reporter = bridged_progress + if aggregator is not None: + agent_context.response_aggregator = aggregator + try: # Call the function return await func(*args, **kwargs) @@ -408,6 +434,8 @@ async def bridged_progress(progress, total=None) -> None: # Remove MCP context reference if hasattr(agent_context, "mcp_context"): delattr(agent_context, "mcp_context") + if aggregator is not None and hasattr(agent_context, "response_aggregator"): + delattr(agent_context, "response_aggregator") async def _cleanup_stdio(self): """Minimal cleanup for STDIO transport to avoid keeping process alive.""" diff --git a/src/mcp_agent/server/response_aggregator.py b/src/mcp_agent/server/response_aggregator.py new file mode 100644 index 00000000..14d8f161 --- /dev/null +++ b/src/mcp_agent/server/response_aggregator.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from enum import Enum +from typing import Any, Dict + + +class ChainResponseAggregator: + """Aggregate responses for a multi-agent chain.""" + + def __init__(self, chain_name: str, total_agents: int) -> None: + self.chain_name = chain_name + self.total_agents = total_agents + self.agent_responses: Dict[str, Any] = {} + self.completed_agents = 0 + self._response_sent = False + + async def add_agent_response(self, agent_name: str, response: Any) -> None: + """Record a response from an agent in the chain.""" + self.agent_responses[agent_name] = response + self.completed_agents += 1 + + async def should_send_response(self) -> bool: + """Return ``True`` if the aggregated response should be sent.""" + return not self._response_sent and self.completed_agents >= self.total_agents + + async def get_aggregated_response(self) -> Dict[str, Any]: + """Return the aggregated response for the chain.""" + self._response_sent = True + return {"chain": self.chain_name, "responses": self.agent_responses} + + +class SSEEventType(Enum): + AGENT_START = "agent_start" + AGENT_PROGRESS = "agent_progress" + AGENT_COMPLETE = "agent_complete" + CHAIN_COMPLETE = "chain_complete" + ERROR = "error" + + +async def send_sse_event(event_type: SSEEventType, data: Dict[str, Any], stream: Any) -> None: + """Send an SSE event to the provided stream if possible.""" + if stream is not None and hasattr(stream, "send"): + await stream.send({"event": event_type.value, "data": data}) diff --git a/tests/unit/mcp_agent/server/test_response_aggregator.py b/tests/unit/mcp_agent/server/test_response_aggregator.py new file mode 100644 index 00000000..909808a7 --- /dev/null +++ b/tests/unit/mcp_agent/server/test_response_aggregator.py @@ -0,0 +1,43 @@ +import importlib.util +from pathlib import Path + +import pytest + +MODULE_PATH = ( + Path(__file__).resolve().parents[4] / "src" / "mcp_agent" / "server" / "response_aggregator.py" +) +spec = importlib.util.spec_from_file_location("response_aggregator", MODULE_PATH) +response_aggregator = importlib.util.module_from_spec(spec) +assert spec.loader +spec.loader.exec_module(response_aggregator) + +ChainResponseAggregator = response_aggregator.ChainResponseAggregator +SSEEventType = response_aggregator.SSEEventType +send_sse_event = response_aggregator.send_sse_event + + +@pytest.mark.asyncio +async def test_chain_response_aggregator(): + agg = ChainResponseAggregator("chain", 2) + await agg.add_agent_response("a1", "one") + assert not await agg.should_send_response() + await agg.add_agent_response("a2", "two") + assert await agg.should_send_response() + result = await agg.get_aggregated_response() + assert result["chain"] == "chain" + assert result["responses"] == {"a1": "one", "a2": "two"} + + +class _DummyStream: + def __init__(self) -> None: + self.sent = [] + + async def send(self, data): + self.sent.append(data) + + +@pytest.mark.asyncio +async def test_send_sse_event(): + stream = _DummyStream() + await send_sse_event(SSEEventType.AGENT_START, {"foo": "bar"}, stream) + assert stream.sent == [{"event": "agent_start", "data": {"foo": "bar"}}] From 65aaf6a01918ecd45f00a65a6484092e3eb5592a Mon Sep 17 00:00:00 2001 From: Pablo Toledo Date: Wed, 11 Jun 2025 22:25:09 +0200 Subject: [PATCH 4/5] Handle chain agents without SSE aggregation --- src/mcp_agent/agents/workflow/chain_agent.py | 19 +++++++- src/mcp_agent/mcp_server/agent_server.py | 30 ++++++++++--- src/mcp_agent/server/response_aggregator.py | 43 +++++++++++++++++++ .../server/test_response_aggregator.py | 43 +++++++++++++++++++ 4 files changed, 128 insertions(+), 7 deletions(-) create mode 100644 src/mcp_agent/server/response_aggregator.py create mode 100644 tests/unit/mcp_agent/server/test_response_aggregator.py diff --git a/src/mcp_agent/agents/workflow/chain_agent.py b/src/mcp_agent/agents/workflow/chain_agent.py index 70a5604b..d8551de9 100644 --- a/src/mcp_agent/agents/workflow/chain_agent.py +++ b/src/mcp_agent/agents/workflow/chain_agent.py @@ -71,12 +71,21 @@ async def generate( # # Get the original user message (last message in the list) user_message = multipart_messages[-1] if multipart_messages else None + aggregator = getattr(self.context, "response_aggregator", None) + if not self.cumulative: response: PromptMessageMultipart = await self.agents[0].generate(multipart_messages) + if aggregator: + await aggregator.add_agent_response(self.agents[0].name, response.all_text()) # Process the rest of the agents in the chain for agent in self.agents[1:]: next_message = Prompt.user(*response.content) response = await agent.generate([next_message]) + if aggregator: + await aggregator.add_agent_response(agent.name, response.all_text()) + + if aggregator and await aggregator.should_send_response(): + await aggregator.get_aggregated_response() return response @@ -96,6 +105,8 @@ async def generate( chain_messages = multipart_messages.copy() chain_messages.extend(all_responses) current_response = await agent.generate(chain_messages, request_params) + if aggregator: + await aggregator.add_agent_response(agent.name, current_response.all_text()) # Store the response all_responses.append(current_response) @@ -111,10 +122,16 @@ async def generate( # For cumulative mode, return the properly formatted output with XML tags response_text = "\n\n".join(final_results) - return PromptMessageMultipart( + final_message = PromptMessageMultipart( role="assistant", content=[TextContent(type="text", text=response_text)], ) + if aggregator: + await aggregator.add_agent_response(self.name, response_text) + if await aggregator.should_send_response(): + await aggregator.get_aggregated_response() + + return final_message async def structured( self, diff --git a/src/mcp_agent/mcp_server/agent_server.py b/src/mcp_agent/mcp_server/agent_server.py index a0e1f972..4980bd66 100644 --- a/src/mcp_agent/mcp_server/agent_server.py +++ b/src/mcp_agent/mcp_server/agent_server.py @@ -68,18 +68,29 @@ def register_agent_tools(self, agent_name: str, agent) -> None: ) async def send_message(message: str, ctx: MCPContext) -> str: """Send a message to the agent and return its response.""" - # Get the agent's context + from mcp_agent.agents.workflow.chain_agent import ChainAgent + + # For chain agents, handle execution without SSE aggregation + if isinstance(agent, ChainAgent): + response = await agent.send(message) + + if hasattr(response, "all_text"): + return response.all_text() + elif isinstance(response, dict): + import json + + return json.dumps(response) + return str(response) + + # Non-chain agents use normal flow agent_context = getattr(agent, "context", None) - # Define the function to execute async def execute_send(): return await agent.send(message) - # Execute with bridged context if agent_context and ctx: return await self.with_bridged_context(agent_context, ctx, execute_send) - else: - return await execute_send() + return await execute_send() # Register a history prompt for this agent @self.mcp_server.prompt( @@ -368,7 +379,14 @@ async def _close_sse_connections(self): except Exception as e: logger.error(f"Error during ASGI lifespan shutdown: {e}") - async def with_bridged_context(self, agent_context, mcp_context, func, *args, **kwargs): + async def with_bridged_context( + self, + agent_context, + mcp_context, + func, + *args, + **kwargs, + ): """ Execute a function with bridged context between MCP and agent diff --git a/src/mcp_agent/server/response_aggregator.py b/src/mcp_agent/server/response_aggregator.py new file mode 100644 index 00000000..14d8f161 --- /dev/null +++ b/src/mcp_agent/server/response_aggregator.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from enum import Enum +from typing import Any, Dict + + +class ChainResponseAggregator: + """Aggregate responses for a multi-agent chain.""" + + def __init__(self, chain_name: str, total_agents: int) -> None: + self.chain_name = chain_name + self.total_agents = total_agents + self.agent_responses: Dict[str, Any] = {} + self.completed_agents = 0 + self._response_sent = False + + async def add_agent_response(self, agent_name: str, response: Any) -> None: + """Record a response from an agent in the chain.""" + self.agent_responses[agent_name] = response + self.completed_agents += 1 + + async def should_send_response(self) -> bool: + """Return ``True`` if the aggregated response should be sent.""" + return not self._response_sent and self.completed_agents >= self.total_agents + + async def get_aggregated_response(self) -> Dict[str, Any]: + """Return the aggregated response for the chain.""" + self._response_sent = True + return {"chain": self.chain_name, "responses": self.agent_responses} + + +class SSEEventType(Enum): + AGENT_START = "agent_start" + AGENT_PROGRESS = "agent_progress" + AGENT_COMPLETE = "agent_complete" + CHAIN_COMPLETE = "chain_complete" + ERROR = "error" + + +async def send_sse_event(event_type: SSEEventType, data: Dict[str, Any], stream: Any) -> None: + """Send an SSE event to the provided stream if possible.""" + if stream is not None and hasattr(stream, "send"): + await stream.send({"event": event_type.value, "data": data}) diff --git a/tests/unit/mcp_agent/server/test_response_aggregator.py b/tests/unit/mcp_agent/server/test_response_aggregator.py new file mode 100644 index 00000000..909808a7 --- /dev/null +++ b/tests/unit/mcp_agent/server/test_response_aggregator.py @@ -0,0 +1,43 @@ +import importlib.util +from pathlib import Path + +import pytest + +MODULE_PATH = ( + Path(__file__).resolve().parents[4] / "src" / "mcp_agent" / "server" / "response_aggregator.py" +) +spec = importlib.util.spec_from_file_location("response_aggregator", MODULE_PATH) +response_aggregator = importlib.util.module_from_spec(spec) +assert spec.loader +spec.loader.exec_module(response_aggregator) + +ChainResponseAggregator = response_aggregator.ChainResponseAggregator +SSEEventType = response_aggregator.SSEEventType +send_sse_event = response_aggregator.send_sse_event + + +@pytest.mark.asyncio +async def test_chain_response_aggregator(): + agg = ChainResponseAggregator("chain", 2) + await agg.add_agent_response("a1", "one") + assert not await agg.should_send_response() + await agg.add_agent_response("a2", "two") + assert await agg.should_send_response() + result = await agg.get_aggregated_response() + assert result["chain"] == "chain" + assert result["responses"] == {"a1": "one", "a2": "two"} + + +class _DummyStream: + def __init__(self) -> None: + self.sent = [] + + async def send(self, data): + self.sent.append(data) + + +@pytest.mark.asyncio +async def test_send_sse_event(): + stream = _DummyStream() + await send_sse_event(SSEEventType.AGENT_START, {"foo": "bar"}, stream) + assert stream.sent == [{"event": "agent_start", "data": {"foo": "bar"}}] From 8daa4fbc1ba2b4a0c89d4ae157474e43c8ddfbbe Mon Sep 17 00:00:00 2001 From: evalstate <1936278+evalstate@users.noreply.github.com> Date: Thu, 19 Jun 2025 20:38:17 +0100 Subject: [PATCH 5/5] lint --- src/mcp_agent/mcp_server/agent_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mcp_agent/mcp_server/agent_server.py b/src/mcp_agent/mcp_server/agent_server.py index 1fe334d3..cf0c240b 100644 --- a/src/mcp_agent/mcp_server/agent_server.py +++ b/src/mcp_agent/mcp_server/agent_server.py @@ -16,7 +16,6 @@ import mcp_agent.core.prompt from mcp_agent.core.agent_app import AgentApp from mcp_agent.logging.logger import get_logger -from mcp_agent.server.response_aggregator import ChainResponseAggregator logger = get_logger(__name__)