/
publisher.go
170 lines (149 loc) · 5.98 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package pscompat
import (
"context"
"sync"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
"google.golang.org/api/option"
ipubsub "cloud.google.com/go/internal/pubsub"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
// Use errors.Is for comparing errors.
ErrOverflow = wire.ErrOverflow
// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishRequestBytes. Use errors.Is for comparing errors.
ErrOversizedMessage = wire.ErrOversizedMessage
// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped or is in the process of
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any). Use errors.Is for comparing errors.
ErrPublisherStopped = wire.ErrServiceStopped
)
// PublisherClient is a Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
settings PublishSettings
wirePub wire.Publisher
// Fields below must be guarded with mutex.
mu sync.Mutex
err error
}
// NewPublisherClient creates a new Pub/Sub Lite publisher client to publish
// messages to a given topic, using DefaultPublishSettings. A valid topic path
// has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...)
}
// NewPublisherClientWithSettings creates a new Pub/Sub Lite publisher client to
// publish messages to a given topic, using the specified PublishSettings. A
// valid topic path has the format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) {
topicPath, err := wire.ParseTopicPath(topic)
if err != nil {
return nil, err
}
region, err := wire.ZoneToRegion(topicPath.Zone)
if err != nil {
return nil, err
}
// Note: ctx is not used to create the wire publisher, because if it is
// cancelled, the publisher will not be able to perform graceful shutdown
// (e.g. flush pending messages).
wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic, opts...)
if err != nil {
return nil, err
}
wirePub.Start()
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
}
// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server. Retryable
// errors are automatically handled. If a PublishResult returns an error, this
// indicates that the publisher client encountered a fatal error and has
// permanently terminated. After the fatal error has been resolved, a new
// publisher client instance must be created to republish failed messages.
//
// Once Stop() has been called or the publisher client has failed permanently
// due to an error, future calls to Publish will immediately return a
// PublishResult with error ErrPublisherStopped.
//
// Error() returns the error that caused the publisher client to terminate and
// may contain more context than the error returned by PublishResult.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
result := ipubsub.NewPublishResult()
msgpb := new(pb.PubSubMessage)
if err := p.transformMessage(msg, msgpb); err != nil {
ipubsub.SetPublishResult(result, "", err)
p.setError(err)
p.wirePub.Stop()
return result
}
p.wirePub.Publish(msgpb, func(metadata *wire.MessageMetadata, err error) {
if metadata != nil {
ipubsub.SetPublishResult(result, metadata.String(), err)
} else {
ipubsub.SetPublishResult(result, "", err)
}
})
return result
}
// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent. Stop should be called when the client is no longer required.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
}
// Error returns the error that caused the publisher client to terminate. The
// error returned here may contain more context than PublishResult errors. The
// return value may be nil if Stop() was called.
func (p *PublisherClient) Error() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.err != nil {
return p.err
}
return p.wirePub.Error()
}
func (p *PublisherClient) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()
// Don't clobber original error.
if p.err == nil {
p.err = err
}
}
func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error {
if p.settings.MessageTransformer != nil {
return p.settings.MessageTransformer(from, to)
}
keyExtractor := p.settings.KeyExtractor
if keyExtractor == nil {
keyExtractor = extractOrderingKey
}
return transformPublishedMessage(from, to, keyExtractor)
}