Proposal: ReadableStream tee() backpressure #1235

Open
jasnell opened this issue Jun 13, 2022 · 10 comments

@jasnell

jasnell commented Jun 13, 2022

The current definition of the ReadableStreamTee abstract operation includes an inherent flaw when it comes to backpressure signaling ... specifically, there isn't any. Whether stream.[[controller]] implements ReadableByteStreamController or ReadableStreamDefaultController, the issue here is the same so I'll describe the issue in general terms.

Current behavior

We start with an original ReadableStream RS0. When tee() is called on RS0, a single ReadableStreamDefaultReader R0 is created and RS0 is marked as locked. That default reader R0 is then shared by two new ReadableStreams, RS1 and RS2.

When either RS1 or RS2 initiates a pull on its underlying source, the shared exclusive reader R0 initiates a read on RS0. When that read completes, the resulting data, if any, is enqueued to both RS1 and RS2. Unfortunately, if one of those branches does not have an active reader, or is being read at a significantly slower rate than the other, the enqueued data accumulates in the unread branch's internal queue without any backpressure signaling.

flowchart TD;
  R1 --> U1[[Data flows]]
  RS2 --> U2[(Data accumulates)]
  U2 -- no backpressure --x RS2
  RS1 --> R1
  R0 --> RS1
  R0 --> RS2
  RS0 --> R0

While the origin stream RS0 may implement backpressure, that backpressure is relieved even when only one of the two child branches initiates a read(). From RS0's perspective, data is flowing because something is calling read, even though the data is accumulating in the unconsumed downstream branch.
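The behavior described above can be observed with the existing API. The following is a minimal sketch, assuming a runtime with a global ReadableStream (a recent browser or Node.js 18+); the function name demoTeeBuffering and the 1 KiB chunk size are illustrative choices, not part of the proposal. It reads only from one branch and counts how often the source is pulled.

```javascript
// Illustrative only: counts source pulls while one tee() branch is never read.
async function demoTeeBuffering(chunksToRead = 10) {
  let pulls = 0;
  const source = new ReadableStream(
    {
      pull(controller) {
        pulls += 1;
        controller.enqueue(new Uint8Array(1024));
      },
    },
    { highWaterMark: 0 } // the source only produces when something asks for data
  );

  const [fast, slow] = source.tee();
  const fastReader = fast.getReader();

  // Read from the fast branch only. `slow` is never touched.
  for (let i = 0; i < chunksToRead; i++) {
    await fastReader.read();
  }

  // Every chunk pulled above was also enqueued to `slow`, where it sits unread.
  return pulls;
}

demoTeeBuffering().then((pulls) => {
  console.log(`source pulled ${pulls} times while one branch was idle`);
});
```

The source sees at least one pull per chunk read from the fast branch, so its own backpressure never engages even though the idle branch holds a copy of every chunk.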

For browser-based uses of ReadableStream, this is often acceptable, as the number of streams is limited and the impact is confined to a single user environment. The memory accumulation has low impact. For server and edge environments, particularly multi-tenant environments, the impact is much more significant at scale. We can and should do better.

Requirements

First, we want a solution for tee() backpressure that introduces no observable breaking changes to the public API of the streams specification. The solution does, however, require changes to the streams internals in a number of important ways that are described below. Importantly, the observable behavior of all current implementations of tee() remains precisely as it is today.

Second, we want a mechanism that allows for reliably signaling backpressure up through an entire tree of tee branches so that buffering at each step can be minimized or avoided entirely. Specifically, backpressure must be controlled by the branch that is consuming data the slowest as opposed to the branch consuming data the fastest.

Third, our solution must not introduce any changes to the underlying source API.

ReadableStreamNonExclusiveReader

There are currently two kinds of readers: ReadableStreamDefaultReader and ReadableStreamBYOBReader. For the sake of this discussion, we are going to generically refer to these as "exclusive readers".

There is a one-to-one relationship between a ReadableStream and an exclusive reader. When a user calls getReader() or uses a reader constructor, the ReadableStream is locked to that one reader instance. There is no other way to consume the ReadableStream once locked. Subsequent calls to get a reader will fail, as will subsequent requests to pipe or tee the ReadableStream. This property is precisely why we run into the buffering issue we are seeking to address here.
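The exclusive locking described here is existing, observable behavior. A short sketch, assuming a runtime with a global ReadableStream; the helper throwsTypeError is only there for illustration:

```javascript
// Existing behavior: an exclusive reader locks the stream against other consumers.
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(stream.locked); // true

// Small helper (illustrative) that reports whether a call throws a TypeError.
function throwsTypeError(fn) {
  try {
    fn();
    return false;
  } catch (err) {
    return err instanceof TypeError;
  }
}

const secondReaderFails = throwsTypeError(() => stream.getReader());
const teeFails = throwsTypeError(() => stream.tee());

// Releasing the only reader unlocks the stream again.
reader.releaseLock();
const unlockedAgain = stream.locked === false;

console.log({ secondReaderFails, teeFails, unlockedAgain });
```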

This proposal seeks to add a new kind of non-exclusive reader to the streams standard: ReadableStreamNonExclusiveReader (we will call this an N-E-R reader for short). As suggested by the name, ReadableStream would be modified to allow multiple N-E-R readers to be attached to it simultaneously.

In general, the locking mechanism of a ReadableStream does not change much under this proposal. If a readable is piped, or attached to an exclusive reader, or tee'd, the ReadableStream is locked such that it cannot be otherwise consumed (exactly the way it is now). When an N-E-R reader is attached to the ReadableStream, the stream is still marked as locked such that it cannot be subsequently piped, tee'd, or have an exclusive reader attached to it -- but it will be possible to attach additional N-E-R readers to the same ReadableStream. So long as there is at least one N-E-R reader attached, the readable remains locked. Once all N-E-R readers are released, the lock on the readable is released.

flowchart TD;
  A[ReadableStream] -- locked --> B[pipeTo]
  C[ReadableStream] -- locked --> D[tee]
  E[ReadableStream] -- locked --> F[exclusive reader]
  G[ReadableStream] -- locked --> H1[N-E-R reader]
  G[ReadableStream] -- locked --> H2[N-E-R reader]
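
The locking rules above can be modeled as a small state machine. This is only a sketch of the rules as described in this proposal: LockModel and its method names are hypothetical and do not correspond to any existing or specified API.

```javascript
// Hypothetical model of the proposed locking rules; not a real streams API.
class LockModel {
  constructor() {
    this.exclusive = false; // held by pipeTo, tee, or an exclusive reader
    this.nonExclusive = 0;  // number of attached N-E-R readers
  }

  get locked() {
    return this.exclusive || this.nonExclusive > 0;
  }

  // Exclusive consumers require a completely unlocked stream.
  acquireExclusive() {
    if (this.locked) throw new TypeError('stream is locked');
    this.exclusive = true;
  }

  // N-E-R readers may join other N-E-R readers, but not an exclusive consumer.
  acquireNonExclusive() {
    if (this.exclusive) throw new TypeError('stream is exclusively locked');
    this.nonExclusive += 1;
  }

  releaseExclusive() {
    this.exclusive = false;
  }

  releaseNonExclusive() {
    if (this.nonExclusive > 0) this.nonExclusive -= 1;
  }
}

const lock = new LockModel();
lock.acquireNonExclusive();
lock.acquireNonExclusive(); // a second N-E-R reader is allowed
lock.releaseNonExclusive();
console.log(lock.locked);   // true: one N-E-R reader is still attached
lock.releaseNonExclusive();
console.log(lock.locked);   // false: the last N-E-R reader was released
```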

The addition of an N-E-R reader changes the internal behavior of the ReadableStream queue in a couple of critical ways.

Currently, when a controller enqueues data, and there is no read operation pending, the data enqueued is pushed directly into the readable's internal queue, triggering the backpressure calculation to decrement the desiredSize reported by the controller by the calculated size of the enqueued chunk. If, when the controller enqueues the data, there is a pending read operation, and the internal buffer is empty, the chunk is used to immediately fulfill the read and the data is never added to the internal buffer. In this case, the desiredSize is never updated because no backpressure is applied.
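The two enqueue paths described above can be seen directly through desiredSize. A minimal sketch, assuming a runtime with a global ReadableStream; the highWaterMark of 3 is an arbitrary choice for illustration:

```javascript
// Existing behavior: a chunk that fulfills a pending read never touches the queue.
let controller;
const stream = new ReadableStream(
  {
    start(c) {
      controller = c; // keep the controller so we can enqueue from outside
    },
  },
  { highWaterMark: 3 } // default size() counts each chunk as 1
);
const reader = stream.getReader();

const before = controller.desiredSize; // 3

// Case 1: a read is pending and the queue is empty.
const pendingRead = reader.read();
controller.enqueue('a'); // handed straight to the pending read
const afterDirect = controller.desiredSize; // still 3

// Case 2: no read is pending.
controller.enqueue('b'); // stored in the internal queue
const afterQueued = controller.desiredSize; // 2

pendingRead.then(({ value }) => console.log('pending read got', value));
console.log({ before, afterDirect, afterQueued });
```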

When a readable has multiple N-E-R readers attached and a controller enqueues data while none of the attached readers has a pending read, the data is pushed into the queue and the desiredSize is decremented. If some, but not all, of the attached N-E-R readers have pending reads, the enqueued chunk is used to fulfill the pending reads and the chunk is also added to the internal buffer, waiting to be read by the remaining N-E-R readers. The desiredSize will be decremented by the calculated size. If all N-E-R readers have pending reads, and the buffer is empty, the chunk will never be added to the internal buffer.

While this does require more complicated bookkeeping, the design ensures that backpressure (signaled by desiredSize) is relieved only at the pace of the slowest attached N-E-R reader, while allowing other branches to continue consuming data as quickly as it is made available. It continues to be up to the underlying source implementation to determine whether or not to pay attention to the backpressure signal.

The mechanism supports default and BYOB reads, although BYOB reads are certainly more complicated with an N-E-R reader. When an N-E-R reader performs a BYOB read and provides a buffer for the underlying source to fill, once the controller's byobRequest is completed (by calling either respond() or respondWithNewView()), the data provided will be copied into the internal buffer of the ReadableStream pending reads from the remaining N-E-R readers. For those, BYOB reads will be treated the same as BYOB reads fulfilled from the queue. A faster reader can continue to submit new BYOB reads that will be passed on to the controller to fulfill if it chooses. The desiredSize backpressure signaling will still properly reflect the backpressure status, and it is up to the underlying source implementation to decide whether and how to handle the backpressure.
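For reference, this is what a BYOB read fulfilled through byobRequest.respond() looks like with today's exclusive ReadableStreamBYOBReader. It is a minimal sketch, assuming a runtime that supports byte streams (for example Node.js 18+); the function name byobReadDemo and the byte value 42 are illustrative. Under the proposal, the bytes written here would additionally be copied into the shared queue for the other N-E-R readers, which this example does not attempt to show.

```javascript
// Existing behavior: the underlying source fills the reader-supplied buffer.
async function byobReadDemo() {
  const stream = new ReadableStream({
    type: 'bytes',
    pull(controller) {
      // With a BYOB read pending, byobRequest.view is a window onto the
      // buffer the reader supplied.
      const view = controller.byobRequest.view;
      view[0] = 42;
      controller.byobRequest.respond(1); // report that one byte was written
    },
  });

  const reader = stream.getReader({ mode: 'byob' });
  const { value, done } = await reader.read(new Uint8Array(4));

  // `value` is a view over only the bytes that were actually filled.
  return { bytes: Array.from(value), done };
}

byobReadDemo().then((result) => console.log(result));
```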

Reading Modes

The ReadableStreamNonExclusiveReader adapts to the type of the underlying controller. If the controller is a ReadableStreamDefaultController, the N-E-R reader's read() method accepts no arguments and reads using the same semantics as the ReadableStreamDefaultReader. If the controller is a ReadableByteStreamController, the N-E-R reader's read() method accepts a TypedArray to be filled using the BYOB semantics.

Queue bookkeeping

Each type of controller maintains an internal queue. When data is enqueued in the controller while there are no pending reads, that data is appended to the end of the queue. Reads occurring while there is data in the queue will cause that data to be removed from the queue on a first-in-first-out basis. When data is enqueued while there is a pending read, the read is just filled directly without ever adding the data to the queue.

With N-E-R readers, this accounting changes. Each N-E-R reader maintains its own individual cursor location in the queue. Data may be freed from the queue once all cursors advance beyond the position of that data in the queue. If there is currently data in the queue, if there are any attached N-E-R readers whose cursor position is not at the end of the queue, or if at least one attached N-E-R reader does not have a pending read request, then data pushed into the controller is appended to the end of the queue. If all cursors are positioned at the end of the queue, all N-E-R readers have pending reads, and the queue is empty, then the data is never added to the queue.
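The cursor bookkeeping can be sketched as a small synchronous model. Everything here is hypothetical: SharedQueueModel, its methods, and the reader ids are invented for illustration. The model also assumes each chunk has size 1 and that a newly attached reader starts at the oldest chunk still buffered, neither of which is stated in the proposal.

```javascript
// Hypothetical model of a queue shared by several N-E-R readers.
class SharedQueueModel {
  constructor(highWaterMark) {
    this.highWaterMark = highWaterMark;
    this.chunks = [];         // chunks not yet consumed by every reader
    this.base = 0;            // absolute position of chunks[0]
    this.cursors = new Map(); // reader id -> absolute position of next chunk
    this.pending = new Set(); // reader ids waiting on a read
  }

  get end() {
    return this.base + this.chunks.length;
  }

  // Assumes every chunk counts as size 1.
  get desiredSize() {
    return this.highWaterMark - this.chunks.length;
  }

  attach(id) {
    this.cursors.set(id, this.base); // assumption: start at the oldest chunk
  }

  // Returns the next chunk, or undefined if the read must wait.
  read(id) {
    const cursor = this.cursors.get(id);
    if (cursor < this.end) {
      const chunk = this.chunks[cursor - this.base];
      this.cursors.set(id, cursor + 1);
      this.trim();
      return chunk;
    }
    this.pending.add(id);
    return undefined;
  }

  // Returns the ids of the readers whose pending reads were fulfilled.
  enqueue(chunk) {
    const served = [...this.pending];
    this.pending.clear();
    // Every reader was waiting at the end of an empty queue: nothing to buffer.
    if (served.length > 0 && served.length === this.cursors.size) return served;
    this.chunks.push(chunk);
    for (const id of served) this.cursors.set(id, this.end);
    return served;
  }

  // Frees chunks that every cursor has moved past.
  trim() {
    const slowest = Math.min(...this.cursors.values());
    const drop = slowest - this.base;
    if (drop > 0) {
      this.chunks.splice(0, drop);
      this.base = slowest;
    }
  }
}

const queue = new SharedQueueModel(2);
queue.attach('fast');
queue.attach('slow');
queue.read('fast');             // fast is now waiting
queue.enqueue('a');             // fulfills fast, buffered for slow
console.log(queue.desiredSize); // 1
queue.read('slow');             // slow catches up, 'a' is freed
console.log(queue.desiredSize); // 2
```

In this model desiredSize only recovers when the slowest reader advances, which is the property the proposal is after.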

Acquiring a non-exclusive reader

The N-E-R reader would be used internally in the implementation of tee() (see the Redefining tee() section that follows). It will also be possible to acquire one directly, using either a constructor or the getReader() method:

const readable = new ReadableStream();
const reader1 = readable.getReader({ mode: 'non-exclusive' });
console.log(readable.locked); // true
const reader2 = readable.getReader({ mode: 'non-exclusive' }); // Success!!
const reader3 = readable.getReader(); // Fails!!

const reader4 = new ReadableStreamNonExclusiveReader(readable); // Success!!
const reader5 = new ReadableStreamDefaultReader(readable); // Fails!!

Redefining tee()

The ReadableStream tee() operation is modified such that each branch of the tee uses its own N-E-R reader as opposed to sharing a single exclusive reader.

flowchart TD;
  RS0 --> U2[(Data accumulates)]
  U2 -- signal backpressure --> RS0
  R3 --> U[[Data flows]]
  RS1 --> R3
  R1 --> RS1
  R2 --> RS2
  RS0 --> R1[[N-E-R 1]]
  RS0 --> R2[[N-E-R 2]]

As a further optimization that is secondary to backpressure, if the child ReadableStreams (RS1 and RS2 in the diagram) are themselves tee'd, then those are simply added as new N-E-R readers to the original ReadableStream RS0.

flowchart TD;
  R1 --> RS1
  R2 -.-> RS2
  RS2 -.-> R3[[N-E-R 3]]
  RS2 -.-> R4[[N-E-R 4]]
  RS0 --> R1[[N-E-R 1]]
  RS0 -.-> R2[[N-E-R 2]]
  RS0 --> R3[[N-E-R 3]]
  RS0 --> R4[[N-E-R 4]]

(In the above, RS2 and N-E-R 2 can each be safely garbage collected as they are no longer an active part of the tree)

With this design, data buffers in only a single location within the tree with proper backpressure signaling that is relieved only at the pace of the slowest reading branch. Faster branches can continue to consume data as quickly as the underlying source makes it available. As mentioned, the underlying source continues to choose whether or not to pay attention to the backpressure signaling.

The design has no observable API impact to existing tee() uses and no observable impact to underlying source implementations. There is no change to backpressure signaling. Existing methods of consuming the readable are unchanged. Having an N-E-R reader leaks no information about the presence or state of other attached N-E-R readers beyond observable memory usage metrics.

@jasnell
Author

jasnell commented Jun 21, 2022

@domenic @annevk ... any thoughts on this?

@domenic
Member

domenic commented Jun 21, 2022

This is a bit much for me to digest, given that I am not working on streams full time. I suspect the same is true for @annevk, who has never been working on streams and is currently or soon on vacation.

@ricea or @MattiasBuelens may be better positioned to help.

Based on my initial quick skim, I am surprised that a simple problem like "change the backpressure semantics" requires such a large and complicated solution. If it's true that this is a server-focused problem and doesn't matter for browsers, and that solving it requires such a large and complicated bunch of machinery, then I am not personally optimistic about finding much engagement from the browser engineers you are pinging.

@jasnell
Author

jasnell commented Jun 22, 2022

> If it's true that this is a server-focused problem and doesn't matter for browsers

I would not characterize it like that. It's a problem for any environment where you're going to be teeing a stream and reading a large amount of data faster through one branch than the other. The impact is less acute in browsers since those are generally single user environments with far fewer of these things likely happening in parallel, but the problem would still exist there.

> I am surprised that a simple problem like "change the backpressure semantics" requires such a large and complicated solution

Unfortunately it does. The backpressure signaling in a ReadableStream never appears to have taken tee branching into consideration.

@benjamingr
Member

> If it's true that this is a server-focused problem and doesn't matter for browsers, and that solving it requires such a large and complicated bunch of machinery, then I am not personally optimistic about finding much engagement from the browser engineers you are pinging.

First of all, thanks for looking into this. I've personally run into this a bunch when working on video in browsers. This was before ReadableStream was common enough to use (so we had to work on our own) but code that touches video in browsers can definitely run into this sort of issue.

I actually haven't used ReadableStream enough in servers to ever run into this and performance sensitive code on servers often can't actually tee because of performance. On the client though the amount of back-pressure determines buffering/downloads which is really important for video.

If we want a concrete (and rather common) example let's look at HLS/DASH playback (like on YouTube, Netflix or most video sites). Most of the code doing this doesn't use ReadableStream (yet) but this case is common:

  • I have a server giving me video. How much data I buffer is very important because it balances the quality of service for the users, the amount of data I download (especially on metered connections) and processing.
  • I have client code that takes the HLS/DASH video and demuxes it (since most browsers don't natively run HLS) - changing the container format from .ts to fragmented mp4. Using media source extensions it then pushes the video fragment to the video tag. Two examples of (popular) libraries that do this for HLS/DASH are hlsjs and shaka. One can think of this as a streaming transformation and the user actually consuming the video should impact backpressure and buffering.
  • I also want to look at the stream for analytics (how many frames, time watched etc).

In this case, for ReadableStream to be viable, backpressure would need to work. Buffering data (that may be thrown away if the rendition changes) just because the analytics parsing is fast wouldn't work in terms of quality of service, since the "real" pace is the user watching the video plus the cost of demuxing it.

There is a good picture illustrating Chrome's algorithm for mp4 (that again predates ReadableStream) - I really like that picture and use it when giving talks about watermarks and backpressure:

image

(From https://www.chromium.org/audio-video/#how-the-does-buffering-work )


I am not arguing this is a bigger issue on the client or even that it's a very common one - but it's one that I personally ran into when working on video related tasks in browsers, it's one the browsers have to solve internally and I suspect it's slowing adoption of ReadableStream in that space (and I believe video is one of the most important use cases for ReadableStream)

@jasnell
Author

jasnell commented Jul 5, 2022

FWIW, I do have a path forward for addressing this issue within the Workers implementation that does not require any public API changes. The changes are entirely internal to the implementation of the data queuing and backpressure signaling in ReadableStreamDefaultController and ReadableByteStreamController and ReadableStream without having any impact on public API (both consumer and producer sides). The external facing API pieces discussed in my original post above (e.g. the non-exclusive reader class) can be considered to be optional to resolving this.

@domenic
Member

domenic commented Jul 6, 2022

Fascinating. I'd love to learn more about that? Does it have any drawbacks, or is it just something we should do for all instances of those classes?

@jasnell
Author

jasnell commented Jul 13, 2022

The key drawback is really just the increase in bookkeeping complexity for the internal queue. Within the next couple of weeks I'm going to take a stab at writing up some draft spec text for it that'll hopefully help make it more concrete.

@domenic
Member

domenic commented Jul 14, 2022

Great! Feel free to start with modifications to the reference implementation / web platform tests, since translating that into spec text is usually pretty mechanical.

@yonran

yonran commented Jul 21, 2022

In #1233 I also reported the issue that ReadableStream.tee() only partially signals backpressure, from the faster branch to the source, while letting the slower branch accumulate unlimited data on its queue. Why not add an option tee({fullBackpressure: true}) that fixes the implementation of tee and returns two branches that correctly signal backpressure from the slower branch to the source ReadableStream? Then you could do anything you want with the branches such as calling pipeTo, which is harder to do if you can only create nonexclusive ReadableStreamDefaultReader/ReadableStreamBYOBReaders.

In one way, your proposal is more flexible in that you can dynamically add and remove non-exclusive readers, but is that really what you want?
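
To make the idea of slower-branch backpressure concrete, here is one way it could be approximated in userland with the existing API. This is a sketch under strong assumptions, not the fullBackpressure option suggested above (which does not exist): the function name teeLockstep is hypothetical, the two branches advance in strict lockstep with no buffering at all, and cancellation of a branch is not handled.

```javascript
// Hypothetical userland tee: the source is read only when BOTH branches ask.
function teeLockstep(source) {
  const reader = source.getReader();
  const controllers = [null, null];
  const waiting = [null, null]; // resolvers for each branch's in-flight pull()

  async function pump() {
    // The slower branch gates the source: do nothing until both are waiting.
    if (!waiting[0] || !waiting[1]) return;
    const resolvers = [waiting[0], waiting[1]];
    waiting[0] = waiting[1] = null;
    try {
      const { done, value } = await reader.read();
      for (const c of controllers) {
        if (done) c.close();
        else c.enqueue(value); // both branches share the same chunk object
      }
    } catch (err) {
      for (const c of controllers) c.error(err);
    }
    for (const resolve of resolvers) resolve();
  }

  function makeBranch(index) {
    return new ReadableStream(
      {
        start(c) {
          controllers[index] = c;
        },
        pull() {
          // Stay "pulling" until the shared read has delivered a chunk.
          return new Promise((resolve) => {
            waiting[index] = resolve;
            pump();
          });
        },
      },
      { highWaterMark: 0 } // pull() only runs when the branch has a pending read
    );
  }

  return [makeBranch(0), makeBranch(1)];
}
```

With this sketch, a read on one branch stays pending until the other branch also reads, so the source never runs ahead of the slower consumer. A practical version would allow a bounded amount of buffering per branch and would need to decide what happens when one branch is cancelled.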

@ricea
Collaborator

ricea commented May 26, 2023

We can't change the semantics of tee() without breaking existing sites. But as @yonran suggests, we can add an option to base backpressure signalling on the slower branch.

While a more general mechanism is interesting, it's complex, and I wouldn't proceed with it unless there were some compelling use cases.
