Skip to content

Commit dbf19c9

Browse files
fix: Rebatch messages when restarting a publish stream (#496)
* fix: Rebatch messages when restarting a publish stream This prevents publishers which are network isolated for a while from appearing disconnected when the server attempts to acknowledge messages. * fix: Rebatch messages when restarting a publish stream This prevents publishers which are network isolated for a while from appearing disconnected when the server attempts to acknowledge messages.
1 parent 9d71415 commit dbf19c9

File tree

3 files changed

+168
-9
lines changed

3 files changed

+168
-9
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class Constants {
3131
.build();
3232

3333
public static final long MAX_PUBLISH_BATCH_COUNT = 1_000;
34-
public static final long MAX_PUBLISH_BATCH_BYTES = 3_500_000;
34+
public static final long MAX_PUBLISH_BATCH_BYTES = 1024 * 1024 * 7 / 2; // 3.5 MiB
3535

3636
private Constants() {}
3737
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.api.gax.batching.BatchingSettings;
2727
import com.google.api.gax.rpc.ApiException;
2828
import com.google.api.gax.rpc.StatusCode.Code;
29+
import com.google.cloud.pubsublite.Constants;
2930
import com.google.cloud.pubsublite.Message;
3031
import com.google.cloud.pubsublite.Offset;
3132
import com.google.cloud.pubsublite.internal.CheckedApiException;
@@ -85,12 +86,8 @@ private static class InFlightBatch {
8586
final List<SettableApiFuture<Offset>> messageFutures;
8687

8788
InFlightBatch(Collection<UnbatchedMessage> toBatch) {
88-
messages =
89-
toBatch.stream()
90-
.map(SerialBatcher.UnbatchedMessage::message)
91-
.collect(Collectors.toList());
92-
messageFutures =
93-
toBatch.stream().map(SerialBatcher.UnbatchedMessage::future).collect(Collectors.toList());
89+
messages = toBatch.stream().map(UnbatchedMessage::message).collect(Collectors.toList());
90+
messageFutures = toBatch.stream().map(UnbatchedMessage::future).collect(Collectors.toList());
9491
}
9592
}
9693

@@ -136,10 +133,43 @@ public PublisherImpl(
136133
addServices(backgroundResourceAsApiService(client));
137134
}
138135

136+
@GuardedBy("monitor.monitor")
137+
private void rebatchForRestart() {
138+
Queue<UnbatchedMessage> messages = new ArrayDeque<>();
139+
for (InFlightBatch batch : batchesInFlight) {
140+
for (int i = 0; i < batch.messages.size(); ++i) {
141+
messages.add(UnbatchedMessage.of(batch.messages.get(i), batch.messageFutures.get(i)));
142+
}
143+
}
144+
long size = 0;
145+
int count = 0;
146+
Queue<UnbatchedMessage> currentBatch = new ArrayDeque<>();
147+
batchesInFlight.clear();
148+
for (UnbatchedMessage message : messages) {
149+
long messageSize = message.message().getSerializedSize();
150+
if (size + messageSize > Constants.MAX_PUBLISH_BATCH_BYTES
151+
|| count + 1 > Constants.MAX_PUBLISH_BATCH_COUNT) {
152+
if (!currentBatch.isEmpty()) {
153+
batchesInFlight.add(new InFlightBatch(currentBatch));
154+
currentBatch = new ArrayDeque<>();
155+
count = 0;
156+
size = 0;
157+
}
158+
}
159+
currentBatch.add(message);
160+
size = size + messageSize;
161+
count += 1;
162+
}
163+
if (!currentBatch.isEmpty()) {
164+
batchesInFlight.add(new InFlightBatch(currentBatch));
165+
}
166+
}
167+
139168
@Override
140169
public void triggerReinitialize() {
141170
try (CloseableMonitor.Hold h = monitor.enter()) {
142171
connection.reinitialize();
172+
rebatchForRestart();
143173
Collection<InFlightBatch> batches = batchesInFlight;
144174
connection.modifyConnection(
145175
connectionOr -> {

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import static com.google.cloud.pubsublite.internal.wire.RetryingConnectionHelpers.whenFailed;
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.hamcrest.CoreMatchers.hasItems;
22+
import static org.hamcrest.Matchers.contains;
2223
import static org.junit.Assert.assertThrows;
2324
import static org.mockito.ArgumentMatchers.any;
2425
import static org.mockito.ArgumentMatchers.eq;
2526
import static org.mockito.Mockito.doAnswer;
2627
import static org.mockito.Mockito.doReturn;
28+
import static org.mockito.Mockito.inOrder;
2729
import static org.mockito.Mockito.mock;
2830
import static org.mockito.Mockito.times;
2931
import static org.mockito.Mockito.verify;
@@ -37,6 +39,7 @@
3739
import com.google.api.gax.rpc.ApiException;
3840
import com.google.api.gax.rpc.ResponseObserver;
3941
import com.google.api.gax.rpc.StatusCode.Code;
42+
import com.google.cloud.pubsublite.Constants;
4043
import com.google.cloud.pubsublite.Message;
4144
import com.google.cloud.pubsublite.Offset;
4245
import com.google.cloud.pubsublite.internal.CheckedApiException;
@@ -45,17 +48,24 @@
4548
import com.google.cloud.pubsublite.proto.PubSubMessage;
4649
import com.google.cloud.pubsublite.proto.PublishRequest;
4750
import com.google.cloud.pubsublite.proto.PublishResponse;
51+
import com.google.common.collect.ImmutableList;
52+
import com.google.common.collect.Iterables;
4853
import com.google.common.util.concurrent.MoreExecutors;
4954
import com.google.protobuf.ByteString;
5055
import java.util.Collection;
56+
import java.util.Collections;
57+
import java.util.List;
5158
import java.util.Optional;
5259
import java.util.concurrent.ExecutionException;
5360
import java.util.concurrent.Future;
61+
import java.util.stream.Collectors;
62+
import java.util.stream.IntStream;
5463
import org.junit.Before;
5564
import org.junit.Test;
5665
import org.junit.runner.RunWith;
5766
import org.junit.runners.JUnit4;
5867
import org.mockito.ArgumentMatchers;
68+
import org.mockito.InOrder;
5969
import org.mockito.Mock;
6070
import org.mockito.stubbing.Answer;
6171
import org.threeten.bp.Duration;
@@ -226,8 +236,14 @@ public void multipleBatches_Ok() throws Exception {
226236
@Test
227237
public void retryableError_RecreatesAndRetriesAll() throws Exception {
228238
startPublisher();
229-
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build();
230-
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build();
239+
Message message1 =
240+
Message.builder()
241+
.setData(ByteString.copyFrom(new byte[(int) Constants.MAX_PUBLISH_BATCH_BYTES - 20]))
242+
.build();
243+
Message message2 =
244+
Message.builder()
245+
.setData(ByteString.copyFromUtf8(String.join("", Collections.nCopies(21, "a"))))
246+
.build();
231247
Future<Offset> future1 = publisher.publish(message1);
232248
publisher.flushToStream();
233249
verify(mockBatchPublisher)
@@ -273,6 +289,119 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception {
273289
verifyNoMoreInteractions(mockBatchPublisher, mockBatchPublisher2);
274290
}
275291

292+
@Test
293+
public void retryableError_RebatchesProperly() throws Exception {
294+
startPublisher();
295+
Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build();
296+
Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build();
297+
Message message3 =
298+
Message.builder()
299+
.setData(ByteString.copyFrom(new byte[(int) Constants.MAX_PUBLISH_BATCH_BYTES - 20]))
300+
.build();
301+
Message message4 =
302+
Message.builder()
303+
.setData(ByteString.copyFromUtf8(String.join("", Collections.nCopies(21, "a"))))
304+
.build();
305+
List<Message> remaining =
306+
IntStream.range(0, (int) Constants.MAX_PUBLISH_BATCH_COUNT)
307+
.mapToObj(x -> Message.builder().setData(ByteString.copyFromUtf8("clone-" + x)).build())
308+
.collect(Collectors.toList());
309+
310+
Future<Offset> future1 = publisher.publish(message1);
311+
Future<Offset> future2 = publisher.publish(message2);
312+
publisher.flushToStream();
313+
verify(mockBatchPublisher)
314+
.publish(
315+
(Collection<PubSubMessage>) argThat(hasItems(message1.toProto(), message2.toProto())));
316+
publisher.flushToStream();
317+
Future<Offset> future3 = publisher.publish(message3);
318+
publisher.flushToStream();
319+
verify(mockBatchPublisher)
320+
.publish((Collection<PubSubMessage>) argThat(hasItems(message3.toProto())));
321+
Future<Offset> future4 = publisher.publish(message4);
322+
publisher.flushToStream();
323+
verify(mockBatchPublisher)
324+
.publish((Collection<PubSubMessage>) argThat(hasItems(message4.toProto())));
325+
List<Future<Offset>> remainingFutures =
326+
remaining.stream().map(publisher::publish).collect(Collectors.toList());
327+
publisher.flushToStream();
328+
329+
assertThat(future1.isDone()).isFalse();
330+
assertThat(future2.isDone()).isFalse();
331+
assertThat(future3.isDone()).isFalse();
332+
assertThat(future4.isDone()).isFalse();
333+
for (Future<Offset> future : remainingFutures) {
334+
assertThat(future.isDone()).isFalse();
335+
}
336+
337+
BatchPublisher mockBatchPublisher2 = mock(BatchPublisher.class);
338+
doReturn(mockBatchPublisher2)
339+
.when(mockPublisherFactory)
340+
.New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
341+
leakedOffsetStream.onError(new CheckedApiException(Code.UNKNOWN));
342+
343+
// wait for retry to complete
344+
Thread.sleep(500);
345+
346+
verify(mockBatchPublisher).close();
347+
verify(mockPublisherFactory, times(2)).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
348+
InOrder order = inOrder(mockBatchPublisher2);
349+
order
350+
.verify(mockBatchPublisher2)
351+
.publish(
352+
(Collection<PubSubMessage>) argThat(hasItems(message1.toProto(), message2.toProto())));
353+
order
354+
.verify(mockBatchPublisher2)
355+
.publish((Collection<PubSubMessage>) argThat(hasItems(message3.toProto())));
356+
ImmutableList.Builder<PubSubMessage> expectedRebatch = ImmutableList.builder();
357+
expectedRebatch.add(message4.toProto());
358+
for (int i = 0; i < (Constants.MAX_PUBLISH_BATCH_COUNT - 1); ++i) {
359+
expectedRebatch.add(remaining.get(i).toProto());
360+
}
361+
order
362+
.verify(mockBatchPublisher2)
363+
.publish((Collection<PubSubMessage>) argThat(contains(expectedRebatch.build().toArray())));
364+
order
365+
.verify(mockBatchPublisher2)
366+
.publish(
367+
(Collection<PubSubMessage>) argThat(hasItems(Iterables.getLast(remaining).toProto())));
368+
369+
assertThat(future1.isDone()).isFalse();
370+
assertThat(future2.isDone()).isFalse();
371+
assertThat(future3.isDone()).isFalse();
372+
assertThat(future4.isDone()).isFalse();
373+
for (Future<Offset> future : remainingFutures) {
374+
assertThat(future.isDone()).isFalse();
375+
}
376+
377+
leakedOffsetStream.onResponse(Offset.of(10));
378+
assertThat(future1.isDone()).isTrue();
379+
assertThat(future1.get()).isEqualTo(Offset.of(10));
380+
assertThat(future2.isDone()).isTrue();
381+
assertThat(future2.get()).isEqualTo(Offset.of(11));
382+
assertThat(future3.isDone()).isFalse();
383+
384+
leakedOffsetStream.onResponse(Offset.of(50));
385+
assertThat(future3.isDone()).isTrue();
386+
assertThat(future3.get()).isEqualTo(Offset.of(50));
387+
assertThat(future4.isDone()).isFalse();
388+
389+
leakedOffsetStream.onResponse(Offset.of(100));
390+
assertThat(future4.isDone()).isTrue();
391+
assertThat(future4.get()).isEqualTo(Offset.of(100));
392+
for (int i = 0; i < (Constants.MAX_PUBLISH_BATCH_COUNT - 1); ++i) {
393+
Future<Offset> future = remainingFutures.get(i);
394+
assertThat(future.isDone()).isTrue();
395+
assertThat(future.get()).isEqualTo(Offset.of(100 + 1 + i));
396+
}
397+
Future<Offset> lastFuture = Iterables.getLast(remainingFutures);
398+
assertThat(lastFuture.isDone()).isFalse();
399+
400+
leakedOffsetStream.onResponse(Offset.of(10000));
401+
assertThat(lastFuture.isDone()).isTrue();
402+
assertThat(lastFuture.get()).isEqualTo(Offset.of(10000));
403+
}
404+
276405
@Test
277406
public void invalidOffsetSequence_SetsPermanentException() throws Exception {
278407
startPublisher();

0 commit comments

Comments
 (0)