From a3cdd44a8607bf0c8615e4a5c73f75ce5dd0a1ec Mon Sep 17 00:00:00 2001
From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com>
Date: Tue, 15 Jun 2021 08:38:57 -0400
Subject: [PATCH] fix: Use BlockingPullSubscriber instead of BufferingPullSubscriber in kafka client (#150)

This also makes polling reactive, so it no longer has a minimum 100ms latency.
---
 .../kafka/ApiFuturesExtensions.java           |  34 ++++
 .../pubsublite/kafka/ConsumerSettings.java    |   4 +-
 .../kafka/PullSubscriberFactory.java          |   5 +-
 .../kafka/SingleSubscriptionConsumerImpl.java |  94 ++++++-----
 .../SingleSubscriptionConsumerImplTest.java   | 152 ++++++++++++------
 5 files changed, 197 insertions(+), 92 deletions(-)
 create mode 100644 src/main/java/com/google/cloud/pubsublite/kafka/ApiFuturesExtensions.java

diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ApiFuturesExtensions.java b/src/main/java/com/google/cloud/pubsublite/kafka/ApiFuturesExtensions.java
new file mode 100644
index 00000000..445f0bea
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ApiFuturesExtensions.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.kafka;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collection;
+
+final class ApiFuturesExtensions {
+  private ApiFuturesExtensions() {}
+
+  public static ApiFuture<Void> whenFirstDone(Collection<ApiFuture<?>> futures) {
+    SettableApiFuture<Void> someFutureDone = SettableApiFuture.create();
+    futures.forEach(
+        future ->
+            future.addListener(() -> someFutureDone.set(null), MoreExecutors.directExecutor()));
+    return someFutureDone;
+  }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
index 6e53853b..8fdd3c4a 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java
@@ -26,7 +26,7 @@
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
 import com.google.cloud.pubsublite.internal.CursorClient;
 import com.google.cloud.pubsublite.internal.CursorClientSettings;
 import com.google.cloud.pubsublite.internal.TopicStatsClient;
@@ -124,7 +124,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
                   throw toCanonical(t).underlying;
                 }
               };
-          return new BufferingPullSubscriber(
+          return new BlockingPullSubscriberImpl(
              subscriberFactory, perPartitionFlowControlSettings());
         };
     CommitterFactory committerFactory =
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java
index 2e925416..e72ba77d 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java
@@ -17,13 +17,12 @@
 package com.google.cloud.pubsublite.kafka;
 
 import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.PullSubscriber;
 import com.google.cloud.pubsublite.proto.SeekRequest;
 
 /** A factory for making new PullSubscribers for a given partition of a subscription. */
 interface PullSubscriberFactory {
-  PullSubscriber<SequencedMessage> newPullSubscriber(Partition partition, SeekRequest initial)
+  BlockingPullSubscriber newPullSubscriber(Partition partition, SeekRequest initial)
       throws CheckedApiException;
 }
diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
index 61de8704..5758f63a 100644
--- a/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
+++ b/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
@@ -17,17 +17,20 @@
 package com.google.cloud.pubsublite.kafka;
 
 import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutureCallback;
 import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.SequencedMessage;
 import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
 import com.google.cloud.pubsublite.internal.CloseableMonitor;
 import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.PullSubscriber;
 import com.google.cloud.pubsublite.internal.wire.Committer;
 import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
@@ -40,13 +43,13 @@
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.time.Duration;
 import java.util.ArrayDeque;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -67,17 +70,21 @@ class SingleSubscriptionConsumerImpl implements SingleSubscriptionConsumer {
   private final CloseableMonitor monitor = new CloseableMonitor();
 
   static class SubscriberState {
-    PullSubscriber<SequencedMessage> subscriber;
+    BlockingPullSubscriber subscriber;
     Committer committer;
-    Optional<Offset> lastUncommitted = Optional.empty();
+    boolean needsCommitting = false;
+    Optional<Offset> lastReceived = Optional.empty();
   }
 
   @GuardedBy("monitor.monitor")
-  private Map<Partition, SubscriberState> partitions = new HashMap<>();
-
-  // Set to true when wakeup() has been called once.
+  private final Map<Partition, SubscriberState> partitions = new HashMap<>();
+  // When the set of assignments changes, this future will be set and swapped with a new future to
+  // let ongoing pollers know that they should pick up new assignments.
   @GuardedBy("monitor.monitor")
-  private boolean wakeupTriggered = false;
+  private SettableApiFuture<Void> assignmentChanged = SettableApiFuture.create();
+
+  // Set when wakeup() has been called once.
+  private final SettableApiFuture<Void> wakeupTriggered = SettableApiFuture.create();
 
   SingleSubscriptionConsumerImpl(
       TopicPath topic,
@@ -118,6 +125,8 @@ public void setAssignment(Set<Partition> assignment) {
                     s.committer.startAsync().awaitRunning();
                     partitions.put(partition, s);
                   }));
+      assignmentChanged.set(null);
+      assignmentChanged = SettableApiFuture.create();
     } catch (Throwable t) {
       throw ExtractStatus.toCanonical(t).underlying;
     }
@@ -136,27 +145,33 @@ private Map<Partition, Queue<SequencedMessage>> fetchAll() {
     partitions.forEach(
         ExtractStatus.rethrowAsRuntime(
             (partition, state) -> {
-              List<SequencedMessage> messages = state.subscriber.pull();
-              if (messages.isEmpty()) return;
-              partitionQueues.computeIfAbsent(partition, x -> new ArrayDeque<>()).addAll(messages);
+              ArrayDeque<SequencedMessage> messages = new ArrayDeque<>();
+              for (Optional<SequencedMessage> message = state.subscriber.messageIfAvailable();
+                  message.isPresent();
+                  message = state.subscriber.messageIfAvailable()) {
+                messages.add(message.get());
+              }
+              partitionQueues.put(partition, messages);
             }));
     return partitionQueues;
   }
 
   private Map<Partition, Queue<SequencedMessage>> doPoll(Duration duration) {
     try {
-      while (!duration.isZero()) {
-        try (CloseableMonitor.Hold h = monitor.enter()) {
-          if (wakeupTriggered) throw new WakeupException();
-          Map<Partition, Queue<SequencedMessage>> partitionQueues = fetchAll();
-          if (!partitionQueues.isEmpty()) return partitionQueues;
-        }
-        Duration sleepFor = Collections.min(ImmutableList.of(duration, Duration.ofMillis(10)));
-        Thread.sleep(sleepFor.toMillis());
-        duration = duration.minus(sleepFor);
+      ImmutableList.Builder<ApiFuture<?>> stopSleepingSignals = ImmutableList.builder();
+      try (CloseableMonitor.Hold h = monitor.enter()) {
+        stopSleepingSignals.add(wakeupTriggered);
+        stopSleepingSignals.add(assignmentChanged);
+        partitions.values().forEach(state -> stopSleepingSignals.add(state.subscriber.onData()));
+      }
+      try {
+        ApiFuturesExtensions.whenFirstDone(stopSleepingSignals.build())
+            .get(duration.toMillis(), MILLISECONDS);
+      } catch (TimeoutException e) {
+        return ImmutableMap.of();
       }
-      // Last fetch to handle duration originally being 0 and last time window sleep.
       try (CloseableMonitor.Hold h = monitor.enter()) {
+        if (wakeupTriggered.isDone()) throw new WakeupException();
         return fetchAll();
       }
     } catch (Throwable t) {
@@ -166,8 +181,6 @@ private Map<Partition, Queue<SequencedMessage>> doPoll(Duration duration) {
   @Override
   public ConsumerRecords<byte[], byte[]> poll(Duration duration) {
-    Map<Partition, Queue<SequencedMessage>> partitionQueues = doPoll(duration);
-    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
     if (autocommit) {
       ApiFuture<?> future = commitAll();
       ApiFutures.addCallback(
           future,
@@ -183,14 +196,16 @@ public void onSuccess(Object result) {}
           },
           MoreExecutors.directExecutor());
     }
+    Map<Partition, Queue<SequencedMessage>> partitionQueues = doPoll(duration);
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
     partitionQueues.forEach(
         (partition, queue) -> {
           if (queue.isEmpty()) return;
           try (CloseableMonitor.Hold h = monitor.enter()) {
             SubscriberState state = partitions.getOrDefault(partition, null);
-            if (state != null) {
-              state.lastUncommitted = Optional.of(Iterables.getLast(queue).offset());
-            }
+            if (state == null) return;
+            state.lastReceived = Optional.of(Iterables.getLast(queue).offset());
+            state.needsCommitting = true;
           }
           List<ConsumerRecord<byte[], byte[]>> partitionRecords =
               queue.stream()
@@ -207,17 +222,16 @@ public ApiFuture<Map<Partition, Offset>> commitAll() {
     try (CloseableMonitor.Hold h = monitor.enter()) {
       ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
       ImmutableList.Builder<ApiFuture<?>> commitFutures = ImmutableList.builder();
-      partitions.entrySet().stream()
-          .filter(entry -> entry.getValue().lastUncommitted.isPresent())
-          .forEach(
-              entry -> {
-                // The Pub/Sub Lite commit offset is the next to be received.
-                Offset lastUncommitted = entry.getValue().lastUncommitted.get();
-                entry.getValue().lastUncommitted = Optional.empty();
-                Offset toCommit = Offset.of(lastUncommitted.value() + 1);
-                builder.put(entry.getKey(), toCommit);
-                commitFutures.add(entry.getValue().committer.commitOffset(toCommit));
-              });
+      partitions.forEach(
+          (partition, state) -> {
+            if (!state.needsCommitting) return;
+            checkState(state.lastReceived.isPresent());
+            state.needsCommitting = false;
+            // The Pub/Sub Lite commit offset is one more than the last received.
+            Offset toCommit = Offset.of(state.lastReceived.get().value() + 1);
+            builder.put(partition, toCommit);
+            commitFutures.add(state.committer.commitOffset(toCommit));
+          });
       Map<Partition, Offset> map = builder.build();
       return ApiFutures.transform(
           ApiFutures.allAsList(commitFutures.build()),
@@ -269,7 +283,7 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti
   @Override
   public Optional<Long> position(Partition partition) {
     if (!partitions.containsKey(partition)) return Optional.empty();
-    return partitions.get(partition).subscriber.nextOffset().map(Offset::value);
+    return partitions.get(partition).lastReceived.map(lastReceived -> lastReceived.value() + 1);
   }
 
   @Override
@@ -286,8 +300,6 @@ public void close(Duration duration) {
 
   @Override
   public void wakeup() {
-    try (CloseableMonitor.Hold h = monitor.enter()) {
-      wakeupTriggered = true;
-    }
+    wakeupTriggered.set(null);
   }
 }
diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
index 20318504..b0a429bd 100644
--- a/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java
@@ -29,16 +29,18 @@
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
 import com.google.api.core.SettableApiFuture;
 import com.google.cloud.pubsublite.Message;
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.SequencedMessage;
 import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.PullSubscriber;
 import com.google.cloud.pubsublite.internal.testing.FakeApiService;
 import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
@@ -74,8 +76,8 @@ public class SingleSubscriptionConsumerImplTest {
   @Mock PullSubscriberFactory subscriberFactory;
   @Mock CommitterFactory committerFactory;
 
-  @Mock PullSubscriber<SequencedMessage> subscriber5;
-  @Mock PullSubscriber<SequencedMessage> subscriber8;
+  @Mock BlockingPullSubscriber subscriber5;
+  @Mock BlockingPullSubscriber subscriber8;
 
   abstract static class FakeCommitter extends FakeApiService implements Committer {}
 
@@ -97,9 +99,13 @@ public void setUp() throws CheckedApiException {
     when(committerFactory.newCommitter(Partition.of(8))).thenReturn(committer8);
   }
 
-  private static SequencedMessage message(long offset) {
+  private static SequencedMessage message(Offset offset) {
     return SequencedMessage.of(
-        Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset), 0L);
+        Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset.value()), 0L);
+  }
+
+  private static SequencedMessage message(long offset) {
+    return message(Offset.of(offset));
   }
 
   private static void assertConsumerRecordsEqual(
@@ -118,37 +124,51 @@ public void assignAndPoll() throws Exception {
     verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK);
     verify(committerFactory).newCommitter(Partition.of(5));
     verify(committerFactory).newCommitter(Partition.of(8));
 
-    when(subscriber5.pull()).thenReturn(ImmutableList.of());
-    when(subscriber8.pull()).thenReturn(ImmutableList.of());
     // -----------------------------
-    // Pulls ceil(15ms / 10ms) + 1 times (3) when no messages are returned
-    assertConsumerRecordsEqual(consumer.poll(Duration.ofMillis(15)), ImmutableListMultimap.of());
-    verify(subscriber5, times(3)).pull();
-    verify(subscriber8, times(3)).pull();
+    // No data available
+    when(subscriber5.onData()).thenReturn(SettableApiFuture.create());
+    when(subscriber8.onData()).thenReturn(SettableApiFuture.create());
+    assertConsumerRecordsEqual(consumer.poll(Duration.ZERO), ImmutableListMultimap.of());
+    verify(subscriber5, times(1)).onData();
+    verify(subscriber8, times(1)).onData();
+    verify(subscriber5, times(0)).messageIfAvailable();
+    verify(subscriber8, times(0)).messageIfAvailable();
     verify(committer5, times(0)).commitOffset(any());
     verify(committer8, times(0)).commitOffset(any());
     // -----------------------------
     // Pulls once when messages are available.
-    when(subscriber5.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(3)));
-    when(subscriber8.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(4)));
+    when(subscriber5.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber8.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber5.messageIfAvailable())
+        .thenReturn(Optional.of(message(1)))
+        .thenReturn(Optional.of(message(2)))
+        .thenReturn(Optional.of(message(3)))
+        .thenReturn(Optional.empty());
+    when(subscriber8.messageIfAvailable())
+        .thenReturn(Optional.of(message(1)))
+        .thenReturn(Optional.of(message(2)))
+        .thenReturn(Optional.of(message(4)))
+        .thenReturn(Optional.empty());
     assertConsumerRecordsEqual(
-        consumer.poll(Duration.ofMillis(15)),
+        consumer.poll(Duration.ofDays(1)), // Still returns immediately
         ImmutableListMultimap.<Partition, Offset>builder()
             .putAll(Partition.of(5), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(3)))
            .putAll(Partition.of(8), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(4)))
             .build());
-    verify(subscriber5, times(4)).pull();
-    verify(subscriber8, times(4)).pull();
+    verify(subscriber5, times(2)).onData();
+    verify(subscriber8, times(2)).onData();
+    verify(subscriber5, times(4)).messageIfAvailable();
+    verify(subscriber8, times(4)).messageIfAvailable();
     verify(committer5, times(0)).commitOffset(any());
     verify(committer8, times(0)).commitOffset(any());
     // --------------------------
-    // Zero duration poll pulls once.
-    when(subscriber5.pull()).thenReturn(ImmutableList.of());
-    when(subscriber8.pull()).thenReturn(ImmutableList.of());
+    // No data available no commit
+    when(subscriber5.onData()).thenReturn(SettableApiFuture.create());
+    when(subscriber8.onData()).thenReturn(SettableApiFuture.create());
     assertConsumerRecordsEqual(consumer.poll(Duration.ZERO), ImmutableListMultimap.of());
-    verify(subscriber5, times(5)).pull();
-    verify(subscriber8, times(5)).pull();
+    verify(subscriber5, times(4)).messageIfAvailable();
+    verify(subscriber8, times(4)).messageIfAvailable();
     verify(committer5, times(0)).commitOffset(any());
     verify(committer8, times(0)).commitOffset(any());
     // --------------------------
@@ -186,43 +206,55 @@ public void assignAndPollAutocommit() throws Exception {
     verify(subscriberFactory).newPullSubscriber(Partition.of(8), DEFAULT_SEEK);
     verify(committerFactory).newCommitter(Partition.of(5));
     verify(committerFactory).newCommitter(Partition.of(8));
 
-    when(subscriber5.pull()).thenReturn(ImmutableList.of());
-    when(subscriber8.pull()).thenReturn(ImmutableList.of());
     // -----------------------------
-    // Pulls ceil(15ms / 10ms) + 1 times (3) when no messages are returned
-    assertConsumerRecordsEqual(consumer.poll(Duration.ofMillis(15)), ImmutableListMultimap.of());
-    verify(subscriber5, times(3)).pull();
-    verify(subscriber8, times(3)).pull();
+    // No data available
+    when(subscriber5.onData()).thenReturn(SettableApiFuture.create());
+    when(subscriber8.onData()).thenReturn(SettableApiFuture.create());
+    assertConsumerRecordsEqual(consumer.poll(Duration.ZERO), ImmutableListMultimap.of());
+    verify(subscriber5, times(1)).onData();
+    verify(subscriber8, times(1)).onData();
+    verify(subscriber5, times(0)).messageIfAvailable();
+    verify(subscriber8, times(0)).messageIfAvailable();
     verify(committer5, times(0)).commitOffset(any());
     verify(committer8, times(0)).commitOffset(any());
     // -----------------------------
     // Pulls once when messages are available.
-    when(subscriber5.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(3)));
-    when(subscriber8.pull()).thenReturn(ImmutableList.of(message(1), message(2), message(4)));
+    when(subscriber5.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber8.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber5.messageIfAvailable())
+        .thenReturn(Optional.of(message(1)))
+        .thenReturn(Optional.of(message(2)))
+        .thenReturn(Optional.of(message(3)))
+        .thenReturn(Optional.empty());
+    when(subscriber8.messageIfAvailable())
+        .thenReturn(Optional.of(message(1)))
+        .thenReturn(Optional.of(message(2)))
+        .thenReturn(Optional.of(message(4)))
+        .thenReturn(Optional.empty());
     assertConsumerRecordsEqual(
-        consumer.poll(Duration.ofMillis(15)),
+        consumer.poll(Duration.ofDays(1)), // Still returns immediately
         ImmutableListMultimap.<Partition, Offset>builder()
             .putAll(Partition.of(5), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(3)))
             .putAll(Partition.of(8), ImmutableList.of(Offset.of(1), Offset.of(2), Offset.of(4)))
             .build());
-    verify(subscriber5, times(4)).pull();
-    verify(subscriber8, times(4)).pull();
+    verify(subscriber5, times(2)).onData();
+    verify(subscriber8, times(2)).onData();
+    verify(subscriber5, times(4)).messageIfAvailable();
+    verify(subscriber8, times(4)).messageIfAvailable();
     verify(committer5, times(0)).commitOffset(any());
     verify(committer8, times(0)).commitOffset(any());
     // --------------------------
-    // Zero duration poll pulls once, commits previous offsets.
+    // No data poll commits previous offsets.
     SettableApiFuture<Void> commit5 = SettableApiFuture.create();
     SettableApiFuture<Void> commit8 = SettableApiFuture.create();
     // Commits are last received + 1
     when(committer5.commitOffset(Offset.of(4))).thenReturn(commit5);
     when(committer8.commitOffset(Offset.of(5))).thenReturn(commit8);
-    when(subscriber5.pull()).thenReturn(ImmutableList.of());
-    when(subscriber8.pull()).thenReturn(ImmutableList.of());
+    when(subscriber5.onData()).thenReturn(SettableApiFuture.create());
+    when(subscriber8.onData()).thenReturn(SettableApiFuture.create());
     assertConsumerRecordsEqual(consumer.poll(Duration.ZERO), ImmutableListMultimap.of());
-    verify(subscriber5, times(5)).pull();
-    verify(subscriber8, times(5)).pull();
     verify(committer5).commitOffset(Offset.of(4));
     verify(committer8).commitOffset(Offset.of(5));
     commit5.set(null);
@@ -238,21 +270,21 @@ public void assignAndPollAutocommit() throws Exception {
   }
 
   @Test
-  public void wakeupBeforePoll() throws Exception {
+  public void wakeupBeforePoll() {
     consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
-    when(subscriber5.pull()).thenReturn(ImmutableList.of());
+    when(subscriber5.onData()).thenReturn(SettableApiFuture.create());
     consumer.wakeup();
     assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMillis(15)));
   }
 
   @Test
-  public void wakeupDuringPoll() throws Exception {
+  public void wakeupDuringPoll() {
     consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
-    when(subscriber5.pull())
+    when(subscriber5.onData())
         .thenAnswer(
             args -> {
               consumer.wakeup();
-              return ImmutableList.of();
+              return SettableApiFuture.create();
             });
     assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofDays(1)));
   }
@@ -273,6 +305,31 @@ public void assignmentChange() throws Exception {
     verify(committer5).stopAsync();
   }
 
+  @Test
+  public void assignmentChangeMakesPollReturn() throws Exception {
+    consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
+    assertThat(consumer.assignment()).isEqualTo(ImmutableSet.of(Partition.of(5)));
+    verify(subscriberFactory).newPullSubscriber(Partition.of(5), DEFAULT_SEEK);
+    when(subscriber5.onData()).thenReturn(SettableApiFuture.create());
+    SettableApiFuture<Void> pollRunning = SettableApiFuture.create();
+    when(subscriber5.onData())
+        .thenAnswer(
+            args -> {
+              pollRunning.set(null);
+              return SettableApiFuture.create(); // never finishes
+            });
+    SettableApiFuture<Void> pollDone = SettableApiFuture.create();
+    SystemExecutors.getAlarmExecutor()
+        .execute(
+            () -> {
+              assertThat(consumer.poll(Duration.ofDays(1)).isEmpty()).isTrue();
+              pollDone.set(null);
+            });
+    pollRunning.get();
+    consumer.setAssignment(ImmutableSet.of());
+    pollDone.get();
+  }
+
   @Test
   public void commitNotAssigned() throws Exception {
     consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
@@ -312,10 +369,13 @@ public void seekAssigned() throws Exception {
   @Test
   public void position() throws Exception {
     consumer.setAssignment(ImmutableSet.of(Partition.of(5)));
-    assertThat(consumer.position(Partition.of(8))).isEmpty();
-    when(subscriber5.nextOffset()).thenReturn(Optional.empty());
     assertThat(consumer.position(Partition.of(5))).isEmpty();
-    when(subscriber5.nextOffset()).thenReturn(Optional.of(example(Offset.class)));
-    assertThat(consumer.position(Partition.of(5))).hasValue(example(Offset.class).value());
+    assertThat(consumer.position(Partition.of(8))).isEmpty();
+    when(subscriber5.onData()).thenReturn(ApiFutures.immediateFuture(null));
+    when(subscriber5.messageIfAvailable())
+        .thenReturn(Optional.of(message(example(Offset.class))))
+        .thenReturn(Optional.empty());
+    assertThat(consumer.poll(Duration.ofMillis(0)).count()).isEqualTo(1);
+    assertThat(consumer.position(Partition.of(5))).hasValue(example(Offset.class).value() + 1);
   }
 }