> progressConsumers) {
+ Assert.notNull(progressConsumers, "Progress consumers must not be null");
+ this.progressConsumers.addAll(progressConsumers);
+ return this;
+ }
+
+ /**
+ * Add a provider of {@link McpTransportContext}, providing a context before
+ * calling any client operation. This allows to extract thread-locals and hand
+ * them over to the underlying transport.
+ *
+ * There is no direct equivalent in {@link AsyncSpec}. To achieve the same result,
+ * append {@code contextWrite(McpTransportContext.KEY, context)} to any
+ * {@link McpAsyncClient} call.
+ * @param contextProvider A supplier to create a context
+ * @return This builder for method chaining
+ */
+ public SyncSpec transportContextProvider(Supplier contextProvider) {
+ this.contextProvider = contextProvider;
+ return this;
+ }
+
+ /**
+ * Add a {@link JsonSchemaValidator} to validate the JSON structure of the
+ * structured output.
+ * @param jsonSchemaValidator A validator to validate the JSON structure of the
+ * structured output. Must not be null.
+ * @return This builder for method chaining
+ * @throws IllegalArgumentException if jsonSchemaValidator is null
+ */
+ public SyncSpec jsonSchemaValidator(JsonSchemaValidator jsonSchemaValidator) {
+ Assert.notNull(jsonSchemaValidator, "JsonSchemaValidator must not be null");
+ this.jsonSchemaValidator = jsonSchemaValidator;
+ return this;
+ }
+
+ /**
+ * Enables automatic schema caching during callTool operations. When a tool's
+ * output schema is not found in the cache, callTool will automatically fetch and
+ * cache all tool schemas via listTools.
+ * @param enableCallToolSchemaCaching true to enable, false to disable
+ * @return This builder instance for method chaining
+ */
+ public SyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching) {
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ return this;
+ }
+
/**
* Create an instance of {@link McpSyncClient} with the provided configurations or
* sensible defaults.
@@ -385,12 +486,14 @@ public SyncSpec loggingConsumers(List roots = new HashMap<>();
@@ -435,10 +538,16 @@ class AsyncSpec {
private final List>> loggingConsumers = new ArrayList<>();
+ private final List>> progressConsumers = new ArrayList<>();
+
private Function> samplingHandler;
private Function> elicitationHandler;
+ private JsonSchemaValidator jsonSchemaValidator;
+
+ private boolean enableCallToolSchemaCaching = false; // Default to false
+
private AsyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
@@ -654,17 +763,76 @@ public AsyncSpec loggingConsumers(
return this;
}
+ /**
+ * Adds a consumer to be notified of progress notifications from the server. This
+ * allows the client to track long-running operations and provide feedback to
+ * users.
+ * @param progressConsumer A consumer that receives progress notifications. Must
+ * not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if progressConsumer is null
+ */
+ public AsyncSpec progressConsumer(Function> progressConsumer) {
+ Assert.notNull(progressConsumer, "Progress consumer must not be null");
+ this.progressConsumers.add(progressConsumer);
+ return this;
+ }
+
+ /**
+ * Adds a multiple consumers to be notified of progress notifications from the
+ * server. This allows the client to track long-running operations and provide
+ * feedback to users.
+ * @param progressConsumers A list of consumers that receives progress
+ * notifications. Must not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if progressConsumer is null
+ */
+ public AsyncSpec progressConsumers(
+ List>> progressConsumers) {
+ Assert.notNull(progressConsumers, "Progress consumers must not be null");
+ this.progressConsumers.addAll(progressConsumers);
+ return this;
+ }
+
+ /**
+ * Sets the JSON schema validator to use for validating tool responses against
+ * output schemas.
+ * @param jsonSchemaValidator The validator to use. Must not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if jsonSchemaValidator is null
+ */
+ public AsyncSpec jsonSchemaValidator(JsonSchemaValidator jsonSchemaValidator) {
+ Assert.notNull(jsonSchemaValidator, "JsonSchemaValidator must not be null");
+ this.jsonSchemaValidator = jsonSchemaValidator;
+ return this;
+ }
+
+ /**
+ * Enables automatic schema caching during callTool operations. When a tool's
+ * output schema is not found in the cache, callTool will automatically fetch and
+ * cache all tool schemas via listTools.
+ * @param enableCallToolSchemaCaching true to enable, false to disable
+ * @return This builder instance for method chaining
+ */
+ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching) {
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ return this;
+ }
+
/**
* Create an instance of {@link McpAsyncClient} with the provided configurations
* or sensible defaults.
* @return a new instance of {@link McpAsyncClient}.
*/
public McpAsyncClient build() {
+ var jsonSchemaValidator = (this.jsonSchemaValidator != null) ? this.jsonSchemaValidator
+ : JsonSchemaValidator.getDefault();
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
+ jsonSchemaValidator,
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
- this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler,
- this.elicitationHandler));
+ this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
+ this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
similarity index 75%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
index bd1a0985a..127d53337 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
@@ -59,8 +59,10 @@ class McpClientFeatures {
* @param resourcesChangeConsumers the resources change consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List, Mono>> toolsChangeConsumers,
@@ -68,8 +70,10 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
List, Mono>> resourcesUpdateConsumers,
List, Mono>> promptsChangeConsumers,
List>> loggingConsumers,
+ List>> progressConsumers,
Function> samplingHandler,
- Function> elicitationHandler) {
+ Function> elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
/**
* Create an instance and validate the arguments.
@@ -79,8 +83,10 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
* @param resourcesChangeConsumers the resources change consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots,
@@ -89,8 +95,10 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
List, Mono>> resourcesUpdateConsumers,
List, Mono>> promptsChangeConsumers,
List>> loggingConsumers,
+ List>> progressConsumers,
Function> samplingHandler,
- Function> elicitationHandler) {
+ Function> elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
@@ -106,8 +114,27 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
+ this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
this.samplingHandler = samplingHandler;
this.elicitationHandler = elicitationHandler;
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ }
+
+ /**
+ * @deprecated Only exists for backwards-compatibility purposes.
+ */
+ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
+ Map roots,
+ List, Mono>> toolsChangeConsumers,
+ List, Mono>> resourcesChangeConsumers,
+ List, Mono>> resourcesUpdateConsumers,
+ List, Mono>> promptsChangeConsumers,
+ List>> loggingConsumers,
+ Function> samplingHandler,
+ Function> elicitationHandler) {
+ this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
+ resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
+ elicitationHandler, false);
}
/**
@@ -149,6 +176,12 @@ public static Async fromSync(Sync syncSpec) {
.subscribeOn(Schedulers.boundedElastic()));
}
+ List>> progressConsumers = new ArrayList<>();
+ for (Consumer consumer : syncSpec.progressConsumers()) {
+ progressConsumers.add(l -> Mono.fromRunnable(() -> consumer.accept(l))
+ .subscribeOn(Schedulers.boundedElastic()));
+ }
+
Function> samplingHandler = r -> Mono
.fromCallable(() -> syncSpec.samplingHandler().apply(r))
.subscribeOn(Schedulers.boundedElastic());
@@ -159,7 +192,8 @@ public static Async fromSync(Sync syncSpec) {
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers,
- loggingConsumers, samplingHandler, elicitationHandler);
+ loggingConsumers, progressConsumers, samplingHandler, elicitationHandler,
+ syncSpec.enableCallToolSchemaCaching);
}
}
@@ -174,8 +208,10 @@ public static Async fromSync(Sync syncSpec) {
* @param resourcesChangeConsumers the resources change consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
@@ -183,8 +219,10 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
List>> resourcesUpdateConsumers,
List>> promptsChangeConsumers,
List> loggingConsumers,
+ List> progressConsumers,
Function samplingHandler,
- Function elicitationHandler) {
+ Function elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
/**
* Create an instance and validate the arguments.
@@ -196,8 +234,10 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
* @param resourcesUpdateConsumers the resource update consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
+ * @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
@@ -205,8 +245,10 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
List>> resourcesUpdateConsumers,
List>> promptsChangeConsumers,
List> loggingConsumers,
+ List> progressConsumers,
Function samplingHandler,
- Function elicitationHandler) {
+ Function elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
@@ -222,8 +264,26 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
+ this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
this.samplingHandler = samplingHandler;
this.elicitationHandler = elicitationHandler;
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ }
+
+ /**
+ * @deprecated Only exists for backwards-compatibility purposes.
+ */
+ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
+ Map roots, List>> toolsChangeConsumers,
+ List>> resourcesChangeConsumers,
+ List>> resourcesUpdateConsumers,
+ List>> promptsChangeConsumers,
+ List> loggingConsumers,
+ Function samplingHandler,
+ Function elicitationHandler) {
+ this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
+ resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
+ elicitationHandler, false);
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
similarity index 81%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
index 27b020f05..7fdaa8941 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
@@ -5,17 +5,19 @@
package io.modelcontextprotocol.client;
import java.time.Duration;
-import java.util.List;
+import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
import io.modelcontextprotocol.spec.McpSchema.GetPromptResult;
import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult;
import io.modelcontextprotocol.util.Assert;
+import reactor.core.publisher.Mono;
/**
* A synchronous client implementation for the Model Context Protocol (MCP) that wraps an
@@ -64,14 +66,28 @@ public class McpSyncClient implements AutoCloseable {
private final McpAsyncClient delegate;
+ private final Supplier contextProvider;
+
/**
* Create a new McpSyncClient with the given delegate.
* @param delegate the asynchronous kernel on top of which this synchronous client
* provides a blocking API.
+ * @param contextProvider the supplier of context before calling any non-blocking
+ * operation on underlying delegate
*/
- McpSyncClient(McpAsyncClient delegate) {
+ McpSyncClient(McpAsyncClient delegate, Supplier contextProvider) {
Assert.notNull(delegate, "The delegate can not be null");
+ Assert.notNull(contextProvider, "The contextProvider can not be null");
this.delegate = delegate;
+ this.contextProvider = contextProvider;
+ }
+
+ /**
+ * Get the current initialization result.
+ * @return the initialization result.
+ */
+ public McpSchema.InitializeResult getCurrentInitializationResult() {
+ return this.delegate.getCurrentInitializationResult();
}
/**
@@ -170,14 +186,14 @@ public boolean closeGracefully() {
public McpSchema.InitializeResult initialize() {
// TODO: block takes no argument here as we assume the async client is
// configured with a requestTimeout at all times
- return this.delegate.initialize().block();
+ return withProvidedContext(this.delegate.initialize()).block();
}
/**
* Send a roots/list_changed notification.
*/
public void rootsListChangedNotification() {
- this.delegate.rootsListChangedNotification().block();
+ withProvidedContext(this.delegate.rootsListChangedNotification()).block();
}
/**
@@ -199,7 +215,7 @@ public void removeRoot(String rootUri) {
* @return
*/
public Object ping() {
- return this.delegate.ping().block();
+ return withProvidedContext(this.delegate.ping()).block();
}
// --------------------------
@@ -217,7 +233,8 @@ public Object ping() {
* Boolean indicating if the execution failed (true) or succeeded (false/absent)
*/
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
- return this.delegate.callTool(callToolRequest).block();
+ return withProvidedContext(this.delegate.callTool(callToolRequest)).block();
+
}
/**
@@ -227,7 +244,7 @@ public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolReque
* pagination if more tools are available
*/
public McpSchema.ListToolsResult listTools() {
- return this.delegate.listTools().block();
+ return withProvidedContext(this.delegate.listTools()).block();
}
/**
@@ -238,7 +255,8 @@ public McpSchema.ListToolsResult listTools() {
* pagination if more tools are available
*/
public McpSchema.ListToolsResult listTools(String cursor) {
- return this.delegate.listTools(cursor).block();
+ return withProvidedContext(this.delegate.listTools(cursor)).block();
+
}
// --------------------------
@@ -250,7 +268,8 @@ public McpSchema.ListToolsResult listTools(String cursor) {
* @return The list of all resources result
*/
public McpSchema.ListResourcesResult listResources() {
- return this.delegate.listResources().block();
+ return withProvidedContext(this.delegate.listResources()).block();
+
}
/**
@@ -259,7 +278,8 @@ public McpSchema.ListResourcesResult listResources() {
* @return The list of resources result
*/
public McpSchema.ListResourcesResult listResources(String cursor) {
- return this.delegate.listResources(cursor).block();
+ return withProvidedContext(this.delegate.listResources(cursor)).block();
+
}
/**
@@ -268,7 +288,8 @@ public McpSchema.ListResourcesResult listResources(String cursor) {
* @return the resource content.
*/
public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
- return this.delegate.readResource(resource).block();
+ return withProvidedContext(this.delegate.readResource(resource)).block();
+
}
/**
@@ -277,7 +298,8 @@ public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
* @return the resource content.
*/
public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
- return this.delegate.readResource(readResourceRequest).block();
+ return withProvidedContext(this.delegate.readResource(readResourceRequest)).block();
+
}
/**
@@ -285,7 +307,8 @@ public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest r
* @return The list of all resource templates result.
*/
public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
- return this.delegate.listResourceTemplates().block();
+ return withProvidedContext(this.delegate.listResourceTemplates()).block();
+
}
/**
@@ -297,7 +320,8 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
* @return The list of resource templates result.
*/
public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
- return this.delegate.listResourceTemplates(cursor).block();
+ return withProvidedContext(this.delegate.listResourceTemplates(cursor)).block();
+
}
/**
@@ -310,7 +334,8 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor
* subscribe to.
*/
public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
- this.delegate.subscribeResource(subscribeRequest).block();
+ withProvidedContext(this.delegate.subscribeResource(subscribeRequest)).block();
+
}
/**
@@ -319,7 +344,8 @@ public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
* to unsubscribe from.
*/
public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
- this.delegate.unsubscribeResource(unsubscribeRequest).block();
+ withProvidedContext(this.delegate.unsubscribeResource(unsubscribeRequest)).block();
+
}
// --------------------------
@@ -331,7 +357,7 @@ public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest)
* @return The list of all prompts result.
*/
public ListPromptsResult listPrompts() {
- return this.delegate.listPrompts().block();
+ return withProvidedContext(this.delegate.listPrompts()).block();
}
/**
@@ -340,11 +366,12 @@ public ListPromptsResult listPrompts() {
* @return The list of prompts result.
*/
public ListPromptsResult listPrompts(String cursor) {
- return this.delegate.listPrompts(cursor).block();
+ return withProvidedContext(this.delegate.listPrompts(cursor)).block();
+
}
public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
- return this.delegate.getPrompt(getPromptRequest).block();
+ return withProvidedContext(this.delegate.getPrompt(getPromptRequest)).block();
}
/**
@@ -352,7 +379,8 @@ public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
* @param loggingLevel the min logging level
*/
public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
- this.delegate.setLoggingLevel(loggingLevel).block();
+ withProvidedContext(this.delegate.setLoggingLevel(loggingLevel)).block();
+
}
/**
@@ -362,7 +390,18 @@ public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
* @return the completion result containing suggested values.
*/
public McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest) {
- return this.delegate.completeCompletion(completeRequest).block();
+ return withProvidedContext(this.delegate.completeCompletion(completeRequest)).block();
+
+ }
+
+ /**
+ * For a given action, on assembly, capture the "context" via the
+ * {@link #contextProvider} and store it in the Reactor context.
+ * @param action the action to perform
+ * @return the result of the action
+ */
+ private Mono withProvidedContext(Mono action) {
+ return action.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, this.contextProvider.get()));
}
}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
new file mode 100644
index 000000000..ae093316f
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
@@ -0,0 +1,513 @@
+/*
+ * Copyright 2024 - 2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
+import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.spec.HttpHeaders;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
+import io.modelcontextprotocol.spec.McpTransportException;
+import io.modelcontextprotocol.spec.ProtocolVersions;
+import io.modelcontextprotocol.util.Assert;
+import io.modelcontextprotocol.util.Utils;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+/**
+ * Server-Sent Events (SSE) implementation of the
+ * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE
+ * transport specification, using Java's HttpClient.
+ *
+ *
+ * This transport implementation establishes a bidirectional communication channel between
+ * client and server using SSE for server-to-client messages and HTTP POST requests for
+ * client-to-server messages. The transport:
+ *
+ * - Establishes an SSE connection to receive server messages
+ * - Handles endpoint discovery through SSE events
+ * - Manages message serialization/deserialization using Jackson
+ * - Provides graceful connection termination
+ *
+ *
+ *
+ * The transport supports two types of SSE events:
+ *
+ * - 'endpoint' - Contains the URL for sending client messages
+ * - 'message' - Contains JSON-RPC message payload
+ *
+ *
+ * @author Christian Tzolov
+ * @see io.modelcontextprotocol.spec.McpTransport
+ * @see io.modelcontextprotocol.spec.McpClientTransport
+ */
+public class HttpClientSseClientTransport implements McpClientTransport {
+
+ private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2024_11_05;
+
+ private static final String MCP_PROTOCOL_VERSION_HEADER_NAME = "MCP-Protocol-Version";
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpClientSseClientTransport.class);
+
+ /** SSE event type for JSON-RPC messages */
+ private static final String MESSAGE_EVENT_TYPE = "message";
+
+ /** SSE event type for endpoint discovery */
+ private static final String ENDPOINT_EVENT_TYPE = "endpoint";
+
+ /** Default SSE endpoint path */
+ private static final String DEFAULT_SSE_ENDPOINT = "/sse";
+
+ /** Base URI for the MCP server */
+ private final URI baseUri;
+
+ /** SSE endpoint path */
+ private final String sseEndpoint;
+
+ /**
+ * HTTP client for sending messages to the server. Uses HTTP POST over the message
+ * endpoint
+ */
+ private final HttpClient httpClient;
+
+ /** HTTP request builder for building requests to send messages to the server */
+ private final HttpRequest.Builder requestBuilder;
+
+ /** JSON mapper for message serialization/deserialization */
+ protected McpJsonMapper jsonMapper;
+
+ /** Flag indicating if the transport is in closing state */
+ private volatile boolean isClosing = false;
+
+ /** Holds the SSE subscription disposable */
+ private final AtomicReference sseSubscription = new AtomicReference<>();
+
+ /**
+ * Sink for managing the message endpoint URI provided by the server. Stores the most
+ * recent endpoint URI and makes it available for outbound message processing.
+ */
+ protected final Sinks.One messageEndpointSink = Sinks.one();
+
+ /**
+ * Customizer to modify requests before they are executed.
+ */
+ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
+
+ /**
+ * Creates a new transport instance with custom HTTP client builder, object mapper,
+ * and headers.
+ * @param httpClient the HTTP client to use
+ * @param requestBuilder the HTTP request builder to use
+ * @param baseUri the base URI of the MCP server
+ * @param sseEndpoint the SSE endpoint path
+ * @param jsonMapper the object mapper for JSON serialization/deserialization
+ * @param httpRequestCustomizer customizer for the requestBuilder before executing
+ * requests
+ * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
+ */
+ HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
+ String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
+ Assert.hasText(baseUri, "baseUri must not be empty");
+ Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
+ Assert.notNull(httpClient, "httpClient must not be null");
+ Assert.notNull(requestBuilder, "requestBuilder must not be null");
+ Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
+ this.baseUri = URI.create(baseUri);
+ this.sseEndpoint = sseEndpoint;
+ this.jsonMapper = jsonMapper;
+ this.httpClient = httpClient;
+ this.requestBuilder = requestBuilder;
+ this.httpRequestCustomizer = httpRequestCustomizer;
+ }
+
+ @Override
+ public List protocolVersions() {
+ return List.of(ProtocolVersions.MCP_2024_11_05);
+ }
+
+ /**
+ * Creates a new builder for {@link HttpClientSseClientTransport}.
+ * @param baseUri the base URI of the MCP server
+ * @return a new builder instance
+ */
+ public static Builder builder(String baseUri) {
+ return new Builder().baseUri(baseUri);
+ }
+
+ /**
+ * Builder for {@link HttpClientSseClientTransport}.
+ */
+ public static class Builder {
+
+ private String baseUri;
+
+ private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
+
+ private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1);
+
+ private McpJsonMapper jsonMapper;
+
+ private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
+
+ private McpAsyncHttpClientRequestCustomizer httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.NOOP;
+
+ private Duration connectTimeout = Duration.ofSeconds(10);
+
+ /**
+ * Creates a new builder instance.
+ */
+ Builder() {
+ // Default constructor
+ }
+
+ /**
+ * Creates a new builder with the specified base URI.
+ * @param baseUri the base URI of the MCP server
+ * @deprecated Use {@link HttpClientSseClientTransport#builder(String)} instead.
+ * This constructor is deprecated and will be removed or made {@code protected} or
+ * {@code private} in a future release.
+ */
+ @Deprecated(forRemoval = true)
+ public Builder(String baseUri) {
+ Assert.hasText(baseUri, "baseUri must not be empty");
+ this.baseUri = baseUri;
+ }
+
+ /**
+ * Sets the base URI.
+ * @param baseUri the base URI
+ * @return this builder
+ */
+ Builder baseUri(String baseUri) {
+ Assert.hasText(baseUri, "baseUri must not be empty");
+ this.baseUri = baseUri;
+ return this;
+ }
+
+ /**
+ * Sets the SSE endpoint path.
+ * @param sseEndpoint the SSE endpoint path
+ * @return this builder
+ */
+ public Builder sseEndpoint(String sseEndpoint) {
+ Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
+ this.sseEndpoint = sseEndpoint;
+ return this;
+ }
+
+ /**
+ * Sets the HTTP client builder.
+ * @param clientBuilder the HTTP client builder
+ * @return this builder
+ */
+ public Builder clientBuilder(HttpClient.Builder clientBuilder) {
+ Assert.notNull(clientBuilder, "clientBuilder must not be null");
+ this.clientBuilder = clientBuilder;
+ return this;
+ }
+
+ /**
+ * Customizes the HTTP client builder.
+ * @param clientCustomizer the consumer to customize the HTTP client builder
+ * @return this builder
+ */
+ public Builder customizeClient(final Consumer clientCustomizer) {
+ Assert.notNull(clientCustomizer, "clientCustomizer must not be null");
+ clientCustomizer.accept(clientBuilder);
+ return this;
+ }
+
+ /**
+ * Sets the HTTP request builder.
+ * @param requestBuilder the HTTP request builder
+ * @return this builder
+ */
+ public Builder requestBuilder(HttpRequest.Builder requestBuilder) {
+ Assert.notNull(requestBuilder, "requestBuilder must not be null");
+ this.requestBuilder = requestBuilder;
+ return this;
+ }
+
+ /**
+ * Customizes the HTTP client builder.
+ * @param requestCustomizer the consumer to customize the HTTP request builder
+ * @return this builder
+ */
+ public Builder customizeRequest(final Consumer requestCustomizer) {
+ Assert.notNull(requestCustomizer, "requestCustomizer must not be null");
+ requestCustomizer.accept(requestBuilder);
+ return this;
+ }
+
+ /**
+ * Sets the JSON mapper implementation to use for serialization/deserialization.
+ * @param jsonMapper the JSON mapper
+ * @return this builder
+ */
+ public Builder jsonMapper(McpJsonMapper jsonMapper) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
+ this.jsonMapper = jsonMapper;
+ return this;
+ }
+
+ /**
+ * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
+ * executing them.
+ *
+ * This overrides the customizer from
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}.
+ *
+ * Do NOT use a blocking {@link McpSyncHttpClientRequestCustomizer} in a
+ * non-blocking context. Use
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}
+ * instead.
+ * @param syncHttpRequestCustomizer the request customizer
+ * @return this builder
+ */
+ public Builder httpRequestCustomizer(McpSyncHttpClientRequestCustomizer syncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.fromSync(syncHttpRequestCustomizer);
+ return this;
+ }
+
+ /**
+ * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
+ * executing them.
+ *
+ * This overrides the customizer from
+ * {@link #httpRequestCustomizer(McpSyncHttpClientRequestCustomizer)}.
+ *
+ * Do NOT use a blocking implementation in a non-blocking context.
+ * @param asyncHttpRequestCustomizer the request customizer
+ * @return this builder
+ */
+ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer asyncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = asyncHttpRequestCustomizer;
+ return this;
+ }
+
+ /**
+ * Sets the connection timeout for the HTTP client.
+ * @param connectTimeout the connection timeout duration
+ * @return this builder
+ */
+ public Builder connectTimeout(Duration connectTimeout) {
+ Assert.notNull(connectTimeout, "connectTimeout must not be null");
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ /**
+ * Builds a new {@link HttpClientSseClientTransport} instance.
+ * @return a new transport instance
+ */
+ public HttpClientSseClientTransport build() {
+ HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
+ return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint,
+ jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer);
+ }
+
+ }
+
+ @Override
+ public Mono connect(Function, Mono> handler) {
+ var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
+
+ return Mono.deferContextual(ctx -> {
+ var builder = requestBuilder.copy()
+ .uri(uri)
+ .header("Accept", "text/event-stream")
+ .header("Cache-Control", "no-cache")
+ .header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
+ .GET();
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
+ }).flatMap(requestBuilder -> Mono.create(sink -> {
+ Disposable connection = Flux.create(sseSink -> this.httpClient
+ .sendAsync(requestBuilder.build(),
+ responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
+ .exceptionallyCompose(e -> {
+ sseSink.error(e);
+ return CompletableFuture.failedFuture(e);
+ }))
+ .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
+ .flatMap(responseEvent -> {
+ if (isClosing) {
+ return Mono.empty();
+ }
+
+ int statusCode = responseEvent.responseInfo().statusCode();
+
+ if (statusCode >= 200 && statusCode < 300) {
+ try {
+ if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
+ String messageEndpointUri = responseEvent.sseEvent().data();
+ if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
+ sink.success();
+ return Flux.empty(); // No further processing needed
+ }
+ else {
+ sink.error(new RuntimeException("Failed to handle SSE endpoint event"));
+ }
+ }
+ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
+ JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper,
+ responseEvent.sseEvent().data());
+ sink.success();
+ return Flux.just(message);
+ }
+ else {
+ logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent());
+ sink.success();
+ }
+ }
+ catch (IOException e) {
+ sink.error(new McpTransportException("Error processing SSE event", e));
+ }
+ }
+ return Flux.error(
+ new RuntimeException("Failed to send message: " + responseEvent));
+
+ })
+ .flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
+ .onErrorComplete(t -> {
+ if (!isClosing) {
+ logger.warn("SSE stream observed an error", t);
+ sink.error(t);
+ }
+ return true;
+ })
+ .doFinally(s -> {
+ Disposable ref = this.sseSubscription.getAndSet(null);
+ if (ref != null && !ref.isDisposed()) {
+ ref.dispose();
+ }
+ })
+ .contextWrite(sink.contextView())
+ .subscribe();
+
+ this.sseSubscription.set(connection);
+ }));
+ }
+
+ /**
+ * Sends a JSON-RPC message to the server.
+ *
+ *
+ * This method waits for the message endpoint to be discovered before sending the
+ * message. The message is serialized to JSON and sent as an HTTP POST request.
+ * @param message the JSON-RPC message to send
+ * @return a Mono that completes when the message is sent
+ * @throws McpError if the message endpoint is not available or the wait times out
+ */
+ @Override
+ public Mono sendMessage(JSONRPCMessage message) {
+
+ return this.messageEndpointSink.asMono().flatMap(messageEndpointUri -> {
+ if (isClosing) {
+ return Mono.empty();
+ }
+
+ return this.serializeMessage(message)
+ .flatMap(body -> sendHttpPost(messageEndpointUri, body).handle((response, sink) -> {
+ if (response.statusCode() != 200 && response.statusCode() != 201 && response.statusCode() != 202
+ && response.statusCode() != 206) {
+ sink.error(new RuntimeException("Sending message failed with a non-OK HTTP code: "
+ + response.statusCode() + " - " + response.body()));
+ }
+ else {
+ sink.next(response);
+ sink.complete();
+ }
+ }))
+ .doOnError(error -> {
+ if (!isClosing) {
+ logger.error("Error sending message: {}", error.getMessage());
+ }
+ });
+ }).then();
+
+ }
+
+ private Mono serializeMessage(final JSONRPCMessage message) {
+ return Mono.defer(() -> {
+ try {
+ return Mono.just(jsonMapper.writeValueAsString(message));
+ }
+ catch (IOException e) {
+ return Mono.error(new McpTransportException("Failed to serialize message", e));
+ }
+ });
+ }
+
+ private Mono> sendHttpPost(final String endpoint, final String body) {
+ final URI requestUri = Utils.resolveUri(baseUri, endpoint);
+ return Mono.deferContextual(ctx -> {
+ var builder = this.requestBuilder.copy()
+ .uri(requestUri)
+ .header(HttpHeaders.CONTENT_TYPE, "application/json")
+ .header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
+ .POST(HttpRequest.BodyPublishers.ofString(body));
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body, transportContext));
+ }).flatMap(customizedBuilder -> {
+ var request = customizedBuilder.build();
+ return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
+ });
+ }
+
+ /**
+ * Gracefully closes the transport connection.
+ *
+ *
+ * Sets the closing flag and disposes of the SSE subscription. This prevents new
+ * messages from being sent and allows ongoing operations to complete.
+ * @return a Mono that completes when the closing process is initiated
+ */
+ @Override
+ public Mono closeGracefully() {
+ return Mono.fromRunnable(() -> {
+ isClosing = true;
+ Disposable subscription = sseSubscription.get();
+ if (subscription != null && !subscription.isDisposed()) {
+ subscription.dispose();
+ }
+ });
+ }
+
+ /**
+ * Unmarshal data to the specified type using the configured object mapper.
+ * @param data the data to unmarshal
+ * @param typeRef the type reference for the target type
+ * @param the target type
+ * @return the unmarshalled object
+ */
+ @Override
+ public T unmarshalFrom(Object data, TypeRef typeRef) {
+ return this.jsonMapper.convertValue(data, typeRef);
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
new file mode 100644
index 000000000..0a8dff363
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
@@ -0,0 +1,832 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandler;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import io.modelcontextprotocol.client.McpAsyncClient;
+import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
+import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
+import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
+import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
+import io.modelcontextprotocol.spec.HttpHeaders;
+import io.modelcontextprotocol.spec.McpClientTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpTransportException;
+import io.modelcontextprotocol.spec.McpTransportSession;
+import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
+import io.modelcontextprotocol.spec.McpTransportStream;
+import io.modelcontextprotocol.spec.ProtocolVersions;
+import io.modelcontextprotocol.util.Assert;
+import io.modelcontextprotocol.util.Utils;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+/**
+ * An implementation of the Streamable HTTP protocol as defined by the
+ * 2025-03-26 version of the MCP specification.
+ *
+ *
+ * The transport is capable of resumability and reconnects. It reacts to transport-level
+ * session invalidation and will propagate {@link McpTransportSessionNotFoundException
+ * appropriate exceptions} to the higher level abstraction layer when needed in order to
+ * allow proper state management. The implementation handles servers that are stateful and
+ * provide session meta information, but can also communicate with stateless servers that
+ * do not provide a session identifier and do not support SSE streams.
+ *
+ *
+ * This implementation does not handle backwards compatibility with the "HTTP
+ * with SSE" transport. In order to communicate over the phased-out
+ * 2024-11-05 protocol, use {@link HttpClientSseClientTransport} or
+ * {@code WebFluxSseClientTransport}.
+ *
+ *
+ * @author Christian Tzolov
+ * @see Streamable
+ * HTTP transport specification
+ */
+public class HttpClientStreamableHttpTransport implements McpClientTransport {
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpClientStreamableHttpTransport.class);
+
+ private static final String DEFAULT_ENDPOINT = "/mcp";
+
+ /**
+ * HTTP client for sending messages to the server. Uses HTTP POST over the message
+ * endpoint
+ */
+ private final HttpClient httpClient;
+
+ /** HTTP request builder for building requests to send messages to the server */
+ private final HttpRequest.Builder requestBuilder;
+
+ /**
+ * Event type for JSON-RPC messages received through the SSE connection. The server
+ * sends messages with this event type to transmit JSON-RPC protocol data.
+ */
+ private static final String MESSAGE_EVENT_TYPE = "message";
+
+ private static final String APPLICATION_JSON = "application/json";
+
+ private static final String TEXT_EVENT_STREAM = "text/event-stream";
+
+ public static int NOT_FOUND = 404;
+
+ public static int METHOD_NOT_ALLOWED = 405;
+
+ public static int BAD_REQUEST = 400;
+
+ private final McpJsonMapper jsonMapper;
+
+ private final URI baseUri;
+
+ private final String endpoint;
+
+ private final boolean openConnectionOnStartup;
+
+ private final boolean resumableStreams;
+
+ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
+
+ private final AtomicReference> activeSession = new AtomicReference<>();
+
+ private final AtomicReference, Mono>> handler = new AtomicReference<>();
+
+ private final AtomicReference> exceptionHandler = new AtomicReference<>();
+
+ private final List supportedProtocolVersions;
+
+ private final String latestSupportedProtocolVersion;
+
+ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
+ HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
+ boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
+ List supportedProtocolVersions) {
+ this.jsonMapper = jsonMapper;
+ this.httpClient = httpClient;
+ this.requestBuilder = requestBuilder;
+ this.baseUri = URI.create(baseUri);
+ this.endpoint = endpoint;
+ this.resumableStreams = resumableStreams;
+ this.openConnectionOnStartup = openConnectionOnStartup;
+ this.activeSession.set(createTransportSession());
+ this.httpRequestCustomizer = httpRequestCustomizer;
+ this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
+ this.latestSupportedProtocolVersion = this.supportedProtocolVersions.stream()
+ .sorted(Comparator.reverseOrder())
+ .findFirst()
+ .get();
+ }
+
+ @Override
+ public List protocolVersions() {
+ return supportedProtocolVersions;
+ }
+
+ public static Builder builder(String baseUri) {
+ return new Builder(baseUri);
+ }
+
+ @Override
+ public Mono connect(Function, Mono> handler) {
+ return Mono.deferContextual(ctx -> {
+ this.handler.set(handler);
+ if (this.openConnectionOnStartup) {
+ logger.debug("Eagerly opening connection on startup");
+ return this.reconnect(null).onErrorComplete(t -> {
+ logger.warn("Eager connect failed ", t);
+ return true;
+ }).then();
+ }
+ return Mono.empty();
+ });
+ }
+
+ private McpTransportSession createTransportSession() {
+ Function> onClose = sessionId -> sessionId == null ? Mono.empty()
+ : createDelete(sessionId);
+ return new DefaultMcpTransportSession(onClose);
+ }
+
+ private McpTransportSession createClosedSession(McpTransportSession existingSession) {
+ var existingSessionId = Optional.ofNullable(existingSession)
+ .filter(session -> !(session instanceof ClosedMcpTransportSession))
+ .flatMap(McpTransportSession::sessionId)
+ .orElse(null);
+ return new ClosedMcpTransportSession<>(existingSessionId);
+ }
+
+ private Publisher createDelete(String sessionId) {
+
+ var uri = Utils.resolveUri(this.baseUri, this.endpoint);
+ return Mono.deferContextual(ctx -> {
+ var builder = this.requestBuilder.copy()
+ .uri(uri)
+ .header("Cache-Control", "no-cache")
+ .header(HttpHeaders.MCP_SESSION_ID, sessionId)
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
+ .DELETE();
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext));
+ }).flatMap(requestBuilder -> {
+ var request = requestBuilder.build();
+ return Mono.fromFuture(() -> this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
+ }).then();
+ }
+
+ @Override
+ public void setExceptionHandler(Consumer handler) {
+ logger.debug("Exception handler registered");
+ this.exceptionHandler.set(handler);
+ }
+
+ private void handleException(Throwable t) {
+ logger.debug("Handling exception for session {}", sessionIdOrPlaceholder(this.activeSession.get()), t);
+ if (t instanceof McpTransportSessionNotFoundException) {
+ McpTransportSession> invalidSession = this.activeSession.getAndSet(createTransportSession());
+ logger.warn("Server does not recognize session {}. Invalidating.", invalidSession.sessionId());
+ invalidSession.close();
+ }
+ Consumer handler = this.exceptionHandler.get();
+ if (handler != null) {
+ handler.accept(t);
+ }
+ }
+
+ @Override
+ public Mono closeGracefully() {
+ return Mono.defer(() -> {
+ logger.debug("Graceful close triggered");
+ McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
+ if (currentSession != null) {
+ return Mono.from(currentSession.closeGracefully());
+ }
+ return Mono.empty();
+ });
+ }
+
+ private Mono reconnect(McpTransportStream stream) {
+
+ return Mono.deferContextual(ctx -> {
+
+ if (stream != null) {
+ logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
+ }
+ else {
+ logger.debug("Reconnecting with no prior stream");
+ }
+
+ final AtomicReference disposableRef = new AtomicReference<>();
+ final McpTransportSession transportSession = this.activeSession.get();
+ var uri = Utils.resolveUri(this.baseUri, this.endpoint);
+
+ Disposable connection = Mono.deferContextual(connectionCtx -> {
+ HttpRequest.Builder requestBuilder = this.requestBuilder.copy();
+
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID,
+ transportSession.sessionId().get());
+ }
+
+ if (stream != null && stream.lastId().isPresent()) {
+ requestBuilder = requestBuilder.header(HttpHeaders.LAST_EVENT_ID, stream.lastId().get());
+ }
+
+ var builder = requestBuilder.uri(uri)
+ .header(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM)
+ .header("Cache-Control", "no-cache")
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
+ .GET();
+ var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
+ })
+ .flatMapMany(
+ requestBuilder -> Flux.create(
+ sseSink -> this.httpClient
+ .sendAsync(requestBuilder.build(),
+ responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo,
+ sseSink))
+ .whenComplete((response, throwable) -> {
+ if (throwable != null) {
+ sseSink.error(throwable);
+ }
+ else {
+ logger.debug("SSE connection established successfully");
+ }
+ }))
+ .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
+ .flatMap(responseEvent -> {
+ int statusCode = responseEvent.responseInfo().statusCode();
+
+ if (statusCode >= 200 && statusCode < 300) {
+
+ if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
+ try {
+ // We don't support batching ATM and probably
+ // won't since the next version considers
+ // removing it.
+ McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(
+ this.jsonMapper, responseEvent.sseEvent().data());
+
+ Tuple2, Iterable> idWithMessages = Tuples
+ .of(Optional.ofNullable(responseEvent.sseEvent().id()),
+ List.of(message));
+
+ McpTransportStream sessionStream = stream != null ? stream
+ : new DefaultMcpTransportStream<>(this.resumableStreams,
+ this::reconnect);
+ logger.debug("Connected stream {}", sessionStream.streamId());
+
+ return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
+
+ }
+ catch (IOException ioException) {
+ return Flux.error(new McpTransportException(
+ "Error parsing JSON-RPC message: " + responseEvent, ioException));
+ }
+ }
+ else {
+ logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
+ return Flux.empty();
+ }
+ }
+ else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
+ logger
+ .debug("The server does not support SSE streams, using request-response mode.");
+ return Flux.empty();
+ }
+ else if (statusCode == NOT_FOUND) {
+
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id
+ // and the response is 404, we consider it a
+ // session not found error.
+ logger.debug("Session not found for session ID: {}",
+ transportSession.sessionId().get());
+ String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionIdRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(
+ new McpTransportException("Server Not Found. Status code:" + statusCode
+ + ", response-event:" + responseEvent));
+ }
+ else if (statusCode == BAD_REQUEST) {
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id
+ // and thre response is 404, we consider it a
+ // session not found error.
+ String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionIdRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(
+ new McpTransportException("Bad Request. Status code:" + statusCode
+ + ", response-event:" + responseEvent));
+
+ }
+
+ return Flux.error(new McpTransportException(
+ "Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
+ }).flatMap(
+ jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
+ .onErrorMap(CompletionException.class, t -> t.getCause())
+ .onErrorComplete(t -> {
+ this.handleException(t);
+ return true;
+ })
+ .doFinally(s -> {
+ Disposable ref = disposableRef.getAndSet(null);
+ if (ref != null) {
+ transportSession.removeConnection(ref);
+ }
+ }))
+ .contextWrite(ctx)
+ .subscribe();
+
+ disposableRef.set(connection);
+ transportSession.addConnection(connection);
+ return Mono.just(connection);
+ });
+
+ }
+
+ private BodyHandler toSendMessageBodySubscriber(FluxSink sink) {
+
+ BodyHandler responseBodyHandler = responseInfo -> {
+
+ String contentType = responseInfo.headers().firstValue(HttpHeaders.CONTENT_TYPE).orElse("").toLowerCase();
+
+ if (contentType.contains(TEXT_EVENT_STREAM)) {
+ // For SSE streams, use line subscriber that returns Void
+ logger.debug("Received SSE stream response, using line subscriber");
+ return ResponseSubscribers.sseToBodySubscriber(responseInfo, sink);
+ }
+ else if (contentType.contains(APPLICATION_JSON)) {
+ // For JSON responses and others, use string subscriber
+ logger.debug("Received response, using string subscriber");
+ return ResponseSubscribers.aggregateBodySubscriber(responseInfo, sink);
+ }
+
+ logger.debug("Received Bodyless response, using discarding subscriber");
+ return ResponseSubscribers.bodilessBodySubscriber(responseInfo, sink);
+ };
+
+ return responseBodyHandler;
+
+ }
+
+ public String toString(McpSchema.JSONRPCMessage message) {
+ try {
+ return this.jsonMapper.writeValueAsString(message);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to serialize JSON-RPC message", e);
+ }
+ }
+
+ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) {
+ return Mono.create(deliveredSink -> {
+ logger.debug("Sending message {}", sentMessage);
+
+ final AtomicReference disposableRef = new AtomicReference<>();
+ final McpTransportSession transportSession = this.activeSession.get();
+
+ var uri = Utils.resolveUri(this.baseUri, this.endpoint);
+ String jsonBody = this.toString(sentMessage);
+
+ Disposable connection = Mono.deferContextual(ctx -> {
+ HttpRequest.Builder requestBuilder = this.requestBuilder.copy();
+
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID,
+ transportSession.sessionId().get());
+ }
+
+ var builder = requestBuilder.uri(uri)
+ .header(HttpHeaders.ACCEPT, APPLICATION_JSON + ", " + TEXT_EVENT_STREAM)
+ .header(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON)
+ .header(HttpHeaders.CACHE_CONTROL, "no-cache")
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
+ .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono
+ .from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext));
+ }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> {
+
+ // Create the async request with proper body subscriber selection
+ Mono.fromFuture(this.httpClient
+ .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink))
+ .whenComplete((response, throwable) -> {
+ if (throwable != null) {
+ responseEventSink.error(throwable);
+ }
+ else {
+ logger.debug("SSE connection established successfully");
+ }
+ })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
+
+ })).flatMap(responseEvent -> {
+ if (transportSession.markInitialized(
+ responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
+ // Once we have a session, we try to open an async stream for
+ // the server to send notifications and requests out-of-band.
+
+ reconnect(null).contextWrite(deliveredSink.contextView()).subscribe();
+ }
+
+ String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
+
+ int statusCode = responseEvent.responseInfo().statusCode();
+
+ if (statusCode >= 200 && statusCode < 300) {
+
+ String contentType = responseEvent.responseInfo()
+ .headers()
+ .firstValue(HttpHeaders.CONTENT_TYPE)
+ .orElse("")
+ .toLowerCase();
+
+ String contentLength = responseEvent.responseInfo()
+ .headers()
+ .firstValue(HttpHeaders.CONTENT_LENGTH)
+ .orElse(null);
+
+ // For empty content or HTTP code 202 (ACCEPTED), assume success
+ if (contentType.isBlank() || "0".equals(contentLength) || statusCode == 202) {
+ // if (contentType.isBlank() || "0".equals(contentLength)) {
+ logger.debug("No body returned for POST in session {}", sessionRepresentation);
+ // No content type means no response body, so we can just
+ // return an empty stream
+ deliveredSink.success();
+ return Flux.empty();
+ }
+ else if (contentType.contains(TEXT_EVENT_STREAM)) {
+ return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent())
+ .flatMap(sseEvent -> {
+ try {
+ // We don't support batching ATM and probably
+ // won't
+ // since the
+ // next version considers removing it.
+ McpSchema.JSONRPCMessage message = McpSchema
+ .deserializeJsonRpcMessage(this.jsonMapper, sseEvent.data());
+
+ Tuple2, Iterable> idWithMessages = Tuples
+ .of(Optional.ofNullable(sseEvent.id()), List.of(message));
+
+ McpTransportStream sessionStream = new DefaultMcpTransportStream<>(
+ this.resumableStreams, this::reconnect);
+
+ logger.debug("Connected stream {}", sessionStream.streamId());
+
+ deliveredSink.success();
+
+ return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
+ }
+ catch (IOException ioException) {
+ return Flux.error(new McpTransportException(
+ "Error parsing JSON-RPC message: " + responseEvent, ioException));
+ }
+ });
+ }
+ else if (contentType.contains(APPLICATION_JSON)) {
+ deliveredSink.success();
+ String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data();
+ if (sentMessage instanceof McpSchema.JSONRPCNotification) {
+ logger.warn("Notification: {} received non-compliant response: {}", sentMessage,
+ Utils.hasText(data) ? data : "[empty]");
+ return Mono.empty();
+ }
+
+ try {
+ return Mono.just(McpSchema.deserializeJsonRpcMessage(jsonMapper, data));
+ }
+ catch (IOException e) {
+ return Mono.error(new McpTransportException(
+ "Error deserializing JSON-RPC message: " + responseEvent, e));
+ }
+ }
+ logger.warn("Unknown media type {} returned for POST in session {}", contentType,
+ sessionRepresentation);
+
+ return Flux.error(
+ new RuntimeException("Unknown media type returned: " + contentType));
+ }
+ else if (statusCode == NOT_FOUND) {
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id and the
+ // response is 404, we consider it a session not found error.
+ logger.debug("Session not found for session ID: {}", transportSession.sessionId().get());
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(new McpTransportException(
+ "Server Not Found. Status code:" + statusCode + ", response-event:" + responseEvent));
+ }
+ else if (statusCode == BAD_REQUEST) {
+ // Some implementations can return 400 when presented with a
+ // session id that it doesn't know about, so we will
+ // invalidate the session
+ // https://github.com/modelcontextprotocol/typescript-sdk/issues/389
+
+ if (transportSession != null && transportSession.sessionId().isPresent()) {
+ // only if the request was sent with a session id and the
+ // response is 404, we consider it a session not found error.
+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
+ "Session not found for session ID: " + sessionRepresentation);
+ return Flux.error(exception);
+ }
+ return Flux.error(new McpTransportException(
+ "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent));
+ }
+
+ return Flux.error(
+ new RuntimeException("Failed to send message: " + responseEvent));
+ })
+ .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
+ .onErrorMap(CompletionException.class, t -> t.getCause())
+ .onErrorComplete(t -> {
+ // handle the error first
+ this.handleException(t);
+ // inform the caller of sendMessage
+ deliveredSink.error(t);
+ return true;
+ })
+ .doFinally(s -> {
+ logger.debug("SendMessage finally: {}", s);
+ Disposable ref = disposableRef.getAndSet(null);
+ if (ref != null) {
+ transportSession.removeConnection(ref);
+ }
+ })
+ .contextWrite(deliveredSink.contextView())
+ .subscribe();
+
+ disposableRef.set(connection);
+ transportSession.addConnection(connection);
+ });
+ }
+
+ private static String sessionIdOrPlaceholder(McpTransportSession> transportSession) {
+ return transportSession.sessionId().orElse("[missing_session_id]");
+ }
+
+ @Override
+ public T unmarshalFrom(Object data, TypeRef typeRef) {
+ return this.jsonMapper.convertValue(data, typeRef);
+ }
+
+ /**
+ * Builder for {@link HttpClientStreamableHttpTransport}.
+ */
+ public static class Builder {
+
+ private final String baseUri;
+
+ private McpJsonMapper jsonMapper;
+
+ private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1);
+
+ private String endpoint = DEFAULT_ENDPOINT;
+
+ private boolean resumableStreams = true;
+
+ private boolean openConnectionOnStartup = false;
+
+ private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
+
+ private McpAsyncHttpClientRequestCustomizer httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.NOOP;
+
+ private Duration connectTimeout = Duration.ofSeconds(10);
+
+ private List supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
+ ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18);
+
+ /**
+ * Creates a new builder with the specified base URI.
+ * @param baseUri the base URI of the MCP server
+ */
+ private Builder(String baseUri) {
+ Assert.hasText(baseUri, "baseUri must not be empty");
+ this.baseUri = baseUri;
+ }
+
+ /**
+ * Sets the HTTP client builder.
+ * @param clientBuilder the HTTP client builder
+ * @return this builder
+ */
+ public Builder clientBuilder(HttpClient.Builder clientBuilder) {
+ Assert.notNull(clientBuilder, "clientBuilder must not be null");
+ this.clientBuilder = clientBuilder;
+ return this;
+ }
+
+ /**
+ * Customizes the HTTP client builder.
+ * @param clientCustomizer the consumer to customize the HTTP client builder
+ * @return this builder
+ */
+ public Builder customizeClient(final Consumer clientCustomizer) {
+ Assert.notNull(clientCustomizer, "clientCustomizer must not be null");
+ clientCustomizer.accept(clientBuilder);
+ return this;
+ }
+
+ /**
+ * Sets the HTTP request builder.
+ * @param requestBuilder the HTTP request builder
+ * @return this builder
+ */
+ public Builder requestBuilder(HttpRequest.Builder requestBuilder) {
+ Assert.notNull(requestBuilder, "requestBuilder must not be null");
+ this.requestBuilder = requestBuilder;
+ return this;
+ }
+
+ /**
+ * Customizes the HTTP client builder.
+ * @param requestCustomizer the consumer to customize the HTTP request builder
+ * @return this builder
+ */
+ public Builder customizeRequest(final Consumer requestCustomizer) {
+ Assert.notNull(requestCustomizer, "requestCustomizer must not be null");
+ requestCustomizer.accept(requestBuilder);
+ return this;
+ }
+
+ /**
+ * Configure a custom {@link McpJsonMapper} implementation to use.
+ * @param jsonMapper instance to use
+ * @return the builder instance
+ */
+ public Builder jsonMapper(McpJsonMapper jsonMapper) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
+ this.jsonMapper = jsonMapper;
+ return this;
+ }
+
+ /**
+ * Configure the endpoint to make HTTP requests against.
+ * @param endpoint endpoint to use
+ * @return the builder instance
+ */
+ public Builder endpoint(String endpoint) {
+ Assert.hasText(endpoint, "endpoint must be a non-empty String");
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ /**
+ * Configure whether to use the stream resumability feature by keeping track of
+ * SSE event ids.
+ * @param resumableStreams if {@code true} event ids will be tracked and upon
+ * disconnection, the last seen id will be used upon reconnection as a header to
+ * resume consuming messages.
+ * @return the builder instance
+ */
+ public Builder resumableStreams(boolean resumableStreams) {
+ this.resumableStreams = resumableStreams;
+ return this;
+ }
+
+ /**
+ * Configure whether the client should open an SSE connection upon startup. Not
+ * all servers support this (although it is in theory possible with the current
+ * specification), so use with caution. By default, this value is {@code false}.
+ * @param openConnectionOnStartup if {@code true} the {@link #connect(Function)}
+ * method call will try to open an SSE connection before sending any JSON-RPC
+ * request
+ * @return the builder instance
+ */
+ public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
+ this.openConnectionOnStartup = openConnectionOnStartup;
+ return this;
+ }
+
+ /**
+ * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
+ * executing them.
+ *
+ * This overrides the customizer from
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}.
+ *
+ * Do NOT use a blocking {@link McpSyncHttpClientRequestCustomizer} in a
+ * non-blocking context. Use
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}
+ * instead.
+ * @param syncHttpRequestCustomizer the request customizer
+ * @return this builder
+ */
+ public Builder httpRequestCustomizer(McpSyncHttpClientRequestCustomizer syncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.fromSync(syncHttpRequestCustomizer);
+ return this;
+ }
+
+ /**
+ * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
+ * executing them.
+ *
+ * This overrides the customizer from
+ * {@link #httpRequestCustomizer(McpSyncHttpClientRequestCustomizer)}.
+ *
+ * Do NOT use a blocking implementation in a non-blocking context.
+ * @param asyncHttpRequestCustomizer the request customizer
+ * @return this builder
+ */
+ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer asyncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = asyncHttpRequestCustomizer;
+ return this;
+ }
+
+ /**
+ * Sets the connection timeout for the HTTP client.
+ * @param connectTimeout the connection timeout duration
+ * @return this builder
+ */
+ public Builder connectTimeout(Duration connectTimeout) {
+ Assert.notNull(connectTimeout, "connectTimeout must not be null");
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ /**
+ * Sets the list of supported protocol versions used in version negotiation. By
+ * default, the client will send the latest of those versions in the
+ * {@code MCP-Protocol-Version} header.
+ *
+ * Setting this value only updates the values used in version negotiation, and
+ * does NOT impact the actual capabilities of the transport. It should only be
+ * used for compatibility with servers having strict requirements around the
+ * {@code MCP-Protocol-Version} header.
+ * @param supportedProtocolVersions protocol versions supported by this transport
+ * @return this builder
+ * @see version
+ * negotiation specification
+ * @see Protocol
+ * Version Header
+ */
+ public Builder supportedProtocolVersions(List supportedProtocolVersions) {
+ Assert.notEmpty(supportedProtocolVersions, "supportedProtocolVersions must not be empty");
+ this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
+ return this;
+ }
+
+ /**
+ * Construct a fresh instance of {@link HttpClientStreamableHttpTransport} using
+ * the current builder configuration.
+ * @return a new instance of {@link HttpClientStreamableHttpTransport}
+ */
+ public HttpClientStreamableHttpTransport build() {
+ HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
+ return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper,
+ httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
+ httpRequestCustomizer, supportedProtocolVersions);
+ }
+
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java
new file mode 100644
index 000000000..29dc23c35
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java
@@ -0,0 +1,327 @@
+/*
+* Copyright 2024 - 2024 the original author or authors.
+*/
+
+package io.modelcontextprotocol.client.transport;
+
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodySubscriber;
+import java.net.http.HttpResponse.ResponseInfo;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import org.reactivestreams.FlowAdapters;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.modelcontextprotocol.spec.McpTransportException;
+import reactor.core.publisher.BaseSubscriber;
+import reactor.core.publisher.FluxSink;
+
+/**
+ * Utility class providing various {@link BodySubscriber} implementations for handling
+ * different types of HTTP response bodies in the context of Model Context Protocol (MCP)
+ * clients.
+ *
+ *
+ * Defines subscribers for processing Server-Sent Events (SSE), aggregate responses, and
+ * bodiless responses.
+ *
+ * @author Christian Tzolov
+ * @author Dariusz Jędrzejczyk
+ */
+class ResponseSubscribers {
+
+ private static final Logger logger = LoggerFactory.getLogger(ResponseSubscribers.class);
+
+ record SseEvent(String id, String event, String data) {
+ }
+
+ sealed interface ResponseEvent permits SseResponseEvent, AggregateResponseEvent, DummyEvent {
+
+ ResponseInfo responseInfo();
+
+ }
+
+ record DummyEvent(ResponseInfo responseInfo) implements ResponseEvent {
+
+ }
+
+ record SseResponseEvent(ResponseInfo responseInfo, SseEvent sseEvent) implements ResponseEvent {
+ }
+
+ record AggregateResponseEvent(ResponseInfo responseInfo, String data) implements ResponseEvent {
+ }
+
+ static BodySubscriber sseToBodySubscriber(ResponseInfo responseInfo, FluxSink sink) {
+ return HttpResponse.BodySubscribers
+ .fromLineSubscriber(FlowAdapters.toFlowSubscriber(new SseLineSubscriber(responseInfo, sink)));
+ }
+
+ static BodySubscriber aggregateBodySubscriber(ResponseInfo responseInfo, FluxSink sink) {
+ return HttpResponse.BodySubscribers
+ .fromLineSubscriber(FlowAdapters.toFlowSubscriber(new AggregateSubscriber(responseInfo, sink)));
+ }
+
+ static BodySubscriber bodilessBodySubscriber(ResponseInfo responseInfo, FluxSink sink) {
+ return HttpResponse.BodySubscribers
+ .fromLineSubscriber(FlowAdapters.toFlowSubscriber(new BodilessResponseLineSubscriber(responseInfo, sink)));
+ }
+
+ static class SseLineSubscriber extends BaseSubscriber {
+
+ /**
+ * Pattern to extract data content from SSE "data:" lines.
+ */
+ private static final Pattern EVENT_DATA_PATTERN = Pattern.compile("^data:(.+)$", Pattern.MULTILINE);
+
+ /**
+ * Pattern to extract event ID from SSE "id:" lines.
+ */
+ private static final Pattern EVENT_ID_PATTERN = Pattern.compile("^id:(.+)$", Pattern.MULTILINE);
+
+ /**
+ * Pattern to extract event type from SSE "event:" lines.
+ */
+ private static final Pattern EVENT_TYPE_PATTERN = Pattern.compile("^event:(.+)$", Pattern.MULTILINE);
+
+ /**
+ * The sink for emitting parsed response events.
+ */
+ private final FluxSink sink;
+
+ /**
+ * StringBuilder for accumulating multi-line event data.
+ */
+ private final StringBuilder eventBuilder;
+
+ /**
+ * Current event's ID, if specified.
+ */
+ private final AtomicReference currentEventId;
+
+ /**
+ * Current event's type, if specified.
+ */
+ private final AtomicReference currentEventType;
+
+ /**
+ * The response information from the HTTP response. Send with each event to
+ * provide context.
+ */
+ private ResponseInfo responseInfo;
+
+ /**
+ * Creates a new LineSubscriber that will emit parsed SSE events to the provided
+ * sink.
+ * @param sink the {@link FluxSink} to emit parsed {@link ResponseEvent} objects
+ * to
+ */
+ public SseLineSubscriber(ResponseInfo responseInfo, FluxSink sink) {
+ this.sink = sink;
+ this.eventBuilder = new StringBuilder();
+ this.currentEventId = new AtomicReference<>();
+ this.currentEventType = new AtomicReference<>();
+ this.responseInfo = responseInfo;
+ }
+
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+
+ sink.onRequest(n -> {
+ subscription.request(n);
+ });
+
+ // Register disposal callback to cancel subscription when Flux is disposed
+ sink.onDispose(() -> {
+ subscription.cancel();
+ });
+ }
+
+ @Override
+ protected void hookOnNext(String line) {
+ if (line.isEmpty()) {
+ // Empty line means end of event
+ if (this.eventBuilder.length() > 0) {
+ String eventData = this.eventBuilder.toString();
+ SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim());
+
+ this.sink.next(new SseResponseEvent(responseInfo, sseEvent));
+ this.eventBuilder.setLength(0);
+ }
+ }
+ else {
+ if (line.startsWith("data:")) {
+ var matcher = EVENT_DATA_PATTERN.matcher(line);
+ if (matcher.find()) {
+ this.eventBuilder.append(matcher.group(1).trim()).append("\n");
+ }
+ upstream().request(1);
+ }
+ else if (line.startsWith("id:")) {
+ var matcher = EVENT_ID_PATTERN.matcher(line);
+ if (matcher.find()) {
+ this.currentEventId.set(matcher.group(1).trim());
+ }
+ upstream().request(1);
+ }
+ else if (line.startsWith("event:")) {
+ var matcher = EVENT_TYPE_PATTERN.matcher(line);
+ if (matcher.find()) {
+ this.currentEventType.set(matcher.group(1).trim());
+ }
+ upstream().request(1);
+ }
+ else if (line.startsWith(":")) {
+ // Ignore comment lines starting with ":"
+ // This is a no-op, just to skip comments
+ logger.debug("Ignoring comment line: {}", line);
+ upstream().request(1);
+ }
+ else {
+ // If the response is not successful, emit an error
+ this.sink.error(new McpTransportException(
+ "Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line));
+
+ }
+ }
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ if (this.eventBuilder.length() > 0) {
+ String eventData = this.eventBuilder.toString();
+ SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim());
+ this.sink.next(new SseResponseEvent(responseInfo, sseEvent));
+ }
+ this.sink.complete();
+ }
+
+ @Override
+ protected void hookOnError(Throwable throwable) {
+ this.sink.error(throwable);
+ }
+
+ }
+
+ static class AggregateSubscriber extends BaseSubscriber {
+
+ /**
+ * The sink for emitting parsed response events.
+ */
+ private final FluxSink