suggestion: concatReadableStreams() #4500

Open
mfulton26 opened this issue Mar 17, 2024 · 8 comments
Labels
feedback welcome We want community's feedback on this issue or PR

Comments

@mfulton26
Contributor

Is your feature request related to a problem? Please describe.

I want to combine multiple ReadableStream instances into a single ReadableStream instance where each stream is read fully in sequence.

Describe the solution you'd like

A function to go along with earlyZipReadableStreams, mergeReadableStreams, and zipReadableStreams, but one that processes the streams in order, one at a time.

e.g.

function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  return new ReadableStream({
    async start(controller) {
      try {
        for (const stream of streams) {
          for await (const chunk of stream) {
            controller.enqueue(chunk);
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}
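
One caveat with the start()-based sketch above: it enqueues chunks as fast as the sources produce them, however slowly the consumer reads, and cancelling the result does not cancel the sources. Below is a minimal pull-based sketch of the same idea. The name concatReadableStreamsPull is hypothetical (not part of any library), and it uses plain readers rather than for await...of, since async iteration of ReadableStream is not available in every runtime.

```typescript
// Hypothetical pull-based variant: one chunk is read per pull() call,
// so the sources are consumed only as fast as the consumer reads.
function concatReadableStreamsPull<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  let index = 0; // index of the next source to open
  let reader: ReadableStreamDefaultReader<T> | undefined;
  return new ReadableStream<T>({
    async pull(controller) {
      while (true) {
        if (reader === undefined) {
          if (index >= streams.length) {
            controller.close();
            return;
          }
          reader = streams[index++].getReader();
        }
        const result = await reader.read();
        if (!result.done) {
          controller.enqueue(result.value);
          return;
        }
        // Current source is exhausted: move on to the next one.
        reader.releaseLock();
        reader = undefined;
      }
    },
    async cancel(reason) {
      const unopened = streams.slice(index);
      index = streams.length; // stop pull() from opening further sources
      await reader?.cancel(reason);
      await Promise.all(unopened.map((stream) => stream.cancel(reason)));
    },
  });
}
```

If a source errors, the rejected read() rejects pull(), which errors the combined stream.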

Describe alternatives you've considered

I wonder whether this is missing because there is some reason not to do this with streams, or because the JS Streams API already has a simple built-in way to do it that I have yet to find.

@mfulton26
Contributor Author

While composing multiple readable streams into a single one, I found that in some cases I need to generate the streams asynchronously. This led me to a different implementation, with a signature more like ReadableStream.from() than like mergeReadableStreams:

type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;

function concatReadableStreams<T>(
  streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
  return new ReadableStream({
    async start(controller) {
      try {
        if (Symbol.asyncIterator in streams) {
          for await (const stream of streams) {
            for await (const chunk of stream) {
              controller.enqueue(chunk);
            }
          }
        } else {
          // use `for...of` instead of `for await...of` to call sync generator `finally` blocks
          for (const stream of streams) {
            for await (const chunk of await stream) {
              controller.enqueue(chunk);
            }
          }
        }
        controller.close();
      } catch (e) {
        controller.error(e);
      }
    },
  });
}
@@ -1,12 +1,23 @@
+type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;
+
 function concatReadableStreams<T>(
-  ...streams: ReadableStream<T>[]
+  streams: AnyIterable<ReadableStream<T>>,
 ): ReadableStream<T> {
   return new ReadableStream({
     async start(controller) {
       try {
-        for (const stream of streams) {
-          for await (const chunk of stream) {
-            controller.enqueue(chunk);
+        if (Symbol.asyncIterator in streams) {
+          for await (const stream of streams) {
+            for await (const chunk of stream) {
+              controller.enqueue(chunk);
+            }
+          }
+        } else {
+          // use `for...of` instead of `for await...of` to call sync generator `finally` blocks
+          for (const stream of streams) {
+            for await (const chunk of await stream) {
+              controller.enqueue(chunk);
+            }
           }
         }
         controller.close();

@mfulton26
Contributor Author

Hmmm… I found a different way to concatenate streams by actually using ReadableStream.from() and then using pipeThrough() with a TransformStream to flatten it. I'm not sure yet which solution I prefer though.

class FlatStream<T> extends TransformStream<ReadableStream<T>, T> {
  constructor() {
    super({
      async transform(stream, controller) {
        for await (const chunk of stream) {
          controller.enqueue(chunk);
        }
      },
    });
  }
}

type AnyIterable<T> = AsyncIterable<T> | Iterable<T | PromiseLike<T>>;

function concatReadableStreams<T>(
  streams: AnyIterable<ReadableStream<T>>,
): ReadableStream<T> {
  return ReadableStream.from(streams).pipeThrough(new FlatStream());
}
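
A property of the TransformStream approach worth spelling out: transform() does not resolve until the inner stream is drained, so the next inner stream is not processed before then. The sketch below shows this with plain readers. flatStream, streamOf, collect, and flatDemo are hypothetical names used only for illustration, and the outer stream is built by hand instead of with ReadableStream.from(), which is not available in every runtime.

```typescript
// Reader-based take on the FlatStream idea: drain each incoming stream
// into the output before accepting the next one.
function flatStream<T>(): TransformStream<ReadableStream<T>, T> {
  return new TransformStream<ReadableStream<T>, T>({
    async transform(stream, controller) {
      const reader = stream.getReader();
      try {
        while (true) {
          const result = await reader.read();
          if (result.done) break;
          controller.enqueue(result.value);
        }
      } finally {
        reader.releaseLock();
      }
    },
  });
}

// Hypothetical helper: a stream that emits `items`, then closes.
function streamOf<T>(...items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: collects every chunk so the result can be inspected.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const chunks: T[] = [];
  const reader = stream.getReader();
  while (true) {
    const result = await reader.read();
    if (result.done) return chunks;
    chunks.push(result.value);
  }
}

// A stream of streams, flattened into a single stream of numbers.
async function flatDemo(): Promise<number[]> {
  const outer = streamOf(streamOf(1, 2), streamOf(3));
  return await collect(outer.pipeThrough(flatStream<number>()));
}
```

flatDemo() resolves with the chunks of the first inner stream followed by those of the second.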

@iuioiua added the "feedback welcome" label Mar 18, 2024
@iuioiua
Collaborator

iuioiua commented Mar 18, 2024

Are you able to provide a few use cases for this suggestion?

@iuioiua changed the title from "concatReadableStreams" to "suggestion: concatReadableStreams()" Mar 18, 2024
@mfulton26
Contributor Author

Examples:

  • streaming multiple files into one stream (e.g. to a file or a network response)
  • adding a content prefix to the response stream of a fetch Response

Keeping everything as streams reduces peak memory usage, among other benefits.
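
The prefix use case could look roughly like the sketch below. The concatReadableStreams here is only a stand-in for the proposed helper, built on pipeTo() with preventClose so that backpressure is respected. textStream and withPrefix are hypothetical names introduced for the example.

```typescript
// Stand-in for the helper proposed in this issue, built on pipeTo().
// Assumption: one possible implementation, not necessarily what would ship.
function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  const { readable, writable } = new TransformStream<T, T>(); // identity
  (async () => {
    for (const stream of streams) {
      // preventClose keeps `writable` open for the next source.
      await stream.pipeTo(writable, { preventClose: true });
    }
    await writable.close();
  })().catch(() => {
    // A failed pipeTo() aborts `writable`, which already errors `readable`.
  });
  return readable;
}

// Hypothetical helper: a one-chunk stream holding the UTF-8 encoded text.
function textStream(text: string): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(new TextEncoder().encode(text));
      controller.close();
    },
  });
}

// Prepends `prefix` to a byte stream (e.g. the body of a fetch Response)
// without first collecting the body into a single buffer.
function withPrefix(
  prefix: string,
  body: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
  return concatReadableStreams(textStream(prefix), body);
}
```

With a fetch Response whose body is not null, the result of withPrefix(prefix, response.body) can be passed to new Response() or piped onward.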

@iuioiua
Collaborator

iuioiua commented Mar 25, 2024

Seems reasonable to have 👍🏾

@kt3k
Member

kt3k commented Mar 26, 2024

Isn't the structure below easy and straightforward enough to understand?

for await (const stream of streams) {
  for await (const chunk of stream) {
    // process chunk
  }
}

I don't see much point in adding this as a util.

@mfulton26
Contributor Author

@kt3k with the for await...of loops, how do you use the result with pipeTo() and/or pipeThrough()?

If I want to process chunks, then the looping makes perfect sense to me. But if I want to stream several readable streams, combined, to a file or a network response, then the loops by themselves are not helpful (although they may be part of the solution).
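
One way the loops can be part of the solution, as a rough sketch: wrap them in an async generator, then wrap the generator in a ReadableStream so that pipeThrough() and pipeTo() become available. All names here (chunksOf, toReadableStream, streamOf, pipeDemo) are hypothetical. toReadableStream is a hand-rolled stand-in for ReadableStream.from(), and the inner loop uses a reader rather than for await...of for portability.

```typescript
// The nested loops, wrapped in an async generator.
async function* chunksOf<T>(
  streams: Iterable<ReadableStream<T>>,
): AsyncGenerator<T> {
  for (const stream of streams) {
    const reader = stream.getReader();
    try {
      while (true) {
        const result = await reader.read();
        if (result.done) break;
        yield result.value;
      }
    } finally {
      reader.releaseLock();
    }
  }
}

// Hand-rolled stand-in for ReadableStream.from(): one next() per pull().
function toReadableStream<T>(chunks: AsyncIterator<T>): ReadableStream<T> {
  return new ReadableStream<T>({
    async pull(controller) {
      const result = await chunks.next();
      if (result.done) controller.close();
      else controller.enqueue(result.value);
    },
    async cancel() {
      await chunks.return?.();
    },
  });
}

// Hypothetical helper: a stream that emits `items`, then closes.
function streamOf<T>(...items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// The combined stream now works with pipeThrough() and pipeTo().
async function pipeDemo(): Promise<string[]> {
  const received: string[] = [];
  const upperCase = new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
  const sink = new WritableStream<string>({
    write(chunk) {
      received.push(chunk);
    },
  });
  await toReadableStream(chunksOf([streamOf("a", "b"), streamOf("c")]))
    .pipeThrough(upperCase)
    .pipeTo(sink);
  return received;
}
```

In a real program the sink would be a file or a network response. A utility like the proposed one would mainly save callers from writing this wrapper themselves.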

@kt3k
Member

kt3k commented Mar 27, 2024

Fair enough
