From 4860e95fa8d4f26df0042a343bc59229100c11f4 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Thu, 5 Aug 2021 05:52:34 -0400 Subject: [PATCH 1/3] feat: Handle out-of-band seeks --- .../pubsublite/kafka/ConsumerSettings.java | 3 +- .../kafka/PullSubscriberFactory.java | 4 +- .../kafka/SinglePartitionSubscriber.java | 150 ++++++++++++++++++ .../kafka/SingleSubscriptionConsumerImpl.java | 106 +++++-------- .../kafka/SinglePartitionSubscriberTest.java | 132 +++++++++++++++ .../SingleSubscriptionConsumerImplTest.java | 27 ++-- 6 files changed, 339 insertions(+), 83 deletions(-) create mode 100644 src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java create mode 100644 src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index 05e667c4..408a1b4b 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -101,7 +101,7 @@ public Consumer instantiate() throws ApiException { } }; PullSubscriberFactory pullSubscriberFactory = - (partition, initialSeek) -> { + (partition, initialSeek, resetHandler) -> { SubscriberFactory subscriberFactory = consumer -> { try { @@ -118,6 +118,7 @@ public Consumer instantiate() throws ApiException { RoutingMetadata.of(subscriptionPath(), partition), SubscriberServiceSettings.newBuilder())))) .setInitialLocation(initialSeek) + .setResetHandler(resetHandler) .build(); } catch (Throwable t) { throw toCanonical(t).underlying; diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java index e72ba77d..615a565c 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java @@ -19,10 +19,12 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler; import com.google.cloud.pubsublite.proto.SeekRequest; /** A factory for making new PullSubscribers for a given partition of a subscription. */ interface PullSubscriberFactory { - BlockingPullSubscriber newPullSubscriber(Partition partition, SeekRequest initial) + BlockingPullSubscriber newPullSubscriber( + Partition partition, SeekRequest initial, SubscriberResetHandler resetHandler) throws CheckedApiException; } diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java b/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java new file mode 100644 index 00000000..e2cb2d15 --- /dev/null +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java @@ -0,0 +1,150 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.CloseableMonitor; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.util.ArrayDeque; +import java.util.Optional; + +/** Pulls messages and manages commits for a single partition of a subscription. */ +class SinglePartitionSubscriber { + private final PullSubscriberFactory subscriberFactory; + private final Partition partition; + private final Committer committer; + + private final CloseableMonitor monitor = new CloseableMonitor(); + + @GuardedBy("monitor.monitor") + private BlockingPullSubscriber subscriber; + + @GuardedBy("monitor.monitor") + private boolean needsCommitting = false; + + @GuardedBy("monitor.monitor") + private Optional lastReceived = Optional.empty(); + + SinglePartitionSubscriber( + PullSubscriberFactory subscriberFactory, + Partition partition, + SeekRequest initialSeek, + Committer committer) + throws CheckedApiException { + this.subscriberFactory = subscriberFactory; + this.partition = partition; + this.committer = committer; + this.subscriber = + subscriberFactory.newPullSubscriber(partition, initialSeek, this::onSubscriberReset); + } + + /** Executes a client-initiated seek. */ + void clientSeek(SeekRequest request) throws CheckedApiException { + try (CloseableMonitor.Hold h = monitor.enter()) { + subscriber.close(); + subscriber = subscriberFactory.newPullSubscriber(partition, request, this::onSubscriberReset); + } + } + + ApiFuture onData() { + try (CloseableMonitor.Hold h = monitor.enter()) { + return subscriber.onData(); + } + } + + @GuardedBy("monitor.monitor") + private ArrayDeque pullMessages() throws CheckedApiException { + ArrayDeque messages = new ArrayDeque<>(); + for (Optional message = subscriber.messageIfAvailable(); + message.isPresent(); + message = subscriber.messageIfAvailable()) { + messages.add(message.get()); + } + return messages; + } + + /** Pulls all available messages. */ + ArrayDeque getMessages() throws CheckedApiException { + try (CloseableMonitor.Hold h = monitor.enter()) { + ArrayDeque messages = pullMessages(); + if (!messages.isEmpty()) { + lastReceived = Optional.of(Iterables.getLast(messages).offset()); + needsCommitting = true; + } + return messages; + } + } + + Optional position() { + try (CloseableMonitor.Hold h = monitor.enter()) { + return lastReceived.map(lastReceived -> lastReceived.value() + 1); + } + } + + /** Executes a client-initiated commit. */ + ApiFuture commitOffset(Offset offset) { + return committer.commitOffset(offset); + } + + /** Auto-commits the offset of the last received message. */ + Optional> autoCommit() { + try (CloseableMonitor.Hold h = monitor.enter()) { + if (!needsCommitting) return Optional.empty(); + checkState(lastReceived.isPresent()); + needsCommitting = false; + // The Pub/Sub Lite commit offset is one more than the last received. + Offset toCommit = Offset.of(lastReceived.get().value() + 1); + return Optional.of( + ApiFutures.transform( + committer.commitOffset(toCommit), + ignored -> toCommit, + MoreExecutors.directExecutor())); + } + } + + void close() throws CheckedApiException { + try (CloseableMonitor.Hold h = monitor.enter()) { + subscriber.close(); + } + committer.stopAsync().awaitTerminated(); + } + + private boolean onSubscriberReset() throws CheckedApiException { + // Handle an out-of-band seek notification from the server. There must be no pending commits + // after this function returns. + try (CloseableMonitor.Hold h = monitor.enter()) { + // Discard undelivered messages. + pullMessages(); + // Prevent further auto-commits until post-seek messages are received. + needsCommitting = false; + } + committer.waitUntilEmpty(); + return true; + } +} diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java index 5a5adca9..0af8d703 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java @@ -17,7 +17,6 @@ package com.google.cloud.pubsublite.kafka; import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; -import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.api.core.ApiFuture; @@ -28,7 +27,6 @@ import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.TopicPath; -import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Committer; @@ -37,12 +35,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.time.Duration; -import java.util.ArrayDeque; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -69,15 +67,8 @@ class SingleSubscriptionConsumerImpl implements SingleSubscriptionConsumer { private final CloseableMonitor monitor = new CloseableMonitor(); - static class SubscriberState { - BlockingPullSubscriber subscriber; - Committer committer; - boolean needsCommitting = false; - Optional lastReceived = Optional.empty(); - } - @GuardedBy("monitor.monitor") - private final Map partitions = new HashMap<>(); + private final Map partitions = new HashMap<>(); // When the set of assignments changes, this future will be set and swapped with a new future to // let ongoing pollers know that they should pick up new assignments. @GuardedBy("monitor.monitor") @@ -102,30 +93,30 @@ static class SubscriberState { public void setAssignment(Set assignment) { try (CloseableMonitor.Hold h = monitor.enter()) { - List unassigned = + List unassigned = ImmutableSet.copyOf(partitions.keySet()).stream() .filter(p -> !assignment.contains(p)) .map(partitions::remove) .collect(Collectors.toList()); - for (SubscriberState state : unassigned) { - state.subscriber.close(); - state.committer.stopAsync().awaitTerminated(); + for (SinglePartitionSubscriber subscriber : unassigned) { + subscriber.close(); } assignment.stream() .filter(p -> !partitions.containsKey(p)) .forEach( ExtractStatus.rethrowAsRuntime( partition -> { - SubscriberState s = new SubscriberState(); - s.subscriber = - subscriberFactory.newPullSubscriber( + Committer committer = committerFactory.newCommitter(partition); + committer.startAsync().awaitRunning(); + SinglePartitionSubscriber subscriber = + new SinglePartitionSubscriber( + subscriberFactory, partition, SeekRequest.newBuilder() .setNamedTarget(NamedTarget.COMMITTED_CURSOR) - .build()); - s.committer = committerFactory.newCommitter(partition); - s.committer.startAsync().awaitRunning(); - partitions.put(partition, s); + .build(), + committer); + partitions.put(partition, subscriber); })); assignmentChanged.set(null); assignmentChanged = SettableApiFuture.create(); @@ -141,30 +132,13 @@ public Set assignment() { } } - @GuardedBy("monitor.monitor") - private Map> fetchAll() { - Map> partitionQueues = new HashMap<>(); - partitions.forEach( - ExtractStatus.rethrowAsRuntime( - (partition, state) -> { - ArrayDeque messages = new ArrayDeque<>(); - for (Optional message = state.subscriber.messageIfAvailable(); - message.isPresent(); - message = state.subscriber.messageIfAvailable()) { - messages.add(message.get()); - } - partitionQueues.put(partition, messages); - })); - return partitionQueues; - } - private Map> doPoll(Duration duration) { try { ImmutableList.Builder> stopSleepingSignals = ImmutableList.builder(); try (CloseableMonitor.Hold h = monitor.enter()) { stopSleepingSignals.add(wakeupTriggered); stopSleepingSignals.add(assignmentChanged); - partitions.values().forEach(state -> stopSleepingSignals.add(state.subscriber.onData())); + partitions.values().forEach(subscriber -> stopSleepingSignals.add(subscriber.onData())); } try { ApiFuturesExtensions.whenFirstDone(stopSleepingSignals.build()) @@ -174,7 +148,12 @@ private Map> doPoll(Duration duration) { } try (CloseableMonitor.Hold h = monitor.enter()) { if (wakeupTriggered.isDone()) throw new WakeupException(); - return fetchAll(); + Map> partitionQueues = new HashMap<>(); + partitions.forEach( + ExtractStatus.rethrowAsRuntime( + (partition, subscriber) -> + partitionQueues.put(partition, subscriber.getMessages()))); + return partitionQueues; } } catch (Throwable t) { throw toKafka(t); @@ -203,12 +182,6 @@ public void onSuccess(Object result) {} partitionQueues.forEach( (partition, queue) -> { if (queue.isEmpty()) return; - try (CloseableMonitor.Hold h = monitor.enter()) { - SubscriberState state = partitions.getOrDefault(partition, null); - if (state == null) return; - state.lastReceived = Optional.of(Iterables.getLast(queue).offset()); - state.needsCommitting = true; - } List> partitionRecords = queue.stream() .map(message -> RecordTransforms.fromMessage(message, topic, partition)) @@ -222,22 +195,20 @@ public void onSuccess(Object result) {} @Override public ApiFuture> commitAll() { try (CloseableMonitor.Hold h = monitor.enter()) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - ImmutableList.Builder> commitFutures = ImmutableList.builder(); + List>> commitFutures = new ArrayList<>(); partitions.forEach( - (partition, state) -> { - if (!state.needsCommitting) return; - checkState(state.lastReceived.isPresent()); - state.needsCommitting = false; - // The Pub/Sub Lite commit offset is one more than the last received. - Offset toCommit = Offset.of(state.lastReceived.get().value() + 1); - builder.put(partition, toCommit); - commitFutures.add(state.committer.commitOffset(toCommit)); + (partition, subscriber) -> { + Optional> commitFuture = subscriber.autoCommit(); + if (!commitFuture.isPresent()) return; + commitFutures.add( + ApiFutures.transform( + commitFuture.get(), + offset -> new SimpleEntry<>(partition, offset), + MoreExecutors.directExecutor())); }); - Map map = builder.build(); return ApiFutures.transform( - ApiFutures.allAsList(commitFutures.build()), - ignored -> map, + ApiFutures.allAsList(commitFutures), + results -> results.stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())), MoreExecutors.directExecutor()); } } @@ -255,7 +226,7 @@ public ApiFuture commit(Map commitOffsets) { + partition.value() + " which is not assigned to this consumer."); } - commitFutures.add(partitions.get(partition).committer.commitOffset(offset)); + commitFutures.add(partitions.get(partition).commitOffset(offset)); }); return ApiFutures.transform( ApiFutures.allAsList(commitFutures.build()), @@ -273,9 +244,7 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti + partition.value() + " which is not assigned to this consumer."); } - SubscriberState state = partitions.get(partition); - state.subscriber.close(); - state.subscriber = subscriberFactory.newPullSubscriber(partition, request); + partitions.get(partition).clientSeek(request); } catch (IllegalStateException e) { throw e; } catch (Throwable t) { @@ -287,16 +256,15 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti public Optional position(Partition partition) { try (CloseableMonitor.Hold h = monitor.enter()) { if (!partitions.containsKey(partition)) return Optional.empty(); - return partitions.get(partition).lastReceived.map(lastReceived -> lastReceived.value() + 1); + return partitions.get(partition).position(); } } @Override public void close(Duration duration) { try (CloseableMonitor.Hold h = monitor.enter()) { - for (SubscriberState state : partitions.values()) { - state.subscriber.close(); - state.committer.stopAsync().awaitTerminated(); + for (SinglePartitionSubscriber subscriber : partitions.values()) { + subscriber.close(); } } catch (Throwable t) { throw toKafka(t); diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java new file mode 100644 index 00000000..919e62f3 --- /dev/null +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java @@ -0,0 +1,132 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.kafka; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth8.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.protobuf.Timestamp; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; + +@RunWith(JUnit4.class) +public class SinglePartitionSubscriberTest { + private static final SeekRequest INITIAL_SEEK = + SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build(); + private static final Partition PARTITION = Partition.of(2); + + @Mock PullSubscriberFactory subscriberFactory; + @Mock Committer committer; + @Mock BlockingPullSubscriber pullSubscriber; + + @Captor private ArgumentCaptor resetHandlerCaptor; + + private SinglePartitionSubscriber subscriber; + + @Before + public void setUp() throws CheckedApiException { + initMocks(this); + when(subscriberFactory.newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), any())) + .thenReturn(pullSubscriber); + subscriber = + new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer); + } + + @After + public void tearDown() throws Exception { + verifyNoMoreInteractions(subscriberFactory); + verifyNoMoreInteractions(pullSubscriber); + verifyNoMoreInteractions(committer); + } + + private static SequencedMessage message(long offset) { + return SequencedMessage.of( + Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset), 0L); + } + + @Test + public void pullAndCommit() throws Exception { + verify(subscriberFactory).newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), any()); + + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(3))) + .thenReturn(Optional.of(message(5))) + .thenReturn(Optional.of(message(7))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(3), message(5), message(7)); + assertThat(subscriber.position()).hasValue(8); + verify(pullSubscriber, times(4)).messageIfAvailable(); + + when(committer.commitOffset(Offset.of(8))).thenReturn(ApiFutures.immediateFuture(null)); + subscriber.autoCommit(); + verify(committer).commitOffset(Offset.of(8)); + + // Second auto commit does nothing. + subscriber.autoCommit(); + } + + @Test + public void resetSubscriber() throws Exception { + verify(subscriberFactory) + .newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), resetHandlerCaptor.capture()); + + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(3))) + .thenReturn(Optional.of(message(5))) + .thenReturn(Optional.of(message(7))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(3), message(5), message(7)); + + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(9))) + .thenReturn(Optional.empty()); + assertThat(resetHandlerCaptor.getValue().handleReset()).isTrue(); + verify(committer).waitUntilEmpty(); + verify(pullSubscriber, times(6)).messageIfAvailable(); + + // Subsequent messages are discarded. + assertThat(subscriber.position()).hasValue(8); + + // Auto commit does nothing. + subscriber.autoCommit(); + } +} diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java index b0a429bd..e32e6e61 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java @@ -93,8 +93,10 @@ public void setUp() throws CheckedApiException { new SingleSubscriptionConsumerImpl( example(TopicPath.class), false, subscriberFactory, committerFactory); verifyNoInteractions(subscriberFactory, committerFactory); - when(subscriberFactory.newPullSubscriber(eq(Partition.of(5)), any())).thenReturn(subscriber5); - when(subscriberFactory.newPullSubscriber(eq(Partition.of(8)), any())).thenReturn(subscriber8); + when(subscriberFactory.newPullSubscriber(eq(Partition.of(5)), any(), any())) + .thenReturn(subscriber5); + when(subscriberFactory.newPullSubscriber(eq(Partition.of(8)), any(), any())) + .thenReturn(subscriber8); when(committerFactory.newCommitter(Partition.of(5))).thenReturn(committer5); when(committerFactory.newCommitter(Partition.of(8))).thenReturn(committer8); } @@ -120,8 +122,8 @@ private static void assertConsumerRecordsEqual( @Test public void assignAndPoll() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5), Partition.of(8))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); - verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(8)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); verify(committerFactory).newCommitter(Partition.of(8)); // ----------------------------- @@ -202,8 +204,8 @@ public void assignAndPollAutocommit() throws Exception { new SingleSubscriptionConsumerImpl( example(TopicPath.class), true, subscriberFactory, committerFactory); consumer.setAssignment(ImmutableSet.of(Partition.of(5), Partition.of(8))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); - verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(8)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); verify(committerFactory).newCommitter(Partition.of(8)); // ----------------------------- @@ -293,12 +295,12 @@ public void wakeupDuringPoll() { public void assignmentChange() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5))); assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(5))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); verify(committer5).startAsync(); consumer.setAssignment(ImmutableSet.of(Partition.of(8))); assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(8))); - verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(8)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(8)); verify(committer8).startAsync(); verify(subscriber5).close(); @@ -309,7 +311,7 @@ public void assignmentChange() throws Exception { public void assignmentChangeMakesPollReturn() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5))); assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(5))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); when(subscriber5.onData()).thenReturn(SettableApiFuture.create()); SettableApiFuture pollRunning = SettableApiFuture.create(); when(subscriber5.onData()) @@ -358,12 +360,13 @@ public void seekNotAssigned() throws Exception { @Test public void seekAssigned() throws Exception { consumer.setAssignment(ImmutableSet.of(Partition.of(5))); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(DEFAULT_SEEK), any()); verify(committerFactory).newCommitter(Partition.of(5)); - when(subscriberFactory.newPullSubscriber(Partition.of(5), OFFSET_SEEK)).thenReturn(subscriber8); + when(subscriberFactory.newPullSubscriber(eq(Partition.of(5)), eq(OFFSET_SEEK), any())) + .thenReturn(subscriber8); consumer.doSeek(Partition.of(5), OFFSET_SEEK); verify(subscriber5).close(); - verify(subscriberFactory).newPullSubscriber(Partition.of(5), OFFSET_SEEK); + verify(subscriberFactory).newPullSubscriber(eq(Partition.of(5)), eq(OFFSET_SEEK), any()); } @Test From 5d66c8d3f42174e1436fd2e3751f0dcf82084165 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Mon, 9 Aug 2021 20:58:18 -0400 Subject: [PATCH 2/3] Address review comments --- .../kafka/SinglePartitionSubscriber.java | 34 +++++++--- .../kafka/SingleSubscriptionConsumerImpl.java | 20 +++--- .../kafka/SinglePartitionSubscriberTest.java | 62 +++++++++++++++++-- .../SingleSubscriptionConsumerImplTest.java | 4 -- 4 files changed, 92 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java b/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java index e2cb2d15..28806255 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java @@ -26,6 +26,7 @@ import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.CloseableMonitor; +import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.common.collect.Iterables; @@ -35,10 +36,11 @@ import java.util.Optional; /** Pulls messages and manages commits for a single partition of a subscription. */ -class SinglePartitionSubscriber { +class SinglePartitionSubscriber extends ProxyService { private final PullSubscriberFactory subscriberFactory; private final Partition partition; private final Committer committer; + private final boolean enableReset; private final CloseableMonitor monitor = new CloseableMonitor(); @@ -55,13 +57,32 @@ class SinglePartitionSubscriber { PullSubscriberFactory subscriberFactory, Partition partition, SeekRequest initialSeek, - Committer committer) + Committer committer, + boolean enableReset) throws CheckedApiException { this.subscriberFactory = subscriberFactory; this.partition = partition; this.committer = committer; + this.enableReset = enableReset; this.subscriber = subscriberFactory.newPullSubscriber(partition, initialSeek, this::onSubscriberReset); + addServices(committer); + } + + // ProxyService implementation. + @Override + protected void start() {} + + @Override + protected void stop() { + try (CloseableMonitor.Hold h = monitor.enter()) { + subscriber.close(); + } + } + + @Override + protected void handlePermanentError(CheckedApiException error) { + stop(); } /** Executes a client-initiated seek. */ @@ -128,14 +149,11 @@ Optional> autoCommit() { } } - void close() throws CheckedApiException { - try (CloseableMonitor.Hold h = monitor.enter()) { - subscriber.close(); + private boolean onSubscriberReset() throws CheckedApiException { + if (!enableReset) { + return false; } - committer.stopAsync().awaitTerminated(); - } - private boolean onSubscriberReset() throws CheckedApiException { // Handle an out-of-band seek notification from the server. There must be no pending commits // after this function returns. try (CloseableMonitor.Hold h = monitor.enter()) { diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java index 0af8d703..f80b18cd 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsublite.kafka; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -29,7 +30,6 @@ import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ExtractStatus; -import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; import com.google.common.collect.ImmutableList; @@ -98,16 +98,12 @@ public void setAssignment(Set assignment) { .filter(p -> !assignment.contains(p)) .map(partitions::remove) .collect(Collectors.toList()); - for (SinglePartitionSubscriber subscriber : unassigned) { - subscriber.close(); - } + blockingShutdown(unassigned); assignment.stream() .filter(p -> !partitions.containsKey(p)) .forEach( ExtractStatus.rethrowAsRuntime( partition -> { - Committer committer = committerFactory.newCommitter(partition); - committer.startAsync().awaitRunning(); SinglePartitionSubscriber subscriber = new SinglePartitionSubscriber( subscriberFactory, @@ -115,7 +111,9 @@ public void setAssignment(Set assignment) { SeekRequest.newBuilder() .setNamedTarget(NamedTarget.COMMITTED_CURSOR) .build(), - committer); + committerFactory.newCommitter(partition), + autocommit); + subscriber.startAsync().awaitRunning(); partitions.put(partition, subscriber); })); assignmentChanged.set(null); @@ -208,7 +206,9 @@ public ApiFuture> commitAll() { }); return ApiFutures.transform( ApiFutures.allAsList(commitFutures), - results -> results.stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())), + results -> + ImmutableMap.copyOf( + results.stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))), MoreExecutors.directExecutor()); } } @@ -263,9 +263,7 @@ public Optional position(Partition partition) { @Override public void close(Duration duration) { try (CloseableMonitor.Hold h = monitor.enter()) { - for (SinglePartitionSubscriber subscriber : partitions.values()) { - subscriber.close(); - } + blockingShutdown(partitions.values()); } catch (Throwable t) { throw toKafka(t); } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java index 919e62f3..4b3e2482 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java @@ -33,6 +33,7 @@ import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler; import com.google.cloud.pubsublite.proto.SeekRequest; @@ -47,6 +48,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Spy; @RunWith(JUnit4.class) public class SinglePartitionSubscriberTest { @@ -54,9 +56,11 @@ public class SinglePartitionSubscriberTest { SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build(); private static final Partition PARTITION = Partition.of(2); + abstract static class FakeCommitter extends FakeApiService implements Committer {} + @Mock PullSubscriberFactory subscriberFactory; - @Mock Committer committer; @Mock BlockingPullSubscriber pullSubscriber; + @Spy FakeCommitter committer; @Captor private ArgumentCaptor resetHandlerCaptor; @@ -67,8 +71,6 @@ public void setUp() throws CheckedApiException { initMocks(this); when(subscriberFactory.newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), any())) .thenReturn(pullSubscriber); - subscriber = - new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer); } @After @@ -85,8 +87,12 @@ private static SequencedMessage message(long offset) { @Test public void pullAndCommit() throws Exception { + subscriber = + new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer, true); verify(subscriberFactory).newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), any()); + verify(committer).state(); + // Pull messages. when(pullSubscriber.messageIfAvailable()) .thenReturn(Optional.of(message(3))) .thenReturn(Optional.of(message(5))) @@ -96,6 +102,7 @@ public void pullAndCommit() throws Exception { assertThat(subscriber.position()).hasValue(8); verify(pullSubscriber, times(4)).messageIfAvailable(); + // Auto commit handled. when(committer.commitOffset(Offset.of(8))).thenReturn(ApiFutures.immediateFuture(null)); subscriber.autoCommit(); verify(committer).commitOffset(Offset.of(8)); @@ -105,10 +112,14 @@ public void pullAndCommit() throws Exception { } @Test - public void resetSubscriber() throws Exception { + public void resetSubscriberEnabled() throws Exception { + subscriber = + new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer, true); verify(subscriberFactory) .newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), resetHandlerCaptor.capture()); + verify(committer).state(); + // Pull messages. when(pullSubscriber.messageIfAvailable()) .thenReturn(Optional.of(message(3))) .thenReturn(Optional.of(message(5))) @@ -116,6 +127,7 @@ public void resetSubscriber() throws Exception { .thenReturn(Optional.empty()); assertThat(subscriber.getMessages()).containsExactly(message(3), message(5), message(7)); + // Subscriber reset handled. when(pullSubscriber.messageIfAvailable()) .thenReturn(Optional.of(message(9))) .thenReturn(Optional.empty()); @@ -123,10 +135,50 @@ public void resetSubscriber() throws Exception { verify(committer).waitUntilEmpty(); verify(pullSubscriber, times(6)).messageIfAvailable(); - // Subsequent messages are discarded. + // Undelivered messages are discarded. assertThat(subscriber.position()).hasValue(8); // Auto commit does nothing. subscriber.autoCommit(); + + // Pull messages after reset. + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(2))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(2)); + assertThat(subscriber.position()).hasValue(3); + verify(pullSubscriber, times(8)).messageIfAvailable(); + + // Auto commit handled. + when(committer.commitOffset(Offset.of(3))).thenReturn(ApiFutures.immediateFuture(null)); + subscriber.autoCommit(); + verify(committer).commitOffset(Offset.of(3)); + } + + @Test + public void resetSubscriberDisabled() throws Exception { + subscriber = + new SinglePartitionSubscriber(subscriberFactory, PARTITION, INITIAL_SEEK, committer, false); + verify(subscriberFactory) + .newPullSubscriber(eq(PARTITION), eq(INITIAL_SEEK), resetHandlerCaptor.capture()); + verify(committer).state(); + + // Pull messages. + when(pullSubscriber.messageIfAvailable()) + .thenReturn(Optional.of(message(3))) + .thenReturn(Optional.of(message(5))) + .thenReturn(Optional.of(message(7))) + .thenReturn(Optional.empty()); + assertThat(subscriber.getMessages()).containsExactly(message(3), message(5), message(7)); + assertThat(subscriber.position()).hasValue(8); + verify(pullSubscriber, times(4)).messageIfAvailable(); + + // Subscriber reset not handled. + assertThat(resetHandlerCaptor.getValue().handleReset()).isFalse(); + + // Auto commit handled. + when(committer.commitOffset(Offset.of(8))).thenReturn(ApiFutures.immediateFuture(null)); + subscriber.autoCommit(); + verify(committer).commitOffset(Offset.of(8)); } } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java index e32e6e61..48855977 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java @@ -193,9 +193,7 @@ public void assignAndPoll() throws Exception { verify(subscriber5).close(); verify(subscriber8).close(); verify(committer5).stopAsync(); - verify(committer5).awaitTerminated(); verify(committer8).stopAsync(); - verify(committer8).awaitTerminated(); } @Test @@ -266,9 +264,7 @@ public void assignAndPollAutocommit() throws Exception { verify(subscriber5).close(); verify(subscriber8).close(); verify(committer5).stopAsync(); - verify(committer5).awaitTerminated(); verify(committer8).stopAsync(); - verify(committer8).awaitTerminated(); } @Test From a0b9d6cbf78e0df91c448764086885d8077a3305 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Tue, 10 Aug 2021 19:45:13 -0400 Subject: [PATCH 3/3] Add notes about use of seek operations --- .readme-partials.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.readme-partials.yaml b/.readme-partials.yaml index 438c755f..92fb2cae 100644 --- a/.readme-partials.yaml +++ b/.readme-partials.yaml @@ -80,3 +80,9 @@ about: | - Producers operate on a single topic, and Consumers on a single subscription. - ProducerRecord may not specify partition explicitly. - Consumers may not dynamically create consumer groups (subscriptions). + + Note: + - In order to use Pub/Sub Lite [seek operations](https://cloud.google.com/pubsub/lite/docs/seek), + Consumers must have auto-commit enabled. Consumer seek methods are client-initiated, whereas + Pub/Sub Lite seek operations are initiated out-of-band and pushed to Consumers. Both types of + seeks should not be used concurrently, as they would interfere with one another.