Skip to content

Commit

Permalink
fix: Partition assignment stream does not require an initial response (
Browse files Browse the repository at this point in the history
…#966)

* fix: Partition assignment stream does not require an initial response

* Fix lint
  • Loading branch information
tmdiep committed Dec 7, 2021
1 parent 2f78d56 commit 8227d31
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
Expand Up @@ -38,7 +38,7 @@ private ConnectedAssignerImpl(
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory,
ResponseObserver<PartitionAssignment> clientStream,
PartitionAssignmentRequest initialRequest) {
super(streamFactory, clientStream);
super(streamFactory, clientStream, /*expectInitialResponse=*/ false);
initialize(initialRequest);
}

Expand Down
Expand Up @@ -40,6 +40,7 @@ public abstract class SingleConnection<StreamRequestT, StreamResponseT, ClientRe

private final ClientStream<StreamRequestT> requestStream;
private final ResponseObserver<ClientResponseT> clientStream;
private final boolean expectInitial;

private final CloseableMonitor connectionMonitor = new CloseableMonitor();

Expand All @@ -56,13 +57,24 @@ protected abstract void handleInitialResponse(StreamResponseT response)

protected SingleConnection(
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
ResponseObserver<ClientResponseT> clientStream) {
ResponseObserver<ClientResponseT> clientStream,
boolean expectInitialResponse) {
this.clientStream = clientStream;
this.requestStream = streamFactory.New(this);
this.expectInitial = expectInitialResponse;
}

protected SingleConnection(
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
ResponseObserver<ClientResponseT> clientStream) {
this(streamFactory, clientStream, /*expectInitialResponse=*/ true);
}

protected void initialize(StreamRequestT initialRequest) {
this.requestStream.send(initialRequest);
if (!expectInitial) {
return;
}
try (CloseableMonitor.Hold h =
connectionMonitor.enterWhenUninterruptibly(
new Guard(connectionMonitor.monitor) {
Expand Down
Expand Up @@ -86,8 +86,8 @@ public void setUp() throws IOException {
(Answer<ClientStream<PartitionAssignmentRequest>>)
args -> {
Preconditions.checkArgument(!leakedResponseStream.isPresent());
ResponseObserver<PartitionAssignment> ResponseObserver = args.getArgument(0);
leakedResponseStream = Optional.of(ResponseObserver);
ResponseObserver<PartitionAssignment> responseObserver = args.getArgument(0);
leakedResponseStream = Optional.of(responseObserver);
return mockRequestStream;
})
.when(streamFactory)
Expand Down Expand Up @@ -139,6 +139,12 @@ public void construct_SendsInitialThenError() throws Exception {
FACTORY.New(streamFactory, mockOutputStream, initialRequest())) {}
}

@Test
public void construct_noInitialResponse() throws Exception {
assigner = FACTORY.New(streamFactory, mockOutputStream, initialRequest());
verify(mockRequestStream).send(initialRequest());
}

private void initialize() {
doAnswer(AnswerWith(PartitionAssignment.newBuilder().addPartitions(1)))
.when(mockRequestStream)
Expand Down

0 comments on commit 8227d31

Please sign in to comment.