Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ wd_test(
data = ["js-rpc-test.js"],
)

wd_test(
src = "js-rpc-params-ownership-test.wd-test",
args = ["--experimental"],
data = ["js-rpc-params-ownership-test.js"],
)

wd_test(
src = "memory-cache-test.wd-test",
args = ["--experimental"],
Expand Down
189 changes: 189 additions & 0 deletions src/workerd/api/tests/js-rpc-params-ownership-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import assert from 'node:assert';
import { WorkerEntrypoint, RpcTarget, RpcStub } from 'cloudflare:workers';

class Counter extends RpcTarget {
count = { value: 0 };
dupCounts = { created: 0, disposed: 0 };
disposeCount = 0;

increment(amount = 1) {
this.count.value += amount;
return this.count.value;
}

[Symbol.dispose]() {
++this.disposeCount;
}
}

class DupableCounter extends Counter {
disposed = false;

dup() {
let result = new DupableCounter();
result.count = this.count;
result.dupCounts = this.dupCounts;
++this.dupCounts.created;
return result;
}

[Symbol.dispose]() {
if (this.disposed) {
throw new Error('duplicate disposal');
}
this.disposed = true;
++this.dupCounts.disposed;
++this.disposeCount;
}
}

export class TestService extends WorkerEntrypoint {
async increment(stub, i) {
await stub.increment(i);
}

async roundTrip(stub) {
return { stub: stub.dup() };
}
}

// Test that (with the rpc_params_dup_stubs compat flag) passing a stub in RPC params doesn't
// transfer ownership of the stub.
export let rpcParamsDontTransferOwnership = {
async test(controller, env, ctx) {
let counter = new Counter();

{
using stub = new RpcStub(counter);

// Use the stub in params twice to prove that ownership isn't transferred away.
await ctx.exports.TestService.increment(stub, 2);
await ctx.exports.TestService.increment(stub, 3);

// Make extra-sure we can still call the stub.
await stub.increment();

assert.strictEqual(counter.count.value, 6);

// RpcTarget disposer should not have been called at all.
await scheduler.wait(0);
assert.strictEqual(counter.disposeCount, 0);
}

// Disposing a stub *asynchrconously* disposes the RpcTarget, so we have to spin the event
// loop to observe the disposal.
await scheduler.wait(0);
assert.strictEqual(counter.disposeCount, 1);
},
};

// Test that placing a plain RpcTarget in RPC params DOES "take ownership", that is, the disposer
// will be called.
export let rpcParamsPlainTarget = {
async test(controller, env, ctx) {
let counter = new Counter();

await ctx.exports.TestService.increment(counter, 2);
await ctx.exports.TestService.increment(counter, 3);

assert.strictEqual(counter.count.value, 5);

// Each RPC invocation will have called the disposer.
await scheduler.wait(0);
assert.strictEqual(counter.disposeCount, 2);
},
};

// Test that placing an RpcTarget with a dup() method in RPC params causes the dup() method to be
// called, and then the duplicate is later disposed.
export let rpcParamsDupTarget = {
async test(controller, env, ctx) {
let counter = new DupableCounter();

// If we directly pass `counter` to RPC params, it'll be dup()ed.
await ctx.exports.TestService.increment(counter, 2);
assert.strictEqual(counter.dupCounts.created, 1);
await ctx.exports.TestService.increment(counter, 3);
assert.strictEqual(counter.dupCounts.created, 2);

assert.strictEqual(counter.count.value, 5);

// Dups should have been disposed, but not original.
await scheduler.wait(0);
assert.strictEqual(counter.dupCounts.disposed, 2);
assert.strictEqual(counter.disposed, false);
},
};

// Like rpcParamsDupTarget but the target is wrapped in a Proxy. (This takes a different code
// path.)
export let rpcParamsDupProxyTarget = {
async test(controller, env, ctx) {
let counter = new Proxy(new DupableCounter(), {});

// If we directly pass `counter` to RPC params, it'll be dup()ed.
await ctx.exports.TestService.increment(counter, 2);
assert.strictEqual(counter.dupCounts.created, 1);
await ctx.exports.TestService.increment(counter, 3);
assert.strictEqual(counter.dupCounts.created, 2);

assert.strictEqual(counter.count.value, 5);

// Dups should have been disposed, but not original.
await scheduler.wait(0);
assert.strictEqual(counter.dupCounts.disposed, 2);
assert.strictEqual(counter.disposed, false);
},
};

// Like rpcParamsDupTarget but the target is a function.
export let rpcParamsDupFunction = {
async test(controller, env, ctx) {
let count = 0;
let dupCount = 0;
let disposeCount = 0;

let increment = (i) => {
return (count += i);
};
increment.dup = () => {
++dupCount;
return increment;
};
increment[Symbol.dispose] = function () {
++disposeCount;
};

let counter = { increment };

// If we directly pass `counter` to RPC params, it'll be dup()ed.
await ctx.exports.TestService.increment(counter, 2);
assert.strictEqual(dupCount, 1);
await ctx.exports.TestService.increment(counter, 3);
assert.strictEqual(dupCount, 2);

assert.strictEqual(count, 5);

await scheduler.wait(0);
assert.strictEqual(disposeCount, 2);
},
};

// Test that returning a stub tansfers ownership of the stub, that is, the system later disposes
// it.
export let rpcReturnsTransferOwnership = {
async test(controller, env, ctx) {
let counter = new Counter();

{
using stub = new RpcStub(counter);
using stub2 = (await ctx.exports.TestService.roundTrip(stub)).stub;

await scheduler.wait(0);
assert.strictEqual(counter.disposeCount, 0);
}

await scheduler.wait(0);
assert.strictEqual(counter.disposeCount, 1);
},
};
19 changes: 19 additions & 0 deletions src/workerd/api/tests/js-rpc-params-ownership-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "js-rpc-params-ownership-test",
worker = (
modules = [
(name = "worker", esModule = embed "js-rpc-params-ownership-test.js")
],
compatibilityDate = "2025-12-01",
compatibilityFlags = [
"nodejs_compat",
"rpc_params_dup_stubs",
],
)
),
],
v8Flags = [ "--expose-gc" ],
);
131 changes: 122 additions & 9 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,12 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js,
}
auto arr = v8::Array::New(js.v8Isolate, argv.data(), argv.size());

auto externalHandler =
RpcSerializerExternalHandler([&]() -> rpc::JsValue::StreamSink::Client {
auto stubOwnership = FeatureFlags::get(js).getRpcParamsDupStubs()
? RpcSerializerExternalHandler::DUPLICATE
: RpcSerializerExternalHandler::TRANSFER;

RpcSerializerExternalHandler externalHandler(
stubOwnership, [&]() -> rpc::JsValue::StreamSink::Client {
// A stream was encountered in the params, so we must expect the response to contain
// paramsStreamSink. But we don't have the response yet. So, we need to set up a
// temporary promise client, which we hook to the response a little bit later.
Expand Down Expand Up @@ -842,11 +846,13 @@ void JsRpcStub::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
builder.setRpcTarget(kj::mv(cap));
});

// Instead of disposing the stub immediately, we add a disposer to the serializer
// that will be executed when the pipeline is finished. This ensures the stub
// remains valid for the duration of any pipelined operations.
externalHandler->addStubDisposer(
kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); })));
if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::TRANSFER) {
// Instead of disposing the stub immediately, we add a disposer to the serializer
// that will be executed when the pipeline is finished. This ensures the stub
// remains valid for the duration of any pipelined operations.
externalHandler->addStubDisposer(
kj::heap(kj::defer([self = JSG_THIS]() mutable { self->dispose(); })));
}
}

jsg::Ref<JsRpcStub> JsRpcStub::deserialize(
Expand Down Expand Up @@ -1592,7 +1598,8 @@ MakeCallPipeline::Result serializeJsValueWithPipeline(jsg::Lock& js,
auto hasDispose = maybeDispose != kj::none;

// Now that we've extracted our dispose function, we can serialize our value.
auto externalHandler = RpcSerializerExternalHandler(kj::mv(getStreamSinkFunc));
RpcSerializerExternalHandler externalHandler(
RpcSerializerExternalHandler::TRANSFER, kj::mv(getStreamSinkFunc));
serializeJsValue(js, value, externalHandler, kj::mv(makeBuilder));

auto stubDisposers = externalHandler.releaseStubDisposers();
Expand Down Expand Up @@ -1721,6 +1728,62 @@ void JsRpcTarget::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
// Handle can't possibly be missing during serialization, it's how we got here.
auto handle = jsg::JsObject(KJ_ASSERT_NONNULL(JSG_THIS.tryGetHandle(js)));

if (externalHandler->getStubOwnership() == RpcSerializerExternalHandler::DUPLICATE) {
// This message isn't supposed to take ownership of stubs. What does that mean for an
// RpcTarget? You might argue that it means we should never call the disposer. But that's not
// really enough: what if the real owner *does* call the disposer, before our stub is done
// with it? How do we make sure the RpcTarget stays alive?
//
// Things get clearer if we look at a real use case: pure-JS Cap'n Web stubs. We don't see
// them as stubs (since they are not instances of JsRpcStub). Instead, we see them as
// RpcTargets. But we need the semantics to come out the same: when passed as a parameter
// to a native RPC call, we need to duplicate the stub, because the original copy might very
// well be disposed before we use it.
//
// How do we duplicate this non-native stub? Well... proper way to duplicate a pure-JS Cap'n
// Web stub is, of course, to call its `dup()` method.
//
// So how about we just do that? If the target has a `dup()` method, we call it, and we take
// ownership of the result, instead of taking ownership of the original object.
auto dup = handle.get(js, "dup");
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
auto replacement = dupFunc.call(js, handle);
bool replaced = false;

// We got a duplicate. Is it still an RpcTarget?
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
if (replacementObj.isInstanceOf<JsRpcTarget>(js)) {
// It is! Let's replace our handle with the duplicate!
handle = replacementObj;
replaced = true;
}
}

JSG_REQUIRE(replaced, DOMDataCloneError,
"Couldn't create a stub for the RcpTarget because it has a dup() method which did not "
"return another RpcTarget. Either remove the dup() method or make sure it returns an "
"RpcTarget.");
} else {
// If no dup() method was present, then what?
//
// The pedantic argument would say: we need to throw an exception. But that would lead to a
// pretty poor development experience as people would have to fiddle with adding dup()
// methods to all their RpcTargets.
//
// Another argument might say: we should just use the RpcTarget but never call the disposer
// since we don't own it. But that would probably be confusing. People would wonder why their
// disposers are never called.
//
// If someone passes an RpcTarget with no dup() method, but which does have a disposer, as
// the argument to an RPC method, *probably* they just want the disposer to be called when
// the callee is done with the object. That is, they want us to take ownership after all. If
// that is *not* what they want, then they can always implement a dup() method to make it
// clear.
//
// So, we will just "take ownership" of the target after all, and call its disposer.
}
}

rpc::JsRpcTarget::Client cap = kj::heap<TransientJsRpcTarget>(js, IoContext::current(), handle);

externalHandler->write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable {
Expand All @@ -1732,8 +1795,33 @@ void RpcSerializerExternalHandler::serializeFunction(
jsg::Lock& js, jsg::Serializer& serializer, v8::Local<v8::Function> func) {
serializer.writeRawUint32(static_cast<uint>(rpc::SerializationTag::JS_RPC_STUB));

auto handle = jsg::JsObject(func);

// Similar to JsRpcTarget::serialize(), we may need to dup() the function.
if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) {
auto dup = handle.get(js, "dup");
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
auto replacement = dupFunc.call(js, handle);
bool replaced = false;

// We got a duplicate. Is it still a Function?
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
if (isFunctionForRpc(js, replacementObj)) {
// It is! Let's replace our handle with the duplicate!
handle = replacementObj;
replaced = true;
}
}

JSG_REQUIRE(replaced, DOMDataCloneError,
"Couldn't create a stub for the function because it has a dup() method which did not "
"return another function. Either remove the dup() method or make sure it returns a "
"function.");
}
}

rpc::JsRpcTarget::Client cap =
kj::heap<TransientJsRpcTarget>(js, IoContext::current(), jsg::JsObject(func), true);
kj::heap<TransientJsRpcTarget>(js, IoContext::current(), handle, true);
write([cap = kj::mv(cap)](rpc::JsValue::External::Builder builder) mutable {
builder.setRpcTarget(kj::mv(cap));
});
Expand All @@ -1758,6 +1846,31 @@ void RpcSerializerExternalHandler::serializeProxy(
"Proxy must emulate either a plain object or an RpcTarget, as indicated by the "
"Proxy's prototype chain.");

// Similar to JsRpcTarget::serialize(), we may need to dup() the proxy.
if (stubOwnership == RpcSerializerExternalHandler::DUPLICATE) {
auto dup = handle.get(js, "dup");
KJ_IF_SOME(dupFunc, dup.tryCast<jsg::JsFunction>()) {
auto replacement = dupFunc.call(js, handle);
bool replaced = false;

// We got a duplicate. Is it still the same type?
KJ_IF_SOME(replacementObj, replacement.tryCast<jsg::JsObject>()) {
KJ_IF_SOME(stubType, checkStubType(js, replacementObj)) {
if (stubType == allowInstanceProperties) {
// It is! Let's replace our handle with the duplicate!
handle = replacementObj;
replaced = true;
}
}
}

JSG_REQUIRE(replaced, DOMDataCloneError,
"Couldn't create a stub for the Proxy because it has a dup() method which did not "
"return the same underlying type (RpcTarget or Function) as the Proxy itself represents. "
"Either remove the dup() method or make sure it returns an RpcTarget.");
}
}

// Great, we've concluded we can indeed point a stub at this proxy.
serializer.writeRawUint32(static_cast<uint>(rpc::SerializationTag::JS_RPC_STUB));

Expand Down
Loading
Loading