Skip to content

Commit

Permalink
Consistently respect the configured concurrency (#81)
Browse files Browse the repository at this point in the history
* Respect the concurrency when resuming a paused queue

* Also respect a decrease of concurrency

* Guard against non-numerical concurrency values

* Forgo the need to pause / resume queues when increasing their concurrency

* Consistently guard against non-numerical concurrency values

* Avoid using the getter unnecessarily

---------

Co-authored-by: Mart Jansink <mart@cinemait.nl>
  • Loading branch information
mart-jansink and Mart Jansink committed Jan 26, 2024
1 parent b8d9920 commit 62eb43e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 27 deletions.
40 changes: 28 additions & 12 deletions queue.js
Expand Up @@ -4,15 +4,15 @@

var reusify = require('reusify')

function fastqueue (context, worker, concurrency) {
function fastqueue (context, worker, _concurrency) {
if (typeof context === 'function') {
concurrency = worker
_concurrency = worker
worker = context
context = null
}

if (concurrency < 1) {
throw new Error('fastqueue concurrency must be greater than 1')
if (!(_concurrency >= 1)) {
throw new Error('fastqueue concurrency must be equal to or greater than 1')
}

var cache = reusify(Task)
Expand All @@ -27,7 +27,23 @@ function fastqueue (context, worker, concurrency) {
saturated: noop,
pause: pause,
paused: false,
concurrency: concurrency,

get concurrency () {
return _concurrency
},
set concurrency (value) {
if (!(value >= 1)) {
throw new Error('fastqueue concurrency must be equal to or greater than 1')
}
_concurrency = value

if (self.paused) return
for (; queueHead && _running < _concurrency;) {
_running++
release()
}
},

running: running,
resume: resume,
idle: idle,
Expand Down Expand Up @@ -77,7 +93,7 @@ function fastqueue (context, worker, concurrency) {
function resume () {
if (!self.paused) return
self.paused = false
for (var i = 0; i < self.concurrency; i++) {
for (; queueHead && _running < _concurrency;) {
_running++
release()
}
Expand All @@ -96,7 +112,7 @@ function fastqueue (context, worker, concurrency) {
current.callback = done || noop
current.errorHandler = errorHandler

if (_running === self.concurrency || self.paused) {
if (_running >= _concurrency || self.paused) {
if (queueTail) {
queueTail.next = current
queueTail = current
Expand All @@ -120,7 +136,7 @@ function fastqueue (context, worker, concurrency) {
current.callback = done || noop
current.errorHandler = errorHandler

if (_running === self.concurrency || self.paused) {
if (_running >= _concurrency || self.paused) {
if (queueHead) {
current.next = queueHead
queueHead = current
Expand All @@ -140,7 +156,7 @@ function fastqueue (context, worker, concurrency) {
cache.release(holder)
}
var next = queueHead
if (next) {
if (next && _running <= _concurrency) {
if (!self.paused) {
if (queueTail === queueHead) {
queueTail = null
Expand Down Expand Up @@ -203,9 +219,9 @@ function Task () {
}
}

function queueAsPromised (context, worker, concurrency) {
function queueAsPromised (context, worker, _concurrency) {
if (typeof context === 'function') {
concurrency = worker
_concurrency = worker
worker = context
context = null
}
Expand All @@ -217,7 +233,7 @@ function queueAsPromised (context, worker, concurrency) {
}, cb)
}

var queue = fastqueue(context, asyncWrapper, concurrency)
var queue = fastqueue(context, asyncWrapper, _concurrency)

var pushCb = queue.push
var unshiftCb = queue.unshift
Expand Down
74 changes: 59 additions & 15 deletions test/test.js
Expand Up @@ -6,10 +6,22 @@ var test = require('tape')
var buildQueue = require('../')

test('concurrency', function (t) {
t.plan(2)
t.plan(6)
t.throws(buildQueue.bind(null, worker, 0))
t.throws(buildQueue.bind(null, worker, NaN))
t.doesNotThrow(buildQueue.bind(null, worker, 1))

var queue = buildQueue(worker, 1)
t.throws(function () {
queue.concurrency = 0
})
t.throws(function () {
queue.concurrency = NaN
})
t.doesNotThrow(function () {
queue.concurrency = 2
})

function worker (arg, cb) {
cb(null, true)
}
Expand Down Expand Up @@ -137,10 +149,11 @@ test('drain', function (t) {
})

test('pause && resume', function (t) {
t.plan(7)
t.plan(13)

var queue = buildQueue(worker, 1)
var worked = false
var expected = [42, 24]

t.notOk(queue.paused, 'it should not be paused')

Expand All @@ -151,34 +164,45 @@ test('pause && resume', function (t) {
t.equal(result, true, 'result matches')
})

queue.push(24, function (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
})

t.notOk(worked, 'it should be paused')
t.ok(queue.paused, 'it should be paused')

queue.resume()
queue.pause()
queue.resume()
queue.resume() // second resume is a no-op

t.notOk(queue.paused, 'it should not be paused')

function worker (arg, cb) {
t.equal(arg, 42)
t.notOk(queue.paused, 'it should not be paused')
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')
t.equal(arg, expected.shift())
worked = true
cb(null, true)
process.nextTick(function () { cb(null, true) })
}
})

test('pause in flight && resume', function (t) {
t.plan(9)
t.plan(16)

var queue = buildQueue(worker, 1)
var expected = [42, 24]
var expected = [42, 24, 12]

t.notOk(queue.paused, 'it should not be paused')

queue.push(42, function (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
t.ok(queue.paused, 'it should be paused')
process.nextTick(function () { queue.resume() })
process.nextTick(function () {
queue.resume()
queue.pause()
queue.resume()
})
})

queue.push(24, function (err, result) {
Expand All @@ -187,40 +211,60 @@ test('pause in flight && resume', function (t) {
t.notOk(queue.paused, 'it should not be paused')
})

queue.push(12, function (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
t.notOk(queue.paused, 'it should not be paused')
})

queue.pause()

function worker (arg, cb) {
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')
t.equal(arg, expected.shift())
process.nextTick(function () { cb(null, true) })
}
})

test('altering concurrency', function (t) {
t.plan(7)
t.plan(24)

var queue = buildQueue(worker, 1)
var count = 0

queue.pause()

queue.push(24, workDone)
queue.push(24, workDone)
queue.push(24, workDone)

queue.pause()

queue.concurrency = 3 // concurrency changes are ignored while paused
queue.concurrency = 2

queue.resume()

t.equal(queue.running(), 2, '2 jobs running')

queue.concurrency = 3

t.equal(queue.running(), 3, '3 jobs running')

queue.concurrency = 1

t.equal(queue.running(), 3, '3 jobs running') // running jobs can't be killed

queue.push(24, workDone)
queue.push(24, workDone)
queue.push(24, workDone)
queue.push(24, workDone)

function workDone (err, result) {
t.error(err, 'no error')
t.equal(result, true, 'result matches')
}

function worker (arg, cb) {
t.equal(0, count, 'works in parallel')
t.ok(queue.running() <= queue.concurrency, 'should respect the concurrency')
setImmediate(function () {
count++
cb(null, true)
})
}
Expand Down

0 comments on commit 62eb43e

Please sign in to comment.