Skip to content

Commit

Permalink
[release-1.13] Avoid possible nil pointer dereferences in consumergro…
Browse files Browse the repository at this point in the history
…up scheduling (#3785)

* fix: avoiding some possible nil pointer dereferences in consumergroup scheduling

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fixed unit tests

Signed-off-by: Calum Murray <cmurray@redhat.com>

* ignore no scheduler found error on finaliztion

Signed-off-by: Calum Murray <cmurray@redhat.com>

* address review comments

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
Co-authored-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
knative-prow-robot and Cali0707 committed Mar 26, 2024
1 parent c9a7165 commit 9acb72f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 11 deletions.
26 changes: 22 additions & 4 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Expand Up @@ -124,12 +124,20 @@ func init() {
}
}

type NoSchedulerFoundError struct{}

func (NoSchedulerFoundError) Error() string {
return "no scheduler found"
}

var _ error = NoSchedulerFoundError{}

type Scheduler struct {
scheduler.Scheduler
SchedulerConfig
}

type schedulerFunc func(s string) Scheduler
type schedulerFunc func(s string) (Scheduler, bool)

type Reconciler struct {
SchedulerFunc schedulerFunc
Expand Down Expand Up @@ -231,7 +239,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consum
cg.Spec.Replicas = pointer.Int32(0)
err := r.schedule(ctx, cg) //de-schedule placements

if err != nil {
if err != nil && !errors.Is(err, NoSchedulerFoundError{}) {
// return an error to 1. update the status. 2. not clear the finalizer
return cg.MarkScheduleConsumerFailed("Deschedule", fmt.Errorf("failed to unschedule consumer group: %w", err))
}
Expand Down Expand Up @@ -405,7 +413,15 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
startTime := time.Now()
defer recordScheduleLatency(ctx, cg, startTime)

statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind)
resourceRef := cg.GetUserFacingResourceRef()
if resourceRef == nil {
return NoSchedulerFoundError{}
}

statefulSetScheduler, ok := r.SchedulerFunc(resourceRef.Kind)
if !ok {
return NoSchedulerFoundError{}
}

// Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing
// volumes.
Expand All @@ -419,7 +435,9 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
// Sort placements by pod name.
sort.SliceStable(placements, func(i, j int) bool { return placements[i].PodName < placements[j].PodName })
sort.SliceStable(placements, func(i, j int) bool {
return placements[i].PodName < placements[j].PodName
})

cg.Status.Placements = placements

Expand Down
96 changes: 90 additions & 6 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
Expand Up @@ -69,6 +69,7 @@ func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Pla

const (
testSchedulerKey = "scheduler"
noTestScheduler = "no-scheduler"

systemNamespace = "knative-eventing"
finalizerName = "consumergroups.internal.kafka.eventing.knative.dev"
Expand Down Expand Up @@ -1673,14 +1674,14 @@ func TestReconcileKind(t *testing.T) {
store.OnConfigChanged(exampleConfig)

r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
SchedulerFunc: func(s string) (Scheduler, bool) {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Scheduler: ss,
SchedulerConfig: SchedulerConfig{
StatefulSetName: kafkainternals.SourceStatefulSetName,
},
}
}, true
},
ConsumerLister: listers.GetConsumerLister(),
InternalsClient: fakekafkainternalsclient.Get(ctx).InternalV1alpha1(),
Expand Down Expand Up @@ -1820,11 +1821,11 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
ctx, _ = kedaclient.With(ctx)

r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
SchedulerFunc: func(s string) (Scheduler, bool) {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Scheduler: ss,
}
}, true
},
ConsumerLister: listers.GetConsumerLister(),
InternalsClient: fakekafkainternalsclient.Get(ctx).InternalV1alpha1(),
Expand Down Expand Up @@ -2009,6 +2010,86 @@ func TestFinalizeKind(t *testing.T) {
},
SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it
},
{
Name: "Finalize normal - with consumers, no valid scheduler",
Objects: []runtime.Object{
NewSASLSSLSecret(ConsumerGroupNamespace, SecretName),
NewConsumer(1,
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
ConsumerBootstrapServersConfig(ChannelBootstrapServers),
ConsumerGroupIdConfig("my.group.id"),
),
ConsumerVReplicas(1),
ConsumerPlacement(kafkainternals.PodBind{PodName: "p1", PodNamespace: systemNamespace}),
ConsumerSubscriber(NewSourceSinkReference()),
)),
),
NewDeletedConsumeGroup(
ConsumerGroupReplicas(1),
ConsumerGroupOwnerRef(SourceAsOwnerReference()),
ConsumerGroupConsumerSpec(NewConsumerSpec(
ConsumerConfigs(
ConsumerBootstrapServersConfig(ChannelBootstrapServers),
ConsumerGroupIdConfig("my.group.id"),
),
ConsumerAuth(&kafkainternals.Auth{
NetSpec: &bindings.KafkaNetSpec{
SASL: bindings.KafkaSASLSpec{
Enable: true,
User: bindings.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: SecretName,
},
Key: "user",
},
},
Password: bindings.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: SecretName,
},
Key: "password",
},
},
Type: bindings.SecretValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: SecretName,
},
Key: "type",
},
},
},
TLS: bindings.KafkaTLSSpec{
Enable: true,
},
},
}),
)),
),
},
Key: testKey,
OtherTestData: map[string]interface{}{
noTestScheduler: true,
},
WantDeletes: []clientgotesting.DeleteActionImpl{
{
ActionImpl: clientgotesting.ActionImpl{
Namespace: ConsumerGroupNamespace,
Resource: schema.GroupVersionResource{
Group: kafkainternals.SchemeGroupVersion.Group,
Version: kafkainternals.SchemeGroupVersion.Version,
Resource: "consumers",
},
},
Name: fmt.Sprintf("%s-%d", ConsumerNamePrefix, 1),
},
},
SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it
},
{
Name: "Finalize normal - with consumers and existing placements",
Objects: []runtime.Object{
Expand Down Expand Up @@ -2230,11 +2311,14 @@ func TestFinalizeKind(t *testing.T) {
errorOnDeleteKafkaCG := row.OtherTestData[kafkatesting.ErrorOnDeleteConsumerGroupTestKey]

r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
SchedulerFunc: func(s string) (Scheduler, bool) {
if noScheduler, ok := row.OtherTestData[noTestScheduler]; ok && noScheduler.(bool) == true {
return Scheduler{}, false
}
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Scheduler: ss,
}
}, true
},
ConsumerLister: listers.GetConsumerLister(),
InternalsClient: fakekafkainternalsclient.Get(ctx).InternalV1alpha1(),
Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/controller.go
Expand Up @@ -114,7 +114,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

r := &Reconciler{
SchedulerFunc: func(s string) Scheduler { return schedulers[strings.ToLower(s)] },
SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
ConsumerLister: consumer.Get(ctx).Lister(),
InternalsClient: internalsclient.Get(ctx).InternalV1alpha1(),
SecretLister: secretinformer.Get(ctx).Lister(),
Expand Down

0 comments on commit 9acb72f

Please sign in to comment.