Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add experimental channel refreshing #115

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -184,6 +184,12 @@ public String getAppProfileId() {
return stubSettings.getAppProfileId();
}

/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
@BetaApi("This API depends on experimental gRPC APIs")
public boolean isRefreshingChannel() {
return stubSettings.isRefreshingChannel();
}

/** Returns the underlying RPC settings. */
public EnhancedBigtableStubSettings getStubSettings() {
return stubSettings;
Expand Down Expand Up @@ -275,6 +281,26 @@ public CredentialsProvider getCredentialsProvider() {
return stubSettings.getCredentialsProvider();
}

/**
* Configure periodic gRPC channel refreshes.
*
* <p>This feature will gracefully refresh connections to the Cloud Bigtable service. This is an
* experimental feature to address tail latency caused by the service dropping long lived gRPC
* connections, which causes the client to renegotiate the gRPC connection in the request path,
* which causes periodic spikes in latency
*/
tonytanger marked this conversation as resolved.
Show resolved Hide resolved
@BetaApi("This API depends on experimental gRPC APIs")
public Builder setRefreshingChannel(boolean isRefreshingChannel) {
stubSettings.setRefreshingChannel(isRefreshingChannel);
return this;
}

/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
@BetaApi("This API depends on experimental gRPC APIs")
public boolean isRefreshingChannel() {
tonytanger marked this conversation as resolved.
Show resolved Hide resolved
return stubSettings.isRefreshingChannel();
}

/**
* Returns the underlying settings for making RPC calls. The settings should be changed with
* care.
Expand Down
@@ -0,0 +1,54 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.internal;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelPrimer;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.util.concurrent.TimeUnit;

/**
* Establish a connection to the Cloud Bigtable service on managedChannel
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@BetaApi("This API depends on gRPC experimental API")
@InternalApi
public final class RefreshChannel implements ChannelPrimer {

/**
* primeChannel establishes a connection to Cloud Bigtable service. This typically take less than
* 1s. In case of service failure, an upper limit of 10s prevents primeChannel from looping
* forever.
*/
@Override
public void primeChannel(ManagedChannel managedChannel) {
for (int i = 0; i < 10; i++) {
tonytanger marked this conversation as resolved.
Show resolved Hide resolved
ConnectivityState connectivityState = managedChannel.getState(true);
if (connectivityState == ConnectivityState.READY) {
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
}
}
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand All @@ -29,6 +30,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.RefreshChannel;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
Expand Down Expand Up @@ -149,6 +151,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String projectId;
private final String instanceId;
private final String appProfileId;
private final boolean isRefreshingChannel;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -179,6 +182,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
projectId = builder.projectId;
instanceId = builder.instanceId;
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -210,6 +214,12 @@ public String getAppProfileId() {
return appProfileId;
}

/** Returns if channels will gracefully refresh connections to Cloud Bigtable service */
@BetaApi("This API depends on experimental gRPC APIs")
public boolean isRefreshingChannel() {
return isRefreshingChannel;
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -413,6 +423,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String projectId;
private String instanceId;
private String appProfileId;
private boolean isRefreshingChannel;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand All @@ -433,6 +444,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
*/
private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
setCredentialsProvider(defaultCredentialsProviderBuilder().build());

// Defaults provider
Expand Down Expand Up @@ -515,6 +527,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
projectId = settings.projectId;
instanceId = settings.instanceId;
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -602,6 +615,23 @@ public String getAppProfileId() {
return appProfileId;
}

/**
* Sets if channels will gracefully refresh connections to Cloud Bigtable service
*
* @see com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setRefreshingChannel
*/
@BetaApi("This API depends on experimental gRPC APIs")
public Builder setRefreshingChannel(boolean isRefreshingChannel) {
this.isRefreshingChannel = isRefreshingChannel;
return this;
}

/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
@BetaApi("This API depends on experimental gRPC APIs")
public boolean isRefreshingChannel() {
tonytanger marked this conversation as resolved.
Show resolved Hide resolved
return isRefreshingChannel;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -642,6 +672,18 @@ public EnhancedBigtableStubSettings build() {
Preconditions.checkState(projectId != null, "Project id must be set");
Preconditions.checkState(instanceId != null, "Instance id must be set");

// Set ChannelPrimer on TransportChannelProvider so channels will gracefully refresh
// connections to Cloud Bigtable service
if (isRefreshingChannel) {
Preconditions.checkArgument(
getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider,
"refreshingChannel only works with InstantiatingGrpcChannelProviders");
InstantiatingGrpcChannelProvider.Builder channelBuilder =
((InstantiatingGrpcChannelProvider) getTransportChannelProvider())
.toBuilder()
.setChannelPrimer(new RefreshChannel());
setTransportChannelProvider(channelBuilder.build());
}
return new EnhancedBigtableStubSettings(this);
}
// </editor-fold>
Expand Down
@@ -0,0 +1,38 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.internal;

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class RefreshChannelTest {
// RefreshChannel should establish connection to the server through managedChannel.getState(true)
@Test
public void testGetStateIsCalled() {
RefreshChannel refreshChannel = new RefreshChannel();
ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);

Mockito.doReturn(ConnectivityState.READY).when(managedChannel).getState(true);

refreshChannel.primeChannel(managedChannel);
Mockito.verify(managedChannel, Mockito.atLeastOnce()).getState(true);
}
}
Expand Up @@ -61,6 +61,7 @@ public void settingsAreNotLostTest() {
String projectId = "my-project";
String instanceId = "my-instance";
String appProfileId = "my-app-profile-id";
boolean isRefreshingChannel = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add a small test for the default (unset) refreshingChannel as well and that it is false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added 2 tests for default and false

String endpoint = "some.other.host:123";
CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class);
WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class);
Expand All @@ -71,6 +72,7 @@ public void settingsAreNotLostTest() {
.setProjectId(projectId)
.setInstanceId(instanceId)
.setAppProfileId(appProfileId)
.setRefreshingChannel(isRefreshingChannel)
.setEndpoint(endpoint)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
Expand All @@ -81,6 +83,7 @@ public void settingsAreNotLostTest() {
projectId,
instanceId,
appProfileId,
isRefreshingChannel,
endpoint,
credentialsProvider,
watchdogProvider,
Expand All @@ -90,6 +93,7 @@ public void settingsAreNotLostTest() {
projectId,
instanceId,
appProfileId,
isRefreshingChannel,
endpoint,
credentialsProvider,
watchdogProvider,
Expand All @@ -99,6 +103,7 @@ public void settingsAreNotLostTest() {
projectId,
instanceId,
appProfileId,
isRefreshingChannel,
endpoint,
credentialsProvider,
watchdogProvider,
Expand All @@ -110,13 +115,15 @@ private void verifyBuilder(
String projectId,
String instanceId,
String appProfileId,
boolean isRefreshingChannel,
String endpoint,
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
Duration watchdogInterval) {
assertThat(builder.getProjectId()).isEqualTo(projectId);
assertThat(builder.getInstanceId()).isEqualTo(instanceId);
assertThat(builder.getAppProfileId()).isEqualTo(appProfileId);
assertThat(builder.isRefreshingChannel()).isEqualTo(isRefreshingChannel);
assertThat(builder.getEndpoint()).isEqualTo(endpoint);
assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
Expand All @@ -128,13 +135,15 @@ private void verifySettings(
String projectId,
String instanceId,
String appProfileId,
boolean isRefreshingChannel,
String endpoint,
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
Duration watchdogInterval) {
assertThat(settings.getProjectId()).isEqualTo(projectId);
assertThat(settings.getInstanceId()).isEqualTo(instanceId);
assertThat(settings.getAppProfileId()).isEqualTo(appProfileId);
assertThat(settings.isRefreshingChannel()).isEqualTo(isRefreshingChannel);
assertThat(settings.getEndpoint()).isEqualTo(endpoint);
assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
Expand Down Expand Up @@ -521,4 +530,31 @@ private void verifyRetrySettingAreSane(Set<Code> retryCodes, RetrySettings retry
assertThat(retrySettings.getRpcTimeoutMultiplier()).isAtLeast(1.0);
assertThat(retrySettings.getMaxRpcTimeout()).isGreaterThan(Duration.ZERO);
}

@Test
public void isRefreshingChannelDefaultValueTest() {
String dummyProjectId = "my-project";
String dummyInstanceId = "my-instance";
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(dummyProjectId)
.setInstanceId(dummyInstanceId);
assertThat(builder.isRefreshingChannel()).isFalse();
assertThat(builder.build().isRefreshingChannel()).isFalse();
assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse();
}

@Test
public void isRefreshingChannelFalseValueTest() {
String dummyProjectId = "my-project";
String dummyInstanceId = "my-instance";
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(dummyProjectId)
.setInstanceId(dummyInstanceId)
.setRefreshingChannel(false);
assertThat(builder.isRefreshingChannel()).isFalse();
assertThat(builder.build().isRefreshingChannel()).isFalse();
assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse();
}
}