Receiving an empty list when using RecordFilterStrategy on batch messages #2806
…RecordFilterStrategy on batch messages
Hi, @artembilan.

    if (consumerRecords.size() > 0 || consumerAware
            || (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
        invokeDelegate(consumerRecords, acknowledgment, consumer);
    }

Why we should consider …

    if (!filter(consumerRecord)) {
        switch (this.delegateType) {
            case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
            case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
            case CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, consumer);
            case SIMPLE -> this.delegate.onMessage(consumerRecord);
        }
    }
    else {
        ackFilteredIfNecessary(acknowledgment);
    }

In a not …

    // Fix here: consumerRecords.size() should be greater than 0.
    if ((consumerRecords.size() > 0) && (consumerAware
            || (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING)))) {
        invokeDelegate(consumerRecords, acknowledgment, consumer);
    }
    // Otherwise (consumerRecords.size() == 0), acknowledge the filtered batch if necessary.
    else {
        if (this.ackDiscarded && acknowledgment != null) {
            acknowledgment.acknowledge();
        }
    }

IMHO, it is not necessary to be aware of …
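To make the difference between the two conditions quoted in this comment easier to see, here is a minimal standalone sketch. It is only a model of the boolean logic, not the Spring Kafka source: the class `FilterConditionSketch`, the methods `currentInvokes` and `proposedInvokes`, and the parameter `remaining` (the number of records left after filtering) are hypothetical names, and `consumerAware` is passed in as a plain flag.

```java
final class FilterConditionSketch {

    // Mirrors the four delegate types named in the quoted switch statement.
    enum ListenerType { ACKNOWLEDGING_CONSUMER_AWARE, ACKNOWLEDGING, CONSUMER_AWARE, SIMPLE }

    private FilterConditionSketch() {
    }

    // Model of the first quoted condition: the size check is OR-ed with the
    // other flags, so an empty batch can still reach the delegate.
    static boolean currentInvokes(int remaining, boolean consumerAware,
            boolean ackDiscarded, ListenerType type) {
        return remaining > 0 || consumerAware
                || (!ackDiscarded && type == ListenerType.ACKNOWLEDGING);
    }

    // Model of the proposed condition, copied literally: the size check is
    // AND-ed with the other flags.
    static boolean proposedInvokes(int remaining, boolean consumerAware,
            boolean ackDiscarded, ListenerType type) {
        return remaining > 0
                && (consumerAware || (!ackDiscarded && type == ListenerType.ACKNOWLEDGING));
    }

    public static void main(String[] args) {
        // Fully filtered batch, consumer-aware delegate.
        System.out.println(currentInvokes(0, true, false, ListenerType.CONSUMER_AWARE));  // true
        System.out.println(proposedInvokes(0, true, false, ListenerType.CONSUMER_AWARE)); // false
        // Non-empty batch, SIMPLE delegate that is not consumer-aware.
        System.out.println(currentInvokes(3, false, false, ListenerType.SIMPLE));         // true
        System.out.println(proposedInvokes(3, false, false, ListenerType.SIMPLE));        // false
    }
}
```

In this model the proposed condition does stop an empty batch from reaching the delegate. Taken literally, though, it also skips a non-empty batch for a `SIMPLE` delegate that is not consumer-aware, which may be worth checking in the PR.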
@chickenchickenlove You can go ahead and work on a PR. Then we can review the details. Keep in mind that we are already in an RC state with …
This reverts commit a60197d.
Discussed in #2805
Originally posted by m-hetz September 11, 2023
Hi!
I'm using `RecordFilterStrategy` to filter records based on headers. This works great on single messages: the filter works and records don't reach the listener (annotated with `@KafkaListener`).
When using batch mode, filtering also works, but when the entire batch is filtered, the listener is invoked with an empty list.
Is this expected behavior? According to this, when returning an empty list, all records are filtered.
Is there a way to configure it so that an empty bulk does not reach the listener at all?
Thanks!
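One listener-side workaround that does not depend on any framework setting is to return early when the batch is empty. The sketch below shows the idea in plain Java so it can run on its own. `EmptyBatchGuard` and `skipEmpty` are hypothetical names, not part of Spring Kafka, and the same check could equally be written as the first statement of a batch listener method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class EmptyBatchGuard {

    private EmptyBatchGuard() {
    }

    // Hypothetical helper: wraps a batch handler so that a null or empty
    // (for example, fully filtered) batch never reaches it.
    static <T> Consumer<List<T>> skipEmpty(Consumer<List<T>> handler) {
        return batch -> {
            if (batch == null || batch.isEmpty()) {
                return; // nothing survived filtering, so skip the handler
            }
            handler.accept(batch);
        };
    }

    public static void main(String[] args) {
        List<Integer> handledSizes = new ArrayList<>();
        Consumer<List<String>> guarded = skipEmpty(batch -> handledSizes.add(batch.size()));

        guarded.accept(List.of());          // ignored: the batch is empty
        guarded.accept(List.of("a", "b"));  // handled: two records

        System.out.println(handledSizes);   // prints [2]
    }
}
```

This only hides the empty invocation from the business logic. It does not change when offsets are acknowledged, which is still decided by the container and the filtering adapter.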
Sorry for the unformatted code, I tried editing the message but the code is still messed up