From 3d549ed3a14728aba43a1df3a62922c1b52bce61 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Mon, 30 Nov 2020 22:22:39 +0000 Subject: [PATCH 1/2] chore: add DirectPath fallback integration test --- google-cloud-spanner/pom.xml | 10 + .../spanner/it/ITDirectPathFallback.java | 304 ++++++++++++++++++ 2 files changed, 314 insertions(+) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectPathFallback.java diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index d77fe642bc..3e61c5d5ef 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -142,6 +142,16 @@ io.grpc grpc-protobuf + + io.grpc + grpc-alts + runtime + + io.grpc grpc-stub diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectPathFallback.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectPathFallback.java new file mode 100644 index 0000000000..746214e7f2 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectPathFallback.java @@ -0,0 +1,304 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.it; + +import static com.google.common.truth.Truth.assertWithMessage; +import static com.google.common.truth.TruthJUnit.assume; + +import com.google.api.core.ApiFunction; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.auth.oauth2.ComputeEngineCredentials; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ParallelIntegrationTest; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.TimestampBound; +import com.google.cloud.spanner.testing.RemoteSpannerHelper; +import com.google.common.base.Stopwatch; +import io.grpc.ManagedChannelBuilder; +import io.grpc.alts.ComputeEngineChannelBuilder; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.channel.ChannelDuplexHandler; +import io.grpc.netty.shaded.io.netty.channel.ChannelFactory; +import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext; +import io.grpc.netty.shaded.io.netty.channel.ChannelPromise; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel; +import io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test DirectPath fallback behavior by injecting a ChannelHandler into the netty stack that will + * disrupt IPv6 communications. + * + *

WARNING: this test can only be run on a GCE VM and will explicitly ignore + * GOOGLE_APPLICATION_CREDENTIALS and use the service account associated with the VM. + */ +@Category(ParallelIntegrationTest.class) +@RunWith(JUnit4.class) +public class ITDirectPathFallback { + // A threshold of completed read calls to observe to ascertain IPv6 is working. + // This was determined experimentally to account for both gRPC-LB RPCs and Bigtable api RPCs. + private static final int MIN_COMPLETE_READ_CALLS = 40; + private static final int NUM_RPCS_TO_SEND = 20; + + // IP address prefixes allocated for DirectPath backends. + private static final String DP_IPV6_PREFIX = "2001:4860:8040"; + private static final String DP_IPV4_PREFIX = "34.126"; + + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); + + private AtomicBoolean blackholeDpAddr = new AtomicBoolean(); + private AtomicInteger numBlocked = new AtomicInteger(); + private AtomicInteger numDpAddrRead = new AtomicInteger(); + private boolean isDpAddr; + + private ChannelFactory channelFactory; + private EventLoopGroup eventLoopGroup; + private RemoteSpannerHelper testHelper; + + private static final String TABLE_NAME = "TestTable"; + private static final List ALL_COLUMNS = Arrays.asList("Key", "StringValue"); + private static Database db; + private static DatabaseClient client; + + // TODO(mohanli): Remove this temporary endpoint once DirectPath goes to public beta. + private static final String DIRECT_PATH_ENDPOINT = "aa423245250f2bbf.sandbox.googleapis.com:443"; + private static final String ATTEMPT_DIRECT_PATH = "spanner.attempt_directpath"; + + public ITDirectPathFallback() { + // Create a transport channel provider that can intercept ipv6 packets. + channelFactory = new MyChannelFactory(); + eventLoopGroup = new NioEventLoopGroup(); + } + + @Before + public void setup() throws IOException, Throwable { + assume() + .withMessage("DirectPath integration tests can only run against DirectPathEnv") + .that(Boolean.getBoolean(ATTEMPT_DIRECT_PATH)) + .isTrue(); + // Get default spanner options for Ingetration test + SpannerOptions.Builder builder = env.getTestHelper().getOptions().toBuilder(); + // Set instrumented transport provider + builder.setChannelProvider( + InstantiatingGrpcChannelProvider.newBuilder() + .setAttemptDirectPath(true) + .setEndpoint(DIRECT_PATH_ENDPOINT) + .setPoolSize(1) + .setChannelConfigurator( + new ApiFunction() { + @Override + public ManagedChannelBuilder apply(ManagedChannelBuilder builder) { + injectNettyChannelHandler(builder); + // Fail fast when blackhole is active + builder.keepAliveTime(1, TimeUnit.SECONDS); + builder.keepAliveTimeout(1, TimeUnit.SECONDS); + return builder; + } + }) + .build()); + // Forcefully ignore GOOGLE_APPLICATION_CREDENTIALS + builder.setCredentials( + FixedCredentialsProvider.create(ComputeEngineCredentials.create()).getCredentials()); + + // Create a new testHelper with the instrumented transport provider + testHelper = RemoteSpannerHelper.create(builder.build(), env.getTestHelper().getInstanceId()); + + db = + testHelper.createTestDatabase( + "CREATE TABLE TestTable (" + + " Key STRING(MAX) NOT NULL," + + " StringValue STRING(MAX)," + + ") PRIMARY KEY (Key)"); + client = testHelper.getDatabaseClient(db); + List mutations = new ArrayList<>(); + for (int i = 0; i < 3; ++i) { + mutations.add( + Mutation.newInsertOrUpdateBuilder(TABLE_NAME) + .set("Key") + .to("k" + i) + .set("StringValue") + .to("v" + i) + .build()); + } + client.write(mutations); + } + + @After + public void teardown() { + if (testHelper != null) { + testHelper.cleanUp(); + testHelper.getClient().close(); + } + if (eventLoopGroup != null) { + eventLoopGroup.shutdownGracefully(); + } + } + + @Test + public void testFallback() throws InterruptedException, TimeoutException { + // Precondition: wait for DirectPath to connect + assertWithMessage("Failed to observe RPCs over DirectPath").that(exerciseDirectPath()).isTrue(); + + // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. + blackholeDpAddr.set(true); + + // Send a request, which should be routed over IPv4 and CFE. + client.singleUse(TimestampBound.strong()).readRow(TABLE_NAME, Key.of("k0"), ALL_COLUMNS); + + // Verify that the above check was meaningful, by verifying that the blackhole actually dropped + // packets. + assertWithMessage("Failed to detect any IPv6 traffic in blackhole") + .that(numBlocked.get()) + .isGreaterThan(0); + + // Make sure that the client will start reading from IPv6 again by sending new requests and + // checking the injected IPv6 counter has been updated. + blackholeDpAddr.set(false); + + assertWithMessage("Failed to upgrade back to DirectPath").that(exerciseDirectPath()).isTrue(); + } + + private boolean exerciseDirectPath() throws InterruptedException, TimeoutException { + Stopwatch stopwatch = Stopwatch.createStarted(); + numDpAddrRead.set(0); + + boolean seenEnough = false; + + while (!seenEnough && stopwatch.elapsed(TimeUnit.MINUTES) < 2) { + for (int i = 0; i < NUM_RPCS_TO_SEND; i++) { + client.singleUse(TimestampBound.strong()).readRow(TABLE_NAME, Key.of("k0"), ALL_COLUMNS); + } + Thread.sleep(100); + seenEnough = numDpAddrRead.get() >= MIN_COMPLETE_READ_CALLS; + } + return seenEnough; + } + + /** + * This is a giant hack to enable testing DirectPath CFE fallback. + * + *

It unwraps the {@link ComputeEngineChannelBuilder} to inject a NettyChannelHandler to signal + * IPv6 packet loss. + */ + private void injectNettyChannelHandler(ManagedChannelBuilder channelBuilder) { + try { + // Extract the delegate NettyChannelBuilder using reflection + Field delegateField = ComputeEngineChannelBuilder.class.getDeclaredField("delegate"); + delegateField.setAccessible(true); + + ComputeEngineChannelBuilder gceChannelBuilder = + ((ComputeEngineChannelBuilder) channelBuilder); + Object delegateChannelBuilder = delegateField.get(gceChannelBuilder); + + NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder) delegateChannelBuilder; + nettyChannelBuilder.channelFactory(channelFactory); + nettyChannelBuilder.eventLoopGroup(eventLoopGroup); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Failed to inject the netty ChannelHandler", e); + } + } + + /** @see com.google.cloud.bigtable.data.v2.it.DirectPathFallbackIT.MyChannelHandler */ + private class MyChannelFactory implements ChannelFactory { + @Override + public NioSocketChannel newChannel() { + NioSocketChannel channel = new NioSocketChannel(); + channel.pipeline().addLast(new MyChannelHandler()); + + return channel; + } + } + + /** + * A netty {@link io.grpc.netty.shaded.io.netty.channel.ChannelHandler} that can be instructed to + * make IPv6 packets disappear + */ + private class MyChannelHandler extends ChannelDuplexHandler { + @Override + public void connect( + ChannelHandlerContext ctx, + SocketAddress remoteAddress, + SocketAddress localAddress, + ChannelPromise promise) + throws Exception { + + if (remoteAddress instanceof InetSocketAddress) { + InetAddress inetAddress = ((InetSocketAddress) remoteAddress).getAddress(); + String addr = inetAddress.getHostAddress(); + isDpAddr = addr.startsWith(DP_IPV6_PREFIX) || addr.startsWith(DP_IPV4_PREFIX); + } + + if (!(isDpAddr && blackholeDpAddr.get())) { + super.connect(ctx, remoteAddress, localAddress, promise); + } else { + // Fail the connection fast + promise.setFailure(new IOException("fake error")); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + boolean dropCall = isDpAddr && blackholeDpAddr.get(); + if (dropCall) { + // Don't notify the next handler and increment counter + numBlocked.incrementAndGet(); + ReferenceCountUtil.release(msg); + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + boolean dropCall = isDpAddr && blackholeDpAddr.get(); + if (dropCall) { + // Don't notify the next handler and increment counter + numBlocked.incrementAndGet(); + } else { + if (isDpAddr) { + numDpAddrRead.incrementAndGet(); + } + super.channelReadComplete(ctx); + } + } + } +} From a075cff6bbb93a85b8823119058ce5f25b8b188d Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Tue, 15 Dec 2020 03:12:33 +0000 Subject: [PATCH 2/2] feat: include User agent --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 4d7d836dea..d1455fcd6b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -35,6 +35,7 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.InstantiatingWatchdogProvider; import com.google.api.gax.rpc.OperationCallable; @@ -70,6 +71,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.iam.v1.GetIamPolicyRequest; @@ -232,6 +234,8 @@ private void awaitTermination() throws InterruptedException { private static final int DEFAULT_TIMEOUT_SECONDS = 30 * 60; private static final int DEFAULT_PERIOD_SECONDS = 10; private static final int GRPC_KEEPALIVE_SECONDS = 2 * 60; + private static final String USER_AGENT_KEY = "user-agent"; + private static final String CLIENT_LIBRARY_LANGUAGE = "spanner-java"; // TODO(weiranf): Remove this temporary endpoint once DirectPath goes to public beta. private static final String DIRECT_PATH_ENDPOINT = "aa423245250f2bbf.sandbox.googleapis.com:443"; @@ -297,9 +301,20 @@ public GapicSpannerRpc(final SpannerOptions options) { .build(); HeaderProvider mergedHeaderProvider = options.getMergedHeaderProvider(internalHeaderProvider); + Map headersWithUserAgent = + ImmutableMap.builder() + .put( + USER_AGENT_KEY, + CLIENT_LIBRARY_LANGUAGE + + "/" + + GaxProperties.getLibraryVersion(GapicSpannerRpc.class)) + .putAll(mergedHeaderProvider.getHeaders()) + .build(); + final HeaderProvider headerProviderWithUserAgent = + FixedHeaderProvider.create(headersWithUserAgent); this.metadataProvider = SpannerMetadataProvider.create( - mergedHeaderProvider.getHeaders(), + headerProviderWithUserAgent.getHeaders(), internalHeaderProviderBuilder.getResourceHeaderKey()); this.callCredentialsProvider = options.getCallCredentialsProvider(); this.compressorName = options.getCompressorName(); @@ -338,7 +353,7 @@ public GapicSpannerRpc(final SpannerOptions options) { options.getInterceptorProvider(), SpannerInterceptorProvider.createDefault())) .withEncoding(compressorName)) - .setHeaderProvider(mergedHeaderProvider); + .setHeaderProvider(headerProviderWithUserAgent); // TODO(weiranf): Set to true by default once DirectPath goes to public beta. if (shouldAttemptDirectPath()) { @@ -478,8 +493,10 @@ private static void checkEmulatorConnection( throw SpannerExceptionFactory.newSpannerException( ErrorCode.UNAVAILABLE, String.format( - "The environment variable SPANNER_EMULATOR_HOST has been set to %s, but no running emulator could be found at that address.\n" - + "Did you forget to start the emulator, or to unset the environment variable?", + "The environment variable SPANNER_EMULATOR_HOST has been set to %s, but no running" + + " emulator could be found at that address.\n" + + "Did you forget to start the emulator, or to unset the environment" + + " variable?", emulatorHost)); } }