Skip to content

Commit

Permalink
refactor(pubsub): use Message and PublishResult in internal/pubsub (#…
Browse files Browse the repository at this point in the history
…3231)

Replaces Message and PublishResult with aliases of types from internal/pubsub. Refactors all usage.
  • Loading branch information
tmdiep committed Dec 9, 2020
1 parent a10263a commit 48aa1ce
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 177 deletions.
2 changes: 1 addition & 1 deletion pubsub/go.mod
Expand Up @@ -3,7 +3,7 @@ module cloud.google.com/go/pubsub
go 1.11

require (
cloud.google.com/go v0.72.0
cloud.google.com/go v0.73.0
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.4
github.com/googleapis/gax-go/v2 v2.0.5
Expand Down
7 changes: 7 additions & 0 deletions pubsub/go.sum
Expand Up @@ -19,6 +19,8 @@ cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww=
cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI=
cloud.google.com/go v0.73.0 h1:sGvc4e0Cmm4+DKQR76a9VwNukpacQK8TOl5pDl0Pcn0=
cloud.google.com/go v0.73.0/go.mod h1:BkDh9dFvGjCitVw03TNjKbBxXNKULXXIq6orU6HrJ4Q=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
Expand Down Expand Up @@ -119,6 +121,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20201117184057-ae444373da19/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
Expand Down Expand Up @@ -230,6 +233,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjN
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -352,6 +357,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201204162204-73cf035baebf h1:LJkCozzIEY51bepolJQN3tP938NA5mMucF2dDJ9AMNA=
golang.org/x/tools v0.0.0-20201204162204-73cf035baebf/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -420,6 +426,7 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201203001206-6486ece9c497/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446 h1:65ppmIPdaZE+BO34gntwqexoTYr30IRNGmS0OGOHu3A=
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
18 changes: 7 additions & 11 deletions pubsub/integration_test.go
Expand Up @@ -1149,9 +1149,8 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) {
OrderingKey: orderingKey,
})
go func() {
<-r.ready
if r.err != nil {
t.Error(r.err)
if _, err := r.Get(ctx); err != nil {
t.Error(err)
}
}()
}
Expand Down Expand Up @@ -1320,8 +1319,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
Data: bytes.Repeat([]byte("A"), 1000),
OrderingKey: orderingKey,
})
<-r.ready
if r.err == nil {
if _, err := r.Get(ctx); err == nil {
t.Fatalf("expected bundle byte limit error, got nil")
}
// Publish a normal sized message now, which should fail
Expand All @@ -1330,9 +1328,8 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
Data: []byte("should fail"),
OrderingKey: orderingKey,
})
<-r.ready
if r.err == nil || !strings.Contains(r.err.Error(), "pubsub: Publishing for ordering key") {
t.Fatalf("expected ordering keys publish error, got %v", r.err)
if _, err := r.Get(ctx); err == nil || !strings.Contains(err.Error(), "pubsub: Publishing for ordering key") {
t.Fatalf("expected ordering keys publish error, got %v", err)
}

// Lastly, call ResumePublish and make sure subsequent publishes succeed.
Expand All @@ -1341,9 +1338,8 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
Data: []byte("should succeed"),
OrderingKey: orderingKey,
})
<-r.ready
if r.err != nil {
t.Fatalf("got error while publishing message: %v", r.err)
if _, err := r.Get(ctx); err != nil {
t.Fatalf("got error while publishing message: %v", err)
}
}

Expand Down
16 changes: 7 additions & 9 deletions pubsub/iterator.go
Expand Up @@ -205,7 +205,8 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
return nil, it.fail(err)
}
recordStat(it.ctx, PullCount, int64(len(rmsgs)))
msgs, err := convertMessages(rmsgs)
now := time.Now()
msgs, err := convertMessages(rmsgs, now, it.done)
if err != nil {
return nil, it.fail(err)
}
Expand All @@ -214,17 +215,14 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
maxExt := time.Now().Add(it.po.maxExtension)
ackIDs := map[string]bool{}
it.mu.Lock()
now := time.Now()
for _, m := range msgs {
ackh, _ := m.ackHandler()
ackh.receiveTime = now
addRecv(m.ID, ackh.ackID, now)
ackh.doneFunc = it.done
it.keepAliveDeadlines[ackh.ackID] = maxExt
ackID := msgAckID(m)
addRecv(m.ID, ackID, now)
it.keepAliveDeadlines[ackID] = maxExt
// Don't change the mod-ack if the message is going to be nacked. This is
// possible if there are retries.
if !it.pendingNacks[ackh.ackID] {
ackIDs[ackh.ackID] = true
if !it.pendingNacks[ackID] {
ackIDs[ackID] = true
}
}
deadline := it.ackDeadline()
Expand Down
145 changes: 55 additions & 90 deletions pubsub/message.go
Expand Up @@ -15,55 +15,61 @@
package pubsub

import (
"fmt"
"time"

ipubsub "cloud.google.com/go/internal/pubsub"
"github.com/golang/protobuf/ptypes"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
)

// Message represents a Pub/Sub message.
type Message struct {
// ID identifies this message. This ID is assigned by the server and is
// populated for Messages obtained from a subscription.
//
// This field is read-only.
ID string

// Data is the actual data in the message.
Data []byte

// Attributes represents the key-value pairs the current message is
// labelled with.
Attributes map[string]string

// PublishTime is the time at which the message was published. This is
// populated by the server for Messages obtained from a subscription.
//
// This field is read-only.
PublishTime time.Time

// DeliveryAttempt is the number of times a message has been delivered.
// This is part of the dead lettering feature that forwards messages that
// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
// If dead lettering is enabled, this will be set on all attempts, starting
// with value 1. Otherwise, the value will be nil.
// This field is read-only.
DeliveryAttempt *int

// size is the approximate size of the message's data and attributes.
size int

// OrderingKey identifies related messages for which publish order should
// be respected. If empty string is used, message will be sent unordered.
OrderingKey string

// ackh handles Ack() or Nack().
ackh ackHandler
//
// Message can be passed to Topic.Publish for publishing.
//
// If received in the callback passed to Subscription.Receive, client code must
// call Message.Ack or Message.Nack when finished processing the Message. Calls
// to Ack or Nack have no effect after the first call.
//
// Ack indicates successful processing of a Message. If message acknowledgement
// fails, the Message will be redelivered. Nack indicates that the client will
// not or cannot process a Message. Nack will result in the Message being
// redelivered more quickly than if it were allowed to expire.
type Message = ipubsub.Message

// msgAckHandler performs a safe cast of the message's ack handler to psAckHandler.
func msgAckHandler(m *Message) (*psAckHandler, bool) {
ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler)
return ackh, ok
}

func msgAckID(m *Message) string {
if ackh, ok := msgAckHandler(m); ok {
return ackh.ackID
}
return ""
}

func toMessage(resp *pb.ReceivedMessage) (*Message, error) {
// The done method of the iterator that created a Message.
type iterDoneFunc func(string, bool, time.Time)

func convertMessages(rms []*pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) ([]*Message, error) {
msgs := make([]*Message, 0, len(rms))
for i, m := range rms {
msg, err := toMessage(m, receiveTime, doneFunc)
if err != nil {
return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m)
}
msgs = append(msgs, msg)
}
return msgs, nil
}

func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) (*Message, error) {
ackh := &psAckHandler{ackID: resp.AckId}
msg := ipubsub.NewMessage(ackh)
if resp.Message == nil {
return &Message{ackh: &psAckHandler{ackID: resp.AckId}}, nil
return msg, nil
}

pubTime, err := ptypes.Timestamp(resp.Message.PublishTime)
Expand All @@ -77,56 +83,15 @@ func toMessage(resp *pb.ReceivedMessage) (*Message, error) {
deliveryAttempt = &da
}

return &Message{
Data: resp.Message.Data,
Attributes: resp.Message.Attributes,
ID: resp.Message.MessageId,
PublishTime: pubTime,
DeliveryAttempt: deliveryAttempt,
OrderingKey: resp.Message.OrderingKey,
ackh: &psAckHandler{ackID: resp.AckId},
}, nil
}

// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// If message acknowledgement fails, the Message will be redelivered.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Ack() {
if m.ackh != nil {
m.ackh.OnAck()
}
}

// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Nack() {
if m.ackh != nil {
m.ackh.OnNack()
}
}

// ackHandler performs a safe cast of the message's ack handler to psAckHandler.
func (m *Message) ackHandler() (*psAckHandler, bool) {
ackh, ok := m.ackh.(*psAckHandler)
return ackh, ok
}

func (m *Message) ackID() string {
if ackh, ok := m.ackh.(*psAckHandler); ok {
return ackh.ackID
}
return ""
}

// ackHandler implements ack/nack handling.
type ackHandler interface {
OnAck()
OnNack()
msg.Data = resp.Message.Data
msg.Attributes = resp.Message.Attributes
msg.ID = resp.Message.MessageId
msg.PublishTime = pubTime
msg.DeliveryAttempt = deliveryAttempt
msg.OrderingKey = resp.Message.OrderingKey
ackh.receiveTime = receiveTime
ackh.doneFunc = doneFunc
return msg, nil
}

// psAckHandler handles ack/nack for the pubsub package.
Expand All @@ -140,7 +105,7 @@ type psAckHandler struct {
calledDone bool

// The done method of the iterator that created this Message.
doneFunc func(string, bool, time.Time)
doneFunc iterDoneFunc
}

func (ah *psAckHandler) OnAck() {
Expand Down
14 changes: 0 additions & 14 deletions pubsub/service.go
Expand Up @@ -15,13 +15,11 @@
package pubsub

import (
"fmt"
"math"
"strings"
"time"

gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -37,18 +35,6 @@ const (
maxSendRecvBytes = 20 * 1024 * 1024 // 20M
)

func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) {
msgs := make([]*Message, 0, len(rms))
for i, m := range rms {
msg, err := toMessage(m)
if err != nil {
return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m)
}
msgs = append(msgs, msg)
}
return msgs, nil
}

func trunc32(i int64) int32 {
if i > math.MaxInt32 {
i = math.MaxInt32
Expand Down
14 changes: 7 additions & 7 deletions pubsub/streaming_pull_test.go
Expand Up @@ -67,7 +67,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) {
func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) {
sub := client.Subscription("S")
gotMsgs, err := pullN(context.Background(), sub, len(msgs), func(_ context.Context, m *Message) {
id, err := strconv.Atoi(m.ackID())
id, err := strconv.Atoi(msgAckID(m))
if err != nil {
panic(err)
}
Expand All @@ -83,14 +83,14 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer
}
gotMap := map[string]*Message{}
for _, m := range gotMsgs {
gotMap[m.ackID()] = m
gotMap[msgAckID(m)] = m
}
for i, msg := range msgs {
want, err := toMessage(msg)
want, err := toMessage(msg, time.Time{}, nil)
if err != nil {
t.Fatal(err)
}
wantAckh, _ := want.ackHandler()
wantAckh, _ := msgAckHandler(want)
wantAckh.calledDone = true
got := gotMap[wantAckh.ackID]
if got == nil {
Expand Down Expand Up @@ -236,10 +236,10 @@ func TestStreamingPullConcurrent(t *testing.T) {
}
seen := map[string]bool{}
for _, gm := range gotMsgs {
if seen[gm.ackID()] {
t.Fatalf("duplicate ID %q", gm.ackID())
if seen[msgAckID(gm)] {
t.Fatalf("duplicate ID %q", msgAckID(gm))
}
seen[gm.ackID()] = true
seen[msgAckID(gm)] = true
}
if len(seen) != nMessages {
t.Fatalf("got %d messages, want %d", len(seen), nMessages)
Expand Down
2 changes: 1 addition & 1 deletion pubsub/subscription.go
Expand Up @@ -916,7 +916,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// Return nil if the context is done, not err.
return nil
}
ackh, _ := msg.ackHandler()
ackh, _ := msgAckHandler(msg)
old := ackh.doneFunc
msgLen := len(msg.Data)
ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
Expand Down

0 comments on commit 48aa1ce

Please sign in to comment.