Do the abort steps of ReadableStreamPipeTo really guarantee the abort callback to be called before cancel? #1229

Open
saschanaz opened this issue Apr 26, 2022 · 5 comments


@saschanaz
Member

saschanaz commented Apr 26, 2022

Step 14 of https://streams.spec.whatwg.org/#readable-stream-pipe-to is basically Promise.all([dest.abort(), source.cancel()]), assuming the states are writable/readable respectively. One WPT test for this asserts that abort() is called before cancel(): https://github.com/web-platform-tests/wpt/blob/285addceabb4443562a9b93d789b17230c3d6e20/streams/piping/abort.any.js#L215-L237

Blink passes this test, but I'm not sure how. A slightly modified test without AbortSignal shows that the abort callback is called after the cancel callback (because the latter is called synchronously while the former is not), so I'd expect the same for the AbortSignal test:

promise_test(t => {
  const events = [];
  const rs = new ReadableStream({
    pull(controller) {
      controller.error('failed to abort');
    },
    cancel() {
      events.push('cancel');
      return Promise.reject(error1);
    }
  }, hwm0);
  const ws = new WritableStream({
    abort() {
      events.push('abort');
      return Promise.reject(error2);
    }
  });
  return promise_rejects_exactly(t, error1, Promise.all([ws.abort(), rs.cancel()]), 'The cancel rejection happens first in this case')
      .then(() => assert_array_equals(events, ['cancel', 'abort'], 'cancel() is called first in this case'));
}, '');

Am I understanding something wrong?

@domenic
Member

domenic commented Apr 26, 2022

This seems extremely related to #1208 and similar things @MattiasBuelens was working on... but the fact that the reference implementation passes all WPTs implies the current spec should be good enough.

Does

abortAlgorithm = () => {
  const error = signal.reason;
  const actions = [];
  if (preventAbort === false) {
    actions.push(() => {
      if (dest._state === 'writable') {
        return WritableStreamAbort(dest, error);
      }
      return promiseResolvedWith(undefined);
    });
  }
  if (preventCancel === false) {
    actions.push(() => {
      if (source._state === 'readable') {
        return ReadableStreamCancel(source, error);
      }
      return promiseResolvedWith(undefined);
    });
  }
  shutdownWithAction(() => waitForAllPromise(actions.map(action => action())), true, error);
};
if (signal.aborted === true) {
  abortAlgorithm();
  return;
}
signal.addEventListener('abort', abortAlgorithm);
help? It doesn't really look like Promise.all([dest.abort(), source.cancel()]) so I am not sure where you are getting that...

@MattiasBuelens
Collaborator

This happens because the writable stream is not yet started. If you add a delay before you call abort and cancel, you get the events in the expected order:

promise_test(async t => {
  const events = [];
  const rs = new ReadableStream({
    pull(controller) {
      controller.error('failed to abort');
    },
    cancel() {
      events.push('cancel');
      return Promise.reject(error1);
    }
  }, { highWaterMark: 0 });
  const ws = new WritableStream({
    abort() {
      events.push('abort');
      return Promise.reject(error2);
    }
  });
  await flushAsyncEvents(); // <<< the important bit
  await promise_rejects_exactly(t, error2, Promise.all([ws.abort(), rs.cancel()]), 'The abort rejection happens first in this case');
  assert_array_equals(events, ['abort', 'cancel'], 'abort() is called first in this case');
}, '#1229');
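
For readers who want to try this outside the WPT harness, here is a minimal sketch of the same comparison using only the stream globals (available in browsers and, as far as I know, in Node.js 18+). The names recordOrder and waitForStart are made up for this illustration, and a setTimeout stands in for flushAsyncEvents(); the sources here do not reject, so only the callback order is observed.

```javascript
// Sketch only: `recordOrder` and `waitForStart` are illustrative names,
// not part of the spec or of the WPT helpers.
async function recordOrder(waitForStart) {
  const events = [];
  const rs = new ReadableStream({
    cancel() { events.push('cancel'); }
  });
  const ws = new WritableStream({
    abort() { events.push('abort'); }
  });
  if (waitForStart) {
    // Stand-in for flushAsyncEvents(): let the writable stream finish starting.
    await new Promise(resolve => setTimeout(resolve, 0));
  }
  // Same call order in both cases: abort() first, then cancel().
  await Promise.all([ws.abort('reason'), rs.cancel('reason')]);
  return events;
}

const orders = Promise.all([recordOrder(false), recordOrder(true)]);
orders.then(([immediate, delayed]) => {
  console.log(immediate); // expected: [ 'cancel', 'abort' ]
  console.log(delayed);   // expected: [ 'abort', 'cancel' ]
});
```

The only difference between the two runs is whether the writable stream has had time to start, which is what decides whether the underlying abort() runs synchronously.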

I agree that this is surprising: we don't wait for the readable stream to be started before we call its cancel method. But there's a reason for this: the start() method may be a long-running async producer.

let abortController = new AbortController();
const rs = new ReadableStream({
  async start(controller) {
    while (!abortController.signal.aborted) {
      controller.enqueue("a");
      await new Promise(r => setTimeout(r, 1000));
    }
  },
  cancel(reason) {
    abortController.abort(reason);
  }
});

See also #1208 (comment).

@MattiasBuelens
Collaborator

Oh wait, hang on. That doesn't actually explain why pipeTo() works even without the await flushAsyncEvents()... 😅

It looks like the problem is that, even when the pipe is immediately aborted (with an aborted AbortSignal), the reference implementation doesn't execute the abort actions in the first microtask. That's because we enter this if branch:

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
  uponFulfillment(waitForWritesToFinish(), doTheRest);

The implementation of waitForWritesToFinish() takes at least one microtask to transform the currentWrite promise, and then we add another microtask because of the uponFulfillment() itself. By that time, the writable stream in this WPT test has become started, and the abort() call is handled synchronously.
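
The timing argument above can be checked in isolation with a rough sketch like the following. It does not use pipeTo() at all: two awaited resolved promises merely stand in for the two microtask hops described above, and abortAfterTwoMicrotasks is a made-up name. The assumption is that a WritableStream whose sink has no start() method becomes started within those hops.

```javascript
// Sketch only: the two awaits approximate the delay added by
// waitForWritesToFinish() and uponFulfillment(); they are not the real code path.
async function abortAfterTwoMicrotasks() {
  const events = [];
  const ws = new WritableStream({
    abort() { events.push('abort'); }
  });
  await Promise.resolve(); // first microtask hop
  await Promise.resolve(); // second microtask hop
  const abortPromise = ws.abort('reason');
  // Check before yielding again: did the underlying abort() already run?
  const calledSynchronously = events.includes('abort');
  await abortPromise;
  return calledSynchronously;
}

const abortWasSynchronous = abortAfterTwoMicrotasks();
abortWasSynchronous.then(value => {
  console.log(value); // expected: true, if the stream has started by then
});
```

If the two awaits are removed, the same check should report false, because the abort is then queued until the stream has started.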

I've tried fixing this for the case where the pipe didn't start any writes at all (#1208 (comment)), but that caused even more problems... 😛

@saschanaz
Member Author

saschanaz commented Apr 26, 2022

Thanks, #1208 does indeed look very much related! The current Gecko implementation does not wait at all there and thus calls the shutdown actions synchronously, while both the reference implementation and Blink do wait there, as the following code shows:

var rs = new ReadableStream({ cancel() { console.log("canceled") } });
var ws = new WritableStream({ abort() { console.log("aborted") } });
var abortController = new AbortController();
var signal = abortController.signal;
abortController.abort();
rs.pipeTo(ws, { signal });
console.log("foo")

// Reference: foo aborted canceled
// Blink: foo aborted canceled
// Gecko: canceled foo aborted

Will #1208 change the behavior here as Gecko does?

@MattiasBuelens
Collaborator

Will #1208 change the behavior here as Gecko does?

In its current state, no.

Personally, I think we should make pipeTo() do more things synchronously when possible, especially when the timing and results of those actions are observable by author code (i.e. when the readable and/or writable stream is constructed by author code). See #1208 (comment) for Domenic's idea on how to make that happen. But that should probably go into a separate PR, since #1208 is already quite large as it is. 😛
