Skip to content

Commit

Permalink
feat: Allow max concurrency for parallel map with option (#15)
Browse files Browse the repository at this point in the history
chore!: Rename extensions to options and allow other configuration passed

Co-authored-by: Blaine Bublitz <blaine.bublitz@gmail.com>
  • Loading branch information
weswigham and phated committed Jun 25, 2022
1 parent b30c00d commit 92ae356
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 45 deletions.
38 changes: 22 additions & 16 deletions README.md
Expand Up @@ -82,10 +82,10 @@ nal.map(

## API

### `map(values, iterator[, extensions][, callback])`
### `map(values, iterator[, options][, callback])`

Takes an object or array of `values` and an `iterator` function to execute with each value.
Optionally, takes an `extensions` object and a `callback` function that is called upon completion of the iterations.
Optionally, takes an `options` object and a `callback` function that is called upon completion of the iterations.

All iterations run in parallel.

Expand All @@ -105,27 +105,33 @@ The `iterator` function is called once with each `value`, `key` and a function (

If `done` is passed an error as the first argument, the iteration will fail and the sequence will be ended; however, any iterations in progress will still complete. If `done` is passed a `result` value as the second argument, it will be added to the final results array or object.

#### `extensions`
#### `options`

The `extensions` object is used for specifying functions that give insight into the lifecycle of each iteration. The possible extension points are `create`, `before`, `after` and `error`. If an extension point is not specified, it defaults to a no-op function.
The `options` object is primarily used for specifying functions that give insight into the lifecycle of each iteration. The possible extension points are `create`, `before`, `after` and `error`. If an extension point is not specified, it defaults to a no-op function.

##### `extensions.create(value, key)`
The `options` object for `map` also allows specifying `concurrency` in which to run your iterations. By default, your iterations will run at maximum concurrency.

##### `options.concurrency`

Limits the amount of iterations allowed to run at a given time.

##### `options.create(value, key)`

Called at the very beginning of each iteration with the `value` being iterated and the `key` from the array or object. If `create` returns a value (`storage`), it is passed to the `before`, `after` and `error` extension points.

If a value is not returned, an empty object is used as `storage` for each other extension point.

This is useful for tracking information across an iteration.

##### `extensions.before(storage)`
##### `options.before(storage)`

Called immediately before each iteration with the `storage` value returned from the `create` extension point.

##### `extensions.after(result, storage)`
##### `options.after(result, storage)`

Called immediately after each iteration with the `result` of the iteration and the `storage` value returned from the `create` extension point.

##### `extensions.error(error, storage)`
##### `options.error(error, storage)`

Called immediately after a failed iteration with the `error` of the iteration and the `storage` value returned from the `create` extension point.

Expand All @@ -137,10 +143,10 @@ If all iterations completed successfully, the `error` argument will be empty and

If an iteration errored, the `error` argument will be passed from that iteration and the `results` will be whatever partial results had completed successfully before the error occurred.

### `mapSeries(values, iterator[, extensions][, callback])`
### `mapSeries(values, iterator[, options][, callback])`

Takes an object or array of `values` and an `iterator` function to execute with each value.
Optionally, takes an `extensions` object and a `callback` function that is called upon completion of the iterations.
Optionally, takes an `options` object and a `callback` function that is called upon completion of the iterations.

All iterations run in serial.

Expand All @@ -160,27 +166,27 @@ The `iterator` function is called once with each `value`, `key` and a function (

If `done` is passed an error as the first argument, the iteration will fail and the sequence will be ended without executing any more iterations. If `done` is passed a `result` value as the second argument, it will be added to the final results array or object.

#### `extensions`
#### `options`

The `extensions` object is used for specifying functions that give insight into the lifecycle of each iteration. The possible extension points are `create`, `before`, `after` and `error`. If an extension point is not specified, it defaults to a no-op function.
The `options` object is primarily used for specifying functions that give insight into the lifecycle of each iteration. The possible extension points are `create`, `before`, `after` and `error`. If an extension point is not specified, it defaults to a no-op function.

##### `extensions.create(value, key)`
##### `options.create(value, key)`

Called at the very beginning of each iteration with the `value` being iterated and the `key` from the array or object. If `create` returns a value (`storage`), it is passed to the `before`, `after` and `error` extension points.

If a value is not returned, an empty object is used as `storage` for each other extension point.

This is useful for tracking information across an iteration.

##### `extensions.before(storage)`
##### `options.before(storage)`

Called immediately before each iteration with the `storage` value returned from the `create` extension point.

##### `extensions.after(result, storage)`
##### `options.after(result, storage)`

Called immediately after each iteration with the `result` of the iteration and the `storage` value returned from the `create` extension point.

##### `extensions.error(error, storage)`
##### `options.error(error, storage)`

Called immediately after a failed iteration with the `error` of the iteration and the `storage` value returned from the `create` extension point.

Expand Down
12 changes: 6 additions & 6 deletions lib/helpers.js
Expand Up @@ -9,13 +9,13 @@ var defaultExts = {
error: noop,
};

function defaultExtensions(extensions) {
extensions = extensions || {};
function defaultExtensions(options) {
options = options || {};
return {
create: extensions.create || defaultExts.create,
before: extensions.before || defaultExts.before,
after: extensions.after || defaultExts.after,
error: extensions.error || defaultExts.error,
create: options.create || defaultExts.create,
before: options.before || defaultExts.before,
after: options.after || defaultExts.after,
error: options.error || defaultExts.error,
};
}

Expand Down
46 changes: 33 additions & 13 deletions lib/map.js
Expand Up @@ -4,11 +4,11 @@ var once = require('once');

var helpers = require('./helpers');

function map(values, iterator, extensions, done) {
// Allow for extensions to not be specified
if (typeof extensions === 'function') {
done = extensions;
extensions = {};
function map(values, iterator, options, done) {
// Allow for options to not be specified
if (typeof options === 'function') {
done = options;
options = {};
}

// Handle no callback case
Expand All @@ -26,35 +26,55 @@ function map(values, iterator, extensions, done) {
// Return the same type as passed in
var results = helpers.initializeResults(values);

var exts = helpers.defaultExtensions(extensions);
var extensions = helpers.defaultExtensions(options);

if (length === 0) {
return done(null, results);
}

for (idx = 0; idx < length; idx++) {
var key = keys[idx];
next(key);
var maxConcurrent = length;
if (options && options.concurrency) {
maxConcurrent = options.concurrency;
}
var running = 0;
var sync = false;
kickoff();

function kickoff() {
if (sync) {
return;
}
sync = true;
while (running < maxConcurrent && idx < length) {
var key = keys[idx];
next(key);
idx++;
}
sync = false;
}

function next(key) {
running++;
var value = values[key];

var storage = exts.create(value, key) || {};
var storage = extensions.create(value, key) || {};

exts.before(storage);
extensions.before(storage);
iterator(value, key, once(handler));

function handler(err, result) {
running--;
if (err) {
exts.error(err, storage);
extensions.error(err, storage);
return done(err, results);
}

exts.after(result, storage);
extensions.after(result, storage);
results[key] = result;
if (--count === 0) {
done(err, results);
} else {
kickoff();
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions lib/mapSeries.js
Expand Up @@ -4,11 +4,11 @@ var once = require('once');

var helpers = require('./helpers');

function mapSeries(values, iterator, extensions, done) {
// Allow for extensions to not be specified
if (typeof extensions === 'function') {
done = extensions;
extensions = {};
function mapSeries(values, iterator, options, done) {
// Allow for options to not be specified
if (typeof options === 'function') {
done = options;
options = {};
}

// Handle no callback case
Expand All @@ -25,7 +25,7 @@ function mapSeries(values, iterator, extensions, done) {
// Return the same type as passed in
var results = helpers.initializeResults(values);

var exts = helpers.defaultExtensions(extensions);
var extensions = helpers.defaultExtensions(options);

if (length === 0) {
return done(null, results);
Expand All @@ -37,18 +37,18 @@ function mapSeries(values, iterator, extensions, done) {
function next(key) {
var value = values[key];

var storage = exts.create(value, key) || {};
var storage = extensions.create(value, key) || {};

exts.before(storage);
extensions.before(storage);
iterator(value, key, once(handler));

function handler(err, result) {
if (err) {
exts.error(err, storage);
extensions.error(err, storage);
return done(err, results);
}

exts.after(result, storage);
extensions.after(result, storage);
results[key] = result;

if (++idx >= length) {
Expand Down
100 changes: 100 additions & 0 deletions test/map.js
Expand Up @@ -320,4 +320,104 @@ describe('map', function () {
// (last iterator call is triggered after this callback)
});
});

it('runs at maximum concurrency by default', function (done) {
var createOrder = [];
var beforeOrder = [];
var afterOrder = [];

var options = {
create: function (value, idx) {
createOrder.push(idx);
return { idx: idx, value: value };
},
before: function (storage) {
beforeOrder.push(storage.idx);
},
after: function (result, storage) {
afterOrder.push(storage.idx);
},
};

function iterator(value, key, cb) {
setTimeout(function () {
cb(null, value);
}, value * 25);
}

nal.map([10, 5, 1], iterator, options, function (err) {
expect(createOrder).toEqual(['0', '1', '2']);
expect(beforeOrder).toEqual(['0', '1', '2']);
expect(afterOrder).toEqual(['2', '1', '0']);
done(err);
});
});

it('allows limiting concurrency to 1 via options', function (done) {
var createOrder = [];
var beforeOrder = [];
var afterOrder = [];

var options = {
// Concurrency of 1 basically makes a series runner
concurrency: 1,
create: function (value, idx) {
createOrder.push(idx);
return { idx: idx, value: value };
},
before: function (storage) {
beforeOrder.push(storage.idx);
},
after: function (result, storage) {
afterOrder.push(storage.idx);
},
};

function iterator(value, key, cb) {
setTimeout(function () {
cb(null, value);
}, value * 25);
}

nal.map([10, 5, 1], iterator, options, function (err) {
expect(createOrder).toEqual(['0', '1', '2']);
expect(beforeOrder).toEqual(['0', '1', '2']);
expect(afterOrder).toEqual(['0', '1', '2']);
done(err);
});
});

it('allows limiting concurrency to 2 via options', function (done) {
var createOrder = [];
var beforeOrder = [];
var afterOrder = [];

var options = {
// Concurrency of 2
concurrency: 2,
create: function (value, idx) {
createOrder.push(idx);
return { idx: idx, value: value };
},
before: function (storage) {
beforeOrder.push(storage.idx);
},
after: function (result, storage) {
afterOrder.push(storage.idx);
},
};

function iterator(value, key, cb) {
setTimeout(function () {
cb(null, value);
}, value * 25);
}

nal.map([10, 1, 20, 5], iterator, options, function (err) {
expect(createOrder).toEqual(['0', '1', '2', '3']);
expect(beforeOrder).toEqual(['0', '1', '2', '3']);
expect(afterOrder).toEqual(['1', '0', '3', '2']);
done(err);
});
});
});

0 comments on commit 92ae356

Please sign in to comment.