Skip to content

Commit

Permalink
feat(pubsublite): support reservations in AdminClient (#4294)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Sep 15, 2021
1 parent 42cdbb9 commit 65b0f88
Show file tree
Hide file tree
Showing 5 changed files with 625 additions and 3 deletions.
125 changes: 125 additions & 0 deletions pubsublite/admin.go
Expand Up @@ -27,6 +27,7 @@ import (
var (
errNoTopicFieldsUpdated = errors.New("pubsublite: no fields updated for topic")
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
errNoReservationFieldsUpdated = errors.New("pubsublite: no fields updated for reservation")
)

// AdminClient provides admin operations for Pub/Sub Lite resources within a
Expand Down Expand Up @@ -273,6 +274,90 @@ func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *Subscr
}
}

// CreateReservation creates a new reservation from the given config. If the
// reservation already exists an error will be returned.
func (ac *AdminClient) CreateReservation(ctx context.Context, config ReservationConfig) (*ReservationConfig, error) {
reservationPath, err := wire.ParseReservationPath(config.Name)
if err != nil {
return nil, err
}
req := &pb.CreateReservationRequest{
Parent: reservationPath.Location().String(),
Reservation: config.toProto(),
ReservationId: reservationPath.ReservationID,
}
reservationpb, err := ac.admin.CreateReservation(ctx, req)
if err != nil {
return nil, err
}
return protoToReservationConfig(reservationpb), nil
}

// UpdateReservation updates an existing reservation from the given config and
// returns the new reservation config. UpdateReservation returns an error if no
// fields were modified.
func (ac *AdminClient) UpdateReservation(ctx context.Context, config ReservationConfigToUpdate) (*ReservationConfig, error) {
if _, err := wire.ParseReservationPath(config.Name); err != nil {
return nil, err
}
req := config.toUpdateRequest()
if len(req.GetUpdateMask().GetPaths()) == 0 {
return nil, errNoReservationFieldsUpdated
}
respb, err := ac.admin.UpdateReservation(ctx, req)
if err != nil {
return nil, err
}
return protoToReservationConfig(respb), nil
}

// DeleteReservation deletes a reservation. A valid reservation path has the
// format: "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
func (ac *AdminClient) DeleteReservation(ctx context.Context, reservation string) error {
if _, err := wire.ParseReservationPath(reservation); err != nil {
return err
}
return ac.admin.DeleteReservation(ctx, &pb.DeleteReservationRequest{Name: reservation})
}

// Reservation retrieves the configuration of a reservation. A valid reservation
// name has the format:
// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
func (ac *AdminClient) Reservation(ctx context.Context, reservation string) (*ReservationConfig, error) {
if _, err := wire.ParseReservationPath(reservation); err != nil {
return nil, err
}
respb, err := ac.admin.GetReservation(ctx, &pb.GetReservationRequest{Name: reservation})
if err != nil {
return nil, err
}
return protoToReservationConfig(respb), nil
}

// Reservations retrieves the list of reservation configs for a given project
// and region. A valid parent path has the format:
// "projects/PROJECT_ID/locations/REGION".
func (ac *AdminClient) Reservations(ctx context.Context, parent string) *ReservationIterator {
if _, err := wire.ParseLocationPath(parent); err != nil {
return &ReservationIterator{err: err}
}
return &ReservationIterator{
it: ac.admin.ListReservations(ctx, &pb.ListReservationsRequest{Parent: parent}),
}
}

// ReservationTopics retrieves the list of topic paths for a reservation.
// A valid reservation path has the format:
// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
func (ac *AdminClient) ReservationTopics(ctx context.Context, reservation string) *TopicPathIterator {
if _, err := wire.ParseReservationPath(reservation); err != nil {
return &TopicPathIterator{err: err}
}
return &TopicPathIterator{
it: ac.admin.ListReservationTopics(ctx, &pb.ListReservationTopicsRequest{Name: reservation}),
}
}

// Close releases any resources held by the client when it is no longer
// required. If the client is available for the lifetime of the program, then
// Close need not be called at exit.
Expand Down Expand Up @@ -319,6 +404,26 @@ func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) {
return protoToSubscriptionConfig(subspb), nil
}

// TopicPathIterator is an iterator that returns a list of topic paths.
type TopicPathIterator struct {
it *vkit.StringIterator
err error
}

// Next returns the next topic path, which has format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". The
// second return value will be iterator.Done if there are no more topic paths.
func (sp *TopicPathIterator) Next() (string, error) {
if sp.err != nil {
return "", sp.err
}
topicPath, err := sp.it.Next()
if err != nil {
return "", err
}
return topicPath, nil
}

// SubscriptionPathIterator is an iterator that returns a list of subscription
// paths.
type SubscriptionPathIterator struct {
Expand All @@ -340,3 +445,23 @@ func (sp *SubscriptionPathIterator) Next() (string, error) {
}
return subsPath, nil
}

// ReservationIterator is an iterator that returns a list of reservation
// configs.
type ReservationIterator struct {
it *vkit.ReservationIterator
err error
}

// Next returns the next reservation config. The second return value will be
// iterator.Done if there are no more reservation configs.
func (r *ReservationIterator) Next() (*ReservationConfig, error) {
if r.err != nil {
return nil, r.err
}
respb, err := r.it.Next()
if err != nil {
return nil, err
}
return protoToReservationConfig(respb), nil
}
206 changes: 205 additions & 1 deletion pubsublite/admin_test.go
Expand Up @@ -51,13 +51,15 @@ func TestAdminTopicCRUD(t *testing.T) {
SubscribeCapacityMiBPerSec: 4,
PerPartitionBytes: 30 * gibi,
RetentionDuration: 24 * time.Hour,
ThroughputReservation: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}
updateConfig := TopicConfigToUpdate{
Name: topicPath,
PublishCapacityMiBPerSec: 6,
SubscribeCapacityMiBPerSec: 8,
PerPartitionBytes: 40 * gibi,
RetentionDuration: InfiniteRetention,
ThroughputReservation: "",
}
emptyUpdateConfig := TopicConfigToUpdate{
Name: topicPath,
Expand Down Expand Up @@ -414,6 +416,194 @@ func TestAdminListSubscriptions(t *testing.T) {
}
}

func TestAdminReservationCRUD(t *testing.T) {
ctx := context.Background()

// Inputs
const reservationPath = "projects/my-proj/locations/us-central1/reservations/my-reservation"
reservationConfig := ReservationConfig{
Name: reservationPath,
ThroughputCapacity: 4,
}
updateConfig := ReservationConfigToUpdate{
Name: reservationPath,
ThroughputCapacity: 5,
}
emptyUpdateConfig := ReservationConfigToUpdate{
Name: reservationPath,
}

// Expected requests and fake responses
wantCreateReq := &pb.CreateReservationRequest{
Parent: "projects/my-proj/locations/us-central1",
ReservationId: "my-reservation",
Reservation: reservationConfig.toProto(),
}
wantUpdateReq := updateConfig.toUpdateRequest()
wantGetReq := &pb.GetReservationRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}
wantDeleteReq := &pb.DeleteReservationRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantCreateReq, reservationConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantUpdateReq, reservationConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantGetReq, reservationConfig.toProto(), nil)
verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

if gotConfig, err := admin.CreateReservation(ctx, reservationConfig); err != nil {
t.Errorf("CreateReservation() got err: %v", err)
} else if !testutil.Equal(gotConfig, &reservationConfig) {
t.Errorf("CreateReservation() got: %v\nwant: %v", gotConfig, reservationConfig)
}

if gotConfig, err := admin.UpdateReservation(ctx, updateConfig); err != nil {
t.Errorf("UpdateReservation() got err: %v", err)
} else if !testutil.Equal(gotConfig, &reservationConfig) {
t.Errorf("UpdateReservation() got: %v\nwant: %v", gotConfig, reservationConfig)
}

if _, err := admin.UpdateReservation(ctx, emptyUpdateConfig); !test.ErrorEqual(err, errNoReservationFieldsUpdated) {
t.Errorf("UpdateReservation() got err: (%v), want err: (%v)", err, errNoReservationFieldsUpdated)
}

if gotConfig, err := admin.Reservation(ctx, reservationPath); err != nil {
t.Errorf("Reservation() got err: %v", err)
} else if !testutil.Equal(gotConfig, &reservationConfig) {
t.Errorf("Reservation() got: %v\nwant: %v", gotConfig, reservationConfig)
}

if err := admin.DeleteReservation(ctx, reservationPath); err != nil {
t.Errorf("DeleteReservation() got err: %v", err)
}
}

func TestAdminListReservations(t *testing.T) {
ctx := context.Background()

// Inputs
const locationPath = "projects/my-proj/locations/us-central1"
reservationConfig1 := ReservationConfig{
Name: "projects/my-proj/locations/us-central1/reservations/reservation1",
ThroughputCapacity: 1,
}
reservationConfig2 := ReservationConfig{
Name: "projects/my-proj/locations/us-central1/reservations/reservation2",
ThroughputCapacity: 2,
}
reservationConfig3 := ReservationConfig{
Name: "projects/my-proj/locations/us-central1/reservations/reservation3",
ThroughputCapacity: 2,
}

// Expected requests and fake responses
wantListReq1 := &pb.ListReservationsRequest{
Parent: "projects/my-proj/locations/us-central1",
}
listResp1 := &pb.ListReservationsResponse{
Reservations: []*pb.Reservation{reservationConfig1.toProto(), reservationConfig2.toProto()},
NextPageToken: "next_token",
}
wantListReq2 := &pb.ListReservationsRequest{
Parent: "projects/my-proj/locations/us-central1",
PageToken: "next_token",
}
listResp2 := &pb.ListReservationsResponse{
Reservations: []*pb.Reservation{reservationConfig3.toProto()},
}

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantListReq1, listResp1, nil)
verifiers.GlobalVerifier.Push(wantListReq2, listResp2, nil)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

var gotReservationConfigs []*ReservationConfig
reservationIt := admin.Reservations(ctx, locationPath)
for {
reservation, err := reservationIt.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("ReservationIterator.Next() got err: %v", err)
} else {
gotReservationConfigs = append(gotReservationConfigs, reservation)
}
}

wantReservationConfigs := []*ReservationConfig{&reservationConfig1, &reservationConfig2, &reservationConfig3}
if diff := testutil.Diff(gotReservationConfigs, wantReservationConfigs); diff != "" {
t.Errorf("Reservations() got: -, want: +\n%s", diff)
}
}

func TestAdminListReservationTopics(t *testing.T) {
ctx := context.Background()

// Inputs
const (
reservationPath = "projects/my-proj/locations/us-central1/reservations/my-reservation"
topic1 = "projects/my-proj/locations/us-central1-a/topics/topic1"
topic2 = "projects/my-proj/locations/us-central1-a/topics/topic2"
topic3 = "projects/my-proj/locations/us-central1-a/topics/topic3"
)

// Expected requests and fake responses
wantListReq1 := &pb.ListReservationTopicsRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
}
listResp1 := &pb.ListReservationTopicsResponse{
Topics: []string{topic1, topic2},
NextPageToken: "next_token",
}
wantListReq2 := &pb.ListReservationTopicsRequest{
Name: "projects/my-proj/locations/us-central1/reservations/my-reservation",
PageToken: "next_token",
}
listResp2 := &pb.ListReservationTopicsResponse{
Topics: []string{topic3},
}

verifiers := test.NewVerifiers(t)
verifiers.GlobalVerifier.Push(wantListReq1, listResp1, nil)
verifiers.GlobalVerifier.Push(wantListReq2, listResp2, nil)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

admin := newTestAdminClient(t)
defer admin.Close()

var gotTopics []string
topicPathIt := admin.ReservationTopics(ctx, reservationPath)
for {
topicPath, err := topicPathIt.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("TopicPathIterator.Next() got err: %v", err)
} else {
gotTopics = append(gotTopics, topicPath)
}
}

wantTopics := []string{topic1, topic2, topic3}
if !testutil.Equal(gotTopics, wantTopics) {
t.Errorf("ReservationTopics() got: %v\nwant: %v", gotTopics, wantTopics)
}
}

func TestAdminValidateResourcePaths(t *testing.T) {
ctx := context.Background()

Expand All @@ -438,7 +628,13 @@ func TestAdminValidateResourcePaths(t *testing.T) {
t.Errorf("Subscription() should fail")
}
if err := admin.DeleteSubscription(ctx, "INVALID"); err == nil {
t.Errorf("DeleteTopic() should fail")
t.Errorf("DeleteSubscription() should fail")
}
if _, err := admin.Reservation(ctx, "INVALID"); err == nil {
t.Errorf("Reservation() should fail")
}
if err := admin.DeleteReservation(ctx, "INVALID"); err == nil {
t.Errorf("DeleteReservation() should fail")
}

topicIt := admin.Topics(ctx, "INVALID")
Expand All @@ -453,6 +649,14 @@ func TestAdminValidateResourcePaths(t *testing.T) {
if _, err := subsIt.Next(); err == nil {
t.Errorf("SubscriptionIterator.Next() should fail")
}
resIt := admin.Reservations(ctx, "INVALID")
if _, err := resIt.Next(); err == nil {
t.Errorf("ReservationIterator.Next() should fail")
}
topicPathIt := admin.ReservationTopics(ctx, "INVALID")
if _, err := topicPathIt.Next(); err == nil {
t.Errorf("TopicPathIterator.Next() should fail")
}
}

func TestAdminSeekSubscription(t *testing.T) {
Expand Down

0 comments on commit 65b0f88

Please sign in to comment.