Buffer per-request StreamableHTTP streams to avoid serial-router head-of-line block#2934
Buffer per-request StreamableHTTP streams to avoid serial-router head-of-line block#2934maxisbey wants to merge 3 commits into
Conversation
…-of-line block The serial message_router forwards each response with a blocking send into a per-request buffer-0 stream whose only consumer (sse_writer) is started lazily via nested start_soon. Under concurrent requests one not-yet-receiving consumer parks the router and head-of-line blocks every other in-flight response on the session. Give the three _request_streams[EventMessage] sites a small bounded buffer so the router can deposit and move on. The sse_stream dict streams stay at 0 (downstream of the router; buffering them would relax per-client backpressure without helping the race). Fixes #1764.
…order Splits the old _maybe_send_priming_event into _mint_priming_event (store + return wire dict) and _run_sse_writer (forward request_stream onto the wire). The POST handler now awaits _mint_priming_event before writer.send(), so the priming row is in the event store before the server can produce any message for that request id — ordering by data dependency, not scheduler timing. The replay path keeps its priming event (test_streamable_http_multiple_reconnections relies on it as a stream-re-registered signal); its replay→live-tail ordering window is pre-existing and orthogonal. Also extracts the inline sse_writer closure to a method (drops _handle_post_request below the C901 threshold) and widens the SSE-dict stream type to SSEEvent (dict[str, Any]) — the previous dict[str, str] was a lie masked by the old helper's Any parameter, since priming events carry retry: int.
Hoists _mint_priming_event to the top of the SSE arm so a user EventStore raising on the priming row returns a 500 with no per-request state allocated (previously _request_streams[id] and _sse_stream_writers[id] leaked for the session). The shared _request_streams registration is pushed into each branch. Adds an old-pv-reconnect test in test_hosting_resume.py covering the priming_event-is-None replay arm; drops the no-branch pragma. The new priming-failure test covers the outer except handler, so its pragmas and the dead 'if writer:' check are removed.
| # Register SSE writer so close_sse_stream() can close it | ||
| self._sse_stream_writers[stream_id] = sse_stream_writer | ||
|
|
||
| # Send priming event for this new connection | ||
| await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version) | ||
| # Prime the resumed connection so the client sees the stream | ||
| # is re-registered. The replay→live-tail ordering window here | ||
| # is pre-existing and tracked separately. | ||
| priming_event = await self._mint_priming_event(stream_id, replay_protocol_version) | ||
| if priming_event is not None: | ||
| await sse_stream_writer.send(priming_event) | ||
|
|
||
| # Create new request streams for this connection | ||
| self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0) | ||
| self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage]( | ||
| REQUEST_STREAM_BUFFER_SIZE | ||
| ) |
There was a problem hiding this comment.
🟣 Pre-existing issue (not introduced by this PR, but adjacent to the priming logic rewritten here): replay_sender has no finally cleanup, so when a resumed SSE connection ends, the closed writer stays in _sse_stream_writers[stream_id] and the _request_streams[stream_id] entry can linger — long-lived sessions with many resumptions accumulate stale entries. A finally mirroring _run_sse_writer (pop _sse_stream_writers + _clean_up_memory_streams) would close the gap, either here or in the separately-tracked replay follow-up.
Extended reasoning...
What the bug is. The replay path registers per-stream state but never tears it down. In _replay_events → replay_sender, the code does self._sse_stream_writers[stream_id] = sse_stream_writer (line 919), mints/sends the priming event, and creates self._request_streams[stream_id] (line ~929). The other two SSE writer paths clean up after themselves: _run_sse_writer has a finally that pops _sse_stream_writers[request_id] and calls _clean_up_memory_streams(request_id), and standalone_sse_writer's finally calls _clean_up_memory_streams(GET_STREAM_KEY). replay_sender has no finally at all — the outer finally in _replay_events only closes the wire-level sse_stream_writer/sse_stream_reader, not the dict entries.
The code path that triggers it. The common case is the normal resume lifecycle: a client reconnects with Last-Event-ID, the replay completes, the resumed request finishes (its response is delivered over the replay stream), and the client drops the GET. sse-starlette tears down the response and replay_sender unwinds — but nothing removes _sse_stream_writers[stream_id] (only close_sse_stream() ever pops it on this path) or _request_streams[stream_id].
Why existing code doesn't prevent it. _request_streams[stream_id] is reaped lazily only if message_router later attempts a send to that id and hits BrokenResourceError — which for a request that has already completed never happens, so it persists until terminate() or connect() teardown. _sse_stream_writers is worse: neither terminate() nor connect()'s finally clears it, so those entries persist for the transport's lifetime. (One refinement to the original finding: the router does not keep buffering 16 messages and then block on the dead stream — replay_sender's async with msg_reader closes the receive end on unwind, so a stray routed message raises BrokenResourceError immediately and the entry self-heals. The leak is stale dict entries, not a wedge.)
Concrete walk-through. (1) Client POSTs tools/call id=7; tool emits notifications; client disconnects mid-stream. (2) Client reconnects: GET /mcp with Last-Event-ID. replay_sender replays missed events, registers _sse_stream_writers["7"] and _request_streams["7"], and tails live messages. (3) The tool finishes; the router stores and deposits the JSONRPCResponse; replay_sender forwards it; the client closes the GET. (4) replay_sender unwinds — but _sse_stream_writers["7"] still holds the closed writer and _request_streams["7"] still holds closed streams. Repeat for ids 8, 9, 10… on a long-lived session and the dicts grow monotonically. A secondary wrinkle: the writer is registered before _mint_priming_event, so a user EventStore.store_event exception (swallowed by the broad except Exception) leaves an orphaned _sse_stream_writers entry with no matching _request_streams entry. A later resume for the same stream id also finds the stale _request_streams entry and skips re-registration/priming.
Impact. Bounded, slow growth — one small dict entry plus a closed memory-stream pair per resumed request, capped by session lifetime for _request_streams and by transport lifetime for _sse_stream_writers. Not a correctness wedge, which is why this is flagged as pre-existing rather than blocking.
How to fix. Give replay_sender a finally mirroring _run_sse_writer: self._sse_stream_writers.pop(stream_id, None) (guarded for the case where stream_id was never resolved) and await self._clean_up_memory_streams(stream_id). Since this PR already rewrote the priming logic in this exact block and notes the replay-path ordering window is tracked separately, this could ride along there if preferred.
Two related fixes to the stateful
StreamableHTTPServerTransportso concurrent requests can't wedge each other and event-store ordering doesn't depend on scheduler timing.Buffer the per-request
_request_streams(head-of-line block). Each POST registers a buffer-0_request_streams[id]channel,start_soons theEventSourceResponse, and pushes the request to the server. The single serialmessage_routerthen forwards each response withawait _request_streams[id][0].send(...)— which on a buffer-0 stream parks until that request'ssse_writer(started lazily, twostart_soonhops deep) has reached its firstreceive(). While the router is parked on one request, every other in-flight response on the session is head-of-line blocked behind it. This gives the three_request_streams[EventMessage]sites a small bounded buffer (REQUEST_STREAM_BUFFER_SIZE = 16) so the router can deposit and move on. The downstreamsse_streamdict streams stay at 0 — they're not on the router's send target.Store the priming event before request dispatch (event-store ordering). With an
event_storeconfigured,message_routercallsstore_event(msg)before itssend(), while the priming event was stored inside the lazily-startedsse_writer— so its event-store position raced the first routed message. Splitting the helper into_mint_priming_event(store + return wire dict) and_run_sse_writer(forward onto the wire) lets the POST handlerawait _mint_priming_event(...)beforewriter.send(session_message). The server can't emit anything for a request it hasn't received, so the priming row now precedes every router store for that stream by data dependency rather than scheduler timing — the same shape the TypeScript SDK uses.Motivation and Context
Fixes #1764. The natural race is hard to reproduce standalone (multiple parties report 0 hits across thousands of loopback iterations) but is observed consistently in production by several reporters on the issue. The unit-level head-of-line block is straightforward to demonstrate though: register two request streams, give B a consumer, leave A consumer-less, write responses A then B — on
mainB never arrives until A is closed; with this change B arrives immediately and A's response is buffered.The priming-order fix addresses a review finding that the buffer change widened a pre-existing window where a routed message could be stored before the priming event.
How Has This Been Tested?
tests/server/test_streamable_http_router.pydrives the routing layer directly: one test for the head-of-line block, one asserting[priming, m₁, …, m₅]event-store order with nosse_writerever scheduled.tests/shared/test_streamable_http.pypriming-helper unit tests updated for_mint_priming_event.Breaking Changes
None.
EventStoreABC is unchanged.Intentional behaviour changes:
_streamable_http_modern.pyis the structural fix onmain._maybe_send_priming_eventis replaced by_mint_priming_event(returns the wire dict instead of sending) and_run_sse_writer. Both are private; no public surface change.Types of changes
Checklist
Additional context
The buffer-size approach matches what reporters on #1764 have field-validated (sed-patching to 10). 16 covers the reported single-response list-request case by construction (one message per id) plus headroom for small notification workloads, while staying bounded for the session-lifetime GET stream.
A separate change is needed for the stateless task-leak half of #1764 (#2145's request-scoped task-group approach); that's not bundled here.
AI Disclaimer