
_(readable).pipe(writable) does not destroy readable when writable closes #691

Open
richardscarrott opened this issue Jan 10, 2020 · 5 comments

Comments

@richardscarrott

richardscarrott commented Jan 10, 2020

I know native Node streams do not exhibit this behaviour either, but they provide pipeline (AKA pump) to ensure proper cleanup. I wondered what the equivalent pattern is for highland.

My particular use case is piping a readable stream to a Node http.ServerResponse, e.g.

const express = require('express');
const _ = require('highland');
const { pipeline } = require('stream');

const app = express();

app.get('/highland', (req, res) => {
  const readable = createNodeReadableStream();
  _(readable).pipe(res);
  res.destroy();
  setInterval(() => {
   console.log(readable.destroyed) // Always false, even when `res` is closed 😔
  }, 1000);
});

app.get('/node-stream', (req, res) => {
  const readable = createNodeReadableStream();
  pipeline(readable, res, (ex) => {
     ex && console.error(ex);
  });
  res.destroy();
  setInterval(() => {
   console.log(readable.destroyed) // Becomes true when `res` closes 🙂
  }, 1000);
});

If res is destroyed, either explicitly as above or, for example, by the browser cancelling the request, readable is not destroyed and therefore any cleanup code is not run -- it will keep filling its buffer until it reaches the highWaterMark.
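For completeness, a manual workaround for this simple case seems to be to destroy the source yourself when the response closes (a sketch, added to the app above; it obviously loses the generality of pipeline's cleanup):

app.get('/highland-manual', (req, res) => {
  const readable = createNodeReadableStream();
  _(readable).pipe(res);
  // `res` emits 'close' when the underlying connection goes away (e.g. the
  // browser cancels the request), so destroy the source there ourselves.
  res.on('close', () => {
    if (!readable.destroyed) {
      readable.destroy();
    }
  });
});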

@vqvu
Collaborator

vqvu commented Jan 11, 2020 via email

@richardscarrott
Author

Unfortunately not; however, keeping the readable and writable outside of highland and composing the transforms with _.pipeline seems to work, e.g.

const _ = require('highland');
const { pipeline } = require('stream');
const readable = createNodeReadableStream();
const transform = _.pipeline(
  _.filter((chunk) => !chunk.includes('foo')),
  _.map((chunk) => `::${chunk}::`)
);
pipeline(readable, transform, res, (ex) => {
  if (ex) {
    throw ex;
  }
  console.log('DONE');
});

As an aside, I noticed the Stream methods (Stream.prototype.map / Stream.prototype.filter etc.) return a new Stream instance, whereas _.map, _.filter appear to be composed into a single Stream with _.pipeline; are they functionally any different?
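i.e. the two forms I'm comparing (a sketch using the same transforms as above):

const _ = require('highland');

// Chained form: each method call returns a new Highland Stream instance.
const chained = (source) =>
  _(source)
    .filter((chunk) => !chunk.includes('foo'))
    .map((chunk) => `::${chunk}::`);

// Composed form: the top-level curried variants combined into a single
// through-stream with _.pipeline.
const composed = _.pipeline(
  _.filter((chunk) => !chunk.includes('foo')),
  _.map((chunk) => `::${chunk}::`)
);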

@richardscarrott
Author

richardscarrott commented Jan 11, 2020

Although I imagine you'd lose a lot of expressiveness by avoiding creating readable streams in highland, so it's not an ideal solution, e.g.

const fs = require('fs');
const _ = require('highland');
const { pipeline } = require('stream');

// Inside a request handler, with access to `res`:
const readFile = _.wrapCallback(fs.readFile);
const fileNames = ['foo.js', 'bar.js'];
const readable = _(fileNames).map(readFile).sequence();
// The `readFile` streams are not destroyed when `res` is closed 😔
pipeline(readable, res, (ex) => { console.log('DONE', ex) });

I guess you could manually track them, e.g.

let readFileStreams = [];
const readable = _(fileNames)
  .map(readFile)
  .tap((s) => readFileStreams.push(s))
  .sequence()
  .pipe(res);
res.on('close', () => {
  readFileStreams.forEach((s) => s.destroy());
});

But that's not too pretty...

@vqvu
Collaborator

vqvu commented Jan 11, 2020

Now that I think about it, _(readable) uses pipe under the hood, so of course it wouldn't work...

Making this work correctly is actually pretty difficult. Highland steams are lazy-by-default, so the code doesn't do a good job of propagating destroy events back up the pipeline---there's not usually a need to if the values are generated lazily. We've tried to make this work before, but the complexity of propagating destroy events up through forks in the pipeline (i.e., merge and fork) caused it to stall out.
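To illustrate the fork case (just a sketch of the problem, reusing createNodeReadableStream from your example; not a proposal for any new API):

const _ = require('highland');

const source = _(createNodeReadableStream());

// Two consumers share the same source via fork().
const a = source.fork().filter((chunk) => chunk.includes('foo'));
const b = source.fork().map((chunk) => chunk.toUpperCase());

// If the consumer of `a` is destroyed, the source must NOT be destroyed,
// because `b` is still reading from it. Only once every fork has been
// destroyed can the destruction safely propagate back to `source`; that
// bookkeeping is what makes this hard to implement.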

For now, I would say that you should do what you suggest in #691 (comment) and use _.pipeline alongside stream.pipeline to destroy the original stream. You can do this without sacrificing the chaining syntax by structuring your code like so:

const _ = require('highland');
const { pipeline } = require('stream');

function transformReadable(stream) {
  // In this function, you can pretend like you have a regular Highland stream that's
  // correctly wrapping the Readable.
  return stream
      .filter((chunk) => !chunk.includes('foo'))
      .map((chunk) => `::${chunk}::`);
}

const readable = createNodeReadableStream();
pipeline(readable, _.pipeline(transformReadable), res, (ex) => {
  if (ex) {
    throw ex;
  }
  console.log('DONE');
});

The above won't work if you need to merge multiple Readables or fork into multiple Writables. In those cases, you'll need to manage stream destruction manually.
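For the merge case, the manual management looks roughly like this (a sketch; createNodeReadableStream and res are stand-ins from your example):

const _ = require('highland');

const sources = [createNodeReadableStream(), createNodeReadableStream()];

// Merge the wrapped Readables into one Highland stream and pipe it to the response.
_(sources.map((s) => _(s)))
  .merge()
  .pipe(res);

// Destroy every underlying source once the Writable goes away.
res.on('close', () => {
  sources.forEach((s) => {
    if (!s.destroyed) {
      s.destroy();
    }
  });
});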


I noticed the Stream methods (Stream.prototype.map / Stream.prototype.filter etc.) return a new Stream instance, whereas _.map, _.filter appear to be composed into a single Stream with _.pipeline; are they functionally any different?

Functionally, they're the same. However, semantically, you should think of _.pipeline as returning a Node Duplex stream that encapsulates all of the transforms that you passed to _.pipeline (even though the current implementation actually returns a Highland stream that pretends to be a Duplex stream).
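In other words, you can drop the result of _.pipeline into an ordinary Node pipe chain (again a sketch, with createNodeReadableStream and res as stand-ins):

const _ = require('highland');

// Behaves like a Duplex: Node stream data is written in, transformed data is read out.
const duplex = _.pipeline(
  _.filter((chunk) => !chunk.includes('foo')),
  _.map((chunk) => `::${chunk}::`)
);

createNodeReadableStream()
  .pipe(duplex)
  .pipe(res);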

richardscarrott added a commit to richardscarrott/highland that referenced this issue Jan 12, 2020
@richardscarrott
Author

richardscarrott commented Jan 12, 2020

Making this work correctly is actually pretty difficult. Highland streams are lazy by default, so the code doesn't do a good job of propagating destroy events back up the pipeline; there's not usually a need to if the values are generated lazily. We've tried to make this work before, but the complexity of propagating destroy events up through forks in the pipeline (i.e., merge and fork) caused it to stall out.

I had a (naive) go at implementing it just so I could understand the difficulties a bit better if you want to take a look -- #692
