diff --git a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java index 83252967..de02fb4b 100644 --- a/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java +++ b/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -202,6 +202,7 @@ public void subscribe( consumer = Optional.of(consumerFactory.newConsumer()); try { assigner = Optional.of(assignerFactory.New(newAssignmentReceiver(consumerRebalanceListener))); + assigner.get().startAsync(); } catch (ApiException e) { throw toKafka(e); } diff --git a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java index f152a62e..9b084d11 100644 --- a/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java +++ b/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -299,6 +299,7 @@ public void validSubscribe() throws Exception { consumer.subscribe(ImmutableList.of(example(TopicPath.class).toString()), listener); verify(consumerFactory).newConsumer(); verify(assignerFactory).New(any()); + verify(assigner).startAsync(); receiver.get().handleAssignment(ImmutableSet.of(Partition.of(5))); verify(listener) .onPartitionsAssigned(