From b746ff86539c24bba52008830c696e625582b28a Mon Sep 17 00:00:00 2001 From: stephwang Date: Fri, 29 Oct 2021 11:52:20 -0400 Subject: [PATCH 01/13] feat: add support for StorageError --- .../cloud/bigquery/storage/v1/Exceptions.java | 110 ++++++++++++++++++ .../bigquery/storage/v1/StreamWriter.java | 16 ++- .../bigquery/storage/v1/StreamWriterTest.java | 34 ++++++ 3 files changed, 155 insertions(+), 5 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java new file mode 100644 index 0000000000..ce1f766cb1 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -0,0 +1,110 @@ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import javax.annotation.Nullable; + +/** Exceptions for Storage Client Libraries. */ +public final class Exceptions { + /** Main Storage Exception. Might contain map of streams to errors for that stream. */ + public static class StorageException extends RuntimeException { + + protected StorageException() {} + + protected StorageException(String message) { + super(message); + } + + protected StorageException(String message, Throwable cause) { + super(message, cause); + } + + protected StorageException(Throwable cause) { + super(cause); + } + + private ImmutableMap errors; + + public ImmutableMap getErrors() { + return errors; + } + + public void setErrors(ImmutableMap value) { + this.errors = value; + } + + private String streamName; + + public String getStreamName() { + return streamName; + } + + protected void setStreamName(String value) { + this.streamName = value; + } + } + + /** Stream has already been finalized. */ + public static class StreamFinalizedException extends StorageException { + protected StreamFinalizedException() {} + + protected StreamFinalizedException(String name, String message, Throwable cause) { + super(message, cause); + setStreamName(name); + } + } + + /** + * There was a schema mismatch due to bigquery table with fewer fields than the input message. + * This can be resolved by updating the table's schema with the message schema. + */ + public static class SchemaMismatchedException extends StorageException { + protected SchemaMismatchedException() {} + + protected SchemaMismatchedException(String name, String message, Throwable cause) { + super(message, cause); + setStreamName(name); + } + } + + private static StorageError toStorageError(com.google.rpc.Status rpcStatus) { + for (Any detail : rpcStatus.getDetailsList()) { + if (detail.is(StorageError.class)) { + try { + return detail.unpack(StorageError.class); + } catch (InvalidProtocolBufferException protoException) { + throw new IllegalStateException(protoException); + } + } + } + return null; + } + + /* Converts a c.g.rpc.Status into a StorageException, if possible. + * Examines the embedded StorageError, and potentially returns a StreamFinalizedException or + * SchemaMismatchedException (both derive from StorageException). + * If there is no StorageError, or the StorageError is a different error it will return NULL. + */ + @Nullable + public static StorageException toStorageException( + com.google.rpc.Status rpcStatus, Throwable exception) { + StorageError error = toStorageError(rpcStatus); + if (error == null) { + return null; + } + switch (error.getCode()) { + case STREAM_FINALIZED: + return new StreamFinalizedException(error.getEntity(), error.getErrorMessage(), exception); + + case SCHEMA_MISMATCH_EXTRA_FIELDS: + return new SchemaMismatchedException(error.getEntity(), error.getErrorMessage(), exception); + + default: + return null; + } + } + + private Exceptions() {} +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 3f94a85f03..65638a7e92 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -460,11 +460,17 @@ private void requestCallback(AppendRowsResponse response) { this.lock.unlock(); } if (response.hasError()) { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.fromCodeValue(response.getError().getCode()) - .withDescription(response.getError().getMessage())); - requestWrapper.appendResult.setException(exception); + Exceptions.StorageException storageException = + Exceptions.toStorageException(response.getError(), null); + if (storageException != null) { + requestWrapper.appendResult.setException(storageException); + } else { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + requestWrapper.appendResult.setException(exception); + } } else { requestWrapper.appendResult.set(response); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index a07616dbdb..2de58e8c88 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -27,7 +27,9 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode; import com.google.common.base.Strings; +import com.google.protobuf.Any; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; import io.grpc.Status; @@ -304,6 +306,38 @@ public void testAppendSuccessAndInStreamError() throws Exception { writer.close(); } + @Test + public void testAppendFailedSchemaError() throws Exception { + StreamWriter writer = getTestStreamWriter(); + + StorageError storageError = + StorageError.newBuilder() + .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS) + .setEntity("foobar") + .build(); + com.google.rpc.Status statusProto = + com.google.rpc.Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT.getHttpStatusCode()) + .addDetails(Any.pack(storageError)) + .build(); + + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setError(statusProto).build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + Exceptions.SchemaMismatchedException actualError = + assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); + assertEquals("foobar", actualError.getStreamName()); + assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + @Test public void longIdleBetweenAppends() throws Exception { StreamWriter writer = getTestStreamWriter(); From 2acdc53869c1f41a58085d430517a2ee442cc93b Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 29 Oct 2021 15:54:53 +0000 Subject: [PATCH 02/13] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4d2a291cd4..193e7b8b81 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies ```Groovy -implementation platform('com.google.cloud:libraries-bom:23.1.0') +implementation platform('com.google.cloud:libraries-bom:24.0.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.4.0' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.5.0' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.4.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.5.0" ``` ## Authentication From 472aae65bc34bc4dd295a13b5e0292246d6e3749 Mon Sep 17 00:00:00 2001 From: stephwang Date: Fri, 29 Oct 2021 11:58:53 -0400 Subject: [PATCH 03/13] add license header --- .../cloud/bigquery/storage/v1/Exceptions.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index ce1f766cb1..f15eb14cab 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -1,3 +1,18 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.bigquery.storage.v1; import com.google.api.gax.grpc.GrpcStatusCode; From 5e52f7a1f44f5936da1ad635b196600a56a767ab Mon Sep 17 00:00:00 2001 From: stephwang Date: Mon, 1 Nov 2021 15:51:35 -0400 Subject: [PATCH 04/13] add error parsing in doneCallback --- .../cloud/bigquery/storage/v1/Exceptions.java | 20 +++++++++++++++++++ .../bigquery/storage/v1/StreamWriter.java | 5 +++++ 2 files changed, 25 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index f15eb14cab..659a439630 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -121,5 +121,25 @@ public static StorageException toStorageException( } } + /* Converts a Throwable into a StorageException, if possible. + * Examines the embedded error message, and potentially returns a StreamFinalizedException or + * SchemaMismatchedException (both derive from StorageException). + * If there is no StorageError, or the StorageError is a different error it will return NULL. + */ + @Nullable + public static StorageException toStorageException(Throwable exception) { + String errorMsg = exception.getMessage(); + if (errorMsg == null) { + return null; + } + if (errorMsg.toLowerCase().contains("finazlied")) { + return new StreamFinalizedException(null, errorMsg, exception); + } + if (errorMsg.toLowerCase().contains("mismatch")) { + return new SchemaMismatchedException(null, errorMsg, exception); + } + return null; + } + private Exceptions() {} } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 65638a7e92..679d367897 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -21,6 +21,7 @@ import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; @@ -488,6 +489,10 @@ private void doneCallback(Throwable finalStatus) { } finally { this.lock.unlock(); } + Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus); + if (storageException != null) { + throw new StorageException(finalStatus); + } } @GuardedBy("lock") From a78a524870c023d92b9d75c22751bc615b21b530 Mon Sep 17 00:00:00 2001 From: stephwang Date: Tue, 2 Nov 2021 13:52:25 -0400 Subject: [PATCH 05/13] address feedback --- .../com/google/cloud/bigquery/storage/v1/StreamWriter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 679d367897..0e0010db2a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -21,7 +21,6 @@ import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; -import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; @@ -491,7 +490,7 @@ private void doneCallback(Throwable finalStatus) { } Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus); if (storageException != null) { - throw new StorageException(finalStatus); + this.connectionFinalStatus = storageException; } } From bd0b2c0fa9f4b83aa14215512ded9272053d186c Mon Sep 17 00:00:00 2001 From: stephwang Date: Thu, 4 Nov 2021 15:07:52 -0400 Subject: [PATCH 06/13] add test case for onDone --- .../bigquery/storage/v1/StreamWriterTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 2de58e8c88..8d7685073a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -338,6 +338,28 @@ public void testAppendFailedSchemaError() throws Exception { writer.close(); } + @Test + public void testAppendFailedOnDone() throws Exception { + StreamWriter writer = getTestStreamWriter(); + + StatusRuntimeException exception = + new StatusRuntimeException( + io.grpc.Status.INVALID_ARGUMENT.withDescription("schema mismatch")); + + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addException(exception); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + Exceptions.SchemaMismatchedException actualError = + assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); + assertTrue(actualError.getMessage().contains("schema mismatch")); + + writer.close(); + } + @Test public void longIdleBetweenAppends() throws Exception { StreamWriter writer = getTestStreamWriter(); From f86833368cbf25d05581ee04579a8e880a139c95 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 4 Nov 2021 19:10:07 +0000 Subject: [PATCH 07/13] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 193e7b8b81..1f378666c8 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.5.0' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.5.1' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.5.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.5.1" ``` ## Authentication From 8a2f95f7e7768b84c668c4f208c50b4edc41f7dd Mon Sep 17 00:00:00 2001 From: stephwang Date: Thu, 4 Nov 2021 15:42:51 -0400 Subject: [PATCH 08/13] refactor code --- .../cloud/bigquery/storage/v1/Exceptions.java | 65 ++++++++----------- 1 file changed, 27 insertions(+), 38 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index 659a439630..2eb0762337 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigquery.storage.v1; +import static com.google.common.base.Preconditions.checkArgument; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; @@ -26,48 +27,37 @@ public final class Exceptions { /** Main Storage Exception. Might contain map of streams to errors for that stream. */ public static class StorageException extends RuntimeException { - protected StorageException() {} + private final ImmutableMap errors; + private final String streamName; - protected StorageException(String message) { - super(message); + private StorageException() { + this(null, null, null, ImmutableMap.of()); } - protected StorageException(String message, Throwable cause) { + private StorageException( + @Nullable String message, + @Nullable Throwable cause, + @Nullable String streamName, + ImmutableMap errors) { super(message, cause); + checkArgument(errors != null, "errors must be non null"); + this.streamName = streamName; + this.errors = errors; } - protected StorageException(Throwable cause) { - super(cause); - } - - private ImmutableMap errors; - public ImmutableMap getErrors() { return errors; } - public void setErrors(ImmutableMap value) { - this.errors = value; - } - - private String streamName; - public String getStreamName() { return streamName; } - - protected void setStreamName(String value) { - this.streamName = value; - } } /** Stream has already been finalized. */ - public static class StreamFinalizedException extends StorageException { - protected StreamFinalizedException() {} - + public static final class StreamFinalizedException extends StorageException { protected StreamFinalizedException(String name, String message, Throwable cause) { - super(message, cause); - setStreamName(name); + super(message, cause, name, null); } } @@ -75,12 +65,9 @@ protected StreamFinalizedException(String name, String message, Throwable cause) * There was a schema mismatch due to bigquery table with fewer fields than the input message. * This can be resolved by updating the table's schema with the message schema. */ - public static class SchemaMismatchedException extends StorageException { - protected SchemaMismatchedException() {} - + public static final class SchemaMismatchedException extends StorageException { protected SchemaMismatchedException(String name, String message, Throwable cause) { - super(message, cause); - setStreamName(name); + super(message, cause, name, null); } } @@ -97,10 +84,11 @@ private static StorageError toStorageError(com.google.rpc.Status rpcStatus) { return null; } - /* Converts a c.g.rpc.Status into a StorageException, if possible. - * Examines the embedded StorageError, and potentially returns a StreamFinalizedException or - * SchemaMismatchedException (both derive from StorageException). - * If there is no StorageError, or the StorageError is a different error it will return NULL. + /** + * Converts a c.g.rpc.Status into a StorageException, if possible. Examines the embedded + * StorageError, and potentially returns a {@link StreamFinalizedException} or {@link + * SchemaMismatchedException} (both derive from StorageException). If there is no StorageError, or + * the StorageError is a different error it will return NULL. */ @Nullable public static StorageException toStorageException( @@ -121,10 +109,11 @@ public static StorageException toStorageException( } } - /* Converts a Throwable into a StorageException, if possible. - * Examines the embedded error message, and potentially returns a StreamFinalizedException or - * SchemaMismatchedException (both derive from StorageException). - * If there is no StorageError, or the StorageError is a different error it will return NULL. + /** + * Converts a Throwable into a StorageException, if possible. Examines the embedded error message, + * and potentially returns a {@link StreamFinalizedException} or {@link SchemaMismatchedException} + * (both derive from StorageException). If there is no StorageError, or the StorageError is a + * different error it will return NULL. */ @Nullable public static StorageException toStorageException(Throwable exception) { From 283388600eb8e9572e5357b8908d8a6b767eb3d2 Mon Sep 17 00:00:00 2001 From: stephwang Date: Thu, 4 Nov 2021 15:45:59 -0400 Subject: [PATCH 09/13] lint --- .../java/com/google/cloud/bigquery/storage/v1/Exceptions.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index 2eb0762337..d47d7dbd91 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1; import static com.google.common.base.Preconditions.checkArgument; + import com.google.api.gax.grpc.GrpcStatusCode; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; From d321f17bf89d96cddb6c5281fd7b97338672afea Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 4 Nov 2021 19:46:06 +0000 Subject: [PATCH 10/13] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../java/com/google/cloud/bigquery/storage/v1/Exceptions.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index 2eb0762337..d47d7dbd91 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1; import static com.google.common.base.Preconditions.checkArgument; + import com.google.api.gax.grpc.GrpcStatusCode; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; From 436f40b5f750f341f07fb1b09c8aff621c7053ae Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 5 Nov 2021 11:14:51 -0400 Subject: [PATCH 11/13] apply changes from code review Co-authored-by: BenWhitehead --- .../java/com/google/cloud/bigquery/storage/v1/Exceptions.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index d47d7dbd91..88295ab05f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -58,7 +58,7 @@ public String getStreamName() { /** Stream has already been finalized. */ public static final class StreamFinalizedException extends StorageException { protected StreamFinalizedException(String name, String message, Throwable cause) { - super(message, cause, name, null); + super(message, cause, name, ImmutableMap.of()); } } @@ -68,7 +68,7 @@ protected StreamFinalizedException(String name, String message, Throwable cause) */ public static final class SchemaMismatchedException extends StorageException { protected SchemaMismatchedException(String name, String message, Throwable cause) { - super(message, cause, name, null); + super(message, cause, name, ImmutableMap.of()); } } From c88dad21aea1e5ea5e7b67234ca26d7473a37ee2 Mon Sep 17 00:00:00 2001 From: stephwang Date: Mon, 8 Nov 2021 15:20:38 -0500 Subject: [PATCH 12/13] add integration test for onDoneCallback --- .../cloud/bigquery/storage/v1/Exceptions.java | 5 +- .../bigquery/storage/v1/StreamWriterTest.java | 4 +- .../it/ITBigQueryWriteManualClientTest.java | 49 +++++++++++++++++++ 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index d47d7dbd91..c89305e1cb 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -15,8 +15,6 @@ */ package com.google.cloud.bigquery.storage.v1; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.api.gax.grpc.GrpcStatusCode; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; @@ -41,7 +39,6 @@ private StorageException( @Nullable String streamName, ImmutableMap errors) { super(message, cause); - checkArgument(errors != null, "errors must be non null"); this.streamName = streamName; this.errors = errors; } @@ -125,7 +122,7 @@ public static StorageException toStorageException(Throwable exception) { if (errorMsg.toLowerCase().contains("finazlied")) { return new StreamFinalizedException(null, errorMsg, exception); } - if (errorMsg.toLowerCase().contains("mismatch")) { + if (errorMsg.toLowerCase().contains("mismatched")) { return new SchemaMismatchedException(null, errorMsg, exception); } return null; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 8d7685073a..71f66cbadf 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -344,7 +344,7 @@ public void testAppendFailedOnDone() throws Exception { StatusRuntimeException exception = new StatusRuntimeException( - io.grpc.Status.INVALID_ARGUMENT.withDescription("schema mismatch")); + io.grpc.Status.INVALID_ARGUMENT.withDescription("schema mismatched")); testBigQueryWrite.addResponse(createAppendResponse(0)); testBigQueryWrite.addException(exception); @@ -355,7 +355,7 @@ public void testAppendFailedOnDone() throws Exception { assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); Exceptions.SchemaMismatchedException actualError = assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); - assertTrue(actualError.getMessage().contains("schema mismatch")); + assertTrue(actualError.getMessage().contains("schema mismatched")); writer.close(); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 7d06ec1e53..91c17e342e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -49,13 +49,16 @@ public class ITBigQueryWriteManualClientTest { private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; + private static final String TABLEINT = "testtableintcol"; private static final String TABLE2 = "complicatedtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; private static BigQueryWriteClient client; + private static TableInfo intTableInfo; private static TableInfo tableInfo; private static TableInfo tableInfo2; private static TableInfo tableInfoEU; + private static String intTableId; private static String tableId; private static String tableId2; private static String tableIdEU; @@ -71,6 +74,15 @@ public static void beforeClass() throws IOException { DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); + intTableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLEINT), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); tableInfo = TableInfo.newBuilder( TableId.of(DATASET, TABLE), @@ -101,8 +113,13 @@ public static void beforeClass() throws IOException { .build(), innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build()))) .build(); + bigquery.create(intTableInfo); bigquery.create(tableInfo); bigquery.create(tableInfo2); + intTableId = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, TABLEINT); tableId = String.format( "projects/%s/datasets/%s/tables/%s", @@ -499,6 +516,38 @@ public void testStreamError() throws IOException, InterruptedException, Executio } } + @Test + public void testStreamSchemaMisMatchError() throws IOException, InterruptedException { + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(intTableId) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + + try (StreamWriter streamWriter = + StreamWriter.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build()) { + // Create a proto row that is not compatible with the table schema. + ApiFuture response = + streamWriter.append(CreateProtoRows(new String[] {"a"}), /*offset=*/ -1L); + try { + response.get(); + Assert.fail("Should fail"); + } catch (ExecutionException e) { + assertThat(e.getCause().getMessage()) + .contains( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched with BigQuery field at com_google_cloud_bigquery_storage_test_FooType.foo, the proto field type string, BigQuery field type INTEGER Entity"); + assertThat( + e.getMessage() + .contains( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched with BigQuery field at com_google_cloud_bigquery_storage_test_FooType.foo, the proto field type string, BigQuery field type INTEGER Entity")); + } + } + } + @Test public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException { WriteStream writeStream = From 1da0f7a3cc759fed5d07845e7f214bcc12dbf44d Mon Sep 17 00:00:00 2001 From: stephwang Date: Fri, 12 Nov 2021 18:12:37 -0500 Subject: [PATCH 13/13] make error check more vigorous --- .../cloud/bigquery/storage/v1/Exceptions.java | 29 ++++++-- .../bigquery/storage/v1/StreamWriterTest.java | 40 +++++++++- .../it/ITBigQueryWriteManualClientTest.java | 74 ++++++++++++------- 3 files changed, 109 insertions(+), 34 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index ecafaee13e..a979e1fa61 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -19,6 +19,10 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; +import io.grpc.Status; +import io.grpc.Status.Code; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.annotation.Nullable; /** Exceptions for Storage Client Libraries. */ @@ -115,15 +119,28 @@ public static StorageException toStorageException( */ @Nullable public static StorageException toStorageException(Throwable exception) { - String errorMsg = exception.getMessage(); - if (errorMsg == null) { + // TODO: switch to using rpcStatus when cl/408735437 is rolled out + // com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception); + Status grpcStatus = Status.fromThrowable(exception); + String message = exception.getMessage(); + String streamPatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"; + Pattern streamPattern = Pattern.compile(streamPatternString); + if (message == null) { return null; } - if (errorMsg.toLowerCase().contains("finazlied")) { - return new StreamFinalizedException(null, errorMsg, exception); + // TODO: SWTICH TO CHECK SCHEMA_MISMATCH_EXTRA_FIELDS IN THE ERROR CODE + if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT) + && message.toLowerCase().contains("input schema has more fields than bigquery schema")) { + Matcher streamMatcher = streamPattern.matcher(message); + String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown"; + return new SchemaMismatchedException(entity, message, exception); } - if (errorMsg.toLowerCase().contains("mismatched")) { - return new SchemaMismatchedException(null, errorMsg, exception); + // TODO: SWTICH TO CHECK STREAM_FINALIZED IN THE ERROR CODE + if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT) + && message.toLowerCase().contains("stream has been finalized and cannot be appended")) { + Matcher streamMatcher = streamPattern.matcher(message); + String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown"; + return new StreamFinalizedException(entity, message, exception); } return null; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 71f66cbadf..b95c981bf3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -344,7 +344,8 @@ public void testAppendFailedOnDone() throws Exception { StatusRuntimeException exception = new StatusRuntimeException( - io.grpc.Status.INVALID_ARGUMENT.withDescription("schema mismatched")); + io.grpc.Status.INVALID_ARGUMENT.withDescription( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema")); testBigQueryWrite.addResponse(createAppendResponse(0)); testBigQueryWrite.addException(exception); @@ -355,11 +356,46 @@ public void testAppendFailedOnDone() throws Exception { assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); Exceptions.SchemaMismatchedException actualError = assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); - assertTrue(actualError.getMessage().contains("schema mismatched")); + assertTrue( + actualError + .getMessage() + .contains( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema")); writer.close(); } + // TODO(stephwang): update test case to below when toStorageException is updated + // @Test + // public void testAppendFailedOnDone2() throws Exception { + // StreamWriter writer = getTestStreamWriter(); + // + // StorageError storageError = + // StorageError.newBuilder() + // .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS) + // .setEntity("foobar") + // .build(); + // com.google.rpc.Status statusProto = + // com.google.rpc.Status.newBuilder() + // .addDetails(Any.pack(storageError)) + // .build(); + // + // StatusRuntimeException exception = StatusProto.toStatusRuntimeException(statusProto); + // + // testBigQueryWrite.addResponse(createAppendResponse(0)); + // testBigQueryWrite.addException(exception); + // + // ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + // ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + // + // assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + // Exceptions.SchemaMismatchedException actualError = + // assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2); + // assertEquals("foobar", actualError.getStreamName()); + // + // writer.close(); + // } + @Test public void longIdleBetweenAppends() throws Exception { StreamWriter writer = getTestStreamWriter(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 91c17e342e..9f65a74ca7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -49,16 +49,13 @@ public class ITBigQueryWriteManualClientTest { private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName(); private static final String TABLE = "testtable"; - private static final String TABLEINT = "testtableintcol"; private static final String TABLE2 = "complicatedtable"; private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; private static BigQueryWriteClient client; - private static TableInfo intTableInfo; private static TableInfo tableInfo; private static TableInfo tableInfo2; private static TableInfo tableInfoEU; - private static String intTableId; private static String tableId; private static String tableId2; private static String tableIdEU; @@ -74,15 +71,6 @@ public static void beforeClass() throws IOException { DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); - intTableInfo = - TableInfo.newBuilder( - TableId.of(DATASET, TABLEINT), - StandardTableDefinition.of( - Schema.of( - com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.INTEGER) - .setMode(Field.Mode.NULLABLE) - .build()))) - .build(); tableInfo = TableInfo.newBuilder( TableId.of(DATASET, TABLE), @@ -113,13 +101,8 @@ public static void beforeClass() throws IOException { .build(), innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build()))) .build(); - bigquery.create(intTableInfo); bigquery.create(tableInfo); bigquery.create(tableInfo2); - intTableId = - String.format( - "projects/%s/datasets/%s/tables/%s", - ServiceOptions.getDefaultProjectId(), DATASET, TABLEINT); tableId = String.format( "projects/%s/datasets/%s/tables/%s", @@ -170,6 +153,15 @@ ProtoRows CreateProtoRows(String[] messages) { return rows.build(); } + ProtoRows CreateProtoRowsMultipleColumns(String[] messages) { + ProtoRows.Builder rows = ProtoRows.newBuilder(); + for (String message : messages) { + UpdatedFooType foo = UpdatedFooType.newBuilder().setFoo(message).setBar(message).build(); + rows.addSerializedRows(foo.toByteString()); + } + return rows.build(); + } + ProtoRows CreateProtoRowsComplex(String[] messages) { ProtoRows.Builder rows = ProtoRows.newBuilder(); for (String message : messages) { @@ -521,29 +513,59 @@ public void testStreamSchemaMisMatchError() throws IOException, InterruptedExcep WriteStream writeStream = client.createWriteStream( CreateWriteStreamRequest.newBuilder() - .setParent(intTableId) + .setParent(tableId) .setWriteStream( WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) .build()); try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()) - .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor())) + .build()) { + // Create a proto row that has extra fields than the table schema defined which should trigger + // the SCHEMA_MISMATCH_EXTRA_FIELDS error + ApiFuture response = + streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0); + try { + response.get(); + Assert.fail("Should fail"); + } catch (ExecutionException e) { + // TODO(stephwang): update test case when toStroageException is updated + assertThat(e.getCause().getMessage()) + .contains( + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"); + } + } + } + + @Test + public void testStreamFinalizedError() + throws IOException, InterruptedException, ExecutionException { + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(tableId) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + try (StreamWriter streamWriter = + StreamWriter.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor())) .build()) { - // Create a proto row that is not compatible with the table schema. + // Finalize the stream in order to trigger STREAM_FINALIZED error + client.finalizeWriteStream( + FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build()); + // Try to append to a finalized stream ApiFuture response = - streamWriter.append(CreateProtoRows(new String[] {"a"}), /*offset=*/ -1L); + streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0); try { response.get(); Assert.fail("Should fail"); } catch (ExecutionException e) { + // //TODO(stephwang): update test case when toStroageException is updated assertThat(e.getCause().getMessage()) .contains( - "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched with BigQuery field at com_google_cloud_bigquery_storage_test_FooType.foo, the proto field type string, BigQuery field type INTEGER Entity"); - assertThat( - e.getMessage() - .contains( - "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched with BigQuery field at com_google_cloud_bigquery_storage_test_FooType.foo, the proto field type string, BigQuery field type INTEGER Entity")); + "io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Stream has been finalized and cannot be appended"); } } }