New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
only requeue consumergroups from appropriate statefulset on change #3825
base: main
Are you sure you want to change the base?
only requeue consumergroups from appropriate statefulset on change #3825
Conversation
Signed-off-by: Calum Murray <cmurray@redhat.com>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3825 +/- ##
=============================================
- Coverage 73.90% 49.49% -24.41%
=============================================
Files 100 246 +146
Lines 3407 14817 +11410
Branches 288 0 -288
=============================================
+ Hits 2518 7334 +4816
- Misses 716 6730 +6014
- Partials 173 753 +580
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
/retest-required |
func GetOwnerKindFromStatefulSetName(name string) (string, bool) { | ||
switch name { | ||
case SourceStatefulSetName: | ||
return "KafkaSource", true | ||
case ChannelStatefulSetName: | ||
return "KafkaChannel", true | ||
case BrokerStatefulSetName: | ||
return "Trigger", true | ||
} | ||
return "", false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use this one?
Lines 199 to 211 in 551691c
// GetUserFacingResourceRef gets the resource reference to the user-facing resources | |
// that are backed by this ConsumerGroup using the OwnerReference list. | |
func (cg *ConsumerGroup) GetUserFacingResourceRef() *metav1.OwnerReference { | |
for i, or := range cg.OwnerReferences { | |
// TODO hardcoded resource kinds. | |
if strings.EqualFold(or.Kind, "trigger") || | |
strings.EqualFold(or.Kind, "kafkasource") || | |
strings.EqualFold(or.Kind, "kafkachannel") { | |
return &cg.OwnerReferences[i] | |
} | |
} | |
return nil | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one goes consumergroup -> ownerReference, we need to go statefulset -> ownerReference.Kind
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see
func GetOwnerKindFromStatefulSetName(name string) (string, bool) { | ||
switch name { | ||
case SourceStatefulSetName: | ||
return "KafkaSource", true | ||
case ChannelStatefulSetName: | ||
return "KafkaChannel", true | ||
case BrokerStatefulSetName: | ||
return "Trigger", true | ||
} | ||
return "", false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see
@@ -189,8 +189,30 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I | |||
}) | |||
consumerInformer.Informer().AddEventHandler(controller.HandleAll(enqueueConsumerGroupFromConsumer(impl.EnqueueKey))) | |||
|
|||
globalResync := func(interface{}) { | |||
impl.GlobalResync(consumerGroupInformer.Informer()) | |||
globalResync := func(obj interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename this variable?
This is not a globalResync
anymore as it's dealing with statefulset changes vs the primary controlled objects
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't plan to reuse this anywhere I think we can inline this on the ResyncOnStatefulSetChange
call as it's now very specific
Signed-off-by: Calum Murray <cmurray@redhat.com>
/cc @pierDipi |
Signed-off-by: Calum Murray <cmurray@redhat.com>
@@ -90,7 +90,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I | |||
impl.GlobalResync(consumerInformer.Informer()) | |||
} | |||
|
|||
cgreconciler.ResyncOnStatefulSetChange(ctx, globalResync) | |||
cgreconciler.ResyncOnStatefulSetChange(ctx, impl.FilteredGlobalResync, consumerInformer.Informer()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The provided informer here will have consumers but in the function we do:
filteredResync(func(i interface{}) bool {
cg, ok := i.(*kafkainternals.ConsumerGroup)
if !ok {
return false
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that makes sense, thanks for catching this! (just saved me lots of debugging time)
Signed-off-by: Calum Murray <cmurray@redhat.com>
Signed-off-by: Calum Murray <cmurray@redhat.com>
/retest-required |
1 similar comment
/retest-required |
/retest |
/retest-required |
/cc @pierDipi |
} | ||
|
||
for _, ref := range c.GetOwnerReferences() { | ||
if ref.Kind == "ConsumerGroup" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd use case insensitive match as it's more robust
if ref.Kind == "ConsumerGroup" { | |
if strings.EqualFold(ref.Kind, "ConsumerGroup") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Make sure the import is there)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, there is this function
eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go
Lines 258 to 267 in 0c5c2e5
// GetConsumerGroup gets the resource reference to the ConsumerGroup | |
// using the OwnerReference list. | |
func (c *Consumer) GetConsumerGroup() *metav1.OwnerReference { | |
for i, or := range c.OwnerReferences { | |
if strings.EqualFold(or.Kind, ConsumerGroupGroupVersionKind.Kind) { | |
return &c.OwnerReferences[i] | |
} | |
} | |
return nil | |
} |
Signed-off-by: Calum Murray <cmurray@redhat.com>
/cc @pierDipi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: Cali0707, pierDipi The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
return nil, false | ||
} | ||
|
||
cgRef := c.GetConsumerGroup() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We would need to check for non nil here
/retest-required |
/test integration-tests |
/retest-required |
4 similar comments
/retest-required |
/retest-required |
/retest-required |
/retest-required |
/retest |
/retest-required |
|
/retest-required |
/test upgrade-tests |
@Cali0707: The following tests failed, say
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
Fixes #3824
Proposed Changes