feat: Implement SingleSubscriptionConsumerImpl #281
Codecov Report
@@ Coverage Diff @@
## master #281 +/- ##
=========================================
Coverage ? 61.68%
Complexity ? 984
=========================================
Files ? 189
Lines ? 6063
Branches ? 525
=========================================
Hits ? 3740
Misses ? 2017
Partials ? 306
public ConsumerRecords<byte[], byte[]> poll(Duration duration) {
  Map<Partition, Queue<SequencedMessage>> partitionQueues = new HashMap<>();
  try {
    if (wakeupTriggered) throw new WakeupException();
Can you read this without the mutex?
No. Fixed.
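A minimal sketch of the kind of fix discussed here, reading the wakeup flag only while holding the lock. The class name `WakeupFlag`, the method `consumeWakeup()`, and the use of `ReentrantLock` are assumptions for illustration; the PR itself uses a `CloseableMonitor`, and whether it clears the flag on read is not shown in this thread.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the consumer; only the flag handling is shown.
class WakeupFlag {
  private final ReentrantLock lock = new ReentrantLock();
  private boolean wakeupTriggered = false; // guarded by lock

  // Called from another thread to interrupt a pending poll().
  void wakeup() {
    lock.lock();
    try {
      wakeupTriggered = true;
    } finally {
      lock.unlock();
    }
  }

  // Reads and clears the flag under the lock (clearing is an assumption).
  // Returns true if a wakeup was requested since the last call.
  boolean consumeWakeup() {
    lock.lock();
    try {
      boolean triggered = wakeupTriggered;
      wakeupTriggered = false;
      return triggered;
    } finally {
      lock.unlock();
    }
  }
}
```

A caller would check `consumeWakeup()` at the top of `poll()` and throw `WakeupException` when it returns true.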
try {
  if (wakeupTriggered) throw new WakeupException();
  while (!duration.isZero()) {
    Duration sleepFor = Collections.min(ImmutableList.of(duration, Duration.ofMillis(100)));
I'm worried that this will result in very high e2e latency for consumers that set high poll durations. IIUC, Kafka returns from poll() as soon as any data is available, and does not wait until "max.poll.records" records are available. Ideally, poll() would return as soon as any one partition has any data, unless "fetch.min.bytes" is set to >1.
Yes, I was misreading the behavior. I've restructured this to return as soon as any messages are available, PTAL.
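For readers following along, the restructured behavior could look roughly like the sketch below: check the buffer first, return immediately if anything is there, and otherwise sleep in slices of at most 100 ms until the deadline. `SlicedPoller`, its `buffer` field, and the `String` message type are hypothetical simplifications, not the PR's actual code.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical simplified poller: one shared buffer instead of per-partition queues.
class SlicedPoller {
  private static final Duration SLICE = Duration.ofMillis(100);
  final ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();

  // Returns as soon as any message is buffered, or an empty list at the deadline.
  List<String> poll(Duration timeout) {
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    while (true) {
      List<String> out = drain();
      if (!out.isEmpty()) return out;
      long remainingNanos = deadlineNanos - System.nanoTime();
      if (remainingNanos <= 0) return out;
      try {
        // Sleep for the smaller of the remaining time and one slice.
        TimeUnit.NANOSECONDS.sleep(Math.min(remainingNanos, SLICE.toNanos()));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return out;
      }
    }
  }

  // Removes and returns everything currently buffered.
  private List<String> drain() {
    List<String> out = new ArrayList<>();
    String message;
    while ((message = buffer.poll()) != null) {
      out.add(message);
    }
    return out;
  }
}
```

With this shape, a message that arrives in the middle of a sleep is still only seen when the current slice ends, so the slice length bounds the added latency.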
This works well for cases where messages are already in the buffer when the client calls poll(), but it will still have high latency when messages are delivered during the 100ms sleep. I can think of some cases where this will matter: imagine messages being published uniformly every 10ms. Also, we need to remember that clients are used to single-digit-millisecond latency for Kafka. Would it be possible to have some kind of notification when any of the SinglePartitionSubscribers has messages ready to be delivered?
}));
}
}
try (CloseableMonitor.Hold h = monitor.enter()) {
Do we need this last read?
Yes. Added an explanatory comment as to why.
…t data, not wait the full duration.
I have a single comment that I would like to discuss regarding latency for Kafka subscribers. Everything else looks great, thanks Dan.
Ack. As discussed offline, a notification like that requires a bit of a gross "register watch" type API to implement. I will file a feature request for this behavior and lower the polling interval to 10 ms in a subsequent PR.
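One possible shape for such a "register watch" API, as a rough sketch only: each buffer accepts a callback that fires when a message arrives, and the poller waits on a condition variable instead of sleeping in fixed slices. `WatchedBuffer`, `registerWatch`, and `NotifiedPoller` are hypothetical names; nothing here reflects the actual SinglePartitionSubscriber interface.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-partition buffer that notifies a registered watcher on new data.
class WatchedBuffer {
  private final Queue<String> messages = new ArrayDeque<>();
  private Runnable watch = () -> {};

  synchronized void registerWatch(Runnable watch) {
    this.watch = watch;
  }

  void add(String message) {
    Runnable toNotify;
    synchronized (this) {
      messages.add(message);
      toNotify = watch;
    }
    toNotify.run(); // run outside the buffer lock to avoid lock-ordering problems
  }

  synchronized String pollOne() {
    return messages.poll();
  }
}

// Hypothetical poller that blocks until notified rather than sleeping in slices.
class NotifiedPoller {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition dataReady = lock.newCondition();
  private final WatchedBuffer buffer;

  NotifiedPoller(WatchedBuffer buffer) {
    this.buffer = buffer;
    buffer.registerWatch(this::signal);
  }

  private void signal() {
    lock.lock();
    try {
      dataReady.signalAll();
    } finally {
      lock.unlock();
    }
  }

  // Returns a message as soon as one arrives, or null after timeoutMillis.
  String poll(long timeoutMillis) {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    lock.lock();
    try {
      while (true) {
        String message = buffer.pollOne();
        if (message != null) return message;
        if (remainingNanos <= 0) return null;
        // awaitNanos releases the lock while waiting and returns the time left.
        remainingNanos = dataReady.awaitNanos(remainingNanos);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      lock.unlock();
    }
  }
}
```

Because the poller checks the buffer while holding the same lock the watcher needs in order to signal, a message added between the check and the wait should not be missed. The cost is the extra callback plumbing on every subscriber, which is presumably the "gross" part.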
This is an API that can be used to implement the Kafka Consumer API.