diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 6c4973043c..8775669893 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -401,7 +401,15 @@ private void requestCallback(AppendRowsResponse response) { } finally { this.lock.unlock(); } - requestWrapper.appendResult.set(response); + if (response.hasError()) { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + requestWrapper.appendResult.setException(exception); + } else { + requestWrapper.appendResult.set(response); + } } private void doneCallback(Throwable finalStatus) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index be51412c2e..88d6fcd1ac 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -122,6 +122,12 @@ private AppendRowsResponse createAppendResponse(long offset) { .build(); } + private AppendRowsResponse createAppendResponseWithError(Status.Code code, String message) { + return AppendRowsResponse.newBuilder() + .setError(com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message)) + .build(); + } + private ApiFuture sendTestMessage(StreamWriterV2 writer, String[] messages) { return writer.append(createAppendRequest(messages, -1)); } @@ -196,7 +202,7 @@ public void testAppendSuccess() throws Exception { } @Test - public void testAppendSuccessAndError() throws Exception { + public void testAppendSuccessAndConnectionError() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2(); testBigQueryWrite.addResponse(createAppendResponse(0)); testBigQueryWrite.addException(Status.INTERNAL.asException()); @@ -211,6 +217,28 @@ public void testAppendSuccessAndError() throws Exception { writer.close(); } + @Test + public void testAppendSuccessAndInStreamError() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse( + createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message")); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + StatusRuntimeException actualError = + assertFutureException(StatusRuntimeException.class, appendFuture2); + assertEquals(Status.Code.INVALID_ARGUMENT, actualError.getStatus().getCode()); + assertEquals("test message", actualError.getStatus().getDescription()); + assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + @Test public void longIdleBetweenAppends() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2();