Piped streams not flowing with batch #617

Open
Sintendo opened this issue Apr 24, 2017 · 1 comment

Comments

@Sintendo

We're dealing with an issue where piping streams through a batched Highland stream causes the pipeline to halt. The actual code streams data over HTTP, but I think I've managed to narrow it down to the following code.

let {Transform, Writable} = require('stream');
let _ = require('highland');

// Dummy transform
class NullTransform extends Transform {
  constructor(options) {
    options = options || {};
    options.writableObjectMode = true;
    options.readableObjectMode = true;
    super(options);
  }
  _transform(chunk, encoding, callback) {
    console.log('transform', typeof chunk);
    this.push(chunk);
    callback();
  }
  _flush(callback) {
    callback();
  }
}
// Dummy writer
class NullWriter extends Writable {
  constructor(options) {
    options = options || {};
    options.objectMode = true;
    // options.highWaterMark = 1;
    super(options);
    this._storage = [];
  }

  _write(chunk, encoding, callback) {
    console.log('write', chunk, typeof chunk, chunk.length);
    callback();
  }
}

// Infinite stream of random numbers
const readStream = _({
  next() {
    console.log('next!');
    return {
      done: false,
      value: Math.random(),
    };
  },
});
const transform = new NullTransform();
const writer = new NullWriter();

This will cause the application to exit:

readStream
  .pipe(_().batch(1000))
  .pipe(writer);

Strangely, adding the dummy transform before the batching causes it to flow properly:

readStream
  .pipe(transform)
  .pipe(_().batch(1000))
  .pipe(writer);

Am I missing something?

@vqvu
Collaborator

vqvu commented Apr 24, 2017

At first glance, it looks like the problem is in pipe(_().batch(1000)). The only Highland streams that you can pipe to are the ones returned by _() or _.pipeline(...). When you call batch, it returns a new stream that is read-only, so you can't pipe to that stream.

It's pretty rare that you'll need to use _() at all. In this case, since readStream is already a Highland stream, you can call batch on it directly.

readStream
  .batch(1000)
  .pipe(writer);

If readStream is a Node stream (since you mentioned HTTP streaming), you can create a Highland stream out of it using the Highland stream constructor.

_(readStream)
  .batch(1000)
  .pipe(writer);

I don't know why the code you showed works when you add the dummy transform, but it only works by coincidence.
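
If a batching stage has to sit in the middle of a plain Node .pipe() chain, one alternative that avoids the read-only stream problem is an object-mode Transform that does the batching itself. The following is only a minimal sketch and does not use Highland: BatchTransform is a hypothetical name introduced here for illustration, and the batch size of 2 is chosen just to keep the demo short.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper (not part of Highland or Node): collects incoming
// objects into arrays of `size` items and pushes each full array downstream.
class BatchTransform extends Transform {
  constructor(size) {
    // Object mode on both sides, since chunks are arbitrary values.
    super({ objectMode: true });
    this._size = size;
    this._buffer = [];
  }

  _transform(chunk, encoding, callback) {
    this._buffer.push(chunk);
    if (this._buffer.length >= this._size) {
      this.push(this._buffer);
      this._buffer = [];
    }
    callback();
  }

  _flush(callback) {
    // Emit whatever is left as a final, possibly shorter, batch.
    if (this._buffer.length > 0) {
      this.push(this._buffer);
      this._buffer = [];
    }
    callback();
  }
}

// Demo: five numbers are grouped into batches of two, with a shorter
// final batch emitted when the source ends.
Readable.from([1, 2, 3, 4, 5])
  .pipe(new BatchTransform(2))
  .on('data', (batch) => console.log('batch', batch));
```

Because a Transform is both writable and readable, it is a valid pipe target, unlike the stream returned by batch. With the objects from the original report, the chain would look like nodeReadStream.pipe(new BatchTransform(1000)).pipe(writer), where nodeReadStream stands in for whatever Node stream the HTTP layer provides.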


2 participants