Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tests for MQTT retained messages in various cluster/domain conf… #5082

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
67 changes: 0 additions & 67 deletions .github/workflows/MQTT_test.yaml

This file was deleted.

48 changes: 48 additions & 0 deletions .github/workflows/mqtt-test.yaml
@@ -0,0 +1,48 @@
name: MQTTEx
on: [push, pull_request]

jobs:
test:
env:
GOPATH: /home/runner/work/nats-server
GO111MODULE: "on"
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
path: src/github.com/nats-io/nats-server

- name: Setup Go
uses: actions/setup-go@v5
with:
go-version-file: src/github.com/nats-io/nats-server/go.mod
cache-dependency-path: src/github.com/nats-io/nats-server/go.sum

- name: Set up testing tools and environment
shell: bash --noprofile --norc -eo pipefail {0}
id: setup
run: |
wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb
sudo apt install ./mqtt-cli-4.20.0.deb
go install github.com/ConnectEverything/mqtt-test@v0.1.0

- name: Run tests (3 times to detect flappers)
shell: bash --noprofile --norc -eo pipefail {0}
run: |
cd src/github.com/nats-io/nats-server
go test -v --count=3 --run='MQTTEx' ./server

- name: Run tests with --race
shell: bash --noprofile --norc -eo pipefail {0}
run: |
cd src/github.com/nats-io/nats-server
go test -v --race --failfast --run='MQTTEx' ./server

- name: Run benchmarks
shell: bash --noprofile --norc -eo pipefail {0}
run: |
cd src/github.com/nats-io/nats-server
go test --run='-' --count=3 --bench 'MQTTEx' --benchtime=100x ./server

# TODO: compare benchmarks
6 changes: 6 additions & 0 deletions server/mqtt.go
Expand Up @@ -1954,21 +1954,27 @@ func (as *mqttAccountSessionManager) processRetainedMsg(_ *subscription, c *clie
}
// If lastSeq is 0 (nothing to recover, or done doing it) and this is
// from our own server, ignore.
as.mu.RLock()
if as.rrmLastSeq == 0 && rm.Origin == as.jsa.id {
as.mu.RUnlock()
return
}
as.mu.RUnlock()

// At this point we either recover from our own server, or process a remote retained message.
seq, _, _ := ackReplyInfo(reply)

// Handle this retained message, no need to copy the bytes.
as.handleRetainedMsg(rm.Subject, &mqttRetainedMsgRef{sseq: seq}, rm, false)

// If we were recovering (lastSeq > 0), then check if we are done.
as.mu.Lock()
if as.rrmLastSeq > 0 && seq >= as.rrmLastSeq {
as.rrmLastSeq = 0
close(as.rrmDoneCh)
as.rrmDoneCh = nil
}
as.mu.Unlock()
}

func (as *mqttAccountSessionManager) processRetainedMsgDel(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
Expand Down
134 changes: 22 additions & 112 deletions server/mqtt_ex_test.go → server/mqtt_ex_bench_test.go
Expand Up @@ -17,51 +17,12 @@
package server

import (
"bytes"
"encoding/json"
"fmt"
"os"
"os/exec"
"strconv"
"testing"
"time"
)

func TestMQTTExCompliance(t *testing.T) {
mqttPath := os.Getenv("MQTT_CLI")
if mqttPath == "" {
if p, err := exec.LookPath("mqtt"); err == nil {
mqttPath = p
}
}
if mqttPath == "" {
t.Skip(`"mqtt" command is not found in $PATH nor $MQTT_CLI. See https://hivemq.github.io/mqtt-cli/docs/installation/#debian-package for installation instructions`)
}

conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
server_name: mqtt
jetstream {
store_dir = %q
}
mqtt {
listen: 127.0.0.1:-1
}
`, t.TempDir())))
s, o := RunServerWithConfig(conf)
defer testMQTTShutdownServer(s)

cmd := exec.Command(mqttPath, "test", "-V", "3", "-p", strconv.Itoa(o.MQTT.Port))

output, err := cmd.CombinedOutput()
t.Log(string(output))
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
t.Fatalf("mqtt cli exited with error: %v", exitError)
}
}
}

const (
KB = 1024
)
Expand All @@ -83,9 +44,6 @@ type mqttBenchContext struct {

Host string
Port int

// full path to mqtt-test command
testCmdPath string
}

var mqttBenchDefaultMatrix = mqttBenchMatrix{
Expand All @@ -103,7 +61,11 @@ type MQTTBenchmarkResult struct {
}

func BenchmarkMQTTEx(b *testing.B) {
bc := mqttNewBenchEx(b)
if mqttexTestCommandPath == "" {
b.Skip(`"mqtt-test" command is not found in $PATH.`)
}

bc := mqttBenchContext{}
b.Run("Server", func(b *testing.B) {
b.Cleanup(bc.startServer(b, false))
bc.runAll(b)
Expand Down Expand Up @@ -142,11 +104,11 @@ func (bc mqttBenchContext) benchmarkPub(b *testing.B) {

b.Run("PUB", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pub",
bc.runAndReport(b, "pub",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-publishers", strconv.Itoa(bc.Publishers),
"--publishers", strconv.Itoa(bc.Publishers),
)
})
})
Expand All @@ -165,11 +127,11 @@ func (bc mqttBenchContext) benchmarkPubRetained(b *testing.B) {

b.Run("PUBRET", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pub", "--retain",
bc.runAndReport(b, "pub", "--retain",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-publishers", strconv.Itoa(bc.Publishers),
"--publishers", strconv.Itoa(bc.Publishers),
)
})
})
Expand All @@ -185,11 +147,11 @@ func (bc mqttBenchContext) benchmarkPubSub(b *testing.B) {

b.Run("PUBSUB", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "pubsub",
bc.runAndReport(b, "pubsub",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N),
"--messages", strconv.Itoa(b.N),
"--size", strconv.Itoa(bc.MessageSize),
"--num-subscribers", strconv.Itoa(bc.Subscribers),
"--subscribers", strconv.Itoa(bc.Subscribers),
)
})
})
Expand All @@ -206,67 +168,23 @@ func (bc mqttBenchContext) benchmarkSubRet(b *testing.B) {

b.Run("SUBRET", func(b *testing.B) {
m.runMatrix(b, bc, func(b *testing.B, bc *mqttBenchContext) {
bc.runCommand(b, "subret",
bc.runAndReport(b, "subret",
"--qos", strconv.Itoa(bc.QOS),
"--n", strconv.Itoa(b.N), // number of subscribe requests
"--num-subscribers", strconv.Itoa(bc.Subscribers),
"--num-topics", strconv.Itoa(bc.Topics),
"--topics", strconv.Itoa(bc.Topics), // number of retained messages
"--subscribers", strconv.Itoa(bc.Subscribers),
"--size", strconv.Itoa(bc.MessageSize),
"--repeat", strconv.Itoa(b.N), // number of subscribe requests
)
})
})
}

func mqttBenchLookupCommand(b *testing.B, name string) string {
func (bc mqttBenchContext) runAndReport(b *testing.B, name string, extraArgs ...string) {
b.Helper()
cmd, err := exec.LookPath(name)
if err != nil {
b.Skipf("%q command is not found in $PATH. Please `go install github.com/nats-io/meta-nats/apps/go/mqtt/...@latest` and try again.", name)
}
return cmd
}

func (bc mqttBenchContext) runCommand(b *testing.B, name string, extraArgs ...string) {
b.Helper()

args := append([]string{
name,
"-q",
"--servers", fmt.Sprintf("%s:%d", bc.Host, bc.Port),
}, extraArgs...)

cmd := exec.Command(bc.testCmdPath, args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
b.Fatalf("Error executing %q: %v", cmd.String(), err)
}
defer stdout.Close()
errbuf := bytes.Buffer{}
cmd.Stderr = &errbuf
if err = cmd.Start(); err != nil {
b.Fatalf("Error executing %q: %v", cmd.String(), err)
}
r := &MQTTBenchmarkResult{}
if err = json.NewDecoder(stdout).Decode(r); err != nil {
b.Fatalf("failed to decode output of %q: %v\n\n%s", cmd.String(), err, errbuf.String())
}
if err = cmd.Wait(); err != nil {
b.Fatalf("Error executing %q: %v\n\n%s", cmd.String(), err, errbuf.String())
}

r := mqttexRunTest(b, name, mqttExNewDial("", "", bc.Host, bc.Port, ""), extraArgs...)
r.report(b)
}

func (bc mqttBenchContext) initServer(b *testing.B) {
b.Helper()
bc.runCommand(b, "pubsub",
"--id", "__init__",
"--qos", "0",
"--n", "1",
"--size", "100",
"--num-subscribers", "1")
}

func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func() {
b.Helper()
b.StopTimer()
Expand All @@ -278,7 +196,7 @@ func (bc *mqttBenchContext) startServer(b *testing.B, disableRMSCache bool) func
o = s.getOpts()
bc.Host = o.MQTT.Host
bc.Port = o.MQTT.Port
bc.initServer(b)
mqttExInitServer(b, mqttExNewDial("", "", bc.Host, bc.Port, ""))
return func() {
testMQTTShutdownServer(s)
testDisableRMSCache = prevDisableRMSCache
Expand Down Expand Up @@ -314,7 +232,7 @@ func (bc *mqttBenchContext) startCluster(b *testing.B, disableRMSCache bool) fun
o := cl.randomNonLeader().getOpts()
bc.Host = o.MQTT.Host
bc.Port = o.MQTT.Port
bc.initServer(b)
mqttExInitServer(b, mqttExNewDial("", "", bc.Host, bc.Port, ""))
return func() {
cl.shutdown()
testDisableRMSCache = prevDisableRMSCache
Expand Down Expand Up @@ -410,15 +328,7 @@ func (r MQTTBenchmarkResult) report(b *testing.B) {
nsOp := float64(ns) / float64(r.Ops)
b.ReportMetric(nsOp/1000000, unit+"_ms/op")
}

// Diable ReportAllocs() since it confuses the github benchmarking action
// with the noise.
// b.ReportAllocs()
}

func mqttNewBenchEx(b *testing.B) *mqttBenchContext {
cmd := mqttBenchLookupCommand(b, "mqtt-test")
return &mqttBenchContext{
testCmdPath: cmd,
}
}