Skip to content

Commit

Permalink
feat(pubsublite): allow increasing the number of topic partitions (#3647
Browse files Browse the repository at this point in the history
)

Allows increasing the number of partitions when updating a topic config. Updates documentation, examples and integration tests.
  • Loading branch information
tmdiep committed Feb 18, 2021
1 parent 0da3578 commit 1f85fdc
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 55 deletions.
12 changes: 10 additions & 2 deletions pubsublite/config.go
Expand Up @@ -46,8 +46,8 @@ type TopicConfig struct {
// about valid topic IDs.
Name string

// The number of partitions in the topic. Must be at least 1. Cannot be
// changed after creation.
// The number of partitions in the topic. Must be at least 1. Can be increased
// after creation, but not decreased.
PartitionCount int

// Publish throughput capacity per partition in MiB/s.
Expand Down Expand Up @@ -120,6 +120,10 @@ type TopicConfigToUpdate struct {
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". Required.
Name string

// If non-zero, will update the number of partitions in the topic. The number
// of partitions can only be increased, not decreased.
PartitionCount int

// If non-zero, will update the publish throughput capacity per partition.
PublishCapacityMiBPerSec int

Expand All @@ -139,6 +143,7 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
updatedTopic := &pb.Topic{
Name: tc.Name,
PartitionConfig: &pb.Topic_PartitionConfig{
Count: int64(tc.PartitionCount),
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
Expand All @@ -152,6 +157,9 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
}

var fields []string
if tc.PartitionCount > 0 {
fields = append(fields, "partition_config.count")
}
if tc.PublishCapacityMiBPerSec > 0 {
fields = append(fields, "partition_config.capacity.publish_mib_per_sec")
}
Expand Down
5 changes: 4 additions & 1 deletion pubsublite/config_test.go
Expand Up @@ -112,6 +112,7 @@ func TestTopicUpdateRequest(t *testing.T) {
desc: "all fields set",
config: &TopicConfigToUpdate{
Name: "projects/my-proj/locations/us-central1-c/topics/my-topic",
PartitionCount: 2,
PublishCapacityMiBPerSec: 4,
SubscribeCapacityMiBPerSec: 12,
PerPartitionBytes: 500000,
Expand All @@ -121,6 +122,7 @@ func TestTopicUpdateRequest(t *testing.T) {
Topic: &pb.Topic{
Name: "projects/my-proj/locations/us-central1-c/topics/my-topic",
PartitionConfig: &pb.Topic_PartitionConfig{
Count: 2,
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: 4,
Expand All @@ -135,6 +137,7 @@ func TestTopicUpdateRequest(t *testing.T) {
},
UpdateMask: &fmpb.FieldMask{
Paths: []string{
"partition_config.count",
"partition_config.capacity.publish_mib_per_sec",
"partition_config.capacity.subscribe_mib_per_sec",
"retention_config.per_partition_bytes",
Expand Down Expand Up @@ -187,7 +190,7 @@ func TestTopicUpdateRequest(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
if got := tc.config.toUpdateRequest(); !proto.Equal(got, tc.want) {
t.Errorf("TopicConfigToUpdate: %v toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want)
t.Errorf("TopicConfigToUpdate(%v).toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/example_test.go
Expand Up @@ -61,10 +61,10 @@ func ExampleAdminClient_UpdateTopic() {

updateConfig := pubsublite.TopicConfigToUpdate{
Name: "projects/my-project/locations/zone/topics/my-topic",
PartitionCount: 3, // Only increases currently supported.
PublishCapacityMiBPerSec: 8,
SubscribeCapacityMiBPerSec: 16,
// Garbage collect messages older than 24 hours.
RetentionDuration: 24 * time.Hour,
RetentionDuration: 24 * time.Hour, // Garbage collect messages older than 24 hours.
}
_, err = admin.UpdateTopic(ctx, updateConfig)
if err != nil {
Expand Down

0 comments on commit 1f85fdc

Please sign in to comment.