From 276f942dd2e668600347a59496525a589d7560da Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Tue, 17 Dec 2019 15:56:25 -0500 Subject: [PATCH] feat: add implementation of ChannelPrimer to establish connection to GFE and integrate into bigtable client (#115) --- .../data/v2/BigtableDataSettings.java | 26 +++++++++ .../data/v2/internal/RefreshChannel.java | 54 +++++++++++++++++++ .../v2/stub/EnhancedBigtableStubSettings.java | 42 +++++++++++++++ .../data/v2/internal/RefreshChannelTest.java | 38 +++++++++++++ .../EnhancedBigtableStubSettingsTest.java | 36 +++++++++++++ 5 files changed, 196 insertions(+) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index bc60362b8..ff9740880 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -184,6 +184,12 @@ public String getAppProfileId() { return stubSettings.getAppProfileId(); } + /** Gets if channels will gracefully refresh connections to Cloud Bigtable service */ + @BetaApi("This API depends on experimental gRPC APIs") + public boolean isRefreshingChannel() { + return stubSettings.isRefreshingChannel(); + } + /** Returns the underlying RPC settings. */ public EnhancedBigtableStubSettings getStubSettings() { return stubSettings; @@ -275,6 +281,26 @@ public CredentialsProvider getCredentialsProvider() { return stubSettings.getCredentialsProvider(); } + /** + * Configure periodic gRPC channel refreshes. + * + *

This feature will gracefully refresh connections to the Cloud Bigtable service. This is an + * experimental feature to address tail latency caused by the service dropping long lived gRPC + * connections, which causes the client to renegotiate the gRPC connection in the request path, + * which causes periodic spikes in latency + */ + @BetaApi("This API depends on experimental gRPC APIs") + public Builder setRefreshingChannel(boolean isRefreshingChannel) { + stubSettings.setRefreshingChannel(isRefreshingChannel); + return this; + } + + /** Gets if channels will gracefully refresh connections to Cloud Bigtable service */ + @BetaApi("This API depends on experimental gRPC APIs") + public boolean isRefreshingChannel() { + return stubSettings.isRefreshingChannel(); + } + /** * Returns the underlying settings for making RPC calls. The settings should be changed with * care. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java new file mode 100644 index 000000000..e34ecd750 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java @@ -0,0 +1,54 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.internal; + +import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.ChannelPrimer; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import java.util.concurrent.TimeUnit; + +/** + * Establish a connection to the Cloud Bigtable service on managedChannel + * + *

This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@BetaApi("This API depends on gRPC experimental API") +@InternalApi +public final class RefreshChannel implements ChannelPrimer { + + /** + * primeChannel establishes a connection to Cloud Bigtable service. This typically take less than + * 1s. In case of service failure, an upper limit of 10s prevents primeChannel from looping + * forever. + */ + @Override + public void primeChannel(ManagedChannel managedChannel) { + for (int i = 0; i < 10; i++) { + ConnectivityState connectivityState = managedChannel.getState(true); + if (connectivityState == ConnectivityState.READY) { + break; + } + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index a23cf780c..11d472608 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.stub; +import com.google.api.core.BetaApi; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; @@ -29,6 +30,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.tracing.OpencensusTracerFactory; +import com.google.cloud.bigtable.data.v2.internal.RefreshChannel; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; @@ -149,6 +151,7 @@ public class EnhancedBigtableStubSettings extends StubSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -179,6 +182,7 @@ private EnhancedBigtableStubSettings(Builder builder) { projectId = builder.projectId; instanceId = builder.instanceId; appProfileId = builder.appProfileId; + isRefreshingChannel = builder.isRefreshingChannel; // Per method settings. readRowsSettings = builder.readRowsSettings.build(); @@ -210,6 +214,12 @@ public String getAppProfileId() { return appProfileId; } + /** Returns if channels will gracefully refresh connections to Cloud Bigtable service */ + @BetaApi("This API depends on experimental gRPC APIs") + public boolean isRefreshingChannel() { + return isRefreshingChannel; + } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() @@ -413,6 +423,7 @@ public static class Builder extends StubSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -433,6 +444,7 @@ public static class Builder extends StubSettings.Builder readRowsSettings() { return readRowsSettings; @@ -642,6 +672,18 @@ public EnhancedBigtableStubSettings build() { Preconditions.checkState(projectId != null, "Project id must be set"); Preconditions.checkState(instanceId != null, "Instance id must be set"); + // Set ChannelPrimer on TransportChannelProvider so channels will gracefully refresh + // connections to Cloud Bigtable service + if (isRefreshingChannel) { + Preconditions.checkArgument( + getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider, + "refreshingChannel only works with InstantiatingGrpcChannelProviders"); + InstantiatingGrpcChannelProvider.Builder channelBuilder = + ((InstantiatingGrpcChannelProvider) getTransportChannelProvider()) + .toBuilder() + .setChannelPrimer(new RefreshChannel()); + setTransportChannelProvider(channelBuilder.build()); + } return new EnhancedBigtableStubSettings(this); } // diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java new file mode 100644 index 000000000..c41fa4d2a --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.internal; + +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class RefreshChannelTest { + // RefreshChannel should establish connection to the server through managedChannel.getState(true) + @Test + public void testGetStateIsCalled() { + RefreshChannel refreshChannel = new RefreshChannel(); + ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class); + + Mockito.doReturn(ConnectivityState.READY).when(managedChannel).getState(true); + + refreshChannel.primeChannel(managedChannel); + Mockito.verify(managedChannel, Mockito.atLeastOnce()).getState(true); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index 6adf129f9..b9b6a69cd 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -61,6 +61,7 @@ public void settingsAreNotLostTest() { String projectId = "my-project"; String instanceId = "my-instance"; String appProfileId = "my-app-profile-id"; + boolean isRefreshingChannel = true; String endpoint = "some.other.host:123"; CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); @@ -71,6 +72,7 @@ public void settingsAreNotLostTest() { .setProjectId(projectId) .setInstanceId(instanceId) .setAppProfileId(appProfileId) + .setRefreshingChannel(isRefreshingChannel) .setEndpoint(endpoint) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) @@ -81,6 +83,7 @@ public void settingsAreNotLostTest() { projectId, instanceId, appProfileId, + isRefreshingChannel, endpoint, credentialsProvider, watchdogProvider, @@ -90,6 +93,7 @@ public void settingsAreNotLostTest() { projectId, instanceId, appProfileId, + isRefreshingChannel, endpoint, credentialsProvider, watchdogProvider, @@ -99,6 +103,7 @@ public void settingsAreNotLostTest() { projectId, instanceId, appProfileId, + isRefreshingChannel, endpoint, credentialsProvider, watchdogProvider, @@ -110,6 +115,7 @@ private void verifyBuilder( String projectId, String instanceId, String appProfileId, + boolean isRefreshingChannel, String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, @@ -117,6 +123,7 @@ private void verifyBuilder( assertThat(builder.getProjectId()).isEqualTo(projectId); assertThat(builder.getInstanceId()).isEqualTo(instanceId); assertThat(builder.getAppProfileId()).isEqualTo(appProfileId); + assertThat(builder.isRefreshingChannel()).isEqualTo(isRefreshingChannel); assertThat(builder.getEndpoint()).isEqualTo(endpoint); assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); @@ -128,6 +135,7 @@ private void verifySettings( String projectId, String instanceId, String appProfileId, + boolean isRefreshingChannel, String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, @@ -135,6 +143,7 @@ private void verifySettings( assertThat(settings.getProjectId()).isEqualTo(projectId); assertThat(settings.getInstanceId()).isEqualTo(instanceId); assertThat(settings.getAppProfileId()).isEqualTo(appProfileId); + assertThat(settings.isRefreshingChannel()).isEqualTo(isRefreshingChannel); assertThat(settings.getEndpoint()).isEqualTo(endpoint); assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); @@ -521,4 +530,31 @@ private void verifyRetrySettingAreSane(Set retryCodes, RetrySettings retry assertThat(retrySettings.getRpcTimeoutMultiplier()).isAtLeast(1.0); assertThat(retrySettings.getMaxRpcTimeout()).isGreaterThan(Duration.ZERO); } + + @Test + public void isRefreshingChannelDefaultValueTest() { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId); + assertThat(builder.isRefreshingChannel()).isFalse(); + assertThat(builder.build().isRefreshingChannel()).isFalse(); + assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse(); + } + + @Test + public void isRefreshingChannelFalseValueTest() { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId) + .setRefreshingChannel(false); + assertThat(builder.isRefreshingChannel()).isFalse(); + assertThat(builder.build().isRefreshingChannel()).isFalse(); + assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse(); + } }