Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Consumer.endOffsets #102

Merged
merged 1 commit into from Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -29,6 +29,8 @@
import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.CursorClientSettings;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.AssignerSettings;
import com.google.cloud.pubsublite.internal.wire.CommitterSettings;
Expand Down Expand Up @@ -148,12 +150,21 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {

CursorClient cursorClient =
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());
TopicStatsClient topicStatsClient =
TopicStatsClient.create(
TopicStatsClientSettings.newBuilder().setRegion(zone.region()).build());
SharedBehavior shared =
new SharedBehavior(
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
return new PubsubLiteConsumer(
subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
subscriptionPath(),
topic,
shared,
consumerFactory,
assignerFactory,
cursorClient,
topicStatsClient);
} catch (Exception e) {
throw toCanonical(e).underlying;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
Expand All @@ -26,6 +27,7 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.PartitionAssignmentReceiver;
Expand Down Expand Up @@ -74,6 +76,7 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private final ConsumerFactory consumerFactory;
private final AssignerFactory assignerFactory;
private final CursorClient cursorClient;
private final TopicStatsClient topicStatsClient;
private Optional<Assigner> assigner = Optional.empty();
private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

Expand All @@ -83,13 +86,15 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
SharedBehavior shared,
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
CursorClient cursorClient) {
CursorClient cursorClient,
TopicStatsClient topicStatsClient) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.shared = shared;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.cursorClient = cursorClient;
this.topicStatsClient = topicStatsClient;
}

private TopicPartition toTopicPartition(Partition partition) {
Expand Down Expand Up @@ -490,8 +495,25 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collectio
@Override
public Map<TopicPartition, Long> endOffsets(
Collection<TopicPartition> collection, Duration duration) {
throw new UnsupportedVersionException(
"Pub/Sub Lite does not support Consumer backlog introspection.");
try {
Map<TopicPartition, ApiFuture<Cursor>> cursors =
collection.stream()
.collect(
Collectors.toMap(
topicPartition -> topicPartition,
topicPartition ->
topicStatsClient.computeHeadCursor(
topicPath, checkTopicGetPartition(topicPartition))));
ApiFutures.allAsList(cursors.values()).get(duration.toMillis(), TimeUnit.MILLISECONDS);

ImmutableMap.Builder<TopicPartition, Long> output = ImmutableMap.builder();
for (Map.Entry<TopicPartition, ApiFuture<Cursor>> entry : cursors.entrySet()) {
output.put(entry.getKey(), entry.getValue().get().getOffset());
}
return output.build();
} catch (Throwable t) {
throw toKafka(t);
}
}

@Override
Expand All @@ -511,6 +533,12 @@ public void close(Duration timeout) {
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
}
try {
topicStatsClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Error closing topic stats client during Consumer shutdown.");
}
try {
shared.close();
} catch (Exception e) {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
Expand All @@ -54,6 +55,7 @@
import com.google.common.reflect.ImmutableTypeToInstanceMap;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -102,6 +104,7 @@ private static <T> T example(Class<T> klass) {
@Mock AssignerFactory assignerFactory;
@Mock CursorClient cursorClient;
@Mock AdminClient adminClient;
@Mock TopicStatsClient topicStatsClient;

@Mock Assigner assigner;
@Mock SingleSubscriptionConsumer underlying;
Expand All @@ -118,7 +121,8 @@ public void setUp() {
new SharedBehavior(adminClient),
consumerFactory,
assignerFactory,
cursorClient);
cursorClient,
topicStatsClient);
when(consumerFactory.newConsumer()).thenReturn(underlying);
}

Expand All @@ -140,10 +144,6 @@ public void unsupportedOperations() {
assertThrows(
UnsupportedVersionException.class,
() -> consumer.offsetsForTimes(ImmutableMap.of(), Duration.ZERO));
assertThrows(UnsupportedVersionException.class, () -> consumer.endOffsets(ImmutableList.of()));
assertThrows(
UnsupportedVersionException.class,
() -> consumer.endOffsets(ImmutableList.of(), Duration.ZERO));
}

@Test
Expand Down Expand Up @@ -468,10 +468,24 @@ public void partitionsFor() {
assertThat(info.size()).isEqualTo(2L);
}

@Test
public void endOffsets() {
TopicPartition partition2 = new TopicPartition(example(TopicPath.class).toString(), 2);
TopicPartition partition4 = new TopicPartition(example(TopicPath.class).toString(), 4);
when(topicStatsClient.computeHeadCursor(example(TopicPath.class), Partition.of(2)))
.thenReturn(ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(22).build()));
when(topicStatsClient.computeHeadCursor(example(TopicPath.class), Partition.of(4)))
.thenReturn(ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(44).build()));
Map<TopicPartition, Long> output =
consumer.endOffsets(ImmutableList.of(partition2, partition4));
assertThat(output).isEqualTo(ImmutableMap.of(partition2, 22L, partition4, 44L));
}

@Test
public void close() {
consumer.close();
verify(adminClient).close();
verify(cursorClient).close();
verify(topicStatsClient).close();
}
}