Skip to content

Commit

Permalink
Update test to check error using require_* helpers
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Primi marco@synadia.com
  • Loading branch information
mprimi committed Nov 17, 2023
1 parent 19c8414 commit 136687e
Showing 1 changed file with 29 additions and 77 deletions.
106 changes: 29 additions & 77 deletions server/jetstream_chaos_test.go
Expand Up @@ -192,14 +192,10 @@ func jsClientConnectCluster(t testing.TB, c *cluster) (*nats.Conn, nats.JetStrea
connectURL := strings.Join(serverConnectURLs, ",")

nc, err := nats.Connect(connectURL)
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
require_NoError(t, err)

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Failed to init JetStream context: %s", err)
}
require_NoError(t, err)

return nc, js
}
Expand Down Expand Up @@ -290,18 +286,14 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes
Subjects: []string{chaosConsumerTestsSubject},
Replicas: replicas,
})
if err != nil {
t.Fatalf("Error creating stream: %v", err)
}
require_NoError(t, err)

ackFutures := make([]nats.PubAckFuture, 0, publishBatchSize)

for i := 1; i <= numMessages; i++ {
message := []byte(fmt.Sprintf("%d", i))
pubAckFuture, err := pubJs.PublishAsync(chaosConsumerTestsSubject, message, nats.ExpectLastSequence(uint64(i-1)))
if err != nil {
t.Fatalf("Publish error: %s", err)
}
require_NoError(t, err)
ackFutures = append(ackFutures, pubAckFuture)

if (i > 0 && i%publishBatchSize == 0) || i == numMessages {
Expand All @@ -312,7 +304,7 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes
case <-pubAckFuture.Ok():
// Noop
case pubAckErr := <-pubAckFuture.Err():
t.Fatalf("Error publishing: %s", pubAckErr)
require_NoError(t, pubAckErr)
case <-time.After(30 * time.Second):
t.Fatalf("Timeout verifying pubAck for message: %s", pubAckFuture.Msg().Data)
}
Expand Down Expand Up @@ -362,9 +354,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
chaosConsumerTestsSubject,
nats.OrderedConsumer(),
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require_NoError(t, err)
defer sub.Unsubscribe()

if chaosConsumerTestsDebug {
Expand Down Expand Up @@ -394,10 +384,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) {
}

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}

require_NoError(t, err)
if metadata.Sequence.Stream != uint64(i) {
t.Fatalf("Expecting stream sequence %d, got %d instead", i, metadata.Sequence.Stream)
}
Expand Down Expand Up @@ -454,9 +441,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {
deliveryCount += 1

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
require_NoError(t, err)
seq := metadata.Sequence.Stream

var expectedMsgData = []byte(fmt.Sprintf("%d", seq))
Expand Down Expand Up @@ -505,9 +490,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) {

subOpts := []nats.SubOpt{}
sub, err := subJs.Subscribe(chaosConsumerTestsSubject, handleMsg, subOpts...)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require_NoError(t, err)
defer sub.Unsubscribe()

chaos.start()
Expand Down Expand Up @@ -663,9 +646,7 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) {
deliveryCount += 1

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
require_NoError(t, err)
seq := metadata.Sequence.Stream

var expectedMsgData = []byte(fmt.Sprintf("%d", seq))
Expand Down Expand Up @@ -778,9 +759,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) {
chaosConsumerTestsSubject,
durableConsumerName,
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require_NoError(t, err)
defer sub.Unsubscribe()

if chaosConsumerTestsDebug {
Expand Down Expand Up @@ -819,9 +798,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) {
deliveredCount += 1

metadata, err := msg.Metadata()
if err != nil {
t.Fatalf("Failed to get message metadata: %v", err)
}
require_NoError(t, err)

streamSeq := metadata.Sequence.Stream

Expand Down Expand Up @@ -878,34 +855,25 @@ func createBucketForKvChaosTest(t *testing.T, c *cluster, replicas int, allowDir
}

kvs, err := js.CreateKeyValue(&config)
if err != nil {
t.Fatalf("Error creating bucket: %v", err)
}
require_NoError(t, err)

if !allowDirectGet {
// Manually disable DirectGet on underlying stream, routing get request through the leader
streamInfo, err := js.StreamInfo("KV_" + chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get stream info: %s", err)
}
require_NoError(t, err)

streamInfo.Config.AllowDirect = false
streamInfo, err = js.UpdateStream(&streamInfo.Config)
if err != nil {
t.Fatalf("Failed to update KV stream: %s", err)
} else if streamInfo.Config.AllowDirect {
t.Fatalf("Failed to disable 'DirectGet'")
}
require_NoError(t, err)
require_False(t, streamInfo.Config.AllowDirect)

kvs, err = js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed reopen KV handle: %s", err)
}
require_NoError(t, err)
}

status, err := kvs.Status()
if err != nil {
t.Fatalf("Error retrieving bucket status: %v", err)
}
require_NoError(t, err)

t.Logf("Bucket created: %s", status.Bucket())
}

Expand Down Expand Up @@ -942,17 +910,12 @@ func TestJetStreamChaosKvPutGet(t *testing.T) {

// Create KV bucket
kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

// Initialize the only key
firstRevision, err := kv.Create(key, []byte("INITIAL VALUE"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
} else if firstRevision != 1 {
t.Fatalf("Unexpected revision: %d", firstRevision)
}
require_NoError(t, err)
require_Equal(t, firstRevision, 1)

// Start chaos
chaos.start()
Expand Down Expand Up @@ -1055,17 +1018,12 @@ func TestJetStreamChaosKvPutGetWithRetries(t *testing.T) {
defer nc.Close()

kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

// Initialize key value
firstRevision, err := kv.Create(key, []byte("INITIAL VALUE"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
} else if firstRevision != 1 {
t.Fatalf("Unexpected revision: %d", firstRevision)
}
require_NoError(t, err)
require_Equal(t, firstRevision, 1)

// Start chaos
chaos.start()
Expand Down Expand Up @@ -1183,9 +1141,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {

// Create bucket
kv, err := js.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

// Create set of keys and initialize them with dummy value
keys := make([]string, numKeys)
Expand All @@ -1194,9 +1150,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
keys[k] = key

_, err := kv.Create(key, []byte("Initial value"))
if err != nil {
t.Fatalf("Failed to create key: %v", err)
}
require_NoError(t, err)
}

wgStart := sync.WaitGroup{}
Expand Down Expand Up @@ -1292,9 +1246,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) {
defer cNc.Close()

cKv, err := cJs.KeyValue(chaosKvTestsBucketName)
if err != nil {
t.Fatalf("Failed to get KV store: %v", err)
}
require_NoError(t, err)

wgStart.Add(1)
wgComplete.Add(1)
Expand Down

0 comments on commit 136687e

Please sign in to comment.