Skip to content

Commit

Permalink
fixup! support iterable objects: String Map Set
Browse files Browse the repository at this point in the history
  • Loading branch information
o-rumiantsev committed May 3, 2019
1 parent 2e98b99 commit 3c1d416
Show file tree
Hide file tree
Showing 12 changed files with 709 additions and 942 deletions.
181 changes: 71 additions & 110 deletions lib/array.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,29 @@ const { asyncIter } = require('./async-iterator.js');
const { promisify } = require('util');

// Asynchronous map (iterate parallel)
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>, to be executed for each value in the array
// current - <any>, current element being processed in the array
// callback - <Function>
// err - <Error> | <null>
// value - <any>
// done - <Function>, on done, optional
// err - <Error> | <null>
// result - <Array>
const map = (items, fn, done) => {
done = done || common.emptyness;
// result - <Iterable>
const map = (items, fn, done = common.emptyness) => {
if (!items[Symbol.iterator]) {
done(new TypeError('items is not iterable'));
done(new TypeError('"items" argument is not iterable'));
return;
}
const isArray = Array.isArray(items);
const data = asyncIter(items).map(item => promisify(fn)(item));
const promise = isArray ? data.toArray() : data.collectTo(items.constructor);
promise.then(res => done(null, res), done);
asyncIter(items)
.parallel(item => promisify(fn)(item))
.then(res => done(null, isArray ? res : new items.constructor(res)))
.catch(done);
};

const DEFAULT_OPTIONS = { min: 5, percent: 0.7 };

// Non-blocking synchronous map
// items - <Array>, incoming dataset
// items - <Iterable>, incoming dataset
// fn - <Function>
// item - <any>
// index - <number>
Expand All @@ -38,90 +36,53 @@ const DEFAULT_OPTIONS = { min: 5, percent: 0.7 };
// percent - <number>, ratio of map time to all time
// done - <Function>, call on done
// err - <Error> | <null>
// result - <Array>
const asyncMap = (items, fn, options = {}, done) => {
// result - <Iterable>
const asyncMap = (items, fn, options = {}, done = common.emptyness) => {
if (typeof options === 'function') {
done = options;
options = DEFAULT_OPTIONS;
options = {};
}

const len = items.length || items.size;
const name = items.constructor.name;
let result = done ? new items.constructor() : null;
const data = common.iter(items);

if (!len || result === null) {
if (done) done(null, result);
if (!items[Symbol.iterator]) {
done(new TypeError('"items" argument is not iterable'));
return;
}

const min = options.min || DEFAULT_OPTIONS.min;
const percent = options.percent || DEFAULT_OPTIONS.percent;

let begin;
let sum = 0;
let count = 0;

const ratio = percent / (1 - percent);

const countNumber = () => {
const loopTime = Date.now() - begin;
const itemTime = sum / count;
const necessaryNumber = (ratio * loopTime) / itemTime;
return Math.max(necessaryNumber, min);
};

const next = () => {
const itemsNumber = count ? countNumber() : min;
const iterMax = Math.min(len, itemsNumber + count);

begin = Date.now();
for (; count < iterMax; count++) {
const itemResult = fn(data.next().value, count);
if (done) {
if (name === 'String') result += itemResult;
else if (name === 'Array') result.push(itemResult);
else if (name === 'Set') result.add(itemResult);
else result.set(itemResult);
}
}
sum += Date.now() - begin;

if (count < len) {
begin = Date.now();
setTimeout(next, 0);
} else if (done) {
done(null, result);
}
};

next();
const isArray = Array.isArray(items);
const iter = asyncIter(items)
.map(item => promisify(fn)(item))
.throttle(options.percent, options.min);
const collect = isArray ? iter.toArray() : iter.collectTo(items.constructor);
collect.then(res => done(null, res)).catch(done);
};

// Asynchronous filter (iterate parallel)
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>, to be executed for each value in the array
// value - <any>, item from items array
// callback - <Function>
// err - <Error> | <null>
// accepted - <boolean>
// done - <Function>, on done, optional
// err - <Error> | <null>
// result - <Array>
const filter = (items, fn, done) => {
done = done || common.emptyness;
// result - <Iterable>
const filter = (items, fn, done = common.emptyness) => {
if (!items[Symbol.iterator]) {
done(new TypeError('items is not iterable'));
done(new TypeError('"items" argument is not iterable'));
return;
}
const isArray = Array.isArray(items);
const data = asyncIter(items).filter(item => promisify(fn)(item));
const promise = isArray ? data.toArray() : data.collectTo(items.constructor);
promise.then(res => done(null, res), err => done(err));
asyncIter(items)
.parallel(async item => [await promisify(fn)(item), item])
.then(res => {
const filtered = res
.filter(([predicateResult]) => predicateResult)
.map(([, item]) => item);
done(null, isArray ? filtered : new items.constructor(filtered));
})
.catch(done);
};

// Asynchronous reduce
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>, to be executed for each value in array
// previous - <any>, value previously returned in the last iteration
// current - <any>, current element being processed in the array
Expand All @@ -131,21 +92,21 @@ const filter = (items, fn, done) => {
// data - <any>, resulting value
// counter - <number>, index of the current element
// being processed in array
// items - <Array>, the array reduce was called upon
// items - <Iterable>, the array reduce was called upon
// done - <Function>, on done, optional
// err - <Error> | <null>
// result - <Array>
// result - <Iterable>
// initial - <any>, optional value to be used as first
// argument in first iteration
const reduce = (items, fn, done, initial) => {
done = done || common.emptyness;
const reduce = (items, fn, done = common.emptyness, initial) => {
asyncIter(items)
.reduce((prev, cur) => promisify(fn)(prev, cur), initial)
.then(r => done(null, r), done);
.then(res => done(null, res))
.catch(done);
};

// Asynchronous reduceRight
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>, to be executed for each value in array
// previous - <any>, value previously returned in the last iteration
// current - <any>, current element being processed in the array
Expand All @@ -155,53 +116,53 @@ const reduce = (items, fn, done, initial) => {
// data - <any>, resulting value
// counter - <number>, index of the current element
// being processed in array
// items - <Array>, the array reduce was called upon
// items - <Iterable>, the array reduce was called upon
// done - <Function>, on done, optional
// err - <Error> | <null>
// result - <Array>
// result - <Iterable>
// initial - <any>, optional value to be used as first
// argument in first iteration
const reduceRight = (items, fn, done, initial) => {
done = done || common.emptyness;
const reduceRight = (items, fn, done = common.emptyness, initial) => {
asyncIter(items)
.reduceRight((prev, cur) => promisify(fn)(prev, cur), initial)
.then(r => done(null, r), done);
.then(res => done(null, res))
.catch(done);
};

// Asynchronous each (iterate in parallel)
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>
// value - <any>, item from items array
// callback - <Function>
// err - <Error> | <null>
// done - <Function>, on done, optional
// err - <Error> | <null>
// items - <Array>
const each = (items, fn, done) => {
done = done || common.emptyness;
// items - <Iterable>
const each = (items, fn, done = common.emptyness) => {
asyncIter(items)
.parallel(item => promisify(fn)(item))
.then(r => done(null, r), done);
.then(res => done(null, res))
.catch(done);
};

// Asynchronous series
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>
// value - <any>, item from items array
// callback - <Function>
// err - <Error> | <null>
// done - <Function>, on done, optional
// err - <Error> | <null>
// items - <Array>
const series = (items, fn, done) => {
done = done || common.emptyness;
// items - <Iterable>
const series = (items, fn, done = common.emptyness) => {
asyncIter(items)
.each(item => promisify(fn)(item))
.then(r => done(null, r), done);
.then(res => done(null, res))
.catch(done);
};

// Asynchronous find (iterate in series)
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>,
// value - <any>, item from items array
// callback - <Function>
Expand All @@ -210,16 +171,15 @@ const series = (items, fn, done) => {
// done - <Function>, on done, optional
// err - <Error> | <null>
// result - <any>
const find = (items, fn, done) => {
done = done || common.emptyness;
const find = (items, fn, done = common.emptyness) => {
asyncIter(items)
.find(item => promisify(fn)(item))
.then(r => done(null, r))
.catch(e => done(e));
.then(res => done(null, res))
.catch(done);
};

// Asynchronous every
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>,
// value - <any>, item from items array
// callback - <Function>
Expand All @@ -228,16 +188,18 @@ const find = (items, fn, done) => {
// done - <Function>, on done, optional
// err - <Error> | <null>
// result - <boolean>
const every = (items, fn, done) => {
done = done || common.emptyness;
const every = (items, fn, done = common.emptyness) => {
asyncIter(items)
.every(item => promisify(fn)(item))
.then(r => done(null, r))
.catch(e => done(e));
.parallel(item => promisify(fn)(item))
.then(res => {
const accepted = res.every(predicateResult => predicateResult);
done(null, accepted);
})
.catch(done);
};

// Asynchronous some (iterate in series)
// items - <Array>, incoming
// items - <Iterable>, incoming
// fn - <Function>
// value - <any>, item from items array
// callback - <Function>
Expand All @@ -246,12 +208,11 @@ const every = (items, fn, done) => {
// done - <Function>, on done
// err - <Error> | <null>
// result - <boolean>
const some = (items, fn, done) => {
done = done || common.emptyness;
const some = (items, fn, done = common.emptyness) => {
asyncIter(items)
.some(item => promisify(fn)(item))
.then(r => done(null, r))
.catch(e => done(e));
.then(res => done(null, res))
.catch(done);
};

module.exports = {
Expand Down
16 changes: 8 additions & 8 deletions lib/async-iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ class AsyncIterator {
return this;
}

throttle(percent) {
return new ThrottleIterator(this, percent);
throttle(percent, min) {
return new ThrottleIterator(this, percent, min);
}

enumerate() {
Expand Down Expand Up @@ -421,29 +421,29 @@ class ThrottleIterator extends AsyncIterator {
this.ratio = percent / (1 - percent);

this.sum = 0;
this.count = 0;
this.iterCount = 0;
this.begin = Date.now();
this.iterMax = this.min;
}

async next() {
if (this.iterMax > this.count) {
this.count++;
if (this.iterMax > this.iterCount) {
this.iterCount++;
return await this.base.next();
}

this.sum += Date.now() - this.begin;
const itemTime = this.sum / this.count;
const itemTime = this.sum / this.iterCount;

this.begin = Date.now();
await timeout();
const loopTime = Date.now() - this.begin;

const number = Math.max((this.ratio * loopTime) / itemTime, this.min);

this.iterMax = Math.round(number) + this.count;
this.iterMax = Math.round(number) + this.iterCount;

this.count++;
this.iterCount++;
this.begin = Date.now();
return await this.base.next();
}
Expand Down

0 comments on commit 3c1d416

Please sign in to comment.