diff --git a/pubsublite/admin.go b/pubsublite/admin.go index da425a267d9..9b59a52369c 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -27,6 +27,7 @@ import ( var ( errNoTopicFieldsUpdated = errors.New("pubsublite: no fields updated for topic") errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription") + errNoReservationFieldsUpdated = errors.New("pubsublite: no fields updated for reservation") ) // AdminClient provides admin operations for Pub/Sub Lite resources within a @@ -273,6 +274,90 @@ func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *Subscr } } +// CreateReservation creates a new reservation from the given config. If the +// reservation already exists an error will be returned. +func (ac *AdminClient) CreateReservation(ctx context.Context, config ReservationConfig) (*ReservationConfig, error) { + reservationPath, err := wire.ParseReservationPath(config.Name) + if err != nil { + return nil, err + } + req := &pb.CreateReservationRequest{ + Parent: reservationPath.Location().String(), + Reservation: config.toProto(), + ReservationId: reservationPath.ReservationID, + } + reservationpb, err := ac.admin.CreateReservation(ctx, req) + if err != nil { + return nil, err + } + return protoToReservationConfig(reservationpb), nil +} + +// UpdateReservation updates an existing reservation from the given config and +// returns the new reservation config. UpdateReservation returns an error if no +// fields were modified. +func (ac *AdminClient) UpdateReservation(ctx context.Context, config ReservationConfigToUpdate) (*ReservationConfig, error) { + if _, err := wire.ParseReservationPath(config.Name); err != nil { + return nil, err + } + req := config.toUpdateRequest() + if len(req.GetUpdateMask().GetPaths()) == 0 { + return nil, errNoReservationFieldsUpdated + } + respb, err := ac.admin.UpdateReservation(ctx, req) + if err != nil { + return nil, err + } + return protoToReservationConfig(respb), nil +} + +// DeleteReservation deletes a reservation. A valid reservation path has the +// format: "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID". +func (ac *AdminClient) DeleteReservation(ctx context.Context, reservation string) error { + if _, err := wire.ParseReservationPath(reservation); err != nil { + return err + } + return ac.admin.DeleteReservation(ctx, &pb.DeleteReservationRequest{Name: reservation}) +} + +// Reservation retrieves the configuration of a reservation. A valid reservation +// name has the format: +// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID". +func (ac *AdminClient) Reservation(ctx context.Context, reservation string) (*ReservationConfig, error) { + if _, err := wire.ParseReservationPath(reservation); err != nil { + return nil, err + } + respb, err := ac.admin.GetReservation(ctx, &pb.GetReservationRequest{Name: reservation}) + if err != nil { + return nil, err + } + return protoToReservationConfig(respb), nil +} + +// Reservations retrieves the list of reservation configs for a given project +// and region. A valid parent path has the format: +// "projects/PROJECT_ID/locations/REGION". +func (ac *AdminClient) Reservations(ctx context.Context, parent string) *ReservationIterator { + if _, err := wire.ParseLocationPath(parent); err != nil { + return &ReservationIterator{err: err} + } + return &ReservationIterator{ + it: ac.admin.ListReservations(ctx, &pb.ListReservationsRequest{Parent: parent}), + } +} + +// ReservationTopics retrieves the list of topic paths for a reservation. +// A valid reservation path has the format: +// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID". +func (ac *AdminClient) ReservationTopics(ctx context.Context, reservation string) *TopicPathIterator { + if _, err := wire.ParseReservationPath(reservation); err != nil { + return &TopicPathIterator{err: err} + } + return &TopicPathIterator{ + it: ac.admin.ListReservationTopics(ctx, &pb.ListReservationTopicsRequest{Name: reservation}), + } +} + // Close releases any resources held by the client when it is no longer // required. If the client is available for the lifetime of the program, then // Close need not be called at exit. @@ -319,6 +404,26 @@ func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) { return protoToSubscriptionConfig(subspb), nil } +// TopicPathIterator is an iterator that returns a list of topic paths. +type TopicPathIterator struct { + it *vkit.StringIterator + err error +} + +// Next returns the next topic path, which has format: +// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". The +// second return value will be iterator.Done if there are no more topic paths. +func (sp *TopicPathIterator) Next() (string, error) { + if sp.err != nil { + return "", sp.err + } + topicPath, err := sp.it.Next() + if err != nil { + return "", err + } + return topicPath, nil +} + // SubscriptionPathIterator is an iterator that returns a list of subscription // paths. type SubscriptionPathIterator struct { @@ -340,3 +445,23 @@ func (sp *SubscriptionPathIterator) Next() (string, error) { } return subsPath, nil } + +// ReservationIterator is an iterator that returns a list of reservation +// configs. +type ReservationIterator struct { + it *vkit.ReservationIterator + err error +} + +// Next returns the next reservation config. The second return value will be +// iterator.Done if there are no more reservation configs. +func (r *ReservationIterator) Next() (*ReservationConfig, error) { + if r.err != nil { + return nil, r.err + } + respb, err := r.it.Next() + if err != nil { + return nil, err + } + return protoToReservationConfig(respb), nil +} diff --git a/pubsublite/admin_test.go b/pubsublite/admin_test.go index 8cf493f9e7f..1fb22fa77dd 100644 --- a/pubsublite/admin_test.go +++ b/pubsublite/admin_test.go @@ -51,6 +51,7 @@ func TestAdminTopicCRUD(t *testing.T) { SubscribeCapacityMiBPerSec: 4, PerPartitionBytes: 30 * gibi, RetentionDuration: 24 * time.Hour, + ThroughputReservation: "projects/my-proj/locations/us-central1/reservations/my-reservation", } updateConfig := TopicConfigToUpdate{ Name: topicPath, @@ -58,6 +59,7 @@ func TestAdminTopicCRUD(t *testing.T) { SubscribeCapacityMiBPerSec: 8, PerPartitionBytes: 40 * gibi, RetentionDuration: InfiniteRetention, + ThroughputReservation: "", } emptyUpdateConfig := TopicConfigToUpdate{ Name: topicPath, @@ -414,6 +416,194 @@ func TestAdminListSubscriptions(t *testing.T) { } } +func TestAdminReservationCRUD(t *testing.T) { + ctx := context.Background() + + // Inputs + const reservationPath = "projects/my-proj/locations/us-central1/reservations/my-reservation" + reservationConfig := ReservationConfig{ + Name: reservationPath, + ThroughputCapacity: 4, + } + updateConfig := ReservationConfigToUpdate{ + Name: reservationPath, + ThroughputCapacity: 5, + } + emptyUpdateConfig := ReservationConfigToUpdate{ + Name: reservationPath, + } + + // Expected requests and fake responses + wantCreateReq := &pb.CreateReservationRequest{ + Parent: "projects/my-proj/locations/us-central1", + ReservationId: "my-reservation", + Reservation: reservationConfig.toProto(), + } + wantUpdateReq := updateConfig.toUpdateRequest() + wantGetReq := &pb.GetReservationRequest{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + } + wantDeleteReq := &pb.DeleteReservationRequest{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + } + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(wantCreateReq, reservationConfig.toProto(), nil) + verifiers.GlobalVerifier.Push(wantUpdateReq, reservationConfig.toProto(), nil) + verifiers.GlobalVerifier.Push(wantGetReq, reservationConfig.toProto(), nil) + verifiers.GlobalVerifier.Push(wantDeleteReq, &emptypb.Empty{}, nil) + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + admin := newTestAdminClient(t) + defer admin.Close() + + if gotConfig, err := admin.CreateReservation(ctx, reservationConfig); err != nil { + t.Errorf("CreateReservation() got err: %v", err) + } else if !testutil.Equal(gotConfig, &reservationConfig) { + t.Errorf("CreateReservation() got: %v\nwant: %v", gotConfig, reservationConfig) + } + + if gotConfig, err := admin.UpdateReservation(ctx, updateConfig); err != nil { + t.Errorf("UpdateReservation() got err: %v", err) + } else if !testutil.Equal(gotConfig, &reservationConfig) { + t.Errorf("UpdateReservation() got: %v\nwant: %v", gotConfig, reservationConfig) + } + + if _, err := admin.UpdateReservation(ctx, emptyUpdateConfig); !test.ErrorEqual(err, errNoReservationFieldsUpdated) { + t.Errorf("UpdateReservation() got err: (%v), want err: (%v)", err, errNoReservationFieldsUpdated) + } + + if gotConfig, err := admin.Reservation(ctx, reservationPath); err != nil { + t.Errorf("Reservation() got err: %v", err) + } else if !testutil.Equal(gotConfig, &reservationConfig) { + t.Errorf("Reservation() got: %v\nwant: %v", gotConfig, reservationConfig) + } + + if err := admin.DeleteReservation(ctx, reservationPath); err != nil { + t.Errorf("DeleteReservation() got err: %v", err) + } +} + +func TestAdminListReservations(t *testing.T) { + ctx := context.Background() + + // Inputs + const locationPath = "projects/my-proj/locations/us-central1" + reservationConfig1 := ReservationConfig{ + Name: "projects/my-proj/locations/us-central1/reservations/reservation1", + ThroughputCapacity: 1, + } + reservationConfig2 := ReservationConfig{ + Name: "projects/my-proj/locations/us-central1/reservations/reservation2", + ThroughputCapacity: 2, + } + reservationConfig3 := ReservationConfig{ + Name: "projects/my-proj/locations/us-central1/reservations/reservation3", + ThroughputCapacity: 2, + } + + // Expected requests and fake responses + wantListReq1 := &pb.ListReservationsRequest{ + Parent: "projects/my-proj/locations/us-central1", + } + listResp1 := &pb.ListReservationsResponse{ + Reservations: []*pb.Reservation{reservationConfig1.toProto(), reservationConfig2.toProto()}, + NextPageToken: "next_token", + } + wantListReq2 := &pb.ListReservationsRequest{ + Parent: "projects/my-proj/locations/us-central1", + PageToken: "next_token", + } + listResp2 := &pb.ListReservationsResponse{ + Reservations: []*pb.Reservation{reservationConfig3.toProto()}, + } + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(wantListReq1, listResp1, nil) + verifiers.GlobalVerifier.Push(wantListReq2, listResp2, nil) + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + admin := newTestAdminClient(t) + defer admin.Close() + + var gotReservationConfigs []*ReservationConfig + reservationIt := admin.Reservations(ctx, locationPath) + for { + reservation, err := reservationIt.Next() + if err == iterator.Done { + break + } + if err != nil { + t.Errorf("ReservationIterator.Next() got err: %v", err) + } else { + gotReservationConfigs = append(gotReservationConfigs, reservation) + } + } + + wantReservationConfigs := []*ReservationConfig{&reservationConfig1, &reservationConfig2, &reservationConfig3} + if diff := testutil.Diff(gotReservationConfigs, wantReservationConfigs); diff != "" { + t.Errorf("Reservations() got: -, want: +\n%s", diff) + } +} + +func TestAdminListReservationTopics(t *testing.T) { + ctx := context.Background() + + // Inputs + const ( + reservationPath = "projects/my-proj/locations/us-central1/reservations/my-reservation" + topic1 = "projects/my-proj/locations/us-central1-a/topics/topic1" + topic2 = "projects/my-proj/locations/us-central1-a/topics/topic2" + topic3 = "projects/my-proj/locations/us-central1-a/topics/topic3" + ) + + // Expected requests and fake responses + wantListReq1 := &pb.ListReservationTopicsRequest{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + } + listResp1 := &pb.ListReservationTopicsResponse{ + Topics: []string{topic1, topic2}, + NextPageToken: "next_token", + } + wantListReq2 := &pb.ListReservationTopicsRequest{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + PageToken: "next_token", + } + listResp2 := &pb.ListReservationTopicsResponse{ + Topics: []string{topic3}, + } + + verifiers := test.NewVerifiers(t) + verifiers.GlobalVerifier.Push(wantListReq1, listResp1, nil) + verifiers.GlobalVerifier.Push(wantListReq2, listResp2, nil) + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + admin := newTestAdminClient(t) + defer admin.Close() + + var gotTopics []string + topicPathIt := admin.ReservationTopics(ctx, reservationPath) + for { + topicPath, err := topicPathIt.Next() + if err == iterator.Done { + break + } + if err != nil { + t.Errorf("TopicPathIterator.Next() got err: %v", err) + } else { + gotTopics = append(gotTopics, topicPath) + } + } + + wantTopics := []string{topic1, topic2, topic3} + if !testutil.Equal(gotTopics, wantTopics) { + t.Errorf("ReservationTopics() got: %v\nwant: %v", gotTopics, wantTopics) + } +} + func TestAdminValidateResourcePaths(t *testing.T) { ctx := context.Background() @@ -438,7 +628,13 @@ func TestAdminValidateResourcePaths(t *testing.T) { t.Errorf("Subscription() should fail") } if err := admin.DeleteSubscription(ctx, "INVALID"); err == nil { - t.Errorf("DeleteTopic() should fail") + t.Errorf("DeleteSubscription() should fail") + } + if _, err := admin.Reservation(ctx, "INVALID"); err == nil { + t.Errorf("Reservation() should fail") + } + if err := admin.DeleteReservation(ctx, "INVALID"); err == nil { + t.Errorf("DeleteReservation() should fail") } topicIt := admin.Topics(ctx, "INVALID") @@ -453,6 +649,14 @@ func TestAdminValidateResourcePaths(t *testing.T) { if _, err := subsIt.Next(); err == nil { t.Errorf("SubscriptionIterator.Next() should fail") } + resIt := admin.Reservations(ctx, "INVALID") + if _, err := resIt.Next(); err == nil { + t.Errorf("ReservationIterator.Next() should fail") + } + topicPathIt := admin.ReservationTopics(ctx, "INVALID") + if _, err := topicPathIt.Next(); err == nil { + t.Errorf("TopicPathIterator.Next() should fail") + } } func TestAdminSeekSubscription(t *testing.T) { diff --git a/pubsublite/config.go b/pubsublite/config.go index 4e520cabb5c..000d3929e81 100644 --- a/pubsublite/config.go +++ b/pubsublite/config.go @@ -68,6 +68,11 @@ type TopicConfig struct { // messages will be retained as long as the bytes retained for each partition // is below `PerPartitionBytes`. Otherwise, must be > 0. RetentionDuration time.Duration + + // The path of the reservation to use for this topic's throughput capacity, in + // the format: + // "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID". + ThroughputReservation string } func (tc *TopicConfig) toProto() *pb.Topic { @@ -89,6 +94,11 @@ func (tc *TopicConfig) toProto() *pb.Topic { if tc.RetentionDuration >= 0 { topicpb.RetentionConfig.Period = ptypes.DurationProto(tc.RetentionDuration) } + if len(tc.ThroughputReservation) > 0 { + topicpb.ReservationConfig = &pb.Topic_ReservationConfig{ + ThroughputReservation: tc.ThroughputReservation, + } + } return topicpb } @@ -102,6 +112,7 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()), PerPartitionBytes: retentionCfg.GetPerPartitionBytes(), RetentionDuration: InfiniteRetention, + ThroughputReservation: t.GetReservationConfig().GetThroughputReservation(), } // An unset retention period proto denotes "infinite retention". if retentionCfg.Period != nil { @@ -141,6 +152,11 @@ type TopicConfigToUpdate struct { // clear a retention duration (i.e. retain messages as long as there is // available storage), set this to `InfiniteRetention`. RetentionDuration optional.Duration + + // The path of the reservation to use for this topic's throughput capacity, in + // the format: + // "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID". + ThroughputReservation optional.String } func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { @@ -181,6 +197,12 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { updatedTopic.RetentionConfig.Period = ptypes.DurationProto(duration) } } + if tc.ThroughputReservation != nil { + fields = append(fields, "reservation_config.throughput_reservation") + updatedTopic.ReservationConfig = &pb.Topic_ReservationConfig{ + ThroughputReservation: optional.ToString(tc.ThroughputReservation), + } + } return &pb.UpdateTopicRequest{ Topic: updatedTopic, @@ -287,3 +309,69 @@ func (sc *SubscriptionConfigToUpdate) toUpdateRequest() *pb.UpdateSubscriptionRe UpdateMask: &fmpb.FieldMask{Paths: fields}, } } + +// ReservationConfig describes the properties of a Pub/Sub Lite reservation. +type ReservationConfig struct { + // The full path of the reservation, in the format: + // "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID". + // + // - PROJECT_ID: The project ID (e.g. "my-project") or the project number + // (e.g. "987654321") can be provided. + // - REGION: The Google Cloud region (e.g. "us-central1") for the reservation. + // See https://cloud.google.com/pubsub/lite/docs/locations for the list of + // regions where Pub/Sub Lite is available. + // - RESERVATION_ID: The ID of the reservation (e.g. "my-reservation"). See + // https://cloud.google.com/pubsub/docs/admin#resource_names for information + // about valid reservation IDs. + Name string + + // The reserved throughput capacity. Every unit of throughput capacity is + // equivalent to 1 MiB/s of published messages or 2 MiB/s of subscribed + // messages. + // + // Any topics which are declared as using capacity from a reservation will + // consume resources from this reservation instead of being charged + // individually. + ThroughputCapacity int +} + +func (rc *ReservationConfig) toProto() *pb.Reservation { + return &pb.Reservation{ + Name: rc.Name, + ThroughputCapacity: int64(rc.ThroughputCapacity), + } +} + +func protoToReservationConfig(r *pb.Reservation) *ReservationConfig { + return &ReservationConfig{ + Name: r.GetName(), + ThroughputCapacity: int(r.GetThroughputCapacity()), + } +} + +// ReservationConfigToUpdate specifies the properties to update for a +// reservation. +type ReservationConfigToUpdate struct { + // The full path of the reservation to update, in the format: + // "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID". + // Required. + Name string + + // If non-zero, updates the throughput capacity. + ThroughputCapacity int +} + +func (rc *ReservationConfigToUpdate) toUpdateRequest() *pb.UpdateReservationRequest { + var fields []string + if rc.ThroughputCapacity > 0 { + fields = append(fields, "throughput_capacity") + } + + return &pb.UpdateReservationRequest{ + Reservation: &pb.Reservation{ + Name: rc.Name, + ThroughputCapacity: int64(rc.ThroughputCapacity), + }, + UpdateMask: &fmpb.FieldMask{Paths: fields}, + } +} diff --git a/pubsublite/config_test.go b/pubsublite/config_test.go index 5babd00bccd..314fc513caf 100644 --- a/pubsublite/config_test.go +++ b/pubsublite/config_test.go @@ -32,7 +32,7 @@ func TestTopicConfigToProtoConversion(t *testing.T) { wantConfig *TopicConfig }{ { - desc: "retention duration set", + desc: "all fields set", topicpb: &pb.Topic{ Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", PartitionConfig: &pb.Topic_PartitionConfig{ @@ -51,6 +51,9 @@ func TestTopicConfigToProtoConversion(t *testing.T) { Nanos: 600, }, }, + ReservationConfig: &pb.Topic_ReservationConfig{ + ThroughputReservation: "projects/my-proj/locations/us-central1/reservations/my-reservation", + }, }, wantConfig: &TopicConfig{ Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", @@ -59,10 +62,11 @@ func TestTopicConfigToProtoConversion(t *testing.T) { SubscribeCapacityMiBPerSec: 16, PerPartitionBytes: 1073741824, RetentionDuration: time.Duration(86400*1e9 + 600), + ThroughputReservation: "projects/my-proj/locations/us-central1/reservations/my-reservation", }, }, { - desc: "retention duration unset", + desc: "optional fields unset", topicpb: &pb.Topic{ Name: "projects/my-proj/locations/europe-west1-b/topics/my-topic", PartitionConfig: &pb.Topic_PartitionConfig{ @@ -117,6 +121,7 @@ func TestTopicUpdateRequest(t *testing.T) { SubscribeCapacityMiBPerSec: 12, PerPartitionBytes: 500000, RetentionDuration: time.Duration(0), + ThroughputReservation: "projects/my-proj/locations/us-central1/reservations/my-reservation", }, want: &pb.UpdateTopicRequest{ Topic: &pb.Topic{ @@ -134,6 +139,9 @@ func TestTopicUpdateRequest(t *testing.T) { PerPartitionBytes: 500000, Period: &dpb.Duration{}, }, + ReservationConfig: &pb.Topic_ReservationConfig{ + ThroughputReservation: "projects/my-proj/locations/us-central1/reservations/my-reservation", + }, }, UpdateMask: &fmpb.FieldMask{ Paths: []string{ @@ -142,6 +150,7 @@ func TestTopicUpdateRequest(t *testing.T) { "partition_config.capacity.subscribe_mib_per_sec", "retention_config.per_partition_bytes", "retention_config.period", + "reservation_config.throughput_reservation", }, }, }, @@ -169,6 +178,32 @@ func TestTopicUpdateRequest(t *testing.T) { }, }, }, + { + desc: "clear throughput reservation", + config: &TopicConfigToUpdate{ + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", + ThroughputReservation: "", + }, + want: &pb.UpdateTopicRequest{ + Topic: &pb.Topic{ + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", + PartitionConfig: &pb.Topic_PartitionConfig{ + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{}, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{}, + ReservationConfig: &pb.Topic_ReservationConfig{ + ThroughputReservation: "", + }, + }, + UpdateMask: &fmpb.FieldMask{ + Paths: []string{ + "reservation_config.throughput_reservation", + }, + }, + }, + }, { desc: "no fields set", config: &TopicConfigToUpdate{ @@ -291,3 +326,80 @@ func TestSubscriptionUpdateRequest(t *testing.T) { }) } } + +func TestReservationConfigToProtoConversion(t *testing.T) { + for _, tc := range []struct { + desc string + respb *pb.Reservation + wantConfig *ReservationConfig + }{ + { + desc: "all fields set", + respb: &pb.Reservation{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + ThroughputCapacity: 5, + }, + wantConfig: &ReservationConfig{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + ThroughputCapacity: 5, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotConfig := protoToReservationConfig(tc.respb) + if !testutil.Equal(gotConfig, tc.wantConfig) { + t.Errorf("protoToReservationConfig(%v)\ngot: %v\nwant: %v", tc.respb, gotConfig, tc.wantConfig) + } + + // Check that the config converts back to an identical proto. + if gotProto := tc.wantConfig.toProto(); !proto.Equal(gotProto, tc.respb) { + t.Errorf("ReservationConfig: %v toProto():\ngot: %v\nwant: %v", tc.wantConfig, gotProto, tc.respb) + } + }) + } +} + +func TestReservationUpdateRequest(t *testing.T) { + for _, tc := range []struct { + desc string + config *ReservationConfigToUpdate + want *pb.UpdateReservationRequest + }{ + { + desc: "all fields set", + config: &ReservationConfigToUpdate{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + ThroughputCapacity: 4, + }, + want: &pb.UpdateReservationRequest{ + Reservation: &pb.Reservation{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + ThroughputCapacity: 4, + }, + UpdateMask: &fmpb.FieldMask{ + Paths: []string{ + "throughput_capacity", + }, + }, + }, + }, + { + desc: "no fields set", + config: &ReservationConfigToUpdate{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + }, + want: &pb.UpdateReservationRequest{ + Reservation: &pb.Reservation{ + Name: "projects/my-proj/locations/us-central1/reservations/my-reservation", + }, + UpdateMask: &fmpb.FieldMask{}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if got := tc.config.toUpdateRequest(); !proto.Equal(got, tc.want) { + t.Errorf("ReservationConfigToUpdate: %v toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want) + } + }) + } +} diff --git a/pubsublite/example_test.go b/pubsublite/example_test.go index 063939b916e..bba59700bc2 100644 --- a/pubsublite/example_test.go +++ b/pubsublite/example_test.go @@ -237,3 +237,96 @@ func ExampleAdminClient_Subscriptions() { fmt.Println(subscriptionConfig) } } + +// This example demonstrates how to create a new reservation. +// See https://cloud.google.com/pubsub/lite/docs/locations for the list of +// regions where Pub/Sub Lite is available. +func ExampleAdminClient_CreateReservation() { + ctx := context.Background() + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + + reservationConfig := pubsublite.ReservationConfig{ + Name: "projects/my-project/locations/region/reservations/my-reservation", + ThroughputCapacity: 10, + } + _, err = admin.CreateReservation(ctx, reservationConfig) + if err != nil { + // TODO: Handle error. + } +} + +func ExampleAdminClient_UpdateReservation() { + ctx := context.Background() + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + + updateConfig := pubsublite.ReservationConfigToUpdate{ + Name: "projects/my-project/locations/region/reservations/my-reservation", + ThroughputCapacity: 20, + } + _, err = admin.UpdateReservation(ctx, updateConfig) + if err != nil { + // TODO: Handle error. + } +} + +func ExampleAdminClient_DeleteReservation() { + ctx := context.Background() + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + + const reservation = "projects/my-project/locations/region/reservations/my-reservation" + if err := admin.DeleteReservation(ctx, reservation); err != nil { + // TODO: Handle error. + } +} + +func ExampleAdminClient_Reservations() { + ctx := context.Background() + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + + // List the configs of all reservations in the given region for the project. + it := admin.Reservations(ctx, "projects/my-project/locations/region") + for { + reservation, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + // TODO: Handle error. + } + fmt.Println(reservation) + } +} + +func ExampleAdminClient_ReservationTopics() { + ctx := context.Background() + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + + // List the paths of all topics using a reservation. + const reservation = "projects/my-project/locations/region/reservations/my-reservation" + it := admin.ReservationTopics(ctx, reservation) + for { + topicPath, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + // TODO: Handle error. + } + fmt.Println(topicPath) + } +}