From 43fc284e00ddbc9a018d734e3f6f09c82ebd92d4 Mon Sep 17 00:00:00 2001 From: Martin Mladenovski Date: Fri, 7 Feb 2020 08:27:06 -0800 Subject: [PATCH] feat: add an enhanced layer for BigQuery Storage v1 client (#66) * feat: seed v1 with v1beta2 enhanced layer files This commit makes it easier for reviewing later changes that update the enhanced layer to v1. * feat: add an enhanced layer for BigQuery Storage v1 client v1beta2 and v1 APIs are the same, with the only difference in the version. --- .../storage/v1/BigQueryReadClient.java | 376 ++++++++++++++++++ .../storage/v1/BigQueryReadSettings.java | 202 ++++++++++ .../v1/stub/EnhancedBigQueryReadStub.java | 121 ++++++ .../EnhancedBigQueryReadStubSettings.java | 232 +++++++++++ .../readrows/ReadRowsResumptionStrategy.java | 72 ++++ .../v1/stub/readrows/package-info.java | 16 + .../storage/v1/BigQueryReadClientTest.java | 165 ++++++++ .../EnhancedBigQueryReadStubSettingsTest.java | 142 +++++++ .../storage/v1/stub/ResourceHeaderTest.java | 140 +++++++ .../v1/stub/readrows/ReadRowsRetryTest.java | 243 +++++++++++ 10 files changed, 1709 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java new file mode 100644 index 0000000000..e3fbf56165 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java @@ -0,0 +1,376 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.BetaApi; +import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Service Description: BigQuery Read API. + * + *

The Read API can be used to read data from BigQuery. + * + *

This class provides the ability to make remote calls to the backing service through method + * calls that map to API methods. Sample code to get started: + * + *

+ * 
+ * try (BigQueryReadClient BigQueryReadClient = BigQueryReadClient.create()) {
+ *   String parent = "";
+ *   ReadSession readSession = ReadSession.newBuilder().build();
+ *   int maxStreamCount = 0;
+ *   ReadSession response = BigQueryReadClient.createReadSession(parent, readSession, maxStreamCount);
+ * }
+ * 
+ * 
+ * + *

Note: close() needs to be called on the BigQueryReadClient object to clean up resources such + * as threads. In the example above, try-with-resources is used, which automatically calls close(). + * + *

The surface of this class includes several types of Java methods for each of the API's + * methods: + * + *

    + *
  1. A "flattened" method. With this type of method, the fields of the request type have been + * converted into function parameters. It may be the case that not all fields are available as + * parameters, and not every API method will have a flattened method entry point. + *
  2. A "request object" method. This type of method only takes one parameter, a request object, + * which must be constructed before the call. Not every API method will have a request object + * method. + *
  3. A "callable" method. This type of method takes no parameters and returns an immutable API + * callable object, which can be used to initiate calls to the service. + *
+ * + *

See the individual methods for example code. + * + *

Many parameters require resource names to be formatted in a particular way. To assist with + * these names, this class includes a format method for each type of name, and additionally a parse + * method to extract the individual identifiers contained within names that are returned. + * + *

This class can be customized by passing in a custom instance of BigQueryReadSettings to + * create(). For example: + * + *

To customize credentials: + * + *

+ * 
+ * BigQueryReadSettings BigQueryReadSettings =
+ *     BigQueryReadSettings.newBuilder()
+ *         .setCredentialsProvider(FixedCredentialsProvider.create(myCredentials))
+ *         .build();
+ * BigQueryReadClient BigQueryReadClient =
+ *     BigQueryReadClient.create(BigQueryReadSettings);
+ * 
+ * 
+ * + * To customize the endpoint: + * + *
+ * 
+ * BigQueryReadSettings BigQueryReadSettings =
+ *     BigQueryReadSettings.newBuilder().setEndpoint(myEndpoint).build();
+ * BigQueryReadClient BigQueryReadClient =
+ *     BigQueryReadClient.create(BigQueryReadSettings);
+ * 
+ * 
+ */ +@BetaApi +public class BigQueryReadClient implements BackgroundResource { + private final BigQueryReadSettings settings; + private final EnhancedBigQueryReadStub stub; + + /** Constructs an instance of BigQueryReadClient with default settings. */ + public static final BigQueryReadClient create() throws IOException { + return create(BigQueryReadSettings.newBuilder().build()); + } + + /** + * Constructs an instance of BigQueryReadClient, using the given settings. The channels are + * created based on the settings passed in, or defaults for any settings that are not set. + */ + public static final BigQueryReadClient create(BigQueryReadSettings settings) throws IOException { + return new BigQueryReadClient(settings); + } + + /** + * Constructs an instance of BigQueryReadClient, using the given stub for making calls. This is + * for advanced usage - prefer to use BigQueryReadSettings}. + */ + @BetaApi("A restructuring of stub classes is planned, so this may break in the future") + public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { + return new BigQueryReadClient(stub); + } + + /** + * Constructs an instance of BigQueryReadClient, using the given settings. This is protected so + * that it is easy to make a subclass, but otherwise, the static factory methods should be + * preferred. + */ + protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { + this.settings = settings; + this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + } + + @BetaApi("A restructuring of stub classes is planned, so this may break in the future") + protected BigQueryReadClient(EnhancedBigQueryReadStub stub) { + this.settings = null; + this.stub = stub; + } + + public final BigQueryReadSettings getSettings() { + return settings; + } + + @BetaApi("A restructuring of stub classes is planned, so this may break in the future") + public EnhancedBigQueryReadStub getStub() { + return stub; + } + + /** + * Creates a new read session. A read session divides the contents of a BigQuery table into one or + * more streams, which can then be used to read data from the table. The read session also + * specifies properties of the data to be read, such as a list of columns or a push-down filter + * describing the rows to be returned. + * + *

A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *

Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *

Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient BigQueryReadClient = BigQueryReadClient.create()) {
+   *   String parent = "";
+   *   ReadSession readSession = ReadSession.newBuilder().build();
+   *   int maxStreamCount = 0;
+   *   ReadSession response = BigQueryReadClient.createReadSession(parent, readSession, maxStreamCount);
+   * }
+   * 
+ * + * @param parent Required. The request project that owns the session, in the form of + * `projects/{project_id}`. + * @param readSession Required. Session to be created. + * @param maxStreamCount Max initial number of streams. If unset or zero, the server will provide + * a value of streams so as to produce reasonable throughput. Must be non-negative. The number + * of streams may be lower than the requested number, depending on the amount parallelism that + * is reasonable for the table. Error will be returned if the max count is greater than the + * current system max limit of 1,000. + *

Streams must be read starting from offset 0. + * @throws com.google.api.gax.rpc.ApiException if the remote call fails + */ + public final ReadSession createReadSession( + String parent, ReadSession readSession, int maxStreamCount) { + + CreateReadSessionRequest request = + CreateReadSessionRequest.newBuilder() + .setParent(parent) + .setReadSession(readSession) + .setMaxStreamCount(maxStreamCount) + .build(); + return createReadSession(request); + } + + /** + * Creates a new read session. A read session divides the contents of a BigQuery table into one or + * more streams, which can then be used to read data from the table. The read session also + * specifies properties of the data to be read, such as a list of columns or a push-down filter + * describing the rows to be returned. + * + *

A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *

Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *

Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient BigQueryReadClient = BigQueryReadClient.create()) {
+   *   CreateReadSessionRequest request = CreateReadSessionRequest.newBuilder().build();
+   *   ReadSession response = BigQueryReadClient.createReadSession(request);
+   * }
+   * 
+ * + * @param request The request object containing all of the parameters for the API call. + * @throws com.google.api.gax.rpc.ApiException if the remote call fails + */ + public final ReadSession createReadSession(CreateReadSessionRequest request) { + return createReadSessionCallable().call(request); + } + + /** + * Creates a new read session. A read session divides the contents of a BigQuery table into one or + * more streams, which can then be used to read data from the table. The read session also + * specifies properties of the data to be read, such as a list of columns or a push-down filter + * describing the rows to be returned. + * + *

A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *

Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *

Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   CreateReadSessionRequest request = CreateReadSessionRequest.newBuilder().build();
+   *   ApiFuture<ReadSession> future = BigQueryReadClient.createReadSessionCallable().futureCall(request);
+   *   // Do something
+   *   ReadSession response = future.get();
+   * }
+   * 
+ */ + public final UnaryCallable createReadSessionCallable() { + return stub.createReadSessionCallable(); + } + + /** + * Reads rows from the stream in the format prescribed by the ReadSession. Each response contains + * one or more table rows, up to a maximum of 100 MiB per response; read requests which attempt to + * read individual rows larger than 100 MiB will fail. + * + *

Each request also returns a set of stream statistics reflecting the current state of the + * stream. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   ReadRowsRequest request = ReadRowsRequest.newBuilder().build();
+   *
+   *   ServerStream<ReadRowsResponse> stream = bigQueryReadClient.readRowsCallable().call(request);
+   *   for (ReadRowsResponse response : stream) {
+   *     // Do something when receive a response
+   *   }
+   * }
+   * 
+ */ + public final ServerStreamingCallable readRowsCallable() { + return stub.readRowsCallable(); + } + + /** + * Splits a given `ReadStream` into two `ReadStream` objects. These `ReadStream` objects are + * referred to as the primary and the residual streams of the split. The original `ReadStream` can + * still be read from in the same manner as before. Both of the returned `ReadStream` objects can + * also be read from, and the rows returned by both child streams will be the same as the rows + * read from the original stream. + * + *

Moreover, the two child streams will be allocated back-to-back in the original `ReadStream`. + * Concretely, it is guaranteed that for streams original, primary, and residual, that + * original[0-j] = primary[0-j] and original[j-n] = residual[0-m] once the streams have been read + * to completion. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   SplitReadStreamRequest request = SplitReadStreamRequest.newBuilder().build();
+   *   SplitReadStreamResponse response = bigQueryReadClient.splitReadStream(request);
+   * }
+   * 
+ * + * @param request The request object containing all of the parameters for the API call. + * @throws com.google.api.gax.rpc.ApiException if the remote call fails + */ + public final SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) { + return splitReadStreamCallable().call(request); + } + + /** + * Splits a given `ReadStream` into two `ReadStream` objects. These `ReadStream` objects are + * referred to as the primary and the residual streams of the split. The original `ReadStream` can + * still be read from in the same manner as before. Both of the returned `ReadStream` objects can + * also be read from, and the rows returned by both child streams will be the same as the rows + * read from the original stream. + * + *

Moreover, the two child streams will be allocated back-to-back in the original `ReadStream`. + * Concretely, it is guaranteed that for streams original, primary, and residual, that + * original[0-j] = primary[0-j] and original[j-n] = residual[0-m] once the streams have been read + * to completion. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   SplitReadStreamRequest request = SplitReadStreamRequest.newBuilder().build();
+   *   ApiFuture<SplitReadStreamResponse> future = bigQueryReadClient.splitReadStreamCallable().futureCall(request);
+   *   // Do something
+   *   SplitReadStreamResponse response = future.get();
+   * }
+   * 
+ */ + public final UnaryCallable + splitReadStreamCallable() { + return stub.splitReadStreamCallable(); + } + + @Override + public final void close() { + stub.close(); + } + + @Override + public void shutdown() { + stub.shutdown(); + } + + @Override + public boolean isShutdown() { + return stub.isShutdown(); + } + + @Override + public boolean isTerminated() { + return stub.isTerminated(); + } + + @Override + public void shutdownNow() { + stub.shutdownNow(); + } + + @Override + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return stub.awaitTermination(duration, unit); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java new file mode 100644 index 0000000000..fcf02a2331 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java @@ -0,0 +1,202 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.ApiFunction; +import com.google.api.core.BetaApi; +import com.google.api.gax.core.GoogleCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ClientSettings; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings; +import java.io.IOException; +import java.util.List; + +/** + * Settings class to configure an instance of {@link BigQueryReadClient}. + * + *

The default instance has everything set to sensible defaults: + * + *

    + *
  • The default service address (bigquerystorage.googleapis.com) and default port (443) are + * used. + *
  • Credentials are acquired automatically through Application Default Credentials. + *
  • Retries are configured for idempotent methods but not for non-idempotent methods. + *
+ * + *

The builder of this class is recursive, so contained classes are themselves builders. When + * build() is called, the tree of builders is called to create the complete settings object. + * + *

For example, to set the total timeout of createReadSession to 30 seconds: + * + *

+ * 
+ * BigQueryReadSettings.Builder BigQueryReadSettingsBuilder =
+ *     BigQueryReadSettings.newBuilder();
+ * BigQueryReadSettingsBuilder.createReadSessionSettings().getRetrySettings().toBuilder()
+ *     .setTotalTimeout(Duration.ofSeconds(30));
+ * BigQueryReadSettings BigQueryReadSettings = BigQueryReadSettingsBuilder.build();
+ * 
+ * 
+ */ +@BetaApi +public class BigQueryReadSettings extends ClientSettings { + /** Returns the object with the settings used for calls to createReadSession. */ + public UnaryCallSettings createReadSessionSettings() { + return getTypedStubSettings().createReadSessionSettings(); + } + + /** Returns the object with the settings used for calls to readRows. */ + public ServerStreamingCallSettings readRowsSettings() { + return getTypedStubSettings().readRowsSettings(); + } + + /** Returns the object with the settings used for calls to splitReadStream. */ + public UnaryCallSettings + splitReadStreamSettings() { + return getTypedStubSettings().splitReadStreamSettings(); + } + + EnhancedBigQueryReadStubSettings getTypedStubSettings() { + return (EnhancedBigQueryReadStubSettings) getStubSettings(); + } + + public static final BigQueryReadSettings create(EnhancedBigQueryReadStubSettings stub) + throws IOException { + return new BigQueryReadSettings.Builder(stub.toBuilder()).build(); + } + + /** Returns a builder for the default ExecutorProvider for this service. */ + public static InstantiatingExecutorProvider.Builder defaultExecutorProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultExecutorProviderBuilder(); + } + + /** Returns the default service endpoint. */ + public static String getDefaultEndpoint() { + return EnhancedBigQueryReadStubSettings.getDefaultEndpoint(); + } + + /** Returns the default service scopes. */ + public static List getDefaultServiceScopes() { + return EnhancedBigQueryReadStubSettings.getDefaultServiceScopes(); + } + + /** Returns a builder for the default credentials for this service. */ + public static GoogleCredentialsProvider.Builder defaultCredentialsProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultCredentialsProviderBuilder(); + } + + /** Returns a builder for the default ChannelProvider for this service. */ + public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultGrpcTransportProviderBuilder(); + } + + public static TransportChannelProvider defaultTransportChannelProvider() { + return EnhancedBigQueryReadStubSettings.defaultTransportChannelProvider(); + } + + @BetaApi("The surface for customizing headers is not stable yet and may change in the future.") + public static ApiClientHeaderProvider.Builder defaultApiClientHeaderProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultApiClientHeaderProviderBuilder(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder() { + return Builder.createDefault(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder(ClientContext clientContext) { + return new Builder(clientContext); + } + + /** Returns a builder containing all the values of this settings class. */ + public Builder toBuilder() { + return new Builder(this); + } + + protected BigQueryReadSettings(Builder settingsBuilder) throws IOException { + super(settingsBuilder); + } + + /** Builder for BigQueryReadSettings. */ + public static class Builder extends ClientSettings.Builder { + protected Builder() throws IOException { + this((ClientContext) null); + } + + protected Builder(ClientContext clientContext) { + super(EnhancedBigQueryReadStubSettings.newBuilder(clientContext)); + } + + private static Builder createDefault() { + return new Builder(EnhancedBigQueryReadStubSettings.newBuilder()); + } + + protected Builder(BigQueryReadSettings settings) { + super(settings.getStubSettings().toBuilder()); + } + + protected Builder(EnhancedBigQueryReadStubSettings.Builder stubSettings) { + super(stubSettings); + } + + public EnhancedBigQueryReadStubSettings.Builder getStubSettingsBuilder() { + return ((EnhancedBigQueryReadStubSettings.Builder) getStubSettings()); + } + + // NEXT_MAJOR_VER: remove 'throws Exception' + /** + * Applies the given settings updater function to all of the unary API methods in this service. + * + *

Note: This method does not support applying settings to streaming methods. + */ + public Builder applyToAllUnaryMethods( + ApiFunction, Void> settingsUpdater) throws Exception { + super.applyToAllUnaryMethods( + getStubSettingsBuilder().unaryMethodSettingsBuilders(), settingsUpdater); + return this; + } + + /** Returns the builder for the settings used for calls to createReadSession. */ + public UnaryCallSettings.Builder + createReadSessionSettings() { + return getStubSettingsBuilder().createReadSessionSettings(); + } + + /** Returns the builder for the settings used for calls to readRows. */ + public ServerStreamingCallSettings.Builder + readRowsSettings() { + return getStubSettingsBuilder().readRowsSettings(); + } + + /** Returns the builder for the settings used for calls to splitReadStream. */ + public UnaryCallSettings.Builder + splitReadStreamSettings() { + return getStubSettingsBuilder().splitReadStreamSettings(); + } + + @Override + public BigQueryReadSettings build() throws IOException { + return new BigQueryReadSettings(this); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java new file mode 100644 index 0000000000..16b768d09a --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java @@ -0,0 +1,121 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub; + +import com.google.api.core.InternalApi; +import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Enhanced stub class for BigQuery Storage API. + * + *

This class is for advanced usage and reflects the underlying API directly. + */ +public class EnhancedBigQueryReadStub implements BackgroundResource { + private final GrpcBigQueryReadStub stub; + + public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) + throws IOException { + // Configure the base settings. + BigQueryReadStubSettings.Builder baseSettingsBuilder = + BigQueryReadStubSettings.newBuilder() + .setTransportChannelProvider(settings.getTransportChannelProvider()) + .setEndpoint(settings.getEndpoint()) + .setHeaderProvider(settings.getHeaderProvider()) + .setCredentialsProvider(settings.getCredentialsProvider()) + .setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval()) + .setStreamWatchdogProvider(settings.getStreamWatchdogProvider()); + + baseSettingsBuilder + .createReadSessionSettings() + .setRetryableCodes(settings.createReadSessionSettings().getRetryableCodes()) + .setRetrySettings(settings.createReadSessionSettings().getRetrySettings()); + + baseSettingsBuilder + .readRowsSettings() + .setRetryableCodes(settings.readRowsSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowsSettings().getRetrySettings()) + .setResumptionStrategy(settings.readRowsSettings().getResumptionStrategy()) + .setIdleTimeout(settings.readRowsSettings().getIdleTimeout()); + + baseSettingsBuilder + .splitReadStreamSettings() + .setRetryableCodes(settings.splitReadStreamSettings().getRetryableCodes()) + .setRetrySettings(settings.splitReadStreamSettings().getRetrySettings()); + + BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); + ClientContext clientContext = ClientContext.create(baseSettings); + GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); + return new EnhancedBigQueryReadStub(stub); + } + + @InternalApi("Visible for testing") + EnhancedBigQueryReadStub(GrpcBigQueryReadStub stub) { + this.stub = stub; + } + + public UnaryCallable createReadSessionCallable() { + return stub.createReadSessionCallable(); + } + + public ServerStreamingCallable readRowsCallable() { + return stub.readRowsCallable(); + } + + public UnaryCallable splitReadStreamCallable() { + return stub.splitReadStreamCallable(); + } + + @Override + public void close() { + stub.close(); + } + + @Override + public void shutdown() { + stub.shutdown(); + } + + @Override + public boolean isShutdown() { + return stub.isShutdown(); + } + + @Override + public boolean isTerminated() { + return stub.isTerminated(); + } + + @Override + public void shutdownNow() { + stub.shutdownNow(); + } + + @Override + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return stub.awaitTermination(duration, unit); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java new file mode 100644 index 0000000000..190f355779 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java @@ -0,0 +1,232 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub; + +import com.google.api.core.ApiFunction; +import com.google.api.core.BetaApi; +import com.google.api.gax.core.GoogleCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.StubSettings; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.cloud.bigquery.storage.v1.BaseBigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; +import com.google.cloud.bigquery.storage.v1.stub.readrows.ReadRowsResumptionStrategy; +import com.google.common.collect.ImmutableList; +import java.util.List; + +/** + * Settings class to configure an instance of {@link EnhancedBigQueryReadStub}. + * + *

The default instance dynamically reads and applies the default values used by {@link + * BigQueryReadStub}. + * + *

The builder of this class is recursive, so contained classes are themselves builders. When + * build() is called, the tree of builders is called to create the complete settings object. For + * example, to set the total timeout of createReadSession to 30 seconds: + * + *

+ * 
+ * EnhancedBigQueryReadStubSettings.Builder builder =
+ *     EnhancedBigQueryReadStubSettings.newBuilder();
+ * builder.createReadSessionSettings().getRetrySettings().toBuilder()
+ *     .setTotalTimeout(Duration.ofSeconds(30));
+ * EnhancedBigQueryReadStubSettings settings = builder.build();
+ * 
+ * 
+ */ +public class EnhancedBigQueryReadStubSettings + extends StubSettings { + + private final UnaryCallSettings createReadSessionSettings; + private final ServerStreamingCallSettings readRowsSettings; + private final UnaryCallSettings + splitReadStreamSettings; + + /** Returns the object with the settings used for calls to createReadSession. */ + public UnaryCallSettings createReadSessionSettings() { + return createReadSessionSettings; + } + + /** Returns the object with the settings used for calls to readRows. */ + public ServerStreamingCallSettings readRowsSettings() { + return readRowsSettings; + } + + /** Returns the object with the settings used for calls to splitReadStream. */ + public UnaryCallSettings + splitReadStreamSettings() { + return splitReadStreamSettings; + } + + /** Returns a builder for the default ExecutorProvider for this service. */ + public static InstantiatingExecutorProvider.Builder defaultExecutorProviderBuilder() { + return BigQueryReadStubSettings.defaultExecutorProviderBuilder(); + } + + /** Returns the default service endpoint. */ + public static String getDefaultEndpoint() { + return BigQueryReadStubSettings.getDefaultEndpoint(); + } + + /** Returns the default service scopes. */ + public static List getDefaultServiceScopes() { + return BigQueryReadStubSettings.getDefaultServiceScopes(); + } + + /** Returns a builder for the default credentials for this service. */ + public static GoogleCredentialsProvider.Builder defaultCredentialsProviderBuilder() { + return BaseBigQueryReadSettings.defaultCredentialsProviderBuilder(); + } + + /** Returns a builder for the default ChannelProvider for this service. */ + public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { + return BigQueryReadStubSettings.defaultGrpcTransportProviderBuilder(); + } + + public static TransportChannelProvider defaultTransportChannelProvider() { + return defaultGrpcTransportProviderBuilder().build(); + } + + @BetaApi("The surface for customizing headers is not stable yet and may change in the future.") + public static ApiClientHeaderProvider.Builder defaultApiClientHeaderProviderBuilder() { + return BigQueryReadStubSettings.defaultApiClientHeaderProviderBuilder(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder() { + return new Builder(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder(ClientContext clientContext) { + return new Builder(clientContext); + } + + /** Returns a builder containing all the values of this settings class. */ + public Builder toBuilder() { + return new Builder(this); + } + + protected EnhancedBigQueryReadStubSettings(Builder settingsBuilder) { + super(settingsBuilder); + + createReadSessionSettings = settingsBuilder.createReadSessionSettings().build(); + readRowsSettings = settingsBuilder.readRowsSettings().build(); + splitReadStreamSettings = settingsBuilder.splitReadStreamSettings().build(); + } + + /** Builder for {@link EnhancedBigQueryReadStubSettings}. */ + public static class Builder + extends StubSettings.Builder { + private final ImmutableList> unaryMethodSettingsBuilders; + + private final UnaryCallSettings.Builder + createReadSessionSettings; + private final ServerStreamingCallSettings.Builder + readRowsSettings; + private final UnaryCallSettings.Builder + splitReadStreamSettings; + + protected Builder() { + this((ClientContext) null); + } + + protected Builder(ClientContext clientContext) { + super(clientContext); + + // Defaults provider + BigQueryReadStubSettings.Builder baseDefaults = BigQueryReadStubSettings.newBuilder(); + setEndpoint(baseDefaults.getEndpoint()); + setTransportChannelProvider(defaultTransportChannelProvider()); + setCredentialsProvider(baseDefaults.getCredentialsProvider()); + setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval()); + setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider()); + + // Per-method settings using baseSettings for defaults. + createReadSessionSettings = baseDefaults.createReadSessionSettings(); + splitReadStreamSettings = baseDefaults.splitReadStreamSettings(); + + // Per-method settings using override values for defaults. + readRowsSettings = + baseDefaults.readRowsSettings().setResumptionStrategy(new ReadRowsResumptionStrategy()); + + unaryMethodSettingsBuilders = + ImmutableList.>of( + createReadSessionSettings, splitReadStreamSettings); + } + + protected Builder(EnhancedBigQueryReadStubSettings settings) { + super(settings); + + createReadSessionSettings = settings.createReadSessionSettings.toBuilder(); + readRowsSettings = settings.readRowsSettings.toBuilder(); + splitReadStreamSettings = settings.splitReadStreamSettings.toBuilder(); + + unaryMethodSettingsBuilders = + ImmutableList.>of( + createReadSessionSettings, splitReadStreamSettings); + } + + // NEXT_MAJOR_VER: remove 'throws Exception' + /** + * Applies the given settings updater function to all of the unary API methods in this service. + * + *

Note: This method does not support applying settings to streaming methods. + */ + public Builder applyToAllUnaryMethods( + ApiFunction, Void> settingsUpdater) throws Exception { + super.applyToAllUnaryMethods(unaryMethodSettingsBuilders, settingsUpdater); + return this; + } + + public ImmutableList> unaryMethodSettingsBuilders() { + return unaryMethodSettingsBuilders; + } + + /** Returns the builder for the settings used for calls to createReadSession. */ + public UnaryCallSettings.Builder + createReadSessionSettings() { + return createReadSessionSettings; + } + + /** Returns the builder for the settings used for calls to readRows. */ + public ServerStreamingCallSettings.Builder + readRowsSettings() { + return readRowsSettings; + } + + /** Returns the builder for the settings used for calls to splitReadStream. */ + public UnaryCallSettings.Builder + splitReadStreamSettings() { + return splitReadStreamSettings; + } + + @Override + public EnhancedBigQueryReadStubSettings build() { + return new EnhancedBigQueryReadStubSettings(this); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java new file mode 100644 index 0000000000..e14d58b58d --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import javax.annotation.Nonnull; + +/** + * An implementation of a {@link StreamResumptionStrategy} for the ReadRows API. This class tracks + * the offset of the last row received and, upon retry, attempts to resume the stream at the next + * offset. + * + *

This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public class ReadRowsResumptionStrategy + implements StreamResumptionStrategy { + + // Number of rows processed. + private long rowsProcessed = 0; + + @Override + @Nonnull + public StreamResumptionStrategy createNew() { + return new ReadRowsResumptionStrategy(); + } + + @Override + @Nonnull + public ReadRowsResponse processResponse(ReadRowsResponse response) { + rowsProcessed += response.getRowCount(); + return response; + } + + /** + * {@inheritDoc} + * + *

Given the initial/original request, this implementation generates a request that will yield + * a new stream whose first response would come right after the last response received by + * processResponse. It takes into account the offset from the original request. + */ + @Override + public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) { + ReadRowsRequest.Builder resumeRequestBuilder = originalRequest.toBuilder(); + + resumeRequestBuilder.setOffset(originalRequest.getOffset() + rowsProcessed); + + return resumeRequestBuilder.build(); + } + + @Override + public boolean canResume() { + return true; + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java new file mode 100644 index 0000000000..111054d34f --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java @@ -0,0 +1,16 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub.readrows; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java new file mode 100644 index 0000000000..01f9101a7e --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java @@ -0,0 +1,165 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.grpc.testing.MockStreamObserver; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; +import com.google.protobuf.AbstractMessage; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class BigQueryReadClientTest { + private static MockBigQueryRead mockBigQueryRead; + private static MockServiceHelper serviceHelper; + private BigQueryReadClient client; + private LocalChannelProvider channelProvider; + + @BeforeClass + public static void startStaticServer() { + mockBigQueryRead = new MockBigQueryRead(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(mockBigQueryRead)); + serviceHelper.start(); + } + + @AfterClass + public static void stopServer() { + serviceHelper.stop(); + } + + @Before + public void setUp() throws IOException { + serviceHelper.reset(); + channelProvider = serviceHelper.createChannelProvider(); + BigQueryReadSettings settings = + BigQueryReadSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + client = BigQueryReadClient.create(settings); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + @Test + @SuppressWarnings("all") + public void createReadSessionTest() { + String name = "name3373707"; + String table = "table110115790"; + ReadSession expectedResponse = ReadSession.newBuilder().setName(name).setTable(table).build(); + mockBigQueryRead.addResponse(expectedResponse); + + String parent = "parent-995424086"; + ReadSession readSession = ReadSession.newBuilder().build(); + int maxStreamCount = 940837515; + + ReadSession actualResponse = client.createReadSession(parent, readSession, maxStreamCount); + Assert.assertEquals(expectedResponse, actualResponse); + + List actualRequests = mockBigQueryRead.getRequests(); + Assert.assertEquals(1, actualRequests.size()); + CreateReadSessionRequest actualRequest = (CreateReadSessionRequest) actualRequests.get(0); + + Assert.assertEquals(parent, actualRequest.getParent()); + Assert.assertEquals(readSession, actualRequest.getReadSession()); + Assert.assertEquals(maxStreamCount, actualRequest.getMaxStreamCount()); + Assert.assertTrue( + channelProvider.isHeaderSent( + ApiClientHeaderProvider.getDefaultApiClientHeaderKey(), + GaxGrpcProperties.getDefaultApiClientHeaderPattern())); + } + + @Test + @SuppressWarnings("all") + public void createReadSessionExceptionTest() throws Exception { + StatusRuntimeException exception = new StatusRuntimeException(Status.INVALID_ARGUMENT); + mockBigQueryRead.addException(exception); + + try { + String parent = "parent-995424086"; + ReadSession readSession = ReadSession.newBuilder().build(); + int maxStreamCount = 940837515; + + client.createReadSession(parent, readSession, maxStreamCount); + Assert.fail("No exception raised"); + } catch (InvalidArgumentException e) { + // Expected exception + } + } + + @Test + @SuppressWarnings("all") + public void readRowsTest() throws Exception { + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + Assert.assertEquals(expectedResponse, actualResponses.get(0)); + } + + @Test + @SuppressWarnings("all") + public void readRowsExceptionTest() throws Exception { + StatusRuntimeException exception = new StatusRuntimeException(Status.INVALID_ARGUMENT); + mockBigQueryRead.addException(exception); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof InvalidArgumentException); + InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); + Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java new file mode 100644 index 0000000000..d86067b1bc --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.api.gax.rpc.WatchdogProvider; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; +import java.util.Set; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class EnhancedBigQueryReadStubSettingsTest { + + @Test + public void testSettingsArePreserved() { + String endpoint = "some.other.host:123"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Duration watchdogInterval = Duration.ofSeconds(12); + WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); + + EnhancedBigQueryReadStubSettings.Builder builder = + EnhancedBigQueryReadStubSettings.newBuilder() + .setEndpoint(endpoint) + .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogCheckInterval(watchdogInterval) + .setStreamWatchdogProvider(watchdogProvider); + + verifyBuilder(builder, endpoint, credentialsProvider, watchdogInterval, watchdogProvider); + + verifySettings( + builder.build(), endpoint, credentialsProvider, watchdogInterval, watchdogProvider); + + verifyBuilder( + builder.build().toBuilder(), + endpoint, + credentialsProvider, + watchdogInterval, + watchdogProvider); + } + + private void verifyBuilder( + EnhancedBigQueryReadStubSettings.Builder builder, + String endpoint, + CredentialsProvider credentialsProvider, + Duration watchdogInterval, + WatchdogProvider watchdogProvider) { + assertThat(builder.getEndpoint()).isEqualTo(endpoint); + assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider); + assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(builder.getStreamWatchdogProvider()).isEqualTo(watchdogProvider); + + InstantiatingGrpcChannelProvider channelProvider = + (InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider(); + assertThat(channelProvider.toBuilder().getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); + } + + private void verifySettings( + EnhancedBigQueryReadStubSettings settings, + String endpoint, + CredentialsProvider credentialsProvider, + Duration watchdogInterval, + WatchdogProvider watchdogProvider) { + assertThat(settings.getEndpoint()).isEqualTo(endpoint); + assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider); + assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(settings.getStreamWatchdogProvider()).isEqualTo(watchdogProvider); + + InstantiatingGrpcChannelProvider channelProvider = + (InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider(); + assertThat(channelProvider.toBuilder().getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + public void testCreateReadSessionSettings() { + UnaryCallSettings.Builder builder = + EnhancedBigQueryReadStubSettings.newBuilder().createReadSessionSettings(); + verifyRetrySettings(builder.getRetryableCodes(), builder.getRetrySettings()); + } + + @Test + public void testReadRowsSettings() { + ServerStreamingCallSettings.Builder builder = + EnhancedBigQueryReadStubSettings.newBuilder().readRowsSettings(); + assertThat(builder.getRetryableCodes()).contains(Code.UNAVAILABLE); + RetrySettings retrySettings = builder.getRetrySettings(); + assertThat(retrySettings.getInitialRetryDelay()).isEqualTo(Duration.ofMillis(100L)); + assertThat(retrySettings.getRetryDelayMultiplier()).isWithin(1e-6).of(1.3); + assertThat(retrySettings.getMaxRetryDelay()).isEqualTo(Duration.ofMinutes(1L)); + assertThat(retrySettings.getInitialRpcTimeout()).isEqualTo(Duration.ofDays(1L)); + assertThat(retrySettings.getRpcTimeoutMultiplier()).isWithin(1e-6).of(1.0); + assertThat(retrySettings.getMaxRpcTimeout()).isEqualTo(Duration.ofDays(1L)); + assertThat(retrySettings.getTotalTimeout()).isEqualTo(Duration.ofDays(1L)); + assertThat(builder.getIdleTimeout()).isEqualTo(Duration.ZERO); + } + + @Test + public void testSplitReadStreamSettings() { + UnaryCallSettings.Builder builder = + EnhancedBigQueryReadStubSettings.newBuilder().splitReadStreamSettings(); + verifyRetrySettings(builder.getRetryableCodes(), builder.getRetrySettings()); + } + + private void verifyRetrySettings(Set retryCodes, RetrySettings retrySettings) { + assertThat(retryCodes).contains(Code.UNAVAILABLE); + assertThat(retrySettings.getTotalTimeout()).isGreaterThan(Duration.ZERO); + assertThat(retrySettings.getInitialRetryDelay()).isGreaterThan(Duration.ZERO); + assertThat(retrySettings.getRetryDelayMultiplier()).isAtLeast(1.0); + assertThat(retrySettings.getMaxRetryDelay()).isGreaterThan(Duration.ZERO); + assertThat(retrySettings.getInitialRpcTimeout()).isGreaterThan(Duration.ZERO); + assertThat(retrySettings.getRpcTimeoutMultiplier()).isAtLeast(1.0); + assertThat(retrySettings.getMaxRpcTimeout()).isGreaterThan(Duration.ZERO); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java new file mode 100644 index 0000000000..82e533dc05 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub; + +import static com.google.common.truth.Truth.assertWithMessage; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.InProcessServer; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.gax.rpc.UnimplementedException; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc.BigQueryReadImplBase; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import java.util.regex.Pattern; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ResourceHeaderTest { + + private static final String TEST_TABLE_REFERENCE = + "projects/project/datasets/dataset/tables/table"; + + private static final String TEST_STREAM_NAME = "streamName"; + + private static final String NAME = "resource-header-test:123"; + + private static final String HEADER_NAME = "x-goog-request-params"; + + private static final Pattern READ_SESSION_NAME_PATTERN = + Pattern.compile( + ".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*"); + private static final Pattern READ_STREAM_PATTERN = + Pattern.compile(".*" + "read_stream=streamName" + ".*"); + private static final Pattern STREAM_NAME_PATTERN = + Pattern.compile(".*" + "name=streamName" + ".*"); + + private static final String TEST_HEADER_NAME = "simple-header-name"; + private static final String TEST_HEADER_VALUE = "simple-header-value"; + private static final Pattern TEST_PATTERN = Pattern.compile(".*" + TEST_HEADER_VALUE + ".*"); + + private static InProcessServer server; + + private LocalChannelProvider channelProvider; + private BigQueryReadClient client; + + @BeforeClass + public static void setUpClass() throws Exception { + server = new InProcessServer<>(new BigQueryReadImplBase() {}, NAME); + server.start(); + } + + @Before + public void setUp() throws Exception { + channelProvider = LocalChannelProvider.create(NAME); + BigQueryReadSettings.Builder settingsBuilder = + BigQueryReadSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE)) + .setTransportChannelProvider(channelProvider); + client = BigQueryReadClient.create(settingsBuilder.build()); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + server.stop(); + server.blockUntilShutdown(); + } + + @Test + public void createReadSessionTest() { + try { + client.createReadSession( + "parents/project", ReadSession.newBuilder().setTable(TEST_TABLE_REFERENCE).build(), 1); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + verifyHeaderSent(READ_SESSION_NAME_PATTERN); + } + + @Test + public void readRowsTest() { + try { + ReadRowsRequest request = + ReadRowsRequest.newBuilder().setReadStream(TEST_STREAM_NAME).setOffset(125).build(); + client.readRowsCallable().call(request); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + + verifyHeaderSent(READ_STREAM_PATTERN); + } + + @Test + public void splitReadStreamTest() { + try { + client.splitReadStream(SplitReadStreamRequest.newBuilder().setName(TEST_STREAM_NAME).build()); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + + verifyHeaderSent(STREAM_NAME_PATTERN); + } + + private void verifyHeaderSent(Pattern... patterns) { + for (Pattern pattern : patterns) { + boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, pattern); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + boolean testHeaderSent = channelProvider.isHeaderSent(TEST_HEADER_NAME, TEST_PATTERN); + assertWithMessage("Provided header was sent").that(testHeaderSent).isTrue(); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java new file mode 100644 index 0000000000..2a2e513bec --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java @@ -0,0 +1,243 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub.readrows; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.ServerStream; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc.BigQueryReadImplBase; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.common.collect.Queues; +import io.grpc.Status.Code; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ReadRowsRetryTest { + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + + private TestBigQueryStorageService service; + private BigQueryReadClient client; + + @Before + public void setUp() throws IOException { + service = new TestBigQueryStorageService(); + serverRule.getServiceRegistry().addService(service); + + BigQueryReadSettings settings = + BigQueryReadSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + .build(); + + client = BigQueryReadClient.create(settings); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + @Test + public void happyPathTest() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7)); + + Assert.assertEquals(17, getRowCount(request)); + } + + @Test + public void immediateRetryTest() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7)); + + Assert.assertEquals(17, getRowCount(request)); + } + + @Test + public void multipleRetryTestWithZeroInitialOffset() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(5) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 5) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create().expectRequest("fake-stream", 22).respondWithNumberOfRows(6)); + + Assert.assertEquals(28, getRowCount(request)); + } + + @Test + public void multipleRetryTestWithNonZeroInitialOffset() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 17); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 17) + .respondWithNumberOfRows(5) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 22) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create().expectRequest("fake-stream", 39).respondWithNumberOfRows(3)); + + Assert.assertEquals(25, getRowCount(request)); + } + + @Test + public void errorAtTheVeryEndTest() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create().expectRequest("fake-stream", 17).respondWithNumberOfRows(0)); + + Assert.assertEquals(17, getRowCount(request)); + } + + private int getRowCount(ReadRowsRequest request) { + ServerStream serverStream = client.readRowsCallable().call(request); + int rowCount = 0; + for (ReadRowsResponse readRowsResponse : serverStream) { + rowCount += readRowsResponse.getRowCount(); + } + return rowCount; + } + + private static class TestBigQueryStorageService extends BigQueryReadImplBase { + + Queue expectations = Queues.newArrayDeque(); + int currentRequestIndex = -1; + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + + RpcExpectation expectedRpc = expectations.poll(); + currentRequestIndex++; + + Assert.assertNotNull( + "Unexpected request #" + currentRequestIndex + ": " + request.toString(), expectedRpc); + + Assert.assertEquals( + "Expected request #" + + currentRequestIndex + + " does not match actual request: " + + request.toString(), + expectedRpc.expectedRequest, + request); + + for (ReadRowsResponse response : expectedRpc.responses) { + responseObserver.onNext(response); + } + + if (expectedRpc.statusCode.toStatus().isOk()) { + responseObserver.onCompleted(); + } else { + responseObserver.onError(expectedRpc.statusCode.toStatus().asRuntimeException()); + } + } + } + + private static class RpcExpectation { + + ReadRowsRequest expectedRequest; + Code statusCode; + List responses; + + private RpcExpectation() { + statusCode = Code.OK; + responses = new ArrayList<>(); + } + + static RpcExpectation create() { + return new RpcExpectation(); + } + + static ReadRowsRequest createRequest(String streamName, long offset) { + return ReadRowsRequest.newBuilder().setReadStream(streamName).setOffset(offset).build(); + } + + static ReadRowsResponse createResponse(int numberOfRows) { + return ReadRowsResponse.newBuilder().setRowCount(numberOfRows).build(); + } + + RpcExpectation expectRequest(String streamName, long offset) { + expectedRequest = createRequest(streamName, offset); + return this; + } + + RpcExpectation respondWithNumberOfRows(int numberOfRows) { + responses.add(createResponse(numberOfRows)); + return this; + } + + RpcExpectation respondWithStatus(Code code) { + this.statusCode = code; + return this; + } + } +}