Buffer per-request StreamableHTTP streams to avoid serial-router head-of-line block #2934

Open
maxisbey wants to merge 3 commits into main from fix/streamable-http-hol-buffer

Conversation

@maxisbey maxisbey commented Jun 20, 2026

Contributor

Two related fixes to the stateful StreamableHTTPServerTransport so concurrent requests can't wedge each other and event-store ordering doesn't depend on scheduler timing.

Buffer the per-request _request_streams (head-of-line block). Each POST registers a buffer-0 _request_streams[id] channel, start_soons the EventSourceResponse, and pushes the request to the server. The single serial message_router then forwards each response with await _request_streams[id][0].send(...) — which on a buffer-0 stream parks until that request's sse_writer (started lazily, two start_soon hops deep) has reached its first receive(). While the router is parked on one request, every other in-flight response on the session is head-of-line blocked behind it. This gives the three _request_streams[EventMessage] sites a small bounded buffer (REQUEST_STREAM_BUFFER_SIZE = 16) so the router can deposit and move on. The downstream sse_stream dict streams stay at 0 — they're not on the router's send target.

Store the priming event before request dispatch (event-store ordering). With an event_store configured, message_router calls store_event(msg) before its send(), while the priming event was stored inside the lazily-started sse_writer — so its event-store position raced the first routed message. Splitting the helper into _mint_priming_event (store + return wire dict) and _run_sse_writer (forward onto the wire) lets the POST handler await _mint_priming_event(...) before writer.send(session_message). The server can't emit anything for a request it hasn't received, so the priming row now precedes every router store for that stream by data dependency rather than scheduler timing — the same shape the TypeScript SDK uses.
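
For illustration, the ordering argument can be sketched with a toy event store. This is a minimal model on stdlib asyncio, not the transport's code: `InMemoryEventStore`, `fake_server`, `handle_post` and the wire-dict shape are hypothetical stand-ins, and only the name `_mint_priming_event` mirrors the helper described above.

```python
import asyncio


class InMemoryEventStore:
    """Hypothetical store: appends rows in the order store_event() completes."""

    def __init__(self):
        self.rows = []

    async def store_event(self, stream_id, message):
        await asyncio.sleep(0)  # yield to the scheduler, as a real async store would
        self.rows.append((stream_id, message))
        return str(len(self.rows))


async def _mint_priming_event(store, stream_id):
    # Store the priming row and return a wire dict (shape is illustrative only).
    event_id = await store.store_event(stream_id, "priming")
    return {"id": event_id, "data": ""}


async def fake_server(inbox, store):
    # Stand-in for server + router: it can only emit once it has the request.
    request_id = await inbox.get()
    for i in range(1, 4):
        await store.store_event(request_id, f"m{i}")


async def handle_post(request_id="7"):
    store = InMemoryEventStore()
    inbox = asyncio.Queue()
    server = asyncio.create_task(fake_server(inbox, store))
    priming = await _mint_priming_event(store, request_id)  # stored first
    await inbox.put(request_id)  # dispatch happens only after the priming row exists
    await server
    return priming, [message for _, message in store.rows]


if __name__ == "__main__":
    print(asyncio.run(handle_post()))
```

Because the request is dispatched only after the priming row is stored, the stand-in server cannot store anything ahead of it, however the tasks are scheduled.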

Motivation and Context

Fixes #1764. The natural race is hard to reproduce standalone (multiple parties report 0 hits across thousands of loopback iterations) but is observed consistently in production by several reporters on the issue. The unit-level head-of-line block is straightforward to demonstrate though: register two request streams, give B a consumer, leave A consumer-less, write responses A then B — on main B never arrives until A is closed; with this change B arrives immediately and A's response is buffered.
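
A rough model of that unit-level demonstration, for readers who want to see the shape without the transport. It uses stdlib asyncio rather than the anyio memory streams the transport actually uses; `Channel`, `route` and `b_receives` are hypothetical names, and `Channel(0)` only approximates a buffer-0 stream (send returns once a receiver has taken the item).

```python
import asyncio


class Channel:
    """Toy bounded channel. buffer=0 means send() waits for a receiver."""

    def __init__(self, buffer):
        self._queue = asyncio.Queue(maxsize=max(buffer, 1))
        self._rendezvous = buffer == 0

    async def send(self, item):
        await self._queue.put(item)
        if self._rendezvous:
            await self._queue.join()  # park until a receiver takes the item

    async def receive(self):
        item = await self._queue.get()
        self._queue.task_done()
        return item


async def route(streams, responses):
    # Serial router: one blocking send at a time, in arrival order.
    for request_id, payload in responses:
        await streams[request_id].send(payload)


async def b_receives(buffer_size):
    streams = {"A": Channel(buffer_size), "B": Channel(buffer_size)}
    router = asyncio.create_task(
        route(streams, [("A", "resp-A"), ("B", "resp-B")])
    )
    try:
        # Only B has a consumer; A's writer has not reached receive() yet.
        return await asyncio.wait_for(streams["B"].receive(), timeout=0.1)
    except asyncio.TimeoutError:
        return None  # B was head-of-line blocked behind A
    finally:
        router.cancel()
        await asyncio.gather(router, return_exceptions=True)


if __name__ == "__main__":
    print("buffer 0 :", asyncio.run(b_receives(0)))
    print("buffer 16:", asyncio.run(b_receives(16)))
```

With a buffer of 0 the router parks on A and B times out; with a buffer of 16 the router deposits A's response and B receives its own immediately.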

The priming-order fix addresses a review finding that the buffer change widened a pre-existing window where a routed message could be stored before the priming event.

How Has This Been Tested?

  • New tests/server/test_streamable_http_router.py drives the routing layer directly: one test for the head-of-line block, one asserting [priming, m₁, …, m₅] event-store order with no sse_writer ever scheduled.
  • tests/shared/test_streamable_http.py priming-helper unit tests updated for _mint_priming_event.
  • Existing reconnection / sse-polling / replay tests pass unchanged.

Breaking Changes

None. EventStore ABC is unchanged.

Intentional behaviour changes:

  • The standalone GET stream now buffers up to 16 server-initiated messages before backpressure engages on a slow GET client (was 0).
  • A request whose consumer stalls past 16 messages (e.g. a long-running tool emitting many progress notifications to a stalled SSE client) can still wedge the router — this moves the threshold from 0 to 16, it doesn't remove the serial router. The routerless design in _streamable_http_modern.py is the structural fix on main.
  • _maybe_send_priming_event is replaced by _mint_priming_event (returns the wire dict instead of sending) and _run_sse_writer. Both are private; no public surface change.
  • The replay path's priming/live-tail ordering window is pre-existing and unchanged here; tracked separately.
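
To make the "threshold moves from 0 to 16" point concrete, here is a small sketch that counts how many messages a stalled consumer's stream accepts before a further send would have to block. It assumes a plain `asyncio.Queue` as a stand-in for the bounded stream; `deposits_before_block` is a hypothetical helper, and only `REQUEST_STREAM_BUFFER_SIZE = 16` comes from this PR.

```python
import asyncio

REQUEST_STREAM_BUFFER_SIZE = 16  # value introduced by this PR


def deposits_before_block(buffer_size, attempts):
    """Count non-blocking deposits into a stream nobody is reading."""
    queue = asyncio.Queue(maxsize=buffer_size)  # stand-in for the bounded stream
    accepted = 0
    for i in range(attempts):
        try:
            queue.put_nowait(i)
        except asyncio.QueueFull:
            break  # the next send would park the serial router here
        accepted += 1
    return accepted


if __name__ == "__main__":
    print(deposits_before_block(REQUEST_STREAM_BUFFER_SIZE, 20))
```

A tool that emits 20 progress notifications to a stalled client gets 16 accepted; the 17th is where the router would wait again.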

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

The buffer-size approach matches what reporters on #1764 have field-validated (sed-patching to 10). 16 covers the reported single-response list-request case by construction (one message per id) plus headroom for small notification workloads, while staying bounded for the session-lifetime GET stream.

A separate change is needed for the stateless task-leak half of #1764 (#2145's request-scoped task-group approach); that's not bundled here.

AI Disclaimer

maxisbey added 3 commits June 20, 2026 17:03
…-of-line block

The serial message_router forwards each response with a blocking send into
a per-request buffer-0 stream whose only consumer (sse_writer) is started
lazily via nested start_soon. Under concurrent requests one not-yet-receiving
consumer parks the router and head-of-line blocks every other in-flight
response on the session.

Give the three _request_streams[EventMessage] sites a small bounded buffer
so the router can deposit and move on. The sse_stream dict streams stay at
0 (downstream of the router; buffering them would relax per-client
backpressure without helping the race).

Fixes #1764.

…order

Splits the old _maybe_send_priming_event into _mint_priming_event (store +
return wire dict) and _run_sse_writer (forward request_stream onto the wire).
The POST handler now awaits _mint_priming_event before writer.send(), so the
priming row is in the event store before the server can produce any message
for that request id — ordering by data dependency, not scheduler timing.

The replay path keeps its priming event (test_streamable_http_multiple_reconnections
relies on it as a stream-re-registered signal); its replay→live-tail ordering
window is pre-existing and orthogonal.

Also extracts the inline sse_writer closure to a method (drops _handle_post_request
below the C901 threshold) and widens the SSE-dict stream type to SSEEvent
(dict[str, Any]) — the previous dict[str, str] was a lie masked by the old
helper's Any parameter, since priming events carry retry: int.

Hoists _mint_priming_event to the top of the SSE arm so a user EventStore
raising on the priming row returns a 500 with no per-request state allocated
(previously _request_streams[id] and _sse_stream_writers[id] leaked for the
session). The shared _request_streams registration is pushed into each branch.

Adds an old-pv-reconnect test in test_hosting_resume.py covering the
priming_event-is-None replay arm; drops the no-branch pragma. The new
priming-failure test covers the outer except handler, so its pragmas and the
dead 'if writer:' check are removed.
@maxisbey maxisbey marked this pull request as ready for review June 20, 2026 20:56
Comment on lines 918 to +931
  # Register SSE writer so close_sse_stream() can close it
  self._sse_stream_writers[stream_id] = sse_stream_writer

- # Send priming event for this new connection
- await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version)
+ # Prime the resumed connection so the client sees the stream
+ # is re-registered. The replay→live-tail ordering window here
+ # is pre-existing and tracked separately.
+ priming_event = await self._mint_priming_event(stream_id, replay_protocol_version)
+ if priming_event is not None:
+     await sse_stream_writer.send(priming_event)

  # Create new request streams for this connection
- self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0)
+ self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](
+     REQUEST_STREAM_BUFFER_SIZE
+ )

Contributor

🟣 Pre-existing issue (not introduced by this PR, but adjacent to the priming logic rewritten here): replay_sender has no finally cleanup, so when a resumed SSE connection ends, the closed writer stays in _sse_stream_writers[stream_id] and the _request_streams[stream_id] entry can linger — long-lived sessions with many resumptions accumulate stale entries. A finally mirroring _run_sse_writer (pop _sse_stream_writers + _clean_up_memory_streams) would close the gap, either here or in the separately-tracked replay follow-up.

Extended reasoning...

What the bug is. The replay path registers per-stream state but never tears it down. In replay_sender (nested inside _replay_events), the code does self._sse_stream_writers[stream_id] = sse_stream_writer (line 919), mints/sends the priming event, and creates self._request_streams[stream_id] (line ~929). The other two SSE writer paths clean up after themselves: _run_sse_writer has a finally that pops _sse_stream_writers[request_id] and calls _clean_up_memory_streams(request_id), and standalone_sse_writer's finally calls _clean_up_memory_streams(GET_STREAM_KEY). replay_sender has no finally at all: the outer finally in _replay_events only closes the wire-level sse_stream_writer/sse_stream_reader, not the dict entries.

The code path that triggers it. The common case is the normal resume lifecycle: a client reconnects with Last-Event-ID, the replay completes, the resumed request finishes (its response is delivered over the replay stream), and the client drops the GET. sse-starlette tears down the response and replay_sender unwinds — but nothing removes _sse_stream_writers[stream_id] (only close_sse_stream() ever pops it on this path) or _request_streams[stream_id].

Why existing code doesn't prevent it. _request_streams[stream_id] is reaped lazily only if message_router later attempts a send to that id and hits BrokenResourceError — which for a request that has already completed never happens, so it persists until terminate() or connect() teardown. _sse_stream_writers is worse: neither terminate() nor connect()'s finally clears it, so those entries persist for the transport's lifetime. (One refinement to the original finding: the router does not keep buffering 16 messages and then block on the dead stream — replay_sender's async with msg_reader closes the receive end on unwind, so a stray routed message raises BrokenResourceError immediately and the entry self-heals. The leak is stale dict entries, not a wedge.)

Concrete walk-through. (1) Client POSTs tools/call id=7; tool emits notifications; client disconnects mid-stream. (2) Client reconnects: GET /mcp with Last-Event-ID. replay_sender replays missed events, registers _sse_stream_writers["7"] and _request_streams["7"], and tails live messages. (3) The tool finishes; the router stores and deposits the JSONRPCResponse; replay_sender forwards it; the client closes the GET. (4) replay_sender unwinds — but _sse_stream_writers["7"] still holds the closed writer and _request_streams["7"] still holds closed streams. Repeat for ids 8, 9, 10… on a long-lived session and the dicts grow monotonically. A secondary wrinkle: the writer is registered before _mint_priming_event, so a user EventStore.store_event exception (swallowed by the broad except Exception) leaves an orphaned _sse_stream_writers entry with no matching _request_streams entry. A later resume for the same stream id also finds the stale _request_streams entry and skips re-registration/priming.

Impact. Bounded, slow growth — one small dict entry plus a closed memory-stream pair per resumed request, capped by session lifetime for _request_streams and by transport lifetime for _sse_stream_writers. Not a correctness wedge, which is why this is flagged as pre-existing rather than blocking.

How to fix. Give replay_sender a finally mirroring _run_sse_writer: self._sse_stream_writers.pop(stream_id, None) (guarded for the case where stream_id was never resolved) and await self._clean_up_memory_streams(stream_id). Since this PR already rewrote the priming logic in this exact block and notes the replay-path ordering window is tracked separately, this could ride along there if preferred.
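
A minimal sketch of the suggested shape, assuming a stripped-down stand-in for the transport. `TransportSketch` and its method bodies are hypothetical; only the attribute and method names (`_sse_stream_writers`, `_request_streams`, `_clean_up_memory_streams`, `replay_sender`) mirror the ones discussed above, and the real cleanup helper also closes the streams rather than just dropping the entry.

```python
import asyncio


class TransportSketch:
    """Hypothetical stand-in; not the real StreamableHTTPServerTransport."""

    def __init__(self):
        self._sse_stream_writers = {}
        self._request_streams = {}

    async def _clean_up_memory_streams(self, stream_id):
        # Simplified: the real helper would also close both stream ends.
        self._request_streams.pop(stream_id, None)

    async def replay_sender(self, stream_id, live_messages):
        forwarded = []
        try:
            # Register per-stream state, as the replay path does today.
            self._sse_stream_writers[stream_id] = object()  # stand-in writer
            self._request_streams[stream_id] = asyncio.Queue(maxsize=16)
            for message in live_messages:
                forwarded.append(message)  # stand-in for sending onto the wire
            return forwarded
        finally:
            # Proposed cleanup, mirroring _run_sse_writer's finally.
            self._sse_stream_writers.pop(stream_id, None)
            await self._clean_up_memory_streams(stream_id)


async def resume_many():
    transport = TransportSketch()
    for stream_id in ("7", "8", "9"):
        await transport.replay_sender(stream_id, ["response"])
    return transport


if __name__ == "__main__":
    t = asyncio.run(resume_many())
    print(len(t._sse_stream_writers), len(t._request_streams))
```

With the finally in place, repeated resumptions leave both dicts empty instead of growing by one entry per resumed request.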

Development

Successfully merging this pull request may close these issues.

Race condition in StreamableHTTP: zero-buffer memory streams cause deadlock with concurrent SSE responses
