> progressConsumers) {
+ Assert.notNull(progressConsumers, "Progress consumers must not be null");
+ this.progressConsumers.addAll(progressConsumers);
+ return this;
+ }
+
+ /**
+ * Add a provider of {@link McpTransportContext}, providing a context before
+ * calling any client operation. This allows to extract thread-locals and hand
+ * them over to the underlying transport.
+ *
+ * There is no direct equivalent in {@link AsyncSpec}. To achieve the same result,
+ * append {@code contextWrite(McpTransportContext.KEY, context)} to any
+ * {@link McpAsyncClient} call.
+ * @param contextProvider A supplier to create a context
+ * @return This builder for method chaining
+ */
+ public SyncSpec transportContextProvider(Supplier contextProvider) {
+ this.contextProvider = contextProvider;
+ return this;
+ }
+
+ /**
+ * Add a {@link JsonSchemaValidator} to validate the JSON structure of the
+ * structured output.
+ * @param jsonSchemaValidator A validator to validate the JSON structure of the
+ * structured output. Must not be null.
+ * @return This builder for method chaining
+ * @throws IllegalArgumentException if jsonSchemaValidator is null
+ */
+ public SyncSpec jsonSchemaValidator(JsonSchemaValidator jsonSchemaValidator) {
+ Assert.notNull(jsonSchemaValidator, "JsonSchemaValidator must not be null");
+ this.jsonSchemaValidator = jsonSchemaValidator;
+ return this;
+ }
+
+ /**
+ * Enables automatic schema caching during callTool operations. When a tool's
+ * output schema is not found in the cache, callTool will automatically fetch and
+ * cache all tool schemas via listTools.
+ * @param enableCallToolSchemaCaching true to enable, false to disable
+ * @return This builder instance for method chaining
+ */
+ public SyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching) {
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ return this;
+ }
+
/**
* Create an instance of {@link McpSyncClient} with the provided configurations or
* sensible defaults.
@@ -349,12 +485,15 @@ public SyncSpec loggingConsumers(List roots = new HashMap<>();
@@ -391,13 +532,23 @@ class AsyncSpec {
private final List, Mono>> resourcesChangeConsumers = new ArrayList<>();
+ private final List, Mono>> resourcesUpdateConsumers = new ArrayList<>();
+
private final List, Mono>> promptsChangeConsumers = new ArrayList<>();
private final List>> loggingConsumers = new ArrayList<>();
+ private final List>> progressConsumers = new ArrayList<>();
+
private Function> samplingHandler;
- private AsyncSpec(ClientMcpTransport transport) {
+ private Function> elicitationHandler;
+
+ private JsonSchemaValidator jsonSchemaValidator;
+
+ private boolean enableCallToolSchemaCaching = false; // Default to false
+
+ private AsyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
}
@@ -417,6 +568,18 @@ public AsyncSpec requestTimeout(Duration requestTimeout) {
return this;
}
+ /**
+ * @param initializationTimeout The duration to wait for the initialization
+ * lifecycle step to complete.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if initializationTimeout is null
+ */
+ public AsyncSpec initializationTimeout(Duration initializationTimeout) {
+ Assert.notNull(initializationTimeout, "Initialization timeout must not be null");
+ this.initializationTimeout = initializationTimeout;
+ return this;
+ }
+
/**
* Sets the client capabilities that will be advertised to the server during
* connection initialization. Capabilities define what features the client
@@ -493,6 +656,21 @@ public AsyncSpec sampling(Function> elicitationHandler) {
+ Assert.notNull(elicitationHandler, "Elicitation handler must not be null");
+ this.elicitationHandler = elicitationHandler;
+ return this;
+ }
+
/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
@@ -524,6 +702,23 @@ public AsyncSpec resourcesChangeConsumer(
return this;
}
+ /**
+ * Adds a consumer to be notified when a specific resource is updated. This allows
+ * the client to react to changes in individual resources, such as updates to
+ * their content or metadata.
+ * @param resourcesUpdateConsumer A consumer function that processes the updated
+ * resource and returns a Mono indicating the completion of the processing. Must
+ * not be null.
+ * @return This builder instance for method chaining.
+ * @throws IllegalArgumentException If the resourcesUpdateConsumer is null.
+ */
+ public AsyncSpec resourcesUpdateConsumer(
+ Function, Mono> resourcesUpdateConsumer) {
+ Assert.notNull(resourcesUpdateConsumer, "Resources update consumer must not be null");
+ this.resourcesUpdateConsumers.add(resourcesUpdateConsumer);
+ return this;
+ }
+
/**
* Adds a consumer to be notified when the available prompts change. This allows
* the client to react to changes in the server's prompt templates, such as new
@@ -568,16 +763,76 @@ public AsyncSpec loggingConsumers(
return this;
}
+ /**
+ * Adds a consumer to be notified of progress notifications from the server. This
+ * allows the client to track long-running operations and provide feedback to
+ * users.
+ * @param progressConsumer A consumer that receives progress notifications. Must
+ * not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if progressConsumer is null
+ */
+ public AsyncSpec progressConsumer(Function> progressConsumer) {
+ Assert.notNull(progressConsumer, "Progress consumer must not be null");
+ this.progressConsumers.add(progressConsumer);
+ return this;
+ }
+
+ /**
+ * Adds a multiple consumers to be notified of progress notifications from the
+ * server. This allows the client to track long-running operations and provide
+ * feedback to users.
+ * @param progressConsumers A list of consumers that receives progress
+ * notifications. Must not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if progressConsumer is null
+ */
+ public AsyncSpec progressConsumers(
+ List>> progressConsumers) {
+ Assert.notNull(progressConsumers, "Progress consumers must not be null");
+ this.progressConsumers.addAll(progressConsumers);
+ return this;
+ }
+
+ /**
+ * Sets the JSON schema validator to use for validating tool responses against
+ * output schemas.
+ * @param jsonSchemaValidator The validator to use. Must not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if jsonSchemaValidator is null
+ */
+ public AsyncSpec jsonSchemaValidator(JsonSchemaValidator jsonSchemaValidator) {
+ Assert.notNull(jsonSchemaValidator, "JsonSchemaValidator must not be null");
+ this.jsonSchemaValidator = jsonSchemaValidator;
+ return this;
+ }
+
+ /**
+ * Enables automatic schema caching during callTool operations. When a tool's
+ * output schema is not found in the cache, callTool will automatically fetch and
+ * cache all tool schemas via listTools.
+ * @param enableCallToolSchemaCaching true to enable, false to disable
+ * @return This builder instance for method chaining
+ */
+ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching) {
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ return this;
+ }
+
/**
* Create an instance of {@link McpAsyncClient} with the provided configurations
* or sensible defaults.
* @return a new instance of {@link McpAsyncClient}.
*/
public McpAsyncClient build() {
- return new McpAsyncClient(this.transport, this.requestTimeout,
+ var jsonSchemaValidator = (this.jsonSchemaValidator != null) ? this.jsonSchemaValidator
+ : JsonSchemaValidator.getDefault();
+ return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
+ jsonSchemaValidator,
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
- this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
- this.loggingConsumers, this.samplingHandler));
+ this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
+ this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
+ this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
similarity index 61%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
index 284b93f88..127d53337 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
@@ -59,14 +59,21 @@ class McpClientFeatures {
* @param resourcesChangeConsumers the resources change consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List, Mono>> toolsChangeConsumers,
List, Mono>> resourcesChangeConsumers,
+ List, Mono>> resourcesUpdateConsumers,
List, Mono>> promptsChangeConsumers,
List>> loggingConsumers,
- Function> samplingHandler) {
+ List>> progressConsumers,
+ Function> samplingHandler,
+ Function> elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
/**
* Create an instance and validate the arguments.
@@ -76,29 +83,58 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
* @param resourcesChangeConsumers the resources change consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots,
List, Mono>> toolsChangeConsumers,
List, Mono>> resourcesChangeConsumers,
+ List, Mono>> resourcesUpdateConsumers,
List, Mono>> promptsChangeConsumers,
List>> loggingConsumers,
- Function> samplingHandler) {
+ List>> progressConsumers,
+ Function> samplingHandler,
+ Function> elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
this.clientCapabilities = (clientCapabilities != null) ? clientCapabilities
: new McpSchema.ClientCapabilities(null,
!Utils.isEmpty(roots) ? new McpSchema.ClientCapabilities.RootCapabilities(false) : null,
- samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null);
+ samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null,
+ elicitationHandler != null ? new McpSchema.ClientCapabilities.Elicitation() : null);
this.roots = roots != null ? new ConcurrentHashMap<>(roots) : new ConcurrentHashMap<>();
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
+ this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
+ this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
this.samplingHandler = samplingHandler;
+ this.elicitationHandler = elicitationHandler;
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ }
+
+ /**
+ * @deprecated Only exists for backwards-compatibility purposes.
+ */
+ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
+ Map roots,
+ List, Mono>> toolsChangeConsumers,
+ List, Mono>> resourcesChangeConsumers,
+ List, Mono>> resourcesUpdateConsumers,
+ List, Mono>> promptsChangeConsumers,
+ List>> loggingConsumers,
+ Function> samplingHandler,
+ Function> elicitationHandler) {
+ this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
+ resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
+ elicitationHandler, false);
}
/**
@@ -122,8 +158,13 @@ public static Async fromSync(Sync syncSpec) {
.subscribeOn(Schedulers.boundedElastic()));
}
- List, Mono>> promptsChangeConsumers = new ArrayList<>();
+ List, Mono>> resourcesUpdateConsumers = new ArrayList<>();
+ for (Consumer> consumer : syncSpec.resourcesUpdateConsumers()) {
+ resourcesUpdateConsumers.add(r -> Mono.fromRunnable(() -> consumer.accept(r))
+ .subscribeOn(Schedulers.boundedElastic()));
+ }
+ List, Mono>> promptsChangeConsumers = new ArrayList<>();
for (Consumer> consumer : syncSpec.promptsChangeConsumers()) {
promptsChangeConsumers.add(p -> Mono.fromRunnable(() -> consumer.accept(p))
.subscribeOn(Schedulers.boundedElastic()));
@@ -135,12 +176,24 @@ public static Async fromSync(Sync syncSpec) {
.subscribeOn(Schedulers.boundedElastic()));
}
+ List>> progressConsumers = new ArrayList<>();
+ for (Consumer consumer : syncSpec.progressConsumers()) {
+ progressConsumers.add(l -> Mono.fromRunnable(() -> consumer.accept(l))
+ .subscribeOn(Schedulers.boundedElastic()));
+ }
+
Function> samplingHandler = r -> Mono
.fromCallable(() -> syncSpec.samplingHandler().apply(r))
.subscribeOn(Schedulers.boundedElastic());
+
+ Function> elicitationHandler = r -> Mono
+ .fromCallable(() -> syncSpec.elicitationHandler().apply(r))
+ .subscribeOn(Schedulers.boundedElastic());
+
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
- toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers, loggingConsumers,
- samplingHandler);
+ toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers,
+ loggingConsumers, progressConsumers, samplingHandler, elicitationHandler,
+ syncSpec.enableCallToolSchemaCaching);
}
}
@@ -155,14 +208,21 @@ public static Async fromSync(Sync syncSpec) {
* @param resourcesChangeConsumers the resources change consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
List>> resourcesChangeConsumers,
+ List>> resourcesUpdateConsumers,
List>> promptsChangeConsumers,
List> loggingConsumers,
- Function samplingHandler) {
+ List> progressConsumers,
+ Function samplingHandler,
+ Function elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
/**
* Create an instance and validate the arguments.
@@ -171,30 +231,59 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
* @param roots the roots.
* @param toolsChangeConsumers the tools change consumers.
* @param resourcesChangeConsumers the resources change consumers.
+ * @param resourcesUpdateConsumers the resource update consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
+ * @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
List>> resourcesChangeConsumers,
+ List>> resourcesUpdateConsumers,
List>> promptsChangeConsumers,
List> loggingConsumers,
- Function samplingHandler) {
+ List> progressConsumers,
+ Function samplingHandler,
+ Function elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
this.clientCapabilities = (clientCapabilities != null) ? clientCapabilities
: new McpSchema.ClientCapabilities(null,
!Utils.isEmpty(roots) ? new McpSchema.ClientCapabilities.RootCapabilities(false) : null,
- samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null);
+ samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null,
+ elicitationHandler != null ? new McpSchema.ClientCapabilities.Elicitation() : null);
this.roots = roots != null ? new HashMap<>(roots) : new HashMap<>();
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
+ this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
+ this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
this.samplingHandler = samplingHandler;
+ this.elicitationHandler = elicitationHandler;
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ }
+
+ /**
+ * @deprecated Only exists for backwards-compatibility purposes.
+ */
+ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
+ Map roots, List>> toolsChangeConsumers,
+ List>> resourcesChangeConsumers,
+ List>> resourcesUpdateConsumers,
+ List>> promptsChangeConsumers,
+ List> loggingConsumers,
+ Function samplingHandler,
+ Function elicitationHandler) {
+ this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
+ resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
+ elicitationHandler, false);
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
similarity index 67%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
index e5d964b7a..7fdaa8941 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
@@ -5,16 +5,19 @@
package io.modelcontextprotocol.client;
import java.time.Duration;
+import java.util.function.Supplier;
-import io.modelcontextprotocol.spec.ClientMcpTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
import io.modelcontextprotocol.spec.McpSchema.GetPromptResult;
import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult;
import io.modelcontextprotocol.util.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
/**
* A synchronous client implementation for the Model Context Protocol (MCP) that wraps an
@@ -47,6 +50,7 @@
*
* @author Dariusz Jędrzejczyk
* @author Christian Tzolov
+ * @author Jihoon Kim
* @see McpClient
* @see McpAsyncClient
* @see McpSchema
@@ -62,17 +66,28 @@ public class McpSyncClient implements AutoCloseable {
private final McpAsyncClient delegate;
+ private final Supplier contextProvider;
+
/**
* Create a new McpSyncClient with the given delegate.
* @param delegate the asynchronous kernel on top of which this synchronous client
* provides a blocking API.
- * @deprecated Use {@link McpClient#sync(ClientMcpTransport)} to obtain an instance.
+ * @param contextProvider the supplier of context before calling any non-blocking
+ * operation on underlying delegate
*/
- @Deprecated
- // TODO make the constructor package private post-deprecation
- public McpSyncClient(McpAsyncClient delegate) {
+ McpSyncClient(McpAsyncClient delegate, Supplier contextProvider) {
Assert.notNull(delegate, "The delegate can not be null");
+ Assert.notNull(contextProvider, "The contextProvider can not be null");
this.delegate = delegate;
+ this.contextProvider = contextProvider;
+ }
+
+ /**
+ * Get the current initialization result.
+ * @return the initialization result.
+ */
+ public McpSchema.InitializeResult getCurrentInitializationResult() {
+ return this.delegate.getCurrentInitializationResult();
}
/**
@@ -83,6 +98,15 @@ public McpSchema.ServerCapabilities getServerCapabilities() {
return this.delegate.getServerCapabilities();
}
+ /**
+ * Get the server instructions that provide guidance to the client on how to interact
+ * with this server.
+ * @return The instructions
+ */
+ public String getServerInstructions() {
+ return this.delegate.getServerInstructions();
+ }
+
/**
* Get the server implementation information.
* @return The server implementation details
@@ -91,6 +115,14 @@ public McpSchema.Implementation getServerInfo() {
return this.delegate.getServerInfo();
}
+ /**
+ * Check if the client-server connection is initialized.
+ * @return true if the client-server connection is initialized
+ */
+ public boolean isInitialized() {
+ return this.delegate.isInitialized();
+ }
+
/**
* Get the client capabilities that define the supported features and functionality.
* @return The client capabilities
@@ -154,14 +186,14 @@ public boolean closeGracefully() {
public McpSchema.InitializeResult initialize() {
// TODO: block takes no argument here as we assume the async client is
// configured with a requestTimeout at all times
- return this.delegate.initialize().block();
+ return withProvidedContext(this.delegate.initialize()).block();
}
/**
* Send a roots/list_changed notification.
*/
public void rootsListChangedNotification() {
- this.delegate.rootsListChangedNotification().block();
+ withProvidedContext(this.delegate.rootsListChangedNotification()).block();
}
/**
@@ -183,7 +215,7 @@ public void removeRoot(String rootUri) {
* @return
*/
public Object ping() {
- return this.delegate.ping().block();
+ return withProvidedContext(this.delegate.ping()).block();
}
// --------------------------
@@ -201,17 +233,18 @@ public Object ping() {
* Boolean indicating if the execution failed (true) or succeeded (false/absent)
*/
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
- return this.delegate.callTool(callToolRequest).block();
+ return withProvidedContext(this.delegate.callTool(callToolRequest)).block();
+
}
/**
* Retrieves the list of all tools provided by the server.
- * @return The list of tools result containing: - tools: List of available tools, each
- * with a name, description, and input schema - nextCursor: Optional cursor for
+ * @return The list of all tools result containing: - tools: List of available tools,
+ * each with a name, description, and input schema - nextCursor: Optional cursor for
* pagination if more tools are available
*/
public McpSchema.ListToolsResult listTools() {
- return this.delegate.listTools().block();
+ return withProvidedContext(this.delegate.listTools()).block();
}
/**
@@ -222,7 +255,8 @@ public McpSchema.ListToolsResult listTools() {
* pagination if more tools are available
*/
public McpSchema.ListToolsResult listTools(String cursor) {
- return this.delegate.listTools(cursor).block();
+ return withProvidedContext(this.delegate.listTools(cursor)).block();
+
}
// --------------------------
@@ -230,20 +264,22 @@ public McpSchema.ListToolsResult listTools(String cursor) {
// --------------------------
/**
- * Send a resources/list request.
- * @param cursor the cursor
- * @return the list of resources result.
+ * Retrieves the list of all resources provided by the server.
+ * @return The list of all resources result
*/
- public McpSchema.ListResourcesResult listResources(String cursor) {
- return this.delegate.listResources(cursor).block();
+ public McpSchema.ListResourcesResult listResources() {
+ return withProvidedContext(this.delegate.listResources()).block();
+
}
/**
- * Send a resources/list request.
- * @return the list of resources result.
+ * Retrieves a paginated list of resources provided by the server.
+ * @param cursor Optional pagination cursor from a previous list request
+ * @return The list of resources result
*/
- public McpSchema.ListResourcesResult listResources() {
- return this.delegate.listResources().block();
+ public McpSchema.ListResourcesResult listResources(String cursor) {
+ return withProvidedContext(this.delegate.listResources(cursor)).block();
+
}
/**
@@ -252,7 +288,8 @@ public McpSchema.ListResourcesResult listResources() {
* @return the resource content.
*/
public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
- return this.delegate.readResource(resource).block();
+ return withProvidedContext(this.delegate.readResource(resource)).block();
+
}
/**
@@ -261,27 +298,30 @@ public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
* @return the resource content.
*/
public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
- return this.delegate.readResource(readResourceRequest).block();
+ return withProvidedContext(this.delegate.readResource(readResourceRequest)).block();
+
+ }
+
+ /**
+ * Retrieves the list of all resource templates provided by the server.
+ * @return The list of all resource templates result.
+ */
+ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
+ return withProvidedContext(this.delegate.listResourceTemplates()).block();
+
}
/**
* Resource templates allow servers to expose parameterized resources using URI
* templates. Arguments may be auto-completed through the completion API.
*
- * Request a list of resource templates the server has.
- * @param cursor the cursor
- * @return the list of resource templates result.
+ * Retrieves a paginated list of resource templates provided by the server.
+ * @param cursor Optional pagination cursor from a previous list request
+ * @return The list of resource templates result.
*/
public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
- return this.delegate.listResourceTemplates(cursor).block();
- }
+ return withProvidedContext(this.delegate.listResourceTemplates(cursor)).block();
- /**
- * Request a list of resource templates the server has.
- * @return the list of resource templates result.
- */
- public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
- return this.delegate.listResourceTemplates().block();
}
/**
@@ -294,7 +334,8 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
* subscribe to.
*/
public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
- this.delegate.subscribeResource(subscribeRequest).block();
+ withProvidedContext(this.delegate.subscribeResource(subscribeRequest)).block();
+
}
/**
@@ -303,22 +344,34 @@ public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
* to unsubscribe from.
*/
public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
- this.delegate.unsubscribeResource(unsubscribeRequest).block();
+ withProvidedContext(this.delegate.unsubscribeResource(unsubscribeRequest)).block();
+
}
// --------------------------
// Prompts
// --------------------------
- public ListPromptsResult listPrompts(String cursor) {
- return this.delegate.listPrompts(cursor).block();
- }
+ /**
+ * Retrieves the list of all prompts provided by the server.
+ * @return The list of all prompts result.
+ */
public ListPromptsResult listPrompts() {
- return this.delegate.listPrompts().block();
+ return withProvidedContext(this.delegate.listPrompts()).block();
+ }
+
+ /**
+ * Retrieves a paginated list of prompts provided by the server.
+ * @param cursor Optional pagination cursor from a previous list request
+ * @return The list of prompts result.
+ */
+ public ListPromptsResult listPrompts(String cursor) {
+ return withProvidedContext(this.delegate.listPrompts(cursor)).block();
+
}
public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
- return this.delegate.getPrompt(getPromptRequest).block();
+ return withProvidedContext(this.delegate.getPrompt(getPromptRequest)).block();
}
/**
@@ -326,7 +379,29 @@ public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
* @param loggingLevel the min logging level
*/
public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
- this.delegate.setLoggingLevel(loggingLevel).block();
+ withProvidedContext(this.delegate.setLoggingLevel(loggingLevel)).block();
+
+ }
+
+ /**
+ * Send a completion/complete request.
+ * @param completeRequest the completion request contains the prompt or resource
+ * reference and arguments for generating suggestions.
+ * @return the completion result containing suggested values.
+ */
+ public McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest) {
+ return withProvidedContext(this.delegate.completeCompletion(completeRequest)).block();
+
+ }
+
+ /**
+ * For a given action, on assembly, capture the "context" via the
+ * {@link #contextProvider} and store it in the Reactor context.
+ * @param action the action to perform
+ * @return the result of the action
+ */
+ private Mono withProvidedContext(Mono action) {
+ return action.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, this.contextProvider.get()));
}
}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
new file mode 100644
index 000000000..ae093316f
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
@@ -0,0 +1,513 @@
+/*
+ * Copyright 2024 - 2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
+import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.spec.HttpHeaders;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
+import io.modelcontextprotocol.spec.McpTransportException;
+import io.modelcontextprotocol.spec.ProtocolVersions;
+import io.modelcontextprotocol.util.Assert;
+import io.modelcontextprotocol.util.Utils;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+/**
+ * Server-Sent Events (SSE) implementation of the
+ * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE
+ * transport specification, using Java's HttpClient.
+ *
+ *
+ * This transport implementation establishes a bidirectional communication channel between
+ * client and server using SSE for server-to-client messages and HTTP POST requests for
+ * client-to-server messages. The transport:
+ *
+ * - Establishes an SSE connection to receive server messages
+ * - Handles endpoint discovery through SSE events
+ * - Manages message serialization/deserialization using Jackson
+ * - Provides graceful connection termination
+ *
+ *
+ *
+ * The transport supports two types of SSE events:
+ *
+ * - 'endpoint' - Contains the URL for sending client messages
+ * - 'message' - Contains JSON-RPC message payload
+ *
+ *
+ * @author Christian Tzolov
+ * @see io.modelcontextprotocol.spec.McpTransport
+ * @see io.modelcontextprotocol.spec.McpClientTransport
+ */
+public class HttpClientSseClientTransport implements McpClientTransport {
+
+ private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2024_11_05;
+
+ private static final String MCP_PROTOCOL_VERSION_HEADER_NAME = "MCP-Protocol-Version";
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpClientSseClientTransport.class);
+
+ /** SSE event type for JSON-RPC messages */
+ private static final String MESSAGE_EVENT_TYPE = "message";
+
+ /** SSE event type for endpoint discovery */
+ private static final String ENDPOINT_EVENT_TYPE = "endpoint";
+
+ /** Default SSE endpoint path */
+ private static final String DEFAULT_SSE_ENDPOINT = "/sse";
+
+ /** Base URI for the MCP server */
+ private final URI baseUri;
+
+ /** SSE endpoint path */
+ private final String sseEndpoint;
+
+ /**
+ * HTTP client for sending messages to the server. Uses HTTP POST over the message
+ * endpoint
+ */
+ private final HttpClient httpClient;
+
+ /** HTTP request builder for building requests to send messages to the server */
+ private final HttpRequest.Builder requestBuilder;
+
+ /** JSON mapper for message serialization/deserialization */
+ protected McpJsonMapper jsonMapper;
+
+ /** Flag indicating if the transport is in closing state */
+ private volatile boolean isClosing = false;
+
+ /** Holds the SSE subscription disposable */
+ private final AtomicReference sseSubscription = new AtomicReference<>();
+
+ /**
+ * Sink for managing the message endpoint URI provided by the server. Stores the most
+ * recent endpoint URI and makes it available for outbound message processing.
+ */
+ protected final Sinks.One messageEndpointSink = Sinks.one();
+
+ /**
+ * Customizer to modify requests before they are executed.
+ */
+ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
+
+ /**
+ * Creates a new transport instance with custom HTTP client builder, object mapper,
+ * and headers.
+ * @param httpClient the HTTP client to use
+ * @param requestBuilder the HTTP request builder to use
+ * @param baseUri the base URI of the MCP server
+ * @param sseEndpoint the SSE endpoint path
+ * @param jsonMapper the object mapper for JSON serialization/deserialization
+ * @param httpRequestCustomizer customizer for the requestBuilder before executing
+ * requests
+ * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
+ */
+ HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
+ String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
+ Assert.hasText(baseUri, "baseUri must not be empty");
+ Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
+ Assert.notNull(httpClient, "httpClient must not be null");
+ Assert.notNull(requestBuilder, "requestBuilder must not be null");
+ Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
+ this.baseUri = URI.create(baseUri);
+ this.sseEndpoint = sseEndpoint;
+ this.jsonMapper = jsonMapper;
+ this.httpClient = httpClient;
+ this.requestBuilder = requestBuilder;
+ this.httpRequestCustomizer = httpRequestCustomizer;
+ }
+
+ @Override
+ public List protocolVersions() {
+ return List.of(ProtocolVersions.MCP_2024_11_05);
+ }
+
+ /**
+ * Creates a new builder for {@link HttpClientSseClientTransport}.
+ * @param baseUri the base URI of the MCP server
+ * @return a new builder instance
+ */
+ public static Builder builder(String baseUri) {
+ return new Builder().baseUri(baseUri);
+ }
+
+ /**
+ * Builder for {@link HttpClientSseClientTransport}.
+ */
+ public static class Builder {
+
+ private String baseUri;
+
+ private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
+
+ private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1);
+
+ private McpJsonMapper jsonMapper;
+
+ private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
+
+ private McpAsyncHttpClientRequestCustomizer httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.NOOP;
+
+ private Duration connectTimeout = Duration.ofSeconds(10);
+
+ /**
+ * Creates a new builder instance.
+ */
+ Builder() {
+ // Default constructor
+ }
+
+ /**
+ * Creates a new builder with the specified base URI.
+ * @param baseUri the base URI of the MCP server
+ * @deprecated Use {@link HttpClientSseClientTransport#builder(String)} instead.
+ * This constructor is deprecated and will be removed or made {@code protected} or
+ * {@code private} in a future release.
+ */
+ @Deprecated(forRemoval = true)
+ public Builder(String baseUri) {
+ Assert.hasText(baseUri, "baseUri must not be empty");
+ this.baseUri = baseUri;
+ }
+
+ /**
+ * Sets the base URI.
+ * @param baseUri the base URI
+ * @return this builder
+ */
+ Builder baseUri(String baseUri) {
+ Assert.hasText(baseUri, "baseUri must not be empty");
+ this.baseUri = baseUri;
+ return this;
+ }
+
+ /**
+ * Sets the SSE endpoint path.
+ * @param sseEndpoint the SSE endpoint path
+ * @return this builder
+ */
+ public Builder sseEndpoint(String sseEndpoint) {
+ Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
+ this.sseEndpoint = sseEndpoint;
+ return this;
+ }
+
+ /**
+ * Sets the HTTP client builder.
+ * @param clientBuilder the HTTP client builder
+ * @return this builder
+ */
+ public Builder clientBuilder(HttpClient.Builder clientBuilder) {
+ Assert.notNull(clientBuilder, "clientBuilder must not be null");
+ this.clientBuilder = clientBuilder;
+ return this;
+ }
+
+ /**
+ * Customizes the HTTP client builder.
+ * @param clientCustomizer the consumer to customize the HTTP client builder
+ * @return this builder
+ */
+ public Builder customizeClient(final Consumer clientCustomizer) {
+ Assert.notNull(clientCustomizer, "clientCustomizer must not be null");
+ clientCustomizer.accept(clientBuilder);
+ return this;
+ }
+
+ /**
+ * Sets the HTTP request builder.
+ * @param requestBuilder the HTTP request builder
+ * @return this builder
+ */
+ public Builder requestBuilder(HttpRequest.Builder requestBuilder) {
+ Assert.notNull(requestBuilder, "requestBuilder must not be null");
+ this.requestBuilder = requestBuilder;
+ return this;
+ }
+
+ /**
+ * Customizes the HTTP client builder.
+ * @param requestCustomizer the consumer to customize the HTTP request builder
+ * @return this builder
+ */
+ public Builder customizeRequest(final Consumer requestCustomizer) {
+ Assert.notNull(requestCustomizer, "requestCustomizer must not be null");
+ requestCustomizer.accept(requestBuilder);
+ return this;
+ }
+
+ /**
+ * Sets the JSON mapper implementation to use for serialization/deserialization.
+ * @param jsonMapper the JSON mapper
+ * @return this builder
+ */
+ public Builder jsonMapper(McpJsonMapper jsonMapper) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
+ this.jsonMapper = jsonMapper;
+ return this;
+ }
+
+ /**
+ * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
+ * executing them.
+ *
+ * This overrides the customizer from
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}.
+ *
+ * Do NOT use a blocking {@link McpSyncHttpClientRequestCustomizer} in a
+ * non-blocking context. Use
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}
+ * instead.
+ * @param syncHttpRequestCustomizer the request customizer
+ * @return this builder
+ */
+ public Builder httpRequestCustomizer(McpSyncHttpClientRequestCustomizer syncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.fromSync(syncHttpRequestCustomizer);
+ return this;
+ }
+
+ /**
+ * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
+ * executing them.
+ *
+ * This overrides the customizer from
+ * {@link #httpRequestCustomizer(McpSyncHttpClientRequestCustomizer)}.
+ *
+ * Do NOT use a blocking implementation in a non-blocking context.
+ * @param asyncHttpRequestCustomizer the request customizer
+ * @return this builder
+ */
+ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer asyncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = asyncHttpRequestCustomizer;
+ return this;
+ }
+
+ /**
+ * Sets the connection timeout for the HTTP client.
+ * @param connectTimeout the connection timeout duration
+ * @return this builder
+ */
+ public Builder connectTimeout(Duration connectTimeout) {
+ Assert.notNull(connectTimeout, "connectTimeout must not be null");
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ /**
+ * Builds a new {@link HttpClientSseClientTransport} instance.
+ * @return a new transport instance
+ */
+ public HttpClientSseClientTransport build() {
+ HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
+ return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint,
+ jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer);
+ }
+
+ }
+
+ @Override
+ public Mono connect(Function, Mono> handler) {
+ var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
+
+ return Mono.deferContextual(ctx -> {
+ var builder = requestBuilder.copy()
+ .uri(uri)
+ .header("Accept", "text/event-stream")
+ .header("Cache-Control", "no-cache")
+ .header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
+ .GET();
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
+ }).flatMap(requestBuilder -> Mono.create(sink -> {
+ Disposable connection = Flux.create(sseSink -> this.httpClient
+ .sendAsync(requestBuilder.build(),
+ responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
+ .exceptionallyCompose(e -> {
+ sseSink.error(e);
+ return CompletableFuture.failedFuture(e);
+ }))
+ .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
+ .flatMap(responseEvent -> {
+ if (isClosing) {
+ return Mono.empty();
+ }
+
+ int statusCode = responseEvent.responseInfo().statusCode();
+
+ if (statusCode >= 200 && statusCode < 300) {
+ try {
+ if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
+ String messageEndpointUri = responseEvent.sseEvent().data();
+ if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
+ sink.success();
+ return Flux.empty(); // No further processing needed
+ }
+ else {
+ sink.error(new RuntimeException("Failed to handle SSE endpoint event"));
+ }
+ }
+ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
+ JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper,
+ responseEvent.sseEvent().data());
+ sink.success();
+ return Flux.just(message);
+ }
+ else {
+ logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent());
+ sink.success();
+ }
+ }
+ catch (IOException e) {
+ sink.error(new McpTransportException("Error processing SSE event", e));
+ }
+ }
+ return Flux.error(
+ new RuntimeException("Failed to send message: " + responseEvent));
+
+ })
+ .flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
+ .onErrorComplete(t -> {
+ if (!isClosing) {
+ logger.warn("SSE stream observed an error", t);
+ sink.error(t);
+ }
+ return true;
+ })
+ .doFinally(s -> {
+ Disposable ref = this.sseSubscription.getAndSet(null);
+ if (ref != null && !ref.isDisposed()) {
+ ref.dispose();
+ }
+ })
+ .contextWrite(sink.contextView())
+ .subscribe();
+
+ this.sseSubscription.set(connection);
+ }));
+ }
+
+ /**
+ * Sends a JSON-RPC message to the server.
+ *
+ *
+ * This method waits for the message endpoint to be discovered before sending the
+ * message. The message is serialized to JSON and sent as an HTTP POST request.
+ * @param message the JSON-RPC message to send
+ * @return a Mono that completes when the message is sent
+ * @throws McpError if the message endpoint is not available or the wait times out
+ */
+ @Override
+ public Mono sendMessage(JSONRPCMessage message) {
+
+ return this.messageEndpointSink.asMono().flatMap(messageEndpointUri -> {
+ if (isClosing) {
+ return Mono.empty();
+ }
+
+ return this.serializeMessage(message)
+ .flatMap(body -> sendHttpPost(messageEndpointUri, body).handle((response, sink) -> {
+ if (response.statusCode() != 200 && response.statusCode() != 201 && response.statusCode() != 202
+ && response.statusCode() != 206) {
+ sink.error(new RuntimeException("Sending message failed with a non-OK HTTP code: "
+ + response.statusCode() + " - " + response.body()));
+ }
+ else {
+ sink.next(response);
+ sink.complete();
+ }
+ }))
+ .doOnError(error -> {
+ if (!isClosing) {
+ logger.error("Error sending message: {}", error.getMessage());
+ }
+ });
+ }).then();
+
+ }
+
+ private Mono serializeMessage(final JSONRPCMessage message) {
+ return Mono.defer(() -> {
+ try {
+ return Mono.just(jsonMapper.writeValueAsString(message));
+ }
+ catch (IOException e) {
+ return Mono.error(new McpTransportException("Failed to serialize message", e));
+ }
+ });
+ }
+
+ private Mono> sendHttpPost(final String endpoint, final String body) {
+ final URI requestUri = Utils.resolveUri(baseUri, endpoint);
+ return Mono.deferContextual(ctx -> {
+ var builder = this.requestBuilder.copy()
+ .uri(requestUri)
+ .header(HttpHeaders.CONTENT_TYPE, "application/json")
+ .header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
+ .POST(HttpRequest.BodyPublishers.ofString(body));
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body, transportContext));
+ }).flatMap(customizedBuilder -> {
+ var request = customizedBuilder.build();
+ return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
+ });
+ }
+
+ /**
+ * Gracefully closes the transport connection.
+ *
+ *
+ * Sets the closing flag and disposes of the SSE subscription. This prevents new
+ * messages from being sent and allows ongoing operations to complete.
+ * @return a Mono that completes when the closing process is initiated
+ */
+ @Override
+ public Mono closeGracefully() {
+ return Mono.fromRunnable(() -> {
+ isClosing = true;
+ Disposable subscription = sseSubscription.get();
+ if (subscription != null && !subscription.isDisposed()) {
+ subscription.dispose();
+ }
+ });
+ }
+
+ /**
+ * Unmarshal data to the specified type using the configured object mapper.
+ * @param data the data to unmarshal
+ * @param typeRef the type reference for the target type
+ * @param the target type
+ * @return the unmarshalled object
+ */
+ @Override
+ public T unmarshalFrom(Object data, TypeRef typeRef) {
+ return this.jsonMapper.convertValue(data, typeRef);
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
new file mode 100644
index 000000000..0a8dff363
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
@@ -0,0 +1,832 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandler;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import io.modelcontextprotocol.client.McpAsyncClient;
+import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
+import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
+import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
+import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
+import io.modelcontextprotocol.spec.HttpHeaders;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpTransportException;
+import io.modelcontextprotocol.spec.McpTransportSession;
+import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
+import io.modelcontextprotocol.spec.McpTransportStream;
+import io.modelcontextprotocol.spec.ProtocolVersions;
+import io.modelcontextprotocol.util.Assert;
+import io.modelcontextprotocol.util.Utils;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+/**
+ * An implementation of the Streamable HTTP protocol as defined by the
+ * 2025-03-26 version of the MCP specification.
+ *
+ *
+ * The transport is capable of resumability and reconnects. It reacts to transport-level
+ * session invalidation and will propagate {@link McpTransportSessionNotFoundException
+ * appropriate exceptions} to the higher level abstraction layer when needed in order to
+ * allow proper state management. The implementation handles servers that are stateful and
+ * provide session meta information, but can also communicate with stateless servers that
+ * do not provide a session identifier and do not support SSE streams.
+ *
+ *
+ * This implementation does not handle backwards compatibility with the "HTTP
+ * with SSE" transport. In order to communicate over the phased-out
+ * 2024-11-05 protocol, use {@link HttpClientSseClientTransport} or
+ * {@code WebFluxSseClientTransport}.
+ *
+ *
+ * @author Christian Tzolov
+ * @see Streamable
+ * HTTP transport specification
+ */
+public class HttpClientStreamableHttpTransport implements McpClientTransport {
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpClientStreamableHttpTransport.class);
+
+ private static final String DEFAULT_ENDPOINT = "/mcp";
+
+ /**
+ * HTTP client for sending messages to the server. Uses HTTP POST over the message
+ * endpoint
+ */
+ private final HttpClient httpClient;
+
+ /** HTTP request builder for building requests to send messages to the server */
+ private final HttpRequest.Builder requestBuilder;
+
+ /**
+ * Event type for JSON-RPC messages received through the SSE connection. The server
+ * sends messages with this event type to transmit JSON-RPC protocol data.
+ */
+ private static final String MESSAGE_EVENT_TYPE = "message";
+
+ private static final String APPLICATION_JSON = "application/json";
+
+ private static final String TEXT_EVENT_STREAM = "text/event-stream";
+
+ public static int NOT_FOUND = 404;
+
+ public static int METHOD_NOT_ALLOWED = 405;
+
+ public static int BAD_REQUEST = 400;
+
+ private final McpJsonMapper jsonMapper;
+
+ private final URI baseUri;
+
+ private final String endpoint;
+
+ private final boolean openConnectionOnStartup;
+
+ private final boolean resumableStreams;
+
+ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
+
+ private final AtomicReference> activeSession = new AtomicReference<>();
+
+ private final AtomicReference, Mono>> handler = new AtomicReference<>();
+
+ private final AtomicReference> exceptionHandler = new AtomicReference<>();
+
+ private final List supportedProtocolVersions;
+
+ private final String latestSupportedProtocolVersion;
+
+ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
+ HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
+ boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
+ List supportedProtocolVersions) {
+ this.jsonMapper = jsonMapper;
+ this.httpClient = httpClient;
+ this.requestBuilder = requestBuilder;
+ this.baseUri = URI.create(baseUri);
+ this.endpoint = endpoint;
+ this.resumableStreams = resumableStreams;
+ this.openConnectionOnStartup = openConnectionOnStartup;
+ this.activeSession.set(createTransportSession());
+ this.httpRequestCustomizer = httpRequestCustomizer;
+ this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
+ this.latestSupportedProtocolVersion = this.supportedProtocolVersions.stream()
+ .sorted(Comparator.reverseOrder())
+ .findFirst()
+ .get();
+ }
+
+ @Override
+ public List protocolVersions() {
+ return supportedProtocolVersions;
+ }
+
+ public static Builder builder(String baseUri) {
+ return new Builder(baseUri);
+ }
+
+ @Override
+ public Mono connect(Function, Mono> handler) {
+ return Mono.deferContextual(ctx -> {
+ this.handler.set(handler);
+ if (this.openConnectionOnStartup) {
+ logger.debug("Eagerly opening connection on startup");
+ return this.reconnect(null).onErrorComplete(t -> {
+ logger.warn("Eager connect failed ", t);
+ return true;
+ }).then();
+ }
+ return Mono.empty();
+ });
+ }
+
+ private McpTransportSession createTransportSession() {
+ Function> onClose = sessionId -> sessionId == null ? Mono.empty()
+ : createDelete(sessionId);
+ return new DefaultMcpTransportSession(onClose);
+ }
+
+ private McpTransportSession createClosedSession(McpTransportSession existingSession) {
+ var existingSessionId = Optional.ofNullable(existingSession)
+ .filter(session -> !(session instanceof ClosedMcpTransportSession))
+ .flatMap(McpTransportSession::sessionId)
+ .orElse(null);
+ return new ClosedMcpTransportSession<>(existingSessionId);
+ }
+
+ private Publisher createDelete(String sessionId) {
+
+ var uri = Utils.resolveUri(this.baseUri, this.endpoint);
+ return Mono.deferContextual(ctx -> {
+ var builder = this.requestBuilder.copy()
+ .uri(uri)
+ .header("Cache-Control", "no-cache")
+ .header(HttpHeaders.MCP_SESSION_ID, sessionId)
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
+ .DELETE();
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext));
+ }).flatMap(requestBuilder -> {
+ var request = requestBuilder.build();
+ return Mono.fromFuture(() -> this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
+ }).then();
+ }
+
+ @Override
+ public void setExceptionHandler(Consumer handler) {
+ logger.debug("Exception handler registered");
+ this.exceptionHandler.set(handler);
+ }
+
+ private void handleException(Throwable t) {
+ logger.debug("Handling exception for session {}", sessionIdOrPlaceholder(this.activeSession.get()), t);
+ if (t instanceof McpTransportSessionNotFoundException) {
+ McpTransportSession> invalidSession = this.activeSession.getAndSet(createTransportSession());
+ logger.warn("Server does not recognize session {}. Invalidating.", invalidSession.sessionId());
+ invalidSession.close();
+ }
+ Consumer handler = this.exceptionHandler.get();
+ if (handler != null) {
+ handler.accept(t);
+ }
+ }
+
+ @Override
+ public Mono closeGracefully() {
+ return Mono.defer(() -> {
+ logger.debug("Graceful close triggered");
+ McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
+ if (currentSession != null) {
+ return Mono.from(currentSession.closeGracefully());
+ }
+ return Mono.empty();
+ });
+ }
+
+ private Mono reconnect(McpTransportStream stream) {
+
+ return Mono.deferContextual(ctx -> {
+
+ if (stream != null) {
+ logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
+ }
+ else {
+ logger.debug("Reconnecting with no prior stream");
+ }
+
+ final AtomicReference disposableRef = new AtomicReference<>();
+ final McpTransportSession transportSession = this.activeSession.get();
+ var uri = Utils.resolveUri(this.baseUri, this.endpoint);
+
+ Disposable connection = Mono.deferContextual(connectionCtx -> {
+ HttpRequest.Builder requestBuilder = this.requestBuilder.copy();
+
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID,
+ transportSession.sessionId().get());
+ }
+
+ if (stream != null && stream.lastId().isPresent()) {
+ requestBuilder = requestBuilder.header(HttpHeaders.LAST_EVENT_ID, stream.lastId().get());
+ }
+
+ var builder = requestBuilder.uri(uri)
+ .header(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM)
+ .header("Cache-Control", "no-cache")
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
+ .GET();
+ var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
+ })
+ .flatMapMany(
+ requestBuilder -> Flux.create(
+ sseSink -> this.httpClient
+ .sendAsync(requestBuilder.build(),
+ responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo,
+ sseSink))
+ .whenComplete((response, throwable) -> {
+ if (throwable != null) {
+ sseSink.error(throwable);
+ }
+ else {
+ logger.debug("SSE connection established successfully");
+ }
+ }))
+ .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
+ .flatMap(responseEvent -> {
+ int statusCode = responseEvent.responseInfo().statusCode();
+
+ if (statusCode >= 200 && statusCode < 300) {
+
+ if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
+ try {
+ // We don't support batching ATM and probably
+ // won't since the next version considers
+ // removing it.
+ McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(
+ this.jsonMapper, responseEvent.sseEvent().data());
+
+ Tuple2, Iterable> idWithMessages = Tuples
+ .of(Optional.ofNullable(responseEvent.sseEvent().id()),
+ List.of(message));
+
+ McpTransportStream sessionStream = stream != null ? stream
+ : new DefaultMcpTransportStream<>(this.resumableStreams,
+ this::reconnect);
+ logger.debug("Connected stream {}", sessionStream.streamId());
+
+ return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
+
+ }
+ catch (IOException ioException) {
+ return Flux.error(new McpTransportException(
+ "Error parsing JSON-RPC message: " + responseEvent, ioException));
+ }
+ }
+ else {
+ logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
+ return Flux.empty();
+ }
+ }
+ else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
+ logger
+ .debug("The server does not support SSE streams, using request-response mode.");
+ return Flux.empty();
+ }
+ else if (statusCode == NOT_FOUND) {
+
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id
+ // and the response is 404, we consider it a
+ // session not found error.
+ logger.debug("Session not found for session ID: {}",
+ transportSession.sessionId().get());
+ String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionIdRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(
+ new McpTransportException("Server Not Found. Status code:" + statusCode
+ + ", response-event:" + responseEvent));
+ }
+ else if (statusCode == BAD_REQUEST) {
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id
+ // and thre response is 404, we consider it a
+ // session not found error.
+ String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionIdRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(
+ new McpTransportException("Bad Request. Status code:" + statusCode
+ + ", response-event:" + responseEvent));
+
+ }
+
+ return Flux.error(new McpTransportException(
+ "Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
+ }).flatMap(
+ jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
+ .onErrorMap(CompletionException.class, t -> t.getCause())
+ .onErrorComplete(t -> {
+ this.handleException(t);
+ return true;
+ })
+ .doFinally(s -> {
+ Disposable ref = disposableRef.getAndSet(null);
+ if (ref != null) {
+ transportSession.removeConnection(ref);
+ }
+ }))
+ .contextWrite(ctx)
+ .subscribe();
+
+ disposableRef.set(connection);
+ transportSession.addConnection(connection);
+ return Mono.just(connection);
+ });
+
+ }
+
+ private BodyHandler toSendMessageBodySubscriber(FluxSink sink) {
+
+ BodyHandler responseBodyHandler = responseInfo -> {
+
+ String contentType = responseInfo.headers().firstValue(HttpHeaders.CONTENT_TYPE).orElse("").toLowerCase();
+
+ if (contentType.contains(TEXT_EVENT_STREAM)) {
+ // For SSE streams, use line subscriber that returns Void
+ logger.debug("Received SSE stream response, using line subscriber");
+ return ResponseSubscribers.sseToBodySubscriber(responseInfo, sink);
+ }
+ else if (contentType.contains(APPLICATION_JSON)) {
+ // For JSON responses and others, use string subscriber
+ logger.debug("Received response, using string subscriber");
+ return ResponseSubscribers.aggregateBodySubscriber(responseInfo, sink);
+ }
+
+ logger.debug("Received Bodyless response, using discarding subscriber");
+ return ResponseSubscribers.bodilessBodySubscriber(responseInfo, sink);
+ };
+
+ return responseBodyHandler;
+
+ }
+
+ public String toString(McpSchema.JSONRPCMessage message) {
+ try {
+ return this.jsonMapper.writeValueAsString(message);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to serialize JSON-RPC message", e);
+ }
+ }
+
+ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) {
+ return Mono.create(deliveredSink -> {
+ logger.debug("Sending message {}", sentMessage);
+
+ final AtomicReference disposableRef = new AtomicReference<>();
+ final McpTransportSession