
Performance optimization: ReadableStreamDefaultReader.prototype.readMany #1236

Open
Jarred-Sumner opened this issue Jun 14, 2022 · 3 comments

Comments


Jarred-Sumner commented Jun 14, 2022

Some sources enqueue many small chunks, but ReadableStreamDefaultReader.prototype.read only returns one chunk at a time, and each call has non-trivial overhead (at least one tick per call).

When the developer knows they want to read everything queued, it is faster to ask for all the values available directly instead of reading one value at a time.
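For the consumer, the difference looks roughly like this (a minimal sketch; readMany is the name proposed here, not an existing API):

// Today: one chunk, and at least one tick, per read() call.
const reader = stream.getReader();
const chunks = [];
let result = await reader.read();
while (!result.done) {
  chunks.push(result.value);
  result = await reader.read();
}

// Proposed alternative to the loop above: drain every queued chunk in one call.
// value is an array of chunks; size is the total queue size consumed.
const { value, size, done } = await reader.readMany();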

I implemented this in Bun.

This led to a >30% performance improvement in server-side rendering React via new Response(await renderToReadableStream(reactElement)).arrayBuffer().

Before/After: benchmark screenshots (not reproduced here).

Note: these numbers are out of date (Bun's implementation is faster now), but the delta above can be attributed directly to ReadableStreamDefaultReader.prototype.readMany.

Here is an example implementation that uses JavaScriptCore builtins:
function readMany()
{
    "use strict";

    if (!@isReadableStreamDefaultReader(this))
        @throwTypeError("ReadableStreamDefaultReader.readMany() should not be called directly");

    const stream = @getByIdDirectPrivate(this, "ownerReadableStream");
    if (!stream)
        @throwTypeError("readMany() called on a reader owned by no readable stream");

    const state = @getByIdDirectPrivate(stream, "state");
    @putByIdDirectPrivate(stream, "disturbed", true);
    if (state === @streamClosed)
        return {value: [], size: 0, done: true};
    else if (state === @streamErrored) {
        throw @getByIdDirectPrivate(stream, "storedError");
    }

    var controller = @getByIdDirectPrivate(stream, "readableStreamController");

    const content = @getByIdDirectPrivate(controller, "queue").content;
    var size = @getByIdDirectPrivate(controller, "queue").size;
    var values = content.toArray(false);
    var length = values.length;

    if (length > 0) {
        // Fast path: chunks are already queued, so return them all at once.
        if (@isReadableByteStreamController(controller)) {
            // Normalize byte stream chunks to Uint8Array views.
            for (var i = 0; i < values.length; i++) {
                const buf = values[i];
                if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) {
                    values[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
                }
            }
        }
        
        @resetQueue(@getByIdDirectPrivate(controller, "queue"));

        if (@getByIdDirectPrivate(controller, "closeRequested"))
            @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
        else if (@isReadableStreamDefaultController(controller)) 
            @readableStreamDefaultControllerCallPullIfNeeded(controller);
        else if (@isReadableByteStreamController(controller))
            @readableByteStreamControllerCallPullIfNeeded(controller);

        return @createFulfilledPromise({value: values, size, done: false});
    }

    // Slow path: nothing is queued yet. Pull once, then return the chunk that
    // pull produced along with anything else it left in the queue.
    var onPullMany = (result) => {
        if (result.done) {
            return @createFulfilledPromise({value: [], size: 0, done: true});
        }
        var controller = @getByIdDirectPrivate(stream, "readableStreamController");
        
        var queue = @getByIdDirectPrivate(controller, "queue");
        var value = [result.value].concat(queue.content.toArray(false));

        if (@isReadableByteStreamController(controller)) {
            for (var i = 0; i < value.length; i++) {
                const buf = value[i];
                if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) {
                    value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
                }
            }
        }
        var size = queue.size;
        @resetQueue(queue);

        if (@getByIdDirectPrivate(controller, "closeRequested"))
            @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
        else if (@isReadableStreamDefaultController(controller)) 
            @readableStreamDefaultControllerCallPullIfNeeded(controller);
        else if (@isReadableByteStreamController(controller))
            @readableByteStreamControllerCallPullIfNeeded(controller);

        return @createFulfilledPromise({value: value, size: size, done: false});
    };
    
    var pullResult = controller.@pull(controller);
    if (pullResult && @isPromise(pullResult)) {
        return pullResult.@then(onPullMany);
    }

    return onPullMany(pullResult);
}

I have no opinion about the name of the function - just that some way to read all the queued values in one call should be exposed in the public interface for developers.


jasnell commented Jun 14, 2022

I've been considering the possibility of a new kind of reader that implements the Body mixin, which would provide opportunities for these kinds of optimizations, e.g.:

const readable = new ReadableStream({ ... });
const reader = new ReadableStreamBodyReader(readable);

await reader.arrayBuffer();
// or
await reader.blob();
// or
await reader.text();

The ReadableStreamBodyReader would provide opportunities to more efficiently consume the ReadableStream without going through the typical chunk-by-chunk slow path and would not require any changes to the existing standard API surface.
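For comparison, roughly that surface can already be approximated in userland by routing the stream through Response, which is exactly the chunk-by-chunk path a dedicated reader could avoid (a sketch for illustration, not the proposed class):

// Userland sketch only. new Response() accepts a ReadableStream body per the
// Fetch standard, but consuming it still goes through the ordinary
// chunk-by-chunk machinery that a dedicated reader could skip.
class BodyReaderSketch {
  #response;
  constructor(readable) {
    this.#response = new Response(readable);
  }
  arrayBuffer() { return this.#response.arrayBuffer(); }
  blob() { return this.#response.blob(); }
  text() { return this.#response.text(); }
}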

Now, this approach would necessarily consume the entire readable, so it won't work if we want an alternative that consumes the stream incrementally; but the basic idea remains the same: introducing a new kind of Reader is the right way to accomplish this.


Jarred-Sumner commented Jun 15, 2022

introducing a new kind of Reader is the right way to accomplish this

Yeah, that makes sense.

I've been considering the possibility of a new kind of reader that implements the Body mixin
const reader = new ReadableStreamBodyReader(readable);

This is nice. Only thought here is maybe getReader("body") would be nice to have, too.

The ReadableStreamBodyReader would provide opportunities to more efficiently consume the ReadableStream without going through the typical chunk-by-chunk slow path and would not require any changes to the existing standard API surface.

Thinking more on this today: I think this problem needs to be addressed in the controller more than in the reader. The problem is that the reader can't tell the controller what it wants to do, because the controller starts doing work before the reader is acquired.

For converting a stream to an array/arrayBuffer/text/blob, instead of creating a bunch of chunks and merging them at the end, it should just not create a bunch of chunks in the first place.
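As a sketch of the pattern being criticized (assuming Uint8Array chunks):

// Collect every chunk, then copy them all a second time into one buffer.
const reader = readable.getReader();
const chunks = [];
let total = 0;
let result = await reader.read();
while (!result.done) {
  chunks.push(result.value);
  total += result.value.byteLength;
  result = await reader.read();
}
const merged = new Uint8Array(total);
let offset = 0;
for (const chunk of chunks) {
  merged.set(chunk, offset);
  offset += chunk.byteLength;
}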

For streaming an HTTP response body, the HTTP socket/client is much better equipped to efficiently chunk up input than ReactDOM.

I think a new controller type would be more effective, but ReadableStreamBodyReader would also be nice.


Jarred-Sumner commented Oct 11, 2022 via email
