Skip to content

Commit

Permalink
feat: Implement Consumer.offsetsForTimes (#123)
Browse files Browse the repository at this point in the history
Retrieves cursors for event timestamps from the TopicStatsClient to implement PubsubLiteConsumer.offsetsForTimes.
  • Loading branch information
tmdiep committed May 19, 2021
1 parent 3ff2cdf commit a785e53
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
Expand Up @@ -34,11 +34,14 @@
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -59,7 +62,6 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

/**
* A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka
Expand Down Expand Up @@ -466,8 +468,36 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> map, Duration duration) {
throw new UnsupportedVersionException(
"Pub/Sub Lite does not support Consumer backlog introspection.");
try {
Map<TopicPartition, ApiFuture<Optional<Cursor>>> cursors =
map.entrySet().stream()
.collect(
Collectors.toMap(
entry -> entry.getKey(),
entry ->
topicStatsClient.computeCursorForEventTime(
topicPath,
checkTopicGetPartition(entry.getKey()),
Timestamps.fromMillis(entry.getValue()))));
ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);

Map<TopicPartition, OffsetAndTimestamp> output = new HashMap<>();
for (Map.Entry<TopicPartition, ApiFuture<Optional<Cursor>>> entry : cursors.entrySet()) {
// As per KafkaConsumer.offsetsForTimes, null is returned if there is no result for the
// partition.
OffsetAndTimestamp offsetAndTime = null;
Optional<Cursor> cursor = entry.getValue().get();
if (cursor.isPresent()) {
offsetAndTime =
new OffsetAndTimestamp(
cursor.get().getOffset(), Preconditions.checkNotNull(map.get(entry.getKey())));
}
output.put(entry.getKey(), offsetAndTime);
}
return output;
} catch (Throwable t) {
throw toKafka(t);
}
}

@Override
Expand Down
Expand Up @@ -30,6 +30,7 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
Expand All @@ -38,6 +39,7 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
Expand All @@ -53,22 +55,26 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimaps;
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -139,11 +145,6 @@ public void unsupportedOperations() {
UnsupportedOperationException.class,
() ->
consumer.subscribe(ImmutableList.of("a", "b"), mock(ConsumerRebalanceListener.class)));
assertThrows(
UnsupportedVersionException.class, () -> consumer.offsetsForTimes(ImmutableMap.of()));
assertThrows(
UnsupportedVersionException.class,
() -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO));
}

@Test
Expand Down Expand Up @@ -481,6 +482,44 @@ public void endOffsets() {
assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L));
}

@Test
public void offsetsForTimes() {
TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(2), Timestamps.fromMillis(2000)))
.thenReturn(
ApiFutures.immediateFuture(Optional.of(Cursor.newBuilder().setOffset(22).build())));
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(4), Timestamps.fromMillis(4000)))
.thenReturn(ApiFutures.immediateFuture(Optional.empty()));
Map<TopicPartition, OffsetAndTimestamp> output =
consumer.offsetsForTimes(ImmutableMap.of(partition2, 2000L, partition4, 4000L));

Map<TopicPartition, OffsetAndTimestamp> expected = new HashMap<>();
expected.put(partition2, new OffsetAndTimestamp(22, 2000));
expected.put(partition4, null);
assertThat(output).isEqualTo(expected);
}

@Test
public void offsetsForTimesFailure() {
TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(2), Timestamps.fromMillis(2000)))
.thenReturn(
ApiFutures.immediateFuture(Optional.of(Cursor.newBuilder().setOffset(22).build())));
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(4), Timestamps.fromMillis(4000)))
.thenReturn(
ApiFutures.immediateFailedFuture(new CheckedApiException(Code.UNAVAILABLE).underlying));

assertThrows(
BrokerNotAvailableException.class,
() -> consumer.offsetsForTimes(ImmutableMap.of(partition2, 2000L, partition4, 4000L)));
}

@Test
public void close() {
consumer.close();
Expand Down

0 comments on commit a785e53

Please sign in to comment.