Skip to content

Commit

Permalink
Simplify test add check expecting the stream num deleted to be last c…
Browse files Browse the repository at this point in the history
…ount

Signed-off-by: Byron Ruth <byron@nats.io>
  • Loading branch information
bruth committed Apr 1, 2024
1 parent e272ed1 commit 900c363
Showing 1 changed file with 117 additions and 55 deletions.
172 changes: 117 additions & 55 deletions server/consumer_test.go
Expand Up @@ -5,15 +5,14 @@ import (
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"

"github.com/nats-io/nats.go"
)

func TestJetStreamWorkQueueMultiConsumer(t *testing.T) {
s := RunBasicJetStreamServer(t)
s := RunJetStreamServerOnPort(53951, t.TempDir())
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
Expand All @@ -36,20 +35,19 @@ func TestJetStreamWorkQueueMultiConsumer(t *testing.T) {
go func() {
cnt := 0
for j := 0; j < 1000; j++ {

// Each loop will generate 3000 messages, 1000 for each consumer.
var pas []nats.PubAckFuture
for i := 0; i < 1000; i++ {
cnt++
pa, err := js.PublishAsync(fmt.Sprintf("test.1.%d", cnt), nil)
pa, err := js.PublishAsync(fmt.Sprintf("test.0.%d", cnt), nil)
require_NoError(t, err)
pas = append(pas, pa)

pa, err = js.PublishAsync(fmt.Sprintf("test.2.%d", cnt), nil)
pa, err = js.PublishAsync(fmt.Sprintf("test.1.%d", cnt), nil)
require_NoError(t, err)
pas = append(pas, pa)

pa, err = js.PublishAsync(fmt.Sprintf("test.3.%d", cnt), nil)
pa, err = js.PublishAsync(fmt.Sprintf("test.2.%d", cnt), nil)
require_NoError(t, err)
pas = append(pas, pa)
}
Expand All @@ -67,62 +65,110 @@ func TestJetStreamWorkQueueMultiConsumer(t *testing.T) {
t.Logf("Published %d messages", cnt*3)
}()

wg := &sync.WaitGroup{}
wg.Add(3)
// Change to 0, 1, or 2 to test each consumer.
worker := 0
// Set to true to periodically unsubscribe and rebind the consumer.
toggleUnsub := false

// Keep track of the last message counter which is determined
// based on the third token in the subject.
totalRem := 1_000_000
// Keep track of the left over messages that were requested, but not received.
subRem := 0
// Keep track of the last message counter that was received.
lstCnt := 0

// Start three consumers that will consume from the work queue
// each with a different filter subject bound to the second token.
for i := 0; i < 3; i++ {
_, err := js.AddConsumer("WQ", &nats.ConsumerConfig{
Durable: fmt.Sprintf("WQ-%d", i+1),
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: fmt.Sprintf("test.%d.*", i+1),
})
_, err = js.AddConsumer("WQ", &nats.ConsumerConfig{
Durable: fmt.Sprintf("WQ-%d", worker),
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: fmt.Sprintf("test.%d.*", worker),
})
require_NoError(t, err)

// Start each worker in a routine.
t.Logf("[%d] Starting worker", worker)

// Setup the initial pull subscription bound to the consumer.
sub, err := js.PullSubscribe(
fmt.Sprintf("test.%d.*", worker),
fmt.Sprintf("WQ-%d", worker),
nats.Bind("WQ", fmt.Sprintf("WQ-%d", worker)),
)
require_NoError(t, err)

for totalRem > 0 {
// Fetch a random number of messages up to 512.
n := rand.Intn(512) + 1
t0 := time.Now()
msgs, err := sub.Fetch(n, nats.MaxWait(2*time.Second))
if err == nats.ErrTimeout {
t.Logf("[%d] Timeout fetching %d messages in %v", worker, n, time.Since(t0))
continue
}
require_NoError(t, err)
if len(msgs) != n {
t.Logf("[%d] Fetched %d/%d messages in %v", worker, len(msgs), n, time.Since(t0))
}

// Start each worker in a routine.
go func(i int) {
t.Logf("[%d] Starting worker", i+1)
defer wg.Done()
for _, msg := range msgs {
md, _ := msg.Metadata()
if md.NumDelivered > 1 {
t.Errorf("Expected only one delivery, got %d", md.NumDelivered)
return
}
// Check that the counter is only +1 than the last.
cnt, err := strconv.Atoi(msg.Subject[7:])
require_NoError(t, err)
if cnt == lstCnt+1 {
lstCnt = cnt
msg.Ack()
continue
}

// Setup the initial pull subscription bound to the consumer.
sub, err := js.PullSubscribe(
fmt.Sprintf("test.%d.*", i+1),
fmt.Sprintf("WQ-%d", i+1),
nats.Bind("WQ", fmt.Sprintf("WQ-%d", i+1)),
)
// A mismatch is detected. Log the consumer, metadata, and return.
mdd, err := json.Marshal(md)
require_NoError(t, err)
defer sub.Drain()

// Keep track of the last message counter which is determined
// based on the third token in the subject.
rem := 3_000_000
lstCnt := 0
for rem > 0 {
// Randomly drain the consumer and rebind the consumer every so often.
if rand.Intn(100)%5 == 0 {
err = sub.Drain()
require_NoError(t, err)
ci, err := js.ConsumerInfo("WQ", fmt.Sprintf("WQ-%d", worker))
require_NoError(t, err)
cis := fmt.Sprintf("Delivered: %d, AckFloor: %d, Pending: %d\n", ci.Delivered.Consumer, ci.AckFloor.Consumer, ci.NumPending)

// Rebind with a new subscription.
sub, err = js.PullSubscribe(
fmt.Sprintf("test.%d.*", i+1),
fmt.Sprintf("WQ-%d", i+1),
nats.Bind("WQ", fmt.Sprintf("WQ-%d", i+1)),
)
require_NoError(t, err)
}
t.Errorf("[%d] Expected %d, got %d\nMetadata:\n%s\nConsumer info:\n%s", worker, lstCnt+1, cnt, mdd, cis)
return
}

// Keep track of the messages that were requested vs. received. The remaining will be fetched
// prior to unsubscribing.
totalRem -= len(msgs)
subRem += (n - len(msgs))

// Fetch a random number of messages up to 100.
n := rand.Intn(100) + 1
msgs, err := sub.Fetch(n, nats.MaxWait(500*time.Millisecond))
// Randomly unsub the consumer and rebind a fifth of the time.
if toggleUnsub && rand.Intn(100)%20 == 0 {
if subRem > 0 {
t.Logf("[%d] Prepping drain.. %d remaining messages", worker, subRem)
}

// Ensure outstanding queued/pending messages are processed.
for subRem > 0 {
t0 := time.Now()
msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
if err == nats.ErrTimeout {
t.Logf("[%d] Timeout fetching 1 message in %v", worker, time.Since(t0))
continue
}
require_NoError(t, err)

subRem -= len(msgs)
totalRem -= len(msgs)

for _, msg := range msgs {
// Check that the counter is only +1 than the last.
md, _ := msg.Metadata()
if md.NumDelivered > 1 {
t.Errorf("Expected only one delivery, got %d", md.NumDelivered)
return
}

cnt, err := strconv.Atoi(msg.Subject[7:])
require_NoError(t, err)
if cnt == lstCnt+1 {
Expand All @@ -131,22 +177,38 @@ func TestJetStreamWorkQueueMultiConsumer(t *testing.T) {
continue
}

// A mismatch is detected. Log the consumer, metadata, and return.
md, _ := msg.Metadata()
mdd, err := json.Marshal(md)
require_NoError(t, err)
ci, err := js.ConsumerInfo("WQ", fmt.Sprintf("WQ-%d", i+1))
ci, err := js.ConsumerInfo("WQ", fmt.Sprintf("WQ-%d", worker))
require_NoError(t, err)
cis := fmt.Sprintf("Delivered: %d, AckFloor: %d, Pending: %d\n", ci.Delivered.Consumer, ci.AckFloor.Consumer, ci.NumPending)

t.Errorf("[%d] Expected %d, got %d\nMetadata:\n%s\nConsumer info:\n%s", i, lstCnt+1, cnt, mdd, cis)
t.Errorf("[%d] Expected %d, got %d\nMetadata:\n%s\nConsumer info:\n%s", worker, lstCnt+1, cnt, mdd, cis)
return
}

rem -= len(msgs)
}
}(i)

err = sub.Unsubscribe()
require_NoError(t, err)

// Rebind with a new subscription.
sub, err = js.PullSubscribe(
fmt.Sprintf("test.%d.*", worker),
fmt.Sprintf("WQ-%d", worker),
nats.Bind("WQ", fmt.Sprintf("WQ-%d", worker)),
)
require_NoError(t, err)
}
}

wg.Wait()
t.Logf("[%d] Finished consuming %d messages", worker, lstCnt)

sub.Unsubscribe()

// Allow stream to catch up deleted messages.
time.Sleep(2 * time.Second)

si, err := js.StreamInfo("WQ")
require_NoError(t, err)
require_Equal(t, lstCnt, si.State.NumDeleted)
}

0 comments on commit 900c363

Please sign in to comment.