Skip to content

Commit

Permalink
Update KV CAS 'chaos' test to allow stale reads
Browse files Browse the repository at this point in the history
When detecting a stale value, retry rather than fail immediately.

Signed-off-by: Marco Primi marco@synadia.com
  • Loading branch information
mprimi committed Nov 8, 2023
1 parent cab4ce3 commit a743862
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions server/jetstream_chaos_test.go
Expand Up @@ -19,6 +19,7 @@ package server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -1150,11 +1151,9 @@ putGetLoop:
}

// Multiple clients updating a finite set of keys with CAS semantics.
// TODO check that revision is never lower than last one seen
// TODO check that KeyNotFound is never returned, as keys are initialized beforehand
func TestJetStreamChaosKvCAS(t *testing.T) {
const numOps = 10_000
const maxRetries = 50
const staleGetRetry = 4 * time.Second // (twice the max server downtime)
const retryDelay = 300 * time.Millisecond
const clusterSize = 3
const replicas = 3
Expand Down Expand Up @@ -1211,6 +1210,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
successfulUpdates := 0
casRejectUpdates := 0
otherUpdateErrors := 0
staleGets := 0

// Map to track last known revision for each of the keys
knownRevisions := map[string]uint64{}
Expand Down Expand Up @@ -1246,28 +1246,39 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
} else if updateErr != nil && strings.Contains(fmt.Sprint(updateErr), "wrong last sequence") {
// CAS rejected update, learn current revision for this key
casRejectUpdates += 1
checkFor(
t,
staleGetRetry,
retryDelay,
func() error {
kve, getErr := kv.Get(key)
if getErr != nil {
// Get failed
if chaosKvTestsDebug {
t.Logf("Client %d Get('%s') failed: %s", clientId, key, err)
}
return err
}

for r := 0; r <= maxRetries; r++ {
kve, getErr := kv.Get(key)
if getErr == nil {
currentRevision := kve.Revision()
if currentRevision < knownRevisions[key] {
// Revision number moved backward, this should never happen
t.Fatalf("Current revision for key %s is %d, which is lower than the last known revision %d", key, currentRevision, knownRevisions[key])

if kve.Revision() <= knownRevisions[key] {
// A higher revision is known to exist, this value must be stale
staleGets += 1
if chaosKvTestsDebug {
t.Logf("Client %d Get('%s') returned stale revision: %d, older than known: %d", clientId, key, kve.Revision(), knownRevisions[key])
}
return fmt.Errorf("get returned stale value")
}

knownRevisions[key] = currentRevision
// Revision is newer than known, worth storing for the next CAS attempt
knownRevisions[key] = kve.Revision()
if chaosKvTestsDebug {
t.Logf("Client %d learn key %s revision: %d", clientId, key, currentRevision)
t.Logf("Client %d learn key %s revision: %d", clientId, key, kve.Revision())
}
break
} else if r == maxRetries {
t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr)
} else {
time.Sleep(retryDelay)
}
}
return nil
},
)
} else if errors.Is(updateErr, nats.ErrKeyNotFound) {
t.Fatalf("Client %d updating key '%s' got %s", clientId, key, updateErr)
} else {
// Other update error
otherUpdateErrors += 1
Expand All @@ -1277,7 +1288,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
time.Sleep(retryDelay)
}
}
t.Logf("Client %d done, %d kv updates, %d CAS rejected, %d other errors", clientId, successfulUpdates, casRejectUpdates, otherUpdateErrors)
t.Logf("Client %d done, %d kv updates, %d CAS rejected, %d stale gets, %d other errors", clientId, successfulUpdates, casRejectUpdates, staleGets, otherUpdateErrors)
}

// Launch all clients
Expand Down

0 comments on commit a743862

Please sign in to comment.