Skip to content

Commit

Permalink
feat(pubsublite): single partition publisher implementation (#3225)
Browse files Browse the repository at this point in the history
Implements the publisher for a single partition. A Bundler is used to batch messages.
  • Loading branch information
tmdiep committed Nov 25, 2020
1 parent 200e261 commit 4982eeb
Show file tree
Hide file tree
Showing 8 changed files with 1,347 additions and 0 deletions.
46 changes: 46 additions & 0 deletions pubsublite/common/publish_data.go
@@ -0,0 +1,46 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package common

import (
"fmt"
"strconv"
"strings"
)

// PublishMetadata holds the results of a published message.
type PublishMetadata struct {
Partition int
Offset int64
}

func (pm *PublishMetadata) String() string {
return fmt.Sprintf("%d:%d", pm.Partition, pm.Offset)
}

// ParsePublishMetadata converts a string obtained from PublishMetadata.String()
// back to PublishMetadata.
func ParsePublishMetadata(input string) (*PublishMetadata, error) {
parts := strings.Split(input, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input)
}

partition, pErr := strconv.ParseInt(parts[0], 10, 64)
offset, oErr := strconv.ParseInt(parts[1], 10, 64)
if pErr != nil || oErr != nil {
return nil, fmt.Errorf("pubsublite: invalid encoded PublishMetadata %q", input)
}
return &PublishMetadata{Partition: int(partition), Offset: offset}, nil
}
68 changes: 68 additions & 0 deletions pubsublite/common/publish_data_test.go
@@ -0,0 +1,68 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package common

import (
"testing"

"cloud.google.com/go/internal/testutil"
)

func TestPublishMetadataStringEncoding(t *testing.T) {
for _, tc := range []struct {
desc string
input string
want *PublishMetadata
wantErr bool
}{
{
desc: "valid: zero",
input: "0:0",
want: &PublishMetadata{Partition: 0, Offset: 0},
},
{
desc: "valid: non-zero",
input: "3:1234",
want: &PublishMetadata{Partition: 3, Offset: 1234},
},
{
desc: "invalid: number",
input: "1234",
wantErr: true,
},
{
desc: "invalid: partition",
input: "p:1234",
wantErr: true,
},
{
desc: "invalid: offset",
input: "10:9offset",
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
got, gotErr := ParsePublishMetadata(tc.input)
if !testutil.Equal(got, tc.want) || (gotErr != nil) != tc.wantErr {
t.Errorf("ParsePublishMetadata(%q): got (%v, %v), want (%v, err=%v)", tc.input, got, gotErr, tc.want, tc.wantErr)
}

if tc.want != nil {
if got := tc.want.String(); got != tc.input {
t.Errorf("PublishMetadata(%v).String(): got %q, want: %q", tc.want, got, tc.input)
}
}
})
}
}
1 change: 1 addition & 0 deletions pubsublite/go.mod
Expand Up @@ -11,4 +11,5 @@ require (
google.golang.org/api v0.35.0
google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.25.0
)
1 change: 1 addition & 0 deletions pubsublite/go.sum
Expand Up @@ -216,6 +216,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
181 changes: 181 additions & 0 deletions pubsublite/internal/wire/publish_batcher.go
@@ -0,0 +1,181 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"container/list"
"errors"
"fmt"

"cloud.google.com/go/pubsublite/common"
"google.golang.org/api/support/bundler"
"google.golang.org/protobuf/proto"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var errPublishQueueEmpty = errors.New("pubsublite: received publish response from server with no batches in flight")

// PublishResultFunc receives the result of a publish.
type PublishResultFunc func(*common.PublishMetadata, error)

// messageHolder stores a message to be published, with associated metadata.
type messageHolder struct {
msg *pb.PubSubMessage
size int
onResult PublishResultFunc
}

// publishBatch holds messages that are published in the same
// MessagePublishRequest.
type publishBatch struct {
msgHolders []*messageHolder
}

func (b *publishBatch) ToPublishRequest() *pb.PublishRequest {
msgs := make([]*pb.PubSubMessage, len(b.msgHolders))
for i, holder := range b.msgHolders {
msgs[i] = holder.msg
}

return &pb.PublishRequest{
RequestType: &pb.PublishRequest_MessagePublishRequest{
MessagePublishRequest: &pb.MessagePublishRequest{
Messages: msgs,
},
},
}
}

// publishMessageBatcher manages batching of messages, as well as in-flight
// published batches. It is owned by singlePartitionPublisher.
type publishMessageBatcher struct {
partition int
// Used to batch messages. Setting HandlerLimit=1 results in ordered batches.
msgBundler *bundler.Bundler
// FIFO queue of in-flight batches of published messages. Results have not yet
// been received from the server.
publishQueue *list.List // Value = *publishBatch
// Used for error checking, to ensure the server returns increasing offsets
// for published messages.
minExpectedNextOffset int64
// The available buffer size is managed by this batcher rather than the
// Bundler due to the in-flight publish queue.
availableBufferBytes int
}

func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBatch func(*publishBatch)) *publishMessageBatcher {
batcher := &publishMessageBatcher{
partition: partition,
publishQueue: list.New(),
availableBufferBytes: settings.BufferedByteLimit,
}

msgBundler := bundler.NewBundler(&messageHolder{}, func(item interface{}) {
msgs, _ := item.([]*messageHolder)
if len(msgs) == 0 {
// This should not occur.
return
}
// The publishMessageBatcher is accessed by the singlePartitionPublisher and
// Bundler handler func (called in a goroutine).
// singlePartitionPublisher.onNewBatch() receives the new batch from the
// Bundler, which calls publishMessageBatcher.AddBatch(). Only the
// publisher's mutex is required.
onNewBatch(&publishBatch{msgHolders: msgs})
})
msgBundler.DelayThreshold = settings.DelayThreshold
msgBundler.BundleCountThreshold = settings.CountThreshold
msgBundler.BundleByteThreshold = settings.ByteThreshold // Soft limit
msgBundler.BundleByteLimit = MaxPublishRequestBytes // Hard limit
msgBundler.HandlerLimit = 1 // Handle batches serially for ordering
msgBundler.BufferedByteLimit = settings.BufferedByteLimit // Actually handled in the batcher

batcher.msgBundler = msgBundler
return batcher
}

func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error {
msgSize := proto.Size(msg)
switch {
case msgSize > MaxPublishMessageBytes:
return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes)
case msgSize > b.availableBufferBytes:
return ErrOverflow
}

holder := &messageHolder{msg: msg, size: msgSize, onResult: onResult}
if err := b.msgBundler.Add(holder, msgSize); err != nil {
// As we've already checked the size of the message and overflow, the
// bundler should not return an error.
return fmt.Errorf("pubsublite: failed to batch message: %v", err)
}
b.availableBufferBytes -= msgSize
return nil
}

func (b *publishMessageBatcher) AddBatch(batch *publishBatch) {
b.publishQueue.PushBack(batch)
}

func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error {
frontElem := b.publishQueue.Front()
if frontElem == nil {
return errPublishQueueEmpty
}
if firstOffset < b.minExpectedNextOffset {
return fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset)
}

batch, _ := frontElem.Value.(*publishBatch)
for i, msgHolder := range batch.msgHolders {
// Messages are ordered, so the offset of each message is firstOffset + i.
pm := &common.PublishMetadata{Partition: b.partition, Offset: firstOffset + int64(i)}
msgHolder.onResult(pm, nil)
b.availableBufferBytes += msgHolder.size
}

b.minExpectedNextOffset = firstOffset + int64(len(batch.msgHolders))
b.publishQueue.Remove(frontElem)
return nil
}

func (b *publishMessageBatcher) OnPermanentError(err error) {
for elem := b.publishQueue.Front(); elem != nil; elem = elem.Next() {
if batch, ok := elem.Value.(*publishBatch); ok {
for _, msgHolder := range batch.msgHolders {
msgHolder.onResult(nil, err)
}
}
}
b.publishQueue.Init()
}

func (b *publishMessageBatcher) InFlightBatches() []*publishBatch {
var batches []*publishBatch
for elem := b.publishQueue.Front(); elem != nil; elem = elem.Next() {
if batch, ok := elem.Value.(*publishBatch); ok {
batches = append(batches, batch)
}
}
return batches
}

func (b *publishMessageBatcher) Flush() {
b.msgBundler.Flush()
}

func (b *publishMessageBatcher) InFlightBatchesEmpty() bool {
return b.publishQueue.Len() == 0
}

0 comments on commit 4982eeb

Please sign in to comment.