Skip to content

Latest commit

 

History

History
416 lines (283 loc) · 16.1 KB

Aedes.md

File metadata and controls

416 lines (283 loc) · 16.1 KB

Aedes

new Aedes([options]) / new Aedes.Server([options])

  • options <object>
    • mq <MQEmitter> middleware used to deliver messages to subscribed clients. In a cluster environment it is used also to share messages between brokers instances. Default: mqemitter
    • concurrency <number> maximum number of concurrent messages delivered by mq. Default: 100
    • persistence <Persistence> middleware that stores QoS > 0, retained, will packets and subscriptions. Default: aedes-persistence (in memory)
    • queueLimit <number> maximum number of queued messages before client session is established. If number of queued items exceeds, connectionError throws an error Client queue limit reached. Default: 42
    • maxClientsIdLength option to override MQTT 3.1.0 clients Id length limit. Default: 23
    • heartbeatInterval <number> an interval in millisconds at which server beats its health signal in $SYS/<aedes.id>/heartbeat topic. Default: 60000
    • id <string> aedes broker unique identifier. Default: uuidv4()
    • connectTimeout <number> maximum waiting time in milliseconds waiting for a CONNECT packet. Default: 30000
    • keepaliveLimit <number> maximum client keep alive time allowed, 0 means no limit. Default: 0
  • Returns <Aedes>

Create a new Aedes server.

Aedes is the class and function exposed by this module. It can be created by Aedes() or using new Aedes(). An variant aedes.Server is for TypeScript or ES modules.

aedes.id

  • <string> Default: uuidv4()

Server unique identifier.

aedes.connectedClients

  • <number> Default: 0

Number of connected clients in server.

aedes.closed

  • <boolean> Default: false

a read-only flag indicates if server is closed or not.

Event: client

Emitted when the client registers itself to server. The client is not ready yet. Its connecting state equals to true.

Server publishes a SYS topic $SYS/<aedes.id>/new/clients to inform it registers the client into its registration pool. client.id is the payload.

Event: clientReady

Emitted when the client has received all its offline messages and be initialized. The client connected state equals to true and is ready for processing incoming messages.

Event: clientDisconnect

Emitted when a client disconnects.

Server publishes a SYS topic $SYS/<aedes.id>/disconnect/clients to inform it deregisters the client. client.id is the payload.

Event: clientError

Emitted when an error occurs.

Event: connectionError

Emitted when an error occurs. Unlike clientError it raises only when client is uninitialized.

Event: keepaliveTimeout

Emitted when timeout happes in the client keepalive.

Event: publish

Emitted when servers delivers the packet to subscribed client. If there are no clients subscribed to the packet topic, server still publish the packet and emit the event. client is null when packet is an internal message like aedes heartbeat message and LWT.

Note! packet belongs aedes-packet type. Some properties belongs to aedes internal, any changes on them will break aedes internal flow.

Event: ack

Emitted an QoS 1 or 2 acknowledgement when the packet successfully delivered to the client.

Event: ping

Emitted when client sends a PINGREQ.

Event: subscribe

  • subscriptions <object>
  • client <Client>

Emitted when client successfully subscribe the subscriptions in server.

subscriptions is an array of { topic: topic, qos: qos }. The array excludes duplicated topics and includes negated subscriptions where qos equals to 128. See more on authorizeSubscribe

Server publishes a SYS topic $SYS/<aedes.id>/new/subscribers to inform a client successfully subscribed to one or more topics. The payload is a JSON that has clientId and subs props, subs equals to subscriptions array.

Event: unsubscribe

  • unsubscriptions Array<string>
  • client <Client>

Emitted when client successfully unsubscribe the subscriptions in server.

unsubscriptions are an array of unsubscribed topics.

Server publishes a SYS topic $SYS/<aedes.id>/new/unsubscribers to inform a client successfully unsubscribed to one or more topics. The payload is a JSON that has clientId and subs props, subs equals to unsubscriptions array.

Event: connackSent

Emitted when server sends an acknowledge to client. Please refer to the MQTT specification for the explanation of returnCode object property in CONNACK.

Event: closed

Emitted when server is closed.

aedes.handle (stream)

  • stream: <net.Socket> | <stream.Duplex>
  • Returns: <Client>

A connection listener that pipe stream to aedes.

const aedes = require('./aedes')()
const server = require('net').createServer(aedes.handle)

aedes.subscribe (topic, deliverfunc, callback)

  • topic: <string>
  • deliverfunc: <Function> (packet, cb) => void
    • packet: <aedes-packet> & PUBLISH
    • cb: <Function>
  • callback: <Function>

Directly subscribe a topic in server side. Bypass authorizeSubscribe

The topic and deliverfunc is a compound key to differentiate the uniqueness of its subscription pool. topic could be the one that is existed, in this case deliverfunc will be invoked as well as SUBSCRIBE does.

deliverfunc supports backpressue.

In aedes internal, deliverfunc is a function that delivers messages to subscribed clients.

Note! packet belongs aedes-packet type. Some properties belongs to aedes internal, any changes on them will break aedes internal flow.

In general you would find most properities in packet is same as what the incoming PUBLISH is. For sure cmd property in packet structure in deliverfunc must be publish.

Note! it requires deliverfunc to call cb before the function returns, otherwise some subscribed clients with same topic will not receive messages.

callback is invoked when server successfully registers the subscription.

aedes.unsubscribe (topic, deliverfunc, callback)

Reverse of aedes.subscribe.

Note! the deliverfunc should be same as when aedes.subscribe does, otherwise the unsubscription will fail.

aedes.publish (packet, callback)

  • packet <object> PUBLISH
  • callback <Function> (error) => void
    • error <Error> | null

Directly deliver packet on behalf of server to subscribed clients. Bypass authorizePublish.

callback will be invoked with error arugments after finish.

aedes.close ([callback])

  • callback: <Function>

Close aedes server and disconnects all clients.

callback will be invoked when server is closed.

Handler: preConnect (client, packet, callback)

  • client: <Client>
  • packet: <object> CONNECT
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked when server receives a valid CONNECT packet. The packet can be modified.

client object is in default state. If invoked callback with no errors and successful be true, server will continue to establish a session.

Any error will be raised in connectionError event.

Some Use Cases:

  1. Rate Limit / Throttle by client.conn.remoteAddress
  2. Check aedes.connectedClient to limit maximum connections
  3. IP blacklisting
aedes.preConnect = function(client, packet, callback) {
  callback(null, client.conn.remoteAddress === '::1') {
}
aedes.preConnect = function(client, packet, callback) {
  callback(new Error('connection error'), client.conn.remoteAddress !== '::1') {
}

Handler: authenticate (client, username, password, callback)

  • client: <Client>
  • username: <string>
  • password: <Buffer>
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked after preConnect.

Server parses the CONNECT packet, initializes client object which set client.id to match the one in CONNECT packet and extract username and password as parameters for user-defined authentication flow.

If invoked callback with no errors and successful be true, server authenticates client and continues to setup the client session.

If authenticated, server acknowledges a CONNACK with returnCode=0, otherwise returnCode=5. Users could define the value between 2 and 5 by defining a returnCode property in error object.

aedes.authenticate = function (client, username, password, callback) {
  callback(null, username === 'matteo')
}
aedes.authenticate = function (client, username, password, callback) {
  var error = new Error('Auth error')
  error.returnCode = 4
  callback(error, null)
}

Please refer to Connect Return Code to see their meanings.

Handler: authorizePublish (client, packet, callback)

  • client: <Client> | null
  • packet: <object> PUBLISH
  • callback: <Function> (error) => void
    • error <Error> | null

Invoked when

  1. publish LWT to all online clients
  2. incoming client publish

client is null when aedes publishes obsolete LWT without connected clients

If invoked callback with no errors, server authorizes the packet otherwise emits clientError with error. If an error occurs the client connection will be closed, but no error is returned to the client (MQTT-3.3.5-2)

aedes.authorizePublish = function (client, packet, callback) {
  if (packet.topic === 'aaaa') {
    return callback(new Error('wrong topic'))
  }
  if (packet.topic === 'bbb') {
    packet.payload = Buffer.from('overwrite packet payload')
  }
  callback(null)
}

By default authorizePublish throws error in case a client publish to topics with $SYS/ prefix to prevent possible DoS (see #597). If you write your own implementation of authorizePublish we suggest you to add a check for this. Default implementation:

function defaultAuthorizePublish (client, packet, callback) {
  if (packet.topic.startsWith($SYS_PREFIX)) {
    return callback(new Error($SYS_PREFIX + ' topic is reserved'))
  }
  callback(null)
}

Handler: authorizeSubscribe (client, subscription, callback)

  • client: <Client>
  • subscription: <object>
  • callback: <Function> (error) => void
    • error <Error> | null
    • subscription: <object> | null

Invoked when

  1. restore subscriptions in non-clean session.
  2. incoming client SUBSCRIBE

subscription is a dictionary object like { topic: hello, qos: 0 }.

If invoked callback with no errors, server authorizes the packet otherwise emits clientError with error.

In general user should not touch the subscription and pass to callback, but server gives an option to change the subscription on-the-fly.

aedes.authorizeSubscribe = function (client, sub, callback) {
  if (sub.topic === 'aaaa') {
    return callback(new Error('wrong topic'))
  }
  if (sub.topic === 'bbb') {
    // overwrites subscription
    sub.topic = 'foo'
    sub.qos = 1
  }
  callback(null, sub)
}

To negate a subscription, set the subscription to null. Aedes ignores the negated subscription and the qos in SubAck is set to 128 based on MQTT 3.11 spec:

aedes.authorizeSubscribe = function (client, sub, callback) {
  // prohibited to subscribe 'aaaa' and suppress error
  callback(null, sub.topic === 'aaaa' ? null : sub)
}

Handler: authorizeForward (client, packet)

  • client: <Client>
  • packet: <aedes-packet> & PUBLISH
  • Returns: <aedes-packet> | null

Invoked when

  1. aedes sends retained messages when client reconnects
  2. aedes pre-delivers subscribed message to clients

Return null will not forward packet to clients.

In general user should not touch the packet and return it what it is, but server gives an option to change the packet on-the-fly and forward it to clients.

Note! packet belongs aedes-packet type. Some properties belongs to aedes internal, any changes on them will break aedes internal flow.

aedes.authorizeForward = function (client, packet) {
  if (packet.topic === 'aaaa' && client.id === "I should not see this") {
    return
  }
  if (packet.topic === 'bbb') {
    packet.payload = new Buffer('overwrite packet payload')
  }
  return packet
}

Handler: published (packet, client, callback)

same as Event: publish, but provides a backpressure functionality. TLDR; If you are doing operations on packets that MUST require finishing operations on a packet before handling the next one use this otherwise, expecially for long running operations, you should use Event: publish instead.