Skip to content

Commit

Permalink
added methods .info() and .size()
Browse files Browse the repository at this point in the history
closes #17
  • Loading branch information
M. Peter committed Jul 14, 2016
1 parent 75463ab commit b6bcec3
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 71 deletions.
4 changes: 3 additions & 1 deletion Gruntfile.coffee
Expand Up @@ -36,7 +36,8 @@ module.exports = (grunt) ->
options:
require: [ "should" ]
reporter: "spec"
bail: false
bail: ( if process.env.BAIL then true else false )
grep: process.env.GREP
timeout: 3000
slow: 3

Expand Down Expand Up @@ -89,3 +90,4 @@ module.exports = (grunt) ->
# build the project
grunt.registerTask "build", [ "clear", "coffee:base", "includereplace" ]
grunt.registerTask "build-dev", [ "clear", "coffee:base", "docs", "test" ]

23 changes: 23 additions & 0 deletions README.md
Expand Up @@ -146,6 +146,28 @@ Stop the worker and close the connection.
After this it's no longer possible to reuse the worker-instance.
It's just intended to kill all timers and connections so your script will end.

### `.info( cb )`

Get the current queue attributes.
This is just a shortcut to the [`rsmq.getQueueAttributes`](https://github.com/smrchy/rsmq#getqueueattributes).

**Arguments**

* `cb` : *( `Function` )*: Callback with `( err, attributes )`. See [rsmq-docs](https://github.com/smrchy/rsmq#getqueueattributes) for details.

### `.size( [hidden=false], cb )`

Get the current queue size.

**Arguments**

* `hidden` : *( `Boolean` optional; default = `false` )*: The count of messages including the currently hidden/"in flight" messages.
* `cb` : *( `Function` optional )*: Callback with `( err, size )`. The `size` is a `Number` and repersents the number of messages in the queue. If `hidden=true` you will receive the numebr of currently hidden messages

**Return**

*( Self )*: The instance itself for chaining.

## Events

### `message`
Expand Down Expand Up @@ -309,6 +331,7 @@ This is an advanced example showing some features in action.
## Release History
|Version|Date|Description|
|:--:|:--:|:--|
|0.5.0|2016-07-14|Added methods `.info(cb)` ([Issue#17](https://github.com/mpneuried/rsmq-worker/issues/17)) and `.size( [hidden,] cb )`|
|0.4.3|2016-06-20|Optimized event listeners [Issue#15](https://github.com/mpneuried/rsmq-worker/issues/15). Thanks to [Kevin Turner](https://github.com/kpturner )|
|0.4.2|2016-05-06|Added the `.quit()` function [Issue#11](https://github.com/mpneuried/rsmq-worker/issues/11). Thanks to [Sam Fung](https://github.com/5amfung )|
|0.4.1|2016-04-05|Fixed missing isNumber function|
Expand Down
51 changes: 51 additions & 0 deletions _src/lib/rsmq-worker.coffee
Expand Up @@ -220,7 +220,58 @@ class RSMQWorker extends require( "mpbasic" )()
changeInterval: ( interval )=>
@config.interval = interval
return @

###
## info
`RSMQWorker.info( cb )`
Get the queue attributes
@param { Function } cb The callback function
@return { RSMQWorker } The instance itself for chaining.
@api public
###
info: ( cb )=>
@queue.getQueueAttributes qname: @queuename, ( err, resp )=>
if err
@error "queue info", err
cb( err )
return
cb( null, resp )
return
return @

###
## size
`RSMQWorker.size( hidden, cb )`
Get the queue size.
@param { Boolean } [hidden=false] Get the message count of the queue including the currently hidden/"in flight" messages.
@param { Function } cb The callback function
@return { RSMQWorker } The instance itself for chaining.
@api public
###
size: ( [hidden]..., cb )=>
@queue.getQueueAttributes qname: @queuename, ( err, resp )=>
if err
@error "queue size", err
cb( err )
return

_size = resp?.msgs or 0
if hidden is true
_size = resp.hiddenmsgs or 0
cb( null, _size )
return
return @

###
## _initRSMQ
Expand Down
158 changes: 147 additions & 11 deletions _src/test/main.coffee
@@ -1,16 +1,18 @@
should = require( 'should' )
utils = require( './utils' )
rand = require( 'randoms' )
async = require( 'async' )

_queuename = utils.randomString( 10, 1 )
_queuename = rand.string( 10, 1 )
worker = null
_created = null

describe "----- rsmq-worker TESTS -----", ->

before ( done )->

RSMQWorker = require( "../." )
worker = new RSMQWorker( _queuename, { interval: [ 0, 1, 5 ] } )

_created = Math.round( Date.now() / 1000 )
worker.on "ready", ->
done()
return
Expand All @@ -25,7 +27,7 @@ describe "----- rsmq-worker TESTS -----", ->
return

describe 'Main Tests', ->

_tRecv = 0
# Implement tests cases here
it "check interval config", ( done )->
worker.config.interval.length.should.equal( 3 )
Expand All @@ -37,7 +39,7 @@ describe "----- rsmq-worker TESTS -----", ->

# Implement tests cases here
it "first test", ( done )->
_examplemsg = utils.randomString( utils.randRange( 4, 99 ), 3 )
_examplemsg = rand.string( rand.number( 4, 99 ), 3 )

_testFn = ( msg, next, id )->

Expand All @@ -52,10 +54,11 @@ describe "----- rsmq-worker TESTS -----", ->


worker.send( _examplemsg )
_tRecv++
return

it "delay test", ( done )->
_examplemsg = utils.randomString( utils.randRange( 4, 99 ), 3 )
_examplemsg = rand.string( rand.number( 4, 99 ), 3 )
_start = Date.now()
_delay = 5
@timeout( _delay*1.5*1000 )
Expand All @@ -72,10 +75,11 @@ describe "----- rsmq-worker TESTS -----", ->
worker.on( "message", _testFn )

worker.send( _examplemsg, _delay )
_tRecv++
return

it "delay test with callback", ( done )->
_examplemsg = utils.randomString( utils.randRange( 4, 99 ), 3 )
_examplemsg = rand.string( rand.number( 4, 99 ), 3 )
_start = Date.now()
_delay = 5
@timeout( _delay*1.5*1000 )
Expand All @@ -91,13 +95,145 @@ describe "----- rsmq-worker TESTS -----", ->

worker.on( "message", _testFn )

_tRecv++
worker.send _examplemsg, _delay, ( err )->
should.not.exist( err )
return
return

it "test size method", ( done )->
@timeout( 15000 )
_COUNT = 10
_examplemsgs = []
for _x in [1.._COUNT]
_examplemsgs.push rand.string( rand.number( 4, 99 ), 3 )

_runHiddenSize = ( next )->
return ->
# check hidden size and go on
worker.size true, ( err, size )->
throw err if err
should.exist( size )
size.should.be.a.number
size.should.equal( 1 )
_idx++
next()
worker.start()
return
return

_idx = 0
_testFn = ( msg, next, id )->
should.equal( msg, _examplemsgs[ _idx ] )

if _idx is 0
# stop and wait to check the hidden size
setTimeout( _runHiddenSize( next ), 1000 )
worker.stop()
return

next()

_idx++
# done if all messages are received
if _idx >= _COUNT
worker.removeListener( "message", _testFn )
done()
return

worker.on( "message", _testFn )

worker.stop()

_fnSend = ( msg, cba )->
_tRecv++
worker.send( msg, 0, cba )
return

async.every _examplemsgs, _fnSend, ( err )->
throw err if err

worker.size ( err, size )->
worker.start() # start immediate so the following tests will not fail due to a stopped worker

throw err if err
should.exist( size )
size.should.be.a.number
size.should.equal( _COUNT )

return
return
return

it "test info method", ( done )->
@timeout( 15000 )
_COUNT = 10
_examplemsgs = []
for _x in [1.._COUNT]
_examplemsgs.push rand.string( rand.number( 4, 99 ), 3 )

_idx = 0
_testFn = ( msg, next, id )->
should.equal( msg, _examplemsgs[ _idx ] )

next()

_idx++
# done if all messages are received
if _idx >= _COUNT
worker.removeListener( "message", _testFn )
done()
return

worker.on( "message", _testFn )

worker.stop()

_fnSend = ( msg, cba )->
_tRecv++
worker.send( msg, 0, cba )
return

async.every _examplemsgs, _fnSend, ( err )->
throw err if err

worker.info ( err, info )->
worker.start() # start immediate so the following tests will not fail due to a stopped worker

throw err if err
should.exist( info )
info.should.have.property( "msgs" )
.with.equal( _COUNT )

info.should.have.property( "delay" )
.with.equal( 0 )

info.should.have.property( "vt" )
.with.equal( 30 )

info.should.have.property( "maxsize" )
.with.equal( 65536 )

info.should.have.property( "totalsent" )
.with.equal( _tRecv )

info.should.have.property( "totalrecv" )
.with.equal( _tRecv - _COUNT )

info.should.have.property( "created" )
.with.approximately( _created, 10 )

info.should.have.property( "modified" )
.with.approximately( _created, 10 )



return
return
return

it "error throw within message processing - Issue #3 (A)", ( done )->
_examplemsg = utils.randomString( utils.randRange( 4, 99 ), 3 )
_examplemsg = rand.string( rand.number( 4, 99 ), 3 )
@timeout( 3000 )

_testFn = ( msg, next, id )->
Expand Down Expand Up @@ -125,7 +261,7 @@ describe "----- rsmq-worker TESTS -----", ->
return

it "code error within message processing - Issue #3 (B)", ( done )->
_examplemsg = utils.randomString( utils.randRange( 4, 99 ), 3 )
_examplemsg = rand.string( rand.number( 4, 99 ), 3 )
@timeout( 3000 )

_testFn = ( msg, next, id )->
Expand Down Expand Up @@ -156,9 +292,9 @@ describe "----- rsmq-worker TESTS -----", ->
return
return

_examplemsg2 = utils.randomString( utils.randRange( 4, 99 ), 3 )
_examplemsg2 = rand.string( rand.number( 4, 99 ), 3 )
it "test stop method - Pull #5 stop", ( done )->
_examplemsg = utils.randomString( utils.randRange( 4, 99 ), 3 )
_examplemsg = rand.string( rand.number( 4, 99 ), 3 )
@timeout( 6000 )

idx = 0
Expand Down
53 changes: 0 additions & 53 deletions _src/test/utils.coffee

This file was deleted.

0 comments on commit b6bcec3

Please sign in to comment.