From 7b37da45e8344e78a5d40a8c277cc57d796c1257 Mon Sep 17 00:00:00 2001 From: Blaine Bublitz Date: Fri, 24 Jun 2022 17:12:17 -0700 Subject: [PATCH] chore: Add tests for streamx (#58) fix!: Allow end-of-stream to handle the stream error states --- index.js | 4 +-- package.json | 1 + test/streams.js | 4 --- test/streamx.js | 85 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 7 deletions(-) create mode 100644 test/streamx.js diff --git a/index.js b/index.js index 5d0cd80..d835622 100644 --- a/index.js +++ b/index.js @@ -6,9 +6,7 @@ var eos = require('end-of-stream'); var once = require('once'); var exhaust = require('stream-exhaust'); -var eosConfig = { - error: false, -}; +var eosConfig = {}; function rethrowAsync(err) { process.nextTick(rethrow); diff --git a/package.json b/package.json index e62c0f1..6376413 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "nyc": "^15.1.0", "pumpify": "^2.0.1", "rxjs": "^7.4.0", + "streamx": "^2.12.0", "through2": "^4.0.2", "typescript": "^4.4.4" }, diff --git a/test/streams.js b/test/streams.js index 21ef6d7..d5119c6 100644 --- a/test/streams.js +++ b/test/streams.js @@ -41,10 +41,6 @@ function pumpifyError() { var read = fs.createReadStream(exists); var pipeline = pumpify(through(), through(withErr), through()); - pipeline.on('error', function (err) { - throw err; - }); - return read.pipe(pipeline); } diff --git a/test/streamx.js b/test/streamx.js new file mode 100644 index 0000000..71256b3 --- /dev/null +++ b/test/streamx.js @@ -0,0 +1,85 @@ +'use strict'; + +var expect = require('expect'); + +var streamx = require('streamx'); + +var asyncDone = require('../'); + +function success() { + return streamx.Readable.from('Foo Bar Baz').pipe(new streamx.Writable()); +} + +function failure() { + return streamx.Readable.from('Foo Bar Baz').pipe( + new streamx.Writable({ + write: function (data, cb) { + cb(new Error('Fail')); + }, + }) + ); +} + +function pipelineError() { + return streamx.pipeline( + streamx.Readable.from('Foo Bar Baz'), + new streamx.Transform(), + new streamx.Transform({ + transform: function (data, cb) { + cb(new Error('Fail')); + }, + }), + new streamx.Writable() + ); +} + +function unpiped() { + return streamx.Readable.from('Foo Bar Baz'); +} + +describe('streamx streams', function () { + it('should handle a successful stream', function (done) { + asyncDone(success, function (err) { + expect(err).not.toBeInstanceOf(Error); + done(); + }); + }); + + it('should handle an errored stream', function (done) { + asyncDone(failure, function (err) { + expect(err).toBeInstanceOf(Error); + expect(err.message).not.toEqual('premature close'); + done(); + }); + }); + + it('should handle an errored pipeline', function (done) { + asyncDone(pipelineError, function (err) { + expect(err).toBeInstanceOf(Error); + expect(err.message).not.toEqual('premature close'); + done(); + }); + }); + + it('handle a returned stream and cb by only calling callback once', function (done) { + asyncDone( + function (cb) { + return success().on('finish', function () { + cb(null, 3); + }); + }, + function (err, result) { + expect(err).not.toBeInstanceOf(Error); + expect(result).toEqual(3); // To know we called the callback + done(); + } + ); + }); + + it('consumes an unpiped readable stream', function (done) { + asyncDone(unpiped, function (err) { + expect(err).not.toBeInstanceOf(Error); + done(); + }); + }); +});