Commit
Merge pull request #4707 from chaosi-zju/hpasyncerv3
sync deployment replicas when it is controlled by hpa
karmada-bot committed Mar 14, 2024
2 parents 9822d09 + 8f5cc49 commit 2bebae0
Showing 12 changed files with 265 additions and 616 deletions.
2 changes: 1 addition & 1 deletion artifacts/deploy/karmada-controller-manager.yaml
@@ -30,7 +30,7 @@ spec:
- --cluster-status-update-frequency=10s
- --secure-port=10357
- --failover-eviction-timeout=30s
-- --controllers=*,hpaReplicasSyncer
+- --controllers=*,hpaScaleTargetMarker,deploymentReplicasSyncer
- --feature-gates=PropagationPolicyPreemption=true,MultiClusterService=true
- --v=4
livenessProbe:
31 changes: 18 additions & 13 deletions cmd/controller-manager/app/controllermanager.go
@@ -31,7 +31,6 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/scale"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
@@ -57,12 +56,13 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/cluster"
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
"github.com/karmada-io/karmada/pkg/controllers/cronfederatedhpa"
"github.com/karmada-io/karmada/pkg/controllers/deploymentreplicassyncer"
"github.com/karmada-io/karmada/pkg/controllers/execution"
"github.com/karmada-io/karmada/pkg/controllers/federatedhpa"
metricsclient "github.com/karmada-io/karmada/pkg/controllers/federatedhpa/metrics"
"github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota"
"github.com/karmada-io/karmada/pkg/controllers/gracefuleviction"
"github.com/karmada-io/karmada/pkg/controllers/hpareplicassyncer"
"github.com/karmada-io/karmada/pkg/controllers/hpascaletargetmarker"
"github.com/karmada-io/karmada/pkg/controllers/mcs"
"github.com/karmada-io/karmada/pkg/controllers/multiclusterservice"
"github.com/karmada-io/karmada/pkg/controllers/namespace"
@@ -206,7 +206,7 @@ func Run(ctx context.Context, opts *options.Options) error {
var controllers = make(controllerscontext.Initializers)

// controllersDisabledByDefault is the set of controllers which is disabled by default
-var controllersDisabledByDefault = sets.New("hpaReplicasSyncer")
+var controllersDisabledByDefault = sets.New("hpaScaleTargetMarker", "deploymentReplicasSyncer")

func init() {
controllers["cluster"] = startClusterController
@@ -226,7 +226,8 @@ func init() {
controllers["applicationFailover"] = startApplicationFailoverController
controllers["federatedHorizontalPodAutoscaler"] = startFederatedHorizontalPodAutoscalerController
controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController
controllers["hpaReplicasSyncer"] = startHPAReplicasSyncerController
controllers["hpaScaleTargetMarker"] = startHPAScaleTargetMarkerController
controllers["deploymentReplicasSyncer"] = startDeploymentReplicasSyncerController
controllers["multiclusterservice"] = startMCSController
controllers["endpointsliceCollect"] = startEndpointSliceCollectController
controllers["endpointsliceDispatch"] = startEndpointSliceDispatchController
@@ -655,20 +656,24 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.
return true, nil
}

-func startHPAReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
-scaleKindResolver := scale.NewDiscoveryScaleKindResolver(ctx.KubeClientSet.Discovery())
-scaleClient, err := scale.NewForConfig(ctx.Mgr.GetConfig(), ctx.Mgr.GetRESTMapper(), dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
+func startHPAScaleTargetMarkerController(ctx controllerscontext.Context) (enabled bool, err error) {
+hpaScaleTargetMarker := hpascaletargetmarker.HpaScaleTargetMarker{
+DynamicClient: ctx.DynamicClientSet,
+RESTMapper: ctx.Mgr.GetRESTMapper(),
+}
+err = hpaScaleTargetMarker.SetupWithManager(ctx.Mgr)
if err != nil {
return false, err
}

-hpaReplicasSyncer := hpareplicassyncer.HPAReplicasSyncer{
-Client: ctx.Mgr.GetClient(),
-DynamicClient: ctx.DynamicClientSet,
-RESTMapper: ctx.Mgr.GetRESTMapper(),
-ScaleClient: scaleClient,
+return true, nil
+}
+
+func startDeploymentReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
+deploymentReplicasSyncer := deploymentreplicassyncer.DeploymentReplicasSyncer{
+Client: ctx.Mgr.GetClient(),
}
-err = hpaReplicasSyncer.SetupWithManager(ctx.Mgr)
+err = deploymentReplicasSyncer.SetupWithManager(ctx.Mgr)
if err != nil {
return false, err
}
@@ -0,0 +1,159 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deploymentreplicassyncer

import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/names"
)

const (
// ControllerName is the controller name that will be used when reporting events.
ControllerName = "deployment-replicas-syncer"
)

// DeploymentReplicasSyncer syncs deployment replicas from the status field back to the spec field.
type DeploymentReplicasSyncer struct {
Client client.Client
}

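// predicateFunc filters Deployment events for this controller: only update events are handled,
// and only when the deployment carries the retain-replicas label (i.e. it has been marked as an
// HPA scale target) and its spec.replicas or status.replicas actually changed.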
var predicateFunc = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj := e.ObjectOld.(*appsv1.Deployment)
newObj := e.ObjectNew.(*appsv1.Deployment)

// if the deployment is not labeled `retain-replicas`, it is not controlled by an HPA, so ignore the event
retainReplicasLabel := util.GetLabelValue(newObj.GetLabels(), util.RetainReplicasLabel)
if retainReplicasLabel != util.RetainReplicasValue {
return false
}

if oldObj.Spec.Replicas == nil || newObj.Spec.Replicas == nil {
klog.Errorf("spec.replicas field unexpectedly become nil: %+v, %+v", oldObj.Spec.Replicas, newObj.Spec.Replicas)
return false
}

// if the replicas field has not changed in either spec.replicas or status.replicas, ignore the event
if *oldObj.Spec.Replicas == *newObj.Spec.Replicas && oldObj.Status.Replicas == newObj.Status.Replicas {
return false
}

return true
},
DeleteFunc: func(event.DeleteEvent) bool { return false },
GenericFunc: func(event.GenericEvent) bool { return false },
}

// SetupWithManager creates a controller and registers it with the controller manager.
func (r *DeploymentReplicasSyncer) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
Named(ControllerName).
For(&appsv1.Deployment{}, builder.WithPredicates(predicateFunc)).
Complete(r)
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
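// For an HPA-controlled deployment whose binding uses Divided replica scheduling, the write-back
// is deferred (by returning an error so the request is requeued) until the latest spec.replicas
// has been synced to the ResourceBinding, the scheduler has observed the binding's current
// generation, and the aggregated status of every scheduled cluster has been collected; only then
// is status.replicas copied into spec.replicas.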
func (r *DeploymentReplicasSyncer) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling for Deployment %s/%s", req.Namespace, req.Name)

deployment := &appsv1.Deployment{}
binding := &workv1alpha2.ResourceBinding{}
bindingName := names.GenerateBindingName(util.DeploymentKind, req.Name)

if err := r.Client.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: bindingName}, binding); err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("no need to update deployment replicas for binding not found")
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{}, err
}

// if the replica scheduling type is not Divided, there is no need to update replicas
if binding.Spec.Placement.ReplicaSchedulingType() != policyv1alpha1.ReplicaSchedulingTypeDivided {
return controllerruntime.Result{}, nil
}

if err := r.Client.Get(ctx, req.NamespacedName, deployment); err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("no need to update deployment replicas for deployment not found")
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{}, err
}

// if replicas in spec is already the same as in status, there is no need to update it
if deployment.Spec.Replicas != nil && *deployment.Spec.Replicas == deployment.Status.Replicas {
klog.Infof("replicas in spec field (%d) already equals that in status field (%d)", *deployment.Spec.Replicas, deployment.Status.Replicas)
return controllerruntime.Result{}, nil
}

// make sure the replicas change in deployment.spec has been synced to binding.spec, otherwise retry
if deployment.Spec.Replicas == nil || *deployment.Spec.Replicas != binding.Spec.Replicas {
klog.V(4).Infof("wait until replicas of binding (%d) equals replicas of deployment (%d)",
binding.Spec.Replicas, *deployment.Spec.Replicas)
return controllerruntime.Result{}, fmt.Errorf("retry to wait for replicas change to be synced to binding")
}

// make sure the scheduler-observed generation equals the generation in binding, otherwise retry
if binding.Generation != binding.Status.SchedulerObservedGeneration {
klog.V(4).Infof("wait until scheduler observed generation (%d) equals the generation in binding (%d)",
binding.Status.SchedulerObservedGeneration, binding.Generation)
return controllerruntime.Result{}, fmt.Errorf("retry to wait for scheduler observed generation to catch up")
}

if len(binding.Status.AggregatedStatus) != len(binding.Spec.Clusters) {
klog.V(4).Infof("wait until all clusters status collected, got: %d, expected: %d",
len(binding.Status.AggregatedStatus), len(binding.Spec.Clusters))
return controllerruntime.Result{}, fmt.Errorf("retry to wait status in binding collected")
}
for _, status := range binding.Status.AggregatedStatus {
if status.Status == nil {
klog.V(4).Infof("wait until aggregated status of cluster %s collected", status.ClusterName)
return controllerruntime.Result{}, fmt.Errorf("retry to wait status in binding collected")
}
}

// update replicas
oldReplicas := *deployment.Spec.Replicas
deployment.Spec.Replicas = &deployment.Status.Replicas
if err := r.Client.Update(ctx, deployment); err != nil {
klog.Errorf("failed to update deployment (%s/%s) replicas: %+v", deployment.Namespace, deployment.Name, err)
return controllerruntime.Result{}, err
}

klog.Infof("successfully udpate deployment (%s/%s) replicas from %d to %d", deployment.Namespace,
deployment.Namespace, oldReplicas, deployment.Status.Replicas)

return controllerruntime.Result{}, nil
}
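
For illustration only, and not part of this commit: a minimal, self-contained sketch of the final write-back step that DeploymentReplicasSyncer performs once its wait conditions pass, exercised here against controller-runtime's fake client. The helper name syncSpecFromStatus and the example object are hypothetical; the real controller additionally consults the ResourceBinding as shown above.

package main

import (
    "context"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/utils/ptr"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// syncSpecFromStatus copies status.replicas into spec.replicas, mirroring the update step at the
// end of DeploymentReplicasSyncer.Reconcile (hypothetical helper, for illustration only).
func syncSpecFromStatus(ctx context.Context, c client.Client, key client.ObjectKey) error {
    d := &appsv1.Deployment{}
    if err := c.Get(ctx, key, d); err != nil {
        return err
    }
    if d.Spec.Replicas != nil && *d.Spec.Replicas == d.Status.Replicas {
        return nil // already in sync, nothing to do
    }
    d.Spec.Replicas = &d.Status.Replicas
    return c.Update(ctx, d)
}

func main() {
    scheme := runtime.NewScheme()
    _ = clientgoscheme.AddToScheme(scheme)

    // a deployment whose aggregated status.replicas has drifted from spec.replicas,
    // e.g. because an HPA scaled it out in the member clusters
    d := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"},
        Spec:       appsv1.DeploymentSpec{Replicas: ptr.To[int32](2)},
        Status:     appsv1.DeploymentStatus{Replicas: 5},
    }

    c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(d).Build()
    if err := syncSpecFromStatus(context.Background(), c, client.ObjectKeyFromObject(d)); err != nil {
        panic(err)
    }

    got := &appsv1.Deployment{}
    _ = c.Get(context.Background(), client.ObjectKeyFromObject(d), got)
    fmt.Println(*got.Spec.Replicas) // prints 5
}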
