Skip to content

Commit

Permalink
feat(pubsublite): support seek subscription in AdminClient (#4316)
Browse files Browse the repository at this point in the history
SeekSubscription performs an out-of-band seek for a subscription to a specified target. SeekSubscriptionOperation allows polling the seek operation.
  • Loading branch information
tmdiep committed Aug 9, 2021
1 parent 0d8993d commit 2dea319
Show file tree
Hide file tree
Showing 4 changed files with 413 additions and 12 deletions.
31 changes: 19 additions & 12 deletions pubsublite/admin.go
Expand Up @@ -29,18 +29,6 @@ var (
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
)

// BacklogLocation refers to a location with respect to the message backlog.
type BacklogLocation int

const (
// End refers to the location past all currently published messages. End
// skips the entire message backlog.
End BacklogLocation = iota + 1

// Beginning refers to the location of the oldest retained message.
Beginning
)

// AdminClient provides admin operations for Pub/Sub Lite resources within a
// Google Cloud region. The zone component of resource paths must be within this
// region. See https://cloud.google.com/pubsub/lite/docs/locations for the list
Expand Down Expand Up @@ -231,6 +219,25 @@ func (ac *AdminClient) UpdateSubscription(ctx context.Context, config Subscripti
return protoToSubscriptionConfig(subspb), nil
}

// SeekSubscription initiates an out-of-band seek for a subscription to a
// specified target, which may be timestamps or named positions within the
// message backlog. A valid subscription path has the format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
//
// See https://cloud.google.com/pubsub/lite/docs/seek for more information.
func (ac *AdminClient) SeekSubscription(ctx context.Context, subscription string, target SeekTarget, opts ...SeekSubscriptionOption) (*SeekSubscriptionOperation, error) {
if _, err := wire.ParseSubscriptionPath(subscription); err != nil {
return nil, err
}
req := &pb.SeekSubscriptionRequest{Name: subscription}
target.setRequest(req)
op, err := ac.admin.SeekSubscription(ctx, req)
if err != nil {
return nil, err
}
return &SeekSubscriptionOperation{op}, err
}

// DeleteSubscription deletes a subscription. A valid subscription path has the
// format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error {
Expand Down
208 changes: 208 additions & 0 deletions pubsublite/admin_test.go
Expand Up @@ -21,9 +21,14 @@ import (
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

emptypb "github.com/golang/protobuf/ptypes/empty"
tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
lrpb "google.golang.org/genproto/googleapis/longrunning"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)

func newTestAdminClient(t *testing.T) *AdminClient {
Expand Down Expand Up @@ -449,3 +454,206 @@ func TestAdminValidateResourcePaths(t *testing.T) {
t.Errorf("SubscriptionIterator.Next() should fail")
}
}

func TestAdminSeekSubscription(t *testing.T) {
const subscriptionPath = "projects/my-proj/locations/us-central1-a/subscriptions/my-subscription"
const operationPath = "projects/my-proj/locations/us-central1-a/operations/seek-op"
ctx := context.Background()

for _, tc := range []struct {
desc string
target SeekTarget
wantReq *pb.SeekSubscriptionRequest
}{
{
desc: "Beginning",
target: Beginning,
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_NamedTarget_{
NamedTarget: pb.SeekSubscriptionRequest_TAIL,
},
},
},
{
desc: "End",
target: End,
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_NamedTarget_{
NamedTarget: pb.SeekSubscriptionRequest_HEAD,
},
},
},
{
desc: "PublishTime",
target: PublishTime(time.Unix(1234, 0)),
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_TimeTarget{
TimeTarget: &pb.TimeTarget{
Time: &pb.TimeTarget_PublishTime{
PublishTime: &tspb.Timestamp{Seconds: 1234},
},
},
},
},
},
{
desc: "EventTime",
target: EventTime(time.Unix(2345, 0)),
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_TimeTarget{
TimeTarget: &pb.TimeTarget{
Time: &pb.TimeTarget_EventTime{
EventTime: &tspb.Timestamp{Seconds: 2345},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
initialOpResponse := &lrpb.Operation{
Name: operationPath,
Done: false,
Metadata: test.MakeAny(&pb.OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700},
}),
}
wantInitialMetadata := &OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: time.Unix(123456, 700),
}

wantGetOpReq := &lrpb.GetOperationRequest{
Name: operationPath,
}
successOpResponse := &lrpb.Operation{
Name: operationPath,
Done: true,
Metadata: test.MakeAny(&pb.OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700},
EndTime: &tspb.Timestamp{Seconds: 234567, Nanos: 800},
}),
Result: &lrpb.Operation_Response{
Response: test.MakeAny(&pb.SeekSubscriptionResponse{}),
},
}
failedOpResponse := &lrpb.Operation{
Name: operationPath,
Done: true,
Metadata: test.MakeAny(&pb.OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700},
EndTime: &tspb.Timestamp{Seconds: 234567, Nanos: 800},
}),
Result: &lrpb.Operation_Error{
Error: &statuspb.Status{Code: 10},
},
}
wantCompleteMetadata := &OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: time.Unix(123456, 700),
EndTime: time.Unix(234567, 800),
}

seekErr := status.Error(codes.FailedPrecondition, "")

verifiers := test.NewVerifiers(t)
// Seek 1
verifiers.GlobalVerifier.Push(tc.wantReq, initialOpResponse, nil)
verifiers.GlobalVerifier.Push(wantGetOpReq, successOpResponse, nil)
// Seek 2
verifiers.GlobalVerifier.Push(tc.wantReq, initialOpResponse, nil)
verifiers.GlobalVerifier.Push(wantGetOpReq, failedOpResponse, nil)
// Seek 3
verifiers.GlobalVerifier.Push(tc.wantReq, nil, seekErr)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

// Seek 1 - Successful operation.
op, err := admin.SeekSubscription(ctx, subscriptionPath, tc.target)
if err != nil {
t.Fatalf("SeekSubscription() got err: %v", err)
}
if got, want := op.Done(), false; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
if got, want := op.Name(), operationPath; got != want {
t.Errorf("Name() got %v, want %v", got, want)
}
gotMetadata, err := op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantInitialMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

result, err := op.Wait(ctx)
if err != nil {
t.Fatalf("Wait() got err: %v", err)
}
if result == nil {
t.Error("SeekSubscriptionResult was nil")
}
if got, want := op.Done(), true; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
gotMetadata, err = op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantCompleteMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

// Seek 2 - Failed operation.
op, err = admin.SeekSubscription(ctx, subscriptionPath, tc.target)
if err != nil {
t.Fatalf("SeekSubscription() got err: %v", err)
}
if got, want := op.Done(), false; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
if got, want := op.Name(), operationPath; got != want {
t.Errorf("Name() got %v, want %v", got, want)
}
gotMetadata, err = op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantInitialMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

_, gotErr := op.Wait(ctx)
if wantErr := status.Error(codes.Aborted, ""); !test.ErrorEqual(gotErr, wantErr) {
t.Fatalf("Wait() got err: %v, want err: %v", gotErr, wantErr)
}
if got, want := op.Done(), true; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
gotMetadata, err = op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantCompleteMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

// Seek 3 - Failed seek.
if _, gotErr := admin.SeekSubscription(ctx, subscriptionPath, tc.target); !test.ErrorEqual(gotErr, seekErr) {
t.Errorf("SeekSubscription() got err: %v, want err: %v", gotErr, seekErr)
}
})
}
}
27 changes: 27 additions & 0 deletions pubsublite/example_test.go
Expand Up @@ -175,6 +175,33 @@ func ExampleAdminClient_UpdateSubscription() {
}
}

func ExampleAdminClient_SeekSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
}

const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
seekOp, err := admin.SeekSubscription(ctx, subscription, pubsublite.Beginning)
if err != nil {
// TODO: Handle error.
}

// Optional: Wait for the seek operation to complete, which indicates when
// subscribers for all partitions are receiving messages from the seek target.
_, err = seekOp.Wait(ctx)
if err != nil {
// TODO: Handle error.
}
metadata, err := seekOp.Metadata()
if err != nil {
// TODO: Handle error.
}
fmt.Println(metadata)
}

func ExampleAdminClient_DeleteSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
Expand Down

0 comments on commit 2dea319

Please sign in to comment.