Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): adding ability to create subscriptions at head #3790

Merged
merged 19 commits into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 33 additions & 0 deletions pubsublite/admin.go
Expand Up @@ -147,9 +147,41 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator
}
}

// OffsetLocation refers to the location of an offset with respect to the
// message backlog.
type OffsetLocation int

const (
// UnspecifiedOffsetLocation represents an OffsetLocation that has not
// been set.
UnspecifiedOffsetLocation OffsetLocation = iota

hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
// End refers to the offset past all currently published messages. End
// skips the entire message backlog.
End

// Beginning represents the offset of the oldest retained message.
Beginning
)

// CreateSubscriptionOpts specify the options to use when creating a
// subscription.
type CreateSubscriptionOpts struct {
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
// StartingOffset is the offset at which a newly created subscription
// will start receiving messages.
StartingOffset OffsetLocation
}

// CreateSubscription creates a new subscription from the given config. If the
// subscription already exists an error will be returned.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
return ac.CreateSubscriptionWithOptions(ctx, config, CreateSubscriptionOpts{StartingOffset: End})
}

// CreateSubscriptionWithOptions creates a new subscription from the given
// config with the provided options. If the subscription already exists an error
// will be returned.
func (ac *AdminClient) CreateSubscriptionWithOptions(ctx context.Context, config SubscriptionConfig, opts CreateSubscriptionOpts) (*SubscriptionConfig, error) {
subsPath, err := wire.ParseSubscriptionPath(config.Name)
if err != nil {
return nil, err
Expand All @@ -161,6 +193,7 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti
Parent: subsPath.Location().String(),
Subscription: config.toProto(),
SubscriptionId: subsPath.SubscriptionID,
SkipBacklog: opts.StartingOffset != Beginning,
}
subspb, err := ac.admin.CreateSubscription(ctx, req)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pubsublite/admin_test.go
Expand Up @@ -275,6 +275,13 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
Parent: "projects/my-proj/locations/us-central1-a",
SubscriptionId: "my-subscription",
Subscription: subscriptionConfig.toProto(),
SkipBacklog: true,
}
wantCreateAtBacklogReq := &pb.CreateSubscriptionRequest{
Parent: "projects/my-proj/locations/us-central1-a",
SubscriptionId: "my-subscription",
Subscription: subscriptionConfig.toProto(),
SkipBacklog: false,
}
wantUpdateReq := updateConfig.toUpdateRequest()
wantGetReq := &pb.GetSubscriptionRequest{
Expand All @@ -286,6 +293,7 @@ func TestAdminSubscriptionCRUD(t *testing.T) {

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantCreateAtBacklogReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantUpdateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantGetReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil)
Expand All @@ -301,6 +309,12 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.CreateSubscriptionWithOptions(ctx, subscriptionConfig, CreateSubscriptionOpts{StartingOffset: Beginning}); err != nil {
t.Errorf("CreateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.UpdateSubscription(ctx, updateConfig); err != nil {
t.Errorf("UpdateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
Expand Down