From 2dea3196a73764bd10842a3da5d0fa29ae84e101 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Mon, 9 Aug 2021 12:23:14 +1000 Subject: [PATCH] feat(pubsublite): support seek subscription in AdminClient (#4316) SeekSubscription performs an out-of-band seek for a subscription to a specified target. SeekSubscriptionOperation allows polling the seek operation. --- pubsublite/admin.go | 31 +++--- pubsublite/admin_test.go | 208 +++++++++++++++++++++++++++++++++++++ pubsublite/example_test.go | 27 +++++ pubsublite/operations.go | 159 ++++++++++++++++++++++++++++ 4 files changed, 413 insertions(+), 12 deletions(-) create mode 100644 pubsublite/operations.go diff --git a/pubsublite/admin.go b/pubsublite/admin.go index ec83aff3778..da425a267d9 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -29,18 +29,6 @@ var ( errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription") ) -// BacklogLocation refers to a location with respect to the message backlog. -type BacklogLocation int - -const ( - // End refers to the location past all currently published messages. End - // skips the entire message backlog. - End BacklogLocation = iota + 1 - - // Beginning refers to the location of the oldest retained message. - Beginning -) - // AdminClient provides admin operations for Pub/Sub Lite resources within a // Google Cloud region. The zone component of resource paths must be within this // region. See https://cloud.google.com/pubsub/lite/docs/locations for the list @@ -231,6 +219,25 @@ func (ac *AdminClient) UpdateSubscription(ctx context.Context, config Subscripti return protoToSubscriptionConfig(subspb), nil } +// SeekSubscription initiates an out-of-band seek for a subscription to a +// specified target, which may be timestamps or named positions within the +// message backlog. A valid subscription path has the format: +// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". +// +// See https://cloud.google.com/pubsub/lite/docs/seek for more information. +func (ac *AdminClient) SeekSubscription(ctx context.Context, subscription string, target SeekTarget, opts ...SeekSubscriptionOption) (*SeekSubscriptionOperation, error) { + if _, err := wire.ParseSubscriptionPath(subscription); err != nil { + return nil, err + } + req := &pb.SeekSubscriptionRequest{Name: subscription} + target.setRequest(req) + op, err := ac.admin.SeekSubscription(ctx, req) + if err != nil { + return nil, err + } + return &SeekSubscriptionOperation{op}, err +} + // DeleteSubscription deletes a subscription. A valid subscription path has the // format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error { diff --git a/pubsublite/admin_test.go b/pubsublite/admin_test.go index d0a9b0ee5b5..8cf493f9e7f 100644 --- a/pubsublite/admin_test.go +++ b/pubsublite/admin_test.go @@ -21,9 +21,14 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/pubsublite/internal/test" "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" emptypb "github.com/golang/protobuf/ptypes/empty" + tspb "github.com/golang/protobuf/ptypes/timestamp" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" + lrpb "google.golang.org/genproto/googleapis/longrunning" + statuspb "google.golang.org/genproto/googleapis/rpc/status" ) func newTestAdminClient(t *testing.T) *AdminClient { @@ -449,3 +454,206 @@ func TestAdminValidateResourcePaths(t *testing.T) { t.Errorf("SubscriptionIterator.Next() should fail") } } + +func TestAdminSeekSubscription(t *testing.T) { + const subscriptionPath = "projects/my-proj/locations/us-central1-a/subscriptions/my-subscription" + const operationPath = "projects/my-proj/locations/us-central1-a/operations/seek-op" + ctx := context.Background() + + for _, tc := range []struct { + desc string + target SeekTarget + wantReq *pb.SeekSubscriptionRequest + }{ + { + desc: "Beginning", + target: Beginning, + wantReq: &pb.SeekSubscriptionRequest{ + Name: subscriptionPath, + Target: &pb.SeekSubscriptionRequest_NamedTarget_{ + NamedTarget: pb.SeekSubscriptionRequest_TAIL, + }, + }, + }, + { + desc: "End", + target: End, + wantReq: &pb.SeekSubscriptionRequest{ + Name: subscriptionPath, + Target: &pb.SeekSubscriptionRequest_NamedTarget_{ + NamedTarget: pb.SeekSubscriptionRequest_HEAD, + }, + }, + }, + { + desc: "PublishTime", + target: PublishTime(time.Unix(1234, 0)), + wantReq: &pb.SeekSubscriptionRequest{ + Name: subscriptionPath, + Target: &pb.SeekSubscriptionRequest_TimeTarget{ + TimeTarget: &pb.TimeTarget{ + Time: &pb.TimeTarget_PublishTime{ + PublishTime: &tspb.Timestamp{Seconds: 1234}, + }, + }, + }, + }, + }, + { + desc: "EventTime", + target: EventTime(time.Unix(2345, 0)), + wantReq: &pb.SeekSubscriptionRequest{ + Name: subscriptionPath, + Target: &pb.SeekSubscriptionRequest_TimeTarget{ + TimeTarget: &pb.TimeTarget{ + Time: &pb.TimeTarget_EventTime{ + EventTime: &tspb.Timestamp{Seconds: 2345}, + }, + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + initialOpResponse := &lrpb.Operation{ + Name: operationPath, + Done: false, + Metadata: test.MakeAny(&pb.OperationMetadata{ + Target: subscriptionPath, + Verb: "seek", + CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700}, + }), + } + wantInitialMetadata := &OperationMetadata{ + Target: subscriptionPath, + Verb: "seek", + CreateTime: time.Unix(123456, 700), + } + + wantGetOpReq := &lrpb.GetOperationRequest{ + Name: operationPath, + } + successOpResponse := &lrpb.Operation{ + Name: operationPath, + Done: true, + Metadata: test.MakeAny(&pb.OperationMetadata{ + Target: subscriptionPath, + Verb: "seek", + CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700}, + EndTime: &tspb.Timestamp{Seconds: 234567, Nanos: 800}, + }), + Result: &lrpb.Operation_Response{ + Response: test.MakeAny(&pb.SeekSubscriptionResponse{}), + }, + } + failedOpResponse := &lrpb.Operation{ + Name: operationPath, + Done: true, + Metadata: test.MakeAny(&pb.OperationMetadata{ + Target: subscriptionPath, + Verb: "seek", + CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700}, + EndTime: &tspb.Timestamp{Seconds: 234567, Nanos: 800}, + }), + Result: &lrpb.Operation_Error{ + Error: &statuspb.Status{Code: 10}, + }, + } + wantCompleteMetadata := &OperationMetadata{ + Target: subscriptionPath, + Verb: "seek", + CreateTime: time.Unix(123456, 700), + EndTime: time.Unix(234567, 800), + } + + seekErr := status.Error(codes.FailedPrecondition, "") + + verifiers := test.NewVerifiers(t) + // Seek 1 + verifiers.GlobalVerifier.Push(tc.wantReq, initialOpResponse, nil) + verifiers.GlobalVerifier.Push(wantGetOpReq, successOpResponse, nil) + // Seek 2 + verifiers.GlobalVerifier.Push(tc.wantReq, initialOpResponse, nil) + verifiers.GlobalVerifier.Push(wantGetOpReq, failedOpResponse, nil) + // Seek 3 + verifiers.GlobalVerifier.Push(tc.wantReq, nil, seekErr) + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + admin := newTestAdminClient(t) + defer admin.Close() + + // Seek 1 - Successful operation. + op, err := admin.SeekSubscription(ctx, subscriptionPath, tc.target) + if err != nil { + t.Fatalf("SeekSubscription() got err: %v", err) + } + if got, want := op.Done(), false; got != want { + t.Errorf("Done() got %v, want %v", got, want) + } + if got, want := op.Name(), operationPath; got != want { + t.Errorf("Name() got %v, want %v", got, want) + } + gotMetadata, err := op.Metadata() + if err != nil { + t.Errorf("Metadata() got err: %v", err) + } else if diff := testutil.Diff(gotMetadata, wantInitialMetadata); diff != "" { + t.Errorf("Metadata() got: -, want: +\n%s", diff) + } + + result, err := op.Wait(ctx) + if err != nil { + t.Fatalf("Wait() got err: %v", err) + } + if result == nil { + t.Error("SeekSubscriptionResult was nil") + } + if got, want := op.Done(), true; got != want { + t.Errorf("Done() got %v, want %v", got, want) + } + gotMetadata, err = op.Metadata() + if err != nil { + t.Errorf("Metadata() got err: %v", err) + } else if diff := testutil.Diff(gotMetadata, wantCompleteMetadata); diff != "" { + t.Errorf("Metadata() got: -, want: +\n%s", diff) + } + + // Seek 2 - Failed operation. + op, err = admin.SeekSubscription(ctx, subscriptionPath, tc.target) + if err != nil { + t.Fatalf("SeekSubscription() got err: %v", err) + } + if got, want := op.Done(), false; got != want { + t.Errorf("Done() got %v, want %v", got, want) + } + if got, want := op.Name(), operationPath; got != want { + t.Errorf("Name() got %v, want %v", got, want) + } + gotMetadata, err = op.Metadata() + if err != nil { + t.Errorf("Metadata() got err: %v", err) + } else if diff := testutil.Diff(gotMetadata, wantInitialMetadata); diff != "" { + t.Errorf("Metadata() got: -, want: +\n%s", diff) + } + + _, gotErr := op.Wait(ctx) + if wantErr := status.Error(codes.Aborted, ""); !test.ErrorEqual(gotErr, wantErr) { + t.Fatalf("Wait() got err: %v, want err: %v", gotErr, wantErr) + } + if got, want := op.Done(), true; got != want { + t.Errorf("Done() got %v, want %v", got, want) + } + gotMetadata, err = op.Metadata() + if err != nil { + t.Errorf("Metadata() got err: %v", err) + } else if diff := testutil.Diff(gotMetadata, wantCompleteMetadata); diff != "" { + t.Errorf("Metadata() got: -, want: +\n%s", diff) + } + + // Seek 3 - Failed seek. + if _, gotErr := admin.SeekSubscription(ctx, subscriptionPath, tc.target); !test.ErrorEqual(gotErr, seekErr) { + t.Errorf("SeekSubscription() got err: %v, want err: %v", gotErr, seekErr) + } + }) + } +} diff --git a/pubsublite/example_test.go b/pubsublite/example_test.go index 9da58538035..063939b916e 100644 --- a/pubsublite/example_test.go +++ b/pubsublite/example_test.go @@ -175,6 +175,33 @@ func ExampleAdminClient_UpdateSubscription() { } } +func ExampleAdminClient_SeekSubscription() { + ctx := context.Background() + // NOTE: region must correspond to the zone of the subscription. + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + + const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" + seekOp, err := admin.SeekSubscription(ctx, subscription, pubsublite.Beginning) + if err != nil { + // TODO: Handle error. + } + + // Optional: Wait for the seek operation to complete, which indicates when + // subscribers for all partitions are receiving messages from the seek target. + _, err = seekOp.Wait(ctx) + if err != nil { + // TODO: Handle error. + } + metadata, err := seekOp.Metadata() + if err != nil { + // TODO: Handle error. + } + fmt.Println(metadata) +} + func ExampleAdminClient_DeleteSubscription() { ctx := context.Background() // NOTE: region must correspond to the zone of the subscription. diff --git a/pubsublite/operations.go b/pubsublite/operations.go new file mode 100644 index 00000000000..e19a252d68b --- /dev/null +++ b/pubsublite/operations.go @@ -0,0 +1,159 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "context" + "time" + + vkit "cloud.google.com/go/pubsublite/apiv1" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" + tspb "google.golang.org/protobuf/types/known/timestamppb" +) + +// SeekTarget is the target location to seek a subscription to. Implemented by +// BacklogLocation, PublishTime, EventTime. +type SeekTarget interface { + setRequest(req *pb.SeekSubscriptionRequest) +} + +// BacklogLocation refers to a location with respect to the message backlog. +// It implements the SeekTarget interface. +type BacklogLocation int + +const ( + // End refers to the location past all currently published messages. End + // skips the entire message backlog. + End BacklogLocation = iota + 1 + + // Beginning refers to the location of the oldest retained message. + Beginning +) + +func (b BacklogLocation) setRequest(req *pb.SeekSubscriptionRequest) { + target := pb.SeekSubscriptionRequest_TAIL + if b == End { + target = pb.SeekSubscriptionRequest_HEAD + } + req.Target = &pb.SeekSubscriptionRequest_NamedTarget_{ + NamedTarget: target, + } +} + +// PublishTime is a message publish timestamp. It implements the SeekTarget +// interface. +type PublishTime time.Time + +func (p PublishTime) setRequest(req *pb.SeekSubscriptionRequest) { + req.Target = &pb.SeekSubscriptionRequest_TimeTarget{ + TimeTarget: &pb.TimeTarget{ + Time: &pb.TimeTarget_PublishTime{tspb.New(time.Time(p))}, + }, + } +} + +// EventTime is a message event timestamp. It implements the SeekTarget +// interface. +type EventTime time.Time + +func (e EventTime) setRequest(req *pb.SeekSubscriptionRequest) { + req.Target = &pb.SeekSubscriptionRequest_TimeTarget{ + TimeTarget: &pb.TimeTarget{ + Time: &pb.TimeTarget_EventTime{tspb.New(time.Time(e))}, + }, + } +} + +// SeekSubscriptionOption is reserved for future options. +type SeekSubscriptionOption interface{} + +// SeekSubscriptionResult is the result of a seek subscription operation. +// Currently empty. +type SeekSubscriptionResult struct{} + +// OperationMetadata stores metadata for long-running operations. +type OperationMetadata struct { + // The target of the operation. For example, targets of seeks are + // subscriptions, structured like: + // "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID" + Target string + + // The verb describing the kind of operation. + Verb string + + // The time the operation was created. + CreateTime time.Time + + // The time the operation finished running. Is zero if the operation has not + // completed. + EndTime time.Time +} + +func protoToOperationMetadata(o *pb.OperationMetadata) (*OperationMetadata, error) { + if err := o.GetCreateTime().CheckValid(); err != nil { + return nil, err + } + metadata := &OperationMetadata{ + Target: o.Target, + Verb: o.Verb, + CreateTime: o.GetCreateTime().AsTime(), + } + if o.GetEndTime() != nil { + if err := o.GetEndTime().CheckValid(); err != nil { + return nil, err + } + metadata.EndTime = o.GetEndTime().AsTime() + } + return metadata, nil +} + +// SeekSubscriptionOperation manages a long-running seek operation from +// AdminClient.SeekSubscription. +type SeekSubscriptionOperation struct { + op *vkit.SeekSubscriptionOperation +} + +// Name returns the path of the seek operation, in the format: +// "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID". +func (s *SeekSubscriptionOperation) Name() string { + return s.op.Name() +} + +// Done returns whether the seek operation has completed. +func (s *SeekSubscriptionOperation) Done() bool { + return s.op.Done() +} + +// Metadata returns metadata associated with the seek operation. To get the +// latest metadata, call this method after a successful call to Wait. +func (s *SeekSubscriptionOperation) Metadata() (*OperationMetadata, error) { + m, err := s.op.Metadata() + if err != nil { + return nil, err + } + return protoToOperationMetadata(m) +} + +// Wait polls until the seek operation is complete and returns one of the +// following: +// - A SeekSubscriptionResult and nil error if the operation is complete and +// succeeded. +// - Error containing failure reason if the operation is complete and failed. +// - Error if polling the operation status failed due to a non-retryable error. +func (s *SeekSubscriptionOperation) Wait(ctx context.Context) (*SeekSubscriptionResult, error) { + if _, err := s.op.Wait(ctx); err != nil { + return nil, err + } + return &SeekSubscriptionResult{}, nil +}