diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java new file mode 100644 index 0000000000..e3fbf56165 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java @@ -0,0 +1,376 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.BetaApi; +import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Service Description: BigQuery Read API. + * + *
The Read API can be used to read data from BigQuery. + * + *
This class provides the ability to make remote calls to the backing service through method + * calls that map to API methods. Sample code to get started: + * + *
+ *
+ * try (BigQueryReadClient BigQueryReadClient = BigQueryReadClient.create()) {
+ * String parent = "";
+ * ReadSession readSession = ReadSession.newBuilder().build();
+ * int maxStreamCount = 0;
+ * ReadSession response = BigQueryReadClient.createReadSession(parent, readSession, maxStreamCount);
+ * }
+ *
+ *
+ *
+ * Note: close() needs to be called on the BigQueryReadClient object to clean up resources such + * as threads. In the example above, try-with-resources is used, which automatically calls close(). + * + *
The surface of this class includes several types of Java methods for each of the API's + * methods: + * + *
See the individual methods for example code. + * + *
Many parameters require resource names to be formatted in a particular way. To assist with + * these names, this class includes a format method for each type of name, and additionally a parse + * method to extract the individual identifiers contained within names that are returned. + * + *
This class can be customized by passing in a custom instance of BigQueryReadSettings to + * create(). For example: + * + *
To customize credentials: + * + *
+ *
+ * BigQueryReadSettings BigQueryReadSettings =
+ * BigQueryReadSettings.newBuilder()
+ * .setCredentialsProvider(FixedCredentialsProvider.create(myCredentials))
+ * .build();
+ * BigQueryReadClient BigQueryReadClient =
+ * BigQueryReadClient.create(BigQueryReadSettings);
+ *
+ *
+ *
+ * To customize the endpoint:
+ *
+ *
+ *
+ * BigQueryReadSettings BigQueryReadSettings =
+ * BigQueryReadSettings.newBuilder().setEndpoint(myEndpoint).build();
+ * BigQueryReadClient BigQueryReadClient =
+ * BigQueryReadClient.create(BigQueryReadSettings);
+ *
+ *
+ */
+@BetaApi
+public class BigQueryReadClient implements BackgroundResource {
+ private final BigQueryReadSettings settings;
+ private final EnhancedBigQueryReadStub stub;
+
+ /** Constructs an instance of BigQueryReadClient with default settings. */
+ public static final BigQueryReadClient create() throws IOException {
+ return create(BigQueryReadSettings.newBuilder().build());
+ }
+
+ /**
+ * Constructs an instance of BigQueryReadClient, using the given settings. The channels are
+ * created based on the settings passed in, or defaults for any settings that are not set.
+ */
+ public static final BigQueryReadClient create(BigQueryReadSettings settings) throws IOException {
+ return new BigQueryReadClient(settings);
+ }
+
+ /**
+ * Constructs an instance of BigQueryReadClient, using the given stub for making calls. This is
+ * for advanced usage - prefer to use BigQueryReadSettings}.
+ */
+ @BetaApi("A restructuring of stub classes is planned, so this may break in the future")
+ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) {
+ return new BigQueryReadClient(stub);
+ }
+
+ /**
+ * Constructs an instance of BigQueryReadClient, using the given settings. This is protected so
+ * that it is easy to make a subclass, but otherwise, the static factory methods should be
+ * preferred.
+ */
+ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
+ this.settings = settings;
+ this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings());
+ }
+
+ @BetaApi("A restructuring of stub classes is planned, so this may break in the future")
+ protected BigQueryReadClient(EnhancedBigQueryReadStub stub) {
+ this.settings = null;
+ this.stub = stub;
+ }
+
+ public final BigQueryReadSettings getSettings() {
+ return settings;
+ }
+
+ @BetaApi("A restructuring of stub classes is planned, so this may break in the future")
+ public EnhancedBigQueryReadStub getStub() {
+ return stub;
+ }
+
+ /**
+ * Creates a new read session. A read session divides the contents of a BigQuery table into one or
+ * more streams, which can then be used to read data from the table. The read session also
+ * specifies properties of the data to be read, such as a list of columns or a push-down filter
+ * describing the rows to be returned.
+ *
+ * A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *
Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *
Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *
Sample code: + * + *
+ * try (BigQueryReadClient BigQueryReadClient = BigQueryReadClient.create()) {
+ * String parent = "";
+ * ReadSession readSession = ReadSession.newBuilder().build();
+ * int maxStreamCount = 0;
+ * ReadSession response = BigQueryReadClient.createReadSession(parent, readSession, maxStreamCount);
+ * }
+ *
+ *
+ * @param parent Required. The request project that owns the session, in the form of
+ * `projects/{project_id}`.
+ * @param readSession Required. Session to be created.
+ * @param maxStreamCount Max initial number of streams. If unset or zero, the server will provide
+ * a value of streams so as to produce reasonable throughput. Must be non-negative. The number
+ * of streams may be lower than the requested number, depending on the amount parallelism that
+ * is reasonable for the table. Error will be returned if the max count is greater than the
+ * current system max limit of 1,000.
+ * Streams must be read starting from offset 0. + * @throws com.google.api.gax.rpc.ApiException if the remote call fails + */ + public final ReadSession createReadSession( + String parent, ReadSession readSession, int maxStreamCount) { + + CreateReadSessionRequest request = + CreateReadSessionRequest.newBuilder() + .setParent(parent) + .setReadSession(readSession) + .setMaxStreamCount(maxStreamCount) + .build(); + return createReadSession(request); + } + + /** + * Creates a new read session. A read session divides the contents of a BigQuery table into one or + * more streams, which can then be used to read data from the table. The read session also + * specifies properties of the data to be read, such as a list of columns or a push-down filter + * describing the rows to be returned. + * + *
A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *
Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *
Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *
Sample code: + * + *
+ * try (BigQueryReadClient BigQueryReadClient = BigQueryReadClient.create()) {
+ * CreateReadSessionRequest request = CreateReadSessionRequest.newBuilder().build();
+ * ReadSession response = BigQueryReadClient.createReadSession(request);
+ * }
+ *
+ *
+ * @param request The request object containing all of the parameters for the API call.
+ * @throws com.google.api.gax.rpc.ApiException if the remote call fails
+ */
+ public final ReadSession createReadSession(CreateReadSessionRequest request) {
+ return createReadSessionCallable().call(request);
+ }
+
+ /**
+ * Creates a new read session. A read session divides the contents of a BigQuery table into one or
+ * more streams, which can then be used to read data from the table. The read session also
+ * specifies properties of the data to be read, such as a list of columns or a push-down filter
+ * describing the rows to be returned.
+ *
+ * A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *
Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *
Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *
Sample code: + * + *
+ * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+ * CreateReadSessionRequest request = CreateReadSessionRequest.newBuilder().build();
+ * ApiFuture<ReadSession> future = BigQueryReadClient.createReadSessionCallable().futureCall(request);
+ * // Do something
+ * ReadSession response = future.get();
+ * }
+ *
+ */
+ public final UnaryCallableEach request also returns a set of stream statistics reflecting the current state of the + * stream. + * + *
Sample code: + * + *
+ * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+ * ReadRowsRequest request = ReadRowsRequest.newBuilder().build();
+ *
+ * ServerStream<ReadRowsResponse> stream = bigQueryReadClient.readRowsCallable().call(request);
+ * for (ReadRowsResponse response : stream) {
+ * // Do something when receive a response
+ * }
+ * }
+ *
+ */
+ public final ServerStreamingCallableMoreover, the two child streams will be allocated back-to-back in the original `ReadStream`. + * Concretely, it is guaranteed that for streams original, primary, and residual, that + * original[0-j] = primary[0-j] and original[j-n] = residual[0-m] once the streams have been read + * to completion. + * + *
Sample code: + * + *
+ * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+ * SplitReadStreamRequest request = SplitReadStreamRequest.newBuilder().build();
+ * SplitReadStreamResponse response = bigQueryReadClient.splitReadStream(request);
+ * }
+ *
+ *
+ * @param request The request object containing all of the parameters for the API call.
+ * @throws com.google.api.gax.rpc.ApiException if the remote call fails
+ */
+ public final SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) {
+ return splitReadStreamCallable().call(request);
+ }
+
+ /**
+ * Splits a given `ReadStream` into two `ReadStream` objects. These `ReadStream` objects are
+ * referred to as the primary and the residual streams of the split. The original `ReadStream` can
+ * still be read from in the same manner as before. Both of the returned `ReadStream` objects can
+ * also be read from, and the rows returned by both child streams will be the same as the rows
+ * read from the original stream.
+ *
+ * Moreover, the two child streams will be allocated back-to-back in the original `ReadStream`. + * Concretely, it is guaranteed that for streams original, primary, and residual, that + * original[0-j] = primary[0-j] and original[j-n] = residual[0-m] once the streams have been read + * to completion. + * + *
Sample code: + * + *
+ * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+ * SplitReadStreamRequest request = SplitReadStreamRequest.newBuilder().build();
+ * ApiFuture<SplitReadStreamResponse> future = bigQueryReadClient.splitReadStreamCallable().futureCall(request);
+ * // Do something
+ * SplitReadStreamResponse response = future.get();
+ * }
+ *
+ */
+ public final UnaryCallableThe default instance has everything set to sensible defaults: + * + *
The builder of this class is recursive, so contained classes are themselves builders. When + * build() is called, the tree of builders is called to create the complete settings object. + * + *
For example, to set the total timeout of createReadSession to 30 seconds: + * + *
+ *
+ * BigQueryReadSettings.Builder BigQueryReadSettingsBuilder =
+ * BigQueryReadSettings.newBuilder();
+ * BigQueryReadSettingsBuilder.createReadSessionSettings().getRetrySettings().toBuilder()
+ * .setTotalTimeout(Duration.ofSeconds(30));
+ * BigQueryReadSettings BigQueryReadSettings = BigQueryReadSettingsBuilder.build();
+ *
+ *
+ */
+@BetaApi
+public class BigQueryReadSettings extends ClientSettingsNote: This method does not support applying settings to streaming methods.
+ */
+ public Builder applyToAllUnaryMethods(
+ ApiFunction This class is for advanced usage and reflects the underlying API directly.
+ */
+public class EnhancedBigQueryReadStub implements BackgroundResource {
+ private final GrpcBigQueryReadStub stub;
+
+ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
+ throws IOException {
+ // Configure the base settings.
+ BigQueryReadStubSettings.Builder baseSettingsBuilder =
+ BigQueryReadStubSettings.newBuilder()
+ .setTransportChannelProvider(settings.getTransportChannelProvider())
+ .setEndpoint(settings.getEndpoint())
+ .setHeaderProvider(settings.getHeaderProvider())
+ .setCredentialsProvider(settings.getCredentialsProvider())
+ .setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval())
+ .setStreamWatchdogProvider(settings.getStreamWatchdogProvider());
+
+ baseSettingsBuilder
+ .createReadSessionSettings()
+ .setRetryableCodes(settings.createReadSessionSettings().getRetryableCodes())
+ .setRetrySettings(settings.createReadSessionSettings().getRetrySettings());
+
+ baseSettingsBuilder
+ .readRowsSettings()
+ .setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
+ .setRetrySettings(settings.readRowsSettings().getRetrySettings())
+ .setResumptionStrategy(settings.readRowsSettings().getResumptionStrategy())
+ .setIdleTimeout(settings.readRowsSettings().getIdleTimeout());
+
+ baseSettingsBuilder
+ .splitReadStreamSettings()
+ .setRetryableCodes(settings.splitReadStreamSettings().getRetryableCodes())
+ .setRetrySettings(settings.splitReadStreamSettings().getRetrySettings());
+
+ BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
+ ClientContext clientContext = ClientContext.create(baseSettings);
+ GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
+ return new EnhancedBigQueryReadStub(stub);
+ }
+
+ @InternalApi("Visible for testing")
+ EnhancedBigQueryReadStub(GrpcBigQueryReadStub stub) {
+ this.stub = stub;
+ }
+
+ public UnaryCallable The default instance dynamically reads and applies the default values used by {@link
+ * BigQueryReadStub}.
+ *
+ * The builder of this class is recursive, so contained classes are themselves builders. When
+ * build() is called, the tree of builders is called to create the complete settings object. For
+ * example, to set the total timeout of createReadSession to 30 seconds:
+ *
+ * Note: This method does not support applying settings to streaming methods.
+ */
+ public Builder applyToAllUnaryMethods(
+ ApiFunction This class is considered an internal implementation detail and not meant to be used by
+ * applications.
+ */
+@InternalApi
+public class ReadRowsResumptionStrategy
+ implements StreamResumptionStrategy Given the initial/original request, this implementation generates a request that will yield
+ * a new stream whose first response would come right after the last response received by
+ * processResponse. It takes into account the offset from the original request.
+ */
+ @Override
+ public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) {
+ ReadRowsRequest.Builder resumeRequestBuilder = originalRequest.toBuilder();
+
+ resumeRequestBuilder.setOffset(originalRequest.getOffset() + rowsProcessed);
+
+ return resumeRequestBuilder.build();
+ }
+
+ @Override
+ public boolean canResume() {
+ return true;
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java
new file mode 100644
index 0000000000..111054d34f
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1.stub.readrows;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java
new file mode 100644
index 0000000000..01f9101a7e
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GaxGrpcProperties;
+import com.google.api.gax.grpc.testing.LocalChannelProvider;
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.api.gax.grpc.testing.MockStreamObserver;
+import com.google.api.gax.rpc.ApiClientHeaderProvider;
+import com.google.api.gax.rpc.InvalidArgumentException;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.protobuf.AbstractMessage;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BigQueryReadClientTest {
+ private static MockBigQueryRead mockBigQueryRead;
+ private static MockServiceHelper serviceHelper;
+ private BigQueryReadClient client;
+ private LocalChannelProvider channelProvider;
+
+ @BeforeClass
+ public static void startStaticServer() {
+ mockBigQueryRead = new MockBigQueryRead();
+ serviceHelper =
+ new MockServiceHelper(
+ UUID.randomUUID().toString(), Arrays.
+ *
+ */
+public class EnhancedBigQueryReadStubSettings
+ extends StubSettings
+ * EnhancedBigQueryReadStubSettings.Builder builder =
+ * EnhancedBigQueryReadStubSettings.newBuilder();
+ * builder.createReadSessionSettings().getRetrySettings().toBuilder()
+ * .setTotalTimeout(Duration.ofSeconds(30));
+ * EnhancedBigQueryReadStubSettings settings = builder.build();
+ *
+ * retryCodes, RetrySettings retrySettings) {
+ assertThat(retryCodes).contains(Code.UNAVAILABLE);
+ assertThat(retrySettings.getTotalTimeout()).isGreaterThan(Duration.ZERO);
+ assertThat(retrySettings.getInitialRetryDelay()).isGreaterThan(Duration.ZERO);
+ assertThat(retrySettings.getRetryDelayMultiplier()).isAtLeast(1.0);
+ assertThat(retrySettings.getMaxRetryDelay()).isGreaterThan(Duration.ZERO);
+ assertThat(retrySettings.getInitialRpcTimeout()).isGreaterThan(Duration.ZERO);
+ assertThat(retrySettings.getRpcTimeoutMultiplier()).isAtLeast(1.0);
+ assertThat(retrySettings.getMaxRpcTimeout()).isGreaterThan(Duration.ZERO);
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java
new file mode 100644
index 0000000000..82e533dc05
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1.stub;
+
+import static com.google.common.truth.Truth.assertWithMessage;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.testing.InProcessServer;
+import com.google.api.gax.grpc.testing.LocalChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.UnimplementedException;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc.BigQueryReadImplBase;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
+import java.util.regex.Pattern;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ResourceHeaderTest {
+
+ private static final String TEST_TABLE_REFERENCE =
+ "projects/project/datasets/dataset/tables/table";
+
+ private static final String TEST_STREAM_NAME = "streamName";
+
+ private static final String NAME = "resource-header-test:123";
+
+ private static final String HEADER_NAME = "x-goog-request-params";
+
+ private static final Pattern READ_SESSION_NAME_PATTERN =
+ Pattern.compile(
+ ".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*");
+ private static final Pattern READ_STREAM_PATTERN =
+ Pattern.compile(".*" + "read_stream=streamName" + ".*");
+ private static final Pattern STREAM_NAME_PATTERN =
+ Pattern.compile(".*" + "name=streamName" + ".*");
+
+ private static final String TEST_HEADER_NAME = "simple-header-name";
+ private static final String TEST_HEADER_VALUE = "simple-header-value";
+ private static final Pattern TEST_PATTERN = Pattern.compile(".*" + TEST_HEADER_VALUE + ".*");
+
+ private static InProcessServer> server;
+
+ private LocalChannelProvider channelProvider;
+ private BigQueryReadClient client;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ server = new InProcessServer<>(new BigQueryReadImplBase() {}, NAME);
+ server.start();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ channelProvider = LocalChannelProvider.create(NAME);
+ BigQueryReadSettings.Builder settingsBuilder =
+ BigQueryReadSettings.newBuilder()
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE))
+ .setTransportChannelProvider(channelProvider);
+ client = BigQueryReadClient.create(settingsBuilder.build());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ client.close();
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ server.stop();
+ server.blockUntilShutdown();
+ }
+
+ @Test
+ public void createReadSessionTest() {
+ try {
+ client.createReadSession(
+ "parents/project", ReadSession.newBuilder().setTable(TEST_TABLE_REFERENCE).build(), 1);
+ } catch (UnimplementedException e) {
+ // Ignore the error: none of the methods are actually implemented.
+ }
+ verifyHeaderSent(READ_SESSION_NAME_PATTERN);
+ }
+
+ @Test
+ public void readRowsTest() {
+ try {
+ ReadRowsRequest request =
+ ReadRowsRequest.newBuilder().setReadStream(TEST_STREAM_NAME).setOffset(125).build();
+ client.readRowsCallable().call(request);
+ } catch (UnimplementedException e) {
+ // Ignore the error: none of the methods are actually implemented.
+ }
+
+ verifyHeaderSent(READ_STREAM_PATTERN);
+ }
+
+ @Test
+ public void splitReadStreamTest() {
+ try {
+ client.splitReadStream(SplitReadStreamRequest.newBuilder().setName(TEST_STREAM_NAME).build());
+ } catch (UnimplementedException e) {
+ // Ignore the error: none of the methods are actually implemented.
+ }
+
+ verifyHeaderSent(STREAM_NAME_PATTERN);
+ }
+
+ private void verifyHeaderSent(Pattern... patterns) {
+ for (Pattern pattern : patterns) {
+ boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, pattern);
+ assertWithMessage("Generated header was sent").that(headerSent).isTrue();
+ }
+ boolean testHeaderSent = channelProvider.isHeaderSent(TEST_HEADER_NAME, TEST_PATTERN);
+ assertWithMessage("Provided header was sent").that(testHeaderSent).isTrue();
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java
new file mode 100644
index 0000000000..2a2e513bec
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1.stub.readrows;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc.BigQueryReadImplBase;
+import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.common.collect.Queues;
+import io.grpc.Status.Code;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReadRowsRetryTest {
+
+ @Rule public GrpcServerRule serverRule = new GrpcServerRule();
+
+ private TestBigQueryStorageService service;
+ private BigQueryReadClient client;
+
+ @Before
+ public void setUp() throws IOException {
+ service = new TestBigQueryStorageService();
+ serverRule.getServiceRegistry().addService(service);
+
+ BigQueryReadSettings settings =
+ BigQueryReadSettings.newBuilder()
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .setTransportChannelProvider(
+ FixedTransportChannelProvider.create(
+ GrpcTransportChannel.create(serverRule.getChannel())))
+ .build();
+
+ client = BigQueryReadClient.create(settings);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ client.close();
+ }
+
+ @Test
+ public void happyPathTest() {
+ ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0);
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 0)
+ .respondWithNumberOfRows(10)
+ .respondWithNumberOfRows(7));
+
+ Assert.assertEquals(17, getRowCount(request));
+ }
+
+ @Test
+ public void immediateRetryTest() {
+ ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0);
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 0)
+ .respondWithStatus(Code.UNAVAILABLE));
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 0)
+ .respondWithNumberOfRows(10)
+ .respondWithNumberOfRows(7));
+
+ Assert.assertEquals(17, getRowCount(request));
+ }
+
+ @Test
+ public void multipleRetryTestWithZeroInitialOffset() {
+ ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0);
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 0)
+ .respondWithNumberOfRows(5)
+ .respondWithStatus(Code.UNAVAILABLE));
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 5)
+ .respondWithNumberOfRows(10)
+ .respondWithNumberOfRows(7)
+ .respondWithStatus(Code.UNAVAILABLE));
+
+ service.expectations.add(
+ RpcExpectation.create().expectRequest("fake-stream", 22).respondWithNumberOfRows(6));
+
+ Assert.assertEquals(28, getRowCount(request));
+ }
+
+ @Test
+ public void multipleRetryTestWithNonZeroInitialOffset() {
+ ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 17);
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 17)
+ .respondWithNumberOfRows(5)
+ .respondWithStatus(Code.UNAVAILABLE));
+
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 22)
+ .respondWithNumberOfRows(10)
+ .respondWithNumberOfRows(7)
+ .respondWithStatus(Code.UNAVAILABLE));
+
+ service.expectations.add(
+ RpcExpectation.create().expectRequest("fake-stream", 39).respondWithNumberOfRows(3));
+
+ Assert.assertEquals(25, getRowCount(request));
+ }
+
+ @Test
+ public void errorAtTheVeryEndTest() {
+ ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0);
+ service.expectations.add(
+ RpcExpectation.create()
+ .expectRequest("fake-stream", 0)
+ .respondWithNumberOfRows(10)
+ .respondWithNumberOfRows(7)
+ .respondWithStatus(Code.UNAVAILABLE));
+
+ service.expectations.add(
+ RpcExpectation.create().expectRequest("fake-stream", 17).respondWithNumberOfRows(0));
+
+ Assert.assertEquals(17, getRowCount(request));
+ }
+
+ private int getRowCount(ReadRowsRequest request) {
+ ServerStream