diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java index 251cffff..d51dfa3f 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -34,9 +34,11 @@ import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.flogger.GoogleLogger; +import com.google.protobuf.util.Timestamps; import java.time.Duration; import java.util.Collection; import java.util.HashSet; @@ -59,7 +61,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.UnsupportedVersionException; /** * A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka @@ -466,8 +467,33 @@ public Map offsetsForTimes(Map offsetsForTimes( Map map, Duration duration) { - throw new UnsupportedVersionException( - "Pub/Sub Lite does not support Consumer backlog introspection."); + try { + Map>> cursors = + map.entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey(), + entry -> + topicStatsClient.computeCursorForEventTime( + topicPath, + checkTopicGetPartition(entry.getKey()), + Timestamps.fromMillis(entry.getValue())))); + ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS); + + // As per KafkaConsumer.offsetsForTimes, null (represented by no map entry) is returned if + // there is no result for the partition. + ImmutableMap.Builder output = ImmutableMap.builder(); + for (Map.Entry>> entry : cursors.entrySet()) { + Optional cursor = entry.getValue().get(); + if (cursor.isPresent()) { + Long timestamp = Preconditions.checkNotNull(map.get(entry.getKey())); + output.put(entry.getKey(), new OffsetAndTimestamp(cursor.get().getOffset(), timestamp)); + } + } + return output.build(); + } catch (Throwable t) { + throw toKafka(t); + } } @Override diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java index b618b249..2344ca23 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -53,9 +53,11 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimaps; import com.google.common.reflect.ImmutableTypeToInstanceMap; +import com.google.protobuf.util.Timestamps; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; @@ -63,12 +65,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -139,11 +141,6 @@ public void unsupportedOperations() { UnsupportedOperationException.class, () -> consumer.subscribe(ImmutableList.of("a", "b"), mock(ConsumerRebalanceListener.class))); - assertThrows( - UnsupportedVersionException.class, () -> consumer.offsetsForTimes(ImmutableMap.of())); - assertThrows( - UnsupportedVersionException.class, - () -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO)); } @Test @@ -481,6 +478,22 @@ public void endOffsets() { assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L)); } + @Test + public void offsetsForTimes() { + TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2); + TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4); + when(topicStatsClient.computeCursorForEventTime( + example(TopicPath.class), Partition.of(2), Timestamps.fromMillis(2000))) + .thenReturn( + ApiFutures.immediateFuture(Optional.of(Cursor.newBuilder().setOffset(22).build()))); + when(topicStatsClient.computeCursorForEventTime( + example(TopicPath.class), Partition.of(4), Timestamps.fromMillis(4000))) + .thenReturn(ApiFutures.immediateFuture(Optional.empty())); + Map output = + consumer.offsetsForTimes(ImmutableMap.of(partition2, 2000L, partition4, 4000L)); + assertThat(output).isEqualTo(ImmutableMap.of(partition2, new OffsetAndTimestamp(22, 2000))); + } + @Test public void close() { consumer.close();