Skip to content

Commit

Permalink
feat: Implement Consumer.endOffsets (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Mar 19, 2021
1 parent 8ca9b61 commit 58e2e60
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 9 deletions.
Expand Up @@ -29,6 +29,8 @@
import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.CursorClientSettings;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.AssignerSettings;
import com.google.cloud.pubsublite.internal.wire.CommitterSettings;
Expand Down Expand Up @@ -148,12 +150,21 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {

CursorClient cursorClient =
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());
TopicStatsClient topicStatsClient =
TopicStatsClient.create(
TopicStatsClientSettings.newBuilder().setRegion(zone.region()).build());
SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
return new PubsubLiteConsumer(
subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
subscriptionPath(),
topic,
shared,
consumerFactory,
assignerFactory,
cursorClient,
topicStatsClient);
} catch (Exception e) {
throw toCanonical(e).underlying;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
Expand All @@ -26,6 +27,7 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver;
Expand Down Expand Up @@ -74,6 +76,7 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private final ConsumerFactory consumerFactory;
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
private final TopicStatsClient topicStatsClient;
private Optional<Assigner> assigner = Optional.empty();
private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

Expand All @@ -83,13 +86,15 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
SharedBehavior shared,
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient) {
CursorClient cursorClient,
TopicStatsClient topicStatsClient) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.shared = shared;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
this.topicStatsClient = topicStatsClient;
}

private TopicPartition toTopicPartition(Partition partition) {
Expand Down Expand Up @@ -490,8 +495,25 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collectio
@Override
public Map<TopicPartition, Long> endOffsets(
Collection<TopicPartition> collection, Duration duration) {
throw new UnsupportedVersionException(
"Pub/Sub Lite does not support Consumer backlog introspection.");
try {
Map<TopicPartition, ApiFuture<Cursor>> cursors =
collection.stream()
.collect(
Collectors.toMap(
topicPartition -> topicPartition,
topicPartition ->
topicStatsClient.computeHeadCursor(
topicPath, checkTopicGetPartition(topicPartition))));
ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);

ImmutableMap.Builder<TopicPartition, Long> output = ImmutableMap.builder();
for (Map.Entry<TopicPartition, ApiFuture<Cursor>> entry : cursors.entrySet()) {
output.put(entry.getKey(), entry.getValue().get().getOffset());
}
return output.build();
} catch (Throwable t) {
throw toKafka(t);
}
}

@Override
Expand All @@ -511,6 +533,12 @@ public void close(Duration timeout) {
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
}
try {
topicStatsClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Error closing topic stats client during Consumer shutdown.");
}
try {
shared.close();
} catch (Exception e) {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
Expand All @@ -54,6 +55,7 @@
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -102,6 +104,7 @@ private static <T> T example(Class<T> klass) {
@Mock AssignerFactory assignerFactory;
@Mock CursorClient cursorClient;
@Mock AdminClient adminClient;
@Mock TopicStatsClient topicStatsClient;

@Mock Assigner assigner;
@Mock SingleSubscriptionConsumer underlying;
Expand All @@ -118,7 +121,8 @@ public void setUp() {
new SharedBehavior(adminClient),
consumerFactory,
assignerFactory,
cursorClient);
cursorClient,
topicStatsClient);
when(consumerFactory.newConsumer()).thenReturn(underlying);
}

Expand All @@ -140,10 +144,6 @@ public void unsupportedOperations() {
assertThrows(
UnsupportedVersionException.class,
() -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO));
assertThrows(UnsupportedVersionException.class, () -> consumer.endOffsets(ImmutableList.of()));
assertThrows(
UnsupportedVersionException.class,
() -> consumer.endOffsets(ImmutableList.of(), Duration.ZERO));
}

@Test
Expand Down Expand Up @@ -468,10 +468,24 @@ public void partitionsFor() {
assertThat(info.size()).isEqualTo(2L);
}

@Test
public void endOffsets() {
TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
when(topicStatsClient.computeHeadCursor(example(TopicPath.class), Partition.of(2)))
.thenReturn(ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(22).build()));
when(topicStatsClient.computeHeadCursor(example(TopicPath.class), Partition.of(4)))
.thenReturn(ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(44).build()));
Map<TopicPartition, Long> output =
consumer.endOffsets(ImmutableList.of(partition2, partition4));
assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L));
}

@Test
public void close() {
consumer.close();
verify(adminClient).close();
verify(cursorClient).close();
verify(topicStatsClient).close();
}
}

0 comments on commit 58e2e60

Please sign in to comment.