suggestion: concatReadableStreams()
#4500
While composing multiple readable streams into a single one, I found that in some cases I need to generate the streams asynchronously. This led me to a different implementation with a signature more like:

```ts
type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;

function concatReadableStreams<T>(
  streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
  return new ReadableStream({
    async start(controller) {
      try {
        if (Symbol.asyncIterator in streams) {
          for await (const stream of streams) {
            for await (const chunk of stream) {
              controller.enqueue(chunk);
            }
          }
        } else {
          // use `for...of` instead of `for await...of` to call sync generator `finally` blocks
          for (const stream of streams) {
            for await (const chunk of await stream) {
              controller.enqueue(chunk);
            }
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}
```

As a diff against the variadic implementation:

```diff
@@ -1,12 +1,23 @@
+type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;
+
 function concatReadableStreams<T>(
-  ...streams: ReadableStream<T>[]
+  streams: AnyIterable<ReadableStream<T>>,
 ): ReadableStream<T> {
   return new ReadableStream({
     async start(controller) {
       try {
-        for (const stream of streams) {
-          for await (const chunk of stream) {
-            controller.enqueue(chunk);
+        if (Symbol.asyncIterator in streams) {
+          for await (const stream of streams) {
+            for await (const chunk of stream) {
+              controller.enqueue(chunk);
+            }
+          }
+        } else {
+          // use `for...of` instead of `for await...of` to call sync generator `finally` blocks
+          for (const stream of streams) {
+            for await (const chunk of await stream) {
+              controller.enqueue(chunk);
+            }
           }
         }
         controller.close();
```
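For illustration, here is a self-contained sketch of how the iterable-accepting signature could be driven by an async generator that produces streams lazily. The `fromArray` helper is hypothetical, used only to build small test streams; it is not part of the proposal.

```typescript
// Sketch of the proposed iterable-accepting signature, driven by an
// async generator of streams.
type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;

function concatReadableStreams<T>(
  streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
  return new ReadableStream({
    async start(controller) {
      try {
        if (Symbol.asyncIterator in streams) {
          for await (const stream of streams) {
            for await (const chunk of stream) controller.enqueue(chunk);
          }
        } else {
          for (const stream of streams) {
            for await (const chunk of await stream) controller.enqueue(chunk);
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}

// Hypothetical helper: wrap an array in a one-shot ReadableStream.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Streams are produced lazily, one at a time.
async function* generateStreams(): AsyncGenerator<ReadableStream<number>> {
  yield fromArray([1, 2]);
  yield fromArray([3, 4]);
}

const chunks: number[] = [];
for await (const chunk of concatReadableStreams(generateStreams())) {
  chunks.push(chunk);
}
console.log(chunks.join(",")); // "1,2,3,4"
```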
Hmmm… I found a different way to concatenate streams, by actually using a `TransformStream`:

```ts
class FlatStream<T> extends TransformStream<ReadableStream<T>, T> {
  constructor() {
    super({
      async transform(stream, controller) {
        for await (const chunk of stream) {
          controller.enqueue(chunk);
        }
      },
    });
  }
}

type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;

function concatReadableStreams<T>(
  streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
  return ReadableStream.from(streams).pipeThrough(new FlatStream());
}
```
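To see `FlatStream` in action without relying on `ReadableStream.from()` (which is relatively new; Node 20.6+ / recent Deno), one can feed it directly through its writable side. This is only a demo sketch; `fromArray` is a hypothetical helper for building test streams.

```typescript
// Flattens a stream of ReadableStreams into a stream of their chunks.
class FlatStream<T> extends TransformStream<ReadableStream<T>, T> {
  constructor() {
    super({
      async transform(stream, controller) {
        for await (const chunk of stream) {
          controller.enqueue(chunk);
        }
      },
    });
  }
}

// Hypothetical helper: wrap an array in a one-shot ReadableStream.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

const flat = new FlatStream<string>();

// Produce inner streams on the writable side...
const writer = flat.writable.getWriter();
const producer = (async () => {
  await writer.write(fromArray(["a", "b"]));
  await writer.write(fromArray(["c"]));
  await writer.close();
})();

// ...and consume flattened chunks on the readable side.
const out: string[] = [];
for await (const chunk of flat.readable) {
  out.push(chunk);
}
await producer;
console.log(out.join("")); // "abc"
```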
Are you able to provide a few use cases for this suggestion?
Examples:

Keeping things as streams reduces max memory usage, etc.
Seems reasonable to have 👍🏾
Isn't the structure below easy and straightforward enough to understand?

```ts
for await (const stream of streams) {
  for await (const chunk of stream) {
    // process chunk
  }
}
```

I don't see much point in adding this as a util.
@kt3k If I want to process chunks, then the looping makes perfect sense to me. But if I want to stream multiple readable streams, combined together, to a file or to a network response, then the loops themselves are not helpful (although they may be part of the solution).
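To make that concrete, piping several streams to one sink can be expressed with the standard `pipeTo` and its `preventClose` option instead of per-chunk loops. A runnable sketch, where `fromArray` is a hypothetical helper for building test streams and the array sink stands in for a file or network response:

```typescript
// Hypothetical helper: wrap an array in a one-shot ReadableStream.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Stand-in for a file or network sink.
const received: string[] = [];
const sink = new WritableStream<string>({
  write(chunk) {
    received.push(chunk);
  },
});

// preventClose keeps the sink open so the next stream can pipe into it.
for (const stream of [fromArray(["a"]), fromArray(["b", "c"])]) {
  await stream.pipeTo(sink, { preventClose: true });
}
await sink.close();
console.log(received.join("")); // "abc"
```

Note there are no chunk-level loops here; the Streams API moves the data, which is the shape of solution the suggestion is after.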
Fair enough |
**Is your feature request related to a problem? Please describe.**

I want to combine multiple `ReadableStream` instances into a single `ReadableStream` instance where each stream is read fully in sequence.

**Describe the solution you'd like**

A function to go along with `earlyZipReadableStreams`, `mergeReadableStreams`, and `zipReadableStreams`, but that processes streams in order, one at a time. e.g.

**Describe alternatives you've considered**

I wonder if this isn't already included because there's some reason not to do this with streams, or because there's already a simple way to do this built into the JS Streams API that I have yet to find.
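The example after "e.g." did not survive extraction. For illustration only, a minimal sketch of what the requested helper might look like in its variadic form, with `fromArray` as a hypothetical helper to build test streams (not the eventual std implementation):

```typescript
// Minimal sketch: read each stream to completion before starting the next.
function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  return new ReadableStream({
    async start(controller) {
      try {
        for (const stream of streams) {
          // drain this stream fully before moving on
          for await (const chunk of stream) {
            controller.enqueue(chunk);
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}

// Hypothetical helper: wrap an array in a one-shot ReadableStream.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

const result: number[] = [];
for await (const chunk of concatReadableStreams(fromArray([1]), fromArray([2, 3]))) {
  result.push(chunk);
}
console.log(result.join(",")); // "1,2,3"
```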