Skip to content

Commit

Permalink
test(pubsublite): integration tests for publish and subscribe (#3449)
Browse files Browse the repository at this point in the history
Integration tests for Pub/Sub Lite publisher and subscriber clients.
  • Loading branch information
tmdiep committed Jan 6, 2021
1 parent 221bfba commit 3830238
Show file tree
Hide file tree
Showing 6 changed files with 731 additions and 33 deletions.
22 changes: 2 additions & 20 deletions pubsublite/integration_test.go
Expand Up @@ -15,12 +15,12 @@ package pubsublite

import (
"context"
"math/rand"
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/api/iterator"
Expand All @@ -33,16 +33,6 @@ const gibi = 1 << 30

var (
resourceIDs = uid.NewSpace("go-admin-test", nil)
rng *rand.Rand

// A random zone is selected for each integration test run.
supportedZones = []string{
"us-central1-a",
"us-central1-b",
"us-central1-c",
"europe-west1-b",
"europe-west1-d",
}

// The server returns topic and subscription configs with project numbers in
// resource paths. These will not match a project id specified for integration
Expand All @@ -53,10 +43,6 @@ var (
}
)

func init() {
rng = testutil.NewRand(time.Now())
}

func initIntegrationTest(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
Expand Down Expand Up @@ -101,16 +87,12 @@ func cleanUpSubscription(ctx context.Context, t *testing.T, admin *AdminClient,
}
}

func randomLiteZone() string {
return supportedZones[rng.Intn(len(supportedZones))]
}

func TestIntegration_ResourceAdminOperations(t *testing.T) {
initIntegrationTest(t)

ctx := context.Background()
proj := testutil.ProjID()
zone := randomLiteZone()
zone := test.RandomLiteZone()
region, _ := ZoneToRegion(zone)
resourceID := resourceIDs.New()

Expand Down
18 changes: 18 additions & 0 deletions pubsublite/internal/test/msg_tracker.go
Expand Up @@ -88,3 +88,21 @@ func (mt *MsgTracker) Wait(timeout time.Duration) error {
return nil
}
}

// Empty returns true if there are no tracked messages remaining.
func (mt *MsgTracker) Empty() bool {
mt.mu.Lock()
defer mt.mu.Unlock()
return len(mt.msgMap) == 0
}

// Status returns an error if there are tracked messages remaining.
func (mt *MsgTracker) Status() error {
mt.mu.Lock()
defer mt.mu.Unlock()

if len(mt.msgMap) == 0 {
return nil
}
return fmt.Errorf("%d messages not received", len(mt.msgMap))
}
10 changes: 5 additions & 5 deletions pubsublite/internal/test/msg_validators.go
Expand Up @@ -69,7 +69,8 @@ func parseMsgIndex(msg string) int64 {
//
// Note: a normal scenario resulting in unordered messages is when the Publish
// stream breaks while there are in-flight batches, which are resent upon
// stream reconnect.
// stream reconnect. Use DuplicateMsgDetector if it is undesirable to fail a
// test.
func (or *OrderingReceiver) Receive(data, key string) error {
or.mu.Lock()
defer or.mu.Unlock()
Expand All @@ -79,11 +80,10 @@ func (or *OrderingReceiver) Receive(data, key string) error {
return fmt.Errorf("failed to parse index from message: %q", data)
}

// Verify non-decreasing ordering. Allow duplicates, which can be verified
// with DuplicateMsgDetector.
// Verify increasing ordering.
lastIdx, exists := or.received[key]
if exists && idx < lastIdx {
return fmt.Errorf("message ordering failed for key %s, expected message idx >= %d, got %d", key, lastIdx, idx)
if exists && idx <= lastIdx {
return fmt.Errorf("message ordering failed for key %s, expected message idx > %d, got %d", key, lastIdx, idx)
}
or.received[key] = idx
return nil
Expand Down
10 changes: 2 additions & 8 deletions pubsublite/internal/test/msg_validators_test.go
Expand Up @@ -46,13 +46,10 @@ func TestOrderingReceiver(t *testing.T) {
if gotErr := receiver.Receive("foo/3", "foo"); gotErr != nil {
t.Errorf("OrderingReceiver.Receive() got err: %v", gotErr)
}
if gotErr := receiver.Receive("foo/3", "foo"); gotErr != nil {
t.Errorf("OrderingReceiver.Receive() got err: %v", gotErr)
}
if gotErr := receiver.Receive("foo/4", "foo"); gotErr != nil {
t.Errorf("OrderingReceiver.Receive() got err: %v", gotErr)
}
if gotErr, wantMsg := receiver.Receive("foo/3", "foo"), "expected message idx >= 4, got 3"; !ErrorHasMsg(gotErr, wantMsg) {
if gotErr, wantMsg := receiver.Receive("foo/4", "foo"), "expected message idx > 4, got 4"; !ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("OrderingReceiver.Receive() got err: %v, want msg: %q", gotErr, wantMsg)
}
})
Expand All @@ -61,10 +58,7 @@ func TestOrderingReceiver(t *testing.T) {
if gotErr := receiver.Receive("bar/30", "bar"); gotErr != nil {
t.Errorf("OrderingReceiver.Receive() got err: %v", gotErr)
}
if gotErr := receiver.Receive("bar/30", "bar"); gotErr != nil {
t.Errorf("OrderingReceiver.Receive() got err: %v", gotErr)
}
if gotErr, wantMsg := receiver.Receive("bar/29", "bar"), "expected message idx >= 30, got 29"; !ErrorHasMsg(gotErr, wantMsg) {
if gotErr, wantMsg := receiver.Receive("bar/29", "bar"), "expected message idx > 30, got 29"; !ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("OrderingReceiver.Receive() got err: %v, want msg: %q", gotErr, wantMsg)
}
})
Expand Down
48 changes: 48 additions & 0 deletions pubsublite/internal/test/zones.go
@@ -0,0 +1,48 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package test

import (
"math/rand"
"time"

"cloud.google.com/go/internal/testutil"
)

var (
supportedZones = []string{
"us-central1-a",
"us-central1-b",
"us-central1-c",
"us-east1-b",
"us-east1-c",
"europe-west1-b",
"europe-west1-d",
"asia-east1-a",
"asia-east1-c",
"asia-southeast1-a",
"asia-southeast1-c",
}

rng *rand.Rand
)

func init() {
rng = testutil.NewRand(time.Now())
}

// RandomLiteZone chooses a random Pub/Sub Lite zone for integration tests.
func RandomLiteZone() string {
return supportedZones[rng.Intn(len(supportedZones))]
}

0 comments on commit 3830238

Please sign in to comment.