Skip to content

request_state

Integrity protection for the multi-round-trip requestState (MCP 2026-07-28).

The spec requires servers to treat the client-echoed requestState as attacker-controlled: RequestStateBoundary seals every outgoing value and verifies every inbound echo, so handlers only ever see plaintext they minted.

InvalidRequestState

Bases: Exception

A sealed requestState token failed verification.

The message is a log-only reason code; the boundary never puts it on the wire.

Source code in src/mcp/server/request_state.py
46
47
48
49
50
class InvalidRequestState(Exception):
    """A sealed `requestState` token failed verification.

    The message is a log-only reason code; the boundary never puts it on the wire.
    """

RequestStateCodec

Bases: Protocol

Authenticated crypto over the framework's request-state envelope.

The framework stamps and re-verifies every envelope claim (expiry, request binding, principal); a codec only provides integrity and, ideally, confidentiality (a sign-only codec leaves the payload client-readable).

Requirements: unseal(seal(payload)) round-trips, and unseal raises InvalidRequestState for any token it did not mint unmodified; tokens never name their algorithm (version with a format prefix bound under the authentication tag, RFC 8725); comparisons are constant-time. Both methods are synchronous, so cache key material rather than calling a KMS per token.

Source code in src/mcp/server/request_state.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class RequestStateCodec(Protocol):
    """Authenticated crypto over the framework's request-state envelope.

    The framework stamps and re-verifies every envelope claim (expiry, request
    binding, principal); a codec only provides integrity and, ideally,
    confidentiality (a sign-only codec leaves the payload client-readable).

    Requirements: `unseal(seal(payload))` round-trips, and `unseal` raises
    `InvalidRequestState` for any token it did not mint unmodified; tokens
    never name their algorithm (version with a format prefix bound under the
    authentication tag, RFC 8725); comparisons are constant-time. Both methods
    are synchronous, so cache key material rather than calling a KMS per token.
    """

    def seal(self, payload: bytes) -> str:
        """Return an opaque URL-safe token protecting `payload`."""
        ...

    def unseal(self, token: str) -> bytes:
        """Reverse `seal`.

        Raises:
            InvalidRequestState: Malformed, unauthentic, or unknown-key token.
        """
        ...

seal

seal(payload: bytes) -> str

Return an opaque URL-safe token protecting payload.

Source code in src/mcp/server/request_state.py
67
68
69
def seal(self, payload: bytes) -> str:
    """Return an opaque URL-safe token protecting `payload`."""
    ...

unseal

unseal(token: str) -> bytes

Reverse seal.

Raises:

Type Description
InvalidRequestState

Malformed, unauthentic, or unknown-key token.

Source code in src/mcp/server/request_state.py
71
72
73
74
75
76
77
def unseal(self, token: str) -> bytes:
    """Reverse `seal`.

    Raises:
        InvalidRequestState: Malformed, unauthentic, or unknown-key token.
    """
    ...

authenticated_principal

authenticated_principal(
    ctx: ServerRequestContext[Any, Any],
) -> str | None

Default principal binding: the authenticated (client, issuer, subject) identity.

Uses the same components session ownership uses, so two users of one OAuth client are distinct principals whenever the token verifier supplies a subject, and the binding degrades to the client identity when it does not. Returns None (state not principal-bound) on unauthenticated transports.

Source code in src/mcp/server/request_state.py
80
81
82
83
84
85
86
87
88
89
90
91
def authenticated_principal(ctx: ServerRequestContext[Any, Any]) -> str | None:
    """Default principal binding: the authenticated (client, issuer, subject) identity.

    Uses the same components session ownership uses, so two users of one OAuth
    client are distinct principals whenever the token verifier supplies a
    subject, and the binding degrades to the client identity when it does not.
    Returns `None` (state not principal-bound) on unauthenticated transports.
    """
    token = get_access_token()
    if token is None:
        return None
    return compact_json(principal_components(token))

RequestStateSecurity

Policy for protecting requestState: codec, TTL, principal, audience.

Exactly one of keys or codec:

RequestStateSecurity(keys=[secret])      # built-in AES-256-GCM
RequestStateSecurity(codec=MyKmsCodec()) # bring your own crypto
RequestStateSecurity.ephemeral()         # process-local key

keys is the rotation ring: keys[0] seals, every key unseals. Zero-downtime rotation, each phase fully rolled out before the next: keys=[old, new], then keys=[new, old], then keys=[new] after one TTL.

The boundary enforces expiry, request binding, audience, and principal for every codec, fail-closed in both directions. audience=None defers to the boundary's default_audience (MCPServer passes its server name).

Source code in src/mcp/server/request_state.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class RequestStateSecurity:
    """Policy for protecting `requestState`: codec, TTL, principal, audience.

    Exactly one of `keys` or `codec`:

        RequestStateSecurity(keys=[secret])      # built-in AES-256-GCM
        RequestStateSecurity(codec=MyKmsCodec()) # bring your own crypto
        RequestStateSecurity.ephemeral()         # process-local key

    `keys` is the rotation ring: `keys[0]` seals, every key unseals.
    Zero-downtime rotation, each phase fully rolled out before the next:
    `keys=[old, new]`, then `keys=[new, old]`, then `keys=[new]` after one TTL.

    The boundary enforces expiry, request binding, audience, and principal for
    every codec, fail-closed in both directions. `audience=None` defers to the
    boundary's `default_audience` (`MCPServer` passes its server name).
    """

    codec: RequestStateCodec
    ttl: float
    bind_principal: Callable[[ServerRequestContext[Any, Any]], str | None] | None
    audience: str | None

    def __init__(
        self,
        *,
        keys: Sequence[bytes | bytearray | str] | None = None,
        codec: RequestStateCodec | None = None,
        ttl: float = 600.0,
        bind_principal: Callable[[ServerRequestContext[Any, Any]], str | None] | None = authenticated_principal,
        audience: str | None = None,
    ) -> None:
        if (keys is None) == (codec is None):
            raise ValueError("RequestStateSecurity takes exactly one of keys= or codec=")
        if not (math.isfinite(ttl) and ttl > 0):
            raise ValueError(f"request-state ttl must be a positive finite number, got {ttl!r}")
        if keys is not None:
            self.codec = AESGCMRequestStateCodec(keys)
        else:
            assert codec is not None
            self.codec = codec
        self.ttl = ttl
        self.bind_principal = bind_principal
        self.audience = audience

    @classmethod
    def ephemeral(cls, *, ttl: float = 600.0, audience: str | None = None) -> RequestStateSecurity:
        """Protection under a key generated now and held only by this process.

        This is the policy `MCPServer` installs when `request_state_security=`
        is omitted; call it yourself on the lowlevel tier or to set `ttl`/
        `audience`. Suits single-process deployments (stdio, one HTTP worker):
        state minted before a restart or by another worker is rejected.
        Multi-instance deployments must share a key via `keys=[...]`.
        """
        return cls(keys=[os.urandom(32)], ttl=ttl, audience=audience)

ephemeral classmethod

ephemeral(
    *, ttl: float = 600.0, audience: str | None = None
) -> RequestStateSecurity

Protection under a key generated now and held only by this process.

This is the policy MCPServer installs when request_state_security= is omitted; call it yourself on the lowlevel tier or to set ttl/ audience. Suits single-process deployments (stdio, one HTTP worker): state minted before a restart or by another worker is rejected. Multi-instance deployments must share a key via keys=[...].

Source code in src/mcp/server/request_state.py
139
140
141
142
143
144
145
146
147
148
149
@classmethod
def ephemeral(cls, *, ttl: float = 600.0, audience: str | None = None) -> RequestStateSecurity:
    """Protection under a key generated now and held only by this process.

    This is the policy `MCPServer` installs when `request_state_security=`
    is omitted; call it yourself on the lowlevel tier or to set `ttl`/
    `audience`. Suits single-process deployments (stdio, one HTTP worker):
    state minted before a restart or by another worker is rejected.
    Multi-instance deployments must share a key via `keys=[...]`.
    """
    return cls(keys=[os.urandom(32)], ttl=ttl, audience=audience)

compact_json

compact_json(value: Any, *, sort_keys: bool = False) -> str

Canonical JSON for everything the state path digests or seals.

ASCII output keeps the encode total: a lone surrogate in client-supplied text escapes instead of raising. Anything consuming this must parse with stdlib json.loads, which accepts those escapes (pydantic's JSON parser does not).

Source code in src/mcp/server/request_state.py
159
160
161
162
163
164
165
166
167
def compact_json(value: Any, *, sort_keys: bool = False) -> str:
    """Canonical JSON for everything the state path digests or seals.

    ASCII output keeps the encode total: a lone surrogate in client-supplied
    text escapes instead of raising. Anything consuming this must parse with
    stdlib `json.loads`, which accepts those escapes (pydantic's JSON parser
    does not).
    """
    return json.dumps(value, sort_keys=sort_keys, separators=(",", ":"))

AESGCMRequestStateCodec

Built-in codec: AES-256-GCM under key(s) derived with HKDF-SHA256.

Tokens are encrypted, not merely signed, so clients cannot read the state. keys[0] seals; all keys unseal (rotation, see RequestStateSecurity). Each token carries a 4-byte non-secret key fingerprint for an O(1) ring lookup, and the "v1." prefix and fingerprint are bound into the GCM associated data, so a token cannot be replayed into another format version or ring slot. Key bytes are copied at construction.

Source code in src/mcp/server/request_state.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class AESGCMRequestStateCodec:
    """Built-in codec: AES-256-GCM under key(s) derived with HKDF-SHA256.

    Tokens are encrypted, not merely signed, so clients cannot read the state.
    `keys[0]` seals; all keys unseal (rotation, see `RequestStateSecurity`).
    Each token carries a 4-byte non-secret key fingerprint for an O(1) ring
    lookup, and the "v1." prefix and fingerprint are bound into the GCM
    associated data, so a token cannot be replayed into another format version
    or ring slot. Key bytes are copied at construction.
    """

    def __init__(self, keys: Sequence[bytes | bytearray | str]) -> None:
        for i, key in enumerate(cast("Sequence[object]", keys)):
            if not isinstance(key, bytes | bytearray | str):
                # Never coerce: bytes(32) would silently build an all-zero key.
                raise TypeError(
                    f"request-state keys must be bytes, bytearray, or str; keys[{i}] is {type(key).__name__}"
                )
        material = [k.encode() if isinstance(k, str) else bytes(k) for k in keys]
        if not material:
            raise ValueError("AESGCMRequestStateCodec requires at least one key")
        for i, k in enumerate(material):
            if len(k) < 32:
                raise ValueError(
                    f"request-state keys must be at least 32 bytes of secret randomness; "
                    f"keys[{i}] is {len(k)} bytes. "
                    'Generate one with: python -c "import secrets; print(secrets.token_hex(32))"'
                )
        self._ring: dict[bytes, AESGCM] = {}
        self._mint_kid = b""
        for i, secret in enumerate(material):
            key = _derive_key(secret)
            kid = hashlib.sha256(_KID_INFO + key).digest()[:_KID_LEN]
            if kid in self._ring:
                raise ValueError(f"keys[{i}] duplicates an earlier ring key")
            self._ring[kid] = AESGCM(key)
            if i == 0:
                self._mint_kid = kid

    def seal(self, payload: bytes) -> str:
        kid = self._mint_kid
        nonce = os.urandom(_NONCE_LEN)
        sealed = self._ring[kid].encrypt(nonce, payload, _TOKEN_PREFIX.encode() + kid)
        return _TOKEN_PREFIX + _b64u(kid + nonce + sealed)

    def unseal(self, token: str) -> bytes:
        if not token.startswith(_TOKEN_PREFIX):
            raise InvalidRequestState("malformed")
        try:
            raw = _b64u_decode(token[len(_TOKEN_PREFIX) :])
        except ValueError as exc:
            raise InvalidRequestState("malformed") from exc
        if len(raw) < _KID_LEN + _NONCE_LEN + 16:
            raise InvalidRequestState("malformed")
        kid, nonce, sealed = raw[:_KID_LEN], raw[_KID_LEN : _KID_LEN + _NONCE_LEN], raw[_KID_LEN + _NONCE_LEN :]
        aead = self._ring.get(kid)
        if aead is None:
            raise InvalidRequestState("unknown key")
        try:
            return aead.decrypt(nonce, sealed, _TOKEN_PREFIX.encode() + kid)
        except InvalidTag:
            raise InvalidRequestState("seal") from None

RequestStateBoundary

Server middleware sealing/unsealing requestState at the wire boundary.

Acts only on the multi-round-trip carriers (tools/call, prompts/get, resources/read); every other method passes through untouched.

Inbound state is verified (codec unseal plus claims check) and replaced with the plaintext the server minted before any interceptor or handler runs; failure answers -32602 with the frozen message "Invalid or expired requestState", the real reason going to the server log only. Outbound, an input_required result carrying requestState is sealed in a fresh claims envelope; handlers and resolvers never call the codec.

default_audience seeds the audience claim when the policy sets none, and must be stated explicitly: it is the service identity that stops state minted by another service sharing the same keys. MCPServer installs this middleware with its server name by default (under an ephemeral policy unless request_state_security= supplies one); lowlevel Server users append one to server.middleware, passing their server's name (or None to deliberately leave tokens audience-free).

Source code in src/mcp/server/request_state.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
class RequestStateBoundary:
    """Server middleware sealing/unsealing `requestState` at the wire boundary.

    Acts only on the multi-round-trip carriers (tools/call, prompts/get,
    resources/read); every other method passes through untouched.

    Inbound state is verified (codec unseal plus claims check) and replaced
    with the plaintext the server minted before any interceptor or handler
    runs; failure answers -32602 with the frozen message "Invalid or expired
    requestState", the real reason going to the server log only. Outbound, an
    `input_required` result carrying `requestState` is sealed in a fresh
    claims envelope; handlers and resolvers never call the codec.

    `default_audience` seeds the audience claim when the policy sets none, and
    must be stated explicitly: it is the service identity that stops state
    minted by another service sharing the same keys. `MCPServer` installs this
    middleware with its server name by default (under an ephemeral policy
    unless `request_state_security=` supplies one); lowlevel `Server` users
    append one to `server.middleware`, passing their server's name (or `None`
    to deliberately leave tokens audience-free).
    """

    def __init__(self, security: RequestStateSecurity, *, default_audience: str | None) -> None:
        self._security = security
        self._audience = security.audience if security.audience is not None else default_audience

    async def __call__(self, ctx: ServerRequestContext[Any, Any], call_next: CallNext) -> HandlerResult:
        if ctx.method not in _MRTR_METHODS:
            return await call_next(ctx)
        binding: _RoundBinding | None = None
        if ctx.params is not None and ctx.params.get("requestState") is not None:
            # An explicit JSON null counts as absent: stripping the field is already in any client's power.
            plaintext, binding = self._unseal(ctx)
            ctx = replace(ctx, params={**ctx.params, "requestState": plaintext})
        result = await call_next(ctx)
        return self._seal_result(ctx, result, binding)

    def _unseal(self, ctx: ServerRequestContext[Any, Any]) -> tuple[str, _RoundBinding]:
        assert ctx.params is not None
        wire = ctx.params["requestState"]
        if not isinstance(wire, str):
            _reject(ctx.method, "non-string requestState")
        security = self._security
        try:
            payload = security.codec.unseal(wire)
        except InvalidRequestState as exc:
            _reject(ctx.method, str(exc))
        except Exception:  # deny-on-error: a buggy custom codec must fail closed
            logger.exception("requestState codec raised during unseal on %s", ctx.method)
            _reject(ctx.method, "codec error")
        try:
            claims = json.loads(payload)
            version, iat, exp, inner = claims["v"], claims["iat"], claims["exp"], claims["s"]
        except (ValueError, KeyError, TypeError):
            _reject(ctx.method, "malformed")
        if version != _ENVELOPE_VERSION or not isinstance(inner, str):
            _reject(ctx.method, "malformed")
        now = time.time()
        # Accept-conditions are stated positively so a NaN claim fails the comparison and rejects.
        if not isinstance(iat, int | float) or not (iat <= now + _FUTURE_SKEW):
            _reject(ctx.method, "minted in the future")
        if not isinstance(exp, int | float) or not (now < exp):
            _reject(ctx.method, "expired")
        target, args_digest = _request_identity(ctx.method, ctx.params)
        if claims.get("m") != ctx.method or claims.get("t") != target or claims.get("a") != args_digest:
            _reject(ctx.method, "request binding")
        if claims.get("aud") != self._audience:
            _reject(ctx.method, "audience")

        def fail_verify(reason: str) -> NoReturn:
            _reject(ctx.method, reason)

        principal = _bound_principal(security, ctx, fail_verify)
        claim = claims.get("p")
        if (claim is None) != (principal is None):
            _reject(ctx.method, "principal drift")
        if claim is not None and principal is not None:
            if not isinstance(claim, str) or not _principal_matches(claim, principal):
                _reject(ctx.method, "principal")
        return inner, (target, args_digest, principal)

    def _seal_result(
        self, ctx: ServerRequestContext[Any, Any], result: HandlerResult, binding: _RoundBinding | None
    ) -> HandlerResult:
        # Spec-path results arrive as wire mappings; a short-circuiting middleware may return a model.
        if not is_input_required(result):
            return result
        state = result.get("requestState") if isinstance(result, Mapping) else result.request_state
        if state is None:
            return result
        if isinstance(result, Mapping):
            if not isinstance(state, str):
                # Only a short-circuiting middleware can put a non-string here; nothing to seal.
                return result
            return {**result, "requestState": self._seal(ctx, state, binding)}
        return result.model_copy(update={"request_state": self._seal(ctx, state, binding)})

    def _seal(self, ctx: ServerRequestContext[Any, Any], state: str, binding: _RoundBinding | None = None) -> str:
        security = self._security
        if binding is None:

            def fail_seal(reason: str) -> NoReturn:
                logger.error("refusing to seal requestState on %s: %s", ctx.method, reason)
                raise MCPError(code=INTERNAL_ERROR, message="Internal error")

            target, args_digest = _request_identity(ctx.method, ctx.params)
            binding = (target, args_digest, _bound_principal(security, ctx, fail_seal))
        target, args_digest, principal = binding
        now = time.time()
        claims: dict[str, Any] = {
            "v": _ENVELOPE_VERSION,
            "iat": now,
            "exp": now + security.ttl,
            "m": ctx.method,
            "t": target,
            "a": args_digest,
            "s": state,
        }
        if self._audience is not None:
            claims["aud"] = self._audience
        if principal is not None:
            claims["p"] = _principal_claim(principal)
        payload = compact_json(claims).encode()
        try:
            return security.codec.seal(payload)
        except Exception:  # deny-on-error: a raising custom codec must not leak its failure
            logger.exception("requestState codec raised during seal on %s", ctx.method)
            raise MCPError(code=INTERNAL_ERROR, message="Internal error") from None