From 4982eeb32ebe85de211ae09d13fdaf6140d9e115 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 25 Nov 2020 13:13:11 +1100 Subject: [PATCH] feat(pubsublite): single partition publisher implementation (#3225) Implements the publisher for a single partition. A Bundler is used to batch messages. --- pubsublite/common/publish_data.go | 46 ++ pubsublite/common/publish_data_test.go | 68 +++ pubsublite/go.mod | 1 + pubsublite/go.sum | 1 + pubsublite/internal/wire/publish_batcher.go | 181 +++++++ .../internal/wire/publish_batcher_test.go | 344 ++++++++++++++ pubsublite/internal/wire/publisher.go | 259 ++++++++++ pubsublite/internal/wire/publisher_test.go | 447 ++++++++++++++++++ 8 files changed, 1347 insertions(+) create mode 100644 pubsublite/common/publish_data.go create mode 100644 pubsublite/common/publish_data_test.go create mode 100644 pubsublite/internal/wire/publish_batcher.go create mode 100644 pubsublite/internal/wire/publish_batcher_test.go create mode 100644 pubsublite/internal/wire/publisher.go create mode 100644 pubsublite/internal/wire/publisher_test.go diff --git a/pubsublite/common/publish_data.go b/pubsublite/common/publish_data.go new file mode 100644 index 00000000000..a4658160d18 --- /dev/null +++ b/pubsublite/common/publish_data.go @@ -0,0 +1,46 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package common + +import ( + "fmt" + "strconv" + "strings" +) + +// PublishMetadata holds the results of a published message. +type PublishMetadata struct { + Partition int + Offset int64 +} + +func (pm *PublishMetadata) String() string { + return fmt.Sprintf("%d:%d", pm.Partition, pm.Offset) +} + +// ParsePublishMetadata converts a string obtained from PublishMetadata.String() +// back to PublishMetadata. +func ParsePublishMetadata(input string) (*PublishMetadata, error) { + parts := strings.Split(input, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input) + } + + partition, pErr := strconv.ParseInt(parts[0], 10, 64) + offset, oErr := strconv.ParseInt(parts[1], 10, 64) + if pErr != nil || oErr != nil { + return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input) + } + return &PublishMetadata{Partition: int(partition), Offset: offset}, nil +} diff --git a/pubsublite/common/publish_data_test.go b/pubsublite/common/publish_data_test.go new file mode 100644 index 00000000000..89151374584 --- /dev/null +++ b/pubsublite/common/publish_data_test.go @@ -0,0 +1,68 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and + +package common + +import ( + "testing" + + "cloud.google.com/go/internal/testutil" +) + +func TestPublishMetadataStringEncoding(t *testing.T) { + for _, tc := range []struct { + desc string + input string + want *PublishMetadata + wantErr bool + }{ + { + desc: "valid: zero", + input: "0:0", + want: &PublishMetadata{Partition: 0, Offset: 0}, + }, + { + desc: "valid: non-zero", + input: "3:1234", + want: &PublishMetadata{Partition: 3, Offset: 1234}, + }, + { + desc: "invalid: number", + input: "1234", + wantErr: true, + }, + { + desc: "invalid: partition", + input: "p:1234", + wantErr: true, + }, + { + desc: "invalid: offset", + input: "10:9offset", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + got, gotErr := ParsePublishMetadata(tc.input) + if !testutil.Equal(got, tc.want) || (gotErr != nil) != tc.wantErr { + t.Errorf("ParsePublishMetadata(%q): got (%v, %v), want (%v, err=%v)", tc.input, got, gotErr, tc.want, tc.wantErr) + } + + if tc.want != nil { + if got := tc.want.String(); got != tc.input { + t.Errorf("PublishMetadata(%v).String(): got %q, want: %q", tc.want, got, tc.input) + } + } + }) + } +} diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 09727ba1986..1466aa04337 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -11,4 +11,5 @@ require ( google.golang.org/api v0.35.0 google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4 google.golang.org/grpc v1.33.2 + google.golang.org/protobuf v1.25.0 ) diff --git a/pubsublite/go.sum b/pubsublite/go.sum index bd119db5e1d..8f60186a9ef 100644 --- a/pubsublite/go.sum +++ b/pubsublite/go.sum @@ -216,6 +216,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pubsublite/internal/wire/publish_batcher.go b/pubsublite/internal/wire/publish_batcher.go new file mode 100644 index 00000000000..8a32accf248 --- /dev/null +++ b/pubsublite/internal/wire/publish_batcher.go @@ -0,0 +1,181 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and + +package wire + +import ( + "container/list" + "errors" + "fmt" + + "cloud.google.com/go/pubsublite/common" + "google.golang.org/api/support/bundler" + "google.golang.org/protobuf/proto" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +var errPublishQueueEmpty = errors.New("pubsublite: received publish response from server with no batches in flight") + +// PublishResultFunc receives the result of a publish. +type PublishResultFunc func(*common.PublishMetadata, error) + +// messageHolder stores a message to be published, with associated metadata. +type messageHolder struct { + msg *pb.PubSubMessage + size int + onResult PublishResultFunc +} + +// publishBatch holds messages that are published in the same +// MessagePublishRequest. +type publishBatch struct { + msgHolders []*messageHolder +} + +func (b *publishBatch) ToPublishRequest() *pb.PublishRequest { + msgs := make([]*pb.PubSubMessage, len(b.msgHolders)) + for i, holder := range b.msgHolders { + msgs[i] = holder.msg + } + + return &pb.PublishRequest{ + RequestType: &pb.PublishRequest_MessagePublishRequest{ + MessagePublishRequest: &pb.MessagePublishRequest{ + Messages: msgs, + }, + }, + } +} + +// publishMessageBatcher manages batching of messages, as well as in-flight +// published batches. It is owned by singlePartitionPublisher. +type publishMessageBatcher struct { + partition int + // Used to batch messages. Setting HandlerLimit=1 results in ordered batches. + msgBundler *bundler.Bundler + // FIFO queue of in-flight batches of published messages. Results have not yet + // been received from the server. + publishQueue *list.List // Value = *publishBatch + // Used for error checking, to ensure the server returns increasing offsets + // for published messages. + minExpectedNextOffset int64 + // The available buffer size is managed by this batcher rather than the + // Bundler due to the in-flight publish queue. + availableBufferBytes int +} + +func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBatch func(*publishBatch)) *publishMessageBatcher { + batcher := &publishMessageBatcher{ + partition: partition, + publishQueue: list.New(), + availableBufferBytes: settings.BufferedByteLimit, + } + + msgBundler := bundler.NewBundler(&messageHolder{}, func(item interface{}) { + msgs, _ := item.([]*messageHolder) + if len(msgs) == 0 { + // This should not occur. + return + } + // The publishMessageBatcher is accessed by the singlePartitionPublisher and + // Bundler handler func (called in a goroutine). + // singlePartitionPublisher.onNewBatch() receives the new batch from the + // Bundler, which calls publishMessageBatcher.AddBatch(). Only the + // publisher's mutex is required. 
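+ // Since the Bundler below is configured with HandlerLimit=1, this handler
+ // is never invoked concurrently, which preserves batch ordering.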
+ onNewBatch(&publishBatch{msgHolders: msgs}) + }) + msgBundler.DelayThreshold = settings.DelayThreshold + msgBundler.BundleCountThreshold = settings.CountThreshold + msgBundler.BundleByteThreshold = settings.ByteThreshold // Soft limit + msgBundler.BundleByteLimit = MaxPublishRequestBytes // Hard limit + msgBundler.HandlerLimit = 1 // Handle batches serially for ordering + msgBundler.BufferedByteLimit = settings.BufferedByteLimit // Actually handled in the batcher + + batcher.msgBundler = msgBundler + return batcher +} + +func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error { + msgSize := proto.Size(msg) + switch { + case msgSize > MaxPublishMessageBytes: + return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes) + case msgSize > b.availableBufferBytes: + return ErrOverflow + } + + holder := &messageHolder{msg: msg, size: msgSize, onResult: onResult} + if err := b.msgBundler.Add(holder, msgSize); err != nil { + // As we've already checked the size of the message and overflow, the + // bundler should not return an error. + return fmt.Errorf("pubsublite: failed to batch message: %v", err) + } + b.availableBufferBytes -= msgSize + return nil +} + +func (b *publishMessageBatcher) AddBatch(batch *publishBatch) { + b.publishQueue.PushBack(batch) +} + +func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error { + frontElem := b.publishQueue.Front() + if frontElem == nil { + return errPublishQueueEmpty + } + if firstOffset < b.minExpectedNextOffset { + return fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset) + } + + batch, _ := frontElem.Value.(*publishBatch) + for i, msgHolder := range batch.msgHolders { + // Messages are ordered, so the offset of each message is firstOffset + i. + pm := &common.PublishMetadata{Partition: b.partition, Offset: firstOffset + int64(i)} + msgHolder.onResult(pm, nil) + b.availableBufferBytes += msgHolder.size + } + + b.minExpectedNextOffset = firstOffset + int64(len(batch.msgHolders)) + b.publishQueue.Remove(frontElem) + return nil +} + +func (b *publishMessageBatcher) OnPermanentError(err error) { + for elem := b.publishQueue.Front(); elem != nil; elem = elem.Next() { + if batch, ok := elem.Value.(*publishBatch); ok { + for _, msgHolder := range batch.msgHolders { + msgHolder.onResult(nil, err) + } + } + } + b.publishQueue.Init() +} + +func (b *publishMessageBatcher) InFlightBatches() []*publishBatch { + var batches []*publishBatch + for elem := b.publishQueue.Front(); elem != nil; elem = elem.Next() { + if batch, ok := elem.Value.(*publishBatch); ok { + batches = append(batches, batch) + } + } + return batches +} + +func (b *publishMessageBatcher) Flush() { + b.msgBundler.Flush() +} + +func (b *publishMessageBatcher) InFlightBatchesEmpty() bool { + return b.publishQueue.Len() == 0 +} diff --git a/pubsublite/internal/wire/publish_batcher_test.go b/pubsublite/internal/wire/publish_batcher_test.go new file mode 100644 index 00000000000..3e12fe2bef1 --- /dev/null +++ b/pubsublite/internal/wire/publish_batcher_test.go @@ -0,0 +1,344 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "bytes" + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsublite/common" + "cloud.google.com/go/pubsublite/internal/test" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// testPublishResultReceiver provides convenience methods for receiving and +// validating Publish results. +type testPublishResultReceiver struct { + done chan struct{} + msg string + t *testing.T + got *common.PublishMetadata + gotErr error +} + +func newTestPublishResultReceiver(t *testing.T, msg *pb.PubSubMessage) *testPublishResultReceiver { + return &testPublishResultReceiver{ + t: t, + msg: string(msg.Data), + done: make(chan struct{}), + } +} + +func (r *testPublishResultReceiver) set(pm *common.PublishMetadata, err error) { + r.got = pm + r.gotErr = err + close(r.done) +} + +func (r *testPublishResultReceiver) wait() bool { + select { + case <-time.After(serviceTestWaitTimeout): + r.t.Errorf("Publish(%q) result not available within %v", r.msg, serviceTestWaitTimeout) + return false + case <-r.done: + return true + } +} + +func (r *testPublishResultReceiver) ValidateResult(wantPartition int, wantOffset int64) { + if !r.wait() { + return + } + if r.gotErr != nil { + r.t.Errorf("Publish(%q) error: (%v), want: partition=%d,offset=%d", r.msg, r.gotErr, wantPartition, wantOffset) + } else if r.got.Partition != wantPartition || r.got.Offset != wantOffset { + r.t.Errorf("Publish(%q) got: partition=%d,offset=%d, want: partition=%d,offset=%d", r.msg, r.got.Partition, r.got.Offset, wantPartition, wantOffset) + } +} + +func (r *testPublishResultReceiver) ValidateError(wantErr error) { + if !r.wait() { + return + } + if !test.ErrorEqual(r.gotErr, wantErr) { + r.t.Errorf("Publish(%q) error: (%v), want: (%v)", r.msg, r.gotErr, wantErr) + } +} + +func (r *testPublishResultReceiver) ValidateErrorCode(wantCode codes.Code) { + if !r.wait() { + return + } + if !test.ErrorHasCode(r.gotErr, wantCode) { + r.t.Errorf("Publish(%q) error: (%v), want code: %v", r.msg, r.gotErr, wantCode) + } +} + +func (r *testPublishResultReceiver) ValidateErrorMsg(wantStr string) { + if !r.wait() { + return + } + if !test.ErrorHasMsg(r.gotErr, wantStr) { + r.t.Errorf("Publish(%q) error: (%v), want msg: %q", r.msg, r.gotErr, wantStr) + } +} + +// testPublishBatchReceiver receives message batches from the Bundler. 
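+// Batches are forwarded to a buffered channel so that tests can wait for and
+// inspect the batches emitted by the batcher.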
+type testPublishBatchReceiver struct { + t *testing.T + batchesC chan *publishBatch +} + +func newTestPublishBatchReceiver(t *testing.T) *testPublishBatchReceiver { + return &testPublishBatchReceiver{ + t: t, + batchesC: make(chan *publishBatch, 3), + } +} + +func (br *testPublishBatchReceiver) onNewBatch(batch *publishBatch) { + br.batchesC <- batch +} + +func (br *testPublishBatchReceiver) ValidateBatches(want []*publishBatch) { + var got []*publishBatch + for count := 0; count < len(want); count++ { + select { + case <-time.After(serviceTestWaitTimeout): + br.t.Errorf("Publish batches count: got %d, want %d", count, len(want)) + case batch := <-br.batchesC: + got = append(got, batch) + } + } + + if !testutil.Equal(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})) { + br.t.Errorf("Batches got: %v\nwant: %v", got, want) + } +} + +func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver) *messageHolder { + h := &messageHolder{ + msg: msg, + size: proto.Size(msg), + } + if len(receiver) > 0 { + h.onResult = receiver[0].set + } + return h +} + +func TestPublishBatcherAddMessage(t *testing.T) { + const initAvailableBytes = MaxPublishMessageBytes + 1 + settings := DefaultPublishSettings + settings.BufferedByteLimit = initAvailableBytes + + receiver := newTestPublishBatchReceiver(t) + batcher := newPublishMessageBatcher(&settings, 0, receiver.onNewBatch) + + if got, want := batcher.availableBufferBytes, initAvailableBytes; got != want { + t.Errorf("availableBufferBytes: got %d, want %d", got, want) + } + + t.Run("small messages", func(t *testing.T) { + msg1 := &pb.PubSubMessage{Data: []byte("foo")} + msgSize1 := proto.Size(msg1) + if err := batcher.AddMessage(msg1, nil); err != nil { + t.Errorf("AddMessage(%v) got err: %v", msg1, err) + } + if got, want := batcher.availableBufferBytes, initAvailableBytes-msgSize1; got != want { + t.Errorf("availableBufferBytes: got %d, want %d", got, want) + } + + msg2 := &pb.PubSubMessage{Data: []byte("hello world")} + msgSize2 := proto.Size(msg2) + if err := batcher.AddMessage(msg2, nil); err != nil { + t.Errorf("AddMessage(%v) got err: %v", msg2, err) + } + if got, want := batcher.availableBufferBytes, initAvailableBytes-msgSize1-msgSize2; got != want { + t.Errorf("availableBufferBytes: got %d, want %d", got, want) + } + }) + + t.Run("oversized message", func(t *testing.T) { + msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)} + if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { + t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg) + } + }) + + t.Run("buffer overflow", func(t *testing.T) { + msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, batcher.availableBufferBytes)} + if gotErr, wantErr := batcher.AddMessage(msg, nil), ErrOverflow; !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("AddMessage(%v) got err: %v, want err: %v", msg, gotErr, wantErr) + } + }) +} + +func TestPublishBatcherBundlerCountThreshold(t *testing.T) { + settings := DefaultPublishSettings + settings.DelayThreshold = time.Minute // Batching delay disabled + settings.CountThreshold = 2 + + // Batch 1 + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + wantBatch1 := &publishBatch{ + []*messageHolder{makeMsgHolder(msg1), makeMsgHolder(msg2)}, + } + + // Batch 2 + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + msg4 := &pb.PubSubMessage{Data: []byte{'4'}} + wantBatch2 := &publishBatch{ 
+ []*messageHolder{makeMsgHolder(msg3), makeMsgHolder(msg4)}, + } + + // Batch 3 + msg5 := &pb.PubSubMessage{Data: []byte{'5'}} + wantBatch3 := &publishBatch{ + []*messageHolder{makeMsgHolder(msg5)}, + } + + receiver := newTestPublishBatchReceiver(t) + batcher := newPublishMessageBatcher(&settings, 0, receiver.onNewBatch) + + msgs := []*pb.PubSubMessage{msg1, msg2, msg3, msg4, msg5} + for _, msg := range msgs { + if err := batcher.AddMessage(msg, nil); err != nil { + t.Errorf("AddMessage(%v) got err: %v", msg, err) + } + } + batcher.Flush() + + receiver.ValidateBatches([]*publishBatch{wantBatch1, wantBatch2, wantBatch3}) +} + +func TestPublishBatcherBundlerBatchingDelay(t *testing.T) { + settings := DefaultPublishSettings + settings.DelayThreshold = 5 * time.Millisecond + + // Batch 1 + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + wantBatch1 := &publishBatch{ + []*messageHolder{makeMsgHolder(msg1)}, + } + + // Batch 2 + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + wantBatch2 := &publishBatch{ + []*messageHolder{makeMsgHolder(msg2)}, + } + + receiver := newTestPublishBatchReceiver(t) + batcher := newPublishMessageBatcher(&settings, 0, receiver.onNewBatch) + + if err := batcher.AddMessage(msg1, nil); err != nil { + t.Errorf("AddMessage(%v) got err: %v", msg1, err) + } + time.Sleep(settings.DelayThreshold * 2) + if err := batcher.AddMessage(msg2, nil); err != nil { + t.Errorf("AddMessage(%v) got err: %v", msg2, err) + } + batcher.Flush() + + receiver.ValidateBatches([]*publishBatch{wantBatch1, wantBatch2}) +} + +func TestPublishBatcherBundlerOnPermanentError(t *testing.T) { + receiver := newTestPublishBatchReceiver(t) + batcher := newPublishMessageBatcher(&DefaultPublishSettings, 0, receiver.onNewBatch) + + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + pubResult1 := newTestPublishResultReceiver(t, msg1) + pubResult2 := newTestPublishResultReceiver(t, msg2) + batcher.AddBatch(&publishBatch{ + []*messageHolder{ + makeMsgHolder(msg1, pubResult1), + makeMsgHolder(msg2, pubResult2), + }, + }) + + wantErr := status.Error(codes.FailedPrecondition, "failed") + batcher.OnPermanentError(wantErr) + pubResult1.ValidateError(wantErr) + pubResult2.ValidateError(wantErr) +} + +func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) { + const partition = 2 + receiver := newTestPublishBatchReceiver(t) + batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch) + + t.Run("empty in-flight batches", func(t *testing.T) { + if gotErr, wantErr := batcher.OnPublishResponse(0), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("OnPublishResponse() got err: %v, want err: %v", gotErr, wantErr) + } + }) + + t.Run("set publish results", func(t *testing.T) { + // Batch 1 + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + + // Batch 2 + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + pubResult1 := newTestPublishResultReceiver(t, msg1) + pubResult2 := newTestPublishResultReceiver(t, msg2) + pubResult3 := newTestPublishResultReceiver(t, msg3) + + batcher.AddBatch(&publishBatch{ + []*messageHolder{ + makeMsgHolder(msg1, pubResult1), + makeMsgHolder(msg2, pubResult2), + }, + }) + batcher.AddBatch(&publishBatch{ + []*messageHolder{ + makeMsgHolder(msg3, pubResult3), + }, + }) + if err := batcher.OnPublishResponse(70); err != nil { + t.Errorf("OnPublishResponse() got err: %v", err) + } + if err := batcher.OnPublishResponse(80); err != nil { + t.Errorf("OnPublishResponse() got 
err: %v", err) + } + + pubResult1.ValidateResult(partition, 70) + pubResult2.ValidateResult(partition, 71) + pubResult3.ValidateResult(partition, 80) + }) + + t.Run("inconsistent offset", func(t *testing.T) { + msg := &pb.PubSubMessage{Data: []byte{'4'}} + pubResult := newTestPublishResultReceiver(t, msg) + batcher.AddBatch(&publishBatch{ + []*messageHolder{ + makeMsgHolder(msg, pubResult), + }, + }) + + if gotErr, wantMsg := batcher.OnPublishResponse(80), "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) { + t.Errorf("OnPublishResponse() got err: %v, want err msg: %q", gotErr, wantMsg) + } + }) +} diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go new file mode 100644 index 00000000000..8c5e9adbed1 --- /dev/null +++ b/pubsublite/internal/wire/publisher.go @@ -0,0 +1,259 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "context" + "errors" + "reflect" + + "google.golang.org/grpc" + + vkit "cloud.google.com/go/pubsublite/apiv1" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +var ( + errInvalidInitialPubResponse = errors.New("pubsublite: first response from server was not an initial response for publish") + errInvalidMsgPubResponse = errors.New("pubsublite: received invalid publish response from server") +) + +// singlePartitionPublisher publishes messages to a single topic partition. +// +// Life of a successfully published message: +// - Publish() receives the message from the user. +// - It is added to `batcher.msgBundler`, which performs batching in accordance +// with user-configured PublishSettings. +// - onNewBatch() receives new message batches from the bundler. The batch is +// added to `batcher.publishQueue` (in-flight batches) and sent to the publish +// stream, if connected. If the stream is currently reconnecting, the entire +// queue is resent to the stream immediately after it has reconnected, in +// onStreamStatusChange(). +// - onResponse() receives the first cursor offset for the first batch in +// `batcher.publishQueue`. It assigns the cursor offsets for each message and +// releases the publish results to the user. +// +// See comments for unsafeInitiateShutdown() for error scenarios. +type singlePartitionPublisher struct { + // Immutable after creation. + pubClient *vkit.PublisherClient + topic topicPartition + initialReq *pb.PublishRequest + + // Fields below must be guarded with mu. + stream *retryableStream + batcher *publishMessageBatcher + enableSendToStream bool + + abstractService +} + +// singlePartitionPublisherFactory creates instances of singlePartitionPublisher +// for given partition numbers. 
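+// All publishers created by the factory share the same client, settings and
+// topic path.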
+type singlePartitionPublisherFactory struct { + ctx context.Context + pubClient *vkit.PublisherClient + settings PublishSettings + topicPath string +} + +func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPublisher { + pp := &singlePartitionPublisher{ + pubClient: f.pubClient, + topic: topicPartition{Path: f.topicPath, Partition: partition}, + initialReq: &pb.PublishRequest{ + RequestType: &pb.PublishRequest_InitialRequest{ + InitialRequest: &pb.InitialPublishRequest{ + Topic: f.topicPath, + Partition: int64(partition), + }, + }, + }, + } + pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch) + pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{})) + return pp +} + +// Start attempts to establish a publish stream connection. +func (pp *singlePartitionPublisher) Start() { + pp.mu.Lock() + defer pp.mu.Unlock() + + if pp.unsafeUpdateStatus(serviceStarting, nil) { + pp.stream.Start() + } +} + +// Stop initiates shutdown of the publisher. All pending messages are flushed. +func (pp *singlePartitionPublisher) Stop() { + pp.mu.Lock() + defer pp.mu.Unlock() + pp.unsafeInitiateShutdown(serviceTerminating, nil) +} + +// Publish a pub/sub message. +func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) { + pp.mu.Lock() + defer pp.mu.Unlock() + + processMessage := func() error { + if err := pp.unsafeCheckServiceStatus(); err != nil { + return err + } + if err := pp.batcher.AddMessage(msg, onResult); err != nil { + return err + } + return nil + } + + // If the new message cannot be published, flush pending messages and then + // terminate the stream once results are received. + if err := processMessage(); err != nil { + pp.unsafeInitiateShutdown(serviceTerminating, err) + onResult(nil, err) + } +} + +func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientStream, error) { + return pp.pubClient.Publish(addTopicRoutingMetadata(ctx, pp.topic)) +} + +func (pp *singlePartitionPublisher) initialRequest() (interface{}, bool) { + return pp.initialReq, true +} + +func (pp *singlePartitionPublisher) validateInitialResponse(response interface{}) error { + pubResponse, _ := response.(*pb.PublishResponse) + if pubResponse.GetInitialResponse() == nil { + return errInvalidInitialPubResponse + } + return nil +} + +func (pp *singlePartitionPublisher) onStreamStatusChange(status streamStatus) { + pp.mu.Lock() + defer pp.mu.Unlock() + + switch status { + case streamReconnecting: + // Prevent onNewBatch() from sending any new batches to the stream. + pp.enableSendToStream = false + + case streamConnected: + pp.unsafeUpdateStatus(serviceActive, nil) + + // To ensure messages are sent in order, we should resend in-flight batches + // to the stream immediately after reconnecting, before any new batches. + batches := pp.batcher.InFlightBatches() + for _, batch := range batches { + if !pp.stream.Send(batch.ToPublishRequest()) { + return + } + } + pp.enableSendToStream = true + + case streamTerminated: + pp.unsafeInitiateShutdown(serviceTerminated, pp.stream.Error()) + } +} + +func (pp *singlePartitionPublisher) onNewBatch(batch *publishBatch) { + pp.mu.Lock() + defer pp.mu.Unlock() + + pp.batcher.AddBatch(batch) + if pp.enableSendToStream { + // Note: if the underlying stream is reconnecting or Send() fails, all + // in-flight batches will be sent to the stream once the connection has been + // re-established. Thus the return value is ignored. 
+ pp.stream.Send(batch.ToPublishRequest()) + } +} + +func (pp *singlePartitionPublisher) onResponse(response interface{}) { + pp.mu.Lock() + defer pp.mu.Unlock() + + processResponse := func() error { + pubResponse, _ := response.(*pb.PublishResponse) + if pubResponse.GetMessageResponse() == nil { + return errInvalidMsgPubResponse + } + firstOffset := pubResponse.GetMessageResponse().GetStartCursor().GetOffset() + if err := pp.batcher.OnPublishResponse(firstOffset); err != nil { + return err + } + pp.unsafeCheckDone() + return nil + } + if err := processResponse(); err != nil { + pp.unsafeInitiateShutdown(serviceTerminated, err) + } +} + +// unsafeInitiateShutdown must be provided a target serviceStatus, which must be +// one of: +// * serviceTerminating: attempts to successfully publish all pending messages +// before terminating the publisher. Occurs when: +// - The user calls Stop(). +// - A new message fails preconditions. This should block the publish of +// subsequent messages to ensure ordering, but all pending messages should +// be flushed. +// * serviceTerminated: immediately terminates the publisher and errors all +// in-flight batches and pending messages in the bundler. Occurs when: +// - The publish stream terminates with a non-retryable error. +// - An inconsistency is detected in the server's publish responses. Assume +// there is a bug on the server and terminate the publisher, as correct +// processing of messages cannot be guaranteed. +// +// Expected to be called with singlePartitionPublisher.mu held. +func (pp *singlePartitionPublisher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) { + if !pp.unsafeUpdateStatus(targetStatus, err) { + return + } + + // Close the stream if this is an immediate shutdown. Otherwise leave it open + // to send pending messages. + if targetStatus == serviceTerminated { + pp.enableSendToStream = false + pp.stream.Stop() + } + + // Bundler.Flush() blocks and invokes onNewBatch(), which acquires the mutex, + // so it cannot be held here. + // Updating the publisher status above prevents any new messages from being + // added to the Bundler after flush. + pp.mu.Unlock() + pp.batcher.Flush() + pp.mu.Lock() + + // If flushing pending messages, close the stream if there's nothing left to + // publish. + if targetStatus == serviceTerminating { + pp.unsafeCheckDone() + return + } + + // For immediate shutdown set the error message for all pending messages. + pp.batcher.OnPermanentError(err) +} + +// unsafeCheckDone closes the stream once all pending messages have been +// published during shutdown. +func (pp *singlePartitionPublisher) unsafeCheckDone() { + if pp.status == serviceTerminating && pp.batcher.InFlightBatchesEmpty() { + pp.stream.Stop() + } +} diff --git a/pubsublite/internal/wire/publisher_test.go b/pubsublite/internal/wire/publisher_test.go new file mode 100644 index 00000000000..7c5e2450fb8 --- /dev/null +++ b/pubsublite/internal/wire/publisher_test.go @@ -0,0 +1,447 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and + +package wire + +import ( + "bytes" + "context" + "testing" + "time" + + "cloud.google.com/go/pubsublite/internal/test" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +func testPublishSettings() PublishSettings { + settings := DefaultPublishSettings + // Send 1 message at a time to make tests deterministic. + settings.CountThreshold = 1 + // Send messages with minimal delay to speed up tests. + settings.DelayThreshold = time.Millisecond + settings.Timeout = 5 * time.Second + return settings +} + +// testPartitionPublisher wraps a singlePartitionPublisher for ease of testing. +type testPartitionPublisher struct { + pub *singlePartitionPublisher + serviceTestProxy +} + +func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher { + ctx := context.Background() + pubClient, err := newPublisherClient(ctx, "ignored", testClientOpts...) + if err != nil { + t.Fatal(err) + } + + pubFactory := &singlePartitionPublisherFactory{ + ctx: ctx, + pubClient: pubClient, + settings: settings, + topicPath: topic.Path, + } + tp := &testPartitionPublisher{ + pub: pubFactory.New(topic.Partition), + } + tp.initAndStart(t, tp.pub, "Publisher") + return tp +} + +func (tp *testPartitionPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver { + result := newTestPublishResultReceiver(tp.t, msg) + tp.pub.Publish(msg, result.set) + return result +} + +func (tp *testPartitionPublisher) FinalError() (err error) { + err = tp.serviceTestProxy.FinalError() + + // Verify that the stream has terminated. + if gotStatus, wantStatus := tp.pub.stream.Status(), streamTerminated; gotStatus != wantStatus { + tp.t.Errorf("%s retryableStream status: %v, want: %v", tp.name, gotStatus, wantStatus) + } + if tp.pub.stream.currentStream() != nil { + tp.t.Errorf("%s client stream should be nil", tp.name) + } + return +} + +func TestSinglePartitionPublisherInvalidInitialResponse(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), msgPubResp(0), nil) // Publish response instead of initial response + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + + wantErr := errInvalidInitialPubResponse + if gotErr := pub.StartError(); !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Start() got err: (%v), want: (%v)", gotErr, wantErr) + } + if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) + } +} + +func TestSinglePartitionPublisherSpuriousPublishResponse(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + barrier := stream.PushWithBarrier(nil, msgPubResp(0), nil) // Publish response with no messages + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + if gotErr := pub.StartError(); 
gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + // Send after startup to ensure the test is deterministic. + barrier.Release() + if gotErr, wantErr := pub.FinalError(), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) + } +} + +func TestSinglePartitionPublisherBatching(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + settings := testPublishSettings() + settings.DelayThreshold = time.Minute // Batching delay disabled, tested elsewhere + settings.CountThreshold = 3 + + // Batch 1 + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + + // Batch 2 + msg4 := &pb.PubSubMessage{Data: []byte{'3'}} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + stream.Push(msgPubReq(msg1, msg2, msg3), msgPubResp(0), nil) + stream.Push(msgPubReq(msg4), msgPubResp(33), nil) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, settings) + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result3 := pub.Publish(msg3) + result4 := pub.Publish(msg4) + // Stop flushes pending messages. + pub.Stop() + + result1.ValidateResult(topic.Partition, 0) + result2.ValidateResult(topic.Partition, 1) + result3.ValidateResult(topic.Partition, 2) + result4.ValidateResult(topic.Partition, 33) + + if gotErr := pub.FinalError(); gotErr != nil { + t.Errorf("Publisher final err: (%v), want: ", gotErr) + } +} + +func TestSinglePartitionPublisherResendMessages(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + + verifiers := test.NewVerifiers(t) + + // Simulate a transient error that results in a reconnect before any server + // publish responses are received. + stream1 := test.NewRPCVerifier(t) + stream1.Push(initPubReq(topic), initPubResp(), nil) + stream1.Push(msgPubReq(msg1), nil, nil) + stream1.Push(msgPubReq(msg2), nil, status.Error(codes.Aborted, "server aborted")) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream1) + + // The publisher should resend all in-flight batches to the second stream. 
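+ // msg1 and msg2 were in flight when stream1 aborted, so they are expected
+ // to be republished in order before msg3.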
+ stream2 := test.NewRPCVerifier(t) + stream2.Push(initPubReq(topic), initPubResp(), nil) + stream2.Push(msgPubReq(msg1), msgPubResp(0), nil) + stream2.Push(msgPubReq(msg2), msgPubResp(1), nil) + stream2.Push(msgPubReq(msg3), msgPubResp(2), nil) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream2) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + defer pub.StopVerifyNoError() + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result1.ValidateResult(topic.Partition, 0) + result2.ValidateResult(topic.Partition, 1) + + result3 := pub.Publish(msg3) + result3.ValidateResult(topic.Partition, 2) +} + +func TestSinglePartitionPublisherPublishPermanentError(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + permError := status.Error(codes.NotFound, "topic deleted") + + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + stream.Push(msgPubReq(msg1), nil, permError) // Permanent error terminates publisher + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result1.ValidateError(permError) + result2.ValidateError(permError) + + // This message arrives after the publisher has already stopped, so its error + // message is ErrServiceStopped. + result3 := pub.Publish(msg3) + result3.ValidateError(ErrServiceStopped) + + if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, permError) { + t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, permError) + } +} + +func TestSinglePartitionPublisherBufferOverflow(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + settings := testPublishSettings() + settings.BufferedByteLimit = 15 + + msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)} + msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} // Causes overflow + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(0), nil) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, settings) + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result1 := pub.Publish(msg1) + // Overflow is detected, which terminates the publisher, but previous messages + // are flushed. + result2 := pub.Publish(msg2) + // Delay the server response for the first Publish to verify that it is + // allowed to complete. + barrier.Release() + // This message arrives after the publisher has already stopped, so its error + // message is ErrServiceStopped. 
+ result3 := pub.Publish(msg3) + + result1.ValidateResult(topic.Partition, 0) + result2.ValidateError(ErrOverflow) + result3.ValidateError(ErrServiceStopped) + + if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, ErrOverflow) { + t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, ErrOverflow) + } +} + +func TestSinglePartitionPublisherBufferRefill(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + settings := testPublishSettings() + settings.BufferedByteLimit = 15 + + msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)} + msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + stream.Push(msgPubReq(msg1), msgPubResp(0), nil) + stream.Push(msgPubReq(msg2), msgPubResp(1), nil) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, settings) + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result1 := pub.Publish(msg1) + result1.ValidateResult(topic.Partition, 0) + + // No overflow because msg2 is sent after the response for msg1 is received. + result2 := pub.Publish(msg2) + result2.ValidateResult(topic.Partition, 1) + + pub.StopVerifyNoError() +} + +func TestSinglePartitionPublisherInvalidCursorOffsets(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(4), nil) + // The server returns an inconsistent cursor offset for msg2, which causes the + // publisher client to fail permanently. + stream.Push(msgPubReq(msg2), msgPubResp(4), nil) + stream.Push(msgPubReq(msg3), msgPubResp(5), nil) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result3 := pub.Publish(msg3) + barrier.Release() + + result1.ValidateResult(topic.Partition, 4) + + // msg2 and subsequent messages are errored. + wantMsg := "server returned publish response with inconsistent start offset" + result2.ValidateErrorMsg(wantMsg) + result3.ValidateErrorMsg(wantMsg) + if gotErr := pub.FinalError(); !test.ErrorHasMsg(gotErr, wantMsg) { + t.Errorf("Publisher final err: (%v), want msg: %q", gotErr, wantMsg) + } +} + +func TestSinglePartitionPublisherInvalidServerPublishResponse(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + msg := &pb.PubSubMessage{Data: []byte{'1'}} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + // Server sends duplicate initial publish response, which causes the publisher + // client to fail permanently. 
+ stream.Push(msgPubReq(msg), initPubResp(), nil) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result := pub.Publish(msg) + + wantErr := errInvalidMsgPubResponse + result.ValidateError(wantErr) + if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) { + t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) + } +} + +func TestSinglePartitionPublisherStopFlushesMessages(t *testing.T) { + topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} + finalErr := status.Error(codes.FailedPrecondition, "invalid message") + + msg1 := &pb.PubSubMessage{Data: []byte{'1'}} + msg2 := &pb.PubSubMessage{Data: []byte{'2'}} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}} + msg4 := &pb.PubSubMessage{Data: []byte{'4'}} + + verifiers := test.NewVerifiers(t) + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topic), initPubResp(), nil) + barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(5), nil) + stream.Push(msgPubReq(msg2), msgPubResp(6), nil) + stream.Push(msgPubReq(msg3), nil, finalErr) + verifiers.AddPublishStream(topic.Path, topic.Partition, stream) + + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) + if gotErr := pub.StartError(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } + + result1 := pub.Publish(msg1) + result2 := pub.Publish(msg2) + result3 := pub.Publish(msg3) + pub.Stop() + barrier.Release() + result4 := pub.Publish(msg4) + + // First 2 messages should be allowed to complete. + result1.ValidateResult(topic.Partition, 5) + result2.ValidateResult(topic.Partition, 6) + // msg3 failed with a server error, which should result in the publisher + // terminating with an error. + result3.ValidateError(finalErr) + // msg4 was sent after the user called Stop(), so should fail immediately with + // ErrServiceStopped. + result4.ValidateError(ErrServiceStopped) + + if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, finalErr) { + t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, finalErr) + } +}