diff --git a/README.md b/README.md
index 436104c63..7bda15006 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,9 @@
# MCP Java SDK
+[](https://opensource.org/license/MIT)
[](https://github.com/modelcontextprotocol/java-sdk/actions/workflows/publish-snapshot.yml)
+[](https://central.sonatype.com/artifact/io.modelcontextprotocol.sdk/mcp)
+[](https://www.oracle.com/java/technologies/javase/jdk17-archive-downloads.html)
+
A set of projects that provide Java SDK integration for the [Model Context Protocol](https://modelcontextprotocol.org/docs/concepts/architecture).
This SDK enables Java applications to interact with AI models and tools through a standardized interface, supporting both synchronous and asynchronous communication patterns.
@@ -43,6 +47,7 @@ Please follow the [Contributing Guidelines](CONTRIBUTING.md).
- Christian Tzolov
- Dariusz Jędrzejczyk
+- Daniel Garnier-Moiroux
## Links
@@ -50,6 +55,133 @@ Please follow the [Contributing Guidelines](CONTRIBUTING.md).
- [Issue Tracker](https://github.com/modelcontextprotocol/java-sdk/issues)
- [CI/CD](https://github.com/modelcontextprotocol/java-sdk/actions)
+## Architecture and Design Decisions
+
+### Introduction
+
+Building a general-purpose MCP Java SDK requires making technology decisions in areas where the JDK provides limited or no support. The Java ecosystem is powerful but fragmented: multiple valid approaches exist, each with strong communities.
+Our goal is not to prescribe "the one true way," but to provide a reference implementation of the MCP specification that is:
+
+* **Pragmatic** – makes developers productive quickly
+* **Interoperable** – aligns with widely used libraries and practices
+* **Pluggable** – allows alternatives where projects prefer different stacks
+* **Grounded in team familiarity** – we chose technologies the team can be productive with today, while remaining open to community contributions that broaden the SDK
+
+### Key Choices and Considerations
+
+The SDK had to make decisions in the following areas:
+
+1. **JSON serialization** – mapping between JSON and Java types
+
+2. **Programming model** – supporting asynchronous processing, cancellation, and streaming while staying simple for blocking use cases
+
+3. **Observability** – logging and enabling integration with metrics/tracing
+
+4. **Remote clients and servers** – supporting both consuming MCP servers (client transport) and exposing MCP endpoints (server transport with authorization)
+
+The following sections explain what we chose, why it made sense, and how the choices align with the SDK's goals.
+
+### 1. JSON Serialization
+
+* **SDK Choice**: Jackson for JSON serialization and deserialization, behind an SDK abstraction (`mcp-json`)
+
+* **Why**: Jackson is widely adopted across the Java ecosystem, provides strong performance and a mature annotation model, and is familiar to the SDK team and many potential contributors.
+
+* **How we expose it**: Public APIs use a zero-dependency abstraction (`mcp-json`). Jackson is shipped as the default implementation (`mcp-jackson2`), but alternatives can be plugged in.
+
+* **How it fits the SDK**: This offers a pragmatic default while keeping flexibility for projects that prefer different JSON libraries.
+
+### 2. Programming Model
+
+* **SDK Choice**: Reactive Streams for public APIs, with Project Reactor as the internal implementation and a synchronous facade for blocking use cases
+
+* **Why**: MCP builds on JSON-RPC's asynchronous nature and defines a bidirectional protocol on top of it, enabling asynchronous and streaming interactions. MCP explicitly supports:
+
+ * Multiple in-flight requests and responses
+ * Notifications that do not expect a reply
+ * STDIO transports for inter-process communication using pipes
+ * Streaming transports such as Server-Sent Events and Streamable HTTP
+
+ These requirements call for a programming model more powerful than single-result futures like `CompletableFuture`.
+
+ * **Reactive Streams: the Community Standard**
+
+ Reactive Streams is a small Java specification that standardizes asynchronous stream processing with backpressure. It defines four minimal interfaces (Publisher, Subscriber, Subscription, and Processor). These interfaces are widely recognized as the standard contract for async, non-blocking pipelines in Java.
+
+ * **Reactive Streams Implementation**
+
+ The SDK uses Project Reactor as its implementation of the Reactive Streams specification. Reactor is mature, widely adopted, provides rich operators, and integrates well with observability through context propagation. Team familiarity also allowed us to deliver a solid foundation quickly.
+ We plan to convert the public API to only expose Reactive Streams interfaces. By defining the public API in terms of Reactive Streams interfaces and using Reactor internally, the SDK stays standards-based while benefiting from a practical, production-ready implementation.
+
+ * **Synchronous Facade in the SDK**
+
+ Not all MCP use cases require streaming pipelines. Many scenarios are as simple as "send a request and block until I get the result."
+ To support this, the SDK provides a synchronous facade layered on top of the reactive core. Developers can stay in a blocking model when it's enough, while still having access to asynchronous streaming when needed.
+
+* **How it fits the SDK**: This design balances scalability, approachability, and future evolution such as Virtual Threads and Structured Concurrency in upcoming JDKs.
+
+### 3. Observability
+
+* **SDK Choice**: SLF4J for logging; Reactor Context for observability propagation
+
+* **Why**: SLF4J is the de facto logging facade in Java, with broad compatibility. Reactor Context enables propagation of observability data such as correlation IDs and tracing state across async boundaries. This ensures interoperability with modern observability frameworks.
+
+* **How we expose it**: Public APIs log through SLF4J only, with no backend included. Observability metadata flows through Reactor pipelines. The SDK itself does not ship metrics or tracing implementations.
+
+* **How it fits the SDK**: This provides reliable logging by default and seamless integration with Micrometer, OpenTelemetry, or similar systems for metrics and tracing.
+
+### 4. Remote MCP Clients and Servers
+
+MCP supports both clients (applications consuming MCP servers) and servers (applications exposing MCP endpoints). The SDK provides support for both sides.
+
+#### Client Transport in the SDK
+
+* **SDK Choice**: JDK HttpClient (Java 11+) as the default client, with optional Spring WebClient support
+
+* **Why**: The JDK HttpClient is built-in, portable, and supports streaming responses. This keeps the default lightweight with no extra dependencies. Spring WebClient support is available for Spring-based projects.
+
+* **How we expose it**: MCP Client APIs are transport-agnostic. The core module ships with JDK HttpClient transport. A Spring module provides WebClient integration.
+
+* **How it fits the SDK**: This ensures all applications can talk to MCP servers out of the box, while allowing richer integration in Spring and other environments.
+
+#### Server Transport in the SDK
+
+* **SDK Choice**: Jakarta Servlet implementation in core, with optional Spring WebFlux and Spring WebMVC providers
+
+* **Why**: Servlet is the most widely deployed Java server API. WebFlux and WebMVC cover a significant part of the Spring community. Together these provide reach across blocking and non-blocking models.
+
+* **How we expose it**: Server APIs are transport-agnostic. Core includes Servlet support. Spring modules extend support for WebFlux and WebMVC.
+
+* **How it fits the SDK**: This allows developers to expose MCP servers in the most common Java environments today, while enabling other transport implementations such as Netty, Vert.x, or Helidon.
+
+#### Authorization in the SDK
+
+* **SDK Choice**: Pluggable authorization hooks for MCP servers; no built-in implementation
+
+* **Why**: MCP servers must restrict access to authenticated and authorized clients. Authorization needs differ across environments such as Spring Security, MicroProfile JWT, or custom solutions. Providing hooks avoids lock-in and leverages proven libraries.
+
+* **How we expose it**: Authorization is integrated into the server transport layer. The SDK does not include its own authorization system.
+
+* **How it fits the SDK**: This keeps server-side security ecosystem-neutral, while ensuring applications can plug in their preferred authorization strategy.
+
+### Project Structure of the SDK
+
+The SDK is organized into modules to separate concerns and allow adopters to bring in only what they need:
+* `mcp-bom` – Dependency versions
+* `mcp-core` – Reference implementation (STDIO, JDK HttpClient, Servlet)
+* `mcp-json` – JSON abstraction
+* `mcp-jackson2` – Jackson implementation of JSON binding
+* `mcp` – Convenience bundle (core + Jackson)
+* `mcp-test` – Shared testing utilities
+* `mcp-spring` – Spring integrations (WebClient, WebFlux, WebMVC)
+
+For example, a minimal adopter may depend only on `mcp` (core + Jackson), while a Spring-based application can use `mcp-spring` for deeper framework integration.
+
+### Future Directions
+
+The SDK is designed to evolve with the Java ecosystem. Areas we are actively watching include:
+Concurrency in the JDK – Virtual Threads and Structured Concurrency may simplify the synchronous API story
+
## License
This project is licensed under the [MIT License](LICENSE).
diff --git a/mcp-bom/pom.xml b/mcp-bom/pom.xml
index 83d8bc510..447c9e0bd 100644
--- a/mcp-bom/pom.xml
+++ b/mcp-bom/pom.xml
@@ -7,7 +7,7 @@
io.modelcontextprotocol.sdkmcp-parent
- 0.12.0-SNAPSHOT
+ 0.18.0-SNAPSHOTmcp-bom
@@ -27,12 +27,33 @@
+
+ io.modelcontextprotocol.sdk
+ mcp-core
+ ${project.version}
+
+
+
io.modelcontextprotocol.sdkmcp${project.version}
+
+
+ io.modelcontextprotocol.sdk
+ mcp-json
+ ${project.version}
+
+
+
+
+ io.modelcontextprotocol.sdk
+ mcp-json-jackson2
+ ${project.version}
+
+
io.modelcontextprotocol.sdk
diff --git a/mcp-core/pom.xml b/mcp-core/pom.xml
new file mode 100644
index 000000000..9e23ffd79
--- /dev/null
+++ b/mcp-core/pom.xml
@@ -0,0 +1,235 @@
+
+
+ 4.0.0
+
+ io.modelcontextprotocol.sdk
+ mcp-parent
+ 0.18.0-SNAPSHOT
+
+ mcp-core
+ jar
+ Java MCP SDK Core
+ Core classes of the Java SDK implementation of the Model Context Protocol, enabling seamless integration with language models and AI tools
+ https://github.com/modelcontextprotocol/java-sdk
+
+
+ https://github.com/modelcontextprotocol/java-sdk
+ git://github.com/modelcontextprotocol/java-sdk.git
+ git@github.com/modelcontextprotocol/java-sdk.git
+
+
+
+
+
+ biz.aQute.bnd
+ bnd-maven-plugin
+ ${bnd-maven-plugin.version}
+
+
+ bnd-process
+
+ bnd-process
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ ${project.build.outputDirectory}/META-INF/MANIFEST.MF
+
+
+
+
+
+
+
+
+ io.modelcontextprotocol.sdk
+ mcp-json
+ 0.18.0-SNAPSHOT
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j-api.version}
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ ${jackson.version}
+
+
+
+ io.projectreactor
+ reactor-core
+
+
+
+
+
+ jakarta.servlet
+ jakarta.servlet-api
+ ${jakarta.servlet.version}
+ provided
+
+
+
+
+ io.modelcontextprotocol.sdk
+ mcp-json-jackson2
+ 0.18.0-SNAPSHOT
+ test
+
+
+
+ org.springframework
+ spring-webmvc
+ ${springframework.version}
+ test
+
+
+
+
+ io.projectreactor.netty
+ reactor-netty-http
+ test
+
+
+
+
+ org.springframework
+ spring-context
+ ${springframework.version}
+ test
+
+
+
+ org.springframework
+ spring-test
+ ${springframework.version}
+ test
+
+
+
+ org.assertj
+ assertj-core
+ ${assert4j.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${junit.version}
+ test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+
+
+ net.bytebuddy
+ byte-buddy
+ ${byte-buddy.version}
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+
+ org.awaitility
+ awaitility
+ ${awaitility.version}
+ test
+
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+ test
+
+
+
+ net.javacrumbs.json-unit
+ json-unit-assertj
+ ${json-unit-assertj.version}
+ test
+
+
+
+
+ org.apache.tomcat.embed
+ tomcat-embed-core
+ ${tomcat.version}
+ test
+
+
+ org.apache.tomcat.embed
+ tomcat-embed-websocket
+ ${tomcat.version}
+ test
+
+
+
+ org.testcontainers
+ toxiproxy
+ ${toxiproxy.version}
+ test
+
+
+
+
+ com.google.code.gson
+ gson
+ 2.10.1
+ test
+
+
+
+
+
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java
similarity index 89%
rename from mcp/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java
index 2cc1c5dba..07d86f40e 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java
@@ -11,14 +11,13 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.modelcontextprotocol.spec.McpClientSession;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.util.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.ContextView;
@@ -99,21 +98,30 @@ class LifecycleInitializer {
*/
private final Duration initializationTimeout;
+ /**
+ * Post-initialization hook to perform additional operations after every successful
+ * initialization.
+ */
+ private final Function> postInitializationHook;
+
public LifecycleInitializer(McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo,
List protocolVersions, Duration initializationTimeout,
- Function sessionSupplier) {
+ Function sessionSupplier,
+ Function> postInitializationHook) {
Assert.notNull(sessionSupplier, "Session supplier must not be null");
Assert.notNull(clientCapabilities, "Client capabilities must not be null");
Assert.notNull(clientInfo, "Client info must not be null");
Assert.notEmpty(protocolVersions, "Protocol versions must not be empty");
Assert.notNull(initializationTimeout, "Initialization timeout must not be null");
+ Assert.notNull(postInitializationHook, "Post-initialization hook must not be null");
this.sessionSupplier = sessionSupplier;
this.clientCapabilities = clientCapabilities;
this.clientInfo = clientInfo;
this.protocolVersions = Collections.unmodifiableList(new ArrayList<>(protocolVersions));
this.initializationTimeout = initializationTimeout;
+ this.postInitializationHook = postInitializationHook;
}
/**
@@ -148,10 +156,6 @@ interface Initialization {
}
- /**
- * Default implementation of the {@link Initialization} interface that manages the MCP
- * client initialization process.
- */
private static class DefaultInitialization implements Initialization {
/**
@@ -199,29 +203,20 @@ private void setMcpClientSession(McpClientSession mcpClientSession) {
this.mcpClientSession.set(mcpClientSession);
}
- /**
- * Returns a Mono that completes when the MCP client initialization is complete.
- * This allows subscribers to wait for the initialization to finish before
- * proceeding with further operations.
- * @return A Mono that emits the result of the MCP initialization process
- */
private Mono await() {
return this.initSink.asMono();
}
- /**
- * Completes the initialization process with the given result. It caches the
- * result and emits it to all subscribers waiting for the initialization to
- * complete.
- * @param initializeResult The result of the MCP initialization process
- */
private void complete(McpSchema.InitializeResult initializeResult) {
- // first ensure the result is cached
- this.result.set(initializeResult);
// inform all the subscribers waiting for the initialization
this.initSink.emitValue(initializeResult, Sinks.EmitFailureHandler.FAIL_FAST);
}
+ private void cacheResult(McpSchema.InitializeResult initializeResult) {
+ // first ensure the result is cached
+ this.result.set(initializeResult);
+ }
+
private void error(Throwable t) {
this.initSink.emitError(t, Sinks.EmitFailureHandler.FAIL_FAST);
}
@@ -263,7 +258,7 @@ public void handleException(Throwable t) {
}
// Providing an empty operation since we are only interested in triggering
// the implicit initialization step.
- withIntitialization("re-initializing", result -> Mono.empty()).subscribe();
+ this.withInitialization("re-initializing", result -> Mono.empty()).subscribe();
}
}
@@ -275,7 +270,7 @@ public void handleException(Throwable t) {
* @param operation The operation to execute when the client is initialized
* @return A Mono that completes with the result of the operation
*/
- public Mono withIntitialization(String actionName, Function> operation) {
+ public Mono withInitialization(String actionName, Function> operation) {
return Mono.deferContextual(ctx -> {
DefaultInitialization newInit = new DefaultInitialization();
DefaultInitialization previous = this.initializationRef.compareAndExchange(null, newInit);
@@ -283,19 +278,24 @@ public Mono withIntitialization(String actionName, Function initializationJob = needsToInitialize ? doInitialize(newInit, ctx)
- : previous.await();
+ Mono initializationJob = needsToInitialize
+ ? this.doInitialize(newInit, this.postInitializationHook, ctx) : previous.await();
return initializationJob.map(initializeResult -> this.initializationRef.get())
.timeout(this.initializationTimeout)
.onErrorResume(ex -> {
+ this.initializationRef.compareAndSet(newInit, null);
return Mono.error(new RuntimeException("Client failed to initialize " + actionName, ex));
})
- .flatMap(operation);
+ .flatMap(res -> operation.apply(res)
+ .contextWrite(c -> c.put(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ res.initializeResult().protocolVersion())));
});
}
- private Mono doInitialize(DefaultInitialization initialization, ContextView ctx) {
+ private Mono doInitialize(DefaultInitialization initialization,
+ Function> postInitOperation, ContextView ctx) {
+
initialization.setMcpClientSession(this.sessionSupplier.apply(ctx));
McpClientSession mcpClientSession = initialization.mcpSession();
@@ -321,7 +321,12 @@ private Mono doInitialize(DefaultInitialization init
}
return mcpClientSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
+ .contextWrite(
+ c -> c.put(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, initializeResult.protocolVersion()))
.thenReturn(initializeResult);
+ }).flatMap(initializeResult -> {
+ initialization.cacheResult(initializeResult);
+ return postInitOperation.apply(initialization).thenReturn(initializeResult);
}).doOnNext(initialization::complete).onErrorResume(ex -> {
initialization.error(ex);
return Mono.error(ex);
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
similarity index 82%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
index eb6d42f68..e6a09cd08 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java
@@ -15,14 +15,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-
+import io.modelcontextprotocol.client.LifecycleInitializer.Initialization;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
import io.modelcontextprotocol.spec.McpClientSession;
+import io.modelcontextprotocol.spec.McpClientSession.NotificationHandler;
+import io.modelcontextprotocol.spec.McpClientSession.RequestHandler;
import io.modelcontextprotocol.spec.McpClientTransport;
-import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
@@ -36,10 +35,10 @@
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSchema.PaginatedRequest;
import io.modelcontextprotocol.spec.McpSchema.Root;
-import io.modelcontextprotocol.spec.McpClientSession.NotificationHandler;
-import io.modelcontextprotocol.spec.McpClientSession.RequestHandler;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -76,6 +75,7 @@
* @author Dariusz Jędrzejczyk
* @author Christian Tzolov
* @author Jihoon Kim
+ * @author Anurag Pant
* @see McpClient
* @see McpSchema
* @see McpClientSession
@@ -85,27 +85,29 @@ public class McpAsyncClient {
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClient.class);
- private static final TypeReference VOID_TYPE_REFERENCE = new TypeReference<>() {
+ private static final TypeRef VOID_TYPE_REFERENCE = new TypeRef<>() {
};
- public static final TypeReference
*/
public Mono initialize() {
- return this.initializer.withIntitialization("by explicit API call", init -> Mono.just(init.initializeResult()));
+ return this.initializer.withInitialization("by explicit API call", init -> Mono.just(init.initializeResult()));
}
// --------------------------
@@ -400,13 +444,14 @@ public Mono initialize() {
* @return A Mono that completes with the server's ping response
*/
public Mono ping() {
- return this.initializer.withIntitialization("pinging the server",
+ return this.initializer.withInitialization("pinging the server",
init -> init.mcpSession().sendRequest(McpSchema.METHOD_PING, null, OBJECT_TYPE_REF));
}
// --------------------------
// Roots
// --------------------------
+
/**
* Adds a new root to the client's root list.
* @param root The root to add.
@@ -481,7 +526,7 @@ public Mono removeRoot(String rootUri) {
* @return A Mono that completes when the notification is sent.
*/
public Mono rootsListChangedNotification() {
- return this.initializer.withIntitialization("sending roots list changed notification",
+ return this.initializer.withInitialization("sending roots list changed notification",
init -> init.mcpSession().sendNotification(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED));
}
@@ -512,7 +557,7 @@ private RequestHandler samplingCreateMessageHandler() {
// --------------------------
private RequestHandler elicitationCreateHandler() {
return params -> {
- ElicitRequest request = transport.unmarshalFrom(params, new TypeReference<>() {
+ ElicitRequest request = transport.unmarshalFrom(params, new TypeRef<>() {
});
return this.elicitationHandler.apply(request);
@@ -522,10 +567,10 @@ private RequestHandler elicitationCreateHandler() {
// --------------------------
// Tools
// --------------------------
- private static final TypeReference CALL_TOOL_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef CALL_TOOL_RESULT_TYPE_REF = new TypeRef<>() {
};
- private static final TypeReference LIST_TOOLS_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef LIST_TOOLS_RESULT_TYPE_REF = new TypeRef<>() {
};
/**
@@ -540,27 +585,57 @@ private RequestHandler elicitationCreateHandler() {
* @see #listTools()
*/
public Mono callTool(McpSchema.CallToolRequest callToolRequest) {
- return this.initializer.withIntitialization("calling tools", init -> {
+ return this.initializer.withInitialization("calling tool", init -> {
if (init.initializeResult().capabilities().tools() == null) {
return Mono.error(new IllegalStateException("Server does not provide tools capability"));
}
+
return init.mcpSession()
- .sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF);
+ .sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF)
+ .flatMap(result -> Mono.just(validateToolResult(callToolRequest.name(), result)));
});
}
+ private McpSchema.CallToolResult validateToolResult(String toolName, McpSchema.CallToolResult result) {
+
+ if (!this.enableCallToolSchemaCaching || result == null || result.isError() == Boolean.TRUE) {
+ // if tool schema caching is disabled or tool call resulted in an error - skip
+ // validation and return the result as it is
+ return result;
+ }
+
+ Map optOutputSchema = this.toolsOutputSchemaCache.get(toolName);
+
+ if (optOutputSchema == null) {
+ logger.warn(
+ "Calling a tool with no outputSchema is not expected to return result with structured content, but got: {}",
+ result.structuredContent());
+ return result;
+ }
+
+ // Validate the tool output against the cached output schema
+ var validation = this.jsonSchemaValidator.validate(optOutputSchema, result.structuredContent());
+
+ if (!validation.valid()) {
+ logger.warn("Tool call result validation failed: {}", validation.errorMessage());
+ throw new IllegalArgumentException("Tool call result validation failed: " + validation.errorMessage());
+ }
+
+ return result;
+ }
+
/**
* Retrieves the list of all tools provided by the server.
* @return A Mono that emits the list of all tools result
*/
public Mono listTools() {
- return this.listTools(McpSchema.FIRST_PAGE)
- .expand(result -> (result.nextCursor() != null) ? this.listTools(result.nextCursor()) : Mono.empty())
- .reduce(new McpSchema.ListToolsResult(new ArrayList<>(), null), (allToolsResult, result) -> {
- allToolsResult.tools().addAll(result.tools());
- return allToolsResult;
- })
- .map(result -> new McpSchema.ListToolsResult(Collections.unmodifiableList(result.tools()), null));
+ return this.listTools(McpSchema.FIRST_PAGE).expand(result -> {
+ String next = result.nextCursor();
+ return (next != null && !next.isEmpty()) ? this.listTools(next) : Mono.empty();
+ }).reduce(new McpSchema.ListToolsResult(new ArrayList<>(), null), (allToolsResult, result) -> {
+ allToolsResult.tools().addAll(result.tools());
+ return allToolsResult;
+ }).map(result -> new McpSchema.ListToolsResult(Collections.unmodifiableList(result.tools()), null));
}
/**
@@ -569,14 +644,26 @@ public Mono listTools() {
* @return A Mono that emits the list of tools result
*/
public Mono listTools(String cursor) {
- return this.initializer.withIntitialization("listing tools", init -> {
- if (init.initializeResult().capabilities().tools() == null) {
- return Mono.error(new IllegalStateException("Server does not provide tools capability"));
- }
- return init.mcpSession()
- .sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor),
- LIST_TOOLS_RESULT_TYPE_REF);
- });
+ return this.initializer.withInitialization("listing tools", init -> this.listToolsInternal(init, cursor));
+ }
+
+ private Mono listToolsInternal(Initialization init, String cursor) {
+
+ if (init.initializeResult().capabilities().tools() == null) {
+ return Mono.error(new IllegalStateException("Server does not provide tools capability"));
+ }
+ return init.mcpSession()
+ .sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor),
+ LIST_TOOLS_RESULT_TYPE_REF)
+ .doOnNext(result -> {
+ if (this.enableCallToolSchemaCaching && result.tools() != null) {
+ // Cache tools output schema
+ result.tools()
+ .stream()
+ .filter(tool -> tool.outputSchema() != null)
+ .forEach(tool -> this.toolsOutputSchemaCache.put(tool.name(), tool.outputSchema()));
+ }
+ });
}
private NotificationHandler asyncToolsChangeNotificationHandler(
@@ -596,13 +683,13 @@ private NotificationHandler asyncToolsChangeNotificationHandler(
// Resources
// --------------------------
- private static final TypeReference LIST_RESOURCES_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef LIST_RESOURCES_RESULT_TYPE_REF = new TypeRef<>() {
};
- private static final TypeReference READ_RESOURCE_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef READ_RESOURCE_RESULT_TYPE_REF = new TypeRef<>() {
};
- private static final TypeReference LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF = new TypeRef<>() {
};
/**
@@ -633,7 +720,7 @@ public Mono listResources() {
* @see #readResource(McpSchema.Resource)
*/
public Mono listResources(String cursor) {
- return this.initializer.withIntitialization("listing resources", init -> {
+ return this.initializer.withInitialization("listing resources", init -> {
if (init.initializeResult().capabilities().resources() == null) {
return Mono.error(new IllegalStateException("Server does not provide the resources capability"));
}
@@ -665,7 +752,7 @@ public Mono readResource(McpSchema.Resource resour
* @see McpSchema.ReadResourceResult
*/
public Mono readResource(McpSchema.ReadResourceRequest readResourceRequest) {
- return this.initializer.withIntitialization("reading resources", init -> {
+ return this.initializer.withInitialization("reading resources", init -> {
if (init.initializeResult().capabilities().resources() == null) {
return Mono.error(new IllegalStateException("Server does not provide the resources capability"));
}
@@ -703,7 +790,7 @@ public Mono listResourceTemplates() {
* @see McpSchema.ListResourceTemplatesResult
*/
public Mono listResourceTemplates(String cursor) {
- return this.initializer.withIntitialization("listing resource templates", init -> {
+ return this.initializer.withInitialization("listing resource templates", init -> {
if (init.initializeResult().capabilities().resources() == null) {
return Mono.error(new IllegalStateException("Server does not provide the resources capability"));
}
@@ -723,7 +810,7 @@ public Mono listResourceTemplates(String
* @see #unsubscribeResource(McpSchema.UnsubscribeRequest)
*/
public Mono subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
- return this.initializer.withIntitialization("subscribing to resources", init -> init.mcpSession()
+ return this.initializer.withInitialization("subscribing to resources", init -> init.mcpSession()
.sendRequest(McpSchema.METHOD_RESOURCES_SUBSCRIBE, subscribeRequest, VOID_TYPE_REFERENCE));
}
@@ -737,7 +824,7 @@ public Mono subscribeResource(McpSchema.SubscribeRequest subscribeRequest)
* @see #subscribeResource(McpSchema.SubscribeRequest)
*/
public Mono unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
- return this.initializer.withIntitialization("unsubscribing from resources", init -> init.mcpSession()
+ return this.initializer.withInitialization("unsubscribing from resources", init -> init.mcpSession()
.sendRequest(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, unsubscribeRequest, VOID_TYPE_REFERENCE));
}
@@ -756,7 +843,7 @@ private NotificationHandler asyncResourcesUpdatedNotificationHandler(
List, Mono>> resourcesUpdateConsumers) {
return params -> {
McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification = transport.unmarshalFrom(params,
- new TypeReference<>() {
+ new TypeRef<>() {
});
return readResource(new McpSchema.ReadResourceRequest(resourcesUpdatedNotification.uri()))
@@ -773,10 +860,10 @@ private NotificationHandler asyncResourcesUpdatedNotificationHandler(
// --------------------------
// Prompts
// --------------------------
- private static final TypeReference LIST_PROMPTS_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef LIST_PROMPTS_RESULT_TYPE_REF = new TypeRef<>() {
};
- private static final TypeReference GET_PROMPT_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef GET_PROMPT_RESULT_TYPE_REF = new TypeRef<>() {
};
/**
@@ -803,7 +890,7 @@ public Mono listPrompts() {
* @see #getPrompt(GetPromptRequest)
*/
public Mono listPrompts(String cursor) {
- return this.initializer.withIntitialization("listing prompts", init -> init.mcpSession()
+ return this.initializer.withInitialization("listing prompts", init -> init.mcpSession()
.sendRequest(McpSchema.METHOD_PROMPT_LIST, new PaginatedRequest(cursor), LIST_PROMPTS_RESULT_TYPE_REF));
}
@@ -817,7 +904,7 @@ public Mono listPrompts(String cursor) {
* @see #listPrompts()
*/
public Mono getPrompt(GetPromptRequest getPromptRequest) {
- return this.initializer.withIntitialization("getting prompts", init -> init.mcpSession()
+ return this.initializer.withInitialization("getting prompts", init -> init.mcpSession()
.sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF));
}
@@ -835,14 +922,6 @@ private NotificationHandler asyncPromptsChangeNotificationHandler(
// --------------------------
// Logging
// --------------------------
- /**
- * Create a notification handler for logging notifications from the server. This
- * handler automatically distributes logging messages to all registered consumers.
- * @param loggingConsumers List of consumers that will be notified when a logging
- * message is received. Each consumer receives the logging message notification.
- * @return A NotificationHandler that processes log notifications by distributing the
- * message to all registered consumers
- */
private NotificationHandler asyncLoggingNotificationHandler(
List>> loggingConsumers) {
@@ -868,7 +947,7 @@ public Mono setLoggingLevel(LoggingLevel loggingLevel) {
return Mono.error(new IllegalArgumentException("Logging level must not be null"));
}
- return this.initializer.withIntitialization("setting logging level", init -> {
+ return this.initializer.withInitialization("setting logging level", init -> {
if (init.initializeResult().capabilities().logging() == null) {
return Mono.error(new IllegalStateException("Server's Logging capabilities are not enabled!"));
}
@@ -877,15 +956,6 @@ public Mono setLoggingLevel(LoggingLevel loggingLevel) {
});
}
- /**
- * Create a notification handler for progress notifications from the server. This
- * handler automatically distributes progress notifications to all registered
- * consumers.
- * @param progressConsumers List of consumers that will be notified when a progress
- * message is received. Each consumer receives the progress notification.
- * @return A NotificationHandler that processes progress notifications by distributing
- * the message to all registered consumers
- */
private NotificationHandler asyncProgressNotificationHandler(
List>> progressConsumers) {
@@ -911,7 +981,7 @@ void setProtocolVersions(List protocolVersions) {
// --------------------------
// Completions
// --------------------------
- private static final TypeReference COMPLETION_COMPLETE_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef COMPLETION_COMPLETE_RESULT_TYPE_REF = new TypeRef<>() {
};
/**
@@ -925,7 +995,7 @@ void setProtocolVersions(List protocolVersions) {
* @see McpSchema.CompleteResult
*/
public Mono completeCompletion(McpSchema.CompleteRequest completeRequest) {
- return this.initializer.withIntitialization("complete completions", init -> init.mcpSession()
+ return this.initializer.withInitialization("complete completions", init -> init.mcpSession()
.sendRequest(McpSchema.METHOD_COMPLETION_COMPLETE, completeRequest, COMPLETION_COMPLETE_RESULT_TYPE_REF));
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java
similarity index 86%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java
index c8af28ac1..c9989f832 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java
@@ -4,17 +4,10 @@
package io.modelcontextprotocol.client;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
-import io.modelcontextprotocol.spec.McpTransport;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult;
@@ -22,9 +15,19 @@
import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
import io.modelcontextprotocol.spec.McpSchema.Implementation;
import io.modelcontextprotocol.spec.McpSchema.Root;
+import io.modelcontextprotocol.spec.McpTransport;
import io.modelcontextprotocol.util.Assert;
import reactor.core.publisher.Mono;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
/**
* Factory class for creating Model Context Protocol (MCP) clients. MCP is a protocol that
* enables AI models to interact with external tools and resources through a standardized
@@ -72,6 +75,7 @@
* .resourcesChangeConsumer(resources -> Mono.fromRunnable(() -> System.out.println("Resources updated: " + resources)))
* .promptsChangeConsumer(prompts -> Mono.fromRunnable(() -> System.out.println("Prompts updated: " + prompts)))
* .loggingConsumer(message -> Mono.fromRunnable(() -> System.out.println("Log message: " + message)))
+ * .resourcesUpdateConsumer(resourceContents -> Mono.fromRunnable(() -> System.out.println("Resources contents updated: " + resourceContents)))
* .build();
* }
*
@@ -97,6 +101,7 @@
*
* @author Christian Tzolov
* @author Dariusz Jędrzejczyk
+ * @author Anurag Pant
* @see McpAsyncClient
* @see McpSyncClient
* @see McpTransport
@@ -163,7 +168,7 @@ class SyncSpec {
private ClientCapabilities capabilities;
- private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");
+ private Implementation clientInfo = new Implementation("Java SDK MCP Client", "0.15.0");
private final Map roots = new HashMap<>();
@@ -183,6 +188,12 @@ class SyncSpec {
private Function elicitationHandler;
+ private Supplier contextProvider = () -> McpTransportContext.EMPTY;
+
+ private JsonSchemaValidator jsonSchemaValidator;
+
+ private boolean enableCallToolSchemaCaching = false; // Default to false
+
private SyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
@@ -336,6 +347,22 @@ public SyncSpec resourcesChangeConsumer(Consumer> resou
return this;
}
+ /**
+ * Adds a consumer to be notified when a specific resource is updated. This allows
+ * the client to react to changes in individual resources, such as updates to
+ * their content or metadata.
+ * @param resourcesUpdateConsumer A consumer function that processes the updated
+ * resource and returns a Mono indicating the completion of the processing. Must
+ * not be null.
+ * @return This builder instance for method chaining.
+ * @throws IllegalArgumentException If the resourcesUpdateConsumer is null.
+ */
+ public SyncSpec resourcesUpdateConsumer(Consumer> resourcesUpdateConsumer) {
+ Assert.notNull(resourcesUpdateConsumer, "Resources update consumer must not be null");
+ this.resourcesUpdateConsumers.add(resourcesUpdateConsumer);
+ return this;
+ }
+
/**
* Adds a consumer to be notified when the available prompts change. This allows
* the client to react to changes in the server's prompt templates, such as new
@@ -409,6 +436,48 @@ public SyncSpec progressConsumers(List>
return this;
}
+ /**
+ * Add a provider of {@link McpTransportContext}, providing a context before
+ * calling any client operation. This allows to extract thread-locals and hand
+ * them over to the underlying transport.
+ *
+ * There is no direct equivalent in {@link AsyncSpec}. To achieve the same result,
+ * append {@code contextWrite(McpTransportContext.KEY, context)} to any
+ * {@link McpAsyncClient} call.
+ * @param contextProvider A supplier to create a context
+ * @return This builder for method chaining
+ */
+ public SyncSpec transportContextProvider(Supplier contextProvider) {
+ this.contextProvider = contextProvider;
+ return this;
+ }
+
+ /**
+ * Add a {@link JsonSchemaValidator} to validate the JSON structure of the
+ * structured output.
+ * @param jsonSchemaValidator A validator to validate the JSON structure of the
+ * structured output. Must not be null.
+ * @return This builder for method chaining
+ * @throws IllegalArgumentException if jsonSchemaValidator is null
+ */
+ public SyncSpec jsonSchemaValidator(JsonSchemaValidator jsonSchemaValidator) {
+ Assert.notNull(jsonSchemaValidator, "JsonSchemaValidator must not be null");
+ this.jsonSchemaValidator = jsonSchemaValidator;
+ return this;
+ }
+
+ /**
+ * Enables automatic schema caching during callTool operations. When a tool's
+ * output schema is not found in the cache, callTool will automatically fetch and
+ * cache all tool schemas via listTools.
+ * @param enableCallToolSchemaCaching true to enable, false to disable
+ * @return This builder instance for method chaining
+ */
+ public SyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching) {
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ return this;
+ }
+
/**
* Create an instance of {@link McpSyncClient} with the provided configurations or
* sensible defaults.
@@ -418,12 +487,13 @@ public McpSyncClient build() {
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler,
- this.elicitationHandler);
+ this.elicitationHandler, this.enableCallToolSchemaCaching);
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
- return new McpSyncClient(
- new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures));
+ return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
+ jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(),
+ asyncFeatures), this.contextProvider);
}
}
@@ -454,7 +524,7 @@ class AsyncSpec {
private ClientCapabilities capabilities;
- private Implementation clientInfo = new Implementation("Spring AI MCP Client", "0.3.1");
+ private Implementation clientInfo = new Implementation("Java SDK MCP Client", "0.15.0");
private final Map roots = new HashMap<>();
@@ -474,6 +544,10 @@ class AsyncSpec {
private Function> elicitationHandler;
+ private JsonSchemaValidator jsonSchemaValidator;
+
+ private boolean enableCallToolSchemaCaching = false; // Default to false
+
private AsyncSpec(McpClientTransport transport) {
Assert.notNull(transport, "Transport must not be null");
this.transport = transport;
@@ -720,17 +794,45 @@ public AsyncSpec progressConsumers(
return this;
}
+ /**
+ * Sets the JSON schema validator to use for validating tool responses against
+ * output schemas.
+ * @param jsonSchemaValidator The validator to use. Must not be null.
+ * @return This builder instance for method chaining
+ * @throws IllegalArgumentException if jsonSchemaValidator is null
+ */
+ public AsyncSpec jsonSchemaValidator(JsonSchemaValidator jsonSchemaValidator) {
+ Assert.notNull(jsonSchemaValidator, "JsonSchemaValidator must not be null");
+ this.jsonSchemaValidator = jsonSchemaValidator;
+ return this;
+ }
+
+ /**
+ * Enables automatic schema caching during callTool operations. When a tool's
+ * output schema is not found in the cache, callTool will automatically fetch and
+ * cache all tool schemas via listTools.
+ * @param enableCallToolSchemaCaching true to enable, false to disable
+ * @return This builder instance for method chaining
+ */
+ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching) {
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
+ return this;
+ }
+
/**
* Create an instance of {@link McpAsyncClient} with the provided configurations
* or sensible defaults.
* @return a new instance of {@link McpAsyncClient}.
*/
public McpAsyncClient build() {
+ var jsonSchemaValidator = (this.jsonSchemaValidator != null) ? this.jsonSchemaValidator
+ : JsonSchemaValidator.getDefault();
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
+ jsonSchemaValidator,
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
- this.samplingHandler, this.elicitationHandler));
+ this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
similarity index 94%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
index 3b6550765..127d53337 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java
@@ -62,6 +62,7 @@ class McpClientFeatures {
* @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List, Mono>> toolsChangeConsumers,
@@ -71,7 +72,8 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
List>> loggingConsumers,
List>> progressConsumers,
Function> samplingHandler,
- Function> elicitationHandler) {
+ Function> elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
/**
* Create an instance and validate the arguments.
@@ -84,6 +86,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
* @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots,
@@ -94,7 +97,8 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
List>> loggingConsumers,
List>> progressConsumers,
Function> samplingHandler,
- Function> elicitationHandler) {
+ Function> elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
@@ -113,6 +117,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
this.samplingHandler = samplingHandler;
this.elicitationHandler = elicitationHandler;
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
}
/**
@@ -129,7 +134,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
Function> elicitationHandler) {
this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
- elicitationHandler);
+ elicitationHandler, false);
}
/**
@@ -187,7 +192,8 @@ public static Async fromSync(Sync syncSpec) {
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers,
- loggingConsumers, progressConsumers, samplingHandler, elicitationHandler);
+ loggingConsumers, progressConsumers, samplingHandler, elicitationHandler,
+ syncSpec.enableCallToolSchemaCaching);
}
}
@@ -205,6 +211,7 @@ public static Async fromSync(Sync syncSpec) {
* @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
@@ -214,7 +221,8 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
List> loggingConsumers,
List> progressConsumers,
Function samplingHandler,
- Function elicitationHandler) {
+ Function elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
/**
* Create an instance and validate the arguments.
@@ -229,6 +237,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
* @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
+ * @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map roots, List>> toolsChangeConsumers,
@@ -238,7 +247,8 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
List> loggingConsumers,
List> progressConsumers,
Function samplingHandler,
- Function elicitationHandler) {
+ Function elicitationHandler,
+ boolean enableCallToolSchemaCaching) {
Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
@@ -257,6 +267,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
this.progressConsumers = progressConsumers != null ? progressConsumers : List.of();
this.samplingHandler = samplingHandler;
this.elicitationHandler = elicitationHandler;
+ this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
}
/**
@@ -272,7 +283,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
Function elicitationHandler) {
this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
- elicitationHandler);
+ elicitationHandler, false);
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
similarity index 82%
rename from mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
index 33784adcd..7fdaa8941 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java
@@ -5,16 +5,19 @@
package io.modelcontextprotocol.client;
import java.time.Duration;
+import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
import io.modelcontextprotocol.spec.McpSchema.GetPromptResult;
import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult;
import io.modelcontextprotocol.util.Assert;
+import reactor.core.publisher.Mono;
/**
* A synchronous client implementation for the Model Context Protocol (MCP) that wraps an
@@ -63,14 +66,20 @@ public class McpSyncClient implements AutoCloseable {
private final McpAsyncClient delegate;
+ private final Supplier contextProvider;
+
/**
* Create a new McpSyncClient with the given delegate.
* @param delegate the asynchronous kernel on top of which this synchronous client
* provides a blocking API.
+ * @param contextProvider the supplier of context before calling any non-blocking
+ * operation on underlying delegate
*/
- McpSyncClient(McpAsyncClient delegate) {
+ McpSyncClient(McpAsyncClient delegate, Supplier contextProvider) {
Assert.notNull(delegate, "The delegate can not be null");
+ Assert.notNull(contextProvider, "The contextProvider can not be null");
this.delegate = delegate;
+ this.contextProvider = contextProvider;
}
/**
@@ -177,14 +186,14 @@ public boolean closeGracefully() {
public McpSchema.InitializeResult initialize() {
// TODO: block takes no argument here as we assume the async client is
// configured with a requestTimeout at all times
- return this.delegate.initialize().block();
+ return withProvidedContext(this.delegate.initialize()).block();
}
/**
* Send a roots/list_changed notification.
*/
public void rootsListChangedNotification() {
- this.delegate.rootsListChangedNotification().block();
+ withProvidedContext(this.delegate.rootsListChangedNotification()).block();
}
/**
@@ -206,7 +215,7 @@ public void removeRoot(String rootUri) {
* @return
*/
public Object ping() {
- return this.delegate.ping().block();
+ return withProvidedContext(this.delegate.ping()).block();
}
// --------------------------
@@ -224,7 +233,8 @@ public Object ping() {
* Boolean indicating if the execution failed (true) or succeeded (false/absent)
*/
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
- return this.delegate.callTool(callToolRequest).block();
+ return withProvidedContext(this.delegate.callTool(callToolRequest)).block();
+
}
/**
@@ -234,7 +244,7 @@ public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolReque
* pagination if more tools are available
*/
public McpSchema.ListToolsResult listTools() {
- return this.delegate.listTools().block();
+ return withProvidedContext(this.delegate.listTools()).block();
}
/**
@@ -245,7 +255,8 @@ public McpSchema.ListToolsResult listTools() {
* pagination if more tools are available
*/
public McpSchema.ListToolsResult listTools(String cursor) {
- return this.delegate.listTools(cursor).block();
+ return withProvidedContext(this.delegate.listTools(cursor)).block();
+
}
// --------------------------
@@ -257,7 +268,8 @@ public McpSchema.ListToolsResult listTools(String cursor) {
* @return The list of all resources result
*/
public McpSchema.ListResourcesResult listResources() {
- return this.delegate.listResources().block();
+ return withProvidedContext(this.delegate.listResources()).block();
+
}
/**
@@ -266,7 +278,8 @@ public McpSchema.ListResourcesResult listResources() {
* @return The list of resources result
*/
public McpSchema.ListResourcesResult listResources(String cursor) {
- return this.delegate.listResources(cursor).block();
+ return withProvidedContext(this.delegate.listResources(cursor)).block();
+
}
/**
@@ -275,7 +288,8 @@ public McpSchema.ListResourcesResult listResources(String cursor) {
* @return the resource content.
*/
public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
- return this.delegate.readResource(resource).block();
+ return withProvidedContext(this.delegate.readResource(resource)).block();
+
}
/**
@@ -284,7 +298,8 @@ public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
* @return the resource content.
*/
public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
- return this.delegate.readResource(readResourceRequest).block();
+ return withProvidedContext(this.delegate.readResource(readResourceRequest)).block();
+
}
/**
@@ -292,7 +307,8 @@ public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest r
* @return The list of all resource templates result.
*/
public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
- return this.delegate.listResourceTemplates().block();
+ return withProvidedContext(this.delegate.listResourceTemplates()).block();
+
}
/**
@@ -304,7 +320,8 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
* @return The list of resource templates result.
*/
public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
- return this.delegate.listResourceTemplates(cursor).block();
+ return withProvidedContext(this.delegate.listResourceTemplates(cursor)).block();
+
}
/**
@@ -317,7 +334,8 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor
* subscribe to.
*/
public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
- this.delegate.subscribeResource(subscribeRequest).block();
+ withProvidedContext(this.delegate.subscribeResource(subscribeRequest)).block();
+
}
/**
@@ -326,7 +344,8 @@ public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
* to unsubscribe from.
*/
public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
- this.delegate.unsubscribeResource(unsubscribeRequest).block();
+ withProvidedContext(this.delegate.unsubscribeResource(unsubscribeRequest)).block();
+
}
// --------------------------
@@ -338,7 +357,7 @@ public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest)
* @return The list of all prompts result.
*/
public ListPromptsResult listPrompts() {
- return this.delegate.listPrompts().block();
+ return withProvidedContext(this.delegate.listPrompts()).block();
}
/**
@@ -347,11 +366,12 @@ public ListPromptsResult listPrompts() {
* @return The list of prompts result.
*/
public ListPromptsResult listPrompts(String cursor) {
- return this.delegate.listPrompts(cursor).block();
+ return withProvidedContext(this.delegate.listPrompts(cursor)).block();
+
}
public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
- return this.delegate.getPrompt(getPromptRequest).block();
+ return withProvidedContext(this.delegate.getPrompt(getPromptRequest)).block();
}
/**
@@ -359,7 +379,8 @@ public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
* @param loggingLevel the min logging level
*/
public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
- this.delegate.setLoggingLevel(loggingLevel).block();
+ withProvidedContext(this.delegate.setLoggingLevel(loggingLevel)).block();
+
}
/**
@@ -369,7 +390,18 @@ public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
* @return the completion result containing suggested values.
*/
public McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest) {
- return this.delegate.completeCompletion(completeRequest).block();
+ return withProvidedContext(this.delegate.completeCompletion(completeRequest)).block();
+
+ }
+
+ /**
+ * For a given action, on assembly, capture the "context" via the
+ * {@link #contextProvider} and store it in the Reactor context.
+ * @param action the action to perform
+ * @return the result of the action
+ */
+ private Mono withProvidedContext(Mono action) {
+ return action.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, this.contextProvider.get()));
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
similarity index 71%
rename from mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
index 0f3511afb..ae093316f 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java
@@ -18,16 +18,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
+import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
-import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
import io.modelcontextprotocol.spec.McpTransportException;
+import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import reactor.core.Disposable;
@@ -94,8 +96,8 @@ public class HttpClientSseClientTransport implements McpClientTransport {
/** HTTP request builder for building requests to send messages to the server */
private final HttpRequest.Builder requestBuilder;
- /** JSON object mapper for message serialization/deserialization */
- protected ObjectMapper objectMapper;
+ /** JSON mapper for message serialization/deserialization */
+ protected McpJsonMapper jsonMapper;
/** Flag indicating if the transport is in closing state */
private volatile boolean isClosing = false;
@@ -112,67 +114,7 @@ public class HttpClientSseClientTransport implements McpClientTransport {
/**
* Customizer to modify requests before they are executed.
*/
- private final AsyncHttpRequestCustomizer httpRequestCustomizer;
-
- /**
- * Creates a new transport instance with default HTTP client and object mapper.
- * @param baseUri the base URI of the MCP server
- * @deprecated Use {@link HttpClientSseClientTransport#builder(String)} instead. This
- * constructor will be removed in future versions.
- */
- @Deprecated(forRemoval = true)
- public HttpClientSseClientTransport(String baseUri) {
- this(HttpClient.newBuilder(), baseUri, new ObjectMapper());
- }
-
- /**
- * Creates a new transport instance with custom HTTP client builder and object mapper.
- * @param clientBuilder the HTTP client builder to use
- * @param baseUri the base URI of the MCP server
- * @param objectMapper the object mapper for JSON serialization/deserialization
- * @throws IllegalArgumentException if objectMapper or clientBuilder is null
- * @deprecated Use {@link HttpClientSseClientTransport#builder(String)} instead. This
- * constructor will be removed in future versions.
- */
- @Deprecated(forRemoval = true)
- public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, String baseUri, ObjectMapper objectMapper) {
- this(clientBuilder, baseUri, DEFAULT_SSE_ENDPOINT, objectMapper);
- }
-
- /**
- * Creates a new transport instance with custom HTTP client builder and object mapper.
- * @param clientBuilder the HTTP client builder to use
- * @param baseUri the base URI of the MCP server
- * @param sseEndpoint the SSE endpoint path
- * @param objectMapper the object mapper for JSON serialization/deserialization
- * @throws IllegalArgumentException if objectMapper or clientBuilder is null
- * @deprecated Use {@link HttpClientSseClientTransport#builder(String)} instead. This
- * constructor will be removed in future versions.
- */
- @Deprecated(forRemoval = true)
- public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, String baseUri, String sseEndpoint,
- ObjectMapper objectMapper) {
- this(clientBuilder, HttpRequest.newBuilder(), baseUri, sseEndpoint, objectMapper);
- }
-
- /**
- * Creates a new transport instance with custom HTTP client builder, object mapper,
- * and headers.
- * @param clientBuilder the HTTP client builder to use
- * @param requestBuilder the HTTP request builder to use
- * @param baseUri the base URI of the MCP server
- * @param sseEndpoint the SSE endpoint path
- * @param objectMapper the object mapper for JSON serialization/deserialization
- * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
- * @deprecated Use {@link HttpClientSseClientTransport#builder(String)} instead. This
- * constructor will be removed in future versions.
- */
- @Deprecated(forRemoval = true)
- public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpRequest.Builder requestBuilder,
- String baseUri, String sseEndpoint, ObjectMapper objectMapper) {
- this(clientBuilder.connectTimeout(Duration.ofSeconds(10)).build(), requestBuilder, baseUri, sseEndpoint,
- objectMapper);
- }
+ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
/**
* Creates a new transport instance with custom HTTP client builder, object mapper,
@@ -181,30 +123,14 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
* @param requestBuilder the HTTP request builder to use
* @param baseUri the base URI of the MCP server
* @param sseEndpoint the SSE endpoint path
- * @param objectMapper the object mapper for JSON serialization/deserialization
- * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
- */
- @Deprecated(forRemoval = true)
- HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
- String sseEndpoint, ObjectMapper objectMapper) {
- this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
- }
-
- /**
- * Creates a new transport instance with custom HTTP client builder, object mapper,
- * and headers.
- * @param httpClient the HTTP client to use
- * @param requestBuilder the HTTP request builder to use
- * @param baseUri the base URI of the MCP server
- * @param sseEndpoint the SSE endpoint path
- * @param objectMapper the object mapper for JSON serialization/deserialization
+ * @param jsonMapper the object mapper for JSON serialization/deserialization
* @param httpRequestCustomizer customizer for the requestBuilder before executing
* requests
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
- String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
- Assert.notNull(objectMapper, "ObjectMapper must not be null");
+ String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
Assert.notNull(httpClient, "httpClient must not be null");
@@ -212,7 +138,7 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
this.baseUri = URI.create(baseUri);
this.sseEndpoint = sseEndpoint;
- this.objectMapper = objectMapper;
+ this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.httpRequestCustomizer = httpRequestCustomizer;
@@ -241,16 +167,15 @@ public static class Builder {
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
- private HttpClient.Builder clientBuilder = HttpClient.newBuilder()
- .version(HttpClient.Version.HTTP_1_1)
- .connectTimeout(Duration.ofSeconds(10));
+ private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1);
- private ObjectMapper objectMapper = new ObjectMapper();
+ private McpJsonMapper jsonMapper;
- private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
- .header("Content-Type", "application/json");
+ private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
- private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;
+ private McpAsyncHttpClientRequestCustomizer httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.NOOP;
+
+ private Duration connectTimeout = Duration.ofSeconds(10);
/**
* Creates a new builder instance.
@@ -339,13 +264,13 @@ public Builder customizeRequest(final Consumer requestCusto
}
/**
- * Sets the object mapper for JSON serialization/deserialization.
- * @param objectMapper the object mapper
+ * Sets the JSON mapper implementation to use for serialization/deserialization.
+ * @param jsonMapper the JSON mapper
* @return this builder
*/
- public Builder objectMapper(ObjectMapper objectMapper) {
- Assert.notNull(objectMapper, "objectMapper must not be null");
- this.objectMapper = objectMapper;
+ public Builder jsonMapper(McpJsonMapper jsonMapper) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
+ this.jsonMapper = jsonMapper;
return this;
}
@@ -354,16 +279,17 @@ public Builder objectMapper(ObjectMapper objectMapper) {
* executing them.
*
* This overrides the customizer from
- * {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}.
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}.
*
- * Do NOT use a blocking {@link SyncHttpRequestCustomizer} in a non-blocking
- * context. Use {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}
+ * Do NOT use a blocking {@link McpSyncHttpClientRequestCustomizer} in a
+ * non-blocking context. Use
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}
* instead.
* @param syncHttpRequestCustomizer the request customizer
* @return this builder
*/
- public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
- this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
+ public Builder httpRequestCustomizer(McpSyncHttpClientRequestCustomizer syncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.fromSync(syncHttpRequestCustomizer);
return this;
}
@@ -372,24 +298,36 @@ public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCu
* executing them.
*
* This overrides the customizer from
- * {@link #httpRequestCustomizer(SyncHttpRequestCustomizer)}.
+ * {@link #httpRequestCustomizer(McpSyncHttpClientRequestCustomizer)}.
*
* Do NOT use a blocking implementation in a non-blocking context.
* @param asyncHttpRequestCustomizer the request customizer
* @return this builder
*/
- public Builder asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
+ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer asyncHttpRequestCustomizer) {
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
return this;
}
+ /**
+ * Sets the connection timeout for the HTTP client.
+ * @param connectTimeout the connection timeout duration
+ * @return this builder
+ */
+ public Builder connectTimeout(Duration connectTimeout) {
+ Assert.notNull(connectTimeout, "connectTimeout must not be null");
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
/**
* Builds a new {@link HttpClientSseClientTransport} instance.
* @return a new transport instance
*/
public HttpClientSseClientTransport build() {
- return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
- objectMapper, httpRequestCustomizer);
+ HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
+ return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint,
+ jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer);
}
}
@@ -398,14 +336,15 @@ public HttpClientSseClientTransport build() {
public Mono connect(Function, Mono> handler) {
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
- return Mono.defer(() -> {
+ return Mono.deferContextual(ctx -> {
var builder = requestBuilder.copy()
.uri(uri)
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
.header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
.GET();
- return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null));
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
}).flatMap(requestBuilder -> Mono.create(sink -> {
Disposable connection = Flux.create(sseSink -> this.httpClient
.sendAsync(requestBuilder.build(),
@@ -435,7 +374,7 @@ public Mono connect(Function, Mono> h
}
}
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
- JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
+ JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper,
responseEvent.sseEvent().data());
sink.success();
return Flux.just(message);
@@ -516,7 +455,7 @@ public Mono sendMessage(JSONRPCMessage message) {
private Mono serializeMessage(final JSONRPCMessage message) {
return Mono.defer(() -> {
try {
- return Mono.just(objectMapper.writeValueAsString(message));
+ return Mono.just(jsonMapper.writeValueAsString(message));
}
catch (IOException e) {
return Mono.error(new McpTransportException("Failed to serialize message", e));
@@ -526,12 +465,14 @@ private Mono serializeMessage(final JSONRPCMessage message) {
private Mono> sendHttpPost(final String endpoint, final String body) {
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
- return Mono.defer(() -> {
+ return Mono.deferContextual(ctx -> {
var builder = this.requestBuilder.copy()
.uri(requestUri)
+ .header(HttpHeaders.CONTENT_TYPE, "application/json")
.header(MCP_PROTOCOL_VERSION_HEADER_NAME, MCP_PROTOCOL_VERSION)
.POST(HttpRequest.BodyPublishers.ofString(body));
- return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body));
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body, transportContext));
}).flatMap(customizedBuilder -> {
var request = customizedBuilder.build();
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
@@ -565,8 +506,8 @@ public Mono closeGracefully() {
* @return the unmarshalled object
*/
@Override
- public T unmarshalFrom(Object data, TypeReference typeRef) {
- return this.objectMapper.convertValue(data, typeRef);
+ public T unmarshalFrom(Object data, TypeRef typeRef) {
+ return this.jsonMapper.convertValue(data, typeRef);
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
similarity index 76%
rename from mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
index 93c28422a..e41f45ebb 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java
@@ -11,6 +11,8 @@
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.time.Duration;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionException;
@@ -18,14 +20,14 @@
import java.util.function.Consumer;
import java.util.function.Function;
-import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
+import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
+import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.HttpHeaders;
@@ -38,6 +40,9 @@
import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
@@ -74,8 +79,6 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
private static final Logger logger = LoggerFactory.getLogger(HttpClientStreamableHttpTransport.class);
- private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2025_03_26;
-
private static final String DEFAULT_ENDPOINT = "/mcp";
/**
@@ -103,7 +106,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
public static int BAD_REQUEST = 400;
- private final ObjectMapper objectMapper;
+ private final McpJsonMapper jsonMapper;
private final URI baseUri;
@@ -113,18 +116,23 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
private final boolean resumableStreams;
- private final AsyncHttpRequestCustomizer httpRequestCustomizer;
+ private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
- private final AtomicReference activeSession = new AtomicReference<>();
+ private final AtomicReference> activeSession = new AtomicReference<>();
private final AtomicReference, Mono>> handler = new AtomicReference<>();
private final AtomicReference> exceptionHandler = new AtomicReference<>();
- private HttpClientStreamableHttpTransport(ObjectMapper objectMapper, HttpClient httpClient,
+ private final List supportedProtocolVersions;
+
+ private final String latestSupportedProtocolVersion;
+
+ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
- boolean openConnectionOnStartup, AsyncHttpRequestCustomizer httpRequestCustomizer) {
- this.objectMapper = objectMapper;
+ boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
+ List supportedProtocolVersions) {
+ this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.baseUri = URI.create(baseUri);
@@ -133,11 +141,16 @@ private HttpClientStreamableHttpTransport(ObjectMapper objectMapper, HttpClient
this.openConnectionOnStartup = openConnectionOnStartup;
this.activeSession.set(createTransportSession());
this.httpRequestCustomizer = httpRequestCustomizer;
+ this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
+ this.latestSupportedProtocolVersion = this.supportedProtocolVersions.stream()
+ .sorted(Comparator.reverseOrder())
+ .findFirst()
+ .get();
}
@Override
public List protocolVersions() {
- return List.of(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26);
+ return supportedProtocolVersions;
}
public static Builder builder(String baseUri) {
@@ -159,23 +172,34 @@ public Mono connect(Function, Mono createTransportSession() {
Function> onClose = sessionId -> sessionId == null ? Mono.empty()
: createDelete(sessionId);
return new DefaultMcpTransportSession(onClose);
}
+ private McpTransportSession createClosedSession(McpTransportSession existingSession) {
+ var existingSessionId = Optional.ofNullable(existingSession)
+ .filter(session -> !(session instanceof ClosedMcpTransportSession))
+ .flatMap(McpTransportSession::sessionId)
+ .orElse(null);
+ return new ClosedMcpTransportSession<>(existingSessionId);
+ }
+
private Publisher createDelete(String sessionId) {
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
- return Mono.defer(() -> {
+ return Mono.deferContextual(ctx -> {
var builder = this.requestBuilder.copy()
.uri(uri)
.header("Cache-Control", "no-cache")
.header(HttpHeaders.MCP_SESSION_ID, sessionId)
- .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
.DELETE();
- return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null));
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext));
}).flatMap(requestBuilder -> {
var request = requestBuilder.build();
return Mono.fromFuture(() -> this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
@@ -205,9 +229,9 @@ private void handleException(Throwable t) {
public Mono closeGracefully() {
return Mono.defer(() -> {
logger.debug("Graceful close triggered");
- DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession());
+ McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
if (currentSession != null) {
- return currentSession.closeGracefully();
+ return Mono.from(currentSession.closeGracefully());
}
return Mono.empty();
});
@@ -228,7 +252,7 @@ private Mono reconnect(McpTransportStream stream) {
final McpTransportSession transportSession = this.activeSession.get();
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
- Disposable connection = Mono.defer(() -> {
+ Disposable connection = Mono.deferContextual(connectionCtx -> {
HttpRequest.Builder requestBuilder = this.requestBuilder.copy();
if (transportSession != null && transportSession.sessionId().isPresent()) {
@@ -241,11 +265,14 @@ private Mono reconnect(McpTransportStream stream) {
}
var builder = requestBuilder.uri(uri)
- .header("Accept", TEXT_EVENT_STREAM)
+ .header(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM)
.header("Cache-Control", "no-cache")
- .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
.GET();
- return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null));
+ var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
})
.flatMapMany(
requestBuilder -> Flux.create(
@@ -273,7 +300,7 @@ private Mono reconnect(McpTransportStream stream) {
// won't since the next version considers
// removing it.
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(
- this.objectMapper, responseEvent.sseEvent().data());
+ this.jsonMapper, responseEvent.sseEvent().data());
Tuple2, Iterable> idWithMessages = Tuples
.of(Optional.ofNullable(responseEvent.sseEvent().id()),
@@ -365,7 +392,7 @@ private BodyHandler toSendMessageBodySubscriber(FluxSink si
BodyHandler responseBodyHandler = responseInfo -> {
- String contentType = responseInfo.headers().firstValue("Content-Type").orElse("").toLowerCase();
+ String contentType = responseInfo.headers().firstValue(HttpHeaders.CONTENT_TYPE).orElse("").toLowerCase();
if (contentType.contains(TEXT_EVENT_STREAM)) {
// For SSE streams, use line subscriber that returns Void
@@ -388,7 +415,7 @@ else if (contentType.contains(APPLICATION_JSON)) {
public String toString(McpSchema.JSONRPCMessage message) {
try {
- return this.objectMapper.writeValueAsString(message);
+ return this.jsonMapper.writeValueAsString(message);
}
catch (IOException e) {
throw new RuntimeException("Failed to serialize JSON-RPC message", e);
@@ -405,7 +432,7 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) {
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
String jsonBody = this.toString(sentMessage);
- Disposable connection = Mono.defer(() -> {
+ Disposable connection = Mono.deferContextual(ctx -> {
HttpRequest.Builder requestBuilder = this.requestBuilder.copy();
if (transportSession != null && transportSession.sessionId().isPresent()) {
@@ -414,12 +441,16 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) {
}
var builder = requestBuilder.uri(uri)
- .header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM)
- .header("Content-Type", APPLICATION_JSON)
- .header("Cache-Control", "no-cache")
- .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
+ .header(HttpHeaders.ACCEPT, APPLICATION_JSON + ", " + TEXT_EVENT_STREAM)
+ .header(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON)
+ .header(HttpHeaders.CACHE_CONTROL, "no-cache")
+ .header(HttpHeaders.PROTOCOL_VERSION,
+ ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
+ this.latestSupportedProtocolVersion))
.POST(HttpRequest.BodyPublishers.ofString(jsonBody));
- return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody));
+ var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
+ return Mono
+ .from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext));
}).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> {
// Create the async request with proper body subscriber selection
@@ -451,15 +482,19 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) {
String contentType = responseEvent.responseInfo()
.headers()
- .firstValue("Content-Type")
+ .firstValue(HttpHeaders.CONTENT_TYPE)
.orElse("")
.toLowerCase();
- if (contentType.isBlank()) {
- logger.debug("No content type returned for POST in session {}", sessionRepresentation);
+ String contentLength = responseEvent.responseInfo()
+ .headers()
+ .firstValue(HttpHeaders.CONTENT_LENGTH)
+ .orElse(null);
+
+ if (contentType.isBlank() || "0".equals(contentLength)) {
+ logger.debug("No body returned for POST in session {}", sessionRepresentation);
// No content type means no response body, so we can just
- // return
- // an empty stream
+ // return an empty stream
deliveredSink.success();
return Flux.empty();
}
@@ -472,7 +507,7 @@ else if (contentType.contains(TEXT_EVENT_STREAM)) {
// since the
// next version considers removing it.
McpSchema.JSONRPCMessage message = McpSchema
- .deserializeJsonRpcMessage(this.objectMapper, sseEvent.data());
+ .deserializeJsonRpcMessage(this.jsonMapper, sseEvent.data());
Tuple2, Iterable> idWithMessages = Tuples
.of(Optional.ofNullable(sseEvent.id()), List.of(message));
@@ -495,13 +530,14 @@ else if (contentType.contains(TEXT_EVENT_STREAM)) {
else if (contentType.contains(APPLICATION_JSON)) {
deliveredSink.success();
String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data();
- if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) {
- logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data);
+ if (sentMessage instanceof McpSchema.JSONRPCNotification) {
+ logger.warn("Notification: {} received non-compliant response: {}", sentMessage,
+ Utils.hasText(data) ? data : "[empty]");
return Mono.empty();
}
try {
- return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data));
+ return Mono.just(McpSchema.deserializeJsonRpcMessage(jsonMapper, data));
}
catch (IOException e) {
return Mono.error(new McpTransportException(
@@ -575,8 +611,8 @@ private static String sessionIdOrPlaceholder(McpTransportSession> transportSes
}
@Override
- public T unmarshalFrom(Object data, TypeReference typeRef) {
- return this.objectMapper.convertValue(data, typeRef);
+ public T unmarshalFrom(Object data, TypeRef typeRef) {
+ return this.jsonMapper.convertValue(data, typeRef);
}
/**
@@ -586,11 +622,9 @@ public static class Builder {
private final String baseUri;
- private ObjectMapper objectMapper;
+ private McpJsonMapper jsonMapper;
- private HttpClient.Builder clientBuilder = HttpClient.newBuilder()
- .version(HttpClient.Version.HTTP_1_1)
- .connectTimeout(Duration.ofSeconds(10));
+ private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1);
private String endpoint = DEFAULT_ENDPOINT;
@@ -600,7 +634,12 @@ public static class Builder {
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder();
- private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;
+ private McpAsyncHttpClientRequestCustomizer httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.NOOP;
+
+ private Duration connectTimeout = Duration.ofSeconds(10);
+
+ private List supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
+ ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18);
/**
* Creates a new builder with the specified base URI.
@@ -656,13 +695,13 @@ public Builder customizeRequest(final Consumer requestCusto
}
/**
- * Configure the {@link ObjectMapper} to use.
- * @param objectMapper instance to use
+ * Configure a custom {@link McpJsonMapper} implementation to use.
+ * @param jsonMapper instance to use
* @return the builder instance
*/
- public Builder objectMapper(ObjectMapper objectMapper) {
- Assert.notNull(objectMapper, "ObjectMapper must not be null");
- this.objectMapper = objectMapper;
+ public Builder jsonMapper(McpJsonMapper jsonMapper) {
+ Assert.notNull(jsonMapper, "jsonMapper must not be null");
+ this.jsonMapper = jsonMapper;
return this;
}
@@ -709,16 +748,17 @@ public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
* executing them.
*
* This overrides the customizer from
- * {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}.
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}.
*
- * Do NOT use a blocking {@link SyncHttpRequestCustomizer} in a non-blocking
- * context. Use {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}
+ * Do NOT use a blocking {@link McpSyncHttpClientRequestCustomizer} in a
+ * non-blocking context. Use
+ * {@link #asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer)}
* instead.
* @param syncHttpRequestCustomizer the request customizer
* @return this builder
*/
- public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
- this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
+ public Builder httpRequestCustomizer(McpSyncHttpClientRequestCustomizer syncHttpRequestCustomizer) {
+ this.httpRequestCustomizer = McpAsyncHttpClientRequestCustomizer.fromSync(syncHttpRequestCustomizer);
return this;
}
@@ -727,27 +767,62 @@ public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCu
* executing them.
*
* This overrides the customizer from
- * {@link #httpRequestCustomizer(SyncHttpRequestCustomizer)}.
+ * {@link #httpRequestCustomizer(McpSyncHttpClientRequestCustomizer)}.
*
* Do NOT use a blocking implementation in a non-blocking context.
* @param asyncHttpRequestCustomizer the request customizer
* @return this builder
*/
- public Builder asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
+ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer asyncHttpRequestCustomizer) {
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
return this;
}
+ /**
+ * Sets the connection timeout for the HTTP client.
+ * @param connectTimeout the connection timeout duration
+ * @return this builder
+ */
+ public Builder connectTimeout(Duration connectTimeout) {
+ Assert.notNull(connectTimeout, "connectTimeout must not be null");
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ /**
+ * Sets the list of supported protocol versions used in version negotiation. By
+ * default, the client will send the latest of those versions in the
+ * {@code MCP-Protocol-Version} header.
+ *
+ * Setting this value only updates the values used in version negotiation, and
+ * does NOT impact the actual capabilities of the transport. It should only be
+ * used for compatibility with servers having strict requirements around the
+ * {@code MCP-Protocol-Version} header.
+ * @param supportedProtocolVersions protocol versions supported by this transport
+ * @return this builder
+ * @see version
+ * negotiation specification
+ * @see Protocol
+ * Version Header
+ */
+ public Builder supportedProtocolVersions(List supportedProtocolVersions) {
+ Assert.notEmpty(supportedProtocolVersions, "supportedProtocolVersions must not be empty");
+ this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
+ return this;
+ }
+
/**
* Construct a fresh instance of {@link HttpClientStreamableHttpTransport} using
* the current builder configuration.
* @return a new instance of {@link HttpClientStreamableHttpTransport}
*/
public HttpClientStreamableHttpTransport build() {
- ObjectMapper objectMapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
-
- return new HttpClientStreamableHttpTransport(objectMapper, clientBuilder.build(), requestBuilder, baseUri,
- endpoint, resumableStreams, openConnectionOnStartup, httpRequestCustomizer);
+ HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
+ return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper,
+ httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
+ httpRequestCustomizer, supportedProtocolVersions);
}
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java
similarity index 89%
rename from mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java
index 296d1a17d..29dc23c35 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java
@@ -141,7 +141,6 @@ protected void hookOnSubscribe(Subscription subscription) {
@Override
protected void hookOnNext(String line) {
-
if (line.isEmpty()) {
// Empty line means end of event
if (this.eventBuilder.length() > 0) {
@@ -158,23 +157,27 @@ protected void hookOnNext(String line) {
if (matcher.find()) {
this.eventBuilder.append(matcher.group(1).trim()).append("\n");
}
+ upstream().request(1);
}
else if (line.startsWith("id:")) {
var matcher = EVENT_ID_PATTERN.matcher(line);
if (matcher.find()) {
this.currentEventId.set(matcher.group(1).trim());
}
+ upstream().request(1);
}
else if (line.startsWith("event:")) {
var matcher = EVENT_TYPE_PATTERN.matcher(line);
if (matcher.find()) {
this.currentEventType.set(matcher.group(1).trim());
}
+ upstream().request(1);
}
else if (line.startsWith(":")) {
// Ignore comment lines starting with ":"
// This is a no-op, just to skip comments
logger.debug("Ignoring comment line: {}", line);
+ upstream().request(1);
}
else {
// If the response is not successful, emit an error
@@ -220,6 +223,8 @@ static class AggregateSubscriber extends BaseSubscriber {
*/
private ResponseInfo responseInfo;
+ volatile boolean hasRequestedDemand = false;
+
/**
* Creates a new JsonLineSubscriber that will emit parsed JSON-RPC messages.
* @param sink the {@link FluxSink} to emit parsed {@link ResponseEvent} objects
@@ -233,7 +238,13 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink si
@Override
protected void hookOnSubscribe(Subscription subscription) {
- sink.onRequest(subscription::request);
+
+ sink.onRequest(n -> {
+ if (!hasRequestedDemand) {
+ subscription.request(Long.MAX_VALUE);
+ }
+ hasRequestedDemand = true;
+ });
// Register disposal callback to cancel subscription when Flux is disposed
sink.onDispose(subscription::cancel);
@@ -246,8 +257,11 @@ protected void hookOnNext(String line) {
@Override
protected void hookOnComplete() {
- String data = this.eventBuilder.toString();
- this.sink.next(new AggregateResponseEvent(responseInfo, data));
+
+ if (hasRequestedDemand) {
+ String data = this.eventBuilder.toString();
+ this.sink.next(new AggregateResponseEvent(responseInfo, data));
+ }
this.sink.complete();
}
@@ -268,6 +282,8 @@ static class BodilessResponseLineSubscriber extends BaseSubscriber {
private final ResponseInfo responseInfo;
+ volatile boolean hasRequestedDemand = false;
+
public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink sink) {
this.sink = sink;
this.responseInfo = responseInfo;
@@ -277,7 +293,10 @@ public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink {
- subscription.request(n);
+ if (!hasRequestedDemand) {
+ subscription.request(Long.MAX_VALUE);
+ }
+ hasRequestedDemand = true;
});
// Register disposal callback to cancel subscription when Flux is disposed
@@ -288,11 +307,13 @@ protected void hookOnSubscribe(Subscription subscription) {
@Override
protected void hookOnComplete() {
- // emit dummy event to be able to inspect the response info
- // this is a shortcut allowing for a more streamlined processing using
- // operator composition instead of having to deal with the CompletableFuture
- // along the Subscriber for inspecting the result
- this.sink.next(new DummyEvent(responseInfo));
+ if (hasRequestedDemand) {
+ // emit dummy event to be able to inspect the response info
+ // this is a shortcut allowing for a more streamlined processing using
+ // operator composition instead of having to deal with the
+ // CompletableFuture along the Subscriber for inspecting the result
+ this.sink.next(new DummyEvent(responseInfo));
+ }
this.sink.complete();
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ServerParameters.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ServerParameters.java
similarity index 100%
rename from mcp/src/main/java/io/modelcontextprotocol/client/transport/ServerParameters.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ServerParameters.java
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
similarity index 92%
rename from mcp/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
index 009d415e0..1b4eaca97 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java
@@ -15,8 +15,8 @@
import java.util.function.Consumer;
import java.util.function.Function;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.json.McpJsonMapper;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
@@ -48,7 +48,7 @@ public class StdioClientTransport implements McpClientTransport {
/** The server process being communicated with */
private Process process;
- private ObjectMapper objectMapper;
+ private McpJsonMapper jsonMapper;
/** Scheduler for handling inbound messages from the server process */
private Scheduler inboundScheduler;
@@ -70,29 +70,20 @@ public class StdioClientTransport implements McpClientTransport {
private Consumer stdErrorHandler = error -> logger.info("STDERR Message received: {}", error);
/**
- * Creates a new StdioClientTransport with the specified parameters and default
- * ObjectMapper.
+ * Creates a new StdioClientTransport with the specified parameters and JsonMapper.
* @param params The parameters for configuring the server process
+ * @param jsonMapper The JsonMapper to use for JSON serialization/deserialization
*/
- public StdioClientTransport(ServerParameters params) {
- this(params, new ObjectMapper());
- }
-
- /**
- * Creates a new StdioClientTransport with the specified parameters and ObjectMapper.
- * @param params The parameters for configuring the server process
- * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
- */
- public StdioClientTransport(ServerParameters params, ObjectMapper objectMapper) {
+ public StdioClientTransport(ServerParameters params, McpJsonMapper jsonMapper) {
Assert.notNull(params, "The params can not be null");
- Assert.notNull(objectMapper, "The ObjectMapper can not be null");
+ Assert.notNull(jsonMapper, "The JsonMapper can not be null");
this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
this.params = params;
- this.objectMapper = objectMapper;
+ this.jsonMapper = jsonMapper;
this.errorSink = Sinks.many().unicast().onBackpressureBuffer();
@@ -259,7 +250,7 @@ private void startInboundProcessing() {
String line;
while (!isClosing && (line = processReader.readLine()) != null) {
try {
- JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, line);
+ JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, line);
if (!this.inboundSink.tryEmitNext(message).isSuccess()) {
if (!isClosing) {
logger.error("Failed to enqueue inbound message: {}", message);
@@ -300,7 +291,7 @@ private void startOutboundProcessing() {
.handle((message, s) -> {
if (message != null && !isClosing) {
try {
- String jsonMessage = objectMapper.writeValueAsString(message);
+ String jsonMessage = jsonMapper.writeValueAsString(message);
// Escape any embedded newlines in the JSON message as per spec:
// https://spec.modelcontextprotocol.io/specification/basic/transports/#stdio
// - Messages are delimited by newlines, and MUST NOT contain
@@ -392,8 +383,8 @@ public Sinks.Many getErrorSink() {
}
@Override
- public T unmarshalFrom(Object data, TypeReference typeRef) {
- return this.objectMapper.convertValue(data, typeRef);
+ public T unmarshalFrom(Object data, TypeRef typeRef) {
+ return this.jsonMapper.convertValue(data, typeRef);
}
}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/DelegatingMcpAsyncHttpClientRequestCustomizer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/DelegatingMcpAsyncHttpClientRequestCustomizer.java
new file mode 100644
index 000000000..2492efe18
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/DelegatingMcpAsyncHttpClientRequestCustomizer.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+package io.modelcontextprotocol.client.transport.customizer;
+
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.util.List;
+
+import org.reactivestreams.Publisher;
+
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.util.Assert;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Composable {@link McpAsyncHttpClientRequestCustomizer} that applies multiple
+ * customizers, in order.
+ *
+ * @author Daniel Garnier-Moiroux
+ */
+public class DelegatingMcpAsyncHttpClientRequestCustomizer implements McpAsyncHttpClientRequestCustomizer {
+
+ private final List customizers;
+
+ public DelegatingMcpAsyncHttpClientRequestCustomizer(List customizers) {
+ Assert.notNull(customizers, "Customizers must not be null");
+ this.customizers = customizers;
+ }
+
+ @Override
+ public Publisher customize(HttpRequest.Builder builder, String method, URI endpoint,
+ String body, McpTransportContext context) {
+ var result = Mono.just(builder);
+ for (var customizer : this.customizers) {
+ result = result.flatMap(b -> Mono.from(customizer.customize(b, method, endpoint, body, context)));
+ }
+ return result;
+ }
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/DelegatingMcpSyncHttpClientRequestCustomizer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/DelegatingMcpSyncHttpClientRequestCustomizer.java
new file mode 100644
index 000000000..e627e7e69
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/DelegatingMcpSyncHttpClientRequestCustomizer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport.customizer;
+
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.util.List;
+
+import io.modelcontextprotocol.common.McpTransportContext;
+import io.modelcontextprotocol.util.Assert;
+
+/**
+ * Composable {@link McpSyncHttpClientRequestCustomizer} that applies multiple
+ * customizers, in order.
+ *
+ * @author Daniel Garnier-Moiroux
+ */
+public class DelegatingMcpSyncHttpClientRequestCustomizer implements McpSyncHttpClientRequestCustomizer {
+
+ private final List delegates;
+
+ public DelegatingMcpSyncHttpClientRequestCustomizer(List customizers) {
+ Assert.notNull(customizers, "Customizers must not be null");
+ this.delegates = customizers;
+ }
+
+ @Override
+ public void customize(HttpRequest.Builder builder, String method, URI endpoint, String body,
+ McpTransportContext context) {
+ this.delegates.forEach(delegate -> delegate.customize(builder, method, endpoint, body, context));
+ }
+
+}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/AsyncHttpRequestCustomizer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpAsyncHttpClientRequestCustomizer.java
similarity index 62%
rename from mcp/src/main/java/io/modelcontextprotocol/client/transport/AsyncHttpRequestCustomizer.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpAsyncHttpClientRequestCustomizer.java
index dee026d96..756b39c35 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/AsyncHttpRequestCustomizer.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpAsyncHttpClientRequestCustomizer.java
@@ -2,15 +2,18 @@
* Copyright 2024-2025 the original author or authors.
*/
-package io.modelcontextprotocol.client.transport;
+package io.modelcontextprotocol.client.transport.customizer;
import java.net.URI;
import java.net.http.HttpRequest;
+
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
+import io.modelcontextprotocol.common.McpTransportContext;
+
/**
* Customize {@link HttpRequest.Builder} before executing the request, in either SSE or
* Streamable HTTP transport.
@@ -19,12 +22,12 @@
*
* @author Daniel Garnier-Moiroux
*/
-public interface AsyncHttpRequestCustomizer {
+public interface McpAsyncHttpClientRequestCustomizer {
Publisher customize(HttpRequest.Builder builder, String method, URI endpoint,
- @Nullable String body);
+ @Nullable String body, McpTransportContext context);
- AsyncHttpRequestCustomizer NOOP = new Noop();
+ McpAsyncHttpClientRequestCustomizer NOOP = new Noop();
/**
* Wrap a sync implementation in an async wrapper.
@@ -32,18 +35,18 @@ Publisher customize(HttpRequest.Builder builder, String met
* Do NOT wrap a blocking implementation for use in a non-blocking context. For a
* blocking implementation, consider using {@link Schedulers#boundedElastic()}.
*/
- static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
- return (builder, method, uri, body) -> Mono.fromSupplier(() -> {
- customizer.customize(builder, method, uri, body);
+ static McpAsyncHttpClientRequestCustomizer fromSync(McpSyncHttpClientRequestCustomizer customizer) {
+ return (builder, method, uri, body, context) -> Mono.fromSupplier(() -> {
+ customizer.customize(builder, method, uri, body, context);
return builder;
});
}
- class Noop implements AsyncHttpRequestCustomizer {
+ class Noop implements McpAsyncHttpClientRequestCustomizer {
@Override
public Publisher customize(HttpRequest.Builder builder, String method, URI endpoint,
- String body) {
+ String body, McpTransportContext context) {
return Mono.just(builder);
}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpSyncHttpClientRequestCustomizer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpSyncHttpClientRequestCustomizer.java
new file mode 100644
index 000000000..e22e3aa62
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpSyncHttpClientRequestCustomizer.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.client.transport.customizer;
+
+import java.net.URI;
+import java.net.http.HttpRequest;
+
+import reactor.util.annotation.Nullable;
+
+import io.modelcontextprotocol.client.McpClient.SyncSpec;
+import io.modelcontextprotocol.common.McpTransportContext;
+
+/**
+ * Customize {@link HttpRequest.Builder} before executing the request, either in SSE or
+ * Streamable HTTP transport. Do not rely on thread-locals in this implementation, instead
+ * use {@link SyncSpec#transportContextProvider} to extract context, and then consume it
+ * through {@link McpTransportContext}.
+ *
+ * @author Daniel Garnier-Moiroux
+ */
+public interface McpSyncHttpClientRequestCustomizer {
+
+ void customize(HttpRequest.Builder builder, String method, URI endpoint, @Nullable String body,
+ McpTransportContext context);
+
+}
diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/common/DefaultMcpTransportContext.java b/mcp-core/src/main/java/io/modelcontextprotocol/common/DefaultMcpTransportContext.java
new file mode 100644
index 000000000..cde637b15
--- /dev/null
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/common/DefaultMcpTransportContext.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2024-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.common;
+
+import java.util.Map;
+
+import io.modelcontextprotocol.util.Assert;
+
+/**
+ * Default implementation for {@link McpTransportContext} which uses a map as storage.
+ *
+ * @author Dariusz Jędrzejczyk
+ * @author Daniel Garnier-Moiroux
+ */
+class DefaultMcpTransportContext implements McpTransportContext {
+
+ private final Map metadata;
+
+ DefaultMcpTransportContext(Map metadata) {
+ Assert.notNull(metadata, "The metadata cannot be null");
+ this.metadata = metadata;
+ }
+
+ @Override
+ public Object get(String key) {
+ return this.metadata.get(key);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DefaultMcpTransportContext that = (DefaultMcpTransportContext) o;
+ return this.metadata.equals(that.metadata);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.metadata.hashCode();
+ }
+
+}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpTransportContext.java b/mcp-core/src/main/java/io/modelcontextprotocol/common/McpTransportContext.java
similarity index 68%
rename from mcp/src/main/java/io/modelcontextprotocol/server/McpTransportContext.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/common/McpTransportContext.java
index 1cd540f72..46a2ccf84 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpTransportContext.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/common/McpTransportContext.java
@@ -2,9 +2,10 @@
* Copyright 2024-2025 the original author or authors.
*/
-package io.modelcontextprotocol.server;
+package io.modelcontextprotocol.common;
import java.util.Collections;
+import java.util.Map;
/**
* Context associated with the transport layer. It allows to add transport-level metadata
@@ -26,6 +27,15 @@ public interface McpTransportContext {
@SuppressWarnings("unchecked")
McpTransportContext EMPTY = new DefaultMcpTransportContext(Collections.EMPTY_MAP);
+ /**
+ * Create an unmodifiable context containing the given metadata.
+ * @param metadata the transport metadata
+ * @return the context containing the metadata
+ */
+ static McpTransportContext create(Map metadata) {
+ return new DefaultMcpTransportContext(metadata);
+ }
+
/**
* Extract a value from the context.
* @param key the key under the data is expected
@@ -33,18 +43,4 @@ public interface McpTransportContext {
*/
Object get(String key);
- /**
- * Inserts a value for a given key.
- * @param key a String representing the key
- * @param value the value to store
- */
- void put(String key, Object value);
-
- /**
- * Copies the contents of the context to allow further modifications without affecting
- * the initial object.
- * @return a new instance with the underlying storage copied.
- */
- McpTransportContext copy();
-
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/DefaultMcpStatelessServerHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/DefaultMcpStatelessServerHandler.java
similarity index 97%
rename from mcp/src/main/java/io/modelcontextprotocol/server/DefaultMcpStatelessServerHandler.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/server/DefaultMcpStatelessServerHandler.java
index 2df3514b6..d1b55f594 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/DefaultMcpStatelessServerHandler.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/DefaultMcpStatelessServerHandler.java
@@ -4,6 +4,7 @@
package io.modelcontextprotocol.server;
+import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import org.slf4j.Logger;
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
similarity index 68%
rename from mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
index a51c2e36c..23285d514 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
@@ -5,7 +5,6 @@
package io.modelcontextprotocol.server;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -15,34 +14,37 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiFunction;
+import io.modelcontextprotocol.json.McpJsonMapper;
+import io.modelcontextprotocol.json.TypeRef;
+import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
import io.modelcontextprotocol.spec.DefaultMcpStreamableServerSessionFactory;
-import io.modelcontextprotocol.spec.McpServerTransportProviderBase;
-import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import io.modelcontextprotocol.spec.JsonSchemaValidator;
import io.modelcontextprotocol.spec.McpClientSession;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.CallToolResult;
+import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion;
+import io.modelcontextprotocol.spec.McpSchema.ErrorCodes;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
-import io.modelcontextprotocol.spec.McpSchema.ResourceTemplate;
+import io.modelcontextprotocol.spec.McpSchema.PromptReference;
+import io.modelcontextprotocol.spec.McpSchema.ResourceReference;
import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest;
import io.modelcontextprotocol.spec.McpSchema.Tool;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
+import io.modelcontextprotocol.spec.McpServerTransportProviderBase;
+import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
import io.modelcontextprotocol.util.Assert;
-import io.modelcontextprotocol.util.DeafaultMcpUriTemplateManagerFactory;
+import io.modelcontextprotocol.util.DefaultMcpUriTemplateManagerFactory;
import io.modelcontextprotocol.util.McpUriTemplateManagerFactory;
import io.modelcontextprotocol.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import static io.modelcontextprotocol.spec.McpError.RESOURCE_NOT_FOUND;
+
/**
* The Model Context Protocol (MCP) server implementation that provides asynchronous
* communication using Project Reactor's Mono and Flux types.
@@ -91,7 +93,7 @@ public class McpAsyncServer {
private final McpServerTransportProviderBase mcpTransportProvider;
- private final ObjectMapper objectMapper;
+ private final McpJsonMapper jsonMapper;
private final JsonSchemaValidator jsonSchemaValidator;
@@ -103,10 +105,10 @@ public class McpAsyncServer {
private final CopyOnWriteArrayList tools = new CopyOnWriteArrayList<>();
- private final CopyOnWriteArrayList resourceTemplates = new CopyOnWriteArrayList<>();
-
private final ConcurrentHashMap resources = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap resourceTemplates = new ConcurrentHashMap<>();
+
private final ConcurrentHashMap prompts = new ConcurrentHashMap<>();
// FIXME: this field is deprecated and should be remvoed together with the
@@ -117,26 +119,26 @@ public class McpAsyncServer {
private List protocolVersions;
- private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DeafaultMcpUriTemplateManagerFactory();
+ private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory();
/**
* Create a new McpAsyncServer with the given transport provider and capabilities.
* @param mcpTransportProvider The transport layer implementation for MCP
* communication.
* @param features The MCP server supported features.
- * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
+ * @param jsonMapper The JsonMapper to use for JSON serialization/deserialization
*/
- McpAsyncServer(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
+ McpAsyncServer(McpServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
McpServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransportProvider;
- this.objectMapper = objectMapper;
+ this.jsonMapper = jsonMapper;
this.serverInfo = features.serverInfo();
this.serverCapabilities = features.serverCapabilities().mutate().logging().build();
this.instructions = features.instructions();
this.tools.addAll(withStructuredOutputHandling(jsonSchemaValidator, features.tools()));
this.resources.putAll(features.resources());
- this.resourceTemplates.addAll(features.resourceTemplates());
+ this.resourceTemplates.putAll(features.resourceTemplates());
this.prompts.putAll(features.prompts());
this.completions.putAll(features.completions());
this.uriTemplateManagerFactory = uriTemplateManagerFactory;
@@ -151,17 +153,17 @@ public class McpAsyncServer {
requestTimeout, transport, this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
}
- McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
+ McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper,
McpServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransportProvider;
- this.objectMapper = objectMapper;
+ this.jsonMapper = jsonMapper;
this.serverInfo = features.serverInfo();
this.serverCapabilities = features.serverCapabilities().mutate().logging().build();
this.instructions = features.instructions();
this.tools.addAll(withStructuredOutputHandling(jsonSchemaValidator, features.tools()));
this.resources.putAll(features.resources());
- this.resourceTemplates.addAll(features.resourceTemplates());
+ this.resourceTemplates.putAll(features.resourceTemplates());
this.prompts.putAll(features.prompts());
this.completions.putAll(features.completions());
this.uriTemplateManagerFactory = uriTemplateManagerFactory;
@@ -319,25 +321,24 @@ private McpNotificationHandler asyncRootsListChangedNotificationHandler(
*/
public Mono addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) {
if (toolSpecification == null) {
- return Mono.error(new McpError("Tool specification must not be null"));
+ return Mono.error(new IllegalArgumentException("Tool specification must not be null"));
}
if (toolSpecification.tool() == null) {
- return Mono.error(new McpError("Tool must not be null"));
+ return Mono.error(new IllegalArgumentException("Tool must not be null"));
}
if (toolSpecification.call() == null && toolSpecification.callHandler() == null) {
- return Mono.error(new McpError("Tool call handler must not be null"));
+ return Mono.error(new IllegalArgumentException("Tool call handler must not be null"));
}
if (this.serverCapabilities.tools() == null) {
- return Mono.error(new McpError("Server must be configured with tool capabilities"));
+ return Mono.error(new IllegalStateException("Server must be configured with tool capabilities"));
}
var wrappedToolSpecification = withStructuredOutputHandling(this.jsonSchemaValidator, toolSpecification);
return Mono.defer(() -> {
- // Check for duplicate tool names
- if (this.tools.stream().anyMatch(th -> th.tool().name().equals(wrappedToolSpecification.tool().name()))) {
- return Mono.error(
- new McpError("Tool with name '" + wrappedToolSpecification.tool().name() + "' already exists"));
+ // Remove tools with duplicate tool names first
+ if (this.tools.removeIf(th -> th.tool().name().equals(wrappedToolSpecification.tool().name()))) {
+ logger.warn("Replace existing Tool with name '{}'", wrappedToolSpecification.tool().name());
}
this.tools.add(wrappedToolSpecification);
@@ -376,6 +377,11 @@ public Mono apply(McpAsyncServerExchange exchange, McpSchema.Cal
return this.delegateCallToolResult.apply(exchange, request).map(result -> {
+ if (Boolean.TRUE.equals(result.isError())) {
+ // If the tool call resulted in an error, skip further validation
+ return result;
+ }
+
if (outputSchema == null) {
if (result.structuredContent() != null) {
logger.warn(
@@ -391,11 +397,12 @@ public Mono apply(McpAsyncServerExchange exchange, McpSchema.Cal
// results that conform to this schema.
// https://modelcontextprotocol.io/specification/2025-06-18/server/tools#output-schema
if (result.structuredContent() == null) {
- logger.warn(
- "Response missing structured content which is expected when calling tool with non-empty outputSchema");
- return new CallToolResult(
- "Response missing structured content which is expected when calling tool with non-empty outputSchema",
- true);
+ String content = "Response missing structured content which is expected when calling tool with non-empty outputSchema";
+ logger.warn(content);
+ return CallToolResult.builder()
+ .content(List.of(new McpSchema.TextContent(content)))
+ .isError(true)
+ .build();
}
// Validate the result against the output schema
@@ -403,7 +410,10 @@ public Mono apply(McpAsyncServerExchange exchange, McpSchema.Cal
if (!validation.valid()) {
logger.warn("Tool call result validation failed: {}", validation.errorMessage());
- return new CallToolResult(validation.errorMessage(), true);
+ return CallToolResult.builder()
+ .content(List.of(new McpSchema.TextContent(validation.errorMessage())))
+ .isError(true)
+ .build();
}
if (Utils.isEmpty(result.content())) {
@@ -413,8 +423,11 @@ public Mono apply(McpAsyncServerExchange exchange, McpSchema.Cal
// TextContent block.)
// https://modelcontextprotocol.io/specification/2025-06-18/server/tools#structured-content
- return new CallToolResult(List.of(new McpSchema.TextContent(validation.jsonStructuredOutput())),
- result.isError(), result.structuredContent());
+ return CallToolResult.builder()
+ .content(List.of(new McpSchema.TextContent(validation.jsonStructuredOutput())))
+ .isError(result.isError())
+ .structuredContent(result.structuredContent())
+ .build();
}
return result;
@@ -453,6 +466,14 @@ private static McpServerFeatures.AsyncToolSpecification withStructuredOutputHand
.build();
}
+ /**
+ * List all registered tools.
+ * @return A Flux stream of all registered tools
+ */
+ public Flux listTools() {
+ return Flux.fromIterable(this.tools).map(McpServerFeatures.AsyncToolSpecification::tool);
+ }
+
/**
* Remove a tool handler at runtime.
* @param toolName The name of the tool handler to remove
@@ -460,23 +481,25 @@ private static McpServerFeatures.AsyncToolSpecification withStructuredOutputHand
*/
public Mono removeTool(String toolName) {
if (toolName == null) {
- return Mono.error(new McpError("Tool name must not be null"));
+ return Mono.error(new IllegalArgumentException("Tool name must not be null"));
}
if (this.serverCapabilities.tools() == null) {
- return Mono.error(new McpError("Server must be configured with tool capabilities"));
+ return Mono.error(new IllegalStateException("Server must be configured with tool capabilities"));
}
return Mono.defer(() -> {
- boolean removed = this.tools
- .removeIf(toolSpecification -> toolSpecification.tool().name().equals(toolName));
- if (removed) {
+ if (this.tools.removeIf(toolSpecification -> toolSpecification.tool().name().equals(toolName))) {
+
logger.debug("Removed tool handler: {}", toolName);
if (this.serverCapabilities.tools().listChanged()) {
return notifyToolsListChanged();
}
- return Mono.empty();
}
- return Mono.error(new McpError("Tool with name '" + toolName + "' not found"));
+ else {
+ logger.warn("Ignore as a Tool with name '{}' not found", toolName);
+ }
+
+ return Mono.empty();
});
}
@@ -498,8 +521,8 @@ private McpRequestHandler toolsListRequestHandler() {
private McpRequestHandler toolsCallRequestHandler() {
return (exchange, params) -> {
- McpSchema.CallToolRequest callToolRequest = objectMapper.convertValue(params,
- new TypeReference() {
+ McpSchema.CallToolRequest callToolRequest = jsonMapper.convertValue(params,
+ new TypeRef() {
});
Optional toolSpecification = this.tools.stream()
@@ -507,11 +530,13 @@ private McpRequestHandler toolsCallRequestHandler() {
.findAny();
if (toolSpecification.isEmpty()) {
- return Mono.error(new McpError("Tool not found: " + callToolRequest.name()));
+ return Mono.error(McpError.builder(McpSchema.ErrorCodes.INVALID_PARAMS)
+ .message("Unknown tool: invalid_tool_name")
+ .data("Tool not found: " + callToolRequest.name())
+ .build());
}
- return toolSpecification.map(tool -> Mono.defer(() -> tool.callHandler().apply(exchange, callToolRequest)))
- .orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name())));
+ return toolSpecification.get().callHandler().apply(exchange, callToolRequest);
};
}
@@ -526,19 +551,22 @@ private McpRequestHandler toolsCallRequestHandler() {
*/
public Mono addResource(McpServerFeatures.AsyncResourceSpecification resourceSpecification) {
if (resourceSpecification == null || resourceSpecification.resource() == null) {
- return Mono.error(new McpError("Resource must not be null"));
+ return Mono.error(new IllegalArgumentException("Resource must not be null"));
}
if (this.serverCapabilities.resources() == null) {
- return Mono.error(new McpError("Server must be configured with resource capabilities"));
+ return Mono.error(new IllegalStateException(
+ "Server must be configured with resource capabilities to allow adding resources"));
}
return Mono.defer(() -> {
- if (this.resources.putIfAbsent(resourceSpecification.resource().uri(), resourceSpecification) != null) {
- return Mono.error(new McpError(
- "Resource with URI '" + resourceSpecification.resource().uri() + "' already exists"));
+ var previous = this.resources.put(resourceSpecification.resource().uri(), resourceSpecification);
+ if (previous != null) {
+ logger.warn("Replace existing Resource with URI '{}'", resourceSpecification.resource().uri());
+ }
+ else {
+ logger.debug("Added resource handler: {}", resourceSpecification.resource().uri());
}
- logger.debug("Added resource handler: {}", resourceSpecification.resource().uri());
if (this.serverCapabilities.resources().listChanged()) {
return notifyResourcesListChanged();
}
@@ -546,6 +574,14 @@ public Mono addResource(McpServerFeatures.AsyncResourceSpecification resou
});
}
+ /**
+ * List all registered resources.
+ * @return A Flux stream of all registered resources
+ */
+ public Flux listResources() {
+ return Flux.fromIterable(this.resources.values()).map(McpServerFeatures.AsyncResourceSpecification::resource);
+ }
+
/**
* Remove a resource handler at runtime.
* @param resourceUri The URI of the resource handler to remove
@@ -553,10 +589,11 @@ public Mono addResource(McpServerFeatures.AsyncResourceSpecification resou
*/
public Mono removeResource(String resourceUri) {
if (resourceUri == null) {
- return Mono.error(new McpError("Resource URI must not be null"));
+ return Mono.error(new IllegalArgumentException("Resource URI must not be null"));
}
if (this.serverCapabilities.resources() == null) {
- return Mono.error(new McpError("Server must be configured with resource capabilities"));
+ return Mono.error(new IllegalStateException(
+ "Server must be configured with resource capabilities to allow removing resources"));
}
return Mono.defer(() -> {
@@ -568,7 +605,74 @@ public Mono removeResource(String resourceUri) {
}
return Mono.empty();
}
- return Mono.error(new McpError("Resource with URI '" + resourceUri + "' not found"));
+ else {
+ logger.warn("Ignore as a Resource with URI '{}' not found", resourceUri);
+ }
+ return Mono.empty();
+ });
+ }
+
+ /**
+ * Add a new resource template at runtime.
+ * @param resourceTemplateSpecification The resource template to add
+ * @return Mono that completes when clients have been notified of the change
+ */
+ public Mono addResourceTemplate(
+ McpServerFeatures.AsyncResourceTemplateSpecification resourceTemplateSpecification) {
+
+ if (this.serverCapabilities.resources() == null) {
+ return Mono.error(new IllegalStateException(
+ "Server must be configured with resource capabilities to allow adding resource templates"));
+ }
+
+ return Mono.defer(() -> {
+ var previous = this.resourceTemplates.put(resourceTemplateSpecification.resourceTemplate().uriTemplate(),
+ resourceTemplateSpecification);
+ if (previous != null) {
+ logger.warn("Replace existing Resource Template with URI '{}'",
+ resourceTemplateSpecification.resourceTemplate().uriTemplate());
+ }
+ else {
+ logger.debug("Added resource template handler: {}",
+ resourceTemplateSpecification.resourceTemplate().uriTemplate());
+ }
+ if (this.serverCapabilities.resources().listChanged()) {
+ return notifyResourcesListChanged();
+ }
+ return Mono.empty();
+ });
+ }
+
+ /**
+ * List all registered resource templates.
+ * @return A Flux stream of all registered resource templates
+ */
+ public Flux listResourceTemplates() {
+ return Flux.fromIterable(this.resourceTemplates.values())
+ .map(McpServerFeatures.AsyncResourceTemplateSpecification::resourceTemplate);
+ }
+
+ /**
+ * Remove a resource template at runtime.
+ * @param uriTemplate The URI template of the resource template to remove
+ * @return Mono that completes when clients have been notified of the change
+ */
+ public Mono removeResourceTemplate(String uriTemplate) {
+
+ if (this.serverCapabilities.resources() == null) {
+ return Mono.error(new IllegalStateException(
+ "Server must be configured with resource capabilities to allow removing resource templates"));
+ }
+
+ return Mono.defer(() -> {
+ McpServerFeatures.AsyncResourceTemplateSpecification removed = this.resourceTemplates.remove(uriTemplate);
+ if (removed != null) {
+ logger.debug("Removed resource template: {}", uriTemplate);
+ }
+ else {
+ logger.warn("Ignore as a Resource Template with URI '{}' not found", uriTemplate);
+ }
+ return Mono.empty();
});
}
@@ -600,46 +704,50 @@ private McpRequestHandler resourcesListRequestHan
}
private McpRequestHandler resourceTemplateListRequestHandler() {
- return (exchange, params) -> Mono
- .just(new McpSchema.ListResourceTemplatesResult(this.getResourceTemplates(), null));
-
+ return (exchange, params) -> {
+ var resourceList = this.resourceTemplates.values()
+ .stream()
+ .map(McpServerFeatures.AsyncResourceTemplateSpecification::resourceTemplate)
+ .toList();
+ return Mono.just(new McpSchema.ListResourceTemplatesResult(resourceList, null));
+ };
}
- private List getResourceTemplates() {
- var list = new ArrayList<>(this.resourceTemplates);
- List resourceTemplates = this.resources.keySet()
- .stream()
- .filter(uri -> uri.contains("{"))
- .map(uri -> {
- var resource = this.resources.get(uri).resource();
- var template = new McpSchema.ResourceTemplate(resource.uri(), resource.name(), resource.title(),
- resource.description(), resource.mimeType(), resource.annotations());
- return template;
- })
- .toList();
+ private McpRequestHandler resourcesReadRequestHandler() {
+ return (ex, params) -> {
+ McpSchema.ReadResourceRequest resourceRequest = jsonMapper.convertValue(params, new TypeRef<>() {
+ });
- list.addAll(resourceTemplates);
+ var resourceUri = resourceRequest.uri();
- return list;
+ // First try to find a static resource specification
+ // Static resources have exact URIs
+ return this.findResourceSpecification(resourceUri)
+ .map(spec -> spec.readHandler().apply(ex, resourceRequest))
+ .orElseGet(() -> {
+ // If not found, try to find a dynamic resource specification
+ // Dynamic resources have URI templates
+ return this.findResourceTemplateSpecification(resourceUri)
+ .map(spec -> spec.readHandler().apply(ex, resourceRequest))
+ .orElseGet(() -> Mono.error(RESOURCE_NOT_FOUND.apply(resourceUri)));
+ });
+ };
}
- private McpRequestHandler resourcesReadRequestHandler() {
- return (exchange, params) -> {
- McpSchema.ReadResourceRequest resourceRequest = objectMapper.convertValue(params,
- new TypeReference() {
- });
- var resourceUri = resourceRequest.uri();
-
- McpServerFeatures.AsyncResourceSpecification specification = this.resources.values()
- .stream()
- .filter(resourceSpecification -> this.uriTemplateManagerFactory
- .create(resourceSpecification.resource().uri())
- .matches(resourceUri))
- .findFirst()
- .orElseThrow(() -> new McpError("Resource not found: " + resourceUri));
+ private Optional findResourceSpecification(String uri) {
+ var result = this.resources.values()
+ .stream()
+ .filter(spec -> this.uriTemplateManagerFactory.create(spec.resource().uri()).matches(uri))
+ .findFirst();
+ return result;
+ }
- return Mono.defer(() -> specification.readHandler().apply(exchange, resourceRequest));
- };
+ private Optional findResourceTemplateSpecification(
+ String uri) {
+ return this.resourceTemplates.values()
+ .stream()
+ .filter(spec -> this.uriTemplateManagerFactory.create(spec.resourceTemplate().uriTemplate()).matches(uri))
+ .findFirst();
}
// ---------------------------------------
@@ -653,32 +761,36 @@ private McpRequestHandler resourcesReadRequestHand
*/
public Mono addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpecification) {
if (promptSpecification == null) {
- return Mono.error(new McpError("Prompt specification must not be null"));
+ return Mono.error(new IllegalArgumentException("Prompt specification must not be null"));
}
if (this.serverCapabilities.prompts() == null) {
- return Mono.error(new McpError("Server must be configured with prompt capabilities"));
+ return Mono.error(new IllegalStateException("Server must be configured with prompt capabilities"));
}
return Mono.defer(() -> {
- McpServerFeatures.AsyncPromptSpecification specification = this.prompts
- .putIfAbsent(promptSpecification.prompt().name(), promptSpecification);
- if (specification != null) {
- return Mono.error(
- new McpError("Prompt with name '" + promptSpecification.prompt().name() + "' already exists"));
+ var previous = this.prompts.put(promptSpecification.prompt().name(), promptSpecification);
+ if (previous != null) {
+ logger.warn("Replace existing Prompt with name '{}'", promptSpecification.prompt().name());
+ }
+ else {
+ logger.debug("Added prompt handler: {}", promptSpecification.prompt().name());
}
-
- logger.debug("Added prompt handler: {}", promptSpecification.prompt().name());
-
- // Servers that declared the listChanged capability SHOULD send a
- // notification,
- // when the list of available prompts changes
if (this.serverCapabilities.prompts().listChanged()) {
- return notifyPromptsListChanged();
+ return this.notifyPromptsListChanged();
}
+
return Mono.empty();
});
}
+ /**
+ * List all registered prompts.
+ * @return A Flux stream of all registered prompts
+ */
+ public Flux listPrompts() {
+ return Flux.fromIterable(this.prompts.values()).map(McpServerFeatures.AsyncPromptSpecification::prompt);
+ }
+
/**
* Remove a prompt handler at runtime.
* @param promptName The name of the prompt handler to remove
@@ -686,10 +798,10 @@ public Mono addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpe
*/
public Mono removePrompt(String promptName) {
if (promptName == null) {
- return Mono.error(new McpError("Prompt name must not be null"));
+ return Mono.error(new IllegalArgumentException("Prompt name must not be null"));
}
if (this.serverCapabilities.prompts() == null) {
- return Mono.error(new McpError("Server must be configured with prompt capabilities"));
+ return Mono.error(new IllegalStateException("Server must be configured with prompt capabilities"));
}
return Mono.defer(() -> {
@@ -697,14 +809,15 @@ public Mono removePrompt(String promptName) {
if (removed != null) {
logger.debug("Removed prompt handler: {}", promptName);
- // Servers that declared the listChanged capability SHOULD send a
- // notification, when the list of available prompts changes
if (this.serverCapabilities.prompts().listChanged()) {
return this.notifyPromptsListChanged();
}
return Mono.empty();
}
- return Mono.error(new McpError("Prompt with name '" + promptName + "' not found"));
+ else {
+ logger.warn("Ignore as a Prompt with name '{}' not found", promptName);
+ }
+ return Mono.empty();
});
}
@@ -734,14 +847,18 @@ private McpRequestHandler promptsListRequestHandler
private McpRequestHandler promptsGetRequestHandler() {
return (exchange, params) -> {
- McpSchema.GetPromptRequest promptRequest = objectMapper.convertValue(params,
- new TypeReference() {
+ McpSchema.GetPromptRequest promptRequest = jsonMapper.convertValue(params,
+ new TypeRef() {
});
// Implement prompt retrieval logic here
McpServerFeatures.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
+
if (specification == null) {
- return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
+ return Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
+ .message("Invalid prompt name")
+ .data("Prompt not found: " + promptRequest.name())
+ .build());
}
return Mono.defer(() -> specification.promptHandler().apply(exchange, promptRequest));
@@ -782,9 +899,8 @@ private McpRequestHandler setLoggerRequestHandler() {
return (exchange, params) -> {
return Mono.defer(() -> {
- SetLevelRequest newMinLoggingLevel = objectMapper.convertValue(params,
- new TypeReference() {
- });
+ SetLevelRequest newMinLoggingLevel = jsonMapper.convertValue(params, new TypeRef() {
+ });
exchange.setMinLoggingLevel(newMinLoggingLevel.level());
@@ -797,27 +913,38 @@ private McpRequestHandler setLoggerRequestHandler() {
};
}
+ private static final Mono EMPTY_COMPLETION_RESULT = Mono
+ .just(new McpSchema.CompleteResult(new CompleteCompletion(List.of(), 0, false)));
+
private McpRequestHandler completionCompleteRequestHandler() {
return (exchange, params) -> {
+
McpSchema.CompleteRequest request = parseCompletionParams(params);
if (request.ref() == null) {
- return Mono.error(new McpError("ref must not be null"));
+ return Mono.error(
+ McpError.builder(ErrorCodes.INVALID_PARAMS).message("Completion ref must not be null").build());
}
if (request.ref().type() == null) {
- return Mono.error(new McpError("type must not be null"));
+ return Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
+ .message("Completion ref type must not be null")
+ .build());
}
String type = request.ref().type();
String argumentName = request.argument().name();
- // check if the referenced resource exists
- if (type.equals("ref/prompt") && request.ref() instanceof McpSchema.PromptReference promptReference) {
+ // Check if valid a Prompt exists for this completion request
+ if (type.equals(PromptReference.TYPE)
+ && request.ref() instanceof McpSchema.PromptReference promptReference) {
+
McpServerFeatures.AsyncPromptSpecification promptSpec = this.prompts.get(promptReference.name());
if (promptSpec == null) {
- return Mono.error(new McpError("Prompt not found: " + promptReference.name()));
+ return Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
+ .message("Prompt not found: " + promptReference.name())
+ .build());
}
if (!promptSpec.prompt()
.arguments()
@@ -826,27 +953,67 @@ private McpRequestHandler completionCompleteRequestHan
.findFirst()
.isPresent()) {
- return Mono.error(new McpError("Argument not found: " + argumentName));
+ logger.warn("Argument not found: {} in prompt: {}", argumentName, promptReference.name());
+
+ return EMPTY_COMPLETION_RESULT;
}
}
- if (type.equals("ref/resource") && request.ref() instanceof McpSchema.ResourceReference resourceReference) {
- McpServerFeatures.AsyncResourceSpecification resourceSpec = this.resources.get(resourceReference.uri());
- if (resourceSpec == null) {
- return Mono.error(new McpError("Resource not found: " + resourceReference.uri()));
- }
- if (!uriTemplateManagerFactory.create(resourceSpec.resource().uri())
- .getVariableNames()
- .contains(argumentName)) {
- return Mono.error(new McpError("Argument not found: " + argumentName));
+ // Check if valid Resource or ResourceTemplate exists for this completion
+ // request
+ if (type.equals(ResourceReference.TYPE)
+ && request.ref() instanceof McpSchema.ResourceReference resourceReference) {
+
+ var uriTemplateManager = uriTemplateManagerFactory.create(resourceReference.uri());
+
+ if (!uriTemplateManager.isUriTemplate(resourceReference.uri())) {
+ // Attempting to autocomplete a fixed resource URI is not an error in
+ // the spec (but probably should be).
+ return EMPTY_COMPLETION_RESULT;
}
+ McpServerFeatures.AsyncResourceSpecification resourceSpec = this
+ .findResourceSpecification(resourceReference.uri())
+ .orElse(null);
+
+ if (resourceSpec != null) {
+ if (!uriTemplateManagerFactory.create(resourceSpec.resource().uri())
+ .getVariableNames()
+ .contains(argumentName)) {
+
+ return Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
+ .message("Argument not found: " + argumentName + " in resource: " + resourceReference.uri())
+ .build());
+ }
+ }
+ else {
+ var templateSpec = this.findResourceTemplateSpecification(resourceReference.uri()).orElse(null);
+ if (templateSpec != null) {
+
+ if (!uriTemplateManagerFactory.create(templateSpec.resourceTemplate().uriTemplate())
+ .getVariableNames()
+ .contains(argumentName)) {
+
+ return Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
+ .message("Argument not found: " + argumentName + " in resource template: "
+ + resourceReference.uri())
+ .build());
+ }
+ }
+ else {
+ return Mono.error(RESOURCE_NOT_FOUND.apply(resourceReference.uri()));
+ }
+ }
}
+ // Handle the completion request using the registered handler
+ // for the given reference.
McpServerFeatures.AsyncCompletionSpecification specification = this.completions.get(request.ref());
if (specification == null) {
- return Mono.error(new McpError("AsyncCompletionSpecification not found: " + request.ref()));
+ return Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
+ .message("AsyncCompletionSpecification not found: " + request.ref())
+ .build());
}
return Mono.defer(() -> specification.completionHandler().apply(exchange, request));
@@ -877,9 +1044,9 @@ private McpSchema.CompleteRequest parseCompletionParams(Object object) {
String refType = (String) refMap.get("type");
McpSchema.CompleteReference ref = switch (refType) {
- case "ref/prompt" -> new McpSchema.PromptReference(refType, (String) refMap.get("name"),
+ case PromptReference.TYPE -> new McpSchema.PromptReference(refType, (String) refMap.get("name"),
refMap.get("title") != null ? (String) refMap.get("title") : null);
- case "ref/resource" -> new McpSchema.ResourceReference(refType, (String) refMap.get("uri"));
+ case ResourceReference.TYPE -> new McpSchema.ResourceReference(refType, (String) refMap.get("uri"));
default -> throw new IllegalArgumentException("Invalid ref type: " + refType);
};
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
similarity index 95%
rename from mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
index 61d60bacc..a15c58cd5 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java
@@ -4,10 +4,11 @@
package io.modelcontextprotocol.server;
+import io.modelcontextprotocol.common.McpTransportContext;
import java.util.ArrayList;
import java.util.Collections;
-import com.fasterxml.jackson.core.type.TypeReference;
+import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpLoggableSession;
import io.modelcontextprotocol.spec.McpSchema;
@@ -36,16 +37,16 @@ public class McpAsyncServerExchange {
private final McpTransportContext transportContext;
- private static final TypeReference CREATE_MESSAGE_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef CREATE_MESSAGE_RESULT_TYPE_REF = new TypeRef<>() {
};
- private static final TypeReference LIST_ROOTS_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef LIST_ROOTS_RESULT_TYPE_REF = new TypeRef<>() {
};
- private static final TypeReference ELICITATION_RESULT_TYPE_REF = new TypeReference<>() {
+ private static final TypeRef ELICITATION_RESULT_TYPE_REF = new TypeRef<>() {
};
- public static final TypeReference OBJECT_TYPE_REF = new TypeReference<>() {
+ public static final TypeRef OBJECT_TYPE_REF = new TypeRef<>() {
};
/**
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpInitRequestHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpInitRequestHandler.java
similarity index 100%
rename from mcp/src/main/java/io/modelcontextprotocol/server/McpInitRequestHandler.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/server/McpInitRequestHandler.java
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpNotificationHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpNotificationHandler.java
similarity index 100%
rename from mcp/src/main/java/io/modelcontextprotocol/server/McpNotificationHandler.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/server/McpNotificationHandler.java
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpRequestHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpRequestHandler.java
similarity index 100%
rename from mcp/src/main/java/io/modelcontextprotocol/server/McpRequestHandler.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/server/McpRequestHandler.java
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpServer.java
similarity index 89%
rename from mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java
rename to mcp-core/src/main/java/io/modelcontextprotocol/server/McpServer.java
index f5dfffffb..fe3125271 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java
+++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpServer.java
@@ -4,6 +4,7 @@
package io.modelcontextprotocol.server;
+import io.modelcontextprotocol.common.McpTransportContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -13,10 +14,9 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.json.McpJsonMapper;
-import io.modelcontextprotocol.spec.DefaultJsonSchemaValidator;
-import io.modelcontextprotocol.spec.JsonSchemaValidator;
+import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.CallToolResult;
import io.modelcontextprotocol.spec.McpSchema.ResourceTemplate;
@@ -24,7 +24,7 @@
import io.modelcontextprotocol.spec.McpStatelessServerTransport;
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
import io.modelcontextprotocol.util.Assert;
-import io.modelcontextprotocol.util.DeafaultMcpUriTemplateManagerFactory;
+import io.modelcontextprotocol.util.DefaultMcpUriTemplateManagerFactory;
import io.modelcontextprotocol.util.McpUriTemplateManagerFactory;
import reactor.core.publisher.Mono;
@@ -66,17 +66,23 @@
* Example of creating a basic synchronous server:
* @param tool The tool definition including name, description, and schema. Must
@@ -584,40 +606,38 @@ public AsyncSpecification resources(McpServerFeatures.AsyncResourceSpecificat
}
/**
- * Sets the resource templates that define patterns for dynamic resource access.
- * Templates use URI patterns with placeholders that can be filled at runtime.
- *
- *
- * Example usage:
{@code
- * .resourceTemplates(
- * new ResourceTemplate("file://{path}", "Access files by path"),
- * new ResourceTemplate("db://{table}/{id}", "Access database records")
- * )
- * }
- * @param resourceTemplates List of resource templates. If null, clears existing
- * templates.
+ * Registers multiple resource templates with their specifications using a List.
+ * This method is useful when resource templates need to be added in bulk from a
+ * collection.
+ * @param resourceTemplates Map of template URI to specification. Must not be
+ * null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if resourceTemplates is null.
- * @see #resourceTemplates(ResourceTemplate...)
*/
- public AsyncSpecification resourceTemplates(List resourceTemplates) {
+ public AsyncSpecification resourceTemplates(
+ List