Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tests for MQTT retained messages in various cluster/domain conf… #5082

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
34 changes: 2 additions & 32 deletions .github/workflows/MQTT_test.yaml
@@ -1,10 +1,6 @@
name: MQTTEx
on: [push, pull_request]

permissions:
pull-requests: write # to comment on PRs
contents: write # to comment on commits (to upload artifacts?)

jobs:
test:
env:
Expand All @@ -29,39 +25,13 @@ jobs:
run: |
wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb
sudo apt install ./mqtt-cli-4.20.0.deb
go install github.com/ConnectEverything/mqtt-test@latest

# - name: Download benchmark result for ${{ github.base_ref || github.ref_name }}
# uses: dawidd6/action-download-artifact@v2
# continue-on-error: true
# with:
# path: src/github.com/nats-io/nats-server/bench
# name: bench-output-${{ runner.os }}
# branch: ${{ github.base_ref || github.ref }}
go install github.com/ConnectEverything/mqtt-test@v0.1.0

- name: Run tests and benchmarks
shell: bash --noprofile --norc -eo pipefail {0}
run: |
cd src/github.com/nats-io/nats-server
go test -v --run='MQTTEx' ./server
# go test --run='-' --count=10 --bench 'MQTT_' ./server | tee output.txt
# go test --run='-' --count=10 --bench 'MQTTEx' --timeout=20m --benchtime=100x ./server | tee -a output.txt
go test --run='-' --count=3 --bench 'MQTTEx' --benchtime=100x ./server

# - name: Compare benchmarks
# uses: benchmark-action/github-action-benchmark@v1
# with:
# tool: go
# output-file-path: src/github.com/nats-io/nats-server/output.txt
# github-token: ${{ secrets.GITHUB_TOKEN }}
# alert-threshold: 140%
# comment-on-alert: true
# # fail-on-alert: true
# external-data-json-path: src/github.com/nats-io/nats-server/bench/benchmark-data.json

# - name: Store benchmark result for ${{ github.ref_name }}
# if: always()
# uses: actions/upload-artifact@v3
# with:
# path: src/github.com/nats-io/nats-server/bench
# name: bench-output-${{ runner.os }}
# TODO: compare benchmarks
134 changes: 22 additions & 112 deletions server/mqtt_ex_test.go → server/mqtt_ex_bench_test.go
Expand Up @@ -17,51 +17,12 @@
package server

import (
"bytes"
"encoding/json"
"fmt"
"os"
"os/exec"
"strconv"
"testing"
"time"
)

func TestMQTTExCompliance(t *testing.T) {
mqttPath := os.Getenv("MQTT_CLI")
if mqttPath == "" {
if p, err := exec.LookPath("mqtt"); err == nil {
mqttPath = p
}
}
if mqttPath == "" {
t.Skip(`"mqtt" command is not found in $PATH nor $MQTT_CLI. See https://hivemq.github.io/mqtt-cli/docs/installation/#debian-package for installation instructions`)
}

conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
server_name: mqtt
jetstream {
store_dir = %q
}
mqtt {
listen: 127.0.0.1:-1
}
`, t.TempDir())))
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)

cmd := exec.Command(mqttPath, "test", "-V", "3", "-p", strconv.Itoa(o.MQTT.Port))

output, err := cmd.CombinedOutput()
t.Log(string(output))
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
t.Fatalf("mqtt cli exited with error: %v", exitError)
}
}
}

const (
KB = 1024
)
Expand All @@ -83,9 +44,6 @@ type mqttBenchContext struct {

Host string
Port int

// full path to mqtt-test command
testCmdPath string
}

var mqttBenchDefaultMatrix = mqttBenchMatrix{
Expand All @@ -103,7 +61,11 @@ type MQTTBenchmarkResult struct {
}

func BenchmarkMQTTEx(b *testing.B) {
bc := mqttNewBenchEx(b)
if mqttexTestCommandPath == "" {
b.Skip(`"mqtt-test" command is not found in $PATH.`)
}

bc := mqttBenchContext{}
b.Run("Server", func(b *testing.B) {
b.Cleanup(bc.startServer(b, false))
bc.runAll(b)
Expand Down Expand Up @@ -142,11 +104,11 @@ func (bc mqttBenchContext) benchmarkPub(b *testing.B) {

b.Run("PUB", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pub",
bc.runAndReport(b, "pub",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-publishers", strconv.Itoa(bc.Publishers),
"--publishers", strconv.Itoa(bc.Publishers),
)
})
})
Expand All @@ -165,11 +127,11 @@ func (bc mqttBenchContext) benchmarkPubRetained(b *testing.B) {

b.Run("PUBRET", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pub", "--retain",
bc.runAndReport(b, "pub", "--retain",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-publishers", strconv.Itoa(bc.Publishers),
"--publishers", strconv.Itoa(bc.Publishers),
)
})
})
Expand All @@ -185,11 +147,11 @@ func (bc mqttBenchContext) benchmarkPubSub(b *testing.B) {

b.Run("PUBSUB", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pubsub",
bc.runAndReport(b, "pubsub",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-subscribers", strconv.Itoa(bc.Subscribers),
"--subscribers", strconv.Itoa(bc.Subscribers),
)
})
})
Expand All @@ -206,67 +168,23 @@ func (bc mqttBenchContext) benchmarkSubRet(b *testing.B) {

b.Run("SUBRET", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "subret",
bc.runAndReport(b, "subret",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N), // number of subscribe requests
"--num-subscribers", strconv.Itoa(bc.Subscribers),
"--num-topics", strconv.Itoa(bc.Topics),
"--topics", strconv.Itoa(bc.Topics), // number of retained messages
"--subscribers", strconv.Itoa(bc.Subscribers),
"--size", strconv.Itoa(bc.MessageSize),
"--repeat", strconv.Itoa(b.N), // number of subscribe requests
)
})
})
}

func mqttBenchLookupCommand(b *testing.B, name string) string {
func (bc mqttBenchContext) runAndReport(b *testing.B, name string, extraArgs ...string) {
b.Helper()
cmd, err := exec.LookPath(name)
if err != nil {
b.Skipf("%q command is not found in $PATH. Please `go install github.com/nats-io/meta-nats/apps/go/mqtt/...@latest` and try again.", name)
}
return cmd
}

func (bc mqttBenchContext) runCommand(b *testing.B, name string, extraArgs ...string) {
b.Helper()

args := append([]string{
name,
"-q",
"--servers", fmt.Sprintf("%s:%d", bc.Host, bc.Port),
}, extraArgs...)

cmd := exec.Command(bc.testCmdPath, args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
b.Fatalf("Error executing %q: %v", cmd.String(), err)
}
defer stdout.Close()
errbuf := bytes.Buffer{}
cmd.Stderr = &errbuf
if err = cmd.Start(); err != nil {
b.Fatalf("Error executing %q: %v", cmd.String(), err)
}
r := &MQTTBenchmarkResult{}
if err = json.NewDecoder(stdout).Decode(r); err != nil {
b.Fatalf("failed to decode output of %q: %v\n\n%s", cmd.String(), err, errbuf.String())
}
if err = cmd.Wait(); err != nil {
b.Fatalf("Error executing %q: %v\n\n%s", cmd.String(), err, errbuf.String())
}

r := mqttexRunTest(b, name, mqttExNewDial("", "", bc.Host, bc.Port, ""), extraArgs...)
r.report(b)
}

func (bc mqttBenchContext) initServer(b *testing.B) {
b.Helper()
bc.runCommand(b, "pubsub",
"--id", "__init__",
"--qos", "0",
"--n", "1",
"--size", "100",
"--num-subscribers", "1")
}

func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func() {
b.Helper()
b.StopTimer()
Expand All @@ -278,7 +196,7 @@ func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func
o = s.getOpts()
bc.Host = o.MQTT.Host
bc.Port = o.MQTT.Port
bc.initServer(b)
mqttExInitServer(b, mqttExNewDial("", "", bc.Host, bc.Port, ""))
return func() {
testMQTTShutdownServer(s)
testDisableRMSCache = prevDisableRMSCache
Expand Down Expand Up @@ -314,7 +232,7 @@ func (bc *mqttBenchContext) startCluster(b *testing.B, disableRMSCache bool) fun
o := cl.randomNonLeader().getOpts()
bc.Host = o.MQTT.Host
bc.Port = o.MQTT.Port
bc.initServer(b)
mqttExInitServer(b, mqttExNewDial("", "", bc.Host, bc.Port, ""))
return func() {
cl.shutdown()
testDisableRMSCache = prevDisableRMSCache
Expand Down Expand Up @@ -410,15 +328,7 @@ func (r MQTTBenchmarkResult) report(b *testing.B) {
nsOp := float64(ns) / float64(r.Ops)
b.ReportMetric(nsOp/1000000, unit+"_ms/op")
}

// Diable ReportAllocs() since it confuses the github benchmarking action
// with the noise.
// b.ReportAllocs()
}

func mqttNewBenchEx(b *testing.B) *mqttBenchContext {
cmd := mqttBenchLookupCommand(b, "mqtt-test")
return &mqttBenchContext{
testCmdPath: cmd,
}
}