Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): add support for autoscaling #5232

Merged
merged 11 commits into from Jan 19, 2022
199 changes: 168 additions & 31 deletions bigtable/admin.go
Expand Up @@ -808,6 +808,10 @@ type InstanceConf struct {
StorageType StorageType
InstanceType InstanceType
Labels map[string]string

// AutoscalingConfig configures the autoscaling properties on the cluster
// created with the instance. It is optional.
AutoscalingConfig *AutoscalingConfig
}

// InstanceWithClustersConfig contains the information necessary to create an Instance
Expand All @@ -824,22 +828,23 @@ var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][
// This method will return when the instance has been created or when an error occurs.
func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error {
ctx = mergeOutgoingMetadata(ctx, iac.md)
newConfig := InstanceWithClustersConfig{
newConfig := &InstanceWithClustersConfig{
InstanceID: conf.InstanceId,
DisplayName: conf.DisplayName,
InstanceType: conf.InstanceType,
Labels: conf.Labels,
Clusters: []ClusterConfig{
{
InstanceID: conf.InstanceId,
ClusterID: conf.ClusterId,
Zone: conf.Zone,
NumNodes: conf.NumNodes,
StorageType: conf.StorageType,
InstanceID: conf.InstanceId,
ClusterID: conf.ClusterId,
Zone: conf.Zone,
NumNodes: conf.NumNodes,
StorageType: conf.StorageType,
AutoscalingConfig: conf.AutoscalingConfig,
},
},
}
return iac.CreateInstanceWithClusters(ctx, &newConfig)
return iac.CreateInstanceWithClusters(ctx, newConfig)
}

// CreateInstanceWithClusters creates a new instance with configured clusters in the project.
Expand Down Expand Up @@ -920,7 +925,9 @@ func (iac *InstanceAdminClient) updateInstance(ctx context.Context, conf *Instan
// - InstanceID is required
// - DisplayName and InstanceType are updated only if they are not empty
// - ClusterID is required for any provided cluster
// - All other cluster fields are ignored except for NumNodes, which if set will be updated
// - All other cluster fields are ignored except for NumNodes and
// AutoscalingConfig, which if set will be updated. If both are provided,
// AutoscalingConfig takes precedence.
//
// This method may return an error after partially succeeding, for example if the instance is updated
// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
Expand All @@ -941,12 +948,17 @@ func (iac *InstanceAdminClient) UpdateInstanceWithClusters(ctx context.Context,

// Update any clusters
for _, cluster := range conf.Clusters {
err := iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
if err != nil {
var clusterErr error
if cluster.AutoscalingConfig != nil {
clusterErr = iac.SetAutoscaling(ctx, conf.InstanceID, cluster.ClusterID, *cluster.AutoscalingConfig)
} else if cluster.NumNodes > 0 {
clusterErr = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
}
if clusterErr != nil {
if updatedInstance {
// We updated the instance, so note that in the error message.
return fmt.Errorf("UpdateCluster %q failed %v; however UpdateInstance succeeded",
cluster.ClusterID, err)
cluster.ClusterID, clusterErr)
}
return err
}
Expand Down Expand Up @@ -1033,6 +1045,35 @@ func (iac *InstanceAdminClient) InstanceInfo(ctx context.Context, instanceID str
}, nil
}

// AutoscalingConfig contains autoscaling configuration for a cluster.
// For details, see https://cloud.google.com/bigtable/docs/autoscaling.
enocom marked this conversation as resolved.
Show resolved Hide resolved
type AutoscalingConfig struct {
// MinNodes sets the minumum number of nodes in a cluster. MinNodes must
// be 1 or greater.
MinNodes int
// MaxNodes sets the maximum number of nodes in a cluster. MaxNodes must be
// equal to or greater than MinNodes.
MaxNodes int
// CPUTargetPercent sets the CPU utilization target for your cluster's
// workload.
CPUTargetPercent int
}

func (a *AutoscalingConfig) proto() *btapb.Cluster_ClusterAutoscalingConfig {
if a == nil {
return nil
}
return &btapb.Cluster_ClusterAutoscalingConfig{
enocom marked this conversation as resolved.
Show resolved Hide resolved
AutoscalingLimits: &btapb.AutoscalingLimits{
MinServeNodes: int32(a.MinNodes),
MaxServeNodes: int32(a.MaxNodes),
},
AutoscalingTargets: &btapb.AutoscalingTargets{
CpuUtilizationPercent: int32(a.CPUTargetPercent),
},
}
}

// ClusterConfig contains the information necessary to create a cluster
type ClusterConfig struct {
// InstanceID specifies the unique name of the instance. Required.
Expand All @@ -1047,7 +1088,9 @@ type ClusterConfig struct {
Zone string

// NumNodes specifies the number of nodes allocated to this cluster. More
// nodes enable higher throughput and more consistent performance. Required.
// nodes enable higher throughput and more consistent performance. One of
// NumNodes or AutoscalingConfig is required. If both are set,
// AutoscalingConfig takes precedence.
NumNodes int32

// StorageType specifies the type of storage used by this cluster to serve
Expand All @@ -1068,17 +1111,30 @@ type ClusterConfig struct {
// key.
// Optional. Immutable.
KMSKeyName string

// AutoscalingConfig configures the autoscaling properties on a cluster.
// One of NumNodes or AutoscalingConfig is required.
AutoscalingConfig *AutoscalingConfig
}

func (cc *ClusterConfig) proto(project string) *btapb.Cluster {
ec := btapb.Cluster_EncryptionConfig{}
ec.KmsKeyName = cc.KMSKeyName
return &btapb.Cluster{
cl := &btapb.Cluster{
ServeNodes: cc.NumNodes,
DefaultStorageType: cc.StorageType.proto(),
Location: "projects/" + project + "/locations/" + cc.Zone,
EncryptionConfig: &ec,
EncryptionConfig: &btapb.Cluster_EncryptionConfig{
KmsKeyName: cc.KMSKeyName,
},
}

if asc := cc.AutoscalingConfig; asc != nil {
cl.Config = &btapb.Cluster_ClusterConfig_{
ClusterConfig: &btapb.Cluster_ClusterConfig{
ClusterAutoscalingConfig: asc.proto(),
},
}
}
return cl
}

// ClusterInfo represents information about a cluster.
Expand All @@ -1100,6 +1156,9 @@ type ClusterInfo struct {

// KMSKeyName is the customer managed encryption key for the cluster.
KMSKeyName string

// AutoscalingConfig are the configured values for a cluster.
AutoscalingConfig *AutoscalingConfig
}

// CreateCluster creates a new cluster in an instance.
Expand Down Expand Up @@ -1129,13 +1188,48 @@ func (iac *InstanceAdminClient) DeleteCluster(ctx context.Context, instanceID, c
return err
}

// UpdateCluster updates attributes of a cluster
// SetAutoscaling enables autoscaling on a cluster. To remove autoscaling, use
// UpdateCluster. See AutoscalingConfig documentation for deatils.
func (iac *InstanceAdminClient) SetAutoscaling(ctx context.Context, instanceID, clusterID string, conf AutoscalingConfig) error {
ctx = mergeOutgoingMetadata(ctx, iac.md)
cluster := &btapb.Cluster{
Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
Config: &btapb.Cluster_ClusterConfig_{
ClusterConfig: &btapb.Cluster_ClusterConfig{
ClusterAutoscalingConfig: conf.proto(),
enocom marked this conversation as resolved.
Show resolved Hide resolved
},
},
}
lro, err := iac.iClient.PartialUpdateCluster(ctx, &btapb.PartialUpdateClusterRequest{
UpdateMask: &field_mask.FieldMask{
Paths: []string{"cluster_config.cluster_autoscaling_config"},
},
Cluster: cluster,
})
if err != nil {
return err
}
return longrunning.InternalNewOperation(iac.lroClient, lro).Wait(ctx, nil)
}

// UpdateCluster updates attributes of a cluster. If Autoscaling is configured
enocom marked this conversation as resolved.
Show resolved Hide resolved
// for the cluster, it will be removed and replaced by the static number of
// serve nodes specified.
func (iac *InstanceAdminClient) UpdateCluster(ctx context.Context, instanceID, clusterID string, serveNodes int32) error {
ctx = mergeOutgoingMetadata(ctx, iac.md)
cluster := &btapb.Cluster{
Name: "projects/" + iac.project + "/instances/" + instanceID + "/clusters/" + clusterID,
ServeNodes: serveNodes}
lro, err := iac.iClient.UpdateCluster(ctx, cluster)
ServeNodes: serveNodes,
// Explicitly removing autoscaling config (and including it in the field
// mask below)
Config: nil,
}
lro, err := iac.iClient.PartialUpdateCluster(ctx, &btapb.PartialUpdateClusterRequest{
tritone marked this conversation as resolved.
Show resolved Hide resolved
UpdateMask: &field_mask.FieldMask{
Paths: []string{"serve_nodes", "cluster_config.cluster_autoscaling_config"},
},
Cluster: cluster,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -1167,14 +1261,20 @@ func (iac *InstanceAdminClient) Clusters(ctx context.Context, instanceID string)
if c.EncryptionConfig != nil {
kmsKeyName = c.EncryptionConfig.KmsKeyName
}
cis = append(cis, &ClusterInfo{
ci := &ClusterInfo{
Name: nameParts[len(nameParts)-1],
Zone: locParts[len(locParts)-1],
ServeNodes: int(c.ServeNodes),
State: c.State.String(),
StorageType: storageTypeFromProto(c.DefaultStorageType),
KMSKeyName: kmsKeyName,
})
}
if cfg := c.GetClusterConfig(); cfg != nil {
if asc := fromClusterConfigProto(cfg); asc != nil {
ci.AutoscalingConfig = asc
}
}
cis = append(cis, ci)
}
if len(res.FailedLocations) > 0 {
// Return partial results and an error in
Expand Down Expand Up @@ -1206,15 +1306,39 @@ func (iac *InstanceAdminClient) GetCluster(ctx context.Context, instanceID, clus
}
nameParts := strings.Split(c.Name, "/")
locParts := strings.Split(c.Location, "/")
cis := &ClusterInfo{
ci := &ClusterInfo{
Name: nameParts[len(nameParts)-1],
Zone: locParts[len(locParts)-1],
ServeNodes: int(c.ServeNodes),
State: c.State.String(),
StorageType: storageTypeFromProto(c.DefaultStorageType),
KMSKeyName: kmsKeyName,
}
return cis, nil
// Use type assertion to handle protobuf oneof type
if cfg := c.GetClusterConfig(); cfg != nil {
if asc := fromClusterConfigProto(cfg); asc != nil {
ci.AutoscalingConfig = asc
}
}
return ci, nil
}

func fromClusterConfigProto(c *btapb.Cluster_ClusterConfig) *AutoscalingConfig {
if c == nil {
return nil
}
if c.ClusterAutoscalingConfig == nil {
return nil
}
got := c.ClusterAutoscalingConfig
if got.AutoscalingLimits == nil || got.AutoscalingTargets == nil {
return nil
}
return &AutoscalingConfig{
MinNodes: int(got.AutoscalingLimits.MinServeNodes),
MaxNodes: int(got.AutoscalingLimits.MaxServeNodes),
CPUTargetPercent: int(got.AutoscalingTargets.CpuUtilizationPercent),
}
}

// InstanceIAM returns the instance's IAM handle.
Expand Down Expand Up @@ -1478,8 +1602,12 @@ func max(x, y int) int {
// and the given ClusterConfig.
// - Any cluster missing from conf.Clusters but present in the instance will be removed from the instance
// using DeleteCluster.
// - Any cluster in conf.Clusters that also exists in the instance will be updated to contain the
// provided number of nodes if set.
// - Any cluster in conf.Clusters that also exists in the instance will be
// updated either to contain the provided number of nodes or to use the
// provided autoscaling config. If both the number of nodes and autoscaling
// are configured, autoscaling takes precedence. If the number of nodes is zero
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
// and autoscaling is not provided in InstanceWithClustersConfig, the cluster
// is not updated.
//
// This method may return an error after partially succeeding, for example if the instance is updated
// but a cluster update fails. If an error is returned, InstanceInfo and Clusters may be called to
Expand Down Expand Up @@ -1521,16 +1649,25 @@ func UpdateInstanceAndSyncClusters(ctx context.Context, iac *InstanceAdminClient
}
delete(existingClusterNames, cluster.ClusterID)

if cluster.NumNodes <= 0 {
// We only synchronize clusters with a valid number of nodes.
if cluster.NumNodes <= 0 && cluster.AutoscalingConfig == nil {
// We only synchronize clusters with a valid number of nodes
// or a valid autoscaling config.
continue
}

// We simply want to update this cluster
err = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID, cluster.NumNodes)
if err != nil {
// We update teh clusters autoscaling config, or its number of serve
// nodes.
var updateErr error
if cluster.AutoscalingConfig != nil {
updateErr = iac.SetAutoscaling(ctx, conf.InstanceID, cluster.ClusterID,
*cluster.AutoscalingConfig)
} else {
updateErr = iac.UpdateCluster(ctx, conf.InstanceID, cluster.ClusterID,
cluster.NumNodes)
}
if updateErr != nil {
return results, fmt.Errorf("UpdateCluster %q failed %v; Progress: %v",
cluster.ClusterID, err, results)
cluster.ClusterID, updateErr, results)
}
results.UpdatedClusters = append(results.UpdatedClusters, cluster.ClusterID)
}
Expand Down