Skip to content

Commit

Permalink
fix: Ensure project works with different streams (#8)
Browse files Browse the repository at this point in the history
chore: Add tests for using lead with different streams
fix: Use listenerCount API on streams
chore: Remove through2 from the examples
chore: Skip failing node versions & explain why
feat!: Remove piping to a Writable and instead call resume on stream
  • Loading branch information
phated committed Sep 22, 2022
1 parent 8b0c1ec commit 27324d6
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 245 deletions.
13 changes: 7 additions & 6 deletions README.md
Expand Up @@ -13,17 +13,18 @@ Sink your streams.
## Usage

```js
var from = require('from2');
var through = require('through2');
var { Readable, Transform } = require('streamx');
var sink = require('lead');

// Might be used as a Transform or Writeable
var maybeThrough = through(function (chunk, enc, cb) {
// processing
cb(null, chunk);
var maybeThrough = new Transform({
transform(chunk, cb) {
// processing
cb(null, chunk);
},
});

from(['hello', 'world'])
Readable.from(['hello', 'world'])
// Sink it to behave like a Writeable
.pipe(sink(maybeThrough));
```
Expand Down
21 changes: 9 additions & 12 deletions index.js
@@ -1,20 +1,12 @@
'use strict';

var Writable = require('streamx').Writable;

function listenerCount(stream, evt) {
return stream.listeners(evt).length;
}

function hasListeners(stream) {
return !!(listenerCount(stream, 'readable') || listenerCount(stream, 'data'));
return !!(stream.listenerCount('readable') || stream.listenerCount('data'));
}

function sink(stream) {
var sinkAdded = false;

var sinkStream = new Writable();

function addSink() {
if (sinkAdded) {
return;
Expand All @@ -25,7 +17,7 @@ function sink(stream) {
}

sinkAdded = true;
stream.pipe(sinkStream);
stream.resume();
}

function removeSink(evt) {
Expand All @@ -35,13 +27,18 @@ function sink(stream) {

if (hasListeners(stream)) {
sinkAdded = false;
stream.unpipe(sinkStream);
}

process.nextTick(addSink);
}

function markSink() {
sinkAdded = true;
}

stream.on('newListener', removeSink);
stream.on('removeListener', removeSink);
stream.on('removeListener', addSink);
stream.on('piping', markSink);

// Sink the stream to start flowing
// Do this on nextTick, it will flow at slowest speed of piped streams
Expand Down
9 changes: 4 additions & 5 deletions package.json
Expand Up @@ -21,17 +21,16 @@
"pretest": "npm run lint",
"test": "nyc mocha --async-only"
},
"dependencies": {
"streamx": "^2.12.0"
},
"dependencies": {},
"devDependencies": {
"eslint": "^7.32.0",
"eslint-config-gulp": "^5.0.1",
"eslint-plugin-node": "^11.1.0",
"expect": "^27.4.2",
"mississippi": "^4.0.0",
"mocha": "^8.4.0",
"nyc": "^15.1.0"
"nyc": "^15.1.0",
"readable-stream": "^3.6.0",
"streamx": "^2.12.0"
},
"nyc": {
"reporter": [
Expand Down

0 comments on commit 27324d6

Please sign in to comment.