Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(internal): copy pubsub Message and PublishResult to internal/pub…
…sub (#3351)

Message and PublishResult will be used by the pubsub and pubsublite packages, which require access to internal fields that should not be made public.
  • Loading branch information
tmdiep committed Dec 3, 2020
1 parent 28fbb9a commit 82521ee
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
97 changes: 97 additions & 0 deletions internal/pubsub/message.go
@@ -0,0 +1,97 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsub

import (
"time"
)

// AckHandler implements ack/nack handling.
type AckHandler interface {
// OnAck processes a message ack.
OnAck()

// OnNack processes a message nack.
OnNack()
}

// Message represents a Pub/Sub message.
type Message struct {
// ID identifies this message. This ID is assigned by the server and is
// populated for Messages obtained from a subscription.
//
// This field is read-only.
ID string

// Data is the actual data in the message.
Data []byte

// Attributes represents the key-value pairs the current message is
// labelled with.
Attributes map[string]string

// PublishTime is the time at which the message was published. This is
// populated by the server for Messages obtained from a subscription.
//
// This field is read-only.
PublishTime time.Time

// DeliveryAttempt is the number of times a message has been delivered.
// This is part of the dead lettering feature that forwards messages that
// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
// If dead lettering is enabled, this will be set on all attempts, starting
// with value 1. Otherwise, the value will be nil.
// This field is read-only.
DeliveryAttempt *int

// OrderingKey identifies related messages for which publish order should
// be respected. If empty string is used, message will be sent unordered.
OrderingKey string

// ackh handles Ack() or Nack().
ackh AckHandler
}

// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// If message acknowledgement fails, the Message will be redelivered.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Ack() {
if m.ackh != nil {
m.ackh.OnAck()
}
}

// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Nack() {
if m.ackh != nil {
m.ackh.OnNack()
}
}

// NewMessage creates a message with an AckHandler implementation, which should
// not be nil.
func NewMessage(ackh AckHandler) *Message {
return &Message{ackh: ackh}
}

// MessageAckHandler provides access to the internal field Message.ackh.
func MessageAckHandler(m *Message) AckHandler {
return m.ackh
}
57 changes: 57 additions & 0 deletions internal/pubsub/publish.go
@@ -0,0 +1,57 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsub

import "context"

// A PublishResult holds the result from a call to Publish.
type PublishResult struct {
ready chan struct{}
serverID string
err error
}

// Ready returns a channel that is closed when the result is ready.
// When the Ready channel is closed, Get is guaranteed not to block.
func (r *PublishResult) Ready() <-chan struct{} { return r.ready }

// Get returns the server-generated message ID and/or error result of a Publish call.
// Get blocks until the Publish call completes or the context is done.
func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) {
// If the result is already ready, return it even if the context is done.
select {
case <-r.Ready():
return r.serverID, r.err
default:
}
select {
case <-ctx.Done():
return "", ctx.Err()
case <-r.Ready():
return r.serverID, r.err
}
}

// NewPublishResult creates a PublishResult.
func NewPublishResult() *PublishResult {
return &PublishResult{ready: make(chan struct{})}
}

// SetPublishResult sets the server ID and error for a publish result and closes
// the Ready channel.
func SetPublishResult(r *PublishResult, sid string, err error) {
r.serverID = sid
r.err = err
close(r.ready)
}

0 comments on commit 82521ee

Please sign in to comment.