diff --git a/pubsublite/admin.go b/pubsublite/admin.go index 8c548d3c41d..5cd00800490 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -29,6 +29,18 @@ var ( errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription") ) +// BacklogLocation refers to a location with respect to the message backlog. +type BacklogLocation int + +const ( + // End refers to the location past all currently published messages. End + // skips the entire message backlog. + End BacklogLocation = iota + 1 + + // Beginning refers to the location of the oldest retained message. + Beginning +) + // AdminClient provides admin operations for Pub/Sub Lite resources within a // Google Cloud region. The zone component of resource paths must be within this // region. See https://cloud.google.com/pubsub/lite/docs/locations for the list @@ -147,9 +159,40 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator } } +type createSubscriptionSettings struct { + backlogLocation BacklogLocation +} + +// CreateSubscriptionOption is an option for AdminClient.CreateSubscription. +type CreateSubscriptionOption interface { + Apply(*createSubscriptionSettings) +} + +type startingOffset struct { + backlogLocation BacklogLocation +} + +func (so startingOffset) Apply(settings *createSubscriptionSettings) { + settings.backlogLocation = so.backlogLocation +} + +// StartingOffset specifies the offset at which a newly created subscription +// will start receiving messages. +func StartingOffset(location BacklogLocation) CreateSubscriptionOption { + return startingOffset{location} +} + // CreateSubscription creates a new subscription from the given config. If the // subscription already exists an error will be returned. -func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) { +// +// By default, a new subscription will only receive messages published after +// the subscription was created. Use StartingOffset to override. +func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error) { + var settings createSubscriptionSettings + for _, opt := range opts { + opt.Apply(&settings) + } + subsPath, err := wire.ParseSubscriptionPath(config.Name) if err != nil { return nil, err @@ -161,6 +204,7 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti Parent: subsPath.Location().String(), Subscription: config.toProto(), SubscriptionId: subsPath.SubscriptionID, + SkipBacklog: settings.backlogLocation != Beginning, } subspb, err := ac.admin.CreateSubscription(ctx, req) if err != nil { diff --git a/pubsublite/admin_test.go b/pubsublite/admin_test.go index 522b8091c5d..d0a9b0ee5b5 100644 --- a/pubsublite/admin_test.go +++ b/pubsublite/admin_test.go @@ -275,6 +275,13 @@ func TestAdminSubscriptionCRUD(t *testing.T) { Parent: "projects/my-proj/locations/us-central1-a", SubscriptionId: "my-subscription", Subscription: subscriptionConfig.toProto(), + SkipBacklog: true, + } + wantCreateAtBacklogReq := &pb.CreateSubscriptionRequest{ + Parent: "projects/my-proj/locations/us-central1-a", + SubscriptionId: "my-subscription", + Subscription: subscriptionConfig.toProto(), + SkipBacklog: false, } wantUpdateReq := updateConfig.toUpdateRequest() wantGetReq := &pb.GetSubscriptionRequest{ @@ -286,6 +293,8 @@ func TestAdminSubscriptionCRUD(t *testing.T) { verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil) + verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil) + verifiers.GlobalVerifier.Push(wantCreateAtBacklogReq, subscriptionConfig.toProto(), nil) verifiers.GlobalVerifier.Push(wantUpdateReq, subscriptionConfig.toProto(), nil) verifiers.GlobalVerifier.Push(wantGetReq, subscriptionConfig.toProto(), nil) verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil) @@ -301,6 +310,18 @@ func TestAdminSubscriptionCRUD(t *testing.T) { t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig) } + if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(End)); err != nil { + t.Errorf("CreateSubscription() got err: %v", err) + } else if !testutil.Equal(gotConfig, &subscriptionConfig) { + t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig) + } + + if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(Beginning)); err != nil { + t.Errorf("CreateSubscription() got err: %v", err) + } else if !testutil.Equal(gotConfig, &subscriptionConfig) { + t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig) + } + if gotConfig, err := admin.UpdateSubscription(ctx, updateConfig); err != nil { t.Errorf("UpdateSubscription() got err: %v", err) } else if !testutil.Equal(gotConfig, &subscriptionConfig) {