Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): publisher client #3303

Merged
merged 13 commits into from Dec 23, 2020
3 changes: 2 additions & 1 deletion pubsublite/go.mod
Expand Up @@ -4,11 +4,12 @@ go 1.11

require (
cloud.google.com/go v0.74.0
cloud.google.com/go/pubsub v1.9.1
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.4
github.com/google/uuid v1.1.2
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a
google.golang.org/grpc v1.34.0
Expand Down
12 changes: 12 additions & 0 deletions pubsublite/go.sum
Expand Up @@ -15,6 +15,7 @@ cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOY
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww=
cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI=
cloud.google.com/go v0.73.0/go.mod h1:BkDh9dFvGjCitVw03TNjKbBxXNKULXXIq6orU6HrJ4Q=
cloud.google.com/go v0.74.0 h1:kpgPA77kSSbjSs+fWHkPTxQ6J5Z2Qkruo5jfXEkHxNQ=
cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
Expand All @@ -29,6 +30,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.9.1 h1:hXEte3a/Brd+Tl9ecEkHH3ow9wpnOTZ28lSOszYj6Cg=
cloud.google.com/go/pubsub v1.9.1/go.mod h1:7QTUeCiy+P1dVPO8hHVbZSHDfibbgm1gbKyOVYnqb8g=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
Expand Down Expand Up @@ -105,6 +108,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201117184057-ae444373da19/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
Expand Down Expand Up @@ -212,6 +216,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjN
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 h1:lwlPPsmjDKK0J6eG6xDWd5XPehI0R024zxjDnw3esPA=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand All @@ -235,6 +240,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -279,6 +286,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down Expand Up @@ -322,6 +330,8 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2 h1:vEtypaVub6UvKkiXZ2xx9QIvp9TL7sI7xp7vdi2kezA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11 h1:9j/upNXDRpADUw2RpUfJ7E7GHtfhDih62kX6JM8vs2c=
golang.org/x/tools v0.0.0-20201211025543-abf6a1d87e11/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
Expand Down Expand Up @@ -391,6 +401,8 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201203001206-6486ece9c497/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a h1:GnJAhasbD8HiT8DZMvsEx3QLVy/X0icq/MGr0MqRJ2M=
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down
10 changes: 9 additions & 1 deletion pubsublite/internal/wire/errors.go
Expand Up @@ -13,13 +13,21 @@

package wire

import "errors"
import (
"errors"
"fmt"
)

// Errors exported from this package.
var (
// ErrOverflow indicates that the publish buffers have overflowed. See
// comments for PublishSettings.BufferedByteLimit.
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")

// ErrOversizedMessage indicates that the user published a message over the
// allowed serialized byte size limit. It is wrapped in another error.
ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)

// ErrServiceUninitialized indicates that a service (e.g. publisher or
// subscriber) cannot perform an operation because it is uninitialized.
ErrServiceUninitialized = errors.New("pubsublite: service must be started")
Expand Down
5 changes: 3 additions & 2 deletions pubsublite/internal/wire/publish_batcher.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -109,8 +110,8 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat
func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult PublishResultFunc) error {
msgSize := proto.Size(msg)
switch {
case msgSize > MaxPublishMessageBytes:
return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes)
case msgSize > MaxPublishRequestBytes:
return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage)
case msgSize > b.availableBufferBytes:
return ErrOverflow
}
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/internal/wire/publish_batcher_test.go
Expand Up @@ -146,7 +146,7 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver
}

func TestPublishBatcherAddMessage(t *testing.T) {
const initAvailableBytes = MaxPublishMessageBytes + 1
const initAvailableBytes = MaxPublishRequestBytes
settings := DefaultPublishSettings
settings.BufferedByteLimit = initAvailableBytes

Expand Down Expand Up @@ -178,8 +178,8 @@ func TestPublishBatcherAddMessage(t *testing.T) {
})

t.Run("oversized message", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishMessageBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishMessageBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg)
}
})
Expand Down
18 changes: 13 additions & 5 deletions pubsublite/internal/wire/settings.go
Expand Up @@ -24,16 +24,18 @@ const (
// batched in a single publish request.
MaxPublishRequestCount = 1000

// MaxPublishMessageBytes is the maximum allowed serialized size of a single
// Pub/Sub message in bytes.
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes. Must be lower
// than the gRPC limit of 4 MiB.
MaxPublishRequestBytes = 3500000
MaxPublishRequestBytes int = 3.5 * 1024 * 1024
)

// FrameworkType is the user-facing API for Cloud Pub/Sub Lite.
type FrameworkType string

// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.
const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"

// PublishSettings control the batching of published messages. These settings
// apply per partition.
type PublishSettings struct {
Expand Down Expand Up @@ -70,6 +72,9 @@ type PublishSettings struct {
// The polling interval to watch for topic partition count updates. Set to 0
// to disable polling if the number of partitions will never update.
ConfigPollPeriod time.Duration

// The user-facing API type.
Framework FrameworkType
}

// DefaultPublishSettings holds the default values for PublishSettings.
Expand Down Expand Up @@ -132,6 +137,9 @@ type ReceiveSettings struct {
// specified, the client will use the partition assignment service to
// determine which partitions it should connect to.
Partitions []int

// The user-facing API type.
Framework FrameworkType
}

// DefaultReceiveSettings holds the default values for ReceiveSettings.
Expand Down
74 changes: 74 additions & 0 deletions pubsublite/ps/example_test.go
@@ -0,0 +1,74 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps_test

import (
"context"
"fmt"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/ps"
)

func ExamplePublisherClient_Publish() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}

func ExamplePublisherClient_Error() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
if err == ps.ErrPublisherStopped {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
}
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}