From 0261af4e2dff5fd8515109c6336796815acb6c3c Mon Sep 17 00:00:00 2001 From: yayi-google <75696801+yayi-google@users.noreply.github.com> Date: Tue, 23 Feb 2021 12:18:49 -0800 Subject: [PATCH] feat: Add max size check to StreamWriterV2 (#873) * Add a new StreamWriterV2. Compared to existing StreamWriter, its locking mechanism is much simpler. * Stop using Java8 features as we still need to support Java7 * Do not hold lock while sending requests, and some minor refactoring. * Add max message size check. --- .../storage/v1beta2/StreamWriterV2.java | 21 +++++++++++++++++-- .../storage/v1beta2/StreamWriterV2Test.java | 17 ++++++++++++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 10ed99d137..8debea15f9 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.util.Deque; import java.util.LinkedList; @@ -39,8 +40,6 @@ * *

TODO: Attach schema. * - *

TODO: Add max size check. - * *

TODO: Add inflight control. * *

TODO: Attach traceId. @@ -94,6 +93,11 @@ public class StreamWriterV2 implements AutoCloseable { */ private Thread appendThread; + /** The maximum size of one request. Defined by the API. */ + public static long getApiMaxRequestBytes() { + return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte) + } + private StreamWriterV2(Builder builder) { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); @@ -154,6 +158,17 @@ public void run() { */ public ApiFuture append(AppendRowsRequest message) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); + if (requestWrapper.messageSize > getApiMaxRequestBytes()) { + requestWrapper.appendResult.setException( + new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription( + "MessageSize is too large. Max allow: " + + getApiMaxRequestBytes() + + " Actual: " + + requestWrapper.messageSize))); + return requestWrapper.appendResult; + } this.lock.lock(); try { if (userClosed) { @@ -355,10 +370,12 @@ public StreamWriterV2 build() { private static final class AppendRequestAndResponse { final SettableApiFuture appendResult; final AppendRowsRequest message; + final long messageSize; AppendRequestAndResponse(AppendRowsRequest message) { this.appendResult = SettableApiFuture.create(); this.message = message; + this.messageSize = message.getProtoRows().getSerializedSize(); } } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index c50e5abb70..4d6fba9dcd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -26,6 +26,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.common.base.Strings; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; import io.grpc.Status; @@ -288,6 +289,20 @@ public void serverCloseWhileRequestsInflight() throws Exception { } writer.close(); - ; + } + + @Test + public void testMessageTooLarge() { + StreamWriterV2 writer = getTestStreamWriterV2(); + + String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1)); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {oversized}); + assertTrue(appendFuture1.isDone()); + StatusRuntimeException actualError = + assertFutureException(StatusRuntimeException.class, appendFuture1); + assertEquals(Status.Code.INVALID_ARGUMENT, actualError.getStatus().getCode()); + assertTrue(actualError.getStatus().getDescription().contains("MessageSize is too large")); + + writer.close(); } }