Skip to content

Commit

Permalink
docs(pubsublite): clarifications for publishing (#3695)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Feb 11, 2021
1 parent 6a8d4c5 commit 286a77d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/errors.go
Expand Up @@ -39,7 +39,7 @@ var (
ErrServiceStarting = errors.New("pubsublite: service is starting up")

// ErrServiceStopped indicates that a service (e.g. publisher or subscriber)
// cannot perform an operation because it has stoped or is in the process of
// cannot perform an operation because it has stopped or is in the process of
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")
)
Expand Down
14 changes: 9 additions & 5 deletions pubsublite/pscompat/example_test.go
Expand Up @@ -41,6 +41,8 @@ func ExamplePublisherClient_Publish() {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
// NOTE: The publisher will terminate upon first error. Create a new
// publisher to republish failed messages.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
Expand Down Expand Up @@ -73,6 +75,8 @@ func ExamplePublisherClient_Publish_batchingSettings() {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
// NOTE: The publisher will terminate upon first error. Create a new
// publisher to republish failed messages.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
Expand All @@ -96,12 +100,12 @@ func ExamplePublisherClient_Error() {
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// Prints the fatal error that caused the publisher to terminate.
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())

// TODO: Handle error.
if err == pscompat.ErrPublisherStopped {
// Prints the fatal error that caused the publisher to terminate.
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
break
}
// NOTE: The publisher will terminate upon first error. Create a new
// publisher to republish failed messages.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
Expand Down
12 changes: 8 additions & 4 deletions pubsublite/pscompat/publisher.go
Expand Up @@ -36,8 +36,9 @@ var (
ErrOversizedMessage = bundler.ErrOversizedItem

// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped. PublisherClient.Error()
// returns the error that caused the publisher client to terminate (if any).
// published because the publisher client has stopped or is in the process of
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any).
ErrPublisherStopped = wire.ErrServiceStopped
)

Expand Down Expand Up @@ -105,7 +106,10 @@ func NewPublisherClientWithSettings(ctx context.Context, topic string, settings
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server.
// message has been sent (or has failed to be sent) to the server. Once the
// first publish error occurs, the publisher will terminate and subsequent
// PublishResults will also have an error. A new publisher must be created to
// republish failed messages.
//
// Once Stop() has been called or the publisher has failed permanently due to an
// error, future calls to Publish will immediately return a PublishResult with
Expand Down Expand Up @@ -134,7 +138,7 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub

// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent.
// sent. Stop should be called when the client is no longer required.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
Expand Down

0 comments on commit 286a77d

Please sign in to comment.