Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 64 additions & 7 deletions __tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import { expect, it, describe, inject } from "vitest"
import { deserialize, serialize, RpcSession, type RpcSessionOptions, RpcTransport, RpcTarget,
RpcStub, newWebSocketRpcSession, newMessagePortRpcSession,
newHttpBatchRpcSession} from "../src/index.js"
newHttpBatchRpcSession, type WireFormat, jsonFormat} from "../src/index.js"
import { Counter, TestTarget } from "./test-util.js";

let SERIALIZE_TEST_CASES: Record<string, unknown> = {
Expand Down Expand Up @@ -111,15 +111,17 @@ class TestTransport implements RpcTransport {
}
}

private queue: string[] = [];
private queue: (string | ArrayBuffer)[] = [];
private waiter?: () => void;
private aborter?: (err: any) => void;
public log = false;

async send(message: string): Promise<void> {
async send(message: string | ArrayBuffer): Promise<void> {
// HACK: If the string "$remove$" appears in the message, remove it. This is used in some
// tests to hack the RPC protocol.
message = message.replaceAll("$remove$", "");
if (typeof message === "string") {
message = message.replaceAll("$remove$", "");
}

if (this.log) console.log(`${this.name}: ${message}`);
this.partner!.queue.push(message);
Expand All @@ -130,7 +132,7 @@ class TestTransport implements RpcTransport {
}
}

async receive(): Promise<string> {
async receive(): Promise<string | ArrayBuffer> {
if (this.queue.length == 0) {
await new Promise<void>((resolve, reject) => {
this.waiter = resolve;
Expand Down Expand Up @@ -161,11 +163,11 @@ class TestHarness<T extends RpcTarget> {

stub: RpcStub<T>;

constructor(target: T, serverOptions?: RpcSessionOptions) {
constructor(target: T, serverOptions?: RpcSessionOptions, clientOptions?: RpcSessionOptions) {
this.clientTransport = new TestTransport("client");
this.serverTransport = new TestTransport("server", this.clientTransport);

this.client = new RpcSession<T>(this.clientTransport);
this.client = new RpcSession<T>(this.clientTransport, undefined, clientOptions);

// TODO: If I remove `<undefined>` here, I get a TypeScript error about the instantiation being
// excessively deep and possibly infinite. Why? `<undefined>` is supposed to be the default.
Expand Down Expand Up @@ -1509,3 +1511,58 @@ describe("MessagePorts", () => {
new Error("Peer closed MessagePort connection."));
});
});

describe("WireFormat", () => {
it("works with the default jsonFormat", async () => {
let fmtOpts: RpcSessionOptions = { format: jsonFormat };
await using harness = new TestHarness(new TestTarget(), fmtOpts, fmtOpts);
expect(await harness.stub.square(5)).toBe(25);
});

it("works with a custom identity format", async () => {
// A trivial format that still uses JSON under the hood but proves the plumbing works.
let encodeCount = 0;
let decodeCount = 0;
let customFormat: WireFormat = {
encode(value: unknown): string {
encodeCount++;
return JSON.stringify(value);
},
decode(data: string | ArrayBuffer): unknown {
decodeCount++;
if (typeof data !== "string") throw new Error("expected string");
return JSON.parse(data);
},
};

let fmtOpts: RpcSessionOptions = { format: customFormat };
await using harness = new TestHarness(new TestTarget(), fmtOpts, fmtOpts);
expect(await harness.stub.square(4)).toBe(16);
expect(encodeCount).toBeGreaterThan(0);
expect(decodeCount).toBeGreaterThan(0);
});

it("works with an ArrayBuffer-based format", async () => {
// Format that encodes to ArrayBuffer via TextEncoder/TextDecoder (proves binary path).
let encoder = new TextEncoder();
let decoder = new TextDecoder();
let binaryFormat: WireFormat = {
encode(value: unknown): ArrayBuffer {
return encoder.encode(JSON.stringify(value)).buffer as ArrayBuffer;
},
decode(data: string | ArrayBuffer): unknown {
if (data instanceof ArrayBuffer) {
return JSON.parse(decoder.decode(data));
}
return JSON.parse(data as string);
},
};

let fmtOpts: RpcSessionOptions = { format: binaryFormat };
await using harness = new TestHarness(new TestTarget(), fmtOpts, fmtOpts);
expect(await harness.stub.square(6)).toBe(36);

using counter = await harness.stub.makeCounter(10);
expect(await counter.increment(5)).toBe(15);
});
});
12 changes: 6 additions & 6 deletions src/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ class BatchClientTransport implements RpcTransport {
#batchToSend: string[] | null = [];
#batchToReceive: string[] | null = null;

async send(message: string): Promise<void> {
async send(message: string | ArrayBuffer): Promise<void> {
// If the batch was already sent, we just ignore the message, because throwing may cause the
// RPC system to abort prematurely. Once the last receive() is done then we'll throw an error
// that aborts the RPC system at the right time and will propagate to all other requests.
if (this.#batchToSend !== null) {
this.#batchToSend.push(message);
this.#batchToSend.push(message as string);
}
}

async receive(): Promise<string> {
async receive(): Promise<string | ArrayBuffer> {
if (!this.#batchToReceive) {
await this.#promise;
}
Expand Down Expand Up @@ -98,11 +98,11 @@ class BatchServerTransport implements RpcTransport {
#batchToReceive: string[];
#allReceived: PromiseWithResolvers<void> = Promise.withResolvers<void>();

async send(message: string): Promise<void> {
this.#batchToSend.push(message);
async send(message: string | ArrayBuffer): Promise<void> {
this.#batchToSend.push(message as string);
}

async receive(): Promise<string> {
async receive(): Promise<string | ArrayBuffer> {
let msg = this.#batchToReceive!.shift();
if (msg !== undefined) {
return msg;
Expand Down
7 changes: 4 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import { RpcTarget as RpcTargetImpl, RpcStub as RpcStubImpl, RpcPromise as RpcPromiseImpl } from "./core.js";
import { serialize, deserialize } from "./serialize.js";
import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js";
import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions, WireFormat,
jsonFormat } from "./rpc.js";
import { RpcTargetBranded, RpcCompatible, Stub, Stubify, __RPC_TARGET_BRAND } from "./types.js";
import { newWebSocketRpcSession as newWebSocketRpcSessionImpl,
newWorkersWebSocketRpcResponse } from "./websocket.js";
Expand All @@ -17,8 +18,8 @@ forceInitMap();

// Re-export public API types.
export { serialize, deserialize, newWorkersWebSocketRpcResponse, newHttpBatchRpcResponse,
nodeHttpBatchRpcResponse };
export type { RpcTransport, RpcSessionOptions, RpcCompatible };
nodeHttpBatchRpcResponse, jsonFormat };
export type { RpcTransport, RpcSessionOptions, RpcCompatible, WireFormat };

// Hack the type system to make RpcStub's types work nicely!
/**
Expand Down
13 changes: 7 additions & 6 deletions src/messageport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class MessagePortTransport implements RpcTransport {
} else if (event.data === null) {
// Peer is signaling that they're closing the connection
this.#receivedError(new Error("Peer closed MessagePort connection."));
} else if (typeof event.data === "string") {
} else if (typeof event.data === "string" ||
event.data instanceof ArrayBuffer) {
if (this.#receiveResolver) {
this.#receiveResolver(event.data);
this.#receiveResolver = undefined;
Expand All @@ -48,25 +49,25 @@ class MessagePortTransport implements RpcTransport {
}

#port: MessagePort;
#receiveResolver?: (message: string) => void;
#receiveResolver?: (message: string | ArrayBuffer) => void;
#receiveRejecter?: (err: any) => void;
#receiveQueue: string[] = [];
#receiveQueue: (string | ArrayBuffer)[] = [];
#error?: any;

async send(message: string): Promise<void> {
async send(message: string | ArrayBuffer): Promise<void> {
if (this.#error) {
throw this.#error;
}
this.#port.postMessage(message);
}

async receive(): Promise<string> {
async receive(): Promise<string | ArrayBuffer> {
if (this.#receiveQueue.length > 0) {
return this.#receiveQueue.shift()!;
} else if (this.#error) {
throw this.#error;
} else {
return new Promise<string>((resolve, reject) => {
return new Promise<string | ArrayBuffer>((resolve, reject) => {
this.#receiveResolver = resolve;
this.#receiveRejecter = reject;
});
Expand Down
68 changes: 54 additions & 14 deletions src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@
import { StubHook, RpcPayload, RpcStub, PropertyPath, PayloadStubHook, ErrorStubHook, RpcTarget, unwrapStubAndPath } from "./core.js";
import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize } from "./serialize.js";

/**
* Pluggable wire format for encoding/decoding RPC messages. Implement this interface to use a
* binary format (e.g. CBOR) instead of JSON.
*/
export interface WireFormat {
encode(value: unknown): string | ArrayBuffer;
decode(data: string | ArrayBuffer): unknown;
}

/** Default wire format that uses JSON text encoding. */
export const jsonFormat: WireFormat = {
encode(value: unknown): string {
return JSON.stringify(value);
},
decode(data: string | ArrayBuffer): unknown {
if (typeof data !== "string") {
throw new TypeError("jsonFormat received non-string data");
}
return JSON.parse(data);
},
};

/**
* Interface for an RPC transport, which is a simple bidirectional message stream. Implement this
* interface if the built-in transports (e.g. for HTTP batch and WebSocket) don't meet your needs.
Expand All @@ -13,7 +35,7 @@ export interface RpcTransport {
/**
* Sends a message to the other end.
*/
send(message: string): Promise<void>;
send(message: string | ArrayBuffer): Promise<void>;

/**
* Receives a message sent by the other end.
Expand All @@ -23,7 +45,7 @@ export interface RpcTransport {
* If there are no outstanding calls (and none are made in the future), then the error does not
* propagate anywhere -- this is considered a "clean" shutdown.
*/
receive(): Promise<string>;
receive(): Promise<string | ArrayBuffer>;

/**
* Indicates that the RPC system has suffered an error that prevents the session from continuing.
Expand Down Expand Up @@ -298,6 +320,15 @@ export type RpcSessionOptions = {
* to serialize the error with the stack omitted.
*/
onSendError?: (error: Error) => Error | void;

/** Wire format for encoding/decoding messages. Defaults to `jsonFormat` (JSON text). */
format?: WireFormat;

/**
* When true, `Uint8Array` values are passed through raw instead of being base64-encoded.
* Only useful with a binary wire format that natively supports byte arrays.
*/
binaryBytes?: boolean;
};

class RpcSessionImpl implements Importer, Exporter {
Expand All @@ -322,8 +353,13 @@ class RpcSessionImpl implements Importer, Exporter {
// may be deleted from the middle (hence leaving the array sparse).
onBrokenCallbacks: ((error: any) => void)[] = [];

private format: WireFormat;
private binaryBytes: boolean;

constructor(private transport: RpcTransport, mainHook: StubHook,
private options: RpcSessionOptions) {
this.format = options.format ?? jsonFormat;
this.binaryBytes = options.binaryBytes ?? false;
// Export zero is automatically the bootstrap object.
this.exports.push({hook: mainHook, refcount: 1});

Expand Down Expand Up @@ -440,18 +476,21 @@ class RpcSessionImpl implements Importer, Exporter {
payload => {
// We don't transfer ownership of stubs in the payload since the payload
// belongs to the hook which sticks around to handle pipelined requests.
let value = Devaluator.devaluate(payload.value, undefined, this, payload);
let value = Devaluator.devaluate(
payload.value, undefined, this, payload, this.binaryBytes);
this.send(["resolve", exportId, value]);
},
error => {
this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]);
this.send(["reject", exportId,
Devaluator.devaluate(error, undefined, this, undefined, this.binaryBytes)]);
}
).catch(
error => {
// If serialization failed, report the serialization error, which should
// itself always be serializable.
try {
this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]);
this.send(["reject", exportId,
Devaluator.devaluate(error, undefined, this, undefined, this.binaryBytes)]);
} catch (error2) {
// TODO: Shouldn't happen, now what?
this.abort(error2);
Expand Down Expand Up @@ -511,17 +550,17 @@ class RpcSessionImpl implements Importer, Exporter {
return;
}

let msgText: string;
let encoded: string | ArrayBuffer;
try {
msgText = JSON.stringify(msg);
encoded = this.format.encode(msg);
} catch (err) {
// If JSON stringification failed, there's something wrong with the devaluator, as it should
// not allow non-JSONable values to be injected in the first place.
// If encoding failed, there's something wrong with the devaluator, as it should
// not allow non-encodable values to be injected in the first place.
try { this.abort(err); } catch (err2) {}
throw err;
}

this.transport.send(msgText)
this.transport.send(encoded)
// If send fails, abort the connection, but don't try to send an abort message since
// that'll probably also fail.
.catch(err => this.abort(err, false));
Expand All @@ -532,7 +571,7 @@ class RpcSessionImpl implements Importer, Exporter {

let value: Array<any> = ["pipeline", id, path];
if (args) {
let devalue = Devaluator.devaluate(args.value, undefined, this, args);
let devalue = Devaluator.devaluate(args.value, undefined, this, args, this.binaryBytes);

// HACK: Since the args is an array, devaluator will wrap in a second array. Need to unwrap.
// TODO: Clean this up somehow.
Expand Down Expand Up @@ -596,8 +635,8 @@ class RpcSessionImpl implements Importer, Exporter {

if (trySendAbortMessage) {
try {
this.transport.send(JSON.stringify(["abort", Devaluator
.devaluate(error, undefined, this)]))
this.transport.send(this.format.encode(["abort", Devaluator
.devaluate(error, undefined, this, undefined, this.binaryBytes)]))
.catch(err => {});
} catch (err) {
// ignore, probably the whole reason we're aborting is because the transport is broken
Expand Down Expand Up @@ -644,7 +683,8 @@ class RpcSessionImpl implements Importer, Exporter {

private async readLoop(abortPromise: Promise<never>) {
while (!this.abortReason) {
let msg = JSON.parse(await Promise.race([this.transport.receive(), abortPromise]));
let msg: any = this.format.decode(
await Promise.race([this.transport.receive(), abortPromise]));
if (this.abortReason) break; // check again before processing

if (msg instanceof Array) {
Expand Down
Loading