Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #606 from namgk/master
Browse files Browse the repository at this point in the history
mqtt over websocket on different path
  • Loading branch information
mcollina committed Mar 14, 2017
2 parents 9e451ae + 5d27802 commit f1c814a
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 16 deletions.
7 changes: 0 additions & 7 deletions lib/options.js
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,6 @@ function validate(opts, validationOptions) {

var result = validator.validate(opts, '/Options', validationOptions);

// check empty interfaces
if (opts.hasOwnProperty('interfaces')) {
if (opts.interfaces.length === 0) {
result.addError('no interfaces were defined');
}
}

// check required credentials
if (opts.hasOwnProperty('interfaces')) {
var hasCredentials = opts.hasOwnProperty('credentials');
Expand Down
13 changes: 10 additions & 3 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ function Server(opts, callback) {
// initialize servers list
this.servers = [];


steed.series([

// steed.series: wait for ascoltatore
Expand Down Expand Up @@ -236,7 +237,6 @@ function Server(opts, callback) {
}
});


that.on("clientConnected", function(client) {
if(that.modernOpts.publishNewClient) {
that.publish({
Expand Down Expand Up @@ -609,10 +609,17 @@ Server.prototype.close = function(callback) {
*
* @api public
* @param {HttpServer} server
* @param {String} path
*/
Server.prototype.attachHttpServer = function(server) {
Server.prototype.attachHttpServer = function(server, path) {
var that = this;
ws.createServer({ server: server }, function(stream) {

var opt = { server: server };
if (path) {
opt.path = path;
}

ws.createServer(opt, function(stream) {
var conn = new Connection(stream);
new Client(conn, that);
});
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"author": "Matteo Collina <hello@matteocollina.com>",
"license": "MIT",
"devDependencies": {
"browserify": "~13.0.0",
"chai": "^3.5.0",
"coveralls": "~2.11.1",
"dox-foundation": "~0.5.4",
Expand All @@ -62,9 +63,9 @@
"sinon-chai": "~2.8.0",
"supertest": "~1.2.0",
"tmp": "0.0.24",
"browserify": "~13.0.0",
"uglify-js": "^2.4.16",
"underscore": "^1.7.0"
"underscore": "^1.7.0",
"ws": "^1.0.1"
},
"dependencies": {
"array-from": "^2.1.1",
Expand Down
6 changes: 3 additions & 3 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ module.exports = function(moscaSettings, createConnection) {
});

function finish () {
client.removeListener('error', finish)
client.stream.removeListener('close', finish)
done()
client.removeListener('error', finish);
client.stream.removeListener('close', finish);
done();
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ describe("mosca.Server", function() {
// Simulate a situation that it takes same time to do authorizeSubscribe.
this.instance.authorizeSubscribe = function(client, topic, callback) {
setTimeout(function(){
callback(null, true)
callback(null, true);
}, 300);
};

Expand Down
201 changes: 201 additions & 0 deletions test/server_websocket_attach.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
var mqtt = require('mqtt');
var websocket = require('ws');
var http = require('http');

var port = nextPort();
var path = '/test';
var mqttPath = '/mqttws';
var mqttTopic = 'atopic';
var ping = 'ping';
var pong = 'pong';

describe("mosca.Server - Mqtt-over-WS attached to existing http server", function() {
var server, mqttServ;

beforeEach(function(){
server = http.createServer();
mqttServ = new mosca.Server({interfaces:[]});
});

afterEach(function(){
server.close();
});

it("should not occupy 1883 port while attached to http server", function(done) {
mqttServ.attachHttpServer(server);
server.listen(1883, done);
});

it("should be able to do mqtt over WebSocket", function(done) {
mqttServ.attachHttpServer(server);
server.listen(port, function(){
var client = mqtt.connect('ws://localhost:' + port);
client.subscribe(mqttTopic);
client.on("message", function(topic, payload) {
expect(topic).to.equal(mqttTopic);
expect(payload.toString()).to.equal(ping);
done();
});
client.publish(mqttTopic, ping);
});
});

it("should be able to do mqtt over WebSocket on specific path", function(done) {
mqttServ.attachHttpServer(server, mqttPath);
server.listen(port, function(){
var client = mqtt.connect('ws://localhost:' + port + mqttPath);
client.subscribe(mqttTopic);
client.on("message", function(topic, payload) {
expect(topic).to.equal(mqttTopic);
expect(payload.toString()).to.equal(ping);
done();
});
client.publish(mqttTopic, ping);
});
});

it("should not be able to do mqtt over WebSocket on different path", function(done) {
mqttServ.attachHttpServer(server, mqttPath);
server.listen(port, function(){
var client = mqtt.connect('ws://localhost:' + port + '/junk');
client.subscribe(mqttTopic);
var failed = false;// ensuring done is called once
client.on("message", function(topic, payload) {
failed = true;
done(failed);
});
client.publish(mqttTopic, ping);
setTimeout(function(){
if (!failed){
done();
}
}, 3000);
});
});

it("should not be able to do mqtt over WebSocket on root path", function(done) {
mqttServ.attachHttpServer(server, mqttPath);
server.listen(port, function(){
var client = mqtt.connect('ws://localhost:' + port);
client.subscribe(mqttTopic);
var failed = false;
client.on("message", function(topic, payload) {
failed = true;
done(failed);
});
client.publish(mqttTopic, ping);
setTimeout(function(){
if (!failed){
done();
}
}, 2000);
});
});
});

describe("mosca.Server - Websocket and Mqtt-over-WS attached to the same http server", function() {
var server, mqttServ, wss;

beforeEach(function(){
server = http.createServer();
mqttServ = new mosca.Server({interfaces:[]});

wss = new websocket.Server({
server: server,
path: path,
perMessageDeflate: false
});
});

afterEach(function(){
server.close();
});

it("ws client should not connect when mqtt is attached to http server without path", function(done) {
mqttServ.attachHttpServer(server);
server.listen(port, function(){
var ws = new websocket('ws://localhost:' + port + path, {
perMessageDeflate: false
});

ws.on('error', function(e) {
expect(e).to.not.be.undefined;
done();
});
});
});

it("ws client should be able to connect when specific path is used", function(done) {
mqttServ.attachHttpServer(server, mqttPath);
wss.on('connection', function(conn){
conn.on('message', function(msg){
expect(msg).to.equal(ping);
conn.send(pong);
});
});

server.listen(port, function(){
var ws = new websocket('ws://localhost:' + port + path, {
perMessageDeflate: false
});

ws.on('open', function(){
ws.send(ping);
});

ws.on('message', function(msg){
expect(msg).to.equal(pong);
done();
});
});
});

it("mqtt client should be able to connect as well", function(done) {
mqttServ.attachHttpServer(server, mqttPath);
server.listen(port, function(){
var client = mqtt.connect('ws://localhost:' + port + mqttPath);
client.subscribe(mqttTopic);
client.on("message", function(topic, payload) {
expect(topic).to.equal(mqttTopic);
expect(payload.toString()).to.equal(ping);
done();
});
client.publish(mqttTopic, ping);
});
});

it("both ws and mqtt client should be able to connect at the same time", function(done) {
mqttServ.attachHttpServer(server, mqttPath);
wss.on('connection', function(conn){
conn.on('message', function(msg){
expect(msg).to.equal(ping);
conn.send(pong);
});
});

server.listen(port, function(){
var client = mqtt.connect('ws://localhost:' + port + mqttPath);
var ws = new websocket('ws://localhost:' + port + path, {
perMessageDeflate: false
});

client.on('connect', function () {
client.subscribe(mqttTopic);
setTimeout(function(){// wait for ws to connect
ws.send(ping);
}, 2000);
});

ws.on('message', function(msg){
expect(msg).to.equal(pong);
client.publish(mqttTopic, ping);
});

client.on("message", function(topic, payload) {
expect(topic).to.equal(mqttTopic);
expect(payload.toString()).to.equal(ping);
done();
});
});
});
});

0 comments on commit f1c814a

Please sign in to comment.