Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): publisher client #3303

Merged
merged 13 commits into from Dec 23, 2020
4 changes: 3 additions & 1 deletion pubsublite/go.mod
Expand Up @@ -4,10 +4,12 @@ go 1.11

require (
cloud.google.com/go v0.72.0
cloud.google.com/go/pubsub v1.8.1
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.3
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/tools v0.0.0-20201201064407-fd09bd90d85c // indirect
golang.org/x/tools v0.0.0-20201130220005-fd5f29369093 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.35.0
google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e
google.golang.org/grpc v1.33.2
Expand Down
17 changes: 17 additions & 0 deletions pubsublite/go.sum
Expand Up @@ -13,6 +13,7 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV
cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.66.0/go.mod h1:dgqGAjKCDxyhGTtC9dAREQGUJpkceNm1yt590Qno0Ko=
cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww=
cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
Expand All @@ -27,6 +28,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.8.1 h1:xuYU5DhMssHSUEF2KtTN84fPj+bbdCg5EU+l5JUN/lc=
cloud.google.com/go/pubsub v1.8.1/go.mod h1:l0z+j0Y73f7AzQfozIhekdyS9Sh5HdK/Bn77LD3lIMY=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
Expand Down Expand Up @@ -100,6 +103,7 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200905233945-acf8798be1f7/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -197,6 +201,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
Expand Down Expand Up @@ -243,6 +248,7 @@ golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f h1:Fqb3ao1hUmOR3GkUOg/Y+BadLwykBIzs5q8Ez2SbHyc=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
Expand All @@ -258,6 +264,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down Expand Up @@ -298,8 +305,12 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200828161849-5deb26317202/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20200915173823-2db8f0ff891c/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201001104356-43ebab892c4c/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201130220005-fd5f29369093/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201064407-fd09bd90d85c h1:D/mVYXCk6gUcyr7WuGlAk/ShHqgARUXc2VQxo27Hmws=
golang.org/x/tools v0.0.0-20201201064407-fd09bd90d85c/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -323,6 +334,8 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
google.golang.org/api v0.31.0/go.mod h1:CL+9IBCa2WWU6gRuBWaKqGWLFFwbEUXkfeMkHLQWYWo=
google.golang.org/api v0.32.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/api v0.35.0 h1:TBCmTTxUrRDA1iTctnK/fIeitxIZ+TQuaf0j29fmCGo=
google.golang.org/api v0.35.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
Expand Down Expand Up @@ -361,7 +374,10 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200831141814-d751682dd103/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200914193844-75d14daec038/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201001141541-efaab9d3c4f7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e h1:wYR00/Ht+i/79g/gzhdehBgLIJCklKoc8Q/NebdzzpY=
google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand All @@ -378,6 +394,7 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.2 h1:EQyQC3sa8M+p6Ulc8yy9SWSS2GVwyRc83gAbG8lrl4o=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
10 changes: 9 additions & 1 deletion pubsublite/internal/wire/errors.go
Expand Up @@ -13,13 +13,21 @@

package wire

import "errors"
import (
"errors"
"fmt"
)

// Errors exported from this package.
var (
// ErrOverflow indicates that the publish buffers have overflowed. See
// comments for PublishSettings.BufferedByteLimit.
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")

// ErrOversizedMessage indicates that the user published a message over the
// allowed serialized byte size limit. It is wrapped in another error.
ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishMessageBytes (%d)", MaxPublishMessageBytes)

// ErrServiceUninitialized indicates that a service (e.g. publisher or
// subscriber) cannot perform an operation because it is uninitialized.
ErrServiceUninitialized = errors.New("pubsublite: service must be started")
Expand Down
3 changes: 2 additions & 1 deletion pubsublite/internal/wire/publish_batcher.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"cloud.google.com/go/pubsublite/common"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -110,7 +111,7 @@ func (b *publishMessageBatcher) AddMessage(msg *pb.PubSubMessage, onResult Publi
msgSize := proto.Size(msg)
switch {
case msgSize > MaxPublishMessageBytes:
return fmt.Errorf("pubsublite: serialized message size is %d bytes, maximum allowed size is MaxPublishMessageBytes (%d)", msgSize, MaxPublishMessageBytes)
return xerrors.Errorf("pubsublite: serialized message size is %d bytes: %w", msgSize, ErrOversizedMessage)
case msgSize > b.availableBufferBytes:
return ErrOverflow
}
Expand Down
74 changes: 74 additions & 0 deletions pubsublite/ps/example_test.go
@@ -0,0 +1,74 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps_test

import (
"context"
"fmt"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/ps"
)

func ExamplePublisherClient_Publish() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}

func ExamplePublisherClient_Error() {
ctx := context.Background()
topic := pubsublite.TopicPath{Project: "project-id", Zone: "zone", TopicID: "topic-id"}
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
if err == ps.ErrPublisherStopped {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
}
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}
167 changes: 167 additions & 0 deletions pubsublite/ps/publisher.go
@@ -0,0 +1,167 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package ps

import (
"context"
"sync"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/common"
"cloud.google.com/go/pubsublite/internal/wire"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
ErrOverflow = bundler.ErrOverflow

// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishMessageBytes.
ErrOversizedMessage = bundler.ErrOversizedItem

// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped. PublisherClient.Error()
// returns the error that caused the publisher client to terminate (if any).
ErrPublisherStopped = wire.ErrServiceStopped
)

// translateError transforms a subset of errors to what would be returned by the
// pubsub package.
func translateError(err error) error {
if xerrors.Is(err, wire.ErrOversizedMessage) {
return ErrOversizedMessage
}
if xerrors.Is(err, wire.ErrOverflow) {
return ErrOverflow
}
return err
}

// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given
// topic.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
settings PublishSettings
wirePub wire.Publisher

// Fields below must be guarded with mutex.
mu sync.Mutex
err error
}

// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish
// messages to a given topic.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) {
region, err := pubsublite.ZoneToRegion(topic.Zone)
if err != nil {
return nil, err
}

// Note: ctx is not used to create the wire publisher, because if it is
// cancelled, the publisher will not be able to perform graceful shutdown
// (e.g. flush pending messages).
wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...)
if err != nil {
return nil, err
}
wirePub.Start()
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
}

// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server.
//
// Once Stop() has been called or the publisher has failed permanently due to an
// error, future calls to Publish will immediately return a PublishResult with
// error ErrPublisherStopped. Error() returns the error that caused the
// publisher to terminate.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
result := pubsub.NewPublishResult()
msgpb := new(pb.PubSubMessage)
if err := p.transformMessage(msg, msgpb); err != nil {
pubsub.SetPublishResult(result, "", err)
p.setError(err)
p.Stop()
return result
}

p.wirePub.Publish(msgpb, func(pm *common.PublishMetadata, err error) {
err = translateError(err)
if pm != nil {
pubsub.SetPublishResult(result, pm.String(), err)
} else {
pubsub.SetPublishResult(result, "", err)
}
})
return result
}

// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
}

// Error returns the error that caused the publisher client to terminate. It
// may be nil if Stop() was called.
func (p *PublisherClient) Error() error {
p.mu.Lock()
defer p.mu.Unlock()

if p.err != nil {
return p.err
}
return p.wirePub.Error()
}

func (p *PublisherClient) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()

// Don't clobber original error.
if p.err == nil {
p.err = err
}
}

func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error {
if p.settings.MessageTransformer != nil {
return p.settings.MessageTransformer(from, to)
}

keyExtractor := p.settings.KeyExtractor
if keyExtractor == nil {
keyExtractor = extractOrderingKey
}
return transformPublishedMessage(from, to, keyExtractor)
}