Skip to content

Commit

Permalink
Reconcile per-container Endpoints and EndpointSlices for member Services
Browse files Browse the repository at this point in the history
Kubernetes reconciles Endpoints and EndpointSlices for Services having
selector by taking entire Pod readiness into account. Because we need to
allow inter-node communication before CQL traffic we used
PublishNonReadyEndpoints on Services to overcome this.
But at the same time, we would like stop accepting new connections when
Pod is tearing down. These two requirements contradicts with each other.

To satisfy both requirements, Operator will reconcile Endpoints and
EndpointSlice resources in per-container way. If Pod container specifies
port and has its own Readiness probe, Endpoint/EndpointSlice
will become ready for this port when given container is ready. If these
two conditions aren't met, given port becomes ready when entire Pod is
ready.
Controller logic will consider Pod being deleted (non-nil
deletionTimestamp) as fully non-ready so that all endpoints for all
ports
and containers will removed.

Additional container added to Scylla Pod controls readiness of ports
used for inter-node communication.
  • Loading branch information
zimnx committed Mar 19, 2024
1 parent 8b32358 commit 2aa6bd3
Show file tree
Hide file tree
Showing 15 changed files with 2,255 additions and 66 deletions.
25 changes: 24 additions & 1 deletion helm/scylla-operator/templates/clusterrole_def.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ rules:
- ""
resources:
- nodes
- endpoints
verbs:
- get
- list
Expand Down Expand Up @@ -273,3 +272,27 @@ rules:
- patch
- update
- delete
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
2 changes: 2 additions & 0 deletions pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func (o *OperatorOptions) run(ctx context.Context, streams genericclioptions.IOS
kubeInformers.Policy().V1().PodDisruptionBudgets(),
kubeInformers.Networking().V1().Ingresses(),
kubeInformers.Batch().V1().Jobs(),
kubeInformers.Discovery().V1().EndpointSlices(),
kubeInformers.Core().V1().Endpoints(),
scyllaInformers.Scylla().V1().ScyllaClusters(),
o.OperatorImage,
o.CQLSIngressPort,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/scyllacluster/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ const (
jobControllerDegradedCondition = "JobControllerDegraded"
configControllerProgressingCondition = "ConfigControllerProgressing"
configControllerDegradedCondition = "ConfigControllerDegraded"
endpointSliceControllerProgressingCondition = "EndpointSliceControllerProgressing"
endpointSliceControllerDegradedCondition = "EndpointSliceControllerDegraded"
endpointsControllerProgressingCondition = "EndpointsControllerProgressing"
endpointsControllerDegradedCondition = "EndpointsControllerDegraded"
)
69 changes: 69 additions & 0 deletions pkg/controller/scyllacluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand All @@ -29,6 +30,7 @@ import (
appsv1informers "k8s.io/client-go/informers/apps/v1"
batchv1informers "k8s.io/client-go/informers/batch/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
discoveryv1informers "k8s.io/client-go/informers/discovery/v1"
networkingv1informers "k8s.io/client-go/informers/networking/v1"
policyv1informers "k8s.io/client-go/informers/policy/v1"
rbacv1informers "k8s.io/client-go/informers/rbac/v1"
Expand All @@ -37,6 +39,7 @@ import (
appsv1listers "k8s.io/client-go/listers/apps/v1"
batchv1listers "k8s.io/client-go/listers/batch/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
discoveryv1listers "k8s.io/client-go/listers/discovery/v1"
networkingv1listers "k8s.io/client-go/listers/networking/v1"
policyv1listers "k8s.io/client-go/listers/policy/v1"
rbacv1listers "k8s.io/client-go/listers/rbac/v1"
Expand Down Expand Up @@ -76,6 +79,8 @@ type Controller struct {
ingressLister networkingv1listers.IngressLister
scyllaLister scyllav1listers.ScyllaClusterLister
jobLister batchv1listers.JobLister
endpointSliceLister discoveryv1listers.EndpointSliceLister
endpointsLister corev1listers.EndpointsLister

cachesToSync []cache.InformerSynced

Expand All @@ -100,6 +105,8 @@ func NewController(
pdbInformer policyv1informers.PodDisruptionBudgetInformer,
ingressInformer networkingv1informers.IngressInformer,
jobInformer batchv1informers.JobInformer,
endpointSliceInformer discoveryv1informers.EndpointSliceInformer,
endpointsInformer corev1informers.EndpointsInformer,
scyllaClusterInformer scyllav1informers.ScyllaClusterInformer,
operatorImage string,
cqlsIngressPort int,
Expand Down Expand Up @@ -127,6 +134,8 @@ func NewController(
ingressLister: ingressInformer.Lister(),
scyllaLister: scyllaClusterInformer.Lister(),
jobLister: jobInformer.Lister(),
endpointSliceLister: endpointSliceInformer.Lister(),
endpointsLister: endpointsInformer.Lister(),

cachesToSync: []cache.InformerSynced{
podInformer.Informer().HasSynced,
Expand All @@ -140,6 +149,8 @@ func NewController(
ingressInformer.Informer().HasSynced,
scyllaClusterInformer.Informer().HasSynced,
jobInformer.Informer().HasSynced,
endpointSliceInformer.Informer().HasSynced,
endpointsInformer.Informer().HasSynced,
},

eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "scyllacluster-controller"}),
Expand Down Expand Up @@ -235,6 +246,18 @@ func NewController(
DeleteFunc: scc.deleteJob,
})

endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: scc.addEndpointSlice,
UpdateFunc: scc.updateEndpointSlice,
DeleteFunc: scc.deleteEndpointSlice,
})

endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: scc.addEndpoints,
UpdateFunc: scc.updateEndpoints,
DeleteFunc: scc.deleteEndpoints,
})

return scc, nil
}

Expand Down Expand Up @@ -625,3 +648,49 @@ func (scc *Controller) deleteJob(obj interface{}) {
scc.handlers.EnqueueOwner,
)
}

func (scc *Controller) addEndpointSlice(obj interface{}) {
scc.handlers.HandleAdd(
obj.(*discoveryv1.EndpointSlice),
scc.handlers.EnqueueOwner,
)
}

func (scc *Controller) updateEndpointSlice(old, cur interface{}) {
scc.handlers.HandleUpdate(
old.(*discoveryv1.EndpointSlice),
cur.(*discoveryv1.EndpointSlice),
scc.handlers.EnqueueOwner,
scc.deleteEndpointSlice,
)
}

func (scc *Controller) deleteEndpointSlice(obj interface{}) {
scc.handlers.HandleDelete(
obj,
scc.handlers.EnqueueOwner,
)
}

func (scc *Controller) addEndpoints(obj interface{}) {
scc.handlers.HandleAdd(
obj.(*corev1.Endpoints),
scc.handlers.EnqueueOwner,
)
}

func (scc *Controller) updateEndpoints(old, cur interface{}) {
scc.handlers.HandleUpdate(
old.(*corev1.Endpoints),
cur.(*corev1.Endpoints),
scc.handlers.EnqueueOwner,
scc.deleteEndpoints,
)
}

func (scc *Controller) deleteEndpoints(obj interface{}) {
scc.handlers.HandleDelete(
obj,
scc.handlers.EnqueueOwner,
)
}

0 comments on commit 2aa6bd3

Please sign in to comment.