diff --git a/conformance-tests/conformance-baseline.yml b/conformance-tests/conformance-baseline.yml index 4ab144063..d2990c155 100644 --- a/conformance-tests/conformance-baseline.yml +++ b/conformance-tests/conformance-baseline.yml @@ -2,11 +2,6 @@ # This file lists known failing scenarios that are expected to fail until fixed. # See: https://github.com/modelcontextprotocol/conformance/blob/main/SDK_INTEGRATION.md -server: - # Resource subscription not implemented in SDK - - resources-subscribe - - resources-unsubscribe - client: # SSE retry field handling not implemented # - Client does not parse or respect retry: field timing diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 32256987a..a4f054953 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -25,9 +25,9 @@ import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion; import io.modelcontextprotocol.spec.McpSchema.ErrorCodes; import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; -import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; import io.modelcontextprotocol.spec.McpSchema.PromptReference; import io.modelcontextprotocol.spec.McpSchema.ResourceReference; +import io.modelcontextprotocol.spec.McpSchema.ResourcesUpdatedNotification; import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest; import io.modelcontextprotocol.spec.McpSchema.Tool; import io.modelcontextprotocol.spec.McpServerSession; @@ -109,6 +109,8 @@ public class McpAsyncServer { private final ConcurrentHashMap resourceTemplates = new ConcurrentHashMap<>(); + private final Map resourceSubscriptions = new ConcurrentHashMap<>(); + private final ConcurrentHashMap prompts = new ConcurrentHashMap<>(); // FIXME: this field is deprecated and should be remvoed together with the @@ -215,6 +217,11 @@ private Map> prepareRequestHandlers() { requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler()); + + if (this.serverCapabilities.resources().subscribe()) { + requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeHandler()); + requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeHandler()); + } } // Add prompts API handlers if provider exists @@ -560,15 +567,26 @@ public Mono addResource(McpServerFeatures.AsyncResourceSpecification resou } return Mono.defer(() -> { - var previous = this.resources.put(resourceSpecification.resource().uri(), resourceSpecification); + var resourceUri = resourceSpecification.resource().uri(); + + var previous = this.resources.put(resourceUri, resourceSpecification); if (previous != null) { - logger.warn("Replace existing Resource with URI '{}'", resourceSpecification.resource().uri()); + logger.warn("Replace existing Resource with URI '{}'", resourceUri); } else { - logger.debug("Added resource handler: {}", resourceSpecification.resource().uri()); + logger.debug("Added resource handler: {}", resourceUri); + } + + if (this.serverCapabilities.resources().subscribe() + && this.resourceSubscriptions.containsKey(resourceUri)) { + Mono updated = this.notifyResourcesUpdated(new ResourcesUpdatedNotification(resourceUri)); + if (this.serverCapabilities.resources().listChanged()) { + return updated.then(this.notifyResourcesListChanged()); + } + return updated; } if (this.serverCapabilities.resources().listChanged()) { - return notifyResourcesListChanged(); + return this.notifyResourcesListChanged(); } return Mono.empty(); }); @@ -600,8 +618,9 @@ public Mono removeResource(String resourceUri) { McpServerFeatures.AsyncResourceSpecification removed = this.resources.remove(resourceUri); if (removed != null) { logger.debug("Removed resource handler: {}", resourceUri); + if (this.serverCapabilities.resources().listChanged()) { - return notifyResourcesListChanged(); + return this.notifyResourcesListChanged(); } return Mono.empty(); } @@ -734,6 +753,33 @@ private McpRequestHandler resourcesReadRequestHand }; } + private McpRequestHandler resourcesSubscribeHandler() { + return (ex, params) -> { + McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() { + }); + + var resourceUri = resourceSubscribeRequest.uri(); + + this.resourceSubscriptions.put(resourceUri, resourceSubscribeRequest); + + return Mono.just(Map.of()); + }; + } + + private McpRequestHandler resourcesUnsubscribeHandler() { + + return (ex, params) -> { + McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() { + }); + + var resourceUri = resourceSubscribeRequest.uri(); + + this.resourceSubscriptions.remove(resourceUri); + + return Mono.just(Map.of()); + }; + } + private Optional findResourceSpecification(String uri) { var result = this.resources.values() .stream() diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java new file mode 100644 index 000000000..3cf9f6707 --- /dev/null +++ b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java @@ -0,0 +1,217 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.server; + +import java.time.Duration; +import java.util.List; + +import io.modelcontextprotocol.MockMcpServerTransport; +import io.modelcontextprotocol.MockMcpServerTransportProvider; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.ReadResourceResult; +import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities; +import io.modelcontextprotocol.spec.ProtocolVersions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** + * Tests for resource subscribe/unsubscribe support in {@link McpAsyncServer}. + * + * Covers the handlers registered under {@code resources/subscribe} and + * {@code resources/unsubscribe}, the capability guard that keeps them absent when + * {@code subscribe=false}, and the {@code notifications/resources/updated} notification + * emitted by {@link McpAsyncServer#addResource} when a subscribed URI is (re-)added. + */ +class ResourceSubscriptionTests { + + private static final String RESOURCE_URI = "test://resource/item"; + + private MockMcpServerTransport mockTransport; + + private MockMcpServerTransportProvider mockTransportProvider; + + private McpAsyncServer mcpAsyncServer; + + @BeforeEach + void setUp() { + mockTransport = new MockMcpServerTransport(); + mockTransportProvider = new MockMcpServerTransportProvider(mockTransport); + } + + @AfterEach + void tearDown() { + if (mcpAsyncServer != null) { + assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))) + .doesNotThrowAnyException(); + } + } + + // ------------------------------------------------------------------------- + // Capability guard + // ------------------------------------------------------------------------- + + @Test + void subscribeHandlerNotRegisteredWhenSubscribeCapabilityDisabled() { + mcpAsyncServer = McpServer.async(mockTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().resources(false, false).build()) + .build(); + + initializeSession(); + mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); + + McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage(); + assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class); + McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response; + assertThat(jsonResponse.error()).isNotNull(); + assertThat(jsonResponse.error().code()).isEqualTo(McpSchema.ErrorCodes.METHOD_NOT_FOUND); + } + + // ------------------------------------------------------------------------- + // Subscribe / unsubscribe happy paths + // ------------------------------------------------------------------------- + + @Test + void subscribeRequestReturnsEmptyResult() { + mcpAsyncServer = McpServer.async(mockTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().resources(true, false).build()) + .build(); + + initializeSession(); + mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); + + McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage(); + assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class); + McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response; + assertThat(jsonResponse.id()).isEqualTo("req-1"); + assertThat(jsonResponse.error()).isNull(); + } + + // ------------------------------------------------------------------------- + // addResource notification behaviour + // ------------------------------------------------------------------------- + + @Test + void addSubscribedResourceSendsResourcesUpdatedNotification() { + mcpAsyncServer = McpServer.async(mockTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().resources(true, false).build()) + .build(); + + initializeSession(); + // Client subscribes first + mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); + mockTransport.clearSentMessages(); + + mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); + + List sent = mockTransport.getAllSentMessages(); + assertThat(sent).hasSize(1); + assertThat(sent.get(0)).isInstanceOf(McpSchema.JSONRPCNotification.class); + McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0); + assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED); + } + + @Test + void addSubscribedResourceWithListChangedSendsBothNotifications() { + mcpAsyncServer = McpServer.async(mockTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .build(); + + initializeSession(); + mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); + mockTransport.clearSentMessages(); + + mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); + + List sent = mockTransport.getAllSentMessages(); + assertThat(sent).hasSize(2); + + List methods = sent.stream() + .filter(m -> m instanceof McpSchema.JSONRPCNotification) + .map(m -> ((McpSchema.JSONRPCNotification) m).method()) + .toList(); + assertThat(methods).containsExactly(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED); + } + + @Test + void addUnsubscribedResourceDoesNotSendResourcesUpdatedNotification() { + mcpAsyncServer = McpServer.async(mockTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().resources(true, true).build()) + .build(); + + initializeSession(); + // No subscribe call — resource URI is not in the subscription map + mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); + + List sent = mockTransport.getAllSentMessages(); + assertThat(sent).hasSize(1); + McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0); + assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED); + } + + @Test + void addResourceAfterUnsubscribeDoesNotSendResourcesUpdatedNotification() { + mcpAsyncServer = McpServer.async(mockTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().resources(true, false).build()) + .build(); + + initializeSession(); + mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); + mockTransportProvider.simulateIncomingMessage(unsubscribeRequest("req-2", RESOURCE_URI)); + mockTransport.clearSentMessages(); + + mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); + + // No notifications expected: not subscribed, listChanged=false + assertThat(mockTransport.getAllSentMessages()).isEmpty(); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /** + * Performs the MCP initialization handshake so that the session's exchangeSink is + * populated and subsequent request handlers can be invoked. + */ + private void initializeSession() { + mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, + McpSchema.METHOD_INITIALIZE, "init-req", new McpSchema.InitializeRequest( + ProtocolVersions.MCP_2025_11_25, null, new McpSchema.Implementation("test-client", "1.0.0")))); + + mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, + McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)); + + mockTransport.clearSentMessages(); + } + + private static McpSchema.JSONRPCRequest subscribeRequest(String id, String uri) { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_SUBSCRIBE, id, + new McpSchema.SubscribeRequest(uri)); + } + + private static McpSchema.JSONRPCRequest unsubscribeRequest(String id, String uri) { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, id, + new McpSchema.SubscribeRequest(uri)); + } + + private static McpServerFeatures.AsyncResourceSpecification resourceSpec(String uri) { + McpSchema.Resource resource = McpSchema.Resource.builder().uri(uri).name("test-resource").build(); + return new McpServerFeatures.AsyncResourceSpecification(resource, + (exchange, req) -> Mono.just(new ReadResourceResult(List.of()))); + } + +}