From bc083b66972b1c4329c18da9529c76b79ef56c50 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Thu, 25 Mar 2021 12:03:43 -0700 Subject: [PATCH] feat(pubsublite): adding ability to create subscriptions at head (#3790) * feat: adding ability to create subscriptions at head * fix: formatting * fix: line formatting * fix: format * fix: update docs * fix: requested changes * fix: doc update * fix: add create subscription opts doc * fix: doc formatting * fix: docs * fix: use variadic opts in create request * fix: change offset location name to backlog location * fix: create a subscription settings struct Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com> --- pubsublite/admin.go | 46 +++++++++++++++++++++++++++++++++++++++- pubsublite/admin_test.go | 21 ++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/pubsublite/admin.go b/pubsublite/admin.go index 8c548d3c41d..5cd00800490 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -29,6 +29,18 @@ var ( errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription") ) +// BacklogLocation refers to a location with respect to the message backlog. +type BacklogLocation int + +const ( + // End refers to the location past all currently published messages. End + // skips the entire message backlog. + End BacklogLocation = iota + 1 + + // Beginning refers to the location of the oldest retained message. + Beginning +) + // AdminClient provides admin operations for Pub/Sub Lite resources within a // Google Cloud region. The zone component of resource paths must be within this // region. See https://cloud.google.com/pubsub/lite/docs/locations for the list @@ -147,9 +159,40 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator } } +type createSubscriptionSettings struct { + backlogLocation BacklogLocation +} + +// CreateSubscriptionOption is an option for AdminClient.CreateSubscription. +type CreateSubscriptionOption interface { + Apply(*createSubscriptionSettings) +} + +type startingOffset struct { + backlogLocation BacklogLocation +} + +func (so startingOffset) Apply(settings *createSubscriptionSettings) { + settings.backlogLocation = so.backlogLocation +} + +// StartingOffset specifies the offset at which a newly created subscription +// will start receiving messages. +func StartingOffset(location BacklogLocation) CreateSubscriptionOption { + return startingOffset{location} +} + // CreateSubscription creates a new subscription from the given config. If the // subscription already exists an error will be returned. -func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) { +// +// By default, a new subscription will only receive messages published after +// the subscription was created. Use StartingOffset to override. +func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error) { + var settings createSubscriptionSettings + for _, opt := range opts { + opt.Apply(&settings) + } + subsPath, err := wire.ParseSubscriptionPath(config.Name) if err != nil { return nil, err @@ -161,6 +204,7 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti Parent: subsPath.Location().String(), Subscription: config.toProto(), SubscriptionId: subsPath.SubscriptionID, + SkipBacklog: settings.backlogLocation != Beginning, } subspb, err := ac.admin.CreateSubscription(ctx, req) if err != nil { diff --git a/pubsublite/admin_test.go b/pubsublite/admin_test.go index 522b8091c5d..d0a9b0ee5b5 100644 --- a/pubsublite/admin_test.go +++ b/pubsublite/admin_test.go @@ -275,6 +275,13 @@ func TestAdminSubscriptionCRUD(t *testing.T) { Parent: "projects/my-proj/locations/us-central1-a", SubscriptionId: "my-subscription", Subscription: subscriptionConfig.toProto(), + SkipBacklog: true, + } + wantCreateAtBacklogReq := &pb.CreateSubscriptionRequest{ + Parent: "projects/my-proj/locations/us-central1-a", + SubscriptionId: "my-subscription", + Subscription: subscriptionConfig.toProto(), + SkipBacklog: false, } wantUpdateReq := updateConfig.toUpdateRequest() wantGetReq := &pb.GetSubscriptionRequest{ @@ -286,6 +293,8 @@ func TestAdminSubscriptionCRUD(t *testing.T) { verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil) + verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil) + verifiers.GlobalVerifier.Push(wantCreateAtBacklogReq, subscriptionConfig.toProto(), nil) verifiers.GlobalVerifier.Push(wantUpdateReq, subscriptionConfig.toProto(), nil) verifiers.GlobalVerifier.Push(wantGetReq, subscriptionConfig.toProto(), nil) verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil) @@ -301,6 +310,18 @@ func TestAdminSubscriptionCRUD(t *testing.T) { t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig) } + if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(End)); err != nil { + t.Errorf("CreateSubscription() got err: %v", err) + } else if !testutil.Equal(gotConfig, &subscriptionConfig) { + t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig) + } + + if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(Beginning)); err != nil { + t.Errorf("CreateSubscription() got err: %v", err) + } else if !testutil.Equal(gotConfig, &subscriptionConfig) { + t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig) + } + if gotConfig, err := admin.UpdateSubscription(ctx, updateConfig); err != nil { t.Errorf("UpdateSubscription() got err: %v", err) } else if !testutil.Equal(gotConfig, &subscriptionConfig) {