From 4d15dcce8587bdfc1fc173433f0075aaadaec64e Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Mon, 16 Aug 2021 14:38:05 -0400 Subject: [PATCH] fix: Fail hard in subscribe() call on assignment failure (#197) --- .../google/cloud/pubsublite/kafka/PubsubLiteConsumer.java | 2 +- .../cloud/pubsublite/kafka/PubsubLiteConsumerTest.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java index 57fccf8e..1868976c 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -209,7 +209,7 @@ public void subscribe( consumer = Optional.of(consumerFactory.newConsumer()); try { assigner = Optional.of(assignerFactory.New(newAssignmentReceiver(consumerRebalanceListener))); - assigner.get().startAsync(); + assigner.get().startAsync().awaitRunning(); } catch (ApiException e) { throw toKafka(e); } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java index f49759d8..2b3d2b95 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -42,6 +42,7 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.CursorClient; import com.google.cloud.pubsublite.internal.TopicStatsClient; +import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; import com.google.cloud.pubsublite.internal.wire.Assigner; import com.google.cloud.pubsublite.internal.wire.AssignerFactory; @@ -80,6 +81,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.Spy; @RunWith(JUnit4.class) public class PubsubLiteConsumerTest { @@ -112,7 +114,9 @@ private static T example(Class klass) { @Mock AdminClient adminClient; @Mock TopicStatsClient topicStatsClient; - @Mock Assigner assigner; + abstract static class FakeAssigner extends FakeApiService implements Assigner {} + + @Spy FakeAssigner assigner; @Mock SingleSubscriptionConsumer underlying; Consumer consumer; @@ -305,6 +309,7 @@ public void validSubscribe() throws Exception { verify(consumerFactory).newConsumer(); verify(assignerFactory).New(any()); verify(assigner).startAsync(); + verify(assigner).awaitRunning(); receiver.get().handleAssignment(ImmutableSet.of(Partition.of(5))); verify(listener) .onPartitionsAssigned(