From 4677d7bd56db6c76106daeb7be38fa65f1c9c745 Mon Sep 17 00:00:00 2001 From: yayi-google <75696801+yayi-google@users.noreply.github.com> Date: Thu, 25 Feb 2021 06:15:41 -0800 Subject: [PATCH] feat: StreamWriterV2 sets exception for response with error (#884) --- .../storage/v1beta2/StreamWriterV2.java | 10 ++++++- .../storage/v1beta2/StreamWriterV2Test.java | 30 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6c4973043c..8775669893 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -401,7 +401,15 @@ private void requestCallback(AppendRowsResponse response) { } finally { this.lock.unlock(); } - requestWrapper.appendResult.set(response); + if (response.hasError()) { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + requestWrapper.appendResult.setException(exception); + } else { + requestWrapper.appendResult.set(response); + } } private void doneCallback(Throwable finalStatus) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index be51412c2e..88d6fcd1ac 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -122,6 +122,12 @@ private AppendRowsResponse createAppendResponse(long offset) { .build(); } + private AppendRowsResponse createAppendResponseWithError(Status.Code code, String message) { + return AppendRowsResponse.newBuilder() + .setError(com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message)) + .build(); + } + private ApiFuture sendTestMessage(StreamWriterV2 writer, String[] messages) { return writer.append(createAppendRequest(messages, -1)); } @@ -196,7 +202,7 @@ public void testAppendSuccess() throws Exception { } @Test - public void testAppendSuccessAndError() throws Exception { + public void testAppendSuccessAndConnectionError() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2(); testBigQueryWrite.addResponse(createAppendResponse(0)); testBigQueryWrite.addException(Status.INTERNAL.asException()); @@ -211,6 +217,28 @@ public void testAppendSuccessAndError() throws Exception { writer.close(); } + @Test + public void testAppendSuccessAndInStreamError() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse( + createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message")); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + StatusRuntimeException actualError = + assertFutureException(StatusRuntimeException.class, appendFuture2); + assertEquals(Status.Code.INVALID_ARGUMENT, actualError.getStatus().getCode()); + assertEquals("test message", actualError.getStatus().getDescription()); + assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + @Test public void longIdleBetweenAppends() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2();