From 1cd1535787e1dbbc1d4ee3f7aaddcfef906336bd Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 5 Feb 2021 09:06:02 -0500 Subject: [PATCH 1/2] fix: Rebatch messages when restarting a publish stream This prevents publishers which are network isolated for a while from appearing disconnected when the server attempts to acknowledge messages. --- .../google/cloud/pubsublite/Constants.java | 2 +- .../internal/wire/PublisherImpl.java | 42 +++++- .../internal/wire/PublisherImplTest.java | 133 +++++++++++++++++- 3 files changed, 168 insertions(+), 9 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java index b9b7d7953..bba207be3 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java @@ -31,7 +31,7 @@ public class Constants { .build(); public static final long MAX_PUBLISH_BATCH_COUNT = 1_000; - public static final long MAX_PUBLISH_BATCH_BYTES = 3_500_000; + public static final long MAX_PUBLISH_BATCH_BYTES = 1024 * 1024 * 7 / 4; // 3.5 MiB private Constants() {} } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index 91e89ee9e..717e9ab1b 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -26,6 +26,7 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.pubsublite.Constants; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import 
com.google.cloud.pubsublite.internal.CheckedApiException; @@ -85,12 +86,8 @@ private static class InFlightBatch { final List> messageFutures; InFlightBatch(Collection toBatch) { - messages = - toBatch.stream() - .map(SerialBatcher.UnbatchedMessage::message) - .collect(Collectors.toList()); - messageFutures = - toBatch.stream().map(SerialBatcher.UnbatchedMessage::future).collect(Collectors.toList()); + messages = toBatch.stream().map(UnbatchedMessage::message).collect(Collectors.toList()); + messageFutures = toBatch.stream().map(UnbatchedMessage::future).collect(Collectors.toList()); } } @@ -136,10 +133,43 @@ public PublisherImpl( addServices(backgroundResourceAsApiService(client)); } + @GuardedBy("monitor.monitor") + private void rebatchForRestart() { + Queue messages = new ArrayDeque<>(); + for (InFlightBatch batch : batchesInFlight) { + for (int i = 0; i < batch.messages.size(); ++i) { + messages.add(UnbatchedMessage.of(batch.messages.get(i), batch.messageFutures.get(i))); + } + } + long size = 0; + int count = 0; + Queue currentBatch = new ArrayDeque<>(); + batchesInFlight.clear(); + for (UnbatchedMessage message : messages) { + long messageSize = message.message().getSerializedSize(); + if (size + messageSize > Constants.MAX_PUBLISH_BATCH_BYTES + || count + 1 > Constants.MAX_PUBLISH_BATCH_COUNT) { + if (!currentBatch.isEmpty()) { + batchesInFlight.add(new InFlightBatch(currentBatch)); + currentBatch = new ArrayDeque<>(); + count = 0; + size = 0; + } + } + currentBatch.add(message); + size = size + messageSize; + count += 1; + } + if (!currentBatch.isEmpty()) { + batchesInFlight.add(new InFlightBatch(currentBatch)); + } + } + @Override public void triggerReinitialize() { try (CloseableMonitor.Hold h = monitor.enter()) { connection.reinitialize(); + rebatchForRestart(); Collection batches = batchesInFlight; connection.modifyConnection( connectionOr -> { diff --git 
a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java index 838dabb2f..920cb6fe2 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java @@ -19,11 +19,13 @@ import static com.google.cloud.pubsublite.internal.wire.RetryingConnectionHelpers.whenFailed; import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -37,6 +39,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.pubsublite.Constants; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.internal.CheckedApiException; @@ -45,17 +48,24 @@ import com.google.cloud.pubsublite.proto.PubSubMessage; import com.google.cloud.pubsublite.proto.PublishRequest; import com.google.cloud.pubsublite.proto.PublishResponse; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Optional; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentMatchers; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.stubbing.Answer; import org.threeten.bp.Duration; @@ -226,8 +236,14 @@ public void multipleBatches_Ok() throws Exception { @Test public void retryableError_RecreatesAndRetriesAll() throws Exception { startPublisher(); - Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build(); - Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build(); + Message message1 = + Message.builder() + .setData(ByteString.copyFrom(new byte[(int) Constants.MAX_PUBLISH_BATCH_BYTES - 20])) + .build(); + Message message2 = + Message.builder() + .setData(ByteString.copyFromUtf8(String.join("", Collections.nCopies(21, "a")))) + .build(); Future future1 = publisher.publish(message1); publisher.flushToStream(); verify(mockBatchPublisher) @@ -273,6 +289,119 @@ public void retryableError_RecreatesAndRetriesAll() throws Exception { verifyNoMoreInteractions(mockBatchPublisher, mockBatchPublisher2); } + @Test + public void retryableError_RebatchesProperly() throws Exception { + startPublisher(); + Message message1 = Message.builder().setData(ByteString.copyFromUtf8("message1")).build(); + Message message2 = Message.builder().setData(ByteString.copyFromUtf8("message2")).build(); + Message message3 = + Message.builder() + .setData(ByteString.copyFrom(new byte[(int) Constants.MAX_PUBLISH_BATCH_BYTES - 20])) + .build(); + Message message4 = + Message.builder() + .setData(ByteString.copyFromUtf8(String.join("", Collections.nCopies(21, "a")))) + .build(); + List remaining = + IntStream.range(0, (int) Constants.MAX_PUBLISH_BATCH_COUNT) + .mapToObj(x -> 
Message.builder().setData(ByteString.copyFromUtf8("clone-" + x)).build()) + .collect(Collectors.toList()); + + Future future1 = publisher.publish(message1); + Future future2 = publisher.publish(message2); + publisher.flushToStream(); + verify(mockBatchPublisher) + .publish( + (Collection) argThat(hasItems(message1.toProto(), message2.toProto()))); + publisher.flushToStream(); + Future future3 = publisher.publish(message3); + publisher.flushToStream(); + verify(mockBatchPublisher) + .publish((Collection) argThat(hasItems(message3.toProto()))); + Future future4 = publisher.publish(message4); + publisher.flushToStream(); + verify(mockBatchPublisher) + .publish((Collection) argThat(hasItems(message4.toProto()))); + List> remainingFutures = + remaining.stream().map(publisher::publish).collect(Collectors.toList()); + publisher.flushToStream(); + + assertThat(future1.isDone()).isFalse(); + assertThat(future2.isDone()).isFalse(); + assertThat(future3.isDone()).isFalse(); + assertThat(future4.isDone()).isFalse(); + for (Future future : remainingFutures) { + assertThat(future.isDone()).isFalse(); + } + + BatchPublisher mockBatchPublisher2 = mock(BatchPublisher.class); + doReturn(mockBatchPublisher2) + .when(mockPublisherFactory) + .New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + leakedOffsetStream.onError(new CheckedApiException(Code.UNKNOWN)); + + // wait for retry to complete + Thread.sleep(500); + + verify(mockBatchPublisher).close(); + verify(mockPublisherFactory, times(2)).New(any(), any(), eq(INITIAL_PUBLISH_REQUEST)); + InOrder order = inOrder(mockBatchPublisher2); + order + .verify(mockBatchPublisher2) + .publish( + (Collection) argThat(hasItems(message1.toProto(), message2.toProto()))); + order + .verify(mockBatchPublisher2) + .publish((Collection) argThat(hasItems(message3.toProto()))); + ImmutableList.Builder expectedRebatch = ImmutableList.builder(); + expectedRebatch.add(message4.toProto()); + for (int i = 0; i < (Constants.MAX_PUBLISH_BATCH_COUNT - 1); ++i) { 
+ expectedRebatch.add(remaining.get(i).toProto()); + } + order + .verify(mockBatchPublisher2) + .publish((Collection) argThat(contains(expectedRebatch.build().toArray()))); + order + .verify(mockBatchPublisher2) + .publish( + (Collection) argThat(hasItems(Iterables.getLast(remaining).toProto()))); + + assertThat(future1.isDone()).isFalse(); + assertThat(future2.isDone()).isFalse(); + assertThat(future3.isDone()).isFalse(); + assertThat(future4.isDone()).isFalse(); + for (Future future : remainingFutures) { + assertThat(future.isDone()).isFalse(); + } + + leakedOffsetStream.onResponse(Offset.of(10)); + assertThat(future1.isDone()).isTrue(); + assertThat(future1.get()).isEqualTo(Offset.of(10)); + assertThat(future2.isDone()).isTrue(); + assertThat(future2.get()).isEqualTo(Offset.of(11)); + assertThat(future3.isDone()).isFalse(); + + leakedOffsetStream.onResponse(Offset.of(50)); + assertThat(future3.isDone()).isTrue(); + assertThat(future3.get()).isEqualTo(Offset.of(50)); + assertThat(future4.isDone()).isFalse(); + + leakedOffsetStream.onResponse(Offset.of(100)); + assertThat(future4.isDone()).isTrue(); + assertThat(future4.get()).isEqualTo(Offset.of(100)); + for (int i = 0; i < (Constants.MAX_PUBLISH_BATCH_COUNT - 1); ++i) { + Future future = remainingFutures.get(i); + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isEqualTo(Offset.of(100 + 1 + i)); + } + Future lastFuture = Iterables.getLast(remainingFutures); + assertThat(lastFuture.isDone()).isFalse(); + + leakedOffsetStream.onResponse(Offset.of(10000)); + assertThat(lastFuture.isDone()).isTrue(); + assertThat(lastFuture.get()).isEqualTo(Offset.of(10000)); + } + @Test public void invalidOffsetSequence_SetsPermanentException() throws Exception { startPublisher(); From b3893f7d67875f796afe4c84ccf909327ebd8514 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 5 Feb 2021 11:48:43 -0500 Subject: [PATCH 2/2] fix: Rebatch messages when restarting a publish stream This prevents publishers which 
are network isolated for a while from appearing disconnected when the server attempts to acknowledge messages. This follow-up commit corrects MAX_PUBLISH_BATCH_BYTES from 1024 * 1024 * 7 / 4 (1.75 MiB) to 1024 * 1024 * 7 / 2 so the value matches its 3.5 MiB comment. --- .../src/main/java/com/google/cloud/pubsublite/Constants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java index bba207be3..cbd0f5d68 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java @@ -31,7 +31,7 @@ public class Constants { .build(); public static final long MAX_PUBLISH_BATCH_COUNT = 1_000; - public static final long MAX_PUBLISH_BATCH_BYTES = 1024 * 1024 * 7 / 4; // 3.5 MiB + public static final long MAX_PUBLISH_BATCH_BYTES = 1024 * 1024 * 7 / 2; // 3.5 MiB private Constants() {} }