Skip to content

Commit

Permalink
feat: Implement Consumer.offsetsForTimes
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed May 7, 2021
1 parent 5135c6b commit d8eda8d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
Expand Up @@ -34,9 +34,11 @@
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
Expand All @@ -59,7 +61,6 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

/**
* A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka
Expand Down Expand Up @@ -466,8 +467,33 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> map, Duration duration) {
throw new UnsupportedVersionException(
"Pub/Sub Lite does not support Consumer backlog introspection.");
try {
Map<TopicPartition, ApiFuture<Optional<Cursor>>> cursors =
map.entrySet().stream()
.collect(
Collectors.toMap(
entry -> entry.getKey(),
entry ->
topicStatsClient.computeCursorForEventTime(
topicPath,
checkTopicGetPartition(entry.getKey()),
Timestamps.fromMillis(entry.getValue()))));
ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);

// As per KafkaConsumer.offsetsForTimes, null (represented by no map entry) is returned if
// there is no result for the partition.
ImmutableMap.Builder<TopicPartition, OffsetAndTimestamp> output = ImmutableMap.builder();
for (Map.Entry<TopicPartition, ApiFuture<Optional<Cursor>>> entry : cursors.entrySet()) {
Optional<Cursor> cursor = entry.getValue().get();
if (cursor.isPresent()) {
Long timestamp = Preconditions.checkNotNull(map.get(entry.getKey()));
output.put(entry.getKey(), new OffsetAndTimestamp(cursor.get().getOffset(), timestamp));
}
}
return output.build();
} catch (Throwable t) {
throw toKafka(t);
}
}

@Override
Expand Down
Expand Up @@ -53,22 +53,24 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimaps;
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -139,11 +141,6 @@ public void unsupportedOperations() {
UnsupportedOperationException.class,
() ->
consumer.subscribe(ImmutableList.of("a", "b"), mock(ConsumerRebalanceListener.class)));
assertThrows(
UnsupportedVersionException.class, () -> consumer.offsetsForTimes(ImmutableMap.of()));
assertThrows(
UnsupportedVersionException.class,
() -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO));
}

@Test
Expand Down Expand Up @@ -481,6 +478,22 @@ public void endOffsets() {
assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L));
}

@Test
public void offsetsForTimes() {
TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(2), Timestamps.fromMillis(2000)))
.thenReturn(
ApiFutures.immediateFuture(Optional.of(Cursor.newBuilder().setOffset(22).build())));
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(4), Timestamps.fromMillis(4000)))
.thenReturn(ApiFutures.immediateFuture(Optional.empty()));
Map<TopicPartition, OffsetAndTimestamp> output =
consumer.offsetsForTimes(ImmutableMap.of(partition2, 2000L, partition4, 4000L));
assertThat(output).isEqualTo(ImmutableMap.of(partition2, new OffsetAndTimestamp(22, 2000)));
}

@Test
public void close() {
consumer.close();
Expand Down

0 comments on commit d8eda8d

Please sign in to comment.