Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry for stale reads in KV CAS chaos test #4769

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
168 changes: 74 additions & 94 deletions server/jetstream_chaos_test.go
Expand Up @@ -19,6 +19,7 @@ package server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -191,14 +192,10 @@ func jsClientConnectCluster(t testing.TB, c *cluster) (*nats.Conn, nats.JetStrea
connectURL := strings.Join(serverConnectURLs, ",")

nc, err := nats.Connect(connectURL)
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
require_NoError(t, err)

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Failed to init JetStream context: %s", err)
}
require_NoError(t, err)

return nc, js
}
Expand Down Expand Up @@ -289,18 +286,14 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes
Subjects: []string{chaosConsumerTestsSubject},
Replicas: replicas,
})
if err != nil {
t.Fatalf("Error creating stream: %v", err)
}
require_NoError(t, err)

ackFutures := make([]nats.PubAckFuture, 0, publishBatchSize)

for i := 1; i <= numMessages; i++ {
message := []byte(fmt.Sprintf("%d", i))
pubAckFuture, err := pubJs.PublishAsync(chaosConsumerTestsSubject, message, nats.ExpectLastSequence(uint64(i-1)))
if err != nil {
t.Fatalf("Publish error: %s", err)
}
require_NoError(t, err)
ackFutures = append(ackFutures, pubAckFuture)

if (i > 0 && i%publishBatchSize == 0) || i == numMessages {
Expand All @@ -311,7 +304,7 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes
case <-pubAckFuture.Ok():
// Noop
case pubAckErr := <-pubAckFuture.Err():
t.Fatalf("Error publishing: %s", pubAckErr)
require_NoError(t, pubAckErr)
case <-time.After(30 * time.Second):
t.Fatalf("Timeout verifying pubAck for message: %s", pubAckFuture.Msg().Data)
}
Expand Down Expand Up @@ -361,9 +354,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
chaosConsumerTestsSubject,
nats.OrderedConsumer(),
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require_NoError(t, err)
defer sub.Unsubscribe()

if chaosConsumerTestsDebug {
Expand Down Expand Up @@ -393,10 +384,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
}

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}

require_NoError(t, err)
if metadata.Sequence.Stream != uint64(i) {
t.Fatalf("Expecting stream sequence %d, got %d instead", i, metadata.Sequence.Stream)
}
Expand Down Expand Up @@ -453,9 +441,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {
deliveryCount += 1

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
require_NoError(t, err)
seq := metadata.Sequence.Stream

var expectedMsgData = []byte(fmt.Sprintf("%d", seq))
Expand Down Expand Up @@ -504,9 +490,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {

subOpts := []nats.SubOpt{}
sub, err := subJs.Subscribe(chaosConsumerTestsSubject, handleMsg, subOpts...)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require_NoError(t, err)
defer sub.Unsubscribe()

chaos.start()
Expand Down Expand Up @@ -662,9 +646,7 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) {
deliveryCount += 1

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
require_NoError(t, err)
seq := metadata.Sequence.Stream

var expectedMsgData = []byte(fmt.Sprintf("%d", seq))
Expand Down Expand Up @@ -777,9 +759,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) {
chaosConsumerTestsSubject,
durableConsumerName,
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require_NoError(t, err)
defer sub.Unsubscribe()

if chaosConsumerTestsDebug {
Expand Down Expand Up @@ -818,9 +798,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) {
deliveredCount += 1

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
require_NoError(t, err)

streamSeq := metadata.Sequence.Stream

Expand Down Expand Up @@ -864,27 +842,38 @@ const (
)

// Creates KV store (a.k.a. bucket).
func createBucketForKvChaosTest(t *testing.T, c *cluster, replicas int) {
func createBucketForKvChaosTest(t *testing.T, c *cluster, replicas int, allowDirectGet bool) {
t.Helper()

pubNc, pubJs := jsClientConnectCluster(t, c)
defer pubNc.Close()
nc, js := jsClientConnectCluster(t, c)
defer nc.Close()

config := nats.KeyValueConfig{
Bucket: chaosKvTestsBucketName,
Replicas: replicas,
Description: "Test bucket",
}

kvs, err := pubJs.CreateKeyValue(&config)
if err != nil {
t.Fatalf("Error creating bucket: %v", err)
kvs, err := js.CreateKeyValue(&config)
require_NoError(t, err)

if !allowDirectGet {
// Manually disable DirectGet on underlying stream, routing get request through the leader
streamInfo, err := js.StreamInfo("KV_" + chaosKvTestsBucketName)
require_NoError(t, err)

streamInfo.Config.AllowDirect = false
streamInfo, err = js.UpdateStream(&streamInfo.Config)
require_NoError(t, err)
require_False(t, streamInfo.Config.AllowDirect)

kvs, err = js.KeyValue(chaosKvTestsBucketName)
require_NoError(t, err)
}

status, err := kvs.Status()
if err != nil {
t.Fatalf("Error retrieving bucket status: %v", err)
}
require_NoError(t, err)

t.Logf("Bucket created: %s", status.Bucket())
}

Expand All @@ -897,12 +886,12 @@ func TestJetStreamChaosKvPutGet(t *testing.T) {
const clusterSize = 3
const replicas = 3
const key = "key"
const staleReadsOk = true // Set to false to check for violations of 'read committed' consistency
const allowDirectGet = true // Set to false to check for violations of 'read committed' consistency

c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize)
defer c.shutdown()

createBucketForKvChaosTest(t, c, replicas)
createBucketForKvChaosTest(t, c, replicas, allowDirectGet)

chaos := createClusterChaosMonkeyController(
t,
Expand All @@ -921,17 +910,12 @@ func TestJetStreamChaosKvPutGet(t *testing.T) {

// Bind to the KV bucket created by createBucketForKvChaosTest
kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

// Initialize the only key
firstRevision, err := kv.Create(key, []byte("INITIAL VALUE"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
} else if firstRevision != 1 {
t.Fatalf("Unexpected revision: %d", firstRevision)
}
require_NoError(t, err)
require_Equal(t, firstRevision, 1)

// Start chaos
chaos.start()
Expand Down Expand Up @@ -978,7 +962,7 @@ putGetLoop:

if putRevision > getRevision {
// Stale read, violates 'read committed' consistency criteria
if !staleReadsOk {
if !allowDirectGet {
t.Fatalf("PUT value %s (rev: %d) then read value %s (rev: %d)", putValue, putRevision, getValue, getRevision)
} else {
staleReadsCount += 1
Expand Down Expand Up @@ -1011,11 +995,12 @@ func TestJetStreamChaosKvPutGetWithRetries(t *testing.T) {
const clusterSize = 3
const replicas = 3
const key = "key"
const allowDirectGet = true

c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize)
defer c.shutdown()

createBucketForKvChaosTest(t, c, replicas)
createBucketForKvChaosTest(t, c, replicas, allowDirectGet)

chaos := createClusterChaosMonkeyController(
t,
Expand All @@ -1033,17 +1018,12 @@ func TestJetStreamChaosKvPutGetWithRetries(t *testing.T) {
defer nc.Close()

kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

// Initialize key value
firstRevision, err := kv.Create(key, []byte("INITIAL VALUE"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
} else if firstRevision != 1 {
t.Fatalf("Unexpected revision: %d", firstRevision)
}
require_NoError(t, err)
require_Equal(t, firstRevision, 1)

// Start chaos
chaos.start()
Expand Down Expand Up @@ -1129,21 +1109,20 @@ putGetLoop:
}

// Multiple clients updating a finite set of keys with CAS semantics.
// TODO check that revision is never lower than last one seen
// TODO check that KeyNotFound is never returned, as keys are initialized beforehand
func TestJetStreamChaosKvCAS(t *testing.T) {
const numOps = 10_000
const maxRetries = 50
const staleGetRetry = 4 * time.Second // (twice the max server downtime)
const retryDelay = 300 * time.Millisecond
const clusterSize = 3
const replicas = 3
const numKeys = 15
const numClients = 5
const allowDirectGet = true

c := createJetStreamClusterExplicit(t, chaosKvTestsClusterName, clusterSize)
defer c.shutdown()

createBucketForKvChaosTest(t, c, replicas)
createBucketForKvChaosTest(t, c, replicas, allowDirectGet)

chaos := createClusterChaosMonkeyController(
t,
Expand All @@ -1162,9 +1141,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {

// Bind to the KV bucket created by createBucketForKvChaosTest
kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

// Create set of keys and initialize them with dummy value
keys := make([]string, numKeys)
Expand All @@ -1173,9 +1150,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
keys[k] = key

_, err := kv.Create(key, []byte("Initial value"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
}
require_NoError(t, err)
}

wgStart := sync.WaitGroup{}
Expand All @@ -1189,6 +1164,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
successfulUpdates := 0
casRejectUpdates := 0
otherUpdateErrors := 0
staleGets := 0

// Map to track last known revision for each of the keys
knownRevisions := map[string]uint64{}
Expand Down Expand Up @@ -1224,28 +1200,34 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
} else if updateErr != nil && strings.Contains(fmt.Sprint(updateErr), "wrong last sequence") {
// CAS rejected update, learn current revision for this key
casRejectUpdates += 1

for r := 0; r <= maxRetries; r++ {
checkFor(t, staleGetRetry, retryDelay, func() error {
kve, getErr := kv.Get(key)
if getErr == nil {
currentRevision := kve.Revision()
if currentRevision < knownRevisions[key] {
// Revision number moved backward, this should never happen
t.Fatalf("Current revision for key %s is %d, which is lower than the last known revision %d", key, currentRevision, knownRevisions[key])

if getErr != nil {
// Get failed
if chaosKvTestsDebug {
t.Logf("Client %d Get('%s') failed: %s", clientId, key, err)
}
return err
}

knownRevisions[key] = currentRevision
if kve.Revision() <= knownRevisions[key] {
// A higher revision is known to exist, this value must be stale
staleGets += 1
if chaosKvTestsDebug {
t.Logf("Client %d learn key %s revision: %d", clientId, key, currentRevision)
t.Logf("Client %d Get('%s') returned stale revision: %d, older than known: %d", clientId, key, kve.Revision(), knownRevisions[key])
}
break
} else if r == maxRetries {
t.Fatalf("Failed to GET (retried %d times): %v", maxRetries, getErr)
} else {
time.Sleep(retryDelay)
return fmt.Errorf("get returned stale value")
}
}

// Revision is newer than known, worth storing for the next CAS attempt
knownRevisions[key] = kve.Revision()
if chaosKvTestsDebug {
t.Logf("Client %d learn key %s revision: %d", clientId, key, kve.Revision())
}
return nil
})
} else if errors.Is(updateErr, nats.ErrKeyNotFound) {
t.Fatalf("Client %d updating key '%s' got %s", clientId, key, updateErr)
} else {
// Other update error
otherUpdateErrors += 1
Expand All @@ -1255,7 +1237,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
time.Sleep(retryDelay)
}
}
t.Logf("Client %d done, %d kv updates, %d CAS rejected, %d other errors", clientId, successfulUpdates, casRejectUpdates, otherUpdateErrors)
t.Logf("Client %d done, %d kv updates, %d CAS rejected, %d stale gets, %d other errors", clientId, successfulUpdates, casRejectUpdates, staleGets, otherUpdateErrors)
}

// Launch all clients
Expand All @@ -1264,9 +1246,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
defer cNc.Close()

cKv, err := cJs.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

wgStart.Add(1)
wgComplete.Add(1)
Expand Down