Skip to content

Commit

Permalink
Merge remote-tracking branch 'tapppi/flush-to-final' into merge-tappp…
Browse files Browse the repository at this point in the history
…i-flush-to-final

I respectfully merged Tapppi/parallel-transform/tree/flush-to-final

Refs. mafintosh#4
Refs. mafintosh#6

# Conflicts:
#	index.js
#	package.json
  • Loading branch information
piglovesyou committed Sep 29, 2019
2 parents c5e9b8d + 9d5d82d commit 0af65f9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 14 deletions.
32 changes: 30 additions & 2 deletions index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { pipeline, Readable, Transform } from 'readable-stream';
import transform from './index';

describe('transform', () => {
it('should emit pipeline callback with sync function', async () => {
it('should emit pipeline callback with synchronous stream ', async () => {
const expected = [ 2, 3, 4, 5, 6 ];
const actual: number[] = [];

Expand Down Expand Up @@ -32,7 +32,7 @@ describe('transform', () => {
assert.deepStrictEqual(actual, expected);
}, 10 * 1000);

it.skip('should emit pipeline callback with async function', async () => {
it('should emit pipeline callback with asynchronous stream', async () => {
const expected = [ 2, 3, 4, 5, 6 ];
const actual: number[] = [];

Expand Down Expand Up @@ -62,6 +62,34 @@ describe('transform', () => {
assert.deepStrictEqual(actual, expected);
}, 10 * 1000);

it('should fix mafintosh/parallel-transform##4, emit "finish" after all buffer is consumed', async () => {
const expectedArray = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
const actualArray: number[] = [];
let finished = false;

await new Promise(resolve => {
const stream = transform(10, function (data, callback) {
setTimeout(function () {
actualArray.push(data);
callback(undefined, data);
}, data );
});

for (let i = 0; i < 10; i++) {
stream.write(i);
}
stream.end();

stream.on('finish', function () {
assert.deepStrictEqual(actualArray, expectedArray);
finished = true;
resolve();
});
});

assert(finished);
});

it('should run in parallel', async () => {
const acceptableOffset = 200;
const tookExpected = 1000 + acceptableOffset;
Expand Down
24 changes: 12 additions & 12 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ export class ParallelTransform extends Transform {
private _destroyed: boolean;
private _maxParallel: number;
private _ontransform: OnTransformFn;
private _flushed: boolean;
private _finishing: boolean;
private _ordered: boolean;
private _buffer: Cyclist<any> | Array<any>;
private _top: number;
private _bottom: number;
private _ondrain: null | Function;
private ondrain: null | Function;

constructor(
maxParallel: number,
Expand All @@ -36,12 +36,12 @@ export class ParallelTransform extends Transform {
this._destroyed = false;
this._maxParallel = maxParallel;
this._ontransform = ontransform;
this._flushed = false;
this._finishing = false;
this._ordered = opts.ordered !== false;
this._buffer = this._ordered ? cyclist(maxParallel) : [];
this._top = 0;
this._bottom = 0;
this._ondrain = null;
this.ondrain = null;
}

destroy(err?: Error | undefined, callback?: ((error: Error | null) => void) | undefined): this {
Expand Down Expand Up @@ -71,12 +71,12 @@ export class ParallelTransform extends Transform {
});

if (this._top - this._bottom < this._maxParallel) return callback();
this._ondrain = callback;
this.ondrain = callback;
}

_flush(callback) {
this._flushed = true;
this._ondrain = callback;
_final(callback) {
this._finishing = true;
this.ondrain = callback;
this._drain();
}

Expand All @@ -96,16 +96,16 @@ export class ParallelTransform extends Transform {
}
}

if (!this._drained() || !this._ondrain) return;
if (!this._drained() || !this.ondrain) return;

const ondrain = this._ondrain;
this._ondrain = null;
const ondrain = this.ondrain;
this.ondrain = null;
ondrain();
};

_drained() {
const diff = this._top - this._bottom;
return this._flushed ? !diff : diff < this._maxParallel;
return this._finishing ? !diff : diff < this._maxParallel;
}
}

Expand Down

0 comments on commit 0af65f9

Please sign in to comment.