Buffer per-request StreamableHTTP streams to avoid serial-router head-of-line block #2934
Open
maxisbey wants to merge 3 commits into main from fix/streamable-http-hol-buffer
+292
−149
🟣 Pre-existing issue (not introduced by this PR, but adjacent to the priming logic rewritten here):
`replay_sender` has no `finally` cleanup, so when a resumed SSE connection ends, the closed writer stays in `_sse_stream_writers[stream_id]` and the `_request_streams[stream_id]` entry can linger; long-lived sessions with many resumptions accumulate stale entries. A `finally` mirroring `_run_sse_writer` (pop `_sse_stream_writers` + `_clean_up_memory_streams`) would close the gap, either here or in the separately-tracked replay follow-up.

Extended reasoning...

What the bug is. The replay path registers per-stream state but never tears it down. In `_replay_events` → `replay_sender`, the code does `self._sse_stream_writers[stream_id] = sse_stream_writer` (line 919), mints/sends the priming event, and creates `self._request_streams[stream_id]` (line ~929). The other two SSE writer paths clean up after themselves: `_run_sse_writer` has a `finally` that pops `_sse_stream_writers[request_id]` and calls `_clean_up_memory_streams(request_id)`, and `standalone_sse_writer`'s `finally` calls `_clean_up_memory_streams(GET_STREAM_KEY)`. `replay_sender` has no `finally` at all; the outer `finally` in `_replay_events` only closes the wire-level `sse_stream_writer`/`sse_stream_reader`, not the dict entries.

The code path that triggers it. The common case is the normal resume lifecycle: a client reconnects with `Last-Event-ID`, the replay completes, the resumed request finishes (its response is delivered over the replay stream), and the client drops the GET. sse-starlette tears down the response and `replay_sender` unwinds, but nothing removes `_sse_stream_writers[stream_id]` (only `close_sse_stream()` ever pops it on this path) or `_request_streams[stream_id]`.

Why existing code doesn't prevent it. `_request_streams[stream_id]` is reaped lazily only if `message_router` later attempts a send to that id and hits `BrokenResourceError`, which never happens for a request that has already completed, so the entry persists until `terminate()` or `connect()` teardown. `_sse_stream_writers` is worse: neither `terminate()` nor `connect()`'s `finally` clears it, so those entries persist for the transport's lifetime. (One refinement to the original finding: the router does not keep buffering 16 messages and then block on the dead stream. `replay_sender`'s `async with msg_reader` closes the receive end on unwind, so a stray routed message raises `BrokenResourceError` immediately and the entry self-heals. The leak is stale dict entries, not a wedge.)

Concrete walk-through. (1) Client POSTs `tools/call` id=7; tool emits notifications; client disconnects mid-stream. (2) Client reconnects: `GET /mcp` with `Last-Event-ID`. `replay_sender` replays missed events, registers `_sse_stream_writers["7"]` and `_request_streams["7"]`, and tails live messages. (3) The tool finishes; the router stores and deposits the `JSONRPCResponse`; `replay_sender` forwards it; the client closes the GET. (4) `replay_sender` unwinds, but `_sse_stream_writers["7"]` still holds the closed writer and `_request_streams["7"]` still holds closed streams. Repeat for ids 8, 9, 10… on a long-lived session and the dicts grow monotonically. A secondary wrinkle: the writer is registered before `_mint_priming_event`, so a user `EventStore.store_event` exception (swallowed by the broad `except Exception`) leaves an orphaned `_sse_stream_writers` entry with no matching `_request_streams` entry. A later resume for the same stream id also finds the stale `_request_streams` entry and skips re-registration/priming.

Impact. Bounded, slow growth: one small dict entry plus a closed memory-stream pair per resumed request, capped by session lifetime for `_request_streams` and by transport lifetime for `_sse_stream_writers`. Not a correctness wedge, which is why this is flagged as pre-existing rather than blocking.

How to fix. Give `replay_sender` a `finally` mirroring `_run_sse_writer`: `self._sse_stream_writers.pop(stream_id, None)` (guarded for the case where `stream_id` was never resolved) and `await self._clean_up_memory_streams(stream_id)`. Since this PR already rewrote the priming logic in this exact block and notes the replay-path ordering window is tracked separately, this could ride along there if preferred.