Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): support seek subscription in AdminClient #4316

Merged
merged 12 commits into from Aug 9, 2021
31 changes: 19 additions & 12 deletions pubsublite/admin.go
Expand Up @@ -29,18 +29,6 @@ var (
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
)

// BacklogLocation refers to a location with respect to the message backlog.
type BacklogLocation int

const (
// End refers to the location past all currently published messages. End
// skips the entire message backlog.
End BacklogLocation = iota + 1

// Beginning refers to the location of the oldest retained message.
Beginning
)

// AdminClient provides admin operations for Pub/Sub Lite resources within a
// Google Cloud region. The zone component of resource paths must be within this
// region. See https://cloud.google.com/pubsub/lite/docs/locations for the list
Expand Down Expand Up @@ -231,6 +219,25 @@ func (ac *AdminClient) UpdateSubscription(ctx context.Context, config Subscripti
return protoToSubscriptionConfig(subspb), nil
}

// SeekSubscription initiates an out-of-band seek for a subscription to a
// specified target, which may be timestamps or named positions within the
// message backlog. A valid subscription path has the format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
//
// See https://cloud.google.com/pubsub/lite/docs/seek for more information.
func (ac *AdminClient) SeekSubscription(ctx context.Context, subscription string, target SeekTarget, opts ...SeekSubscriptionOption) (*SeekSubscriptionOperation, error) {
if _, err := wire.ParseSubscriptionPath(subscription); err != nil {
return nil, err
}
req := &pb.SeekSubscriptionRequest{Name: subscription}
target.setRequest(req)
op, err := ac.admin.SeekSubscription(ctx, req)
if err != nil {
return nil, err
}
return &SeekSubscriptionOperation{op}, err
}

// DeleteSubscription deletes a subscription. A valid subscription path has the
// format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error {
Expand Down
203 changes: 203 additions & 0 deletions pubsublite/admin_test.go
Expand Up @@ -21,9 +21,14 @@ import (
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

emptypb "github.com/golang/protobuf/ptypes/empty"
tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
lrpb "google.golang.org/genproto/googleapis/longrunning"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)

func newTestAdminClient(t *testing.T) *AdminClient {
Expand Down Expand Up @@ -449,3 +454,201 @@ func TestAdminValidateResourcePaths(t *testing.T) {
t.Errorf("SubscriptionIterator.Next() should fail")
}
}

func TestAdminSeekSubscription(t *testing.T) {
const subscriptionPath = "projects/my-proj/locations/us-central1-a/subscriptions/my-subscription"
const operationPath = "projects/my-proj/locations/us-central1-a/operations/seek-op"
ctx := context.Background()

for _, tc := range []struct {
desc string
target SeekTarget
wantReq *pb.SeekSubscriptionRequest
}{
{
desc: "Beginning",
target: Beginning,
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_NamedTarget_{
NamedTarget: pb.SeekSubscriptionRequest_TAIL,
},
},
},
{
desc: "End",
target: End,
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_NamedTarget_{
NamedTarget: pb.SeekSubscriptionRequest_HEAD,
},
},
},
{
desc: "PublishTime",
target: PublishTime(time.Unix(1234, 0)),
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_TimeTarget{
TimeTarget: &pb.TimeTarget{
Time: &pb.TimeTarget_PublishTime{
PublishTime: &tspb.Timestamp{Seconds: 1234},
},
},
},
},
},
{
desc: "EventTime",
target: EventTime(time.Unix(2345, 0)),
wantReq: &pb.SeekSubscriptionRequest{
Name: subscriptionPath,
Target: &pb.SeekSubscriptionRequest_TimeTarget{
TimeTarget: &pb.TimeTarget{
Time: &pb.TimeTarget_EventTime{
EventTime: &tspb.Timestamp{Seconds: 2345},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
initialOpResponse := &lrpb.Operation{
Name: operationPath,
Done: false,
Metadata: test.MakeAny(&pb.OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700},
}),
}
wantInitialMetadata := &OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: time.Unix(123456, 700),
}

wantGetOpReq := &lrpb.GetOperationRequest{
Name: operationPath,
}
successOpResponse := &lrpb.Operation{
Name: operationPath,
Done: true,
Metadata: test.MakeAny(&pb.OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700},
EndTime: &tspb.Timestamp{Seconds: 234567, Nanos: 800},
}),
Result: &lrpb.Operation_Response{
Response: test.MakeAny(&pb.SeekSubscriptionResponse{}),
},
}
failedOpResponse := &lrpb.Operation{
Name: operationPath,
Done: true,
Metadata: test.MakeAny(&pb.OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: &tspb.Timestamp{Seconds: 123456, Nanos: 700},
EndTime: &tspb.Timestamp{Seconds: 234567, Nanos: 800},
}),
Result: &lrpb.Operation_Error{
Error: &statuspb.Status{Code: 10},
},
}
wantCompleteMetadata := &OperationMetadata{
Target: subscriptionPath,
Verb: "seek",
CreateTime: time.Unix(123456, 700),
EndTime: time.Unix(234567, 800),
}

seekErr := status.Error(codes.FailedPrecondition, "")

verifiers := test.NewVerifiers(t)
// Seek 1
verifiers.GlobalVerifier.Push(tc.wantReq, initialOpResponse, nil)
verifiers.GlobalVerifier.Push(wantGetOpReq, successOpResponse, nil)
// Seek 2
verifiers.GlobalVerifier.Push(tc.wantReq, initialOpResponse, nil)
verifiers.GlobalVerifier.Push(wantGetOpReq, failedOpResponse, nil)
// Seek 3
verifiers.GlobalVerifier.Push(tc.wantReq, nil, seekErr)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

// Seek 1 - Successful operation.
op, err := admin.SeekSubscription(ctx, subscriptionPath, tc.target)
if err != nil {
t.Fatalf("SeekSubscription() got err: %v", err)
}
if got, want := op.Done(), false; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
if got, want := op.Name(), operationPath; got != want {
t.Errorf("Name() got %v, want %v", got, want)
}
gotMetadata, err := op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantInitialMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

if err := op.Wait(ctx); err != nil {
t.Fatalf("Wait() got err: %v", err)
}
if got, want := op.Done(), true; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
gotMetadata, err = op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantCompleteMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

// Seek 2 - Failed operation.
op, err = admin.SeekSubscription(ctx, subscriptionPath, tc.target)
if err != nil {
t.Fatalf("SeekSubscription() got err: %v", err)
}
if got, want := op.Done(), false; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
if got, want := op.Name(), operationPath; got != want {
t.Errorf("Name() got %v, want %v", got, want)
}
gotMetadata, err = op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantInitialMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

if gotErr, wantErr := op.Wait(ctx), status.Error(codes.Aborted, ""); !test.ErrorEqual(gotErr, wantErr) {
t.Fatalf("Wait() got err: %v, want err: %v", gotErr, wantErr)
}
if got, want := op.Done(), true; got != want {
t.Errorf("Done() got %v, want %v", got, want)
}
gotMetadata, err = op.Metadata()
if err != nil {
t.Errorf("Metadata() got err: %v", err)
} else if diff := testutil.Diff(gotMetadata, wantCompleteMetadata); diff != "" {
t.Errorf("Metadata() got: -, want: +\n%s", diff)
}

// Seek 3 - Failed seek.
if _, gotErr := admin.SeekSubscription(ctx, subscriptionPath, tc.target); !test.ErrorEqual(gotErr, seekErr) {
t.Errorf("SeekSubscription() got err: %v, want err: %v", gotErr, seekErr)
}
})
}
}
27 changes: 27 additions & 0 deletions pubsublite/example_test.go
Expand Up @@ -175,6 +175,33 @@ func ExampleAdminClient_UpdateSubscription() {
}
}

func ExampleAdminClient_SeekSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
}

const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
seekOp, err := admin.SeekSubscription(ctx, subscription, pubsublite.Beginning)
if err != nil {
// TODO: Handle error.
}

// Optional: Wait for the seek operation to complete, which indicates when
// subscribers for all partitions are receiving messages from the seek target.
err = seekOp.Wait(ctx)
if err != nil {
// TODO: Handle error.
}
metadata, err := seekOp.Metadata()
if err != nil {
// TODO: Handle error.
}
fmt.Println(metadata)
}

func ExampleAdminClient_DeleteSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
Expand Down