-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Slim ServerMiddleware to (ctx, call_next) and add OpenTelemetryMiddleware
#2941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import Any | ||
|
|
||
| from opentelemetry.trace import SpanKind, StatusCode | ||
| from pydantic import ValidationError | ||
|
|
||
| from mcp.server.context import CallNext, HandlerResult, ServerMiddleware, ServerRequestContext | ||
| from mcp.shared._otel import extract_trace_context, otel_span | ||
| from mcp.shared.exceptions import MCPError | ||
|
|
||
|
|
||
class OpenTelemetryMiddleware(ServerMiddleware[Any]):
    """Context-tier middleware that wraps each inbound message in an OpenTelemetry span.

    Span name `"MCP handle <method> [<target>]"`, `mcp.method.name` attribute, W3C
    trace context extracted from `params._meta` (SEP-414), and an ERROR status if
    the handler raises. Requests and notifications both get a span;
    `jsonrpc.request.id` is set only when `ctx.request_id` is present (notifications
    have none).

    When `_meta` carries no `traceparent`, no explicit parent context is passed, so
    the span attaches to whatever span is already current instead of starting a
    new, unrelated trace.
    """

    async def __call__(self, ctx: ServerRequestContext[Any, Any], call_next: CallNext) -> HandlerResult:
        """Run the rest of the chain inside a SERVER span and re-raise any failure.

        Args:
            ctx: Context of the inbound message; supplies the method, params,
                request id and `_meta`.
            call_next: Remainder of the middleware chain.

        Returns:
            Whatever `call_next(ctx)` returns.

        Raises:
            Exception: Anything raised by `call_next` propagates unchanged after
                the span status has been set to ERROR.
        """
        # The optional `name` param becomes the span-name target; non-string values are ignored.
        name = ctx.params.get("name") if ctx.params else None
        target = name if isinstance(name, str) else None

        attributes: dict[str, Any] = {"mcp.method.name": ctx.method}
        if ctx.request_id is not None:
            attributes["jsonrpc.request.id"] = str(ctx.request_id)

        # An explicit context -- even an empty one -- replaces the ambient context as
        # the span's parent. Extract only when the client actually propagated a
        # `traceparent`; otherwise pass `None` so the span nests under the span that
        # is already current rather than becoming an orphaned trace root.
        meta = ctx.meta
        parent = extract_trace_context(meta) if meta and "traceparent" in meta else None

        with otel_span(
            name=f"MCP handle {ctx.method}{f' {target}' if target else ''}",
            kind=SpanKind.SERVER,
            attributes=attributes,
            context=parent,
            # Status and exception recording are decided per exception type below.
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            try:
                return await call_next(ctx)
            except MCPError as e:
                span.set_status(StatusCode.ERROR, e.error.message)
                raise
            except ValidationError:
                # Mirror the sanitized wire response; pydantic messages carry client input.
                span.set_status(StatusCode.ERROR, "Invalid request parameters")
                raise
            except Exception as e:
                span.record_exception(e)
                span.set_status(StatusCode.ERROR, str(e))
                raise
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from collections.abc import Mapping | ||
| from collections.abc import Awaitable, Mapping | ||
| from dataclasses import dataclass, field | ||
| from functools import partial, reduce | ||
| from typing import TYPE_CHECKING, Any, Generic, cast | ||
|
|
@@ -103,7 +103,7 @@ | |
| return "2025-11-25" | ||
|
|
||
|
|
||
| def otel_middleware(next_on_request: OnRequest) -> OnRequest: | ||
| def otel_middleware(call_next: OnRequest) -> OnRequest: | ||
| """Dispatch-tier middleware that wraps each request in an OpenTelemetry span. | ||
|
|
||
| Mirrors the span shape of the existing `Server._handle_request`: span name | ||
|
|
@@ -139,7 +139,7 @@ | |
| set_status_on_exception=False, | ||
| ) as span: | ||
| try: | ||
| return await next_on_request(dctx, method, params) | ||
| return await call_next(dctx, method, params) | ||
| except MCPError as e: | ||
| span.set_status(StatusCode.ERROR, e.error.message) | ||
| raise | ||
|
|
@@ -169,6 +169,14 @@ | |
| raise TypeError(f"handler returned {type(result).__name__}; expected BaseModel, dict, or None") | ||
|
|
||
|
|
||
| def _apply_middleware( | ||
| mw: ServerMiddleware[Any], call_next: CallNext, ctx: ServerRequestContext[Any, Any] | ||
| ) -> Awaitable[HandlerResult]: | ||
| """Adapt one middleware to the `CallNext` shape: bind `call_next`, take | ||
| `ctx` at call time so a rewritten context flows down the chain.""" | ||
| return mw(ctx, call_next) | ||
|
Comment on lines
+172
to
+177
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this necessary? Also, we should stop using "mw" in this source code!!! I don't need to guess variable names! |
||
|
|
||
|
|
||
| @dataclass | ||
| class ServerRunner(Generic[LifespanT]): | ||
| """Per-connection orchestrator. One instance per client connection.""" | ||
|
|
@@ -244,15 +252,18 @@ | |
| ) -> dict[str, Any]: | ||
| meta = _extract_meta(params) | ||
| version = _resolve_protocol_version(self.connection.protocol_version, meta, dctx.message_metadata) | ||
| ctx = self._make_context(dctx, meta, version) | ||
| ctx = self._make_context(dctx, method, params, meta, version) | ||
| is_spec_method = method in _methods.SPEC_CLIENT_METHODS | ||
|
|
||
| async def _inner() -> HandlerResult: | ||
| async def _inner(ctx: ServerRequestContext[LifespanT, Any]) -> HandlerResult: | ||
| # Read method/params off `ctx` so a middleware that rewrote them via | ||
| # `call_next(replace(ctx, ...))` reaches lookup and the handler. | ||
| method, params = ctx.method, ctx.params | ||
| # Pinned compat: spec methods are surface-validated before lookup, | ||
| # so malformed params are INVALID_PARAMS even with no handler | ||
| # registered. Custom methods miss the monolith map and fall through | ||
| # to `entry.params_type` exactly as before. | ||
| if is_spec_method: | ||
| if method in _methods.SPEC_CLIENT_METHODS: | ||
| try: | ||
| _methods.validate_client_request(method, version, params) | ||
| except KeyError: | ||
|
|
@@ -279,14 +290,14 @@ | |
| result = await entry.handler(ctx, typed_params) | ||
| if isinstance(result, ErrorData): | ||
| # Raise inside the chain so middleware observes the failure. | ||
| raise MCPError.from_error_data(result) | ||
| return result | ||
|
|
||
| call = self._compose_server_middleware(ctx, method, params, _inner) | ||
| result = _dump_result(await call()) | ||
| call = self._compose_server_middleware(_inner) | ||
| result = _dump_result(await call(ctx)) | ||
| # TODO: reject resultType values outside {"complete", "input_required"} unless the | ||
| # corresponding extension is in this request's _meta clientCapabilities.extensions; the | ||
| # explicit MUST-reject is client-side (basic/index.mdx ResultType), this enforces it proactively. | ||
|
Check failure on line 300 in src/mcp/server/runner.py
|
||
|
Comment on lines
293
to
300
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
(Note: "[…]" marks inline code or text that was lost when this comment was extracted.)
🔴 The post-chain […]
Extended reasoning...
The bug. Inside […]:
    if method == "initialize":
        self.connection.client_params, self.connection.protocol_version = self._negotiate_initialize(params)
Here […]
Why nothing else prevents it. Rewriting `initialize` is squarely within the contract this PR advertises: the migration doc and the […]
Impact. The wire response and the committed connection state disagree: every subsequent […]
Step-by-step proof. […]
Fix. Key the post-chain commit off the final context rather than the pre-chain locals — e.g. perform the commit inside […]
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. honestly not super sure what to do about this one @Kludex do you have any ideas? |
||
| if is_spec_method: | ||
| try: | ||
| result = _methods.serialize_server_result(method, version, result) | ||
|
|
@@ -313,9 +324,10 @@ | |
| ) -> None: | ||
| meta = _extract_meta(params) | ||
| version = _resolve_protocol_version(self.connection.protocol_version, meta, dctx.message_metadata) | ||
| ctx = self._make_context(dctx, meta, version) | ||
| ctx = self._make_context(dctx, method, params, meta, version) | ||
|
|
||
| async def _inner() -> None: | ||
| async def _inner(ctx: ServerRequestContext[LifespanT, Any]) -> None: | ||
| method, params = ctx.method, ctx.params | ||
| if method in _methods.SPEC_CLIENT_NOTIFICATION_METHODS: | ||
| try: | ||
| _methods.validate_client_notification(method, version, params) | ||
|
|
@@ -345,33 +357,33 @@ | |
| return | ||
| await entry.handler(ctx, typed_params) | ||
|
|
||
| call = self._compose_server_middleware(ctx, method, params, _inner) | ||
| call = self._compose_server_middleware(_inner) | ||
| try: | ||
| await call() | ||
| await call(ctx) | ||
| except Exception: | ||
| # A crashing handler must not cancel the dispatcher's task group; | ||
| # middleware saw the raise out of call_next() first. | ||
| logger.exception("notification handler for %r raised", method) | ||
|
|
||
| def _compose_server_middleware( | ||
| self, | ||
| ctx: ServerRequestContext[LifespanT, Any], | ||
| method: str, | ||
| params: Mapping[str, Any] | None, | ||
| inner: CallNext, | ||
| ) -> CallNext: | ||
| def _compose_server_middleware(self, inner: CallNext) -> CallNext: | ||
| """Wrap `inner` in `Server.middleware`, outermost-first. | ||
|
|
||
| Shared by `_on_request` and `_on_notify` so the same middleware chain | ||
| observes every inbound message. | ||
| observes every inbound message. The composed callable takes the `ctx` | ||
| at call time, so a middleware can rewrite it for the rest of the chain. | ||
| """ | ||
| call = inner | ||
| call: CallNext = inner | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unnecessary type hint. |
||
| for mw in reversed(self.server.middleware): | ||
| call = partial(mw, ctx, method, params, call) | ||
| call = partial(_apply_middleware, mw, call) | ||
| return call | ||
|
|
||
| def _make_context( | ||
| self, dctx: DispatchContext[TransportContext], meta: RequestParamsMeta | None, protocol_version: str | ||
| self, | ||
| dctx: DispatchContext[TransportContext], | ||
| method: str, | ||
| params: Mapping[str, Any] | None, | ||
| meta: RequestParamsMeta | None, | ||
| protocol_version: str, | ||
| ) -> ServerRequestContext[LifespanT, Any]: | ||
| # TODO(maxisbey): remove for Context rework. Reads the SHTTP per-request | ||
| # data off the raw `dctx.message_metadata` carrier; replace with the | ||
|
|
@@ -386,6 +398,8 @@ | |
| return ServerRequestContext( | ||
| session=self.session, | ||
| lifespan_context=self.lifespan_state, | ||
| method=method, | ||
| params=params, | ||
| request_id=dctx.request_id, | ||
| meta=meta, | ||
| protocol_version=protocol_version, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 When the inbound message has no `_meta`/`traceparent`, `extract_trace_context(ctx.meta or {})` returns a fresh empty `Context`, and passing that explicit context to `otel_span` overrides the ambient current context — so the span becomes an orphaned trace root instead of parenting to the already-current span (e.g. the dispatch-tier `otel_middleware` span that `Server.run()` installs by default). This diverges from the dispatch-tier middleware this class mirrors, which passes `parent=None` when params has no `_meta`. Pass `context=None` (or only pass the extracted context when `ctx.meta` actually carries trace headers) so the no-traceparent case attaches to the ambient context.

Extended reasoning...

The bug. `OpenTelemetryMiddleware.__call__` always calls `otel_span(..., context=extract_trace_context(ctx.meta or {}))`. `extract_trace_context` calls `opentelemetry.propagate.extract(carrier)`; the W3C `TraceContextTextMapPropagator` creates a fresh empty `Context()` when no `context` argument is supplied and returns it unchanged when the carrier has no `traceparent`. So whenever the client does not propagate trace context in `_meta` (i.e. every non-OTel-instrumented client), the middleware starts its span with an explicit empty `Context` rather than `context=None`. In the OTel SDK, `Tracer.start_span` resolves the parent via `trace.get_current_span(context)`: with an explicit context the ambient/current context is ignored, the lookup yields `INVALID_SPAN`, and the new span becomes a brand-new trace root. Only `context=None` falls back to the ambient current span.

The code path that triggers it. `Server.run()` (src/mcp/server/lowlevel/server.py:431) unconditionally installs the dispatch-tier `otel_middleware` in `dispatch_middleware`. That middleware wraps `_on_request` with `start_as_current_span` in the same task, so its span is the current ambient span around the entire context-tier middleware chain. A user who appends the new `OpenTelemetryMiddleware` to `server.middleware` and is hit by a client that sends no `_meta` therefore gets a context-tier span that is an orphaned root in a separate trace, instead of a child of the dispatch-tier (or any other ambient transport/ASGI) span.

Step-by-step proof.
1. A non-OTel client sends `tools/call` with no `_meta`.
2. The composed dispatch chain runs `otel_middleware`: `params` has no `_meta`, so it passes `parent=None`, and `start_as_current_span` makes span A ("MCP handle tools/call ...") the current span.
3. Inside span A, `_on_request` builds `ctx` with `meta=None` and runs the context-tier chain. `OpenTelemetryMiddleware` computes `extract_trace_context(ctx.meta or {})` = `extract({})` → a fresh empty `Context()`.
4. `otel_span` forwards that explicit empty context to `start_as_current_span`. The SDK resolves the parent from that context → `INVALID_SPAN` → span B is created as a new trace root with a new `trace_id`.
5. Result: the same request produces two disjoint traces (A and B) instead of B nesting under A. With `context=None` in step 4, B would have parented to A.

Why existing code/tests don't catch it. `test_extracts_trace_context_from_meta` always injects a `traceparent` (the in-SDK test client path), and the notification test never asserts on `span.parent`, so the no-traceparent parenting behavior is unpinned.

On the counter-argument that this is the conventional server-instrumentation pattern. ASGI/WSGI instrumentations do attach the extracted (possibly empty) context, but they sit at the outermost edge of the process where there is no meaningful ambient span to detach from — that trade-off doesn't apply here, where the SDK itself installs an enclosing dispatch-tier span by default. It's also true that the dispatch-tier middleware extracts an explicit context whenever `_meta` is present even without a `traceparent`; but the divergence at issue is the no-`_meta` case (the common case for external clients), where the dispatch-tier middleware deliberately passes `parent=None` and this class — whose docstring says it mirrors that span shape — does not. The duplicate "MCP handle" spans when both tiers are enabled are documented and fine; what's broken is that they land in two unrelated traces rather than nesting, which is a telemetry-correctness defect in the very feature this PR adds, not merely a span-shape preference.

Fix. Pass `context=None` when `ctx.meta` carries no trace headers — e.g. only call `extract_trace_context` when `ctx.meta` contains `traceparent` (mirroring the dispatch-tier `match` on `_meta`), or change `extract_trace_context` to return `None` for an empty/header-less carrier. Impact is telemetry-only (no request-handling breakage), but it should be fixed in this PR.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense