Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Consumer.offsetsForTimes #123

Merged
merged 5 commits into from May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,11 +34,14 @@
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -59,7 +62,6 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

/**
* A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka
Expand Down Expand Up @@ -466,8 +468,36 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> map, Duration duration) {
throw new UnsupportedVersionException(
"Pub/Sub Lite does not support Consumer backlog introspection.");
try {
Map<TopicPartition, ApiFuture<Optional<Cursor>>> cursors =
map.entrySet().stream()
.collect(
Collectors.toMap(
entry -> entry.getKey(),
entry ->
topicStatsClient.computeCursorForEventTime(
topicPath,
checkTopicGetPartition(entry.getKey()),
Timestamps.fromMillis(entry.getValue()))));
ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);

Map<TopicPartition, OffsetAndTimestamp> output = new HashMap<>();
for (Map.Entry<TopicPartition, ApiFuture<Optional<Cursor>>> entry : cursors.entrySet()) {
// As per KafkaConsumer.offsetsForTimes, null is returned if there is no result for the
// partition.
OffsetAndTimestamp offsetAndTime = null;
Optional<Cursor> cursor = entry.getValue().get();
if (cursor.isPresent()) {
offsetAndTime =
new OffsetAndTimestamp(
cursor.get().getOffset(), Preconditions.checkNotNull(map.get(entry.getKey())));
}
output.put(entry.getKey(), offsetAndTime);
}
return output;
} catch (Throwable t) {
throw toKafka(t);
}
}

@Override
Expand Down
Expand Up @@ -53,22 +53,25 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimaps;
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import com.google.protobuf.util.Timestamps;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -139,11 +142,6 @@ public void unsupportedOperations() {
UnsupportedOperationException.class,
() ->
consumer.subscribe(ImmutableList.of("a", "b"), mock(ConsumerRebalanceListener.class)));
assertThrows(
UnsupportedVersionException.class, () -> consumer.offsetsForTimes(ImmutableMap.of()));
assertThrows(
UnsupportedVersionException.class,
() -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO));
}

@Test
Expand Down Expand Up @@ -481,6 +479,26 @@ public void endOffsets() {
assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L));
}

@Test
public void offsetsForTimes() {
TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(2), Timestamps.fromMillis(2000)))
.thenReturn(
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
ApiFutures.immediateFuture(Optional.of(Cursor.newBuilder().setOffset(22).build())));
when(topicStatsClient.computeCursorForEventTime(
example(TopicPath.class), Partition.of(4), Timestamps.fromMillis(4000)))
.thenReturn(ApiFutures.immediateFuture(Optional.empty()));
Map<TopicPartition, OffsetAndTimestamp> output =
consumer.offsetsForTimes(ImmutableMap.of(partition2, 2000L, partition4, 4000L));

Map<TopicPartition, OffsetAndTimestamp> expected = new HashMap<>();
expected.put(partition2, new OffsetAndTimestamp(22, 2000));
expected.put(partition4, null);
assertThat(output).isEqualTo(expected);
}

@Test
public void close() {
consumer.close();
Expand Down