diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java index 88ca71760..4e7c2d7c8 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java @@ -33,7 +33,6 @@ import java.util.ArrayDeque; import java.util.Collection; import java.util.Deque; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -113,7 +112,13 @@ public synchronized Optional messageIfAvailable() throws Check if (messages.isEmpty()) { return Optional.empty(); } - return Optional.of(Objects.requireNonNull(messages.pollFirst())); + SequencedMessage msg = messages.remove(); + underlying.allowFlow( + FlowControlRequest.newBuilder() + .setAllowedMessages(1) + .setAllowedBytes(msg.byteSize()) + .build()); + return Optional.of(msg); } @Override diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java index 3801e251f..80bdab14f 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java @@ -148,10 +148,17 @@ public void onDataSuccess() throws Exception { @Test public void pullMessage() throws Exception { + int byteSize = 30; SequencedMessage message = - SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), 30); + SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), byteSize); messageConsumer.accept(ImmutableList.of(message)); assertThat(Optional.of(message)).isEqualTo(subscriber.messageIfAvailable()); + verify(underlying) + .allowFlow( + FlowControlRequest.newBuilder() + .setAllowedBytes(byteSize) + .setAllowedMessages(1) + .build()); } @Test