From 58e2e609ed9d9c73ed817e7ddf2c5be930e66339 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Sat, 20 Mar 2021 04:59:48 +1100 Subject: [PATCH] feat: Implement Consumer.endOffsets (#102) --- .../pubsublite/kafka/ConsumerSettings.java | 13 ++++++- .../pubsublite/kafka/PubsubLiteConsumer.java | 34 +++++++++++++++++-- .../kafka/PubsubLiteConsumerTest.java | 24 ++++++++++--- 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index c2ace41b..f7b123b0 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -29,6 +29,8 @@ import com.google.cloud.pubsublite.internal.BufferingPullSubscriber; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.CursorClientSettings; +import com.google.cloud.pubsublite.internal.TopicStatsClient; +import com.google.cloud.pubsublite.internal.TopicStatsClientSettings; import com.google.cloud.pubsublite.internal.wire.AssignerFactory; import com.google.cloud.pubsublite.internal.wire.AssignerSettings; import com.google.cloud.pubsublite.internal.wire.CommitterSettings; @@ -148,12 +150,21 @@ public Consumer instantiate() throws ApiException { CursorClient cursorClient = CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build()); + TopicStatsClient topicStatsClient = + TopicStatsClient.create( + TopicStatsClientSettings.newBuilder().setRegion(zone.region()).build()); SharedBehavior shared = new SharedBehavior( AdminClient.create( AdminClientSettings.newBuilder().setRegion(topic.location().region()).build())); return new PubsubLiteConsumer( - subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient); + subscriptionPath(), + topic, + shared, + consumerFactory, + assignerFactory, + cursorClient, + topicStatsClient); } catch (Exception e) { throw toCanonical(e).underlying; } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java index ab4facc4..251cffff 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; +import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; @@ -26,6 +27,7 @@ import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.TopicStatsClient; import com.google.cloud.pubsublite.internal.wire.Assigner; import com.google.cloud.pubsublite.internal.wire.AssignerFactory; import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver; @@ -74,6 +76,7 @@ class PubsubLiteConsumer implements Consumer { private final ConsumerFactory consumerFactory; private final AssignerFactory assignerFactory; private final CursorClient cursorClient; + private final TopicStatsClient topicStatsClient; private Optional assigner = Optional.empty(); private Optional consumer = Optional.empty(); @@ -83,13 +86,15 @@ class PubsubLiteConsumer implements Consumer { SharedBehavior shared, ConsumerFactory consumerFactory, AssignerFactory assignerFactory, - CursorClient cursorClient) { + CursorClient cursorClient, + TopicStatsClient topicStatsClient) { this.subscriptionPath = subscriptionPath; this.topicPath = topicPath; this.shared = shared; this.consumerFactory = consumerFactory; this.assignerFactory = assignerFactory; this.cursorClient = cursorClient; + this.topicStatsClient = topicStatsClient; } private TopicPartition toTopicPartition(Partition partition) { @@ -490,8 +495,25 @@ public Map endOffsets(Collection collectio @Override public Map endOffsets( Collection collection, Duration duration) { - throw new UnsupportedVersionException( - "Pub/Sub Lite does not support Consumer backlog introspection."); + try { + Map> cursors = + collection.stream() + .collect( + Collectors.toMap( + topicPartition -> topicPartition, + topicPartition -> + topicStatsClient.computeHeadCursor( + topicPath, checkTopicGetPartition(topicPartition)))); + ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS); + + ImmutableMap.Builder output = ImmutableMap.builder(); + for (Map.Entry> entry : cursors.entrySet()) { + output.put(entry.getKey(), entry.getValue().get().getOffset()); + } + return output.build(); + } catch (Throwable t) { + throw toKafka(t); + } } @Override @@ -511,6 +533,12 @@ public void close(Duration timeout) { } catch (Exception e) { logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown."); } + try { + topicStatsClient.close(); + } catch (Exception e) { + logger.atSevere().withCause(e).log( + "Error closing topic stats client during Consumer shutdown."); + } try { shared.close(); } catch (Exception e) { diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java index b9c9bad9..b618b249 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -39,6 +39,7 @@ import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CursorClient; +import com.google.cloud.pubsublite.internal.TopicStatsClient; import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; import com.google.cloud.pubsublite.internal.wire.Assigner; import com.google.cloud.pubsublite.internal.wire.AssignerFactory; @@ -54,6 +55,7 @@ import com.google.common.reflect.ImmutableTypeToInstanceMap; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; @@ -102,6 +104,7 @@ private static T example(Class klass) { @Mock AssignerFactory assignerFactory; @Mock CursorClient cursorClient; @Mock AdminClient adminClient; + @Mock TopicStatsClient topicStatsClient; @Mock Assigner assigner; @Mock SingleSubscriptionConsumer underlying; @@ -118,7 +121,8 @@ public void setUp() { new SharedBehavior(adminClient), consumerFactory, assignerFactory, - cursorClient); + cursorClient, + topicStatsClient); when(consumerFactory.newConsumer()).thenReturn(underlying); } @@ -140,10 +144,6 @@ public void unsupportedOperations() { assertThrows( UnsupportedVersionException.class, () -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO)); - assertThrows(UnsupportedVersionException.class, () -> consumer.endOffsets(ImmutableList.of())); - assertThrows( - UnsupportedVersionException.class, - () -> consumer.endOffsets(ImmutableList.of(), Duration.ZERO)); } @Test @@ -468,10 +468,24 @@ public void partitionsFor() { assertThat(info.size()).isEqualTo(2L); } + @Test + public void endOffsets() { + TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2); + TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4); + when(topicStatsClient.computeHeadCursor(example(TopicPath.class), Partition.of(2))) + .thenReturn(ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(22).build())); + when(topicStatsClient.computeHeadCursor(example(TopicPath.class), Partition.of(4))) + .thenReturn(ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(44).build())); + Map output = + consumer.endOffsets(ImmutableList.of(partition2, partition4)); + assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L)); + } + @Test public void close() { consumer.close(); verify(adminClient).close(); verify(cursorClient).close(); + verify(topicStatsClient).close(); } }