From d05081bafff8ad9104bb42b694e9eaa983bbac58 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Fri, 19 Dec 2025 22:15:24 -0600 Subject: [PATCH] Add rpc_params_dup_stubs compat flag. This flag changes the Worker RPC behavior to match Cap'n Web: When a stub is passed in the params of an RPC method, we should NOT transfer ownership of the stub. Instead, the stub is dup()ed. The comments explain in more detail why these semantics are superior (and thus why Cap'n Web took them). Additionally, in the case that the params contain an `RpcTarget`, if that target has a `dup()` method, we call it. This specifically fixes an interoperability bug with Cap'n Web: https://github.com/cloudflare/capnweb/issues/110 --- src/workerd/api/tests/BUILD.bazel | 6 + .../api/tests/js-rpc-params-ownership-test.js | 189 ++++++++++++++++++ .../js-rpc-params-ownership-test.wd-test | 19 ++ src/workerd/api/worker-rpc.c++ | 131 +++++++++++- src/workerd/api/worker-rpc.h | 12 +- src/workerd/io/compatibility-date.capnp | 24 +++ 6 files changed, 370 insertions(+), 11 deletions(-) create mode 100644 src/workerd/api/tests/js-rpc-params-ownership-test.js create mode 100644 src/workerd/api/tests/js-rpc-params-ownership-test.wd-test diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 7c8fd802ee8..e7b0e3ee22b 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -275,6 +275,12 @@ wd_test( data = ["js-rpc-test.js"], ) +wd_test( + src = "js-rpc-params-ownership-test.wd-test", + args = ["--experimental"], + data = ["js-rpc-params-ownership-test.js"], +) + wd_test( src = "memory-cache-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/js-rpc-params-ownership-test.js b/src/workerd/api/tests/js-rpc-params-ownership-test.js new file mode 100644 index 00000000000..9fb6e3f0c61 --- /dev/null +++ b/src/workerd/api/tests/js-rpc-params-ownership-test.js @@ -0,0 +1,189 @@ +import assert from 'node:assert'; +import { 
WorkerEntrypoint, RpcTarget, RpcStub } from 'cloudflare:workers'; + +class Counter extends RpcTarget { + count = { value: 0 }; + dupCounts = { created: 0, disposed: 0 }; + disposeCount = 0; + + increment(amount = 1) { + this.count.value += amount; + return this.count.value; + } + + [Symbol.dispose]() { + ++this.disposeCount; + } +} + +class DupableCounter extends Counter { + disposed = false; + + dup() { + let result = new DupableCounter(); + result.count = this.count; + result.dupCounts = this.dupCounts; + ++this.dupCounts.created; + return result; + } + + [Symbol.dispose]() { + if (this.disposed) { + throw new Error('duplicate disposal'); + } + this.disposed = true; + ++this.dupCounts.disposed; + ++this.disposeCount; + } +} + +export class TestService extends WorkerEntrypoint { + async increment(stub, i) { + await stub.increment(i); + } + + async roundTrip(stub) { + return { stub: stub.dup() }; + } +} + +// Test that (with the rpc_params_dup_stubs compat flag) passing a stub in RPC params doesn't +// transfer ownership of the stub. +export let rpcParamsDontTransferOwnership = { + async test(controller, env, ctx) { + let counter = new Counter(); + + { + using stub = new RpcStub(counter); + + // Use the stub in params twice to prove that ownership isn't transferred away. + await ctx.exports.TestService.increment(stub, 2); + await ctx.exports.TestService.increment(stub, 3); + + // Make extra-sure we can still call the stub. + await stub.increment(); + + assert.strictEqual(counter.count.value, 6); + + // RpcTarget disposer should not have been called at all. + await scheduler.wait(0); + assert.strictEqual(counter.disposeCount, 0); + } + + // Disposing a stub *asynchronously* disposes the RpcTarget, so we have to spin the event + // loop to observe the disposal. + await scheduler.wait(0); + assert.strictEqual(counter.disposeCount, 1); + }, +}; + +// Test that placing a plain RpcTarget in RPC params DOES "take ownership", that is, the disposer +// will be called. 
+export let rpcParamsPlainTarget = { + async test(controller, env, ctx) { + let counter = new Counter(); + + await ctx.exports.TestService.increment(counter, 2); + await ctx.exports.TestService.increment(counter, 3); + + assert.strictEqual(counter.count.value, 5); + + // Each RPC invocation will have called the disposer. + await scheduler.wait(0); + assert.strictEqual(counter.disposeCount, 2); + }, +}; + +// Test that placing an RpcTarget with a dup() method in RPC params causes the dup() method to be +// called, and then the duplicate is later disposed. +export let rpcParamsDupTarget = { + async test(controller, env, ctx) { + let counter = new DupableCounter(); + + // If we directly pass `counter` to RPC params, it'll be dup()ed. + await ctx.exports.TestService.increment(counter, 2); + assert.strictEqual(counter.dupCounts.created, 1); + await ctx.exports.TestService.increment(counter, 3); + assert.strictEqual(counter.dupCounts.created, 2); + + assert.strictEqual(counter.count.value, 5); + + // Dups should have been disposed, but not original. + await scheduler.wait(0); + assert.strictEqual(counter.dupCounts.disposed, 2); + assert.strictEqual(counter.disposed, false); + }, +}; + +// Like rpcParamsDupTarget but the target is wrapped in a Proxy. (This takes a different code +// path.) +export let rpcParamsDupProxyTarget = { + async test(controller, env, ctx) { + let counter = new Proxy(new DupableCounter(), {}); + + // If we directly pass `counter` to RPC params, it'll be dup()ed. + await ctx.exports.TestService.increment(counter, 2); + assert.strictEqual(counter.dupCounts.created, 1); + await ctx.exports.TestService.increment(counter, 3); + assert.strictEqual(counter.dupCounts.created, 2); + + assert.strictEqual(counter.count.value, 5); + + // Dups should have been disposed, but not original. 
+ await scheduler.wait(0); + assert.strictEqual(counter.dupCounts.disposed, 2); + assert.strictEqual(counter.disposed, false); + }, +}; + +// Like rpcParamsDupTarget but the target is a function. +export let rpcParamsDupFunction = { + async test(controller, env, ctx) { + let count = 0; + let dupCount = 0; + let disposeCount = 0; + + let increment = (i) => { + return (count += i); + }; + increment.dup = () => { + ++dupCount; + return increment; + }; + increment[Symbol.dispose] = function () { + ++disposeCount; + }; + + let counter = { increment }; + + // If we directly pass `counter` to RPC params, it'll be dup()ed. + await ctx.exports.TestService.increment(counter, 2); + assert.strictEqual(dupCount, 1); + await ctx.exports.TestService.increment(counter, 3); + assert.strictEqual(dupCount, 2); + + assert.strictEqual(count, 5); + + await scheduler.wait(0); + assert.strictEqual(disposeCount, 2); + }, +}; + +// Test that returning a stub transfers ownership of the stub, that is, the system later disposes +// it. 
+export let rpcReturnsTransferOwnership = { + async test(controller, env, ctx) { + let counter = new Counter(); + + { + using stub = new RpcStub(counter); + using stub2 = (await ctx.exports.TestService.roundTrip(stub)).stub; + + await scheduler.wait(0); + assert.strictEqual(counter.disposeCount, 0); + } + + await scheduler.wait(0); + assert.strictEqual(counter.disposeCount, 1); + }, +}; diff --git a/src/workerd/api/tests/js-rpc-params-ownership-test.wd-test b/src/workerd/api/tests/js-rpc-params-ownership-test.wd-test new file mode 100644 index 00000000000..e472a7f8aa5 --- /dev/null +++ b/src/workerd/api/tests/js-rpc-params-ownership-test.wd-test @@ -0,0 +1,19 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "js-rpc-params-ownership-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "js-rpc-params-ownership-test.js") + ], + compatibilityDate = "2025-12-01", + compatibilityFlags = [ + "nodejs_compat", + "rpc_params_dup_stubs", + ], + ) + ), + ], + v8Flags = [ "--expose-gc" ], +); diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index be5e6a5b1a2..aa856d2504a 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -492,8 +492,12 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js, } auto arr = v8::Array::New(js.v8Isolate, argv.data(), argv.size()); - auto externalHandler = - RpcSerializerExternalHandler([&]() -> rpc::JsValue::StreamSink::Client { + auto stubOwnership = FeatureFlags::get(js).getRpcParamsDupStubs() + ? RpcSerializerExternalHandler::DUPLICATE + : RpcSerializerExternalHandler::TRANSFER; + + RpcSerializerExternalHandler externalHandler( + stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client { // A stream was encountered in the params, so we must expect the response to contain // paramsStreamSink. But we don't have the response yet. 
So, we need to set up a // temporary promise client, which we hook to the response a little bit later. @@ -842,11 +846,13 @@ void JsRpcStub::serialize(jsg::Lock& js, jsg::Serializer& serializer) { builder.setRpcTarget(kj::mv(cap)); }); - // Instead of disposing the stub immediately, we add a disposer to the serializer - // that will be executed when the pipeline is finished. This ensures the stub - // remains valid for the duration of any pipelined operations. - externalHandler->addStubDisposer( - kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); }))); + if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::TRANSFER) { + // Instead of disposing the stub immediately, we add a disposer to the serializer + // that will be executed when the pipeline is finished. This ensures the stub + // remains valid for the duration of any pipelined operations. + externalHandler->addStubDisposer( + kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); }))); + } } jsg::Ref JsRpcStub::deserialize( @@ -1592,7 +1598,8 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js, auto hasDispose = maybeDispose != kj::none; // Now that we've extracted our dispose function, we can serialize our value. - auto externalHandler = RpcSerializerExternalHandler(kj::mv(getStreamSinkFunc)); + RpcSerializerExternalHandler externalHandler( + RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc)); serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder)); auto stubDisposers = externalHandler.releaseStubDisposers(); @@ -1721,6 +1728,62 @@ void JsRpcTarget::serialize(jsg::Lock& js, jsg::Serializer& serializer) { // Handle can't possibly be missing during serialization, it's how we got here. auto handle = jsg::JsObject(KJ_ASSERT_NONNULL(JSG_THIS.tryGetHandle(js))); + if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::DUPLICATE) { + // This message isn't supposed to take ownership of stubs. 
What does that mean for an + // RpcTarget? You might argue that it means we should never call the disposer. But that's not + // really enough: what if the real owner *does* call the disposer, before our stub is done + // with it? How do we make sure the RpcTarget stays alive? + // + // Things get clearer if we look at a real use case: pure-JS Cap'n Web stubs. We don't see + // them as stubs (since they are not instances of JsRpcStub). Instead, we see them as + // RpcTargets. But we need the semantics to come out the same: when passed as a parameter + // to a native RPC call, we need to duplicate the stub, because the original copy might very + // well be disposed before we use it. + // + // How do we duplicate this non-native stub? Well... the proper way to duplicate a pure-JS Cap'n + // Web stub is, of course, to call its `dup()` method. + // + // So how about we just do that? If the target has a `dup()` method, we call it, and we take + // ownership of the result, instead of taking ownership of the original object. + auto dup = handle.get(js, "dup"); + KJ_IF_SOME(dupFunc, dup.tryCast()) { + auto replacement = dupFunc.call(js, handle); + bool replaced = false; + + // We got a duplicate. Is it still an RpcTarget? + KJ_IF_SOME(replacementObj, replacement.tryCast()) { + if (replacementObj.isInstanceOf(js)) { + // It is! Let's replace our handle with the duplicate! + handle = replacementObj; + replaced = true; + } + } + + JSG_REQUIRE(replaced, DOMDataCloneError, + "Couldn't create a stub for the RcpTarget because it has a dup() method which did not " + "return another RpcTarget. Either remove the dup() method or make sure it returns an " + "RpcTarget."); + } else { + // If no dup() method was present, then what? + // + // The pedantic argument would say: we need to throw an exception. But that would lead to a + // pretty poor development experience as people would have to fiddle with adding dup() + // methods to all their RpcTargets. 
+ // + // Another argument might say: we should just use the RpcTarget but never call the disposer + // since we don't own it. But that would probably be confusing. People would wonder why their + // disposers are never called. + // + // If someone passes an RpcTarget with no dup() method, but which does have a disposer, as + // the argument to an RPC method, *probably* they just want the disposer to be called when + // the callee is done with the object. That is, they want us to take ownership after all. If + // that is *not* what they want, then they can always implement a dup() method to make it + // clear. + // + // So, we will just "take ownership" of the target after all, and call its disposer. + } + } + rpc::JsRpcTarget::Client cap = kj::heap(js, IoContext::current(), handle); externalHandler->write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable { @@ -1732,8 +1795,33 @@ void RpcSerializerExternalHandler::serializeFunction( jsg::Lock& js, jsg::Serializer& serializer, v8::Local func) { serializer.writeRawUint32(static_cast(rpc::SerializationTag::JS_RPC_STUB)); + auto handle = jsg::JsObject(func); + + // Similar to JsRpcTarget::serialize(), we may need to dup() the function. + if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) { + auto dup = handle.get(js, "dup"); + KJ_IF_SOME(dupFunc, dup.tryCast()) { + auto replacement = dupFunc.call(js, handle); + bool replaced = false; + + // We got a duplicate. Is it still a Function? + KJ_IF_SOME(replacementObj, replacement.tryCast()) { + if (isFunctionForRpc(js, replacementObj)) { + // It is! Let's replace our handle with the duplicate! + handle = replacementObj; + replaced = true; + } + } + + JSG_REQUIRE(replaced, DOMDataCloneError, + "Couldn't create a stub for the function because it has a dup() method which did not " + "return another function. 
Either remove the dup() method or make sure it returns a " + "function."); + } + } + rpc::JsRpcTarget::Client cap = - kj::heap(js, IoContext::current(), jsg::JsObject(func), true); + kj::heap(js, IoContext::current(), handle, true); write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable { builder.setRpcTarget(kj::mv(cap)); }); @@ -1758,6 +1846,31 @@ void RpcSerializerExternalHandler::serializeProxy( "Proxy must emulate either a plain object or an RpcTarget, as indicated by the " "Proxy's prototype chain."); + // Similar to JsRpcTarget::serialize(), we may need to dup() the proxy. + if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) { + auto dup = handle.get(js, "dup"); + KJ_IF_SOME(dupFunc, dup.tryCast()) { + auto replacement = dupFunc.call(js, handle); + bool replaced = false; + + // We got a duplicate. Is it still the same type? + KJ_IF_SOME(replacementObj, replacement.tryCast()) { + KJ_IF_SOME(stubType, checkStubType(js, replacementObj)) { + if (stubType == allowInstanceProperties) { + // It is! Let's replace our handle with the duplicate! + handle = replacementObj; + replaced = true; + } + } + } + + JSG_REQUIRE(replaced, DOMDataCloneError, + "Couldn't create a stub for the Proxy because it has a dup() method which did not " + "return the same underlying type (RpcTarget or Function) as the Proxy itself represents. " + "Either remove the dup() method or make sure it returns an RpcTarget."); + } + } + // Great, we've concluded we can indeed point a stub at this proxy. 
serializer.writeRawUint32(static_cast(rpc::SerializationTag::JS_RPC_STUB)); diff --git a/src/workerd/api/worker-rpc.h b/src/workerd/api/worker-rpc.h index b928932d2f2..8e5ab8398ae 100644 --- a/src/workerd/api/worker-rpc.h +++ b/src/workerd/api/worker-rpc.h @@ -38,10 +38,17 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle public: using GetStreamSinkFunc = kj::Function; + enum StubOwnership { TRANSFER, DUPLICATE }; + // `getStreamSinkFunc` will be called at most once, the first time a stream is encountered in // serialization, to get the StreamSink that should be used. - RpcSerializerExternalHandler(GetStreamSinkFunc getStreamSinkFunc) - : getStreamSinkFunc(kj::mv(getStreamSinkFunc)) {} + RpcSerializerExternalHandler(StubOwnership stubOwnership, GetStreamSinkFunc getStreamSinkFunc) + : stubOwnership(stubOwnership), + getStreamSinkFunc(kj::mv(getStreamSinkFunc)) {} + + inline StubOwnership getStubOwnership() { + return stubOwnership; + } using BuilderCallback = kj::Function; @@ -90,6 +97,7 @@ class RpcSerializerExternalHandler final: public jsg::Serializer::ExternalHandle jsg::Lock& js, jsg::Serializer& serializer, v8::Local proxy) override; private: + StubOwnership stubOwnership; GetStreamSinkFunc getStreamSinkFunc; kj::Vector externals; diff --git a/src/workerd/io/compatibility-date.capnp b/src/workerd/io/compatibility-date.capnp index 381ba50aef7..60557797c40 100644 --- a/src/workerd/io/compatibility-date.capnp +++ b/src/workerd/io/compatibility-date.capnp @@ -1282,4 +1282,28 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef { # # This feature exists for experimental use only, and will be removed once we have a properly # auditable and revocable storage mechanism. + + rpcParamsDupStubs @152 :Bool + $compatEnableFlag("rpc_params_dup_stubs") + $compatDisableFlag("rpc_params_transfer_stubs") + $compatEnableDate("2026-01-20"); + # Changes the ownership semantics of RPC stubs embedded in the parameters of an RPC call. 
+ # + # When the RPC system was first introduced, RPC stubs that were embedded in the params or return + # value of some other call had their ownership transferred. That is, the original stub was + # implicitly disposed, with a duplicate stub being delivered to the destination. + # + # This turns out to compose poorly with another rule: in the callee, any stubs received in the + # params of a call are automatically disposed when the call returns. These two rules combine to + # mean that if you proxy a call -- i.e. the implementation of an RPC just makes another RPC call + # passing along the same params -- then any stubs in the params get disposed twice. Worse, if + # the eventual recipient of the stub wants to keep a duplicate past the end of the call, this + # may not work because the copy of the stub in the proxy layer gets disposed anyway, breaking the + # connection. + # + # For this reason, the pure-JS implementation of Cap'n Web switched to saying that stubs in params + # do NOT transfer ownership -- they are simply duplicated. This compat flag fixes the Workers + # Runtime built-in RPC to match Cap'n Web behavior. + # + # In particular, this fixes: https://github.com/cloudflare/capnweb/issues/110 }