forked from googleapis/google-cloud-go
/
publisher.go
167 lines (146 loc) · 5.09 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package ps
import (
"context"
"sync"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/common"
"cloud.google.com/go/pubsublite/internal/wire"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
ErrOverflow = bundler.ErrOverflow
// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishMessageBytes.
ErrOversizedMessage = bundler.ErrOversizedItem
// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped. PublisherClient.Error()
// returns the error that caused the publisher client to terminate (if any).
ErrPublisherStopped = wire.ErrServiceStopped
)
// translateError transforms a subset of errors to what would be returned by the
// pubsub package.
func translateError(err error) error {
if xerrors.Is(err, wire.ErrOversizedMessage) {
return ErrOversizedMessage
}
if xerrors.Is(err, wire.ErrOverflow) {
return ErrOverflow
}
return err
}
// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given
// topic.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
settings PublishSettings
wirePub wire.Publisher
// Fields below must be guarded with mutex.
mu sync.Mutex
err error
}
// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish
// messages to a given topic.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) {
region, err := pubsublite.ZoneToRegion(topic.Zone)
if err != nil {
return nil, err
}
// Note: ctx is not used to create the wire publisher, because if it is
// cancelled, the publisher will not be able to perform graceful shutdown
// (e.g. flush pending messages).
wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...)
if err != nil {
return nil, err
}
wirePub.Start()
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
}
// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server.
//
// Once Stop() has been called or the publisher has failed permanently due to an
// error, future calls to Publish will immediately return a PublishResult with
// error ErrPublisherStopped. Error() returns the error that caused the
// publisher to terminate.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
result := pubsub.NewPublishResult()
msgpb := new(pb.PubSubMessage)
if err := p.transformMessage(msg, msgpb); err != nil {
pubsub.SetPublishResult(result, "", err)
p.setError(err)
p.Stop()
return result
}
p.wirePub.Publish(msgpb, func(pm *common.PublishMetadata, err error) {
err = translateError(err)
if pm != nil {
pubsub.SetPublishResult(result, pm.String(), err)
} else {
pubsub.SetPublishResult(result, "", err)
}
})
return result
}
// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
}
// Error returns the error that caused the publisher client to terminate. It
// may be nil if Stop() was called.
func (p *PublisherClient) Error() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.err != nil {
return p.err
}
return p.wirePub.Error()
}
func (p *PublisherClient) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()
// Don't clobber original error.
if p.err == nil {
p.err = err
}
}
func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error {
if p.settings.MessageTransformer != nil {
return p.settings.MessageTransformer(from, to)
}
keyExtractor := p.settings.KeyExtractor
if keyExtractor == nil {
keyExtractor = extractOrderingKey
}
return transformPublishedMessage(from, to, keyExtractor)
}