Skip to content

Commit

Permalink
[feature] Refactor requests between nodes and add clientRooms method (
Browse files Browse the repository at this point in the history
#146)

This PR refactors the way requests are sent between nodes. The `subJson`
client has been removed, the subClient now listens to three channels by
default:
- the usual channel, where the message are broadcast
- the request channel, where any node can request information (for now,
  the clients in a given rooms or the rooms for a given client)
- the response channel, where a node will receive answer to its requests
  • Loading branch information
darrachequesne committed Nov 28, 2016
1 parent c38e323 commit 84261b2
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 79 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ The following options are allowed:
- `subEvent`: optional, the redis client event name to subscribe to (`message`)
- `pubClient`: optional, the redis client to publish events on
- `subClient`: optional, the redis client to subscribe to events on
- `clientsTimeout`: optional, after this timeout the adapter will stop waiting from responses to `clients` request (`1000ms`)
- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`)

If you decide to supply `pubClient` and `subClient`, make sure you use
[node_redis](https://github.com/mranney/node_redis) as a client or one
Expand All @@ -56,12 +56,16 @@ that a regular `Adapter` does not
- `prefix`
- `pubClient`
- `subClient`
- `clientsTimeout`
- `requestsTimeout`

### RedisAdapter#clients(rooms:Array, fn:Function)

Returns the list of client IDs connected to `rooms` across all nodes. See [Namespace#clients(fn:Function)](https://github.com/socketio/socket.io#namespaceclientsfnfunction)

### RedisAdapter#clientRooms(id:String, fn:Function)

Returns the list of rooms the client with the given ID has joined (even on another node).

## Client error handling

Access the `pubClient` and `subClient` properties of the
Expand Down
257 changes: 180 additions & 77 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ var async = require('async');

module.exports = adapter;

/**
* Request types, for messages between nodes
*/

var requestTypes = {
clients: 0,
clientRooms: 1,
};

/**
* Returns a redis Adapter class.
*
Expand All @@ -39,7 +48,7 @@ function adapter(uri, opts){

var prefix = opts.key || 'socket.io';
var subEvent = opts.subEvent || 'message';
var clientsTimeout = opts.clientsTimeout || 1000;
var requestsTimeout = opts.requestsTimeout || 1000;

// init clients if needed
function createClient(redis_opts) {
Expand All @@ -50,11 +59,9 @@ function adapter(uri, opts){
return redis(opts.port, opts.host, redis_opts);
}
}

if (!pub) pub = createClient();
if (!sub) sub = createClient({ return_buffers: true });

var subJson = sub.duplicate({ return_buffers: false });

// this server's key
var uid = uid2(6);
Expand All @@ -71,10 +78,12 @@ function adapter(uri, opts){

this.uid = uid;
this.prefix = prefix;
this.clientsTimeout = clientsTimeout;
this.requestsTimeout = requestsTimeout;

this.channel = prefix + '#' + nsp.name + '#';
this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#';
this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
this.responseChannel = prefix + '-response#' + this.nsp.name + '#';
this.requests = {};

if (String.prototype.startsWith) {
this.channelMatches = function (messageChannel, subscribedChannel) {
Expand All @@ -90,16 +99,11 @@ function adapter(uri, opts){

var self = this;

sub.subscribe(this.channel, function(err){
if (err) self.emit('error', err);
});

subJson.subscribe(this.syncChannel, function(err){
sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){
if (err) self.emit('error', err);
});

sub.on(subEvent, this.onmessage.bind(this));
subJson.on(subEvent, this.onclients.bind(this));
}

/**
Expand All @@ -115,9 +119,16 @@ function adapter(uri, opts){
*/

Redis.prototype.onmessage = function(channel, msg){
if (!this.channelMatches(channel.toString(), this.channel)) {
channel = channel.toString();

if (this.channelMatches(channel, this.requestChannel)) {
return this.onrequest(channel, msg);
} else if (this.channelMatches(channel, this.responseChannel)) {
return this.onresponse(channel, msg);
} else if (!this.channelMatches(channel, this.channel)) {
return debug('ignore different channel');
}

var args = msgpack.decode(msg);
var packet;

Expand All @@ -139,40 +150,119 @@ function adapter(uri, opts){
};

/**
* Called with a subscription message on sync
* Called on request from another node
*
* @api private
*/

Redis.prototype.onclients = function(channel, msg){

Redis.prototype.onrequest = function(channel, msg){
var self = this;
var request;

if (!self.channelMatches(channel.toString(), self.syncChannel)) {
return debug('ignore different channel');
try {
request = JSON.parse(msg);
} catch(err){
self.emit('error', err);
return;
}

debug('received request %j', request);

switch (request.type) {

case requestTypes.clients:
Adapter.prototype.clients.call(self, request.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}

var response = JSON.stringify({
requestid: request.requestid,
clients: clients
});

pub.publish(self.responseChannel, response);
});
break;

case requestTypes.clientRooms:
Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){
if(err){
self.emit('error', err);
return;
}

if (!rooms) { return; }

var response = JSON.stringify({
requestid: request.requestid,
rooms: rooms
});

pub.publish(self.responseChannel, response);
});
break;

default:
debug('ignoring unknown request type: %s', request.type);
}
};

/**
* Called on response from another node
*
* @api private
*/

Redis.prototype.onresponse = function(channel, msg){
var self = this;
var response;

try {
var decoded = JSON.parse(msg);
response = JSON.parse(msg);
} catch(err){
self.emit('error', err);
return;
}

Adapter.prototype.clients.call(self, decoded.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}
if (!response.requestid || !self.requests[response.requestid]) {
debug('ignoring unknown request');
return;
}

var responseChn = prefix + '-sync#response#' + decoded.transaction;
var response = JSON.stringify({
clients : clients
});
debug('received response %j', response);

pub.publish(responseChn, response);
});

var request = self.requests[response.requestid];

switch (request.type) {

case requestTypes.clients:
request.msgCount++;

// ignore if response does not contain 'clients' key
if(!response.clients || !Array.isArray(response.clients)) return;

for(var i = 0; i < response.clients.length; i++){
request.clients[response.clients[i]] = true;
}

if (request.msgCount === request.numsub) {
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.clients)));
delete self.requests[request.requestid];
}
break;

case requestTypes.clientRooms:
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, response.rooms));
delete self.requests[request.requestid];
break;

default:
debug('ignoring unknown request type: %s', request.type);
}
};

/**
Expand Down Expand Up @@ -292,6 +382,7 @@ function adapter(uri, opts){
* Gets a list of clients by sid.
*
* @param {Array} explicit set of rooms to check.
* @param {Function} callback
* @api public
*/

Expand All @@ -304,11 +395,9 @@ function adapter(uri, opts){
rooms = rooms || [];

var self = this;
var requestid = uid2(6);

var transaction = uid2(6);
var responseChn = prefix + '-sync#response#' + transaction;

pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
Expand All @@ -317,64 +406,78 @@ function adapter(uri, opts){

numsub = numsub[1];

var msg_count = 0;
var clients = {};

subJson.subscribe(responseChn, function(err) {
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

var request = JSON.stringify({
transaction : transaction,
rooms : rooms
});

/*If there is no response for 1 second, return result;*/
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}, self.clientsTimeout);
var request = JSON.stringify({
requestid : requestid,
type: requestTypes.clients,
rooms : rooms
});

subJson.on(subEvent, function onEvent(channel, msg) {
// if there is no response for x second, return result
var timeout = setTimeout(function() {
var request = self.requests[requestid];
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(request.clients)));
delete self.requests[requestid];
}, self.requestsTimeout);

self.requests[requestid] = {
type: requestTypes.clients,
numsub: numsub,
msgCount: 0,
clients: {},
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
});
};

if (!self.channelMatches(channel.toString(), responseChn)) {
return debug('ignore different channel');
}
/**
* Gets the list of rooms a given client has joined.
*
* @param {String} client id
* @param {Function} callback
* @api public
*/

var response = JSON.parse(msg);
Redis.prototype.clientRooms = function(id, fn){

//Ignore if response does not contain 'clients' key
if(!response.clients || !Array.isArray(response.clients)) return;

for(var i = 0; i < response.clients.length; i++){
clients[response.clients[i]] = true;
}
var self = this;
var requestid = uid2(6);

msg_count++;
if(msg_count == numsub){
clearTimeout(timeout);
subJson.unsubscribe(responseChn);
subJson.removeListener(subEvent, onEvent);
var rooms = this.sids[id];

if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}
});
if (rooms) {
if (fn) process.nextTick(fn.bind(null, null, Object.keys(rooms)));
return;
}

pub.publish(self.syncChannel, request);
var request = JSON.stringify({
requestid : requestid,
type: requestTypes.clientRooms,
sid : id
});

});
// if there is no response for x second, return result
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response')));
delete self.requests[requestid];
}, self.requestsTimeout);

});
self.requests[requestid] = {
type: requestTypes.clientRooms,
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
};

Redis.uid = uid;
Redis.pubClient = pub;
Redis.subClient = sub;
Redis.prefix = prefix;
Redis.clientsTimeout = clientsTimeout;
Redis.requestsTimeout = requestsTimeout;

return Redis;

Expand Down

0 comments on commit 84261b2

Please sign in to comment.