Skip to content

Commit

Permalink
We decided to remove the 1 MiB per-message publish limit
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Dec 9, 2020
1 parent 007441b commit de1b1f3
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/errors.go
Expand Up @@ -26,7 +26,7 @@ var (

// ErrOversizedMessage indicates that the user published a message over the
// allowed serialized byte size limit. It is wrapped in another error.
ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishMessageBytes (%d)", MaxPublishMessageBytes)
ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)

// ErrServiceUninitialized indicates that a service (e.g. publisher or
// subscriber) cannot perform an operation because it is uninitialized.
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/publish_batcher.go
Expand Up @@ -110,7 +110,7 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat
func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error {
msgSize := proto.Size(msg)
switch {
case msgSize > MaxPublishMessageBytes:
case msgSize > MaxPublishRequestBytes:
return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage)
case msgSize > b.availableBufferBytes:
return ErrOverflow
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/internal/wire/publish_batcher_test.go
Expand Up @@ -146,7 +146,7 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver
}

func TestPublishBatcherAddMessage(t *testing.T) {
const initAvailableBytes = MaxPublishMessageBytes + 1
const initAvailableBytes = MaxPublishRequestBytes
settings := DefaultPublishSettings
settings.BufferedByteLimit = initAvailableBytes

Expand Down Expand Up @@ -178,8 +178,8 @@ func TestPublishBatcherAddMessage(t *testing.T) {
})

t.Run("oversized message", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg)
}
})
Expand Down
18 changes: 13 additions & 5 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -24,16 +24,18 @@ const (
// batched in a single publish request.
MaxPublishRequestCount = 1000

// MaxPublishMessageBytes is the maximum allowed serialized size of a single
// Pub/Sub message in bytes.
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
MaxPublishRequestBytes = 3500000
MaxPublishRequestBytes int = 3.5 * 1024 * 1024
)

// FrameworkType is the user-facing API for Cloud Pub/Sub Lite.
type FrameworkType string

// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.
const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"

// PublishSettings control the batching of published messages. These settings
// apply per partition.
type PublishSettings struct {
Expand Down Expand Up @@ -70,6 +72,9 @@ type PublishSettings struct {
// The polling interval to watch for topic partition count updates. Set to 0
// to disable polling if the number of partitions will never update.
ConfigPollPeriod time.Duration

// The user-facing API type.
Framework FrameworkType
}

// DefaultPublishSettings holds the default values for PublishSettings.
Expand Down Expand Up @@ -132,6 +137,9 @@ type ReceiveSettings struct {
// specified, the client will use the partition assignment service to
// determine which partitions it should connect to.
Partitions []int

// The user-facing API type.
Framework FrameworkType
}

// DefaultReceiveSettings holds the default values for ReceiveSettings.
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/ps/publisher.go
Expand Up @@ -33,7 +33,7 @@ var (
ErrOverflow = bundler.ErrOverflow

// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishMessageBytes.
// exceeds MaxPublishRequestBytes.
ErrOversizedMessage = bundler.ErrOversizedItem

// ErrPublisherStopped is set for a PublishResult when a message cannot be
Expand All @@ -55,7 +55,7 @@ func translateError(err error) error {
}

// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given
// topic.
// topic. A PublisherClient is safe to use from multiple goroutines.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
Expand Down

0 comments on commit de1b1f3

Please sign in to comment.