runner

ServerRunner - the per-connection handler kernel.

ServerRunner bridges the dispatch layer (on_request / on_notify, untyped dicts) and the user's handler layer (typed Context, typed params). It is a pure kernel: it holds a pre-populated Connection and reads connection.protocol_version / connection.outbound as facts. Driving a dispatcher loop and tearing down the connection are the job of the free-function drivers (serve_connection, serve_loop, serve_dual_era_loop, serve_one): the entry constructs the Connection and the driver tears it down.

ServerRunner holds a Server directly - Server is the registry.

CallNext module-attribute

CallNext = Callable[
    ["ServerRequestContext[Any, Any]"],
    Awaitable[HandlerResult],
]

Invokes the rest of the chain. Pass the ctx through; rewrite method or params with dataclasses.replace(ctx, ...) to alter what the handler sees.
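The rewrite pattern described above can be sketched without the SDK. This is a minimal illustration only: `FakeContext`, `FakeCallNext`, `handler`, and `add_default_limit` are hypothetical stand-ins, and the real `ServerRequestContext` carries more fields than the two used here.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Any, Awaitable, Callable, Mapping, Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in for ServerRequestContext (two fields only)."""

    method: str
    params: Optional[Mapping[str, Any]]


# Same shape as CallNext, but over the stand-in context.
FakeCallNext = Callable[[FakeContext], Awaitable[Any]]


async def handler(ctx: FakeContext) -> dict:
    # Plays "the rest of the chain": echoes what it was given.
    return {"method": ctx.method, "params": dict(ctx.params or {})}


async def add_default_limit(ctx: FakeContext, call_next: FakeCallNext) -> Any:
    # Build an adjusted copy rather than mutating ctx, then pass it on.
    # Keys already present in ctx.params override the default.
    params = {"limit": 10, **(ctx.params or {})}
    return await call_next(replace(ctx, params=params))


def run_demo() -> dict:
    ctx = FakeContext(method="tools/list", params={"cursor": "abc"})
    return asyncio.run(add_default_limit(ctx, handler))
```

Here the handler sees both `limit` and `cursor`, while the caller's original `ctx` object is left unchanged.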

ServerMiddleware

Bases: Protocol[_MwLifespanT]

Context-tier middleware: (ctx, call_next) -> result.

Runs at the top of ServerRunner._on_request / _on_notify after ctx is built but before any validation, lookup, or handshake. Wraps every inbound request and notification: initialize, the pre-init gate, METHOD_NOT_FOUND, params validation, the handler call, and notifications/initialized all run inside call_next(ctx). notifications/cancelled is observed too; the dispatcher applies the cancellation itself, then forwards the notification. A request-side failure reaches the middleware as a raised MCPError (or ValidationError for malformed params) so observation/logging middleware can record it. Listed outermost-first on Server.middleware.

The method and the raw inbound params are ctx.method and ctx.params (no model validation has happened yet). To rewrite either before the handler runs, pass an adjusted context: await call_next(replace(ctx, params=...)). ctx.request_id is None distinguishes a notification from a request. For notifications call_next(ctx) returns None (a dropped or unhandled notification also returns None) and the middleware's own return value is discarded.

Warning

initialize is handled inline - the dispatcher does not read further inbound messages until the middleware chain returns. Awaiting a server-to-client request (ctx.session.send_request, send_ping, ...) while handling initialize therefore deadlocks the connection: the response can never be dequeued. Send-and-forget notifications are safe. initialize is observed but not rewritable: the post-chain handshake commit reads the wire params, so to veto the handshake raise before call_next().
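A veto that raises before `call_next()` might look like the sketch below. It assumes nothing about the SDK: `FakeContext`, `HandshakeRefused` (standing in for the `MCPError` a real middleware would raise), the `clientName` key, and the `committed` list (standing in for connection state written after the chain) are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Mapping, Optional


class HandshakeRefused(Exception):
    """Hypothetical stand-in for the error a real middleware would raise."""


@dataclass
class FakeContext:
    method: str
    params: Optional[Mapping[str, Any]]


FakeCallNext = Callable[[FakeContext], Awaitable[Any]]
committed: List[Any] = []  # stands in for state committed after the chain


async def allow_known_clients(ctx: FakeContext, call_next: FakeCallNext) -> Any:
    if ctx.method == "initialize":
        client = (ctx.params or {}).get("clientName")  # hypothetical key
        if client != "trusted-client":
            # Raise before call_next(): nothing downstream runs or commits.
            raise HandshakeRefused(f"client {client!r} is not allowed")
    return await call_next(ctx)


async def build_result(ctx: FakeContext) -> dict:
    return {"ok": True}


async def on_request(ctx: FakeContext) -> Any:
    result = await allow_known_clients(ctx, build_result)
    if ctx.method == "initialize":
        committed.append(ctx.params)  # reached only when the chain succeeded
    return result
```

Because the commit step runs after the chain returns, a refusal raised by the middleware leaves `committed` empty.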

Server[L].middleware holds ServerMiddleware[L], so an app-specific middleware sees ctx.lifespan_context: L. While the context is the mutable ServerRequestContext dataclass it is invariant in L, so a reusable middleware should be typed ServerMiddleware[Any] to register on any Server[L].
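To make "outermost-first" and failure observation concrete, here is a small self-contained sketch of a chain composed the same way (`reversed` plus `partial`). The names `FakeContext`, `make_recorder`, `_apply`, and `compose` are illustrative assumptions rather than SDK API, and `LookupError` stands in for the error a real chain would raise.

```python
import asyncio
from dataclasses import dataclass
from functools import partial
from typing import Any, Awaitable, Callable, List, Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in for ServerRequestContext."""

    method: str
    request_id: Optional[int]  # None marks a notification


FakeCallNext = Callable[[FakeContext], Awaitable[Any]]
events: List[str] = []


def make_recorder(name: str):
    async def middleware(ctx: FakeContext, call_next: FakeCallNext) -> Any:
        kind = "notification" if ctx.request_id is None else "request"
        events.append(f"{name}: enter {kind} {ctx.method}")
        try:
            return await call_next(ctx)
        except Exception as exc:
            # Observe the failure, then let it continue outward.
            events.append(f"{name}: saw {type(exc).__name__}")
            raise

    return middleware


async def _apply(middleware, call_next: FakeCallNext, ctx: FakeContext) -> Any:
    return await middleware(ctx, call_next)


def compose(middlewares, inner: FakeCallNext) -> FakeCallNext:
    # Wrap from the inside out so the first listed middleware ends up outermost.
    call = inner
    for middleware in reversed(middlewares):
        call = partial(_apply, middleware, call)
    return call


async def failing_inner(ctx: FakeContext) -> Any:
    raise LookupError(f"no handler for {ctx.method}")


def run_demo() -> List[str]:
    events.clear()
    chain = compose([make_recorder("outer"), make_recorder("inner")], failing_inner)
    try:
        asyncio.run(chain(FakeContext(method="tools/call", request_id=1)))
    except LookupError:
        events.append("caller: got LookupError")
    return list(events)
```

The first listed middleware enters first and sees the error last, which is why an observation or logging middleware placed at the front of the list records every failure raised further in.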

Source code in src/mcp/server/context.py
class ServerMiddleware(Protocol[_MwLifespanT]):
    """Context-tier middleware: `(ctx, call_next) -> result`.

    Runs at the top of `ServerRunner._on_request` / `_on_notify` after `ctx`
    is built but before any validation, lookup, or handshake. Wraps every
    inbound request and notification: `initialize`, the pre-init gate,
    `METHOD_NOT_FOUND`, params validation, the handler call, and
    `notifications/initialized` all run inside `call_next(ctx)`.
    `notifications/cancelled` is observed too; the dispatcher applies the
    cancellation itself, then forwards the notification. A request-side
    failure reaches the middleware as a raised `MCPError` (or
    `ValidationError` for malformed params) so observation/logging middleware
    can record it. Listed outermost-first on `Server.middleware`.

    The method and the raw inbound params are `ctx.method` and `ctx.params` (no
    model validation has happened yet). To rewrite either before the handler
    runs, pass an adjusted context: `await call_next(replace(ctx, params=...))`.
    `ctx.request_id is None` distinguishes a notification from a request. For
    notifications `call_next(ctx)` returns `None` (a dropped or unhandled
    notification also returns `None`) and the middleware's own return value is
    discarded.

    !!! warning
        `initialize` is handled inline - the dispatcher does not read
        further inbound messages until the middleware chain returns. Awaiting a
        server-to-client request (`ctx.session.send_request`, `send_ping`, ...)
        while handling `initialize` therefore deadlocks the connection: the
        response can never be dequeued. Send-and-forget notifications are safe.
        `initialize` is observed but not rewritable: the post-chain handshake
        commit reads the wire params, so to veto the handshake raise *before*
        `call_next()`.

    `Server[L].middleware` holds `ServerMiddleware[L]`, so an app-specific
    middleware sees `ctx.lifespan_context: L`. While the context is the
    mutable `ServerRequestContext` dataclass it is invariant in `L`, so a
    reusable middleware should be typed `ServerMiddleware[Any]` to register on
    any `Server[L]`.
    """

    # TODO(maxisbey): once `_make_context` returns the (covariant) `Context[L]`
    # again, restore `_MwLifespanT` to `contravariant=True` and retype `ctx`
    # below to `Context[_MwLifespanT]` so reusable middleware can be
    # `ServerMiddleware[object]` instead of `ServerMiddleware[Any]`.

    async def __call__(
        self,
        ctx: ServerRequestContext[_MwLifespanT, Any],
        call_next: CallNext,
    ) -> HandlerResult: ...

aclose_shielded async

aclose_shielded(connection: Connection) -> None

Unwind connection.exit_stack under a shielded, bounded scope.

Called from a driver's finally: the shield lets per-connection cleanup callbacks run even when the driver itself is being cancelled, the _EXIT_STACK_CLOSE_TIMEOUT bound stops a hung callback wedging shutdown, and a raising callback is logged-and-swallowed so it never masks the driver's own exception.
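Two of the three guarantees (the time bound and the swallowed exception) can be illustrated with the standard library alone. The sketch below swaps in `asyncio.wait_for` for the anyio scope used by the actual function and deliberately omits shielding, so it is an analogy and not the implementation; `close_bounded`, `CLOSE_TIMEOUT`, and the demo callbacks are hypothetical.

```python
import asyncio
import contextlib
import logging

logger = logging.getLogger("cleanup_demo")
CLOSE_TIMEOUT = 0.05  # hypothetical value standing in for _EXIT_STACK_CLOSE_TIMEOUT


async def close_bounded(stack: contextlib.AsyncExitStack) -> str:
    """Unwind the stack, bounding its duration and swallowing callback errors."""
    try:
        await asyncio.wait_for(stack.aclose(), timeout=CLOSE_TIMEOUT)
    except asyncio.TimeoutError:
        # A hung callback is abandoned instead of wedging shutdown.
        logger.warning("cleanup exceeded %s seconds", CLOSE_TIMEOUT)
        return "timed out"
    except Exception:
        # Log and swallow so the caller's own exception is not masked.
        logger.exception("cleanup raised")
        return "raised"
    return "clean"


async def demo() -> list:
    async def hang() -> None:
        await asyncio.sleep(10)

    async def boom() -> None:
        raise RuntimeError("callback failed")

    outcomes = []
    for callback in (None, boom, hang):
        stack = contextlib.AsyncExitStack()
        if callback is not None:
            stack.push_async_callback(callback)
        outcomes.append(await close_bounded(stack))
    return outcomes
```

Unlike the anyio version, this sketch does not protect the cleanup from cancellation of the surrounding task; that is the part the shield provides.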

Source code in src/mcp/server/runner.py
async def aclose_shielded(connection: Connection) -> None:
    """Unwind ``connection.exit_stack`` under a shielded, bounded scope.

    Called from a driver's ``finally``: the shield lets per-connection cleanup
    callbacks run even when the driver itself is being cancelled, the
    `_EXIT_STACK_CLOSE_TIMEOUT` bound stops a hung callback wedging shutdown,
    and a raising callback is logged-and-swallowed so it never masks the
    driver's own exception.
    """
    with anyio.move_on_after(_EXIT_STACK_CLOSE_TIMEOUT, shield=True) as scope:
        try:
            await connection.exit_stack.aclose()
        except Exception:
            logger.exception("connection exit_stack cleanup raised")
    if scope.cancelled_caught:
        logger.warning(
            "connection exit_stack cleanup exceeded %s seconds; abandoning remaining callbacks",
            _EXIT_STACK_CLOSE_TIMEOUT,
        )

ServerRunner dataclass

Bases: Generic[LifespanT]

Per-connection handler kernel. One instance per client connection.

Source code in src/mcp/server/runner.py
@dataclass
class ServerRunner(Generic[LifespanT]):
    """Per-connection handler kernel. One instance per client connection."""

    server: Server[LifespanT]
    connection: Connection
    lifespan_state: LifespanT
    _: KW_ONLY
    init_options: InitializationOptions | None = None
    """`InitializeResult` payload. Defaults to `server.create_initialization_options()`."""

    @cached_property
    def on_request(self) -> OnRequest:
        return self._on_request

    @cached_property
    def on_notify(self) -> OnNotify:
        return self._on_notify

    async def _on_request(
        self,
        dctx: DispatchContext[TransportContext],
        method: str,
        params: Mapping[str, Any] | None,
    ) -> dict[str, Any]:
        meta = _extract_meta(params)
        version = self.connection.protocol_version
        ctx = self._make_context(dctx, method, params, meta, version)

        async def _inner(ctx: ServerRequestContext[LifespanT, Any]) -> HandlerResult:
            # Read method/params off `ctx` so a middleware that rewrote them via
            # `call_next(replace(ctx, ...))` reaches lookup and the handler.
            method, params = ctx.method, ctx.params
            # Pinned compat: spec methods are surface-validated before lookup,
            # so malformed params are INVALID_PARAMS even with no handler
            # registered. Custom methods miss the monolith map and fall through
            # to `entry.params_type` exactly as before.
            if method in _methods.SPEC_CLIENT_METHODS:
                try:
                    _methods.validate_client_request(method, version, params)
                except KeyError:
                    raise MCPError(code=METHOD_NOT_FOUND, message="Method not found", data=method) from None
            # TODO(L29): the 2026-07-28 spec drops the handshake; this branch and
            # the gate become a per-version legacy path then. Initialize runs inline
            # (read loop parked), so awaiting the peer anywhere on this path deadlocks.
            if method == "initialize":
                return self._serialize(method, version, self._handle_initialize(params))
            # Methods without a handler are METHOD_NOT_FOUND regardless of
            # initialization state: JSON-RPC 2.0 reserves -32601 for "not
            # available on this server", and clients probing a server before
            # the handshake key off that code. The init gate below therefore
            # only ever applies to methods the server actually serves.
            entry = self.server.get_request_handler(method)
            if entry is None:
                raise MCPError(code=METHOD_NOT_FOUND, message="Method not found", data=method)
            if not self.connection.initialize_accepted and method not in _INIT_EXEMPT:
                # Pinned compat: the same error shape the union validation produced.
                raise MCPError(code=INVALID_PARAMS, message="Invalid request parameters", data="")
            # Absent params validate as {} (required fields still reject), so
            # the handler receives the model with its defaults, never None.
            typed_params = entry.params_type.model_validate({} if params is None else params, by_name=False)
            result = await entry.handler(ctx, typed_params)
            if isinstance(result, ErrorData):
                # Raise inside the chain so middleware observes the failure.
                raise MCPError.from_error_data(result)
            # Fill cache hints on the handler result, before the serialize sieve
            # decides whether the negotiated version carries the fields at all.
            # MRTR carve-out: `input_required` interim results, typed or mapping, never get hints.
            if (hint := self.server.cache_hints.get(method)) is not None:
                if isinstance(result, CacheableResult):
                    result = apply_cache_hint(result, hint)
                elif isinstance(result, Mapping) and not _methods.is_input_required(result):
                    # Hint keys first so wire keys the handler set win, matching `apply_cache_hint` precedence.
                    result = {"ttlMs": hint.ttl_ms, "cacheScope": hint.scope, **result}
            # Dump and serialize inside the chain so the OpenTelemetry span (the
            # outermost middleware) records a failing handler return shape too.
            return self._serialize(method, version, result)

        call = self._compose_server_middleware(_inner)
        # `_inner` already produced the wire dict; a middleware that short-circuited
        # without `call_next` is trusted to return its own well-formed result.
        result = _dump_result(await call(ctx))
        if method == "initialize":
            # Commit only on chain success, so a middleware veto leaves no state.
            # Race-free: the read loop is parked until this call returns.
            # TODO: this re-reads the wire `params`, so a middleware that rewrote
            # `ctx.params` (or `ctx.method`, or short-circuited without `call_next`)
            # can leave `connection.protocol_version` out of step with the
            # `InitializeResult` `_inner` produced. Resolve when `initialize` becomes
            # a built-in handler so commit and result derive from one negotiation.
            self.connection.client_params, self.connection.protocol_version = self._negotiate_initialize(params)
        return result

    async def _on_notify(
        self,
        dctx: DispatchContext[TransportContext],
        method: str,
        params: Mapping[str, Any] | None,
    ) -> None:
        meta = _extract_meta(params)
        version = self.connection.protocol_version
        ctx = self._make_context(dctx, method, params, meta, version)

        async def _inner(ctx: ServerRequestContext[LifespanT, Any]) -> None:
            method, params = ctx.method, ctx.params
            if method in _methods.SPEC_CLIENT_NOTIFICATION_METHODS:
                try:
                    _methods.validate_client_notification(method, version, params)
                except KeyError:
                    logger.debug("dropped %r: not defined at %s", method, version)
                    return
                except ValidationError:
                    logger.warning("dropped %r: malformed params", method)
                    return
            if method == "notifications/initialized":
                # Surface validation above already rejected a malformed body, so
                # commit; fall through so a registered handler observes an
                # initialized connection.
                self.connection.initialized.set()
            elif not self.connection.initialize_accepted:
                logger.debug("dropped %s: received before initialization", method)
                return
            entry = self.server.get_notification_handler(method)
            if entry is None:
                logger.debug("no handler for notification %s", method)
                return
            # Same absent-params contract as requests.
            try:
                typed_params = entry.params_type.model_validate({} if params is None else params, by_name=False)
            except ValidationError:
                logger.warning("dropped %r: malformed params", method)
                return
            await entry.handler(ctx, typed_params)

        call = self._compose_server_middleware(_inner)
        try:
            await call(ctx)
        except Exception:
            # A crashing handler must not cancel the dispatcher's task group;
            # middleware saw the raise out of call_next() first.
            logger.exception("notification handler for %r raised", method)

    def _compose_server_middleware(self, inner: CallNext) -> CallNext:
        """Wrap `inner` in `Server.middleware`, outermost-first.

        Shared by `_on_request` and `_on_notify` so the same middleware chain
        observes every inbound message. The composed callable takes the `ctx`
        at call time, so a middleware can rewrite it for the rest of the chain.
        """
        call = inner
        for middleware in reversed(self.server.middleware):
            call = partial(_apply_middleware, middleware, call)
        return call

    def _make_context(
        self,
        dctx: DispatchContext[TransportContext],
        method: str,
        params: Mapping[str, Any] | None,
        meta: RequestParamsMeta | None,
        protocol_version: str,
    ) -> ServerRequestContext[LifespanT, Any]:
        # TODO(L54): remove for Context rework. Reads the SHTTP per-request
        # data off the raw `dctx.message_metadata` carrier; replace with the
        # per-transport context once that lands.
        md = dctx.message_metadata
        if isinstance(md, ServerMessageMetadata):
            request = md.request_context
            close_sse_stream = md.close_sse_stream
            close_standalone_sse_stream = md.close_standalone_sse_stream
        else:
            request = close_sse_stream = close_standalone_sse_stream = None
        # Per-request session: `dctx` is the request-scoped channel (auto-threads
        # its own request_id on streamable HTTP); the standalone channel is read
        # off `connection.outbound`. `related_request_id` on the public API selects.
        session = ServerSession(dctx, self.connection)
        return ServerRequestContext(
            session=session,
            lifespan_context=self.lifespan_state,
            method=method,
            params=params,
            request_id=dctx.request_id,
            meta=meta,
            protocol_version=protocol_version,
            request=request,
            close_sse_stream=close_sse_stream,
            close_standalone_sse_stream=close_standalone_sse_stream,
        )

    @staticmethod
    def _serialize(method: str, version: str, result: HandlerResult) -> dict[str, Any]:
        """Dump a handler result to the wire dict, serializing spec methods.

        Runs inside the middleware chain so the OpenTelemetry span observes a
        failing return shape (unsupported type, malformed spec result) as an
        error rather than closing on a request that the client sees fail.
        """
        dumped = _dump_result(result)
        # TODO(L56): reject resultType values outside {"complete", "input_required"} unless the
        # corresponding extension is in this request's _meta clientCapabilities.extensions; the
        # explicit MUST-reject is client-side (basic/index.mdx ResultType), this enforces it proactively.
        if method not in _methods.SPEC_CLIENT_METHODS:
            return dumped
        try:
            return _methods.serialize_server_result(method, version, dumped)
        except ValidationError:
            # Server bug, not client fault. Detail stays in the server log:
            # pydantic messages echo the result body.
            logger.exception("handler for %r returned an invalid result", method)
            raise MCPError(code=INTERNAL_ERROR, message="Handler returned an invalid result") from None

    @staticmethod
    def _negotiate_initialize(params: Mapping[str, Any] | None) -> tuple[InitializeRequestParams, str]:
        """Validate `initialize` params and pick the protocol version."""
        init = InitializeRequestParams.model_validate(params or {}, by_name=False)
        requested = init.protocol_version
        negotiated = requested if requested in HANDSHAKE_PROTOCOL_VERSIONS else LATEST_HANDSHAKE_VERSION
        return init, negotiated

    def _handle_initialize(self, params: Mapping[str, Any] | None) -> InitializeResult:
        """Build the `initialize` result; state commits later in `_on_request`."""
        _, negotiated = self._negotiate_initialize(params)
        opts = self.init_options if self.init_options is not None else self.server.create_initialization_options()
        return InitializeResult(
            protocol_version=negotiated,
            capabilities=opts.capabilities,
            server_info=Implementation(
                name=opts.server_name,
                title=opts.title,
                description=opts.description,
                version=opts.server_version,
                website_url=opts.website_url,
                icons=opts.icons,
            ),
            instructions=opts.instructions,
        )

init_options class-attribute instance-attribute

init_options: InitializationOptions | None = None

InitializeResult payload. Defaults to server.create_initialization_options().

serve_connection async

serve_connection(
    server: Server[LifespanT],
    dispatcher: Dispatcher[Any],
    *,
    connection: Connection,
    lifespan_state: LifespanT,
    init_options: InitializationOptions | None = None,
    task_status: TaskStatus[None] = TASK_STATUS_IGNORED
) -> None

Drive dispatcher until the underlying channel closes.

The loop-mode driver: builds the kernel, hands on_request/on_notify to dispatcher.run(), and tears down connection.exit_stack (shielded) on the way out. The entry constructs the Connection; this only consumes it.

Source code in src/mcp/server/runner.py
async def serve_connection(
    server: Server[LifespanT],
    dispatcher: Dispatcher[Any],
    *,
    connection: Connection,
    lifespan_state: LifespanT,
    init_options: InitializationOptions | None = None,
    task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
) -> None:
    """Drive ``dispatcher`` until the underlying channel closes.

    The loop-mode driver: builds the kernel, hands `on_request`/`on_notify`
    to `dispatcher.run()`, and tears down `connection.exit_stack` (shielded)
    on the way out. The entry constructs the `Connection`; this only consumes
    it.
    """
    runner = ServerRunner(server, connection, lifespan_state, init_options=init_options)
    try:
        await dispatcher.run(runner.on_request, runner.on_notify, task_status=task_status)
    finally:
        await aclose_shielded(connection)

serve_loop async

serve_loop(
    server: Server[LifespanT],
    read_stream: ReadStream[SessionMessage | Exception],
    write_stream: WriteStream[SessionMessage],
    *,
    lifespan_state: LifespanT,
    session_id: str | None = None,
    init_options: InitializationOptions | None = None,
    raise_exceptions: bool = False
) -> None

Drive server in loop mode over a stream pair until the channel closes.

Builds the loop-mode JSONRPCDispatcher + Connection and hands them to serve_connection, so loop-mode callers share one dispatcher-construction recipe (notably the inline_methods={"initialize"} rule). Callers that own a lifespan (the streamable-HTTP manager) pass it in; callers that don't (Server.run for stdio/memory) enter the lifespan and then call this.

Source code in src/mcp/server/runner.py
async def serve_loop(
    server: Server[LifespanT],
    read_stream: ReadStream[SessionMessage | Exception],
    write_stream: WriteStream[SessionMessage],
    *,
    lifespan_state: LifespanT,
    session_id: str | None = None,
    init_options: InitializationOptions | None = None,
    raise_exceptions: bool = False,
) -> None:
    """Drive ``server`` in loop mode over a stream pair until the channel closes.

    Builds the loop-mode `JSONRPCDispatcher` + `Connection` and hands them to
    `serve_connection`, so loop-mode callers share one dispatcher-construction
    recipe (notably the `inline_methods={"initialize"}` rule). Callers that own
    a lifespan (the streamable-HTTP manager) pass it in; callers that don't
    (`Server.run` for stdio/memory) enter the lifespan and then call this.
    """
    dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
        read_stream,
        write_stream,
        raise_handler_exceptions=raise_exceptions,
        # Handle `initialize` inline so a client that pipelines it with the
        # next request (spec: SHOULD NOT, not MUST NOT) sees the initialized
        # state instead of failing the init-gate.
        inline_methods=frozenset({"initialize"}),
    )
    connection = Connection.for_loop(dispatcher, session_id=session_id)
    await serve_connection(
        server, dispatcher, connection=connection, lifespan_state=lifespan_state, init_options=init_options
    )

serve_dual_era_loop async

serve_dual_era_loop(
    server: Server[LifespanT],
    read_stream: ReadStream[SessionMessage | Exception],
    write_stream: WriteStream[SessionMessage],
    *,
    lifespan_state: LifespanT,
    session_id: str | None = None,
    init_options: InitializationOptions | None = None,
    raise_exceptions: bool = False
) -> None

Drive server over a duplex stream pair, serving both protocol eras.

The stream-pair counterpart of the modern HTTP entry's era router. Era is a property of the connection, decided by how the client opens it, and mid-stream switching is undefined - so the first era-distinctive message locks the connection (matching the typescript-sdk):

  • initialize locks legacy: the connection behaves exactly like serve_loop for its lifetime, and modern envelope traffic is rejected with INVALID_REQUEST.
  • A request carrying the modern _meta envelope triple - or server/discover, a modern-only method - locks modern: every request is classified (classify_inbound_request) and served single-exchange via serve_one with a born-ready per-request Connection, the same dispatch model as the modern HTTP entry. A later initialize is rejected with UNSUPPORTED_PROTOCOL_VERSION naming the modern versions.

Modern connections push notifications over the duplex pipe but refuse server-initiated requests on both channels (the modern protocol forbids them). A rejected classification (malformed envelope, unsupported version) never locks the era, so a failed probe leaves the legacy handshake available - released auto-negotiating clients fall back on any error code except -32022.
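The locking rules above reduce to a small state machine. The following toy model covers only the era decision, with no transport or handlers; `EraLock`, its `route` method, and the `modern_envelope` / `accepted` flags are hypothetical names introduced for illustration, and the returned strings are labels, not wire errors.

```python
from typing import Literal

Era = Literal["unlocked", "legacy", "modern"]


class EraLock:
    """Toy model of the era decision only."""

    def __init__(self) -> None:
        self.era: Era = "unlocked"

    def route(self, method: str, *, modern_envelope: bool = False, accepted: bool = True) -> str:
        """Say where a request goes; `accepted` is whether that step succeeds."""
        looks_modern = method == "server/discover" or modern_envelope
        if self.era == "legacy":
            return "reject INVALID_REQUEST" if looks_modern else "legacy"
        if self.era == "modern" and method == "initialize":
            return "reject UNSUPPORTED_PROTOCOL_VERSION"
        if self.era == "modern" or looks_modern:
            if not accepted:
                return "reject classification"  # a failed probe never locks
            self.era = "modern"
            return "modern"
        if method == "initialize" and accepted:
            self.era = "legacy"  # lock only after a successful handshake
        return "legacy"
```

For example, a rejected modern probe followed by `initialize` ends in the legacy era, whereas a successful `server/discover` locks modern and a later `initialize` is refused.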

Source code in src/mcp/server/runner.py
async def serve_dual_era_loop(
    server: Server[LifespanT],
    read_stream: ReadStream[SessionMessage | Exception],
    write_stream: WriteStream[SessionMessage],
    *,
    lifespan_state: LifespanT,
    session_id: str | None = None,
    init_options: InitializationOptions | None = None,
    raise_exceptions: bool = False,
) -> None:
    """Drive `server` over a duplex stream pair, serving both protocol eras.

    The stream-pair counterpart of the modern HTTP entry's era router. Era is
    a property of the connection, decided by how the client opens it, and
    mid-stream switching is undefined - so the first era-distinctive message
    locks the connection (matching the typescript-sdk):

    - `initialize` locks legacy: the connection behaves exactly like
      `serve_loop` for its lifetime, and modern envelope traffic is rejected
      with INVALID_REQUEST.
    - A request carrying the modern `_meta` envelope triple - or
      `server/discover`, a modern-only method - locks modern: every request is
      classified (`classify_inbound_request`) and served single-exchange via
      `serve_one` with a born-ready per-request `Connection`, the same
      dispatch model as the modern HTTP entry. A later `initialize` is
      rejected with UNSUPPORTED_PROTOCOL_VERSION naming the modern versions.

    Modern connections push notifications over the duplex pipe but refuse
    server-initiated requests on both channels (the modern protocol forbids
    them). A rejected classification (malformed envelope, unsupported version)
    never locks the era, so a failed probe leaves the legacy handshake
    available - released auto-negotiating clients fall back on any error code
    except -32022.
    """
    dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
        read_stream,
        write_stream,
        raise_handler_exceptions=raise_exceptions,
        # `initialize` inline for the same pipelining reason as `serve_loop`;
        # `server/discover` inline so the modern era lock commits before the
        # next pipelined message is read.
        inline_methods=frozenset({"initialize", "server/discover"}),
    )
    loop_connection = Connection.for_loop(dispatcher, session_id=session_id)
    loop_runner = ServerRunner(server, loop_connection, lifespan_state, init_options=init_options)
    standalone_outbound = NotifyOnlyOutbound(dispatcher)
    era: Literal["unlocked", "legacy", "modern"] = "unlocked"
    modern_version = LATEST_MODERN_VERSION

    async def serve_modern(
        dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
    ) -> dict[str, Any]:
        nonlocal era, modern_version
        route = classify_inbound_request({"method": method, "params": params})
        if isinstance(route, InboundLadderRejection):
            raise MCPError(code=route.code, message=route.message, data=route.data)
        if era != "modern":
            era, modern_version = "modern", route.protocol_version
        if method == "subscriptions/listen":
            # The registered listen handler assumes the HTTP entry's stream
            # semantics; served over a stream pair it would wedge. Reject until
            # this transport grows its own listen design.
            raise MCPError(
                code=METHOD_NOT_FOUND, message="subscriptions/listen is not served over this transport", data=method
            )
        connection = Connection.from_envelope(
            route.protocol_version,
            route.client_info,
            route.client_capabilities,
            outbound=standalone_outbound,
        )
        return await serve_one(
            server,
            _NoServerRequestsDispatchContext(dctx),
            method,
            params,
            connection=connection,
            lifespan_state=lifespan_state,
        )

    async def on_request(
        dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
    ) -> dict[str, Any]:
        nonlocal era
        if era == "legacy":
            if method == "server/discover" or _has_modern_envelope(params):
                raise MCPError(
                    code=INVALID_REQUEST,
                    message="connection is locked to the legacy handshake era; "
                    "modern envelope requests are not accepted",
                )
            return await loop_runner.on_request(dctx, method, params)
        if era == "modern" and method == "initialize":
            raise MCPError(
                code=UNSUPPORTED_PROTOCOL_VERSION,
                message="connection already negotiated a modern protocol version",
                data=_initialize_after_modern_data(params),
            )
        if era == "modern" or method == "server/discover" or _has_modern_envelope(params):
            return await serve_modern(dctx, method, params)
        result = await loop_runner.on_request(dctx, method, params)
        if method == "initialize":
            # Lock only on success: a failed handshake leaves both eras open.
            era = "legacy"
        return result

    async def on_notify(dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None) -> None:
        if era != "modern":
            return await loop_runner.on_notify(dctx, method, params)
        # The envelope is request-only, so notifications inherit the
        # connection's locked version.
        connection = Connection.from_envelope(modern_version, None, None, outbound=standalone_outbound)
        notify_runner = ServerRunner(server, connection, lifespan_state)
        try:
            await notify_runner.on_notify(_NoServerRequestsDispatchContext(dctx), method, params)
        finally:
            await aclose_shielded(connection)

    try:
        await dispatcher.run(on_request, on_notify)
    finally:
        await aclose_shielded(loop_connection)
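
The era-locking rules in `on_request` above can be sketched as a small state machine. This is a simplified illustration, not the library's code: `make_router`, `EraError`, and the two handler arguments are hypothetical stand-ins, the `_meta` check is an assumed approximation of `_has_modern_envelope`, and the point at which the modern lock is set is an assumption because the excerpt does not show it.

```python
import asyncio


class EraError(Exception):
    """Hypothetical stand-in for the MCPError the real router raises."""


def make_router(legacy_handler, modern_handler):
    era = None  # unlocked until a handshake or a modern request succeeds

    def is_modern(method, params):
        # Assumption: a "_meta" mapping in params marks a modern envelope.
        return method == "server/discover" or isinstance((params or {}).get("_meta"), dict)

    async def on_request(method, params=None):
        nonlocal era
        if era == "legacy":
            if is_modern(method, params):
                raise EraError("connection is locked to the legacy era")
            return await legacy_handler(method, params)
        if era == "modern" and method == "initialize":
            raise EraError("connection already negotiated a modern version")
        if era == "modern" or is_modern(method, params):
            result = await modern_handler(method, params)
            era = "modern"  # assumption: the excerpt does not show where this lock is set
            return result
        result = await legacy_handler(method, params)
        if method == "initialize":
            era = "legacy"  # reached only if the handler did not raise
        return result

    return on_request
```

Because the legacy lock is assigned after the awaited call returns, a handshake that raises leaves the connection undecided, which matches the "lock only on success" comment in the source.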

serve_one async

serve_one(
    server: Server[LifespanT],
    dctx: DispatchContext[TransportContext],
    method: str,
    params: Mapping[str, Any] | None,
    *,
    connection: Connection,
    lifespan_state: LifespanT
) -> dict[str, Any]

Handle a single request (method, params) and return its result dict.

The single-exchange driver: builds the kernel, runs on_request once under dctx, and tears down connection.exit_stack (shielded) on the way out. The entry constructs the (born-ready) Connection and the dctx; this only consumes them.

Raises whatever the handler chain raises (MCPError / ValidationError / unmapped); callers own the exception-to-wire mapping.
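
A minimal sketch of the contract described above, assuming nothing about the real classes: the driver runs one exchange, always closes the connection's exit stack, and lets exceptions propagate so the caller can map them to a wire error. `FakeConnection`, `serve_one_sketch`, and `call_and_map` are hypothetical names, and the plain `aclose()` here omits the cancellation shielding the real driver applies.

```python
import asyncio
from contextlib import AsyncExitStack


class FakeConnection:
    """Hypothetical stand-in for Connection: owns an exit stack of cleanups."""

    def __init__(self):
        self.closed = False
        self.exit_stack = AsyncExitStack()
        self.exit_stack.push_async_callback(self._mark_closed)

    async def _mark_closed(self):
        self.closed = True


async def serve_one_sketch(handler, method, params, *, connection):
    # Run exactly one exchange; teardown happens whether or not it raises.
    try:
        return await handler(method, params)
    finally:
        # The real driver shields this from cancellation; omitted here.
        await connection.exit_stack.aclose()


async def call_and_map(handler, method, params):
    # The caller, not the driver, owns the exception-to-wire mapping.
    connection = FakeConnection()
    try:
        result = await serve_one_sketch(handler, method, params, connection=connection)
        return {"result": result}, connection
    except Exception as exc:
        # -32603 is the JSON-RPC "Internal error" code.
        return {"error": {"code": -32603, "message": str(exc)}}, connection
```

In this sketch the connection is closed on both the success and the failure path, which is the property the `try` / `finally` in `serve_one` provides.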

Source code in src/mcp/server/runner.py
async def serve_one(
    server: Server[LifespanT],
    dctx: DispatchContext[TransportContext],
    method: str,
    params: Mapping[str, Any] | None,
    *,
    connection: Connection,
    lifespan_state: LifespanT,
) -> dict[str, Any]:
    """Handle a single request ``(method, params)`` and return its result dict.

    The single-exchange driver: builds the kernel, runs `on_request` once under
    `dctx`, and tears down `connection.exit_stack` (shielded) on the way out.
    The entry constructs the (born-ready) `Connection` and the `dctx`; this
    only consumes them.

    Raises whatever the handler chain raises (`MCPError` / `ValidationError` /
    unmapped); callers own the exception-to-wire mapping.
    """
    runner = ServerRunner(server, connection, lifespan_state)
    try:
        return await runner.on_request(dctx, method, params)
    finally:
        await aclose_shielded(connection)

modern_on_request

modern_on_request(
    server: Server[LifespanT], lifespan_state: LifespanT
) -> OnRequest

Return an OnRequest callback that serves each call via serve_one with a fresh per-request Connection.

Wire this into the server side of a DirectDispatcher peer-pair to drive an in-process server on the modern per-request-envelope path (each request carries protocol version, client info, and capabilities in params._meta; no initialize handshake). Like serve_one, this raises whatever the handler chain raises - the dispatcher owns the exception-to-error mapping.
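
The per-request envelope lookup can be illustrated with a small standalone function. This is a sketch under stated assumptions: the key names and default version below are placeholders, not the values behind `PROTOCOL_VERSION_META_KEY`, `CLIENT_INFO_META_KEY`, `CLIENT_CAPABILITIES_META_KEY`, or `LATEST_MODERN_VERSION`, and `read_envelope` is a hypothetical helper.

```python
from typing import Any, Mapping, Optional, Tuple

# Placeholder values; the real constants live in the library.
PROTOCOL_VERSION_KEY = "example/protocolVersion"
CLIENT_INFO_KEY = "example/clientInfo"
CLIENT_CAPABILITIES_KEY = "example/clientCapabilities"
DEFAULT_VERSION = "0000-00-00"


def read_envelope(params: Optional[Mapping[str, Any]]) -> Tuple[Any, Any, Any]:
    """Pull (version, client info, capabilities) out of params["_meta"]."""
    # Unlike the source listing, this sketch also tolerates an explicit
    # "_meta": None by falling back to an empty mapping.
    meta = (params or {}).get("_meta") or {}
    return (
        meta.get(PROTOCOL_VERSION_KEY, DEFAULT_VERSION),
        meta.get(CLIENT_INFO_KEY),
        meta.get(CLIENT_CAPABILITIES_KEY),
    )
```

A request with no `_meta` falls back to the default version with no client info or capabilities, so every call can still build a fresh connection description without a prior handshake.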

Source code in src/mcp/server/runner.py
def modern_on_request(server: Server[LifespanT], lifespan_state: LifespanT) -> OnRequest:
    """Return an `OnRequest` callback that serves each call via `serve_one` with a fresh per-request `Connection`.

    Wire this into the server side of a `DirectDispatcher` peer-pair to drive an
    in-process server on the modern per-request-envelope path (each request
    carries protocol version, client info, and capabilities in `params._meta`;
    no `initialize` handshake). Like `serve_one`, this raises whatever the
    handler chain raises - the dispatcher owns the exception-to-error mapping.
    """

    async def handle(
        dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
    ) -> dict[str, Any]:
        meta = (params or {}).get("_meta", {})
        connection = Connection.from_envelope(
            meta.get(PROTOCOL_VERSION_META_KEY, LATEST_MODERN_VERSION),
            meta.get(CLIENT_INFO_META_KEY),
            meta.get(CLIENT_CAPABILITIES_META_KEY),
        )
        return await serve_one(server, dctx, method, params, connection=connection, lifespan_state=lifespan_state)

    return handle