Skip to content

Commit

Permalink
Simplify the handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Apr 8, 2024
1 parent 85d2e96 commit cbb2faf
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConsumer.class);

private String _nextStartSequenceNumber = null;
private String _nextShardIterator = null;
private int _currentSecond = 0;
private int _numRequestsInCurrentSecond = 0;
private String _nextStartSequenceNumber = null;
private String _nextShardIterator = null;

public KinesisConsumer(KinesisConfig config) {
super(config);
Expand All @@ -60,15 +60,32 @@ public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) {
super(config, kinesisClient);
}

/**
* Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
*/
@Override
public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset;
String shardId = startOffset.getShardId();
String startSequenceNumber = startOffset.getSequenceNumber();
long endTimeMs = System.currentTimeMillis() + timeoutMs;

// NOTE: Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we
// start getting ProvisionedThroughputExceededException. Rate limit the requests to avoid this.
long currentTimeMs = System.currentTimeMillis();
int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
if (currentTimeSeconds == _currentSecond) {
if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
try {
Thread.sleep(1000 - (currentTimeMs % 1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
_currentSecond++;
_numRequestsInCurrentSecond = 1;
} else {
_numRequestsInCurrentSecond++;
}
} else {
_currentSecond = currentTimeSeconds;
_numRequestsInCurrentSecond = 1;
}

// Get the shard iterator
String shardIterator;
Expand All @@ -82,51 +99,29 @@ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset s
.build();
shardIterator = _kinesisClient.getShardIterator(getShardIteratorRequest).shardIterator();
}
if (shardIterator == null) {
return new KinesisMessageBatch(List.of(), startOffset, true);
}

// Read records
// NOTE: When the next shard iterator from the response is null, it means we have reached the end of the shard.
long currentTimeMs;
while (shardIterator != null && (currentTimeMs = System.currentTimeMillis()) <= endTimeMs) {
// NOTE: Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we
// start getting ProvisionedThroughputExceededException. Rate limit the requests to avoid this.
int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
if (currentTimeSeconds == _currentSecond) {
if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
try {
Thread.sleep(1000 - (currentTimeMs % 1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
_currentSecond++;
_numRequestsInCurrentSecond = 1;
} else {
_numRequestsInCurrentSecond++;
}
} else {
_currentSecond = currentTimeSeconds;
_numRequestsInCurrentSecond = 1;
}

GetRecordsRequest getRecordRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
List<Record> records = getRecordsResponse.records();
shardIterator = getRecordsResponse.nextShardIterator();
if (!records.isEmpty()) {
List<BytesStreamMessage> messages =
records.stream().map(record -> extractStreamMessage(record, shardId)).collect(Collectors.toList());
StreamMessageMetadata lastMessageMetadata = messages.get(messages.size() - 1).getMetadata();
assert lastMessageMetadata != null;
KinesisPartitionGroupOffset nextOffset = (KinesisPartitionGroupOffset) lastMessageMetadata.getNextOffset();
assert nextOffset != null;
_nextStartSequenceNumber = nextOffset.getSequenceNumber();
_nextShardIterator = shardIterator;
return new KinesisMessageBatch(messages, nextOffset, shardIterator == null);
}
GetRecordsRequest getRecordRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
List<BytesStreamMessage> messages =
getRecordsResponse.records().stream().map(record -> extractStreamMessage(record, shardId))
.collect(Collectors.toList());
KinesisPartitionGroupOffset offsetOfNextBatch;
if (messages.isEmpty()) {
offsetOfNextBatch = startOffset;
} else {
StreamMessageMetadata lastMessageMetadata = messages.get(messages.size() - 1).getMetadata();
assert lastMessageMetadata != null;
offsetOfNextBatch = (KinesisPartitionGroupOffset) lastMessageMetadata.getNextOffset();
}

// If no records are fetched, return an empty batch
return new KinesisMessageBatch(List.of(), startOffset, shardIterator == null);
assert offsetOfNextBatch != null;
_nextStartSequenceNumber = offsetOfNextBatch.getSequenceNumber();
_nextShardIterator = getRecordsResponse.nextShardIterator();
return new KinesisMessageBatch(messages, offsetOfNextBatch, _nextShardIterator == null);
}

private BytesStreamMessage extractStreamMessage(Record record, String shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ default void start(StreamPartitionMsgOffset startOffset) {
/**
* Fetches messages from the stream partition from the given start offset within the specified timeout.
*
* This method should return immediately if there are messages available. Otherwise, it will await the passed timeout.
* If the timeout expires, an empty message batch should be returned.
* This method should return within the timeout. If there is no message available before time runs out, an empty
* message batch should be returned.
*
* @param startOffset The offset of the first message desired, inclusive
* @param timeoutMs Timeout in milliseconds
Expand Down

0 comments on commit cbb2faf

Please sign in to comment.