diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java new file mode 100644 index 0000000000..a979e1fa61 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -0,0 +1,149 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import io.grpc.Status; +import io.grpc.Status.Code; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; + +/** Exceptions for Storage Client Libraries. */ +public final class Exceptions { + /** Main Storage Exception. Might contain map of streams to errors for that stream. */ + public static class StorageException extends RuntimeException { + + private final ImmutableMap errors; + private final String streamName; + + private StorageException() { + this(null, null, null, ImmutableMap.of()); + } + + private StorageException( + @Nullable String message, + @Nullable Throwable cause, + @Nullable String streamName, + ImmutableMap errors) { + super(message, cause); + this.streamName = streamName; + this.errors = errors; + } + + public ImmutableMap getErrors() { + return errors; + } + + public String getStreamName() { + return streamName; + } + } + + /** Stream has already been finalized. */ + public static final class StreamFinalizedException extends StorageException { + protected StreamFinalizedException(String name, String message, Throwable cause) { + super(message, cause, name, ImmutableMap.of()); + } + } + + /** + * There was a schema mismatch due to bigquery table with fewer fields than the input message. + * This can be resolved by updating the table's schema with the message schema. + */ + public static final class SchemaMismatchedException extends StorageException { + protected SchemaMismatchedException(String name, String message, Throwable cause) { + super(message, cause, name, ImmutableMap.of()); + } + } + + private static StorageError toStorageError(com.google.rpc.Status rpcStatus) { + for (Any detail : rpcStatus.getDetailsList()) { + if (detail.is(StorageError.class)) { + try { + return detail.unpack(StorageError.class); + } catch (InvalidProtocolBufferException protoException) { + throw new IllegalStateException(protoException); + } + } + } + return null; + } + + /** + * Converts a c.g.rpc.Status into a StorageException, if possible. Examines the embedded + * StorageError, and potentially returns a {@link StreamFinalizedException} or {@link + * SchemaMismatchedException} (both derive from StorageException). If there is no StorageError, or + * the StorageError is a different error it will return NULL. + */ + @Nullable + public static StorageException toStorageException( + com.google.rpc.Status rpcStatus, Throwable exception) { + StorageError error = toStorageError(rpcStatus); + if (error == null) { + return null; + } + switch (error.getCode()) { + case STREAM_FINALIZED: + return new StreamFinalizedException(error.getEntity(), error.getErrorMessage(), exception); + + case SCHEMA_MISMATCH_EXTRA_FIELDS: + return new SchemaMismatchedException(error.getEntity(), error.getErrorMessage(), exception); + + default: + return null; + } + } + + /** + * Converts a Throwable into a StorageException, if possible. Examines the embedded error message, + * and potentially returns a {@link StreamFinalizedException} or {@link SchemaMismatchedException} + * (both derive from StorageException). If there is no StorageError, or the StorageError is a + * different error it will return NULL. + */ + @Nullable + public static StorageException toStorageException(Throwable exception) { + // TODO: switch to using rpcStatus when cl/408735437 is rolled out + // com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception); + Status grpcStatus = Status.fromThrowable(exception); + String message = exception.getMessage(); + String streamPatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"; + Pattern streamPattern = Pattern.compile(streamPatternString); + if (message == null) { + return null; + } + // TODO: SWTICH TO CHECK SCHEMA_MISMATCH_EXTRA_FIELDS IN THE ERROR CODE + if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT) + && message.toLowerCase().contains("input schema has more fields than bigquery schema")) { + Matcher streamMatcher = streamPattern.matcher(message); + String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown"; + return new SchemaMismatchedException(entity, message, exception); + } + // TODO: SWTICH TO CHECK STREAM_FINALIZED IN THE ERROR CODE + if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT) + && message.toLowerCase().contains("stream has been finalized and cannot be appended")) { + Matcher streamMatcher = streamPattern.matcher(message); + String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown"; + return new StreamFinalizedException(entity, message, exception); + } + return null; + } + + private Exceptions() {} +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 3f94a85f03..0e0010db2a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -460,11 +460,17 @@ private void requestCallback(AppendRowsResponse response) { this.lock.unlock(); } if (response.hasError()) { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.fromCodeValue(response.getError().getCode()) - .withDescription(response.getError().getMessage())); - requestWrapper.appendResult.setException(exception); + Exceptions.StorageException storageException = + Exceptions.toStorageException(response.getError(), null); + if (storageException != null) { + requestWrapper.appendResult.setException(storageException); + } else { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + requestWrapper.appendResult.setException(exception); + } } else { requestWrapper.appendResult.set(response); } @@ -482,6 +488,10 @@ private void doneCallback(Throwable finalStatus) { } finally { this.lock.unlock(); } + Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus); + if (storageException != null) { + this.connectionFinalStatus = storageException; + } } @GuardedBy("lock") diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index a07616dbdb..b95c981bf3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -27,7 +27,9 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode; import com.google.common.base.Strings; +import com.google.protobuf.Any; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; import io.grpc.Status; @@ -304,6 +306,96 @@ public void testAppendSuccessAndInStreamError() throws Exception { writer.close(); } + @Test + public void testAppendFailedSchemaError() throws Exception { + StreamWriter writer = getTestStreamWriter(); + + StorageError storageError = + StorageError.newBuilder() + .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS) + .setEntity("foobar") + .build(); + com.google.rpc.Status statusProto = + com.google.rpc.Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT.getHttpStatusCode()) + .addDetails(Any.pack(storageError)) + .build(); + + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setError(statusProto).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + Exceptions.SchemaMismatchedException actualError = + assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); + assertEquals("foobar", actualError.getStreamName()); + assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendFailedOnDone() throws Exception { + StreamWriter writer = getTestStreamWriter(); + + StatusRuntimeException exception = + new StatusRuntimeException( + io.grpc.Status.INVALID_ARGUMENT.withDescription( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema")); + + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addException(exception); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + Exceptions.SchemaMismatchedException actualError = + assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); + assertTrue( + actualError + .getMessage() + .contains( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema")); + + writer.close(); + } + + // TODO(stephwang): update test case to below when toStorageException is updated + // @Test + // public void testAppendFailedOnDone2() throws Exception { + // StreamWriter writer = getTestStreamWriter(); + // + // StorageError storageError = + // StorageError.newBuilder() + // .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS) + // .setEntity("foobar") + // .build(); + // com.google.rpc.Status statusProto = + // com.google.rpc.Status.newBuilder() + // .addDetails(Any.pack(storageError)) + // .build(); + // + // StatusRuntimeException exception = StatusProto.toStatusRuntimeException(statusProto); + // + // testBigQueryWrite.addResponse(createAppendResponse(0)); + // testBigQueryWrite.addException(exception); + // + // ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + // ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + // + // assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + // Exceptions.SchemaMismatchedException actualError = + // assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); + // assertEquals("foobar", actualError.getStreamName()); + // + // writer.close(); + // } + @Test public void longIdleBetweenAppends() throws Exception { StreamWriter writer = getTestStreamWriter(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 7d06ec1e53..9f65a74ca7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -153,6 +153,15 @@ ProtoRows CreateProtoRows(String[] messages) { return rows.build(); } + ProtoRows CreateProtoRowsMultipleColumns(String[] messages) { + ProtoRows.Builder rows = ProtoRows.newBuilder(); + for (String message : messages) { + UpdatedFooType foo = UpdatedFooType.newBuilder().setFoo(message).setBar(message).build(); + rows.addSerializedRows(foo.toByteString()); + } + return rows.build(); + } + ProtoRows CreateProtoRowsComplex(String[] messages) { ProtoRows.Builder rows = ProtoRows.newBuilder(); for (String message : messages) { @@ -499,6 +508,68 @@ public void testStreamError() throws IOException, InterruptedException, Executio } } + @Test + public void testStreamSchemaMisMatchError() throws IOException, InterruptedException { + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(tableId) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + + try (StreamWriter streamWriter = + StreamWriter.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor())) + .build()) { + // Create a proto row that has extra fields than the table schema defined which should trigger + // the SCHEMA_MISMATCH_EXTRA_FIELDS error + ApiFuture response = + streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0); + try { + response.get(); + Assert.fail("Should fail"); + } catch (ExecutionException e) { + // TODO(stephwang): update test case when toStroageException is updated + assertThat(e.getCause().getMessage()) + .contains( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"); + } + } + } + + @Test + public void testStreamFinalizedError() + throws IOException, InterruptedException, ExecutionException { + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(tableId) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + try (StreamWriter streamWriter = + StreamWriter.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor())) + .build()) { + // Finalize the stream in order to trigger STREAM_FINALIZED error + client.finalizeWriteStream( + FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build()); + // Try to append to a finalized stream + ApiFuture response = + streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0); + try { + response.get(); + Assert.fail("Should fail"); + } catch (ExecutionException e) { + // //TODO(stephwang): update test case when toStroageException is updated + assertThat(e.getCause().getMessage()) + .contains( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Stream has been finalized and cannot be appended"); + } + } + } + @Test public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException { WriteStream writeStream =