Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): support reservations in AdminClient #4294

Merged
merged 7 commits into from Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
125 changes: 125 additions & 0 deletions pubsublite/admin.go
Expand Up @@ -27,6 +27,7 @@ import (
var (
errNoTopicFieldsUpdated = errors.New("pubsublite: no fields updated for topic")
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
errNoReservationFieldsUpdated = errors.New("pubsublite: no fields updated for reservation")
)

// AdminClient provides admin operations for Pub/Sub Lite resources within a
Expand Down Expand Up @@ -273,6 +274,90 @@ func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *Subscr
}
}

// CreateReservation creates a new reservation from the given config. If the
// reservation already exists an error will be returned.
func (ac *AdminClient) CreateReservation(ctx context.Context, config ReservationConfig) (*ReservationConfig, error) {
reservationPath, err := wire.ParseReservationPath(config.Name)
if err != nil {
return nil, err
}
req := &pb.CreateReservationRequest{
Parent: reservationPath.Location().String(),
Reservation: config.toProto(),
ReservationId: reservationPath.ReservationID,
}
reservationpb, err := ac.admin.CreateReservation(ctx, req)
if err != nil {
return nil, err
}
return protoToReservationConfig(reservationpb), nil
}

// UpdateReservation updates an existing reservation from the given config and
// returns the new reservation config. UpdateReservation returns an error if no
// fields were modified.
func (ac *AdminClient) UpdateReservation(ctx context.Context, config ReservationConfigToUpdate) (*ReservationConfig, error) {
if _, err := wire.ParseReservationPath(config.Name); err != nil {
return nil, err
}
req := config.toUpdateRequest()
if len(req.GetUpdateMask().GetPaths()) == 0 {
return nil, errNoReservationFieldsUpdated
}
respb, err := ac.admin.UpdateReservation(ctx, req)
if err != nil {
return nil, err
}
return protoToReservationConfig(respb), nil
}

// DeleteReservation deletes a reservation. A valid reservation path has the
// format: "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
func (ac *AdminClient) DeleteReservation(ctx context.Context, reservation string) error {
if _, err := wire.ParseReservationPath(reservation); err != nil {
return err
}
return ac.admin.DeleteReservation(ctx, &pb.DeleteReservationRequest{Name: reservation})
}

// Reservation retrieves the configuration of a reservation. A valid reservation
// name has the format:
// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
func (ac *AdminClient) Reservation(ctx context.Context, reservation string) (*ReservationConfig, error) {
if _, err := wire.ParseReservationPath(reservation); err != nil {
return nil, err
}
respb, err := ac.admin.GetReservation(ctx, &pb.GetReservationRequest{Name: reservation})
if err != nil {
return nil, err
}
return protoToReservationConfig(respb), nil
}

// Reservations retrieves the list of reservation configs for a given project
// and region. A valid parent path has the format:
// "projects/PROJECT_ID/locations/REGION".
func (ac *AdminClient) Reservations(ctx context.Context, parent string) *ReservationIterator {
if _, err := wire.ParseLocationPath(parent); err != nil {
return &ReservationIterator{err: err}
}
return &ReservationIterator{
it: ac.admin.ListReservations(ctx, &pb.ListReservationsRequest{Parent: parent}),
}
}

// ReservationTopics retrieves the list of topic paths for a reservation.
// A valid reservation path has the format:
// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
func (ac *AdminClient) ReservationTopics(ctx context.Context, reservation string) *TopicPathIterator {
if _, err := wire.ParseReservationPath(reservation); err != nil {
return &TopicPathIterator{err: err}
}
return &TopicPathIterator{
it: ac.admin.ListReservationTopics(ctx, &pb.ListReservationTopicsRequest{Name: reservation}),
}
}

// Close releases any resources held by the client when it is no longer
// required. If the client is available for the lifetime of the program, then
// Close need not be called at exit.
Expand Down Expand Up @@ -319,6 +404,26 @@ func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) {
return protoToSubscriptionConfig(subspb), nil
}

// TopicPathIterator is an iterator that returns a list of topic paths.
type TopicPathIterator struct {
it *vkit.StringIterator
err error
}

// Next returns the next topic path, which has format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". The
// second return value will be iterator.Done if there are no more topic paths.
func (sp *TopicPathIterator) Next() (string, error) {
if sp.err != nil {
return "", sp.err
}
topicPath, err := sp.it.Next()
if err != nil {
return "", err
}
return topicPath, nil
}

// SubscriptionPathIterator is an iterator that returns a list of subscription
// paths.
type SubscriptionPathIterator struct {
Expand All @@ -340,3 +445,23 @@ func (sp *SubscriptionPathIterator) Next() (string, error) {
}
return subsPath, nil
}

// ReservationIterator is an iterator that returns a list of reservation
// configs.
type ReservationIterator struct {
it *vkit.ReservationIterator
err error
}

// Next returns the next reservation config. The second return value will be
// iterator.Done if there are no more reservation configs.
func (r *ReservationIterator) Next() (*ReservationConfig, error) {
if r.err != nil {
return nil, r.err
}
respb, err := r.it.Next()
if err != nil {
return nil, err
}
return protoToReservationConfig(respb), nil
}
206 changes: 205 additions & 1 deletion pubsublite/admin_test.go
Expand Up @@ -51,13 +51,15 @@ func TestAdminTopicCRUD(t *testing.T) {
SubscribeCapacityMiBPerSec: 4,
PerPartitionBytes: 30 * gibi,
RetentionDuration: 24 * time.Hour,
ThroughputReservation: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}
updateConfig := TopicConfigToUpdate{
Name: topicPath,
PublishCapacityMiBPerSec: 6,
SubscribeCapacityMiBPerSec: 8,
PerPartitionBytes: 40 * gibi,
RetentionDuration: InfiniteRetention,
ThroughputReservation: "",
}
emptyUpdateConfig := TopicConfigToUpdate{
Name: topicPath,
Expand Down Expand Up @@ -414,6 +416,194 @@ func TestAdminListSubscriptions(t *testing.T) {
}
}

func TestAdminReservationCRUD(t *testing.T) {
ctx := context.Background()

// Inputs
const reservationPath = "projects/my-proj/locations/us-central1/reservations/my-reservation"
reservationConfig := ReservationConfig{
Name: reservationPath,
ThroughputCapacity: 4,
}
updateConfig := ReservationConfigToUpdate{
Name: reservationPath,
ThroughputCapacity: 5,
}
emptyUpdateConfig := ReservationConfigToUpdate{
Name: reservationPath,
}

// Expected requests and fake responses
wantCreateReq := &pb.CreateReservationRequest{
Parent: "projects/my-proj/locations/us-central1",
ReservationId: "my-reservation",
Reservation: reservationConfig.toProto(),
}
wantUpdateReq := updateConfig.toUpdateRequest()
wantGetReq := &pb.GetReservationRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}
wantDeleteReq := &pb.DeleteReservationRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantCreateReq, reservationConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantUpdateReq, reservationConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantGetReq, reservationConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

if gotConfig, err := admin.CreateReservation(ctx, reservationConfig); err != nil {
t.Errorf("CreateReservation() got err: %v", err)
} else if !testutil.Equal(gotConfig, &reservationConfig) {
t.Errorf("CreateReservation() got: %v\nwant: %v", gotConfig, reservationConfig)
}

if gotConfig, err := admin.UpdateReservation(ctx, updateConfig); err != nil {
t.Errorf("UpdateReservation() got err: %v", err)
} else if !testutil.Equal(gotConfig, &reservationConfig) {
t.Errorf("UpdateReservation() got: %v\nwant: %v", gotConfig, reservationConfig)
}

if _, err := admin.UpdateReservation(ctx, emptyUpdateConfig); !test.ErrorEqual(err, errNoReservationFieldsUpdated) {
t.Errorf("UpdateReservation() got err: (%v), want err: (%v)", err, errNoReservationFieldsUpdated)
}

if gotConfig, err := admin.Reservation(ctx, reservationPath); err != nil {
t.Errorf("Reservation() got err: %v", err)
} else if !testutil.Equal(gotConfig, &reservationConfig) {
t.Errorf("Reservation() got: %v\nwant: %v", gotConfig, reservationConfig)
}

if err := admin.DeleteReservation(ctx, reservationPath); err != nil {
t.Errorf("DeleteReservation() got err: %v", err)
}
}

func TestAdminListReservations(t *testing.T) {
ctx := context.Background()

// Inputs
const locationPath = "projects/my-proj/locations/us-central1"
reservationConfig1 := ReservationConfig{
Name: "projects/my-proj/locations/us-central1/reservations/reservation1",
ThroughputCapacity: 1,
}
reservationConfig2 := ReservationConfig{
Name: "projects/my-proj/locations/us-central1/reservations/reservation2",
ThroughputCapacity: 2,
}
reservationConfig3 := ReservationConfig{
Name: "projects/my-proj/locations/us-central1/reservations/reservation3",
ThroughputCapacity: 2,
}

// Expected requests and fake responses
wantListReq1 := &pb.ListReservationsRequest{
Parent: "projects/my-proj/locations/us-central1",
}
listResp1 := &pb.ListReservationsResponse{
Reservations: []*pb.Reservation{reservationConfig1.toProto(), reservationConfig2.toProto()},
NextPageToken: "next_token",
}
wantListReq2 := &pb.ListReservationsRequest{
Parent: "projects/my-proj/locations/us-central1",
PageToken: "next_token",
}
listResp2 := &pb.ListReservationsResponse{
Reservations: []*pb.Reservation{reservationConfig3.toProto()},
}

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantListReq1, listResp1, nil)
verifiers.GlobalVerifier.Push(wantListReq2, listResp2, nil)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

var gotReservationConfigs []*ReservationConfig
reservationIt := admin.Reservations(ctx, locationPath)
for {
reservation, err := reservationIt.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("ReservationIterator.Next() got err: %v", err)
} else {
gotReservationConfigs = append(gotReservationConfigs, reservation)
}
}

wantReservationConfigs := []*ReservationConfig{&reservationConfig1, &reservationConfig2, &reservationConfig3}
if diff := testutil.Diff(gotReservationConfigs, wantReservationConfigs); diff != "" {
t.Errorf("Reservations() got: -, want: +\n%s", diff)
}
}

func TestAdminListReservationTopics(t *testing.T) {
ctx := context.Background()

// Inputs
const (
reservationPath = "projects/my-proj/locations/us-central1/reservations/my-reservation"
topic1 = "projects/my-proj/locations/us-central1-a/topics/topic1"
topic2 = "projects/my-proj/locations/us-central1-a/topics/topic2"
topic3 = "projects/my-proj/locations/us-central1-a/topics/topic3"
)

// Expected requests and fake responses
wantListReq1 := &pb.ListReservationTopicsRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}
listResp1 := &pb.ListReservationTopicsResponse{
Topics: []string{topic1, topic2},
NextPageToken: "next_token",
}
wantListReq2 := &pb.ListReservationTopicsRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
PageToken: "next_token",
}
listResp2 := &pb.ListReservationTopicsResponse{
Topics: []string{topic3},
}

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantListReq1, listResp1, nil)
verifiers.GlobalVerifier.Push(wantListReq2, listResp2, nil)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

var gotTopics []string
topicPathIt := admin.ReservationTopics(ctx, reservationPath)
for {
topicPath, err := topicPathIt.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("TopicPathIterator.Next() got err: %v", err)
} else {
gotTopics = append(gotTopics, topicPath)
}
}

wantTopics := []string{topic1, topic2, topic3}
if !testutil.Equal(gotTopics, wantTopics) {
t.Errorf("ReservationTopics() got: %v\nwant: %v", gotTopics, wantTopics)
}
}

func TestAdminValidateResourcePaths(t *testing.T) {
ctx := context.Background()

Expand All @@ -438,7 +628,13 @@ func TestAdminValidateResourcePaths(t *testing.T) {
t.Errorf("Subscription() should fail")
}
if err := admin.DeleteSubscription(ctx, "INVALID"); err == nil {
t.Errorf("DeleteTopic() should fail")
t.Errorf("DeleteSubscription() should fail")
}
if _, err := admin.Reservation(ctx, "INVALID"); err == nil {
t.Errorf("Reservation() should fail")
}
if err := admin.DeleteReservation(ctx, "INVALID"); err == nil {
t.Errorf("DeleteReservation() should fail")
}

topicIt := admin.Topics(ctx, "INVALID")
Expand All @@ -453,6 +649,14 @@ func TestAdminValidateResourcePaths(t *testing.T) {
if _, err := subsIt.Next(); err == nil {
t.Errorf("SubscriptionIterator.Next() should fail")
}
resIt := admin.Reservations(ctx, "INVALID")
if _, err := resIt.Next(); err == nil {
t.Errorf("ReservationIterator.Next() should fail")
}
topicPathIt := admin.ReservationTopics(ctx, "INVALID")
if _, err := topicPathIt.Next(); err == nil {
t.Errorf("TopicPathIterator.Next() should fail")
}
}

func TestAdminSeekSubscription(t *testing.T) {
Expand Down