Skip to content

Commit

Permalink
feat(pubsublite): adding ability to create subscriptions at head (#3790)
Browse files Browse the repository at this point in the history
* feat: adding ability to create subscriptions at head

* fix: formatting

* fix: line formatting

* fix: format

* fix: update docs

* fix: requested changes

* fix: doc update

* fix: add create subscription opts doc

* fix: doc formatting

* fix: docs

* fix: use variadic opts in create request

* fix: change offset location name to backlog location

* fix: create a subscription settings struct

Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
  • Loading branch information
hannahrogers-google and hongalex committed Mar 25, 2021
1 parent a0b1f6f commit bc083b6
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
46 changes: 45 additions & 1 deletion pubsublite/admin.go
Expand Up @@ -29,6 +29,18 @@ var (
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
)

// BacklogLocation refers to a location with respect to the message backlog.
type BacklogLocation int

const (
// End refers to the location past all currently published messages. End
// skips the entire message backlog.
End BacklogLocation = iota + 1

// Beginning refers to the location of the oldest retained message.
Beginning
)

// AdminClient provides admin operations for Pub/Sub Lite resources within a
// Google Cloud region. The zone component of resource paths must be within this
// region. See https://cloud.google.com/pubsub/lite/docs/locations for the list
Expand Down Expand Up @@ -147,9 +159,40 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator
}
}

type createSubscriptionSettings struct {
backlogLocation BacklogLocation
}

// CreateSubscriptionOption is an option for AdminClient.CreateSubscription.
type CreateSubscriptionOption interface {
Apply(*createSubscriptionSettings)
}

type startingOffset struct {
backlogLocation BacklogLocation
}

func (so startingOffset) Apply(settings *createSubscriptionSettings) {
settings.backlogLocation = so.backlogLocation
}

// StartingOffset specifies the offset at which a newly created subscription
// will start receiving messages.
func StartingOffset(location BacklogLocation) CreateSubscriptionOption {
return startingOffset{location}
}

// CreateSubscription creates a new subscription from the given config. If the
// subscription already exists an error will be returned.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
//
// By default, a new subscription will only receive messages published after
// the subscription was created. Use StartingOffset to override.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error) {
var settings createSubscriptionSettings
for _, opt := range opts {
opt.Apply(&settings)
}

subsPath, err := wire.ParseSubscriptionPath(config.Name)
if err != nil {
return nil, err
Expand All @@ -161,6 +204,7 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti
Parent: subsPath.Location().String(),
Subscription: config.toProto(),
SubscriptionId: subsPath.SubscriptionID,
SkipBacklog: settings.backlogLocation != Beginning,
}
subspb, err := ac.admin.CreateSubscription(ctx, req)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pubsublite/admin_test.go
Expand Up @@ -275,6 +275,13 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
Parent: "projects/my-proj/locations/us-central1-a",
SubscriptionId: "my-subscription",
Subscription: subscriptionConfig.toProto(),
SkipBacklog: true,
}
wantCreateAtBacklogReq := &pb.CreateSubscriptionRequest{
Parent: "projects/my-proj/locations/us-central1-a",
SubscriptionId: "my-subscription",
Subscription: subscriptionConfig.toProto(),
SkipBacklog: false,
}
wantUpdateReq := updateConfig.toUpdateRequest()
wantGetReq := &pb.GetSubscriptionRequest{
Expand All @@ -286,6 +293,8 @@ func TestAdminSubscriptionCRUD(t *testing.T) {

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantCreateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantCreateAtBacklogReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantUpdateReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantGetReq, subscriptionConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil)
Expand All @@ -301,6 +310,18 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(End)); err != nil {
t.Errorf("CreateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.CreateSubscription(ctx, subscriptionConfig, StartingOffset(Beginning)); err != nil {
t.Errorf("CreateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
t.Errorf("CreateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if gotConfig, err := admin.UpdateSubscription(ctx, updateConfig); err != nil {
t.Errorf("UpdateSubscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
Expand Down

0 comments on commit bc083b6

Please sign in to comment.