Skip to content

Commit

Permalink
refactor(pubsub): update proto library dependency for linter (#3793)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Mar 11, 2021
1 parent 653082d commit f321666
Show file tree
Hide file tree
Showing 15 changed files with 62 additions and 99 deletions.
8 changes: 3 additions & 5 deletions pubsub/apiv1/mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pubsub/go.mod
Expand Up @@ -14,4 +14,5 @@ require (
google.golang.org/api v0.41.0
google.golang.org/genproto v0.0.0-20210311153111-e2979279ddde
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.25.0
)
9 changes: 5 additions & 4 deletions pubsub/integration_test.go
Expand Up @@ -32,7 +32,6 @@ import (
"cloud.google.com/go/internal/version"
kms "cloud.google.com/go/kms/apiv1"
testutil2 "cloud.google.com/go/pubsub/internal/testutil"
"github.com/golang/protobuf/proto"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
Expand All @@ -43,6 +42,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
)

var (
Expand Down Expand Up @@ -292,7 +293,7 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Now().Sub(now) >= time.Minute {
if time.Since(now) >= time.Minute {
t.Fatal("pullN took too long")
}
} else {
Expand Down Expand Up @@ -390,8 +391,8 @@ func TestIntegration_LargePublishSize(t *testing.T) {
length := MaxPublishRequestBytes - calcFieldSizeString(topic.String())
// Next, account for the overhead from encoding an individual PubsubMessage,
// and the inner PubsubMessage.Data field.
pbMsgOverhead := 1 + proto.SizeVarint(uint64(length))
dataOverhead := 1 + proto.SizeVarint(uint64(length-pbMsgOverhead))
pbMsgOverhead := 1 + protowire.SizeVarint(uint64(length))
dataOverhead := 1 + protowire.SizeVarint(uint64(length-pbMsgOverhead))
maxLengthSingleMessage := length - pbMsgOverhead - dataOverhead

publishReq := &pb.PublishRequest{
Expand Down
6 changes: 3 additions & 3 deletions pubsub/iterator.go
Expand Up @@ -23,12 +23,12 @@ import (

vkit "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/internal/distribution"
"github.com/golang/protobuf/proto"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
)

// Between message receipt and ack (that is, the time spent processing a message) we want to extend the message
Expand Down Expand Up @@ -536,7 +536,7 @@ func (it *messageIterator) pingStream() {
func calcFieldSizeString(fields ...string) int {
overhead := 0
for _, field := range fields {
overhead += 1 + len(field) + proto.SizeVarint(uint64(len(field)))
overhead += 1 + len(field) + protowire.SizeVarint(uint64(len(field)))
}
return overhead
}
Expand All @@ -546,7 +546,7 @@ func calcFieldSizeString(fields ...string) int {
func calcFieldSizeInt(fields ...int) int {
overhead := 0
for _, field := range fields {
overhead += 1 + proto.SizeVarint(uint64(field))
overhead += 1 + protowire.SizeVarint(uint64(field))
}
return overhead
}
Expand Down
6 changes: 1 addition & 5 deletions pubsub/loadtest/loadtest.go
Expand Up @@ -31,7 +31,6 @@ import (

"cloud.google.com/go/pubsub"
pb "cloud.google.com/go/pubsub/loadtest/pb"
"github.com/golang/protobuf/ptypes"
"golang.org/x/time/rate"
)

Expand All @@ -56,10 +55,7 @@ func (l *PubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartR
if err != nil {
return nil, err
}
dur, err := ptypes.Duration(req.PublishBatchDuration)
if err != nil {
return nil, err
}
dur := req.PublishBatchDuration.AsDuration()
l.init(c, req.Topic, req.MessageSize, req.PublishBatchSize, dur)
log.Println("started")
return &pb.StartResponse{}, nil
Expand Down
6 changes: 1 addition & 5 deletions pubsub/message.go
Expand Up @@ -19,7 +19,6 @@ import (
"time"

ipubsub "cloud.google.com/go/internal/pubsub"
"github.com/golang/protobuf/ptypes"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
)

Expand Down Expand Up @@ -72,10 +71,7 @@ func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDon
return msg, nil
}

pubTime, err := ptypes.Timestamp(resp.Message.PublishTime)
if err != nil {
return nil, err
}
pubTime := resp.Message.PublishTime.AsTime()

var deliveryAttempt *int
if resp.DeliveryAttempt > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/mock_test.go
Expand Up @@ -23,8 +23,8 @@ import (
"time"

"cloud.google.com/go/internal/testutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/types/known/emptypb"
)

type mockServer struct {
Expand Down
28 changes: 9 additions & 19 deletions pubsub/pstest/fake.go
Expand Up @@ -34,12 +34,12 @@ import (
"time"

"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/ptypes"
durpb "github.com/golang/protobuf/ptypes/duration"
emptypb "github.com/golang/protobuf/ptypes/empty"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
durpb "google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// ReactorOptions is a map that Server uses to look up reactors.
Expand Down Expand Up @@ -455,11 +455,11 @@ const (
maxMessageRetentionDuration = 168 * time.Hour
)

var defaultMessageRetentionDuration = ptypes.DurationProto(maxMessageRetentionDuration)
var defaultMessageRetentionDuration = durpb.New(maxMessageRetentionDuration)

func checkMRD(pmrd *durpb.Duration) error {
mrd, err := ptypes.Duration(pmrd)
if err != nil || mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration {
mrd := pmrd.AsDuration()
if mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration {
return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd)
}
return nil
Expand Down Expand Up @@ -619,10 +619,7 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis
s.nextID++
pm.MessageId = id
pubTime := s.timeNowFunc()
tsPubTime, err := ptypes.TimestampProto(pubTime)
if err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
tsPubTime := timestamppb.New(pubTime)
pm.PublishTime = tsPubTime
m := &Message{
ID: id,
Expand Down Expand Up @@ -833,11 +830,7 @@ func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekRespon
case nil:
return nil, status.Errorf(codes.InvalidArgument, "missing Seek target type")
case *pb.SeekRequest_Time:
var err error
target, err = ptypes.Timestamp(v.Time)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "bad Time target: %v", err)
}
target = v.Time.AsTime()
default:
return nil, status.Errorf(codes.Unimplemented, "unhandled Seek target type %T", v)
}
Expand Down Expand Up @@ -985,10 +978,7 @@ func (s *subscription) maintainMessages(now time.Time) {
if m.outstanding() && now.After(m.ackDeadline) {
m.makeAvailable()
}
pubTime, err := ptypes.Timestamp(m.proto.Message.PublishTime)
if err != nil {
panic(err)
}
pubTime := m.proto.Message.PublishTime.AsTime()
// Remove messages that have been undelivered for a long time.
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
delete(s.msgs, id)
Expand Down
24 changes: 11 additions & 13 deletions pubsub/pstest/fake_test.go
Expand Up @@ -25,12 +25,13 @@ import (
"time"

"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/ptypes"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestTopics(t *testing.T) {
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestSubscriptionErrors(t *testing.T) {
checkCode(err, codes.NotFound)
_, err = sclient.Seek(ctx, &pb.SeekRequest{})
checkCode(err, codes.InvalidArgument)
srt := &pb.SeekRequest_Time{Time: ptypes.TimestampNow()}
srt := &pb.SeekRequest_Time{Time: timestamppb.Now()}
_, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt})
checkCode(err, codes.InvalidArgument)
_, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt, Subscription: "s"})
Expand Down Expand Up @@ -278,10 +279,7 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages
if err != nil {
t.Fatal(err)
}
tsPubTime, err := ptypes.TimestampProto(pubTime)
if err != nil {
t.Fatal(err)
}
tsPubTime := timestamppb.New(pubTime)
want := map[string]*pb.PubsubMessage{}
for i, id := range res.MessageIds {
want[id] = &pb.PubsubMessage{
Expand Down Expand Up @@ -639,7 +637,7 @@ func TestSeek(t *testing.T) {
Topic: top.Name,
AckDeadlineSeconds: 10,
})
ts := ptypes.TimestampNow()
ts := timestamppb.Now()
_, err := sclient.Seek(context.Background(), &pb.SeekRequest{
Subscription: sub.Name,
Target: &pb.SeekRequest_Time{Time: ts},
Expand Down Expand Up @@ -700,7 +698,7 @@ func TestTimeNowFunc(t *testing.T) {

m := s.Message(id)
if m == nil {
t.Error("got nil, want a message")
t.Fatalf("got nil, want a message")
}
if got, want := m.PublishTime, timeFunc(); got != want {
t.Fatalf("got %v, want %v", got, want)
Expand Down Expand Up @@ -797,8 +795,8 @@ func TestUpdateRetryPolicy(t *testing.T) {
Name: "projects/P/subscriptions/S",
Topic: top.Name,
RetryPolicy: &pb.RetryPolicy{
MinimumBackoff: ptypes.DurationProto(10 * time.Second),
MaximumBackoff: ptypes.DurationProto(60 * time.Second),
MinimumBackoff: durationpb.New(10 * time.Second),
MaximumBackoff: durationpb.New(60 * time.Second),
},
})

Expand All @@ -807,8 +805,8 @@ func TestUpdateRetryPolicy(t *testing.T) {
Name: sub.Name,
Topic: top.Name,
RetryPolicy: &pb.RetryPolicy{
MinimumBackoff: ptypes.DurationProto(20 * time.Second),
MaximumBackoff: ptypes.DurationProto(100 * time.Second),
MinimumBackoff: durationpb.New(20 * time.Second),
MaximumBackoff: durationpb.New(100 * time.Second),
},
}

Expand Down Expand Up @@ -1007,7 +1005,7 @@ func TestErrorInjection(t *testing.T) {
},
{
funcName: "Seek",
param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: ptypes.TimestampNow()}},
param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: timestamppb.Now()}},
},
}

Expand Down
9 changes: 6 additions & 3 deletions pubsub/pullstream.go
Expand Up @@ -28,8 +28,9 @@ import (
// A pullStream supports the methods of a StreamingPullClient, but re-opens
// the stream on a retryable error.
type pullStream struct {
ctx context.Context
open func() (pb.Subscriber_StreamingPullClient, error)
ctx context.Context
open func() (pb.Subscriber_StreamingPullClient, error)
cancel context.CancelFunc

mu sync.Mutex
spc *pb.Subscriber_StreamingPullClient
Expand All @@ -41,8 +42,10 @@ type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_S

func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
ctx = withSubscriptionKey(ctx, subName)
ctx, cancel := context.WithCancel(ctx)
return &pullStream{
ctx: ctx,
ctx: ctx,
cancel: cancel,
open: func() (pb.Subscriber_StreamingPullClient, error) {
spc, err := streamingPull(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
if err == nil {
Expand Down
14 changes: 4 additions & 10 deletions pubsub/snapshot.go
Expand Up @@ -20,8 +20,8 @@ import (
"strings"
"time"

"github.com/golang/protobuf/ptypes"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Snapshot is a reference to a PubSub snapshot.
Expand Down Expand Up @@ -100,11 +100,8 @@ func (s *Snapshot) Delete(ctx context.Context) error {
// creation time), only retained messages will be marked as unacknowledged,
// and already-expunged messages will not be restored.
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error {
ts, err := ptypes.TimestampProto(t)
if err != nil {
return err
}
_, err = s.c.subc.Seek(ctx, &pb.SeekRequest{
ts := timestamppb.New(t)
_, err := s.c.subc.Seek(ctx, &pb.SeekRequest{
Subscription: s.name,
Target: &pb.SeekRequest_Time{Time: ts},
})
Expand Down Expand Up @@ -148,10 +145,7 @@ func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error
}

func toSnapshotConfig(snap *pb.Snapshot, c *Client) (*SnapshotConfig, error) {
exp, err := ptypes.Timestamp(snap.ExpireTime)
if err != nil {
return nil, err
}
exp := snap.ExpireTime.AsTime()
return &SnapshotConfig{
Snapshot: &Snapshot{c: c, name: snap.Name},
Topic: newTopic(c, snap.Topic),
Expand Down
2 changes: 1 addition & 1 deletion pubsub/streaming_pull_test.go
Expand Up @@ -28,14 +28,14 @@ import (
"time"

"cloud.google.com/go/internal/testutil"
tspb "github.com/golang/protobuf/ptypes/timestamp"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/api/option"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
tspb "google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand Down

0 comments on commit f321666

Please sign in to comment.