Skip to content

Commit

Permalink
fix: Start assigner when subscribe() is called (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Nov 18, 2020
1 parent 0fe7a77 commit f143f9d
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 0 deletions.
Expand Up @@ -202,6 +202,7 @@ public void subscribe(
consumer = Optional.of(consumerFactory.newConsumer());
try {
assigner = Optional.of(assignerFactory.New(newAssignmentReceiver(consumerRebalanceListener)));
assigner.get().startAsync();
} catch (ApiException e) {
throw toKafka(e);
}
Expand Down
Expand Up @@ -299,6 +299,7 @@ public void validSubscribe() throws Exception {
consumer.subscribe(ImmutableList.of(example(TopicPath.class).toString()), listener);
verify(consumerFactory).newConsumer();
verify(assignerFactory).New(any());
verify(assigner).startAsync();
receiver.get().handleAssignment(ImmutableSet.of(Partition.of(5)));
verify(listener)
.onPartitionsAssigned(
Expand Down

0 comments on commit f143f9d

Please sign in to comment.