Skip to content

Commit

Permalink
chore: Add tests for streamx (#58)
Browse files Browse the repository at this point in the history
fix!: Allow end-of-stream to handle the stream error states
  • Loading branch information
phated committed Jun 25, 2022
1 parent dfa4f0b commit 7b37da4
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 7 deletions.
4 changes: 1 addition & 3 deletions index.js
Expand Up @@ -6,9 +6,7 @@ var eos = require('end-of-stream');
var once = require('once');
var exhaust = require('stream-exhaust');

var eosConfig = {
error: false,
};
var eosConfig = {};

function rethrowAsync(err) {
process.nextTick(rethrow);
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -42,6 +42,7 @@
"nyc": "^15.1.0",
"pumpify": "^2.0.1",
"rxjs": "^7.4.0",
"streamx": "^2.12.0",
"through2": "^4.0.2",
"typescript": "^4.4.4"
},
Expand Down
4 changes: 0 additions & 4 deletions test/streams.js
Expand Up @@ -41,10 +41,6 @@ function pumpifyError() {
var read = fs.createReadStream(exists);
var pipeline = pumpify(through(), through(withErr), through());

pipeline.on('error', function (err) {
throw err;
});

return read.pipe(pipeline);
}

Expand Down
85 changes: 85 additions & 0 deletions test/streamx.js
@@ -0,0 +1,85 @@
'use strict';

var expect = require('expect');

var streamx = require('streamx');

var asyncDone = require('../');

function success() {
return streamx.Readable.from('Foo Bar Baz').pipe(new streamx.Writable());
}

function failure() {
return streamx.Readable.from('Foo Bar Baz').pipe(
new streamx.Writable({
write: function (data, cb) {
cb(new Error('Fail'));
},
})
);
}

function pipelineError() {
return streamx.pipeline(
streamx.Readable.from('Foo Bar Baz'),
new streamx.Transform(),
new streamx.Transform({
transform: function (data, cb) {
cb(new Error('Fail'));
},
}),
new streamx.Writable()
);
}

function unpiped() {
return streamx.Readable.from('Foo Bar Baz');
}

describe('streamx streams', function () {
it('should handle a successful stream', function (done) {
asyncDone(success, function (err) {
expect(err).not.toBeInstanceOf(Error);
done();
});
});

it('should handle an errored stream', function (done) {
asyncDone(failure, function (err) {
expect(err).toBeInstanceOf(Error);
expect(err.message).not.toEqual('premature close');
done();
});
});

it('should handle an errored pipeline', function (done) {
asyncDone(pipelineError, function (err) {
expect(err).toBeInstanceOf(Error);
expect(err.message).not.toEqual('premature close');
done();
});
});

it('handle a returned stream and cb by only calling callback once', function (done) {
asyncDone(
function (cb) {
return success().on('finish', function () {
cb(null, 3);
});
},
function (err, result) {
expect(err).not.toBeInstanceOf(Error);
expect(result).toEqual(3); // To know we called the callback
done();
}
);
});

it('consumes an unpiped readable stream', function (done) {
asyncDone(unpiped, function (err) {
expect(err).not.toBeInstanceOf(Error);
done();
});
});
});

0 comments on commit 7b37da4

Please sign in to comment.