Skip to content

Commit 111a252

Browse files
committed
Don't cancel ReadableStreams that are locked.
I noticed that the `state.stream.cancel()` call in `ReadableStreamStubHook.dispose()` was often failing, throwing an exception because the stream was already locked. But we were ignoring the exceptions. This actually fixes the problem in two ways: 1. When we pipe from a ReadableStream (which locks it), we also take a reference on its StubHook, which we dispose then the pipe completes. This makes sense and solves the case seen in the tests. 2. I also just made it skip the cancel() call if the stream is locked. Throwing the exception and ignoring it is just a waste of cycles.
1 parent 52a610e commit 111a252

File tree

3 files changed

+23
-8
lines changed

3 files changed

+23
-8
lines changed

src/rpc.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ class RpcSessionImpl implements Importer, Exporter {
538538
return readable;
539539
}
540540

541-
createPipe(readable: ReadableStream): ImportId {
541+
createPipe(readable: ReadableStream, readableHook: StubHook): ImportId {
542542
if (this.abortReason) throw this.abortReason;
543543

544544
this.send(["pipe"]);
@@ -555,7 +555,7 @@ class RpcSessionImpl implements Importer, Exporter {
555555
// Errors are handled by the writable stream's error handling -- either the write fails
556556
// and the writable side reports it, or the readable side errors and pipeTo aborts the
557557
// writable side. Either way, the hook's disposal will handle cleanup.
558-
});
558+
}).finally(() => readableHook.dispose());
559559

560560
return importId;
561561
}

src/serialize.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ export interface Exporter {
1919
unexport(ids: Array<ExportId>): void;
2020

2121
// Creates a pipe by sending a ["pipe"] message, then starts pumping the given ReadableStream
22-
// into the pipe's writable end. Returns the import ID assigned to the pipe.
23-
createPipe(readable: ReadableStream): ImportId;
22+
// into the pipe's writable end. Returns the import ID assigned to the pipe. `hook` should be
23+
// disposed when the pipe finishes.
24+
createPipe(readable: ReadableStream, hook: StubHook): ImportId;
2425

2526
onSendError(error: Error): Error | void;
2627
}
@@ -256,8 +257,11 @@ export class Devaluator {
256257
throw new Error("Can't serialize ReadableStream in this context.");
257258
}
258259

260+
let ws = <ReadableStream>value;
261+
let hook = this.source.getHookForReadableStream(ws, parent);
262+
259263
// Create a pipe and start pumping the ReadableStream into it.
260-
let importId = this.exporter.createPipe(<ReadableStream>value);
264+
let importId = this.exporter.createPipe(ws, hook);
261265

262266
return ["readable", importId];
263267
}

src/streams.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -495,9 +495,20 @@ class ReadableStreamStubHook extends StubHook {
495495
if (--state.refcount === 0) {
496496
if (!state.canceled) {
497497
state.canceled = true;
498-
state.stream.cancel(
499-
new Error("ReadableStream RPC stub was disposed without being consumed"))
500-
.catch(() => {}); // Ignore errors from cancel.
498+
499+
// Don't try to cancel the stream if it's locked. It won't work anyway -- it'll throw
500+
// an exception, which we'd ignore anyway.
501+
//
502+
// This is a little janky but it makes some sense: If someone has locked the stream, they
503+
// have taken responsibility for fully reading it. The only reason we really need to
504+
// cancel when this hook is disposed is to handle the case where an application receives
505+
// a ReadableStream but completely ignores it -- we want it to be canceled naturally when
506+
// the payload is disposed.
507+
if (!state.stream.locked) {
508+
state.stream.cancel(
509+
new Error("ReadableStream RPC stub was disposed without being consumed"))
510+
.catch(() => {}); // Ignore errors from cancel.
511+
}
501512
}
502513
}
503514
}

0 commit comments

Comments
 (0)