Skip to content

Commit

Permalink
feat: keepalive manager (#1865)
Browse files Browse the repository at this point in the history
* fix: keepalive management

* fix: skip shift on 'publish' packets

* fix: better keepalive checks

* fix: refactor methods

* fix: example

* refactor: rename method

* fix: keepalive tests

* fix: remove .only

* fix: connack

* fix: log mock

* fix: do not allow setting 0 as keepalive in manager

* fix: remove useless rescheduling on connack

* fix: flaky test

* fix: add math.ceil to keepalive interval every
  • Loading branch information
robertsLando committed May 13, 2024
1 parent ea4ec78 commit bad160a
Show file tree
Hide file tree
Showing 12 changed files with 331 additions and 271 deletions.
3 changes: 1 addition & 2 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ npm test

This will run both `browser` and `node` tests.


### Running specific tests

For example, you can run `node -r esbuild-register --test test/pingTimer.ts`
For example, you can run `node -r esbuild-register --test test/keepaliveManager.ts`

### Browser

Expand Down
6 changes: 3 additions & 3 deletions example.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import mqtt from './src/index'

const client = mqtt.connect('mqtts://test.mosquitto.org', {
keepalive: 10,
port: 8883,
const client = mqtt.connect('mqtt://broker.hivemq.com', {
keepalive: 3,
port: 1883,
reconnectPeriod: 15000,
rejectUnauthorized: false,
})
Expand Down
107 changes: 107 additions & 0 deletions src/lib/KeepaliveManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import type MqttClient from './client'
import getTimer, { type Timer } from './get-timer'
import type { TimerVariant } from './shared'

export default class KeepaliveManager {
private _keepalive: number

private timerId: number

private timer: Timer

private destroyed = false

private counter: number

private client: MqttClient

private _keepaliveTimeoutTimestamp: number

private _intervalEvery: number

/** Timestamp of next keepalive timeout */
get keepaliveTimeoutTimestamp() {
return this._keepaliveTimeoutTimestamp
}

/** Milliseconds of the actual interval */
get intervalEvery() {
return this._intervalEvery
}

get keepalive() {
return this._keepalive
}

constructor(client: MqttClient, variant: TimerVariant) {
this.client = client
this.timer = getTimer(variant)
this.setKeepalive(client.options.keepalive)
}

private clear() {
if (this.timerId) {
this.timer.clear(this.timerId)
this.timerId = null
}
}

/** Change the keepalive */
setKeepalive(value: number) {
// keepalive is in seconds
value *= 1000

if (
// eslint-disable-next-line no-restricted-globals
isNaN(value) ||
value <= 0 ||
value > 2147483647
) {
throw new Error(
`Keepalive value must be an integer between 0 and 2147483647. Provided value is ${value}`,
)
}

this._keepalive = value

this.reschedule()

this.client['log'](`KeepaliveManager: set keepalive to ${value}ms`)
}

destroy() {
this.clear()
this.destroyed = true
}

reschedule() {
if (this.destroyed) {
return
}

this.clear()
this.counter = 0

// https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.5_Keep
const keepAliveTimeout = Math.ceil(this._keepalive * 1.5)

this._keepaliveTimeoutTimestamp = Date.now() + keepAliveTimeout
this._intervalEvery = Math.ceil(this._keepalive / 2)

this.timerId = this.timer.set(() => {
// this should never happen, but just in case
if (this.destroyed) {
return
}

this.counter += 1

// after keepalive seconds, send a pingreq
if (this.counter === 2) {
this.client.sendPing()
} else if (this.counter > 2) {
this.client.onKeepaliveTimeout()
}
}, this._intervalEvery)
}
}
56 changes: 0 additions & 56 deletions src/lib/PingTimer.ts

This file was deleted.

84 changes: 29 additions & 55 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
} from './shared'
import TopicAliasSend from './topic-alias-send'
import { TypedEventEmitter } from './TypedEmitter'
import PingTimer from './PingTimer'
import KeepaliveManager from './KeepaliveManager'
import isBrowser, { isWebWorker } from './is-browser'

const setImmediate =
Expand Down Expand Up @@ -433,10 +433,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

public noop: (error?: any) => void

/** Timestamp of last received control packet */
public pingResp: number

public pingTimer: PingTimer
public keepaliveManager: KeepaliveManager

/**
* The connection to the Broker. In browsers env this also have `socket` property
Expand Down Expand Up @@ -572,8 +569,8 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
// map of a subscribe messageId and a topic
this.messageIdToTopic = {}

// Ping timer, setup in _setupPingTimer
this.pingTimer = null
// Keepalive manager, setup in _setupKeepaliveManager
this.keepaliveManager = null
// Is the client connected?
this.connected = false
// Are we disconnecting?
Expand Down Expand Up @@ -660,7 +657,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.log('close :: clearing connackTimer')
clearTimeout(this.connackTimer)

this._destroyPingTimer()
this._destroyKeepaliveManager()

if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
Expand Down Expand Up @@ -1780,7 +1777,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this._setupReconnect()
}

this._destroyPingTimer()
this._destroyKeepaliveManager()

if (done && !this.connected) {
this.log(
Expand Down Expand Up @@ -2064,45 +2061,36 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
}

/**
* _setupPingTimer - setup the ping timer
*
* @api private
* _setupKeepaliveManager - setup the keepalive manager
*/
private _setupPingTimer() {
private _setupKeepaliveManager() {
this.log(
'_setupPingTimer :: keepalive %d (seconds)',
'_setupKeepaliveManager :: keepalive %d (seconds)',
this.options.keepalive,
)

if (!this.pingTimer && this.options.keepalive) {
this.pingTimer = new PingTimer(
this.options.keepalive,
() => {
this._checkPing()
},
if (!this.keepaliveManager && this.options.keepalive) {
this.keepaliveManager = new KeepaliveManager(
this,
this.options.timerVariant,
)
this.pingResp = Date.now()
}
}

private _destroyPingTimer() {
if (this.pingTimer) {
this.log('_destroyPingTimer :: destroying ping timer')
this.pingTimer.destroy()
this.pingTimer = null
private _destroyKeepaliveManager() {
if (this.keepaliveManager) {
this.log('_destroyKeepaliveManager :: destroying keepalive manager')
this.keepaliveManager.destroy()
this.keepaliveManager = null
}
}

/**
* _shiftPingInterval - reschedule the ping interval
*
* @api private
* Reschedule the ping interval
*/
private _shiftPingInterval() {
public reschedulePing() {
if (
this.pingTimer &&
this.keepaliveManager &&
this.options.keepalive &&
this.options.reschedulePings
) {
Expand All @@ -2115,34 +2103,20 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
*/
private _reschedulePing() {
this.log('_reschedulePing :: rescheduling ping')
this.pingTimer.reschedule()
this.keepaliveManager.reschedule()
}

/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
private _checkPing() {
this.log('_checkPing :: checking ping...')
// give 100ms offset to avoid ping timeout when receiving fast responses
const timeSincePing = Date.now() - this.pingResp - 100
if (timeSincePing <= this.options.keepalive * 1000) {
this.log('_checkPing :: ping response received in time')
this._sendPing()
} else {
// do a forced cleanup since socket will be in bad shape
this.emit('error', new Error('Keepalive timeout'))
this.log('_checkPing :: calling _cleanUp with force true')
this._cleanUp(true)
}
}

private _sendPing() {
public sendPing() {
this.log('_sendPing :: sending pingreq')
this._sendPacket({ cmd: 'pingreq' })
}

public onKeepaliveTimeout() {
this.emit('error', new Error('Keepalive timeout'))
this.log('onKeepaliveTimeout :: calling _cleanUp with force true')
this._cleanUp(true)
}

/**
* _resubscribe
* @api private
Expand Down Expand Up @@ -2205,7 +2179,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac

this.connackPacket = packet
this.messageIdProvider.clear()
this._setupPingTimer()
this._setupKeepaliveManager()

this.connected = true

Expand Down
14 changes: 7 additions & 7 deletions src/lib/get-timer.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import isBrowser, { isWebWorker, isReactNativeBrowser } from './is-browser'
import { clearTimeout as clearT, setTimeout as setT } from 'worker-timers'
import { clearInterval as clearI, setInterval as setI } from 'worker-timers'
import type { TimerVariant } from './shared'

// dont directly assign globals to class props otherwise this throws in web workers: Uncaught TypeError: Illegal invocation
// See: https://stackoverflow.com/questions/9677985/uncaught-typeerror-illegal-invocation-in-chrome

export interface Timer {
set: typeof setT
clear: typeof clearT
set: typeof setI
clear: typeof clearI
}

const workerTimer: Timer = {
set: setT,
clear: clearT,
set: setI,
clear: clearI,
}

const nativeTimer: Timer = {
set: (func, time) => setTimeout(func, time),
clear: (timerId) => clearTimeout(timerId),
set: (func, time) => setInterval(func, time),
clear: (timerId) => clearInterval(timerId),
}

const getTimer = (variant: TimerVariant): Timer => {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/handlers/connack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const handleConnack: PacketHandler = (client, packet: IConnackPacket) => {
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
client['_shiftPingInterval']()
}

if (packet.properties.maximumPacketSize) {
if (!options.properties) {
options.properties = {}
Expand Down

0 comments on commit bad160a

Please sign in to comment.