Skip to content

Commit

Permalink
feat(bigtable): add support for autoscaling (#5232)
Browse files Browse the repository at this point in the history
* feat(bigtable): add support for autoscaling

This commit adds support for autoscaling in the InstanceAdminClient with
associated unit tests and a happy-path integration test.

When creating either a new instance or cluster, callers may specify an
AutoscalingConfig.

For existing clusters, to enable autoscaling on a cluster, a caller may
use SetAutoscaling. To remove autoscaling, callers may use
UpdateCluster. Alternatively, callers may either update the cluster to
enable or disable autoscaling with UpdateInstanceWithClusters or
UpdateInstanceAndSyncClusters.

* fix(bigtable): update clusters with valid nodes

Previously UpdateInstanceWithClusters would try to update a cluster even
if the NumNodes was 0 (or less), resulting in an erroneous error. This
commit ensures a cluster is updated only when a valid and non-zero value
of NumNodes is provided.
  • Loading branch information
enocom committed Jan 19, 2022
1 parent 2090d76 commit a59d1ac
Show file tree
Hide file tree
Showing 5 changed files with 871 additions and 33 deletions.
199 changes: 168 additions & 31 deletions bigtable/admin.go
Expand Up @@ -808,6 +808,10 @@ type InstanceConf struct {
StorageType StorageType
InstanceType InstanceType
Labels map[string]string

// AutoscalingConfig configures the autoscaling properties on the cluster
// created with the instance. It is optional.
AutoscalingConfig *AutoscalingConfig
}

// InstanceWithClustersConfig contains the information necessary to create an Instance
Expand All @@ -824,22 +828,23 @@ var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][
// This method will return when the instance has been created or when an error occurs.
func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error {
ctx = mergeOutgoingMetadata(ctx, iac.md)
newConfig := InstanceWithClustersConfig{
newConfig := &InstanceWithClustersConfig{
InstanceID: conf.InstanceId,
DisplayName: conf.DisplayName,
InstanceType: conf.InstanceType,
Labels: conf.Labels,
Clusters: []ClusterConfig{
{
InstanceID: conf.InstanceId,
ClusterID: conf.ClusterId,
Zone: conf.Zone,
NumNodes: conf.NumNodes,
StorageType: conf.StorageType,
InstanceID: conf.InstanceId,
ClusterID: conf.ClusterId,
Zone: conf.Zone,
NumNodes: conf.NumNodes,
StorageType: conf.StorageType,
AutoscalingConfig: conf.AutoscalingConfig,
},
},
}
return iac.CreateInstanceWithClusters(ctx, &newConfig)
return iac.CreateInstanceWithClusters(ctx, newConfig)
}

// CreateInstanceWithClusters creates a new instance with configured clusters in the project.
Expand Down Expand Up @@ -920,7 +925,9 @@ func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *Instan
// - InstanceID is required
// - DisplayName and InstanceType are updated only if they are not empty
// - ClusterID is required for any provided cluster
// - All other cluster fields are ignored except for NumNodes, which if set will be updated
// - All other cluster fields are ignored except for NumNodes and
// AutoscalingConfig, which if set will be updated. If both are provided,
// AutoscalingConfig takes precedence.
//
// This method may return an error after partially succeeding, for example if the instance is updated
// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
Expand All @@ -941,12 +948,17 @@ func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context,

// Update any clusters
for _, cluster := range conf.Clusters {
err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
if err != nil {
var clusterErr error
if cluster.AutoscalingConfig != nil {
clusterErr = iac.SetAutoscaling(ctx, conf.InstanceID, cluster.ClusterID, *cluster.AutoscalingConfig)
} else if cluster.NumNodes > 0 {
clusterErr = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
}
if clusterErr != nil {
if updatedInstance {
// We updated the instance, so note that in the error message.
return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded",
cluster.ClusterID, err)
cluster.ClusterID, clusterErr)
}
return err
}
Expand Down Expand Up @@ -1033,6 +1045,35 @@ func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID str
}, nil
}

// AutoscalingConfig contains autoscaling configuration for a cluster.
// For details, see https://cloud.google.com/bigtable/docs/autoscaling.
type AutoscalingConfig struct {
// MinNodes sets the minumum number of nodes in a cluster. MinNodes must
// be 1 or greater.
MinNodes int
// MaxNodes sets the maximum number of nodes in a cluster. MaxNodes must be
// equal to or greater than MinNodes.
MaxNodes int
// CPUTargetPercent sets the CPU utilization target for your cluster's
// workload.
CPUTargetPercent int
}

func (a *AutoscalingConfig) proto() *btapb.Cluster_ClusterAutoscalingConfig {
if a == nil {
return nil
}
return &btapb.Cluster_ClusterAutoscalingConfig{
AutoscalingLimits: &btapb.AutoscalingLimits{
MinServeNodes: int32(a.MinNodes),
MaxServeNodes: int32(a.MaxNodes),
},
AutoscalingTargets: &btapb.AutoscalingTargets{
CpuUtilizationPercent: int32(a.CPUTargetPercent),
},
}
}

// ClusterConfig contains the information necessary to create a cluster
type ClusterConfig struct {
// InstanceID specifies the unique name of the instance. Required.
Expand All @@ -1047,7 +1088,9 @@ type ClusterConfig struct {
Zone string

// NumNodes specifies the number of nodes allocated to this cluster. More
// nodes enable higher throughput and more consistent performance. Required.
// nodes enable higher throughput and more consistent performance. One of
// NumNodes or AutoscalingConfig is required. If both are set,
// AutoscalingConfig takes precedence.
NumNodes int32

// StorageType specifies the type of storage used by this cluster to serve
Expand All @@ -1068,17 +1111,30 @@ type ClusterConfig struct {
// key.
// Optional. Immutable.
KMSKeyName string

// AutoscalingConfig configures the autoscaling properties on a cluster.
// One of NumNodes or AutoscalingConfig is required.
AutoscalingConfig *AutoscalingConfig
}

func (cc *ClusterConfig) proto(project string) *btapb.Cluster {
ec := btapb.Cluster_EncryptionConfig{}
ec.KmsKeyName = cc.KMSKeyName
return &btapb.Cluster{
cl := &btapb.Cluster{
ServeNodes: cc.NumNodes,
DefaultStorageType: cc.StorageType.proto(),
Location: "projects/" + project + "/locations/" + cc.Zone,
EncryptionConfig: &ec,
EncryptionConfig: &btapb.Cluster_EncryptionConfig{
KmsKeyName: cc.KMSKeyName,
},
}

if asc := cc.AutoscalingConfig; asc != nil {
cl.Config = &btapb.Cluster_ClusterConfig_{
ClusterConfig: &btapb.Cluster_ClusterConfig{
ClusterAutoscalingConfig: asc.proto(),
},
}
}
return cl
}

// ClusterInfo represents information about a cluster.
Expand All @@ -1100,6 +1156,9 @@ type ClusterInfo struct {

// KMSKeyName is the customer managed encryption key for the cluster.
KMSKeyName string

// AutoscalingConfig are the configured values for a cluster.
AutoscalingConfig *AutoscalingConfig
}

// CreateCluster creates a new cluster in an instance.
Expand Down Expand Up @@ -1129,13 +1188,48 @@ func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, c
return err
}

// UpdateCluster updates attributes of a cluster
// SetAutoscaling enables autoscaling on a cluster. To remove autoscaling, use
// UpdateCluster. See AutoscalingConfig documentation for deatils.
func (iac *InstanceAdminClient) SetAutoscaling(ctx context.Context, instanceID, clusterID string, conf AutoscalingConfig) error {
ctx = mergeOutgoingMetadata(ctx, iac.md)
cluster := &btapb.Cluster{
Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
Config: &btapb.Cluster_ClusterConfig_{
ClusterConfig: &btapb.Cluster_ClusterConfig{
ClusterAutoscalingConfig: conf.proto(),
},
},
}
lro, err := iac.iClient.PartialUpdateCluster(ctx, &btapb.PartialUpdateClusterRequest{
UpdateMask: &field_mask.FieldMask{
Paths: []string{"cluster_config.cluster_autoscaling_config"},
},
Cluster: cluster,
})
if err != nil {
return err
}
return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
}

// UpdateCluster updates attributes of a cluster. If Autoscaling is configured
// for the cluster, it will be removed and replaced by the static number of
// serve nodes specified.
func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error {
ctx = mergeOutgoingMetadata(ctx, iac.md)
cluster := &btapb.Cluster{
Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
ServeNodes: serveNodes}
lro, err := iac.iClient.UpdateCluster(ctx, cluster)
ServeNodes: serveNodes,
// Explicitly removing autoscaling config (and including it in the field
// mask below)
Config: nil,
}
lro, err := iac.iClient.PartialUpdateCluster(ctx, &btapb.PartialUpdateClusterRequest{
UpdateMask: &field_mask.FieldMask{
Paths: []string{"serve_nodes", "cluster_config.cluster_autoscaling_config"},
},
Cluster: cluster,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -1167,14 +1261,20 @@ func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string)
if c.EncryptionConfig != nil {
kmsKeyName = c.EncryptionConfig.KmsKeyName
}
cis = append(cis, &ClusterInfo{
ci := &ClusterInfo{
Name: nameParts[len(nameParts)-1],
Zone: locParts[len(locParts)-1],
ServeNodes: int(c.ServeNodes),
State: c.State.String(),
StorageType: storageTypeFromProto(c.DefaultStorageType),
KMSKeyName: kmsKeyName,
})
}
if cfg := c.GetClusterConfig(); cfg != nil {
if asc := fromClusterConfigProto(cfg); asc != nil {
ci.AutoscalingConfig = asc
}
}
cis = append(cis, ci)
}
if len(res.FailedLocations) > 0 {
// Return partial results and an error in
Expand Down Expand Up @@ -1206,15 +1306,39 @@ func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clus
}
nameParts := strings.Split(c.Name, "/")
locParts := strings.Split(c.Location, "/")
cis := &ClusterInfo{
ci := &ClusterInfo{
Name: nameParts[len(nameParts)-1],
Zone: locParts[len(locParts)-1],
ServeNodes: int(c.ServeNodes),
State: c.State.String(),
StorageType: storageTypeFromProto(c.DefaultStorageType),
KMSKeyName: kmsKeyName,
}
return cis, nil
// Use type assertion to handle protobuf oneof type
if cfg := c.GetClusterConfig(); cfg != nil {
if asc := fromClusterConfigProto(cfg); asc != nil {
ci.AutoscalingConfig = asc
}
}
return ci, nil
}

func fromClusterConfigProto(c *btapb.Cluster_ClusterConfig) *AutoscalingConfig {
if c == nil {
return nil
}
if c.ClusterAutoscalingConfig == nil {
return nil
}
got := c.ClusterAutoscalingConfig
if got.AutoscalingLimits == nil || got.AutoscalingTargets == nil {
return nil
}
return &AutoscalingConfig{
MinNodes: int(got.AutoscalingLimits.MinServeNodes),
MaxNodes: int(got.AutoscalingLimits.MaxServeNodes),
CPUTargetPercent: int(got.AutoscalingTargets.CpuUtilizationPercent),
}
}

// InstanceIAM returns the instance's IAM handle.
Expand Down Expand Up @@ -1478,8 +1602,12 @@ func max(x, y int) int {
// and the given ClusterConfig.
// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance
// using DeleteCluster.
// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the
// provided number of nodes if set.
// - Any cluster in conf.Clusters that also exists in the instance will be
// updated either to contain the provided number of nodes or to use the
// provided autoscaling config. If both the number of nodes and autoscaling
// are configured, autoscaling takes precedence. If the number of nodes is zero
// and autoscaling is not provided in InstanceWithClustersConfig, the cluster
// is not updated.
//
// This method may return an error after partially succeeding, for example if the instance is updated
// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
Expand Down Expand Up @@ -1521,16 +1649,25 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient
}
delete(existingClusterNames, cluster.ClusterID)

if cluster.NumNodes <= 0 {
// We only synchronize clusters with a valid number of nodes.
if cluster.NumNodes <= 0 && cluster.AutoscalingConfig == nil {
// We only synchronize clusters with a valid number of nodes
// or a valid autoscaling config.
continue
}

// We simply want to update this cluster
err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
if err != nil {
// We update teh clusters autoscaling config, or its number of serve
// nodes.
var updateErr error
if cluster.AutoscalingConfig != nil {
updateErr = iac.SetAutoscaling(ctx, conf.InstanceID, cluster.ClusterID,
*cluster.AutoscalingConfig)
} else {
updateErr = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID,
cluster.NumNodes)
}
if updateErr != nil {
return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v",
cluster.ClusterID, err, results)
cluster.ClusterID, updateErr, results)
}
results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID)
}
Expand Down

0 comments on commit a59d1ac

Please sign in to comment.