Add resource subscribe/unsubscribe support to McpAsyncServer#838
Add resource subscribe/unsubscribe support to McpAsyncServer#838
Conversation
- Register resources/subscribe and resources/unsubscribe handlers when the subscribe server capability is enabled - Track active subscriptions in a ConcurrentHashMap keyed by resource URI - Emit notifications/resources/updated when addResource() is called for a subscribed URI; also send listChanged notification if enabled - Add ResourceSubscriptionTests covering the capability guard, subscribe/ unsubscribe happy paths, and the updated notification flow Note: as with some other notificaitons this impl. is not bound to user sessiosns Resolves: #837, #776 Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
|
|
||
| private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>(); | ||
|
|
||
| private final Map<String, McpSchema.SubscribeRequest> resourceSubscriptions = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
- This construct should include a session ID so that we know which client to notify about which resource.
- The
SubscribeRequestis not needed right now, but perhaps the meta-information can be of use for later. For now I'd keep the field asMap<String, Set<String>>of uri -> set(session_id). - Use this map from
io.modelcontextprotocol.server.McpAsyncServer#notifyResourcesUpdatedto filter the clients to notify about a resource update (look at uri fromio.modelcontextprotocol.spec.McpSchema.ResourcesUpdatedNotification#uri)
There was a problem hiding this comment.
I realized this requires the McpTransportProvider to expose an API that accepts the sessionId or a set of them. For subscription related notifications, it has to be the listening stream (because the act of subscribing which has to receive a response, so even if the request upgrades to an SSE stream, it has to be closed after the response). So it makes sense to expose it at this level.
| if (this.serverCapabilities.resources().subscribe() | ||
| && this.resourceSubscriptions.containsKey(resourceUri)) { | ||
| Mono<Void> updated = this.notifyResourcesUpdated(new ResourcesUpdatedNotification(resourceUri)); | ||
| if (this.serverCapabilities.resources().listChanged()) { | ||
| return updated.then(this.notifyResourcesListChanged()); | ||
| } | ||
| return updated; | ||
| } |
There was a problem hiding this comment.
This logic is unnecessary. A resource is not updated when it's added but when it's modify. This code block should be removed.
|
|
||
| var resourceUri = resourceSubscribeRequest.uri(); | ||
|
|
||
| this.resourceSubscriptions.put(resourceUri, resourceSubscribeRequest); |
There was a problem hiding this comment.
use ex.sessionId() for the mapping.
|
|
||
| var resourceUri = resourceSubscribeRequest.uri(); | ||
|
|
||
| this.resourceSubscriptions.remove(resourceUri); |
There was a problem hiding this comment.
Only remove the particular session, not the resource uri from the mapping - only remove it if no sessions are left that have a subscription.
Note: as with some other notifications this impl. is not bound to user sessions
Resolves: #837, #776