diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index c2ef87ebf..6c562e5d1 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -87,6 +87,10 @@ com.google.api.grpc proto-google-iam-v1 + + com.google.auth + google-auth-library-credentials + com.google.auth google-auth-library-oauth2-http diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index 437ebc653..8dd0fa6d9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -26,6 +26,7 @@ import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.common.base.Strings; import io.grpc.ManagedChannelBuilder; +import java.util.List; import java.util.logging.Logger; import javax.annotation.Nonnull; @@ -185,11 +186,20 @@ public String getAppProfileId() { } /** Gets if channels will gracefully refresh connections to Cloud Bigtable service */ - @BetaApi("This API depends on experimental gRPC APIs") + @BetaApi("Channel priming is not currently stable and may change in the future") public boolean isRefreshingChannel() { return stubSettings.isRefreshingChannel(); } + /** + * Gets the table ids that will be used to send warmup requests when {@link + * #isRefreshingChannel()} is enabled. + */ + @BetaApi("Channel priming is not currently stable and may change in the future") + public List getPrimingTableIds() { + return stubSettings.getPrimedTableIds(); + } + /** Returns the underlying RPC settings. */ public EnhancedBigtableStubSettings getStubSettings() { return stubSettings; @@ -307,18 +317,40 @@ public CredentialsProvider getCredentialsProvider() { * connections, which causes the client to renegotiate the gRPC connection in the request path, * which causes periodic spikes in latency */ - @BetaApi("This API depends on experimental gRPC APIs") + @BetaApi("Channel priming is not currently stable and may change in the future") public Builder setRefreshingChannel(boolean isRefreshingChannel) { stubSettings.setRefreshingChannel(isRefreshingChannel); return this; } /** Gets if channels will gracefully refresh connections to Cloud Bigtable service */ - @BetaApi("This API depends on experimental gRPC APIs") + @BetaApi("Channel priming is not currently stable and may change in the future") public boolean isRefreshingChannel() { return stubSettings.isRefreshingChannel(); } + /** + * Configure the tables that can be used to prime a channel during a refresh. + * + *

These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a + * channel is refreshed, it will send a request to each table to warm up the serverside caches + * before admitting the new channel into the channel pool. + */ + @BetaApi("Channel priming is not currently stable and may change in the future") + public Builder setPrimingTableIds(String... tableIds) { + stubSettings.setPrimedTableIds(tableIds); + return this; + } + + /** + * Gets the table ids that will be used to send warmup requests when {@link + * #setRefreshingChannel(boolean)} is enabled. + */ + @BetaApi("Channel priming is not currently stable and may change in the future") + public List getPrimingTableIds() { + return stubSettings.getPrimedTableIds(); + } + /** * Returns the underlying settings for making RPC calls. The settings should be changed with * care. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java deleted file mode 100644 index e34ecd750..000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.internal; - -import com.google.api.core.BetaApi; -import com.google.api.core.InternalApi; -import com.google.api.gax.grpc.ChannelPrimer; -import io.grpc.ConnectivityState; -import io.grpc.ManagedChannel; -import java.util.concurrent.TimeUnit; - -/** - * Establish a connection to the Cloud Bigtable service on managedChannel - * - *

This class is considered an internal implementation detail and not meant to be used by - * applications. - */ -@BetaApi("This API depends on gRPC experimental API") -@InternalApi -public final class RefreshChannel implements ChannelPrimer { - - /** - * primeChannel establishes a connection to Cloud Bigtable service. This typically take less than - * 1s. In case of service failure, an upper limit of 10s prevents primeChannel from looping - * forever. - */ - @Override - public void primeChannel(ManagedChannel managedChannel) { - for (int i = 0; i < 10; i++) { - ConnectivityState connectivityState = managedChannel.getState(true); - if (connectivityState == ConnectivityState.READY) { - break; - } - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - break; - } - } - } -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java new file mode 100644 index 000000000..15be8f730 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -0,0 +1,168 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS; + +import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.ChannelPrimer; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.auth.Credentials; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import org.threeten.bp.Duration; + +/** + * A channel warmer that ensures that a Bigtable channel is ready to be used before being added to + * the active {@link com.google.api.gax.grpc.ChannelPool}. + * + *

This implementation is subject to change in the future, but currently it will prime the + * channel by sending a ReadRow request for a hardcoded, non-existent row key. + */ +@BetaApi("Channel priming is not currently stable and might change in the future") +class BigtableChannelPrimer implements ChannelPrimer { + private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString()); + + static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row"); + private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30); + + private final EnhancedBigtableStubSettings settingsTemplate; + private final List tableIds; + + static BigtableChannelPrimer create( + Credentials credentials, + String projectId, + String instanceId, + String appProfileId, + List tableIds) { + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(projectId) + .setInstanceId(instanceId) + .setAppProfileId(appProfileId) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) + .setExecutorProvider( + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()); + + // Disable retries for priming request + builder + .readRowSettings() + .setRetrySettings( + builder + .readRowSettings() + .getRetrySettings() + .toBuilder() + .setMaxAttempts(1) + .setJittered(false) + .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setTotalTimeout(PRIME_REQUEST_TIMEOUT) + .build()); + return new BigtableChannelPrimer(builder.build(), tableIds); + } + + private BigtableChannelPrimer( + EnhancedBigtableStubSettings settingsTemplate, List tableIds) { + Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null"); + this.settingsTemplate = settingsTemplate; + this.tableIds = ImmutableList.copyOf(tableIds); + } + + @Override + public void primeChannel(ManagedChannel managedChannel) { + try { + primeChannelUnsafe(managedChannel); + } catch (IOException | RuntimeException e) { + LOG.warning( + String.format("Unexpected error while trying to prime a channel: %s", e.getMessage())); + } + } + + private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException { + if (tableIds.isEmpty()) { + waitForChannelReady(managedChannel); + } else { + sendPrimeRequests(managedChannel); + } + } + + private void waitForChannelReady(ManagedChannel managedChannel) { + for (int i = 0; i < 30; i++) { + ConnectivityState connectivityState = managedChannel.getState(true); + if (connectivityState == ConnectivityState.READY) { + break; + } + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } + + private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { + // Wrap the channel in a temporary stub + EnhancedBigtableStubSettings primingSettings = + settingsTemplate + .toBuilder() + .setTransportChannelProvider( + FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel))) + .build(); + + try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { + Map> primeFutures = new HashMap<>(); + + // Prime all of the table ids in parallel + for (String tableId : tableIds) { + ApiFuture f = + stub.readRowCallable() + .futureCall(Query.create(tableId).rowKey(PRIMING_ROW_KEY).filter(FILTERS.block())); + + primeFutures.put(tableId, f); + } + + // Wait for all of the prime requests to complete. + for (Map.Entry> entry : primeFutures.entrySet()) { + try { + entry.getValue().get(); + } catch (Throwable e) { + if (e instanceof ExecutionException) { + e = e.getCause(); + } + LOG.warning( + String.format( + "Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage())); + } + } + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 8d9d2fc70..d729d6244 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -20,10 +20,12 @@ import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcRawCallableFactory; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetryAlgorithm; import com.google.api.gax.retrying.RetryingExecutorWithContext; @@ -38,6 +40,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.api.gax.tracing.TracedUnaryCallable; +import com.google.auth.Credentials; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.CheckAndMutateRowRequest; import com.google.bigtable.v2.CheckAndMutateRowResponse; @@ -120,65 +123,93 @@ public class EnhancedBigtableStub implements AutoCloseable { public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) throws IOException { - ClientContext clientContext = ClientContext.create(settings); + settings = finalizeSettings(settings, Tags.getTagger(), Stats.getStatsRecorder()); - return new EnhancedBigtableStub( - settings, clientContext, Tags.getTagger(), Stats.getStatsRecorder()); + return new EnhancedBigtableStub(settings, ClientContext.create(settings)); } - @InternalApi("Visible for testing") - public EnhancedBigtableStub( - EnhancedBigtableStubSettings settings, - ClientContext clientContext, - Tagger tagger, - StatsRecorder statsRecorder) { - this.settings = settings; + public static EnhancedBigtableStubSettings finalizeSettings( + EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats) + throws IOException { + EnhancedBigtableStubSettings.Builder builder = settings.toBuilder(); + + // TODO: this implementation is on the cusp of unwieldy, if we end up adding more features + // consider splitting it up by feature. + + // Inject channel priming + if (settings.isRefreshingChannel()) { + // Fix the credentials so that they can be shared + Credentials credentials = null; + if (settings.getCredentialsProvider() != null) { + credentials = settings.getCredentialsProvider().getCredentials(); + } + builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + + // Inject the primer + InstantiatingGrpcChannelProvider transportProvider = + (InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider(); + + builder.setTransportChannelProvider( + transportProvider + .toBuilder() + .setChannelPrimer( + BigtableChannelPrimer.create( + credentials, + settings.getProjectId(), + settings.getInstanceId(), + settings.getAppProfileId(), + settings.getPrimedTableIds())) + .build()); + } - this.clientContext = - clientContext - .toBuilder() - .setTracerFactory( - new CompositeTracerFactory( - ImmutableList.of( - // Add OpenCensus Tracing - new OpencensusTracerFactory( - ImmutableMap.builder() - // Annotate traces with the same tags as metrics - .put( - RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(), - settings.getProjectId()) - .put( - RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(), - settings.getInstanceId()) - .put( - RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(), - settings.getAppProfileId()) - // Also annotate traces with library versions - .put("gax", GaxGrpcProperties.getGaxGrpcVersion()) - .put("grpc", GaxGrpcProperties.getGrpcVersion()) - .put( - "gapic", - GaxProperties.getLibraryVersion( - EnhancedBigtableStubSettings.class)) - .build()), - // Add OpenCensus Metrics - MetricsTracerFactory.create( - tagger, - statsRecorder, - ImmutableMap.builder() - .put( - RpcMeasureConstants.BIGTABLE_PROJECT_ID, - TagValue.create(settings.getProjectId())) - .put( - RpcMeasureConstants.BIGTABLE_INSTANCE_ID, - TagValue.create(settings.getInstanceId())) - .put( - RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, - TagValue.create(settings.getAppProfileId())) - .build()), - // Add user configured tracer - clientContext.getTracerFactory()))) - .build(); + // Inject Opencensus instrumentation + builder.setTracerFactory( + new CompositeTracerFactory( + ImmutableList.of( + // Add OpenCensus Tracing + new OpencensusTracerFactory( + ImmutableMap.builder() + // Annotate traces with the same tags as metrics + .put( + RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(), + settings.getProjectId()) + .put( + RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(), + settings.getInstanceId()) + .put( + RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(), + settings.getAppProfileId()) + // Also annotate traces with library versions + .put("gax", GaxGrpcProperties.getGaxGrpcVersion()) + .put("grpc", GaxGrpcProperties.getGrpcVersion()) + .put( + "gapic", + GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class)) + .build()), + // Add OpenCensus Metrics + MetricsTracerFactory.create( + tagger, + stats, + ImmutableMap.builder() + .put( + RpcMeasureConstants.BIGTABLE_PROJECT_ID, + TagValue.create(settings.getProjectId())) + .put( + RpcMeasureConstants.BIGTABLE_INSTANCE_ID, + TagValue.create(settings.getInstanceId())) + .put( + RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, + TagValue.create(settings.getAppProfileId())) + .build()), + // Add user configured tracer + settings.getTracerFactory()))); + + return builder.build(); + } + + public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext clientContext) { + this.settings = settings; + this.clientContext = clientContext; this.requestContext = RequestContext.create( settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId()); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 1906228a3..d843265d1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -28,7 +28,6 @@ import com.google.api.gax.rpc.StubSettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; -import com.google.cloud.bigtable.data.v2.internal.RefreshChannel; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; @@ -150,6 +149,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -188,6 +188,7 @@ private EnhancedBigtableStubSettings(Builder builder) { instanceId = builder.instanceId; appProfileId = builder.appProfileId; isRefreshingChannel = builder.isRefreshingChannel; + primedTableIds = builder.primedTableIds; // Per method settings. readRowsSettings = builder.readRowsSettings.build(); @@ -226,6 +227,12 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } + /** Gets the tables that will be primed during a channel refresh. */ + @BetaApi("Channel priming is not currently stable and might change in the future") + public List getPrimedTableIds() { + return primedTableIds; + } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() @@ -483,6 +490,7 @@ public static class Builder extends StubSettings.Builder primedTableIds; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -505,6 +513,7 @@ public static class Builder extends StubSettings.BuilderWhen enabled, this will wait for the connection to complete the SSL handshake. The effect + * can be enhanced by configuring table ids that can be used warm serverside caches using {@link + * #setPrimedTableIds(String...)}. * * @see com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setRefreshingChannel */ @@ -709,12 +723,25 @@ public Builder setRefreshingChannel(boolean isRefreshingChannel) { return this; } + /** Configures which tables will be primed when a connection is created. */ + @BetaApi("Channel priming is not currently stable and might change in the future") + public Builder setPrimedTableIds(String... tableIds) { + this.primedTableIds = ImmutableList.copyOf(tableIds); + return this; + } + /** Gets if channels will gracefully refresh connections to Cloud Bigtable service */ @BetaApi("This API depends on experimental gRPC APIs") public boolean isRefreshingChannel() { return isRefreshingChannel; } + /** Gets the tables that will be primed during a channel refresh. */ + @BetaApi("Channel priming is not currently stable and might change in the future") + public List getPrimedTableIds() { + return primedTableIds; + } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -760,17 +787,10 @@ public EnhancedBigtableStubSettings build() { Preconditions.checkState(projectId != null, "Project id must be set"); Preconditions.checkState(instanceId != null, "Instance id must be set"); - // Set ChannelPrimer on TransportChannelProvider so channels will gracefully refresh - // connections to Cloud Bigtable service if (isRefreshingChannel) { Preconditions.checkArgument( getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider, "refreshingChannel only works with InstantiatingGrpcChannelProviders"); - InstantiatingGrpcChannelProvider.Builder channelBuilder = - ((InstantiatingGrpcChannelProvider) getTransportChannelProvider()) - .toBuilder() - .setChannelPrimer(new RefreshChannel()); - setTransportChannelProvider(channelBuilder.build()); } return new EnhancedBigtableStubSettings(this); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java deleted file mode 100644 index c41fa4d2a..000000000 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.internal; - -import io.grpc.ConnectivityState; -import io.grpc.ManagedChannel; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -@RunWith(JUnit4.class) -public class RefreshChannelTest { - // RefreshChannel should establish connection to the server through managedChannel.getState(true) - @Test - public void testGetStateIsCalled() { - RefreshChannel refreshChannel = new RefreshChannel(); - ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class); - - Mockito.doReturn(ConnectivityState.READY).when(managedChannel).getState(true); - - refreshChannel.primeChannel(managedChannel); - Mockito.verify(managedChannel, Mockito.atLeastOnce()).getState(true); - } -} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java new file mode 100644 index 000000000..42d13a7ab --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -0,0 +1,234 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFunction; +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.RowFilter; +import com.google.bigtable.v2.RowSet; +import com.google.common.collect.ImmutableList; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.logging.Handler; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.internal.stubbing.answers.ThrowsException; + +@RunWith(JUnit4.class) +public class BigtableChannelPrimerTest { + private static final String TOKEN_VALUE = "fake-token"; + + int port; + Server server; + FakeService fakeService; + MetadataInterceptor metadataInterceptor; + BigtableChannelPrimer primer; + ManagedChannel channel; + private LogHandler logHandler; + + @Before + public void setup() throws IOException { + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } catch (IOException e) { + e.printStackTrace(); + } + + fakeService = new FakeService(); + metadataInterceptor = new MetadataInterceptor(); + server = + ServerBuilder.forPort(port).intercept(metadataInterceptor).addService(fakeService).build(); + server.start(); + + primer = + BigtableChannelPrimer.create( + OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), + "fake-project", + "fake-instance", + "fake-app-profile", + ImmutableList.of("table1", "table2")); + + channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); + + logHandler = new LogHandler(); + Logger.getLogger(BigtableChannelPrimer.class.toString()).addHandler(logHandler); + } + + @After + public void teardown() { + Logger.getLogger(BigtableChannelPrimer.class.toString()).removeHandler(logHandler); + channel.shutdown(); + server.shutdown(); + } + + @Test + public void testCredentials() { + primer.primeChannel(channel); + + for (Metadata metadata : metadataInterceptor.metadataList) { + assertThat(metadata.get(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))) + .isEqualTo("Bearer " + TOKEN_VALUE); + } + channel.shutdown(); + } + + @Test + public void testRequests() { + final Queue requests = new ConcurrentLinkedQueue<>(); + + fakeService.readRowsCallback = + new ApiFunction() { + @Override + public ReadRowsResponse apply(ReadRowsRequest req) { + requests.add(req); + return ReadRowsResponse.getDefaultInstance(); + } + }; + primer.primeChannel(channel); + + assertThat(requests) + .containsExactly( + ReadRowsRequest.newBuilder() + .setTableName("projects/fake-project/instances/fake-instance/tables/table1") + .setAppProfileId("fake-app-profile") + .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) + .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) + .setRowsLimit(1) + .build(), + ReadRowsRequest.newBuilder() + .setTableName("projects/fake-project/instances/fake-instance/tables/table2") + .setAppProfileId("fake-app-profile") + .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) + .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) + .setRowsLimit(1) + .build()); + } + + @Test + public void testErrorsAreLogged() { + fakeService.readRowsCallback = + new ApiFunction() { + @Override + public ReadRowsResponse apply(ReadRowsRequest req) { + throw new StatusRuntimeException(Status.FAILED_PRECONDITION); + } + }; + primer.primeChannel(channel); + + assertThat(logHandler.logs).hasSize(2); + for (LogRecord log : logHandler.logs) { + assertThat(log.getMessage()).contains("FAILED_PRECONDITION"); + } + } + + @Test + public void testErrorsAreLoggedForBasic() { + BigtableChannelPrimer basicPrimer = + BigtableChannelPrimer.create( + OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), + "fake-project", + "fake-instance", + "fake-app-profile", + ImmutableList.of()); + + ManagedChannel channel = + Mockito.mock( + ManagedChannel.class, new ThrowsException(new UnsupportedOperationException())); + primer.primeChannel(channel); + + assertThat(logHandler.logs).hasSize(1); + for (LogRecord log : logHandler.logs) { + assertThat(log.getMessage()).contains("Unexpected"); + } + } + + private static class MetadataInterceptor implements ServerInterceptor { + ConcurrentLinkedQueue metadataList = new ConcurrentLinkedQueue<>(); + + @Override + public Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + metadataList.add(metadata); + + return serverCallHandler.startCall(serverCall, metadata); + } + } + + static class FakeService extends BigtableImplBase { + private ApiFunction readRowsCallback = + new ApiFunction() { + @Override + public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { + return ReadRowsResponse.getDefaultInstance(); + } + }; + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + + try { + responseObserver.onNext(readRowsCallback.apply(request)); + responseObserver.onCompleted(); + } catch (RuntimeException e) { + responseObserver.onError(e); + } + } + } + + private static class LogHandler extends Handler { + private ConcurrentLinkedQueue logs = new ConcurrentLinkedQueue<>(); + + @Override + public void publish(LogRecord record) { + logs.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index be2d9c2a0..b823930fb 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -18,14 +18,13 @@ import static com.google.common.truth.Truth.assertThat; import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.testing.InProcessServer; -import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.admin.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; import com.google.cloud.bigtable.data.v2.models.Query; @@ -34,8 +33,11 @@ import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.StringValue; +import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.net.ServerSocket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -49,37 +51,40 @@ public class EnhancedBigtableStubTest { private static final String PROJECT_ID = "fake-project"; private static final String INSTANCE_ID = "fake-instance"; - private static final String FAKE_HOST_NAME = "fake-stub-host:123"; private static final String TABLE_NAME = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table"); private static final String APP_PROFILE_ID = "app-profile-id"; - private InProcessServer server; + private Server server; private FakeDataService fakeDataService; + private EnhancedBigtableStubSettings defaultSettings; private EnhancedBigtableStub enhancedBigtableStub; @Before public void setUp() throws IOException, IllegalAccessException, InstantiationException { + int port; + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } fakeDataService = new FakeDataService(); - server = new InProcessServer<>(fakeDataService, FAKE_HOST_NAME); + server = ServerBuilder.forPort(port).addService(fakeDataService).build(); server.start(); - EnhancedBigtableStubSettings enhancedBigtableStubSettings = - EnhancedBigtableStubSettings.newBuilder() + defaultSettings = + BigtableDataSettings.newBuilderForEmulator(port) .setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) .setAppProfileId(APP_PROFILE_ID) .setCredentialsProvider(NoCredentialsProvider.create()) - .setEndpoint(FAKE_HOST_NAME) - .setTransportChannelProvider(LocalChannelProvider.create(FAKE_HOST_NAME)) - .build(); + .build() + .getStubSettings(); - enhancedBigtableStub = EnhancedBigtableStub.create(enhancedBigtableStubSettings); + enhancedBigtableStub = EnhancedBigtableStub.create(defaultSettings); } @After public void tearDown() { - server.stop(); + server.shutdown(); } @Test @@ -117,6 +122,21 @@ public void testCreateReadRowsRawCallable() throws InterruptedException { assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2); } + @Test + public void testChannelPrimerConfigured() throws IOException { + EnhancedBigtableStubSettings settings = + defaultSettings + .toBuilder() + .setRefreshingChannel(true) + .setPrimedTableIds("table1", "table2") + .build(); + + try (EnhancedBigtableStub ignored = EnhancedBigtableStub.create(settings)) { + // priming will issue a request per table on startup + assertThat(fakeDataService.requests).hasSize(2); + } + } + private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index 9fd78c5cd..49ebae81f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -120,14 +120,11 @@ public void setUp() throws Exception { .setInstanceId(INSTANCE_ID) .setAppProfileId(APP_PROFILE_ID) .build(); - EnhancedBigtableStubSettings stubSettings = settings.getStubSettings(); - - stub = - new EnhancedBigtableStub( - stubSettings, - ClientContext.create(stubSettings), - Tags.getTagger(), - localStats.getStatsRecorder()); + EnhancedBigtableStubSettings stubSettings = + EnhancedBigtableStub.finalizeSettings( + settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder()); + + stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); } @After