Skip to content

Commit

Permalink
Fixing inconsistent RPC request ACK message format (#272)
Browse files Browse the repository at this point in the history
* updated to reflect change to RPC REQ ACK message format

* fixed all specs, e2e, de-tangled rpc error handling logic
  • Loading branch information
jdmnd authored and yasserf committed Nov 1, 2016
1 parent 808426c commit 9330530
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 44 deletions.
33 changes: 16 additions & 17 deletions src/rpc/rpc-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ RpcHandler.prototype._respondToRpc = function( message ) {
}

if( this._providers[ name ] ) {
response = new RpcResponse( this._connection,name, correlationId );
response = new RpcResponse( this._connection, name, correlationId );
this._providers[ name ]( data, response );
} else {
this._connection.sendMsg( C.TOPIC.RPC, C.ACTIONS.REJECTION, [ name, correlationId ] );
Expand Down Expand Up @@ -183,29 +183,28 @@ RpcHandler.prototype._$handle = function( message ) {
return;
}

// handle auth/denied subscription errors
if( message.action === C.ACTIONS.ERROR ) {
if( message.data[ 0 ] === C.EVENT.MESSAGE_PERMISSION_ERROR ) {
return;
}
if( message.data[ 0 ] === C.EVENT.MESSAGE_DENIED && message.data[ 2 ] === C.ACTIONS.SUBSCRIBE ) {
this._ackTimeoutRegistry.remove( message.data[ 1 ], C.ACTIONS.SUBSCRIBE );
return;
}
}

/*
* Error messages always have the error as first parameter. So the
* order is different to ack and response messages
*/
if( message.action === C.ACTIONS.ERROR ) {
if( message.data[ 0 ] === C.EVENT.MESSAGE_PERMISSION_ERROR ) {
return;
}
else if( message.data[ 0 ] === C.EVENT.MESSAGE_DENIED ) {
if( message.data[ 2 ] === C.ACTIONS.SUBSCRIBE ) {
this._ackTimeoutRegistry.remove( message.data[ 1 ], C.ACTIONS.SUBSCRIBE );
return;
}
else if( message.data[ 2 ] === C.ACTIONS.REQUEST ) {
rpcName = message.data[ 1 ];
correlationId = message.data[ 3 ];
}
if( message.action === C.ACTIONS.ERROR || message.action === C.ACTIONS.ACK ) {
if( message.data[ 0 ] === C.EVENT.MESSAGE_DENIED && message.data[ 2 ] === C.ACTIONS.REQUEST ) {
correlationId = message.data[ 3 ];
} else {
rpcName = message.data[ 1 ];
correlationId = message.data[ 2 ];
}

rpcName = message.data[ 1 ];
} else {
rpcName = message.data[ 0 ];
correlationId = message.data[ 1 ];
Expand Down Expand Up @@ -247,4 +246,4 @@ RpcHandler.prototype._reprovide = function() {
};


module.exports = RpcHandler;
module.exports = RpcHandler;
8 changes: 6 additions & 2 deletions src/rpc/rpc-response.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ var RpcResponse = function( connection, name, correlationId ) {
*/
RpcResponse.prototype.ack = function() {
if( this._isAcknowledged === false ) {
this._connection.sendMsg( C.TOPIC.RPC, C.ACTIONS.ACK, [ this._name, this._correlationId ] );
this._connection.sendMsg(
C.TOPIC.RPC,
C.ACTIONS.ACK,
[ C.ACTIONS.REQUEST, this._name, this._correlationId ]
);
this._isAcknowledged = true;
}
};
Expand Down Expand Up @@ -104,4 +108,4 @@ RpcResponse.prototype._performAutoAck = function() {
}
};

module.exports = RpcResponse;
module.exports = RpcResponse;
2 changes: 1 addition & 1 deletion test-specs/features
2 changes: 1 addition & 1 deletion test-unit/mocks/message/connection-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ ConnectionMock.prototype.authenticate = function() {

};

module.exports = ConnectionMock;
module.exports = ConnectionMock;
40 changes: 20 additions & 20 deletions test-unit/unit/rpc/rpc-handlerSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,44 +44,44 @@ describe( 'handles rpc providers', function() {
done();
}, 100 );
});

it( 'replies to sync rpc request', function(){
rpcHandler._$handle({
topic: 'RPC',
action: 'REQ',
data: [ 'addTwo', '678', 'O{"numA":2,"numB":3, "sync": true}' ]
});

expect( connectionMock.lastSendMessage ).toBe( msg( 'P|RES|addTwo|678|N5+' ) );
});

it( 'replies to async rpc request', function( done ){
rpcHandler._$handle({
topic: 'RPC',
action: 'REQ',
data: [ 'addTwo', '123', 'O{"numA":7,"numB":3}' ]
});

setTimeout(function(){
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|A|addTwo|123+' ) );
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|A|REQ|addTwo|123+' ) );
}, 3 );

setTimeout(function(){
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|RES|addTwo|123|N10+' ) );
done();
}, 30 );
});

it( 'sends rejection if no provider exists', function() {
rpcHandler._$handle({
topic: 'RPC',
action: 'REQ',
data: [ 'doesNotExist', '432', 'O{"numA":7,"numB":3}' ]
});

expect( connectionMock.lastSendMessage ).toBe( msg( 'P|REJ|doesNotExist|432+' ) );
});

it( 'deregisters providers', function() {
rpcHandler.unprovide( 'addTwo' );
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|US|addTwo+' ) );
Expand All @@ -103,54 +103,54 @@ describe( 'handles rpc providers', function() {
action: 'REQ',
data: [ 'addTwo', '434', 'O{"numA":2,"numB":7, "sync": true}' ]
});

expect( connectionMock.lastSendMessage ).toBe( msg( 'P|REJ|addTwo|434+' ) );
});
});

describe( 'makes rpcs', function() {
var rpcHandler,
callback = jasmine.createSpy( 'rpc callback' );

it( 'creates the RPC handler', function(){
rpcHandler = new RpcHandler( options, connectionMock, clientMock );
expect( rpcHandler.provide ).toBeDefined();
});

it( 'makes a successful rpc for addTwo', function() {
rpcHandler.make( 'addTwo', { numA: 3, numB: 8 }, callback );
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|REQ|addTwo|1|O{"numA":3,"numB":8}+' ) );
expect( callback ).not.toHaveBeenCalled();

rpcHandler._$handle({
topic: 'RPC',
action: 'RES',
data: [ 'addTwo', '1', 'N11' ]
});

expect( callback ).toHaveBeenCalledWith( null, 11 );
});

it( 'makes rpc for addTwo, but receives an error', function() {
rpcHandler.make( 'addTwo', { numA: 3, numB: 8 }, callback );
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|REQ|addTwo|1|O{"numA":3,"numB":8}+' ) );

rpcHandler._$handle({
topic: 'RPC',
action: 'E',
data: [ 'NO_PROVIDER', 'addTwo', '1' ]
});

expect( callback ).toHaveBeenCalledWith( 'NO_PROVIDER' );
});

it( 'makes rpc for addTwo, but doesn\'t receive ack in time', function( done ) {
rpcHandler.make( 'addTwo', { numA: 3, numB: 8 }, callback );
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|REQ|addTwo|1|O{"numA":3,"numB":8}+' ) );

setTimeout(function() {
expect( callback ).toHaveBeenCalledWith( 'ACK_TIMEOUT' );
done();
}, 30 );
});
});
});
6 changes: 3 additions & 3 deletions test-unit/unit/rpc/rpc-responseSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe( 'sends the correct response messages - happy path', function(){

it( 'sends an ack message automatically', function( done ){
setTimeout(function(){
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|A|addTwo|123+' ) );
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|A|REQ|addTwo|123+' ) );
done();
}, 10 );
});
Expand Down Expand Up @@ -44,7 +44,7 @@ describe( 'sends the correct response messages - ack behaviour', function(){

it( 'sends ack message', function() {
response.ack();
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|A|addTwo|123+' ) );
expect( connectionMock.lastSendMessage ).toBe( msg( 'P|A|REQ|addTwo|123+' ) );
});

it( 'doesn\'t send multiple ack messages', function() {
Expand Down Expand Up @@ -89,4 +89,4 @@ describe( 'sends the correct response messages - error behaviour', function(){
it( 'throws an error when trying to send a completed response', function() {
expect(function(){ response.send( 'bla' ); }).toThrow();
});
});
});

0 comments on commit 9330530

Please sign in to comment.