Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): allow increasing the number of topic partitions #3647

Merged
merged 3 commits into from Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions pubsublite/config.go
Expand Up @@ -46,8 +46,8 @@ type TopicConfig struct {
// about valid topic IDs.
Name string

// The number of partitions in the topic. Must be at least 1. Cannot be
// changed after creation.
// The number of partitions in the topic. Must be at least 1. Can be increased
// after creation, but not decreased.
PartitionCount int

// Publish throughput capacity per partition in MiB/s.
Expand Down Expand Up @@ -120,6 +120,10 @@ type TopicConfigToUpdate struct {
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". Required.
Name string

// If non-zero, will update the number of partitions in the topic. The number
// of partitions can only be increased, not decreased.
PartitionCount int

// If non-zero, will update the publish throughput capacity per partition.
PublishCapacityMiBPerSec int

Expand All @@ -139,6 +143,7 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
updatedTopic := &pb.Topic{
Name: tc.Name,
PartitionConfig: &pb.Topic_PartitionConfig{
Count: int64(tc.PartitionCount),
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
Expand All @@ -152,6 +157,9 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
}

var fields []string
if tc.PartitionCount > 0 {
fields = append(fields, "partition_config.count")
}
if tc.PublishCapacityMiBPerSec > 0 {
fields = append(fields, "partition_config.capacity.publish_mib_per_sec")
}
Expand Down
5 changes: 4 additions & 1 deletion pubsublite/config_test.go
Expand Up @@ -112,6 +112,7 @@ func TestTopicUpdateRequest(t *testing.T) {
desc: "all fields set",
config: &TopicConfigToUpdate{
Name: "projects/my-proj/locations/us-central1-c/topics/my-topic",
PartitionCount: 2,
PublishCapacityMiBPerSec: 4,
SubscribeCapacityMiBPerSec: 12,
PerPartitionBytes: 500000,
Expand All @@ -121,6 +122,7 @@ func TestTopicUpdateRequest(t *testing.T) {
Topic: &pb.Topic{
Name: "projects/my-proj/locations/us-central1-c/topics/my-topic",
PartitionConfig: &pb.Topic_PartitionConfig{
Count: 2,
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: 4,
Expand All @@ -135,6 +137,7 @@ func TestTopicUpdateRequest(t *testing.T) {
},
UpdateMask: &fmpb.FieldMask{
Paths: []string{
"partition_config.count",
"partition_config.capacity.publish_mib_per_sec",
"partition_config.capacity.subscribe_mib_per_sec",
"retention_config.per_partition_bytes",
Expand Down Expand Up @@ -187,7 +190,7 @@ func TestTopicUpdateRequest(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
if got := tc.config.toUpdateRequest(); !proto.Equal(got, tc.want) {
t.Errorf("TopicConfigToUpdate: %v toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want)
t.Errorf("TopicConfigToUpdate(%v).toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/example_test.go
Expand Up @@ -61,10 +61,10 @@ func ExampleAdminClient_UpdateTopic() {

updateConfig := pubsublite.TopicConfigToUpdate{
Name: "projects/my-project/locations/zone/topics/my-topic",
PartitionCount: 3, // Only increases currently supported.
PublishCapacityMiBPerSec: 8,
SubscribeCapacityMiBPerSec: 16,
// Garbage collect messages older than 24 hours.
RetentionDuration: 24 * time.Hour,
RetentionDuration: 24 * time.Hour, // Garbage collect messages older than 24 hours.
}
_, err = admin.UpdateTopic(ctx, updateConfig)
if err != nil {
Expand Down