Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed May 3, 2024
1 parent b2e5f1d commit bd15dac
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

Expand All @@ -44,10 +45,10 @@
public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConsumer.class);

private int _currentSecond = 0;
private int _numRequestsInCurrentSecond = 0;
private String _nextStartSequenceNumber = null;
private String _nextShardIterator = null;
private int _currentSecond = 0;
private int _numRequestsInCurrentSecond = 0;

public KinesisConsumer(KinesisConfig config) {
super(config);
Expand All @@ -66,33 +67,13 @@ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset s
String shardId = startOffset.getShardId();
String startSequenceNumber = startOffset.getSequenceNumber();

// NOTE: Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we
// start getting ProvisionedThroughputExceededException. Rate limit the requests to avoid this.
long currentTimeMs = System.currentTimeMillis();
int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
if (currentTimeSeconds == _currentSecond) {
if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
try {
Thread.sleep(1000 - (currentTimeMs % 1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
_currentSecond++;
_numRequestsInCurrentSecond = 1;
} else {
_numRequestsInCurrentSecond++;
}
} else {
_currentSecond = currentTimeSeconds;
_numRequestsInCurrentSecond = 1;
}

// Get the shard iterator
String shardIterator;
if (startSequenceNumber.equals(_nextStartSequenceNumber)) {
shardIterator = _nextShardIterator;
} else {
// TODO: Revisit this logic to see if we always miss the first message when consuming from a new shard
// TODO: Revisit the offset handling logic. Reading after the start sequence number can lose the first message
// when consuming from a new partition because the initial start sequence number is inclusive.
GetShardIteratorRequest getShardIteratorRequest =
GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId)
.startingSequenceNumber(startSequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
Expand All @@ -104,26 +85,55 @@ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset s
}

// Read records
rateLimitRequests();
GetRecordsRequest getRecordRequest =
GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
List<BytesStreamMessage> messages =
getRecordsResponse.records().stream().map(record -> extractStreamMessage(record, shardId))
.collect(Collectors.toList());
List<Record> records = getRecordsResponse.records();
List<BytesStreamMessage> messages;
KinesisPartitionGroupOffset offsetOfNextBatch;
if (messages.isEmpty()) {
offsetOfNextBatch = startOffset;
} else {
if (!records.isEmpty()) {
messages = records.stream().map(record -> extractStreamMessage(record, shardId)).collect(Collectors.toList());
StreamMessageMetadata lastMessageMetadata = messages.get(messages.size() - 1).getMetadata();
assert lastMessageMetadata != null;
offsetOfNextBatch = (KinesisPartitionGroupOffset) lastMessageMetadata.getNextOffset();
} else {
// TODO: Revisit whether Kinesis can return an empty batch when there are available records. The consumer can handle
// an empty message batch, but it will treat it as fully caught up.
messages = List.of();
offsetOfNextBatch = startOffset;
}
assert offsetOfNextBatch != null;
_nextStartSequenceNumber = offsetOfNextBatch.getSequenceNumber();
_nextShardIterator = getRecordsResponse.nextShardIterator();
return new KinesisMessageBatch(messages, offsetOfNextBatch, _nextShardIterator == null);
}

/**
 * Kinesis enforces a limit of 5 getRecords requests per second on each shard from AWS end, beyond which we start
 * getting {@link ProvisionedThroughputExceededException}. Rate limit the requests to avoid this.
 *
 * <p>Requests are counted per wall-clock second. Once the count for the current second reaches the configured RPS
 * limit, this method blocks until the start of the next second and counts the pending request against that second.
 *
 * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted while waiting; the
 *                          thread's interrupt flag is restored before throwing
 */
private void rateLimitRequests() {
  long currentTimeMs = System.currentTimeMillis();
  int currentTimeSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMs);
  if (currentTimeSeconds == _currentSecond) {
    if (_numRequestsInCurrentSecond == _config.getRpsLimit()) {
      // Quota for this second is used up: wait for the remainder of the current second.
      try {
        Thread.sleep(1000 - (currentTimeMs % 1000));
      } catch (InterruptedException e) {
        // Restore the interrupt flag so that callers further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      // The pending request is the first one counted in the next second.
      _currentSecond++;
      _numRequestsInCurrentSecond = 1;
    } else {
      _numRequestsInCurrentSecond++;
    }
  } else {
    // First request observed in a new second: reset the window.
    _currentSecond = currentTimeSeconds;
    _numRequestsInCurrentSecond = 1;
  }
}

private BytesStreamMessage extractStreamMessage(Record record, String shardId) {
byte[] key = record.partitionKey().getBytes(StandardCharsets.UTF_8);
byte[] value = record.data().asByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
Expand Down Expand Up @@ -153,7 +154,7 @@ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientI
// 3. Parent reached EOL and completely consumed.
if (parentShardId == null || !shardIdToShardMap.containsKey(parentShardId) || shardsEnded.contains(
parentShardId)) {
// TODO: Revisit this. Kinesis starts consuming after the start sequence number, and we might miss the first
// TODO: Revisit this. Kinesis starts consuming AFTER the start sequence number, and we might miss the first
// message.
StreamPartitionMsgOffset newStartOffset =
new KinesisPartitionGroupOffset(newShardId, newShard.sequenceNumberRange().startingSequenceNumber());
Expand All @@ -180,7 +181,8 @@ private boolean consumedEndOfShard(StreamPartitionMsgOffset startCheckpoint,
throws IOException, TimeoutException {
try (PartitionGroupConsumer partitionGroupConsumer = _kinesisStreamConsumerFactory.createPartitionGroupConsumer(
_clientId, partitionGroupConsumptionStatus)) {
return partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs).isEndOfPartitionGroup();
MessageBatch<?> messageBatch = partitionGroupConsumer.fetchMessages(startCheckpoint, _fetchTimeoutMs);
return messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();
}
}

Expand Down

0 comments on commit bd15dac

Please sign in to comment.