diff --git a/.readme-partials.yaml b/.readme-partials.yaml index 438c755f..92fb2cae 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -80,3 +80,9 @@ about: | - Producers operate on a single topic, and Consumers on a single subscription. - ProducerRecord may not specify partition explicitly. - Consumers may not dynamically create consumer groups (subscriptions). + + Note: + - In order to use Pub/Sub Lite [seek operations](https://cloud.google.com/pubsub/lite/docs/seek), + Consumers must have auto-commit enabled. Consumer seek methods are client-initiated, whereas + Pub/Sub Lite seek operations are initiated out-of-band and pushed to Consumers. Both types of + seeks should not be used concurrently, as they would interfere with one another. diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index 1e7df616..498e69ed 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -128,7 +128,7 @@ public Consumer instantiate() throws ApiException { } }; PullSubscriberFactory pullSubscriberFactory = - (partition, initialSeek) -> { + (partition, initialSeek, resetHandler) -> { SubscriberFactory subscriberFactory = consumer -> { try { @@ -145,6 +145,7 @@ public Consumer instantiate() throws ApiException { RoutingMetadata.of(subscriptionPath(), partition), SubscriberServiceSettings.newBuilder())))) .setInitialLocation(initialSeek) + .setResetHandler(resetHandler) .build(); } catch (Throwable t) { throw toCanonical(t).underlying; diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java index e72ba77d..615a565c 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java @@ -19,10 +19,12 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler; import com.google.cloud.pubsublite.proto.SeekRequest; /** A factory for making new PullSubscribers for a given partition of a subscription. */ interface PullSubscriberFactory { - BlockingPullSubscriber newPullSubscriber(Partition partition, SeekRequest initial) + BlockingPullSubscriber newPullSubscriber( + Partition partition, SeekRequest initial, SubscriberResetHandler resetHandler) throws CheckedApiException; } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java b/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java new file mode 100644 index 00000000..28806255 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java @@ -0,0 +1,168 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.CloseableMonitor; +import com.google.cloud.pubsublite.internal.ProxyService; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.util.ArrayDeque; +import java.util.Optional; + +/** Pulls messages and manages commits for a single partition of a subscription. */ +class SinglePartitionSubscriber extends ProxyService { + private final PullSubscriberFactory subscriberFactory; + private final Partition partition; + private final Committer committer; + private final boolean enableReset; + + private final CloseableMonitor monitor = new CloseableMonitor(); + + @GuardedBy("monitor.monitor") + private BlockingPullSubscriber subscriber; + + @GuardedBy("monitor.monitor") + private boolean needsCommitting = false; + + @GuardedBy("monitor.monitor") + private Optional lastReceived = Optional.empty(); + + SinglePartitionSubscriber( + PullSubscriberFactory subscriberFactory, + Partition partition, + SeekRequest initialSeek, + Committer committer, + boolean enableReset) + throws CheckedApiException { + this.subscriberFactory = subscriberFactory; + this.partition = partition; + this.committer = committer; + this.enableReset = enableReset; + this.subscriber = + subscriberFactory.newPullSubscriber(partition, initialSeek, this::onSubscriberReset); + addServices(committer); + } + + // ProxyService implementation. + @Override + protected void start() {} + + @Override + protected void stop() { + try (CloseableMonitor.Hold h = monitor.enter()) { + subscriber.close(); + } + } + + @Override + protected void handlePermanentError(CheckedApiException error) { + stop(); + } + + /** Executes a client-initiated seek. */ + void clientSeek(SeekRequest request) throws CheckedApiException { + try (CloseableMonitor.Hold h = monitor.enter()) { + subscriber.close(); + subscriber = subscriberFactory.newPullSubscriber(partition, request, this::onSubscriberReset); + } + } + + ApiFuture onData() { + try (CloseableMonitor.Hold h = monitor.enter()) { + return subscriber.onData(); + } + } + + @GuardedBy("monitor.monitor") + private ArrayDeque pullMessages() throws CheckedApiException { + ArrayDeque messages = new ArrayDeque<>(); + for (Optional message = subscriber.messageIfAvailable(); + message.isPresent(); + message = subscriber.messageIfAvailable()) { + messages.add(message.get()); + } + return messages; + } + + /** Pulls all available messages. */ + ArrayDeque getMessages() throws CheckedApiException { + try (CloseableMonitor.Hold h = monitor.enter()) { + ArrayDeque messages = pullMessages(); + if (!messages.isEmpty()) { + lastReceived = Optional.of(Iterables.getLast(messages).offset()); + needsCommitting = true; + } + return messages; + } + } + + Optional position() { + try (CloseableMonitor.Hold h = monitor.enter()) { + return lastReceived.map(lastReceived -> lastReceived.value() + 1); + } + } + + /** Executes a client-initiated commit. */ + ApiFuture commitOffset(Offset offset) { + return committer.commitOffset(offset); + } + + /** Auto-commits the offset of the last received message. */ + Optional> autoCommit() { + try (CloseableMonitor.Hold h = monitor.enter()) { + if (!needsCommitting) return Optional.empty(); + checkState(lastReceived.isPresent()); + needsCommitting = false; + // The Pub/Sub Lite commit offset is one more than the last received. + Offset toCommit = Offset.of(lastReceived.get().value() + 1); + return Optional.of( + ApiFutures.transform( + committer.commitOffset(toCommit), + ignored -> toCommit, + MoreExecutors.directExecutor())); + } + } + + private boolean onSubscriberReset() throws CheckedApiException { + if (!enableReset) { + return false; + } + + // Handle an out-of-band seek notification from the server. There must be no pending commits + // after this function returns. + try (CloseableMonitor.Hold h = monitor.enter()) { + // Discard undelivered messages. + pullMessages(); + // Prevent further auto-commits until post-seek messages are received. + needsCommitting = false; + } + committer.waitUntilEmpty(); + return true; + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java index 5a5adca9..f80b18cd 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java @@ -16,8 +16,8 @@ package com.google.cloud.pubsublite.kafka; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; -import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.api.core.ApiFuture; @@ -28,21 +28,19 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ExtractStatus; -import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.time.Duration; -import java.util.ArrayDeque; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -69,15 +67,8 @@ class SingleSubscriptionConsumerImpl implements SingleSubscriptionConsumer { private final CloseableMonitor monitor = new CloseableMonitor(); - static class SubscriberState { - BlockingPullSubscriber subscriber; - Committer committer; - boolean needsCommitting = false; - Optional lastReceived = Optional.empty(); - } - @GuardedBy("monitor.monitor") - private final Map partitions = new HashMap<>(); + private final Map partitions = new HashMap<>(); // When the set of assignments changes, this future will be set and swapped with a new future to // let ongoing pollers know that they should pick up new assignments. @GuardedBy("monitor.monitor") @@ -102,30 +93,28 @@ static class SubscriberState { public void setAssignment(Set assignment) { try (CloseableMonitor.Hold h = monitor.enter()) { - List unassigned = + List unassigned = ImmutableSet.copyOf(partitions.keySet()).stream() .filter(p -> !assignment.contains(p)) .map(partitions::remove) .collect(Collectors.toList()); - for (SubscriberState state : unassigned) { - state.subscriber.close(); - state.committer.stopAsync().awaitTerminated(); - } + blockingShutdown(unassigned); assignment.stream() .filter(p -> !partitions.containsKey(p)) .forEach( ExtractStatus.rethrowAsRuntime( partition -> { - SubscriberState s = new SubscriberState(); - s.subscriber = - subscriberFactory.newPullSubscriber( + SinglePartitionSubscriber subscriber = + new SinglePartitionSubscriber( + subscriberFactory, partition, SeekRequest.newBuilder() .setNamedTarget(NamedTarget.COMMITTED_CURSOR) - .build()); - s.committer = committerFactory.newCommitter(partition); - s.committer.startAsync().awaitRunning(); - partitions.put(partition, s); + .build(), + committerFactory.newCommitter(partition), + autocommit); + subscriber.startAsync().awaitRunning(); + partitions.put(partition, subscriber); })); assignmentChanged.set(null); assignmentChanged = SettableApiFuture.create(); @@ -141,30 +130,13 @@ public Set assignment() { } } - @GuardedBy("monitor.monitor") - private Map> fetchAll() { - Map> partitionQueues = new HashMap<>(); - partitions.forEach( - ExtractStatus.rethrowAsRuntime( - (partition, state) -> { - ArrayDeque messages = new ArrayDeque<>(); - for (Optional message = state.subscriber.messageIfAvailable(); - message.isPresent(); - message = state.subscriber.messageIfAvailable()) { - messages.add(message.get()); - } - partitionQueues.put(partition, messages); - })); - return partitionQueues; - } - private Map> doPoll(Duration duration) { try { ImmutableList.Builder> stopSleepingSignals = ImmutableList.builder(); try (CloseableMonitor.Hold h = monitor.enter()) { stopSleepingSignals.add(wakeupTriggered); stopSleepingSignals.add(assignmentChanged); - partitions.values().forEach(state -> stopSleepingSignals.add(state.subscriber.onData())); + partitions.values().forEach(subscriber -> stopSleepingSignals.add(subscriber.onData())); } try { ApiFuturesExtensions.whenFirstDone(stopSleepingSignals.build()) @@ -174,7 +146,12 @@ private Map> doPoll(Duration duration) { } try (CloseableMonitor.Hold h = monitor.enter()) { if (wakeupTriggered.isDone()) throw new WakeupException(); - return fetchAll(); + Map> partitionQueues = new HashMap<>(); + partitions.forEach( + ExtractStatus.rethrowAsRuntime( + (partition, subscriber) -> + partitionQueues.put(partition, subscriber.getMessages()))); + return partitionQueues; } } catch (Throwable t) { throw toKafka(t); @@ -203,12 +180,6 @@ public void onSuccess(Object result) {} partitionQueues.forEach( (partition, queue) -> { if (queue.isEmpty()) return; - try (CloseableMonitor.Hold h = monitor.enter()) { - SubscriberState state = partitions.getOrDefault(partition, null); - if (state == null) return; - state.lastReceived = Optional.of(Iterables.getLast(queue).offset()); - state.needsCommitting = true; - } List> partitionRecords = queue.stream() .map(message -> RecordTransforms.fromMessage(message, topic, partition)) @@ -222,22 +193,22 @@ public void onSuccess(Object result) {} @Override public ApiFuture> commitAll() { try (CloseableMonitor.Hold h = monitor.enter()) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - ImmutableList.Builder> commitFutures = ImmutableList.builder(); + List>> commitFutures = new ArrayList<>(); partitions.forEach( - (partition, state) -> { - if (!state.needsCommitting) return; - checkState(state.lastReceived.isPresent()); - state.needsCommitting = false; - // The Pub/Sub Lite commit offset is one more than the last received. - Offset toCommit = Offset.of(state.lastReceived.get().value() + 1); - builder.put(partition, toCommit); - commitFutures.add(state.committer.commitOffset(toCommit)); + (partition, subscriber) -> { + Optional> commitFuture = subscriber.autoCommit(); + if (!commitFuture.isPresent()) return; + commitFutures.add( + ApiFutures.transform( + commitFuture.get(), + offset -> new SimpleEntry<>(partition, offset), + MoreExecutors.directExecutor())); }); - Map map = builder.build(); return ApiFutures.transform( - ApiFutures.allAsList(commitFutures.build()), - ignored -> map, + ApiFutures.allAsList(commitFutures), + results -> + ImmutableMap.copyOf( + results.stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))), MoreExecutors.directExecutor()); } } @@ -255,7 +226,7 @@ public ApiFuture commit(Map commitOffsets) { + partition.value() + " which is not assigned to this consumer."); } - commitFutures.add(partitions.get(partition).committer.commitOffset(offset)); + commitFutures.add(partitions.get(partition).commitOffset(offset)); }); return ApiFutures.transform( ApiFutures.allAsList(commitFutures.build()), @@ -273,9 +244,7 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti + partition.value() + " which is not assigned to this consumer."); } - SubscriberState state = partitions.get(partition); - state.subscriber.close(); - state.subscriber = subscriberFactory.newPullSubscriber(partition, request); + partitions.get(partition).clientSeek(request); } catch (IllegalStateException e) { throw e; } catch (Throwable t) { @@ -287,17 +256,14 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti public Optional position(Partition partition) { try (CloseableMonitor.Hold h = monitor.enter()) { if (!partitions.containsKey(partition)) return Optional.empty(); - return partitions.get(partition).lastReceived.map(lastReceived -> lastReceived.value() + 1); + return partitions.get(partition).position(); } } @Override public void close(Duration duration) { try (CloseableMonitor.Hold h = monitor.enter()) { - for (SubscriberState state : partitions.values()) { - state.subscriber.close(); - state.committer.stopAsync().awaitTerminated(); - } + blockingShutdown(partitions.values()); } catch (Throwable t) { throw toKafka(t); } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java new file mode 100644 index 00000000..4b3e2482 --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth8.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.protobuf.Timestamp; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Spy; + +@RunWith(JUnit4.class) +public class SinglePartitionSubscriberTest { + private static final SeekRequest INITIAL_SEEK = + SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build(); + private static final Partition PARTITION = Partition.of(2); + + abstract static class FakeCommitter extends FakeApiService implements Committer {} + + @Mock PullSubscriberFactory subscriberFactory; + @Mock BlockingPullSubscriber pullSubscriber; + @Spy FakeCommitter committer; + + @Captor private ArgumentCaptor resetHandlerCaptor; + + private SinglePartitionSubscriber subscriber; + + @Before + public void setUp() throws CheckedApiException { + initMocks(this); + when(subscriberFactory.newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), any())) + .thenReturn(pullSubscriber); + } + + @After + public void tearDown() throws Exception { + verifyNoMoreInteractions(subscriberFactory); + verifyNoMoreInteractions(pullSubscriber); + verifyNoMoreInteractions(committer); + } + + private static SequencedMessage message(long offset) { + return SequencedMessage.of( + Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset), 0L); + } + + @Test + public void pullAndCommit() throws Exception { + subscriber = + new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer, true); + verify(subscriberFactory).newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), any()); + verify(committer).state(); + + // Pull messages. + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(3))) + .thenReturn(Optional.of(message(5))) + .thenReturn(Optional.of(message(7))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(3), message(5), message(7)); + assertThat(subscriber.position()).hasValue(8); + verify(pullSubscriber, times(4)).messageIfAvailable(); + + // Auto commit handled. + when(committer.commitOffset(Offset.of(8))).thenReturn(ApiFutures.immediateFuture(null)); + subscriber.autoCommit(); + verify(committer).commitOffset(Offset.of(8)); + + // Second auto commit does nothing. + subscriber.autoCommit(); + } + + @Test + public void resetSubscriberEnabled() throws Exception { + subscriber = + new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer, true); + verify(subscriberFactory) + .newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), resetHandlerCaptor.capture()); + verify(committer).state(); + + // Pull messages. + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(3))) + .thenReturn(Optional.of(message(5))) + .thenReturn(Optional.of(message(7))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(3), message(5), message(7)); + + // Subscriber reset handled. + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(9))) + .thenReturn(Optional.empty()); + assertThat(resetHandlerCaptor.getValue().handleReset()).isTrue(); + verify(committer).waitUntilEmpty(); + verify(pullSubscriber, times(6)).messageIfAvailable(); + + // Undelivered messages are discarded. + assertThat(subscriber.position()).hasValue(8); + + // Auto commit does nothing. + subscriber.autoCommit(); + + // Pull messages after reset. + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(2))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(2)); + assertThat(subscriber.position()).hasValue(3); + verify(pullSubscriber, times(8)).messageIfAvailable(); + + // Auto commit handled. + when(committer.commitOffset(Offset.of(3))).thenReturn(ApiFutures.immediateFuture(null)); + subscriber.autoCommit(); + verify(committer).commitOffset(Offset.of(3)); + } + + @Test + public void resetSubscriberDisabled() throws Exception { + subscriber = + new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer, false); + verify(subscriberFactory) + .newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), resetHandlerCaptor.capture()); + verify(committer).state(); + + // Pull messages. + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(3))) + .thenReturn(Optional.of(message(5))) + .thenReturn(Optional.of(message(7))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(3), message(5), message(7)); + assertThat(subscriber.position()).hasValue(8); + verify(pullSubscriber, times(4)).messageIfAvailable(); + + // Subscriber reset not handled. + assertThat(resetHandlerCaptor.getValue().handleReset()).isFalse(); + + // Auto commit handled. + when(committer.commitOffset(Offset.of(8))).thenReturn(ApiFutures.immediateFuture(null)); + subscriber.autoCommit(); + verify(committer).commitOffset(Offset.of(8)); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java index b0a429bd..48855977 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java @@ -93,8 +93,10 @@ public void setUp() throws CheckedApiException { new SingleSubscriptionConsumerImpl( example(TopicPath.class), false, subscriberFactory, committerFactory); verifyNoInteractions(subscriberFactory, committerFactory); - when(subscriberFactory.newPullSubscriber(eq(Partition.of(5)), any())).thenReturn(subscriber5); - when(subscriberFactory.newPullSubscriber(eq(Partition.of(8)), any())).thenReturn(subscriber8); + when(subscriberFactory.newPullSubscriber(eq(Partition.of(5)), any(), any())) + .thenReturn(subscriber5); + when(subscriberFactory.newPullSubscriber(eq(Partition.of(8)), any(), any())) + .thenReturn(subscriber8); when(committerFactory.newCommitter(Partition.of(5))).thenReturn(committer5); when(committerFactory.newCommitter(Partition.of(8))).thenReturn(committer8); } @@ -120,8 +122,8 @@ private static void assertConsumerRecordsEqual( @Test public void assignAndPoll() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5), Partition.of(8))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); - verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(8)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); verify(committerFactory).newCommitter(Partition.of(8)); // ----------------------------- @@ -191,9 +193,7 @@ public void assignAndPoll() throws Exception { verify(subscriber5).close(); verify(subscriber8).close(); verify(committer5).stopAsync(); - verify(committer5).awaitTerminated(); verify(committer8).stopAsync(); - verify(committer8).awaitTerminated(); } @Test @@ -202,8 +202,8 @@ public void assignAndPollAutocommit() throws Exception { new SingleSubscriptionConsumerImpl( example(TopicPath.class), true, subscriberFactory, committerFactory); consumer.setAssignment(ImmutableSet.of(Partition.of(5), Partition.of(8))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); - verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(8)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); verify(committerFactory).newCommitter(Partition.of(8)); // ----------------------------- @@ -264,9 +264,7 @@ public void assignAndPollAutocommit() throws Exception { verify(subscriber5).close(); verify(subscriber8).close(); verify(committer5).stopAsync(); - verify(committer5).awaitTerminated(); verify(committer8).stopAsync(); - verify(committer8).awaitTerminated(); } @Test @@ -293,12 +291,12 @@ public void wakeupDuringPoll() { public void assignmentChange() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5))); assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(5))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); verify(committer5).startAsync(); consumer.setAssignment(ImmutableSet.of(Partition.of(8))); assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(8))); - verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(8)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(8)); verify(committer8).startAsync(); verify(subscriber5).close(); @@ -309,7 +307,7 @@ public void assignmentChange() throws Exception { public void assignmentChangeMakesPollReturn() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5))); assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(5))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); when(subscriber5.onData()).thenReturn(SettableApiFuture.create()); SettableApiFuture pollRunning = SettableApiFuture.create(); when(subscriber5.onData()) @@ -358,12 +356,13 @@ public void seekNotAssigned() throws Exception { @Test public void seekAssigned() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); - when(subscriberFactory.newPullSubscriber(Partition.of(5), OFFSET_SEEK)).thenReturn(subscriber8); + when(subscriberFactory.newPullSubscriber(eq(Partition.of(5)), eq(OFFSET_SEEK), any())) + .thenReturn(subscriber8); consumer.doSeek(Partition.of(5), OFFSET_SEEK); verify(subscriber5).close(); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), OFFSET_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(OFFSET_SEEK), any()); } @Test