Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): single partition publisher implementation #3225

Merged
merged 8 commits into from Nov 25, 2020
46 changes: 46 additions & 0 deletions pubsublite/common/publish_data.go
@@ -0,0 +1,46 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"fmt"
"strconv"
"strings"
)

// PublishMetadata holds the results of a published message.
type PublishMetadata struct {
	// Partition is the partition the message was published to.
	Partition int
	// Offset is the offset assigned to the message within the partition.
	Offset int64
}

// String encodes the metadata as "<partition>:<offset>", with both values
// written in base 10. The result can be decoded with ParsePublishMetadata.
func (pm *PublishMetadata) String() string {
	return fmt.Sprintf("%d:%d", pm.Partition, pm.Offset)
}

// ParsePublishMetadata converts a string obtained from PublishMetadata.String()
// back to PublishMetadata.
//
// The input must consist of exactly two base-10 integers separated by a single
// ':'. An error is returned (and the result is nil) if the input has a
// different number of fields, if either field is not a valid integer, or if a
// value does not fit in the type of the corresponding field.
func ParsePublishMetadata(input string) (*PublishMetadata, error) {
	parts := strings.Split(input, ":")
	if len(parts) != 2 {
		return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input)
	}

	// Partition is a platform-sized int. Parsing it with Atoi (rather than as a
	// 64-bit value that is later narrowed) reports out-of-range values as an
	// error on 32-bit platforms instead of silently truncating them.
	partition, pErr := strconv.Atoi(parts[0])
	offset, oErr := strconv.ParseInt(parts[1], 10, 64)
	if pErr != nil || oErr != nil {
		return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input)
	}
	return &PublishMetadata{Partition: partition, Offset: offset}, nil
}
68 changes: 68 additions & 0 deletions pubsublite/common/publish_data_test.go
@@ -0,0 +1,68 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"testing"

"cloud.google.com/go/internal/testutil"
)

// TestPublishMetadataStringEncoding verifies that ParsePublishMetadata accepts
// well-formed "<partition>:<offset>" strings and rejects malformed ones, and
// that PublishMetadata.String() produces the same encoding for valid cases.
func TestPublishMetadataStringEncoding(t *testing.T) {
	type testCase struct {
		desc    string
		input   string
		want    *PublishMetadata
		wantErr bool
	}

	cases := []testCase{
		{desc: "valid: zero", input: "0:0", want: &PublishMetadata{Partition: 0, Offset: 0}},
		{desc: "valid: non-zero", input: "3:1234", want: &PublishMetadata{Partition: 3, Offset: 1234}},
		{desc: "invalid: number", input: "1234", wantErr: true},
		{desc: "invalid: partition", input: "p:1234", wantErr: true},
		{desc: "invalid: offset", input: "10:9offset", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			// Decoding: both the parsed value and the presence of an error must
			// match expectations.
			got, gotErr := ParsePublishMetadata(tc.input)
			failed := gotErr != nil
			if !testutil.Equal(got, tc.want) || failed != tc.wantErr {
				t.Errorf("ParsePublishMetadata(%q): got (%v, %v), want (%v, err=%v)", tc.input, got, gotErr, tc.want, tc.wantErr)
			}

			// Encoding is only checked for cases that have a valid expected value.
			if tc.want == nil {
				return
			}
			if encoded := tc.want.String(); encoded != tc.input {
				t.Errorf("PublishMetadata(%v).String(): got %q, want: %q", tc.want, encoded, tc.input)
			}
		})
	}
}
1 change: 1 addition & 0 deletions pubsublite/go.sum
Expand Up @@ -216,6 +216,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
181 changes: 181 additions & 0 deletions pubsublite/internal/wire/publish_batcher.go
@@ -0,0 +1,181 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wire

import (
"container/list"
"errors"
"fmt"

"cloud.google.com/go/pubsublite/common"
"github.com/golang/protobuf/proto"
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
"google.golang.org/api/support/bundler"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// errPublishQueueEmpty is returned by publishMessageBatcher.OnPublishResponse
// when a publish response arrives but the in-flight publish queue is empty.
var errPublishQueueEmpty = errors.New("pubsublite: received publish response from server with no batches in flight")

// PublishResultFunc receives the result of a publish. The batcher in this file
// calls it with (metadata, nil) when a message has been assigned an offset, and
// with (nil, err) when the message failed due to a permanent error.
type PublishResultFunc func(*common.PublishMetadata, error)

// messageHolder stores a message to be published, with associated metadata.
type messageHolder struct {
// The message to be published.
msg *pb.PubSubMessage
// Serialized size of msg in bytes, as computed with proto.Size when the
// message is added to the batcher. Used to account for buffer usage.
size int
// Callback invoked with the publish result for this message.
onResult PublishResultFunc
}

// publishBatch holds messages that are published in the same
// MessagePublishRequest.
type publishBatch struct {
	// Messages in the batch, in publish order.
	msgHolders []*messageHolder
}

// ToPublishRequest builds a PublishRequest wrapping a MessagePublishRequest
// that contains every message in the batch, preserving their order.
func (b *publishBatch) ToPublishRequest() *pb.PublishRequest {
	messages := make([]*pb.PubSubMessage, 0, len(b.msgHolders))
	for _, h := range b.msgHolders {
		messages = append(messages, h.msg)
	}

	publishReq := &pb.MessagePublishRequest{Messages: messages}
	return &pb.PublishRequest{
		RequestType: &pb.PublishRequest_MessagePublishRequest{
			MessagePublishRequest: publishReq,
		},
	}
}

// publishMessageBatcher manages batching of messages, as well as in-flight
// published batches. It is owned by singlePartitionPublisher.
type publishMessageBatcher struct {
// Partition reported in the PublishMetadata passed to message result
// callbacks.
partition int
// Used to batch messages. Setting HandlerLimit=1 results in ordered batches.
msgBundler *bundler.Bundler
// FIFO queue of in-flight batches of published messages. Results have not yet
// been received from the server.
publishQueue *list.List // Value = *publishBatch
// Used for error checking, to ensure the server returns increasing offsets
// for published messages. Updated after each publish response to the offset
// following the last message of the acknowledged batch.
minExpectedNextOffset int64
// The available buffer size is managed by this batcher rather than the
// Bundler due to the in-flight publish queue. It is decremented when a
// message is added and incremented when a publish response is received for
// the message.
availableBufferBytes int
}

func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBatch func(*publishBatch)) *publishMessageBatcher {
batcher := &publishMessageBatcher{
partition: partition,
publishQueue: list.New(),
availableBufferBytes: settings.BufferedByteLimit,
}

msgBundler := bundler.NewBundler(&messageHolder{}, func(item interface{}) {
msgs, _ := item.([]*messageHolder)
if len(msgs) == 0 {
// This should not occur.
return
}
// The publishMessageBatcher is accessed by the singlePartitionPublisher and
// Bundler handler func (called in a goroutine).
// singlePartitionPublisher.onNewBatch() receives the new batch from the
// Bundler, which calls publishMessageBatcher.AddBatch(). Only the
// publisher's mutex is required.
onNewBatch(&publishBatch{msgHolders: msgs})
})
msgBundler.DelayThreshold = settings.DelayThreshold
msgBundler.BundleCountThreshold = settings.CountThreshold
msgBundler.BundleByteThreshold = settings.ByteThreshold // Soft limit
msgBundler.BundleByteLimit = MaxPublishRequestBytes // Hard limit
msgBundler.HandlerLimit = 1 // Handle batches serially for ordering
msgBundler.BufferedByteLimit = settings.BufferedByteLimit * 2 // Effectively disabled, handled in the batcher
hongalex marked this conversation as resolved.
Show resolved Hide resolved

batcher.msgBundler = msgBundler
return batcher
}

// AddMessage queues msg for batching. onResult is stored with the message so
// that it can later be invoked with the publish result.
//
// An error is returned if the serialized message exceeds
// MaxPublishMessageBytes, if it does not fit in the remaining buffer
// (ErrOverflow), or if the Bundler rejects it. On success, the message size is
// deducted from the available buffer.
func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error {
	size := proto.Size(msg)
	if size > MaxPublishMessageBytes {
		return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", size, MaxPublishMessageBytes)
	}
	if size > b.availableBufferBytes {
		return ErrOverflow
	}

	// As we've already checked the size of the message and overflow, the
	// bundler should not return an error.
	err := b.msgBundler.Add(&messageHolder{msg: msg, size: size, onResult: onResult}, size)
	if err != nil {
		return fmt.Errorf("pubsublite: failed to batch message: %v", err)
	}

	b.availableBufferBytes -= size
	return nil
}

// AddBatch appends batch to the back of the in-flight publish queue.
func (b *publishMessageBatcher) AddBatch(batch *publishBatch) {
b.publishQueue.PushBack(batch)
}

// OnPublishResponse handles a publish response for the batch at the front of
// the in-flight queue. firstOffset is the offset of the first message in that
// batch.
//
// Every message in the batch has its result callback invoked with its
// partition and offset, and its size is returned to the available buffer. The
// batch is then removed from the queue. An error is returned, and no state is
// modified, if there are no batches in flight or if firstOffset is lower than
// the minimum expected offset.
func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error {
	head := b.publishQueue.Front()
	switch {
	case head == nil:
		return errPublishQueueEmpty
	case firstOffset < b.minExpectedNextOffset:
		return fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset)
	}

	batch, _ := head.Value.(*publishBatch)
	// Messages are ordered, so offsets are consecutive starting at firstOffset.
	nextOffset := firstOffset
	for _, holder := range batch.msgHolders {
		holder.onResult(&common.PublishMetadata{Partition: b.partition, Offset: nextOffset}, nil)
		b.availableBufferBytes += holder.size
		nextOffset++
	}

	// nextOffset now points just past the last message of this batch.
	b.minExpectedNextOffset = nextOffset
	b.publishQueue.Remove(head)
	return nil
}

// OnPermanentError fails every message in every in-flight batch by invoking
// its result callback with err, then clears the in-flight queue.
func (b *publishMessageBatcher) OnPermanentError(err error) {
	for e := b.publishQueue.Front(); e != nil; e = e.Next() {
		batch, ok := e.Value.(*publishBatch)
		if !ok {
			continue
		}
		for _, holder := range batch.msgHolders {
			holder.onResult(nil, err)
		}
	}
	b.publishQueue.Init()
}

// InFlightBatches returns the batches currently in the in-flight queue, in
// queue order. The result is nil when the queue is empty.
func (b *publishMessageBatcher) InFlightBatches() []*publishBatch {
	var inFlight []*publishBatch
	for e := b.publishQueue.Front(); e != nil; e = e.Next() {
		batch, ok := e.Value.(*publishBatch)
		if !ok {
			continue
		}
		inFlight = append(inFlight, batch)
	}
	return inFlight
}

// Flush calls Flush on the underlying Bundler.
func (b *publishMessageBatcher) Flush() {
b.msgBundler.Flush()
}

// InFlightBatchesEmpty reports whether there are no batches in the in-flight
// publish queue.
func (b *publishMessageBatcher) InFlightBatchesEmpty() bool {
	// The queue has a front element if and only if it is non-empty.
	return b.publishQueue.Front() == nil
}