
subscriptions

Server-side subscriptions/listen support (2026-07-28, SEP-2575).

On the 2026-07-28 wire there is no standing GET stream: a client opts in to server events by sending a subscriptions/listen request whose response IS the stream. This module provides the two pieces a server needs:

  • SubscriptionBus: the pluggable fan-out seam. The bus carries typed ServerEvent values, not wire notifications - the listen handler owns subscription-id stamping and per-stream filtering, so a custom bus (e.g. backed by Redis pub/sub for multi-replica deployments) never sees JSON-RPC. The in-process default is InMemorySubscriptionBus.
  • ListenHandler: the request handler that serves subscriptions/listen. MCPServer registers one automatically; lowlevel Server users pass an instance as on_subscriptions_listen=.

Per the spec, the handler acknowledges first (the ack is the first frame on the stream), tags every frame with the listen request's JSON-RPC id under _meta["io.modelcontextprotocol/subscriptionId"], and never delivers an event kind the client did not request. Delivery is fire-and-forget with no replay: a dropped stream is not resumable - clients re-listen and refetch.
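The ordering and filtering rules above can be sketched with plain dicts. This is an illustration only, not the SDK's implementation: the function name `build_stream_frames`, the use of notification method strings as event kinds, and the `ack_method` placeholder are all assumptions. Only the `_meta` key is taken from this module.

```python
from __future__ import annotations

from typing import Any

SUBSCRIPTION_ID_META_KEY = "io.modelcontextprotocol/subscriptionId"


def build_stream_frames(
    request_id: int | str,
    requested_methods: set[str],
    events: list[dict[str, Any]],
    ack_method: str,
) -> list[dict[str, Any]]:
    """Return the frames one listen stream would carry, in wire order (illustrative)."""
    meta = {SUBSCRIPTION_ID_META_KEY: request_id}
    # The acknowledgement is always the first frame on the stream.
    frames: list[dict[str, Any]] = [
        {"jsonrpc": "2.0", "method": ack_method, "params": {"_meta": dict(meta)}}
    ]
    for event in events:
        # Skip any event kind the client did not request.
        if event["method"] not in requested_methods:
            continue
        params = {**event.get("params", {}), "_meta": dict(meta)}
        frames.append({"jsonrpc": "2.0", "method": event["method"], "params": params})
    return frames


frames = build_stream_frames(
    request_id=7,
    requested_methods={"notifications/tools/list_changed"},
    events=[
        {"method": "notifications/prompts/list_changed"},
        {"method": "notifications/tools/list_changed"},
    ],
    ack_method="example/acknowledged",  # placeholder, not the real ack method name
)
```

Here the prompts event is dropped because it was not requested, and both remaining frames carry the request id `7` under the `_meta` key.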

SUBSCRIPTION_ID_META_KEY module-attribute

SUBSCRIPTION_ID_META_KEY = (
    "io.modelcontextprotocol/subscriptionId"
)

The _meta key carrying the subscription id on every listen-stream frame.

The value is the subscriptions/listen request's JSON-RPC id, verbatim.
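Because the id is carried verbatim, an integer id `7` and a string id `"7"` name different subscriptions. A small sketch of reading the key on the receiving side follows; both helper names are hypothetical and the frames are plain dicts rather than SDK types.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any

SUBSCRIPTION_ID_META_KEY = "io.modelcontextprotocol/subscriptionId"


def subscription_id_of(frame: dict[str, Any]) -> int | str | None:
    """Return the subscription id stamped on a frame, or None if it has none."""
    meta = (frame.get("params") or {}).get("_meta") or {}
    return meta.get(SUBSCRIPTION_ID_META_KEY)


def group_by_subscription(frames: list[dict[str, Any]]) -> dict[int | str, list[dict[str, Any]]]:
    """Group stamped frames by subscription id, dropping unstamped ones."""
    grouped: dict[int | str, list[dict[str, Any]]] = defaultdict(list)
    for frame in frames:
        sub_id = subscription_id_of(frame)
        if sub_id is not None:
            grouped[sub_id].append(frame)
    return dict(grouped)


sample = [
    {"method": "notifications/tools/list_changed", "params": {"_meta": {SUBSCRIPTION_ID_META_KEY: 7}}},
    {"method": "notifications/tools/list_changed", "params": {"_meta": {SUBSCRIPTION_ID_META_KEY: "7"}}},
    {"method": "notifications/tools/list_changed", "params": {}},  # unstamped
]
grouped = group_by_subscription(sample)
```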

ToolsListChanged dataclass

The server's tool list changed.

Source code in src/mcp/server/subscriptions.py
@dataclass(frozen=True)
class ToolsListChanged:
    """The server's tool list changed."""

PromptsListChanged dataclass

The server's prompt list changed.

Source code in src/mcp/server/subscriptions.py
@dataclass(frozen=True)
class PromptsListChanged:
    """The server's prompt list changed."""

ResourcesListChanged dataclass

The server's resource list changed.

Source code in src/mcp/server/subscriptions.py
@dataclass(frozen=True)
class ResourcesListChanged:
    """The server's resource list changed."""

ResourceUpdated dataclass

The resource at uri changed and may need to be read again.

Source code in src/mcp/server/subscriptions.py
@dataclass(frozen=True)
class ResourceUpdated:
    """The resource at `uri` changed and may need to be read again."""

    uri: str

ServerEvent module-attribute

An event a server publishes for delivery to listen subscribers.
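The alias's value is not shown on this page. The sketch below assumes it is a union of the four event dataclasses listed above and redeclares stand-ins for them so that it runs on its own. The `describe` function is hypothetical and only shows how a consumer might dispatch on the event type.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class ToolsListChanged:
    """Stand-in: the server's tool list changed."""


@dataclass(frozen=True)
class PromptsListChanged:
    """Stand-in: the server's prompt list changed."""


@dataclass(frozen=True)
class ResourcesListChanged:
    """Stand-in: the server's resource list changed."""


@dataclass(frozen=True)
class ResourceUpdated:
    """Stand-in: the resource at `uri` changed."""

    uri: str


# Assumption: the real alias is a union along these lines.
ServerEvent = Union[ToolsListChanged, PromptsListChanged, ResourcesListChanged, ResourceUpdated]


def describe(event: ServerEvent) -> str:
    """Map an event to the action a subscriber would take (illustrative)."""
    if isinstance(event, ResourceUpdated):
        return f"re-read {event.uri}"
    if isinstance(event, ToolsListChanged):
        return "refetch tools"
    if isinstance(event, PromptsListChanged):
        return "refetch prompts"
    return "refetch resources"


# Frozen dataclasses compare by value and are hashable, so duplicates collapse in a set.
updates = {ResourceUpdated("file:///a.txt"), ResourceUpdated("file:///a.txt")}
```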

SubscriptionBus

Bases: Protocol

Fan-out seam between event publishers and open listen streams.

Implement this over an external pub/sub backend (Redis, NATS, ...) to fan events out across replicas: publish forwards the event to the backend, and each replica's bus invokes its local listeners for events arriving from the backend. The same instance can be shared across servers.

publish is async so backend implementations can do network I/O. subscribe is synchronous local registration. Listeners are synchronous, must not raise, and are invoked on the server's event loop.

Source code in src/mcp/server/subscriptions.py
class SubscriptionBus(Protocol):
    """Fan-out seam between event publishers and open listen streams.

    Implement this over an external pub/sub backend (Redis, NATS, ...) to fan
    events out across replicas: `publish` forwards the event to the backend,
    and each replica's bus invokes its local listeners for events arriving
    from the backend. The same instance can be shared across servers.

    `publish` is async so backend implementations can do network I/O.
    `subscribe` is synchronous local registration. Listeners are synchronous,
    must not raise, and are invoked on the server's event loop.
    """

    async def publish(self, event: ServerEvent) -> None:
        """Deliver `event` to every subscribed listener."""
        ...

    def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
        """Register `listener` and return an idempotent unsubscribe callable."""
        ...
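A rough sketch of the multi-replica pattern described in the docstring follows. `FakeBackend` is a hypothetical in-process stand-in for a real broker such as Redis or NATS, `BackendBus` is an illustrative name, and events are plain strings instead of `ServerEvent` values. A real implementation would also need serialization and a background receive task, both omitted here.

```python
from __future__ import annotations

import asyncio
import logging
from typing import Callable

logger = logging.getLogger(__name__)
Listener = Callable[[str], None]  # str stands in for ServerEvent


class FakeBackend:
    """Hypothetical broker: relays every message to each attached replica."""

    def __init__(self) -> None:
        self._receivers: list[Listener] = []

    def attach(self, receiver: Listener) -> None:
        self._receivers.append(receiver)

    async def send(self, event: str) -> None:
        await asyncio.sleep(0)  # stands in for a network round trip
        for receiver in list(self._receivers):
            receiver(event)


class BackendBus:
    """One instance per replica; satisfies the publish/subscribe shape above."""

    def __init__(self, backend: FakeBackend) -> None:
        self._backend = backend
        self._listeners: dict[object, Listener] = {}
        backend.attach(self._on_backend_event)

    async def publish(self, event: str) -> None:
        # Forward only: local listeners fire when the backend relays the event back.
        await self._backend.send(event)

    def subscribe(self, listener: Listener) -> Callable[[], None]:
        token = object()
        self._listeners[token] = listener

        def unsubscribe() -> None:
            self._listeners.pop(token, None)  # safe to call more than once

        return unsubscribe

    def _on_backend_event(self, event: str) -> None:
        for listener in list(self._listeners.values()):
            try:
                listener(event)
            except Exception:  # keep one bad listener from blocking the rest
                logger.exception("listener raised; continuing")


async def demo() -> list[str]:
    backend = FakeBackend()
    replica_a, replica_b = BackendBus(backend), BackendBus(backend)
    seen: list[str] = []
    replica_a.subscribe(lambda event: seen.append(f"a:{event}"))
    unsubscribe_b = replica_b.subscribe(lambda event: seen.append(f"b:{event}"))
    await replica_a.publish("tools-changed")  # reaches listeners on both replicas
    unsubscribe_b()
    unsubscribe_b()  # idempotent
    await replica_a.publish("prompts-changed")  # replica B no longer listens
    return seen
```

Publishing through the backend rather than calling local listeners directly keeps a single delivery path, so the publishing replica's own listeners see each event once.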

publish async

publish(event: ServerEvent) -> None

Deliver event to every subscribed listener.

Source code in src/mcp/server/subscriptions.py
async def publish(self, event: ServerEvent) -> None:
    """Deliver `event` to every subscribed listener."""
    ...

subscribe

subscribe(
    listener: Callable[[ServerEvent], None],
) -> Callable[[], None]

Register listener and return an idempotent unsubscribe callable.

Source code in src/mcp/server/subscriptions.py
def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
    """Register `listener` and return an idempotent unsubscribe callable."""
    ...

InMemorySubscriptionBus

In-process SubscriptionBus: synchronous fan-out to listeners in subscription order.

Source code in src/mcp/server/subscriptions.py
class InMemorySubscriptionBus:
    """In-process `SubscriptionBus`: synchronous fan-out to listeners in subscription order."""

    def __init__(self) -> None:
        # Keyed by a per-subscription token so the same callable can be
        # registered more than once (bound methods compare equal).
        self._listeners: dict[object, Callable[[ServerEvent], None]] = {}

    async def publish(self, event: ServerEvent) -> None:
        """Deliver `event` to every subscribed listener.

        A raising listener is logged and skipped: one bad listener must not
        starve the others or fail the publishing handler. Ends with a
        checkpoint so a burst of publishes from one task lets listen streams
        drain between events instead of overflowing their buffers unread.
        """
        for listener in list(self._listeners.values()):
            try:
                listener(event)
            except Exception:  # fan-out boundary: isolate listeners from each other
                logger.exception("subscription listener raised; continuing")
        await anyio.lowlevel.checkpoint()

    def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
        """Register `listener` and return an idempotent unsubscribe callable."""
        token = object()
        self._listeners[token] = listener

        def unsubscribe() -> None:
            self._listeners.pop(token, None)

        return unsubscribe
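The comment in `__init__` about bound methods comparing equal can be checked with a short standalone sketch. `Counter` and `register` are hypothetical names; `register` mirrors the token-keyed registration shown in the listing, with strings standing in for `ServerEvent` values.

```python
from __future__ import annotations

from typing import Callable

Listener = Callable[[str], None]  # str stands in for ServerEvent


class Counter:
    def __init__(self) -> None:
        self.hits = 0

    def on_event(self, event: str) -> None:
        self.hits += 1


def register(listeners: dict[object, Listener], listener: Listener) -> Callable[[], None]:
    """Register under a fresh token and return an idempotent unsubscribe."""
    token = object()
    listeners[token] = listener

    def unsubscribe() -> None:
        listeners.pop(token, None)

    return unsubscribe


counter = Counter()

# Two accesses of the same bound method compare (and hash) equal, so a set
# of callables would merge a double registration into one entry.
as_set = {counter.on_event, counter.on_event}

# Token keys keep both registrations, and each can be removed on its own.
by_token: dict[object, Listener] = {}
first = register(by_token, counter.on_event)
second = register(by_token, counter.on_event)
registered_twice = len(by_token)
first()
first()  # idempotent: the second call is a no-op
for listener in list(by_token.values()):
    listener("tools-changed")
```

After the first registration is removed, only the second one remains, so the event is counted once.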

publish async

publish(event: ServerEvent) -> None

Deliver event to every subscribed listener.

A raising listener is logged and skipped: one bad listener must not starve the others or fail the publishing handler. Ends with a checkpoint so a burst of publishes from one task lets listen streams drain between events instead of overflowing their buffers unread.

Source code in src/mcp/server/subscriptions.py
async def publish(self, event: ServerEvent) -> None:
    """Deliver `event` to every subscribed listener.

    A raising listener is logged and skipped: one bad listener must not
    starve the others or fail the publishing handler. Ends with a
    checkpoint so a burst of publishes from one task lets listen streams
    drain between events instead of overflowing their buffers unread.
    """
    for listener in list(self._listeners.values()):
        try:
            listener(event)
        except Exception:  # fan-out boundary: isolate listeners from each other
            logger.exception("subscription listener raised; continuing")
    await anyio.lowlevel.checkpoint()

subscribe

subscribe(
    listener: Callable[[ServerEvent], None],
) -> Callable[[], None]

Register listener and return an idempotent unsubscribe callable.

Source code in src/mcp/server/subscriptions.py
def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
    """Register `listener` and return an idempotent unsubscribe callable."""
    token = object()
    self._listeners[token] = listener

    def unsubscribe() -> None:
        self._listeners.pop(token, None)

    return unsubscribe

ListenHandler

Serves subscriptions/listen: one call is one subscription stream.

Register on a lowlevel Server via on_subscriptions_listen= (or add_request_handler); MCPServer does so automatically. Each call acknowledges the honored filter first, then forwards matching bus events onto the request's response stream until the client disconnects (which cancels the handler; the stream just ends, per the spec's abrupt-close contract) or close ends all streams gracefully.

Requires a transport that can stream a request's response (streamable HTTP's SSE mode).

max_subscriptions bounds concurrent streams: once the limit is reached, further listen requests are rejected with INTERNAL_ERROR before any ack is sent. max_buffered_events bounds each stream's event backlog: a stream whose client has stopped reading is ended when its backlog reaches the cap. The client then re-listens and refetches; because there is no replay, ending the stream loses nothing that an overflowing backlog would not already have lost.

Source code in src/mcp/server/subscriptions.py
class ListenHandler:
    """Serves `subscriptions/listen`: one call is one subscription stream.

    Register on a lowlevel `Server` via `on_subscriptions_listen=` (or
    `add_request_handler`); `MCPServer` does so automatically. Each call
    acknowledges the honored filter first, then forwards matching bus events
    onto the request's response stream until the client disconnects (which
    cancels the handler; the stream just ends, per the spec's abrupt-close
    contract) or `close` ends all streams gracefully.

    Requires a transport that can stream a request's response (streamable
    HTTP's SSE mode).

    `max_subscriptions` bounds concurrent streams (further listen requests are
    rejected with `INTERNAL_ERROR`, before the ack). `max_buffered_events`
    bounds each stream's event backlog: a stream whose client has stopped
    reading is ended at the cap (the client re-listens and refetches - there
    is no replay, so ending the stream loses nothing the backlog wasn't
    already losing).
    """

    def __init__(self, bus: SubscriptionBus, *, max_subscriptions: int = 1024, max_buffered_events: int = 1024) -> None:
        self._bus = bus
        self._max_subscriptions = max_subscriptions
        self._max_buffered_events = max_buffered_events
        self._streams: set[anyio.streams.memory.MemoryObjectSendStream[ServerEvent]] = set()

    async def __call__(
        self,
        ctx: ServerRequestContext[Any, Any],
        params: SubscriptionsListenRequestParams,
    ) -> SubscriptionsListenResult:
        """Serve one listen stream."""
        subscription_id = ctx.request_id
        if subscription_id is None:
            raise MCPError(INVALID_REQUEST, "subscriptions/listen requires a request id")
        if len(self._streams) >= self._max_subscriptions:
            raise MCPError(INTERNAL_ERROR, "Subscription limit reached")
        honored = _honored_subset(params.notifications)
        honored_uris = frozenset(honored.resource_subscriptions or ())
        meta: dict[str, Any] = {SUBSCRIPTION_ID_META_KEY: subscription_id}

        # Buffered so publishers don't block on a slow consumer (the transport
        # write happens in this handler task, not the publisher's). A stream
        # whose backlog hits the cap is ended - see the class docstring.
        send, recv = anyio.create_memory_object_stream[ServerEvent](self._max_buffered_events)

        def deliver(event: ServerEvent) -> None:
            if _event_matches(honored, honored_uris, event):
                try:
                    send.send_nowait(event)
                except anyio.ClosedResourceError:
                    # `close` closed this stream; the loop below is unwinding.
                    pass
                except anyio.WouldBlock:
                    logger.warning("listen stream %r backlog full; ending the stream", subscription_id)
                    # Release the subscription slot now: the handler's own
                    # cleanup can be wedged in a transport write that closing
                    # this buffer cannot wake (a client that stopped reading).
                    self._streams.discard(send)
                    send.close()

        # Subscribe before sending the ack so an event published while the
        # ack write is suspended is buffered rather than lost. The ack is
        # still the first frame: this task alone writes the stream, and it
        # only starts draining the buffer after the ack send returns.
        unsubscribe = self._bus.subscribe(deliver)
        self._streams.add(send)
        try:
            await ctx.session.send_notification(
                SubscriptionsAcknowledgedNotification(
                    params=SubscriptionsAcknowledgedNotificationParams(notifications=honored, _meta=meta)
                ),
                related_request_id=subscription_id,
            )
            async for event in recv:
                await ctx.session.send_notification(
                    _event_to_notification(event, meta), related_request_id=subscription_id
                )
        finally:
            _safe_unsubscribe(unsubscribe)
            self._streams.discard(send)
            send.close()
            recv.close()
        return SubscriptionsListenResult(_meta=meta)

    def close(self) -> None:
        """Initiate graceful closure of every open listen stream.

        Each stream then drains its buffered events and sends its
        `SubscriptionsListenResult` (stamped with the subscription id) as the
        final frame from its own handler task - the spec's graceful closure
        flow, telling clients the stream ended deliberately rather than
        dropping. This method only initiates that; it does not wait for the
        streams to finish flushing.
        """
        for stream in list(self._streams):
            stream.close()
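The backlog policy in `deliver` (never block the publisher, and end the stream once the buffer is full) can be sketched without any transport. `BoundedBacklog` is a hypothetical stand-in built on `collections.deque`, not the memory object stream the handler actually uses, and events are plain strings.

```python
from __future__ import annotations

from collections import deque


class BoundedBacklog:
    """Illustrative per-stream buffer with an end-on-overflow policy."""

    def __init__(self, max_buffered_events: int) -> None:
        self._max = max_buffered_events
        self._events: deque[str] = deque()
        self.ended = False

    def deliver(self, event: str) -> bool:
        """Buffer `event` without blocking; return False once the stream has ended."""
        if self.ended:
            return False
        if len(self._events) >= self._max:
            # The consumer has fallen too far behind: end the stream
            # instead of blocking the publisher or growing without bound.
            self.ended = True
            return False
        self._events.append(event)
        return True

    def drain(self) -> list[str]:
        """Return and clear whatever was buffered before the stream ended."""
        drained = list(self._events)
        self._events.clear()
        return drained


backlog = BoundedBacklog(max_buffered_events=2)
results = [backlog.deliver(name) for name in ("a", "b", "c", "d")]
buffered = backlog.drain()
```

With a cap of 2, the third event trips the limit and ends the stream, and later events are ignored. A client in that position re-listens and refetches rather than waiting for the missed events.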

__call__ async

Serve one listen stream.

Source code in src/mcp/server/subscriptions.py
async def __call__(
    self,
    ctx: ServerRequestContext[Any, Any],
    params: SubscriptionsListenRequestParams,
) -> SubscriptionsListenResult:
    """Serve one listen stream."""
    subscription_id = ctx.request_id
    if subscription_id is None:
        raise MCPError(INVALID_REQUEST, "subscriptions/listen requires a request id")
    if len(self._streams) >= self._max_subscriptions:
        raise MCPError(INTERNAL_ERROR, "Subscription limit reached")
    honored = _honored_subset(params.notifications)
    honored_uris = frozenset(honored.resource_subscriptions or ())
    meta: dict[str, Any] = {SUBSCRIPTION_ID_META_KEY: subscription_id}

    # Buffered so publishers don't block on a slow consumer (the transport
    # write happens in this handler task, not the publisher's). A stream
    # whose backlog hits the cap is ended - see the class docstring.
    send, recv = anyio.create_memory_object_stream[ServerEvent](self._max_buffered_events)

    def deliver(event: ServerEvent) -> None:
        if _event_matches(honored, honored_uris, event):
            try:
                send.send_nowait(event)
            except anyio.ClosedResourceError:
                # `close` closed this stream; the loop below is unwinding.
                pass
            except anyio.WouldBlock:
                logger.warning("listen stream %r backlog full; ending the stream", subscription_id)
                # Release the subscription slot now: the handler's own
                # cleanup can be wedged in a transport write that closing
                # this buffer cannot wake (a client that stopped reading).
                self._streams.discard(send)
                send.close()

    # Subscribe before sending the ack so an event published while the
    # ack write is suspended is buffered rather than lost. The ack is
    # still the first frame: this task alone writes the stream, and it
    # only starts draining the buffer after the ack send returns.
    unsubscribe = self._bus.subscribe(deliver)
    self._streams.add(send)
    try:
        await ctx.session.send_notification(
            SubscriptionsAcknowledgedNotification(
                params=SubscriptionsAcknowledgedNotificationParams(notifications=honored, _meta=meta)
            ),
            related_request_id=subscription_id,
        )
        async for event in recv:
            await ctx.session.send_notification(
                _event_to_notification(event, meta), related_request_id=subscription_id
            )
    finally:
        _safe_unsubscribe(unsubscribe)
        self._streams.discard(send)
        send.close()
        recv.close()
    return SubscriptionsListenResult(_meta=meta)

close

close() -> None

Initiate graceful closure of every open listen stream.

Each stream then drains its buffered events and sends its SubscriptionsListenResult (stamped with the subscription id) as the final frame from its own handler task - the spec's graceful closure flow, telling clients the stream ended deliberately rather than dropping. This method only initiates that; it does not wait for the streams to finish flushing.

Source code in src/mcp/server/subscriptions.py
def close(self) -> None:
    """Initiate graceful closure of every open listen stream.

    Each stream then drains its buffered events and sends its
    `SubscriptionsListenResult` (stamped with the subscription id) as the
    final frame from its own handler task - the spec's graceful closure
    flow, telling clients the stream ended deliberately rather than
    dropping. This method only initiates that; it does not wait for the
    streams to finish flushing.
    """
    for stream in list(self._streams):
        stream.close()
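The "initiate, then drain" behaviour can be sketched with `asyncio` alone. This is not how the handler is written (it uses anyio memory object streams). The `_CLOSE` sentinel, the `serve` coroutine, and the string frames are all assumptions made to show the ordering: buffered events are flushed first, and the result is the final frame.

```python
from __future__ import annotations

import asyncio

_CLOSE = object()  # hypothetical sentinel meaning "the send side was closed"


async def serve(queue: asyncio.Queue[object], frames: list[str]) -> None:
    """Illustrative handler task: ack, forward events, then send the final result."""
    frames.append("ack")
    while True:
        item = await queue.get()
        if item is _CLOSE:
            break
        frames.append(f"event:{item}")
    frames.append("result")  # final frame: the stream ended deliberately


async def demo() -> tuple[list[str], list[str]]:
    queue: asyncio.Queue[object] = asyncio.Queue()
    frames: list[str] = []
    task = asyncio.create_task(serve(queue, frames))
    await asyncio.sleep(0)  # let the handler send its ack and start waiting
    queue.put_nowait("a")
    queue.put_nowait("b")
    queue.put_nowait(_CLOSE)  # closing only initiates shutdown and returns at once
    at_close = list(frames)  # nothing buffered has been flushed yet
    await task  # the handler drains the backlog on its own task
    return at_close, frames
```

A caller that needs every stream fully flushed has to wait for the handler tasks separately, since closing only starts the process.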