Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): adding ability to create subscriptions at head #3790

Merged
merged 19 commits into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 45 additions & 1 deletion pubsublite/admin.go
Expand Up @@ -29,6 +29,18 @@ var (
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
)

// BacklogLocation refers to a location with respect to the message backlog.
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
type BacklogLocation int

const (
// End refers to the location past all currently published messages. End
// skips the entire message backlog.
End BacklogLocation = iota + 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be clear from the name that this is a BacklogLocation? I'm thinking about reading code that uses this: pubsublite.End isn't totally clear what End is.

(I'm not familiar with the feature, so totally fine if you say, yes, it's clear in context.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be clear to users based on context. When calling the create subscription method, users will have to specify StartingOffset(Beginning/End) - The StartingOffset should indicate that the option between beginning/end is a location in backlog.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is any clearer, perhaps we can consider StartingAt(BeginningOfBacklog|EndOfBacklog).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with any of the above. One other idea would be basing it on "oldest" and "newest" (is the end new or old?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

End represents the newest message, Beginning represents the oldest retained. My preference would be to keep this as End/Beginning, as this is what we use for gCloud, the UI, and other client libraries.

codyoss marked this conversation as resolved.
Show resolved Hide resolved

// Beginning refers to the location of the oldest retained message.
Beginning
)

// AdminClient provides admin operations for Pub/Sub Lite resources within a
// Google Cloud region. The zone component of resource paths must be within this
// region. See https://cloud.google.com/pubsub/lite/docs/locations for the list
Expand Down Expand Up @@ -147,9 +159,40 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator
}
}

type createSubscriptionSettings struct {
backlogLocation BacklogLocation
}

// CreateSubscriptionOption is an option for AdminClient.CreateSubscription.
type CreateSubscriptionOption interface {
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
Apply(*createSubscriptionSettings)
}

hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
type startingOffset struct {
backlogLocation BacklogLocation
}

func (so startingOffset) Apply(settings *createSubscriptionSettings) {
settings.backlogLocation = so.backlogLocation
}

// StartingOffset specifies the offset at which a newly created subscription
// will start receiving messages.
func StartingOffset(location BacklogLocation) CreateSubscriptionOption {
return startingOffset{location}
}

// CreateSubscription creates a new subscription from the given config. If the
// subscription already exists an error will be returned.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
//
// By default, a new subscription will only receive messages published after
// the subscription was created. Use StartingOffset to override.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is, unfortunately, a breaking change. Users may have defined an interface with the CreateSubscription(context.Context, pubsublite.SubscriptionConfig) method. If we change this signature, AdminClient will no longer fulfill that interface.

Is that OK?

Alternatively, could we add it to the SubscriptionConfig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fine, I don't believe this library has been released yet. Right @tmdiep?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The library is released as Beta, but since that was only recent, the likelihood of a user creating an interface for AdminClient is very low. This low risk is fine to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding, why don't we want to put it into the config?

(The two common patterns for functions like this are config arguments and options -- I'm surprised to see both.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the fields in the SubscriptionConfig map directly to a field in the Subscription resource proto - these fields are defining features of the subscription itself. On the other hand, the StartingOffset option only makes sense on subscription creation - it is not part of the subscription resource and does not map to any field in the subscription proto (it instead maps to a field in the create subscription request).

The SubscriptionConfig is used elsewhere in the library (ex. it is returned from the UpdateSubscription request). For this reason, I think that it is misleading to have the StartingOffset in the subscription config when it only makes sense on subscription creation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. That makes sense. Thanks!

var settings createSubscriptionSettings
for _, opt := range opts {
opt.Apply(&settings)
}

subsPath, err := wire.ParseSubscriptionPath(config.Name)
if err != nil {
return nil, err
Expand All @@ -161,6 +204,7 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti
Parent: subsPath.Location().String(),
Subscription: config.toProto(),
SubscriptionId: subsPath.SubscriptionID,
SkipBacklog: settings.backlogLocation != Beginning,
}
subspb, err := ac.admin.CreateSubscription(ctx, req)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pubsublite/admin_test.go
Expand Up @@ -275,6 +275,13 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
Parent: "projects/my-proj/locations/us-central1-a",
SubscriptionId: "my-subscription",
Subscription: subscriptionConfig.toProto(),
SkipBacklog: true,
}
wantCreateAtBacklogReq := &pb.CreateSubscriptionRequest{
Parent: "projects/my-proj/locations/us-central1-a",
SubscriptionId: "my-subscription",
Subscription: subscriptionConfig.toProto(),
SkipBacklog: false,
}
wantUpdateReq := updateConfig.toUpdateRequest()
wantGetReq := &pb.GetSubscriptionRequest{
Expand All @@ -286,6 +293,8 @@ func TestAdminSubscriptionCRUD(t *testing.T) {

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantCreateAtBacklogReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantUpdateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantGetReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil)
Expand All @@ -301,6 +310,18 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(End)); err != nil {
t.Errorf("CreateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(Beginning)); err != nil {
t.Errorf("CreateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.UpdateSubscription(ctx, updateConfig); err != nil {
t.Errorf("UpdateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
Expand Down