
Enhance Kinesis consumer #12806

Open
Jackie-Jiang wants to merge 3 commits into master from enhance_kinesis_consumer

Conversation

@Jackie-Jiang (Contributor) commented Apr 7, 2024

  • Do not use a separate thread to fetch Kinesis records (this fixes a potential race condition); see the sketch after this list
  • Cache the shard iterator
  • Return the message batch immediately without combining multiple batches (the timeout is ignored)
  • Change the default max records per fetch to 10,000 (the Kinesis default)
  • Remove some unused dependencies
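A minimal sketch of the resulting fetch path, assuming the AWS SDK v2 KinesisClient. The shard-iterator caching fields (_nextShardIterator, _nextStartSequenceNumber) and _config.getNumMaxRecordsToFetch() mirror the diff snippets quoted below; the iterator type, field names, and surrounding structure are illustrative, not the actual KinesisConsumer code.

// Illustrative sketch only, not the actual KinesisConsumer implementation.
private String _nextShardIterator;        // iterator returned by the previous getRecords() call
private String _nextStartSequenceNumber;  // start sequence number the cached iterator corresponds to

private GetRecordsResponse fetch(String startSequenceNumber) {
  String shardIterator;
  if (startSequenceNumber.equals(_nextStartSequenceNumber) && _nextShardIterator != null) {
    // Reuse the cached iterator instead of issuing another GetShardIterator call
    shardIterator = _nextShardIterator;
  } else {
    GetShardIteratorRequest iteratorRequest = GetShardIteratorRequest.builder()
        .streamName(_streamName)
        .shardId(_shardId)
        .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)  // iterator type is illustrative
        .startingSequenceNumber(startSequenceNumber)
        .build();
    shardIterator = _kinesisClient.getShardIterator(iteratorRequest).shardIterator();
  }
  // Fetch on the caller's thread and return the batch immediately, without combining multiple responses
  GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
      .shardIterator(shardIterator)
      .limit(_config.getNumMaxRecordsToFetch())  // default raised to 10,000, the Kinesis per-call maximum
      .build();
  GetRecordsResponse response = _kinesisClient.getRecords(recordsRequest);
  // Cache the next iterator for the following call; the caller advances _nextStartSequenceNumber after consuming
  _nextShardIterator = response.nextShardIterator();
  return response;
}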

@codecov-commenter commented Apr 8, 2024

Codecov Report

Attention: Patch coverage is 72.72727% with 12 lines in your changes missing coverage. Please review.

Project coverage is 62.18%. Comparing base (59551e4) to head (4d71bf3).
Report is 438 commits behind head on master.

Files Patch % Lines
...e/pinot/plugin/stream/kinesis/KinesisConsumer.java 72.50% 8 Missing and 3 partials ⚠️
.../stream/kinesis/KinesisStreamMetadataProvider.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12806      +/-   ##
============================================
+ Coverage     61.75%   62.18%   +0.43%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2515      +79     
  Lines        133233   137764    +4531     
  Branches      20636    21314     +678     
============================================
+ Hits          82274    85675    +3401     
- Misses        44911    45713     +802     
- Partials       6048     6376     +328     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 62.16% <72.72%> (+0.45%) ⬆️
java-21 62.06% <72.72%> (+0.43%) ⬆️
skip-bytebuffers-false 62.18% <72.72%> (+0.43%) ⬆️
skip-bytebuffers-true 62.04% <72.72%> (+34.31%) ⬆️
temurin 62.18% <72.72%> (+0.43%) ⬆️
unittests 62.18% <72.72%> (+0.43%) ⬆️
unittests1 46.74% <100.00%> (-0.15%) ⬇️
unittests2 27.80% <68.18%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown.

@Jackie-Jiang force-pushed the enhance_kinesis_consumer branch 2 times, most recently from cbb2faf to d588079 on April 8, 2024 at 18:27

// NOTE: Kinesis enforces a limit of 5 getRecords requests per second on each shard from the AWS end, beyond which we
// start getting ProvisionedThroughputExceededException. Rate limit the requests to avoid this.
long currentTimeMs = System.currentTimeMillis();
Contributor:

Do we need our own custom rate limiter here? Does the Kinesis client provide options to do the same thing/handle this, instead of us having this logic?

Contributor Author:

I didn't find one in the Kinesis client; it seems to just throw LimitExceededException.
The RPS limit is currently configured on the Pinot side, so I guess it makes sense to rate limit on the Pinot side.
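For comparison, a purely reactive approach would be to catch the SDK's throttling exception and back off before retrying. A minimal sketch, with an arbitrary 1-second backoff that is not from this PR:

// Illustrative fallback: react to throttling instead of (or in addition to) rate limiting up front.
try {
  return _kinesisClient.getRecords(getRecordRequest);
} catch (ProvisionedThroughputExceededException e) {
  // GetRecords allows 5 calls per second per shard; back off briefly and retry once
  LOGGER.warn("Throttled by Kinesis on getRecords, backing off before retrying", e);
  try {
    Thread.sleep(1000L);
  } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
  }
  return _kinesisClient.getRecords(getRecordRequest);
}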

} else {
LOGGER.warn(message + ": " + throwable.getMessage());
// TODO: Revisit this logic to see if we always miss the first message when consuming from a new shard
Contributor:

Could you add more explanation to this? Why would we miss the first message?

Contributor Author:

Ack
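
One thing that may be worth checking as part of that TODO (an assumption on my side, not a confirmed root cause): the shard iterator type. In the AWS SDK v2, AT_SEQUENCE_NUMBER includes the record with the given sequence number while AFTER_SEQUENCE_NUMBER starts right after it, so resolving the first sequence number of a new shard with AFTER_SEQUENCE_NUMBER would skip that first record.

// Illustrative comparison of iterator types; not the actual KinesisConsumer logic.
GetShardIteratorRequest atFirst = GetShardIteratorRequest.builder()
    .streamName(streamName)
    .shardId(shardId)
    .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)     // starts at the given record (inclusive)
    .startingSequenceNumber(firstSequenceNumber)
    .build();
GetShardIteratorRequest afterFirst = GetShardIteratorRequest.builder()
    .streamName(streamName)
    .shardId(shardId)
    .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)  // starts after it, i.e. skips the first record
    .startingSequenceNumber(firstSequenceNumber)
    .build();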

// Read records
GetRecordsRequest getRecordRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
Contributor:

This can be empty even if the stream has some data, right, given how Kinesis works? We'll return a response even if it's empty.

Contributor Author:

We need some tests to verify the behavior. The consumer can handle an empty message batch, but the consumption lag might be set to 0 because it thinks there are no more messages. Added a TODO to revisit.
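
A small sketch of how an empty batch could be distinguished from actually being caught up, using millisBehindLatest() from the GetRecords response; the lag/metric handling here is hypothetical, not existing Pinot code:

// Illustrative: an empty batch does not necessarily mean the shard has been fully consumed.
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
List<Record> records = getRecordsResponse.records();
Long millisBehindLatest = getRecordsResponse.millisBehindLatest();
if (records.isEmpty() && millisBehindLatest != null && millisBehindLatest > 0) {
  // Still behind the tip of the shard: do not report the consumption lag as 0,
  // and optionally bump a metric so this case is visible.
  LOGGER.debug("Empty getRecords batch but still {} ms behind latest", millisBehindLatest);
}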

Contributor:

Good point. We can also add a metric to track this when it happens.

@swaminathanmanish (Contributor) left a comment:

LGTM other than clarifications.

* Kinesis enforces a limit of 5 getRecords requests per second on each shard from the AWS end, beyond which we start
* getting {@link ProvisionedThroughputExceededException}. Rate limit the requests to avoid this.
*/
private void rateLimitRequests() {
Contributor:

Thanks for creating a separate method. I guess since this is a special kind of rate limiter that needs to block until we are ready to fetch again, we cannot leverage off-the-shelf ones like Guava.

If Kinesis has a limit, don't we need to adhere to that limit? So does getRpsLimit() need to match the Kinesis limit?

Contributor Author:

The Kinesis limit is not very straightforward, so I guess we need to iterate on this to get the best settings.
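
For what it's worth, Guava's RateLimiter does block in acquire() until a permit is available, so an off-the-shelf limiter is at least possible; whether its smoothed permits-per-second model fits the hard 5-requests-per-second-per-shard limit is the open question. A sketch under that assumption (field and config names are illustrative):

// Illustrative Guava-based alternative (com.google.common.util.concurrent.RateLimiter); not what this PR implements.
private final RateLimiter _getRecordsRateLimiter = RateLimiter.create(_config.getRpsLimit());

private GetRecordsResponse getRecordsWithRateLimit(GetRecordsRequest request) {
  _getRecordsRateLimiter.acquire();  // blocks until the limiter allows another request
  return _kinesisClient.getRecords(request);
}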

long currentTimeMs = System.currentTimeMillis();
int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
if (currentTimeSeconds == _currentSecond) {
if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
Contributor:

This can be done later. A log.info or metric would help debug if rate limiting becomes an issue.
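
For reference, a complete per-second window limiter along the lines of the snippet above, with the suggested debug log when it actually has to block; the sleep granularity and log wording are assumptions, not the exact PR code:

private void rateLimitRequests() {
  while (true) {
    long currentTimeMs = System.currentTimeMillis();
    int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
    if (currentTimeSeconds != _currentSecond) {
      // New one-second window: reset the counter and proceed
      _currentSecond = currentTimeSeconds;
      _numRequestsInCurrentSecond = 1;
      return;
    }
    if (_numRequestsInCurrentSecond < _config.getRpsLimit()) {
      _numRequestsInCurrentSecond++;
      return;
    }
    // Limit reached within this second: log (or bump a metric) and wait for the next second
    LOGGER.debug("getRecords rate limit of {} rps reached, blocking until the next second", _config.getRpsLimit());
    try {
      Thread.sleep(1000L - currentTimeMs % 1000L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
  }
}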

// Get the shard iterator
String shardIterator;
if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
shardIterator = _nextShardIterator;
Contributor:

We will need to handle the case here where nextShardIterator has expired (since shard iterators have a time limit of 5 minutes).
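
A hedged sketch of one way to handle that: catch ExpiredIteratorException from getRecords() and refresh the iterator for the same start sequence number before retrying (the refresh helper below is hypothetical):

// Illustrative handling of an expired cached iterator (shard iterators expire 5 minutes after they are issued).
GetRecordsResponse getRecordsResponse;
try {
  getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
} catch (ExpiredIteratorException e) {
  // The cached _nextShardIterator went stale; fetch a fresh iterator and retry once
  String freshIterator = requestShardIterator(startSequenceNumber);  // hypothetical helper
  getRecordsResponse = _kinesisClient.getRecords(
      GetRecordsRequest.builder().shardIterator(freshIterator).limit(_config.getNumMaxRecordsToFetch()).build());
}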

4 participants