diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
index 251cffff..57fccf8e 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java
@@ -34,11 +34,14 @@
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.GoogleLogger;
+import com.google.protobuf.util.Timestamps;
 import java.time.Duration;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +62,6 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 
 /**
  * A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka
@@ -466,8 +468,36 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
   public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
       Map<TopicPartition, Long> map, Duration duration) {
-    throw new UnsupportedVersionException(
-        "Pub/Sub Lite does not support Consumer backlog introspection.");
+    try {
+      Map<TopicPartition, ApiFuture<Optional<Cursor>>> cursors =
+          map.entrySet().stream()
+              .collect(
+                  Collectors.toMap(
+                      entry -> entry.getKey(),
+                      entry ->
+                          topicStatsClient.computeCursorForEventTime(
+                              topicPath,
+                              checkTopicGetPartition(entry.getKey()),
+                              Timestamps.fromMillis(entry.getValue()))));
+      ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);
+
+      Map<TopicPartition, OffsetAndTimestamp> output = new HashMap<>();
+      for (Map.Entry<TopicPartition, ApiFuture<Optional<Cursor>>> entry : cursors.entrySet()) {
+        // As per KafkaConsumer.offsetsForTimes, null is returned if there is no result for the
+        // partition.
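+        // Note that the OffsetAndTimestamp built below carries the caller-supplied timestamp
+        // for the partition, not the event time of the message the returned cursor points at.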
+        OffsetAndTimestamp offsetAndTime = null;
+        Optional<Cursor> cursor = entry.getValue().get();
+        if (cursor.isPresent()) {
+          offsetAndTime =
+              new OffsetAndTimestamp(
+                  cursor.get().getOffset(), Preconditions.checkNotNull(map.get(entry.getKey())));
+        }
+        output.put(entry.getKey(), offsetAndTime);
+      }
+      return output;
+    } catch (Throwable t) {
+      throw toKafka(t);
+    }
   }
 
   @Override
diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java
index b618b249..f49759d8 100644
--- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java
@@ -30,6 +30,7 @@
 import com.google.api.core.ApiFutures;
 import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.pubsublite.AdminClient;
 import com.google.cloud.pubsublite.CloudZone;
 import com.google.cloud.pubsublite.Offset;
@@ -38,6 +39,7 @@
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.TopicName;
 import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
 import com.google.cloud.pubsublite.internal.CursorClient;
 import com.google.cloud.pubsublite.internal.TopicStatsClient;
 import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
@@ -53,9 +55,12 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimaps;
 import com.google.common.reflect.ImmutableTypeToInstanceMap;
+import com.google.protobuf.util.Timestamps;
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -63,12 +68,13 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -139,11 +145,6 @@ public void unsupportedOperations() {
         UnsupportedOperationException.class,
         () ->
             consumer.subscribe(ImmutableList.of("a", "b"), mock(ConsumerRebalanceListener.class)));
-    assertThrows(
-        UnsupportedVersionException.class, () -> consumer.offsetsForTimes(ImmutableMap.of()));
-    assertThrows(
-        UnsupportedVersionException.class,
-        () -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO));
   }
 
   @Test
@@ -481,6 +482,44 @@ public void endOffsets() {
     assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L));
   }
 
+  @Test
+  public void offsetsForTimes() {
+    TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
+    TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
+    when(topicStatsClient.computeCursorForEventTime(
+            example(TopicPath.class),
+            Partition.of(2),
+            Timestamps.fromMillis(2000)))
+        .thenReturn(
+            ApiFutures.immediateFuture(Optional.of(Cursor.newBuilder().setOffset(22).build())));
+    when(topicStatsClient.computeCursorForEventTime(
+            example(TopicPath.class), Partition.of(4), Timestamps.fromMillis(4000)))
+        .thenReturn(ApiFutures.immediateFuture(Optional.empty()));
+    Map<TopicPartition, OffsetAndTimestamp> output =
+        consumer.offsetsForTimes(ImmutableMap.of(partition2, 2000L, partition4, 4000L));
+
+    Map<TopicPartition, OffsetAndTimestamp> expected = new HashMap<>();
+    expected.put(partition2, new OffsetAndTimestamp(22, 2000));
+    expected.put(partition4, null);
+    assertThat(output).isEqualTo(expected);
+  }
+
+  @Test
+  public void offsetsForTimesFailure() {
+    TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
+    TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
+    when(topicStatsClient.computeCursorForEventTime(
+            example(TopicPath.class), Partition.of(2), Timestamps.fromMillis(2000)))
+        .thenReturn(
+            ApiFutures.immediateFuture(Optional.of(Cursor.newBuilder().setOffset(22).build())));
+    when(topicStatsClient.computeCursorForEventTime(
+            example(TopicPath.class), Partition.of(4), Timestamps.fromMillis(4000)))
+        .thenReturn(
+            ApiFutures.immediateFailedFuture(new CheckedApiException(Code.UNAVAILABLE).underlying));
+
+    assertThrows(
+        BrokerNotAvailableException.class,
+        () -> consumer.offsetsForTimes(ImmutableMap.of(partition2, 2000L, partition4, 4000L)));
+  }
+
   @Test
   public void close() {
     consumer.close();
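
For illustration, the sketch below shows how a caller might combine the newly supported offsetsForTimes with seek to rewind each partition to the first offset at or after a timestamp. It is not part of this change: the class and method names are hypothetical, the 30-second timeout is an arbitrary choice, and the consumer is assumed to be constructed elsewhere.

// Hypothetical usage sketch; SeekToTimestampExample and seekToTimestamp are
// illustrative names, not part of the library.
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public final class SeekToTimestampExample {
  /** Seeks each requested partition to the first offset at or after its timestamp. */
  static void seekToTimestamp(
      Consumer<byte[], byte[]> consumer, Map<TopicPartition, Long> timestampsMillis) {
    // The Duration overload bounds how long the per-partition cursor lookups may take.
    Map<TopicPartition, OffsetAndTimestamp> offsets =
        consumer.offsetsForTimes(timestampsMillis, Duration.ofSeconds(30));
    offsets.forEach(
        (partition, offsetAndTime) -> {
          // Per the KafkaConsumer contract, a null value means no message at or after the
          // requested timestamp exists for that partition; its position is left unchanged.
          if (offsetAndTime != null) {
            consumer.seek(partition, offsetAndTime.offset());
          }
        });
  }
}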