mqtt-topping

Wraps the MQTT.js client to multiplex incoming messages to the subscribed handlers and supports querying retained topics via HTTP.

MQTT.js events can also be registered on the mqtt-topping client.

By default, it expects MQTT message payloads to be formatted as JSON.

MQTT Client

Features

  • Subscribe and unsubscribe handler callbacks to individual (wildcard) topics
  • JSON.stringify all published payloads
  • JSON.parse all incoming payloads
  • Ignore non-JSON payloads
  • Decide whether to retain a message based on the topic name (retained unless the topic is prefixed with "on" or "do")
  • Send publishes and subscriptions with quality of service (QoS) 2
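
The retain rule and the handling of non-JSON payloads listed above can be sketched as two small helpers. This is only an illustration, not the library's code: the names shouldRetain and parsePayload are hypothetical. It also assumes that the "on"/"do" prefix is checked on the last topic level and must be followed by an uppercase letter or digit (for example "onPress"), which the list above does not spell out.

```javascript
// Hypothetical helpers; names and exact rules are assumptions, not the library's code.

// Assumption: the "on"/"do" prefix is checked on the last topic level and must be
// followed by an uppercase letter or digit (e.g. "onPress", "doReset").
function shouldRetain(topic) {
  const lastLevel = topic.split("/").pop()
  return !/^(on|do)[A-Z0-9]/.test(lastLevel)
}

// Returns { ok: true, value } for JSON payloads and { ok: false } otherwise,
// so a caller can skip non-JSON messages instead of throwing.
function parsePayload(buffer) {
  try {
    return { ok: true, value: JSON.parse(buffer.toString()) }
  } catch (error) {
    return { ok: false }
  }
}
```

Under these assumptions, "button/onPress" would be published without the retain flag, while "my/topic" and "device/online" would be retained.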

Connect, Subscribe, Publish, Unpublish and Register Event "offline"

const { connectAsync } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")

  client.on("offline", () => console.error("Client is offline. Trying to reconnect."))

  await client.subscribe("my/topic", (payload, topic, packet) => {
    console.log(
      "Received Payload " + payload + " for Topic " + topic + " (retained = " + packet.retain + ")"
    )
  })

  await client.publish("my/topic", "myPayload")

  await client.unpublish("my/topic")
}
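
The example above subscribes to a literal topic, while the feature list also mentions wildcard topics. How the library dispatches messages internally is not shown in this README. The following is a minimal sketch of the standard MQTT wildcard semantics ("+" for one level, "#" for all remaining levels), using a hypothetical topicMatches function. It is simplified and ignores special cases such as topics starting with "$".

```javascript
// Illustrative matcher for MQTT topic filters; not the library's implementation.
// "+" matches exactly one topic level, "#" matches all remaining levels.
function topicMatches(filter, topic) {
  const filterLevels = filter.split("/")
  const topicLevels = topic.split("/")

  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === "#") {
      return true
    }
    if (i >= topicLevels.length) {
      return false
    }
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) {
      return false
    }
  }

  // Without a trailing "#", filter and topic must have the same number of levels.
  return filterLevels.length === topicLevels.length
}
```

A handler subscribed to "my/+" would thus receive messages for "my/topic" but not for "my/topic/sub", whereas "my/#" would cover both.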

HTTP Client

Features

  • Works with the broker plugin "HiveMQ Retained Message Query Plugin"
  • Supports single and batch queries, including wildcard topics. Additional options are:
    • parseJson: Parse the result.payload as JSON. Default is true.
    • depth: Specifies the recursive depth of the query. A depth > 0 returns subtopics in result.children. Default is 0.
    • flatten: Flattens all topics into a flat array. Default is false.
  • Supports single and batch JSON queries, which:
    • return entire topic trees (topics with subtopics) as one JSON object
    • ignore topic payloads if subtopics exist

Single Query

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("my/topic", "myPayload")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.query({ topic: "my", depth: 1 })

  // {
  // "topic": "my",
  // "children": [
  //     {
  //         "topic": "my/topic",
  //         "payload": "myPayload"
  //     }
  //   ]
  // }
}
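
The "wait a few milliseconds" comment in the example can be implemented with a small promise-based helper. The name delay and the 100 ms value below are assumptions for illustration. The README does not state how long the broker needs.

```javascript
// Hypothetical helper: resolves after the given number of milliseconds.
function delay(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

// Usage inside an async function, between publish and query:
//   await client.publish("my/topic", "myPayload")
//   await delay(100) // 100 ms is an arbitrary value, not a documented requirement
//   const result = await httpClient.query({ topic: "my", depth: 1 })
```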

Batch Query

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("my/topic1", "myPayload1")
  await client.publish("my/topic2", "myPayload2")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.queryBatch([{ topic: "my/topic1" }, { topic: "my/topic2" }])

  // [
  //   {
  //       "topic": "my/topic1",
  //       "payload": "myPayload1"
  //   },
  //   {
  //       "topic": "my/topic2",
  //       "payload": "myPayload2"
  //   }
  // ]
}

QueryJson

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("my/topic", "myPayload")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.queryJson("my")

  // {
  //   "topic": "myPayload"
  // }
}
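
The relationship between a regular query result and the queryJson output can be sketched as a client-side conversion. The function toJson below is hypothetical and not the library's implementation. It only mirrors the two rules from the feature list: subtopics become object keys, and a topic's own payload is ignored when subtopics exist.

```javascript
// Illustrative only: converts a nested query result into a plain JSON object.
function toJson(node) {
  // A leaf topic contributes its payload directly.
  if (!node.children || node.children.length === 0) {
    return node.payload
  }

  // A topic with subtopics becomes an object keyed by the last topic level;
  // its own payload (if any) is ignored.
  const result = {}
  for (const child of node.children) {
    const key = child.topic.split("/").pop()
    result[key] = toJson(child)
  }
  return result
}
```

Applied to the single-query result shown earlier (topic "my" with the child "my/topic"), this yields the same object as the queryJson example above.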

QueryJsonBatch

const { connectAsync, HttpClient } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("january/first", "eat")
  await client.publish("january/second", "sleep")
  await client.publish("february/first", "work")
  await client.publish("february/second", "repeat")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await httpClient.queryJsonBatch(["january", "february"])

  // [
  //   {
  //     "first": "eat",
  //     "second": "sleep"
  //   },
  //   {
  //     "first": "work",
  //     "second": "repeat"
  //   }
  // ]
}

Unpublish Recursively

const { connectAsync, HttpClient, unpublishRecursively } = require("@artcom/mqtt-topping")

async function main() {
  const client = await connectAsync("tcp://broker.example.com")
  const httpClient = new HttpClient("http://broker.example.com/query")

  await client.publish("january/first", "eat")
  await client.publish("january/second", "sleep")
  await client.publish("february/first", "work")
  await client.publish("february/second", "repeat")

  // wait a few milliseconds to ensure the data is processed on the server

  const result = await unpublishRecursively(client, httpClient, "february")

  // remaining published topics on the broker
  // january/first: "eat"
  // january/second: "sleep"
}

Development

Build

npm install
npm run build

Test

The tests require a running MQTT broker instance with the "HiveMQ Retained Message Query Plugin".

npm install
npm run build
npm run test
