Skip to content

Commit

Permalink
Add test that shows clfs behavior on dups
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Mar 7, 2024
1 parent f30c7e1 commit bb404c5
Showing 1 changed file with 120 additions and 0 deletions.
120 changes: 120 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -6968,3 +6968,123 @@ func TestJetStreamClusterConsumerPauseSurvivesRestart(t *testing.T) {
require_True(t, leader != nil)
checkTimer(leader)
}

func TestJetStreamClusterCLFSOnDuplicates(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

nc2, js2 := jsClientConnect(t, c.randomServer())
defer nc2.Close()

streamName := "TESTW"
_, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"foo"},
Replicas: 3,
Storage: nats.FileStorage,
MaxAge: 3 * time.Minute,
Duplicates: 2 * time.Minute,
})
require_NoError(t, err)

// Give the stream to be ready.
time.Sleep(3 * time.Second)

var wg sync.WaitGroup

// The test will be successful if it runs for this long without dup issues.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

go func() {
tick := time.NewTicker(10 * time.Second)
for {
select {
case <-ctx.Done():
wg.Done()
return
case <-tick.C:
c.streamLeader(globalAccountName, streamName).JetStreamStepdownStream(globalAccountName, streamName)
}
}
}()
wg.Add(1)

for i := 0; i < 5; i++ {
go func(i int) {
var err error
sub, err := js2.PullSubscribe("foo", fmt.Sprintf("A:%d", i))
require_NoError(t, err)

for {
select {
case <-ctx.Done():
wg.Done()
return
default:
}

msgs, err := sub.Fetch(100, nats.MaxWait(200*time.Millisecond))
if err != nil {
continue
}
for _, msg := range msgs {
msg.Ack()
}
}
}(i)
wg.Add(1)
}

// Sync producer that only does a couple of duplicates, cancel the test
// if we get too many errors without responses.
errCh := make(chan error, 10)
go func() {
// Try sync publishes normally in this state and see if it times out.
for i := 0; ; i++ {
select {
case <-ctx.Done():
wg.Done()
return
default:
}

var succeeded bool
var failures int
for n := 0; n < 10; n++ {
_, err := js.Publish("foo", []byte("test"), nats.MsgId(fmt.Sprintf("sync:checking:%d", i)), nats.RetryAttempts(30), nats.AckWait(500*time.Millisecond))
if err != nil {
failures++
continue
}
succeeded = true
}
if !succeeded {
errCh <- fmt.Errorf("Too many publishes failed with timeout: failures=%d, i=%d", failures, i)
}
}
}()
wg.Add(1)

Loop:
for n := uint64(0); true; n++ {
select {
case <-ctx.Done():
break Loop
case e := <-errCh:
t.Error(e)
break Loop
default:
}
// Cause a lot of duplicates very fast until producer stalls.
for i := 0; i < 128; i++ {
msgID := nats.MsgId(fmt.Sprintf("id.%d.%d", n, i))
js.PublishAsync("foo", []byte("test"), msgID, nats.RetryAttempts(10))
}
}
cancel()
wg.Wait()
}

0 comments on commit bb404c5

Please sign in to comment.