diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml
index b52389bf73..a4c4657ca6 100644
--- a/google-cloud-spanner/pom.xml
+++ b/google-cloud-spanner/pom.xml
@@ -15,8 +15,10 @@
google-cloud-spanner
+ false
+
@@ -49,6 +51,7 @@
default-test
com.google.cloud.spanner.TracerTest,com.google.cloud.spanner.IntegrationTest
+ ${skipUTs}
@@ -360,5 +363,26 @@
+
+ spanner-directpath-it
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+
+
+ com.google.cloud.spanner.GceTestEnvConfig
+ projects/directpath-prod-manual-testing/instances/spanner-testing
+ directpath-prod-manual-testing
+ true
+ ipv4
+
+ 3000
+
+
+
+
+
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
index 43678ae4d2..152c8f0b34 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
@@ -230,6 +230,9 @@ private void awaitTermination() throws InterruptedException {
private static final int DEFAULT_PERIOD_SECONDS = 10;
private static final int GRPC_KEEPALIVE_SECONDS = 2 * 60;
+ // TODO(weiranf): Remove this temporary endpoint once DirectPath goes to public beta.
+ private static final String DIRECT_PATH_ENDPOINT = "aa423245250f2bbf.sandbox.googleapis.com:443";
+
private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
@@ -307,31 +310,37 @@ public GapicSpannerRpc(final SpannerOptions options) {
.build());
// First check if SpannerOptions provides a TransportChannerProvider. Create one
// with information gathered from SpannerOptions if none is provided
+ InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
+ InstantiatingGrpcChannelProvider.newBuilder()
+ .setChannelConfigurator(options.getChannelConfigurator())
+ .setEndpoint(options.getEndpoint())
+ .setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
+ .setMaxInboundMetadataSize(MAX_METADATA_SIZE)
+ .setPoolSize(options.getNumChannels())
+ .setExecutor(executorProvider.getExecutor())
+
+ // Set a keepalive time of 120 seconds to help long running
+ // commit GRPC calls succeed
+ .setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
+
+ // Then check if SpannerOptions provides an InterceptorProvider. Create a default
+ // SpannerInterceptorProvider if none is provided
+ .setInterceptorProvider(
+ SpannerInterceptorProvider.create(
+ MoreObjects.firstNonNull(
+ options.getInterceptorProvider(),
+ SpannerInterceptorProvider.createDefault()))
+ .withEncoding(compressorName))
+ .setHeaderProvider(mergedHeaderProvider);
+
+ // TODO(weiranf): Set to true by default once DirectPath goes to public beta.
+ if (shouldAttemptDirectPath()) {
+ defaultChannelProviderBuilder.setEndpoint(DIRECT_PATH_ENDPOINT).setAttemptDirectPath(true);
+ }
+
TransportChannelProvider channelProvider =
MoreObjects.firstNonNull(
- options.getChannelProvider(),
- InstantiatingGrpcChannelProvider.newBuilder()
- .setChannelConfigurator(options.getChannelConfigurator())
- .setEndpoint(options.getEndpoint())
- .setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
- .setMaxInboundMetadataSize(MAX_METADATA_SIZE)
- .setPoolSize(options.getNumChannels())
- .setExecutor(executorProvider.getExecutor())
-
- // Set a keepalive time of 120 seconds to help long running
- // commit GRPC calls succeed
- .setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
-
- // Then check if SpannerOptions provides an InterceptorProvider. Create a default
- // SpannerInterceptorProvider if none is provided
- .setInterceptorProvider(
- SpannerInterceptorProvider.create(
- MoreObjects.firstNonNull(
- options.getInterceptorProvider(),
- SpannerInterceptorProvider.createDefault()))
- .withEncoding(compressorName))
- .setHeaderProvider(mergedHeaderProvider)
- .build());
+ options.getChannelProvider(), defaultChannelProviderBuilder.build());
CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);
@@ -422,6 +431,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
}
}
+ // TODO(weiranf): Remove this once DirectPath goes to public beta.
+ private static boolean shouldAttemptDirectPath() {
+ return Boolean.getBoolean("spanner.attempt_directpath");
+ }
+
private static void checkEmulatorConnection(
SpannerOptions options,
TransportChannelProvider channelProvider,
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java
index e5c0959b58..8a6bf4052f 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java
@@ -26,11 +26,15 @@
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.io.FileInputStream;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,6 +45,12 @@ public class GceTestEnvConfig implements TestEnvConfig {
public static final String GCE_CREDENTIALS_FILE = "spanner.gce.config.credentials_file";
public static final String GCE_STREAM_BROKEN_PROBABILITY =
"spanner.gce.config.stream_broken_probability";
+ public static final String ATTEMPT_DIRECT_PATH = "spanner.attempt_directpath";
+ public static final String DIRECT_PATH_TEST_SCENARIO = "spanner.directpath_test_scenario";
+
+ // IP address prefixes allocated for DirectPath backends.
+ public static final String DP_IPV6_PREFIX = "2001:4860:8040";
+ public static final String DP_IPV4_PREFIX = "34.126";
private final SpannerOptions options;
@@ -51,6 +61,8 @@ public GceTestEnvConfig() {
double errorProbability =
Double.parseDouble(System.getProperty(GCE_STREAM_BROKEN_PROBABILITY, "0.0"));
checkState(errorProbability <= 1.0);
+ boolean attemptDirectPath = Boolean.getBoolean(ATTEMPT_DIRECT_PATH);
+ String directPathTestScenario = System.getProperty(DIRECT_PATH_TEST_SCENARIO, "");
SpannerOptions.Builder builder =
SpannerOptions.newBuilder().setAutoThrottleAdministrativeRequests();
if (!projectId.isEmpty()) {
@@ -66,12 +78,14 @@ public GceTestEnvConfig() {
throw new RuntimeException(e);
}
}
- options =
- builder
- .setInterceptorProvider(
- SpannerInterceptorProvider.createDefault()
- .with(new GrpcErrorInjector(errorProbability)))
- .build();
+ SpannerInterceptorProvider interceptorProvider =
+ SpannerInterceptorProvider.createDefault().with(new GrpcErrorInjector(errorProbability));
+ if (attemptDirectPath) {
+ interceptorProvider =
+ interceptorProvider.with(new DirectPathAddressCheckInterceptor(directPathTestScenario));
+ }
+ builder.setInterceptorProvider(interceptorProvider);
+ options = builder.build();
}
@Override
@@ -87,6 +101,7 @@ public void tearDown() {}
/** Injects errors in streaming calls to simulate call restarts */
private static class GrpcErrorInjector implements ClientInterceptor {
+
private final double errorProbability;
private final Random random = new Random();
@@ -140,4 +155,64 @@ private boolean mayInjectError() {
return random.nextDouble() < errorProbability;
}
}
+
+ /**
+ * Captures the request attributes "Grpc.TRANSPORT_ATTR_REMOTE_ADDR" when connection is
+ * established and verifies if the remote address is a DirectPath address. This is only used for
+ * DirectPath testing. {@link ClientCall#getAttributes()}
+ */
+ private static class DirectPathAddressCheckInterceptor implements ClientInterceptor {
+ private final String directPathTestScenario;
+
+ DirectPathAddressCheckInterceptor(String directPathTestScenario) {
+ this.directPathTestScenario = directPathTestScenario;
+ }
+
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor method, CallOptions callOptions, Channel next) {
+ final ClientCall clientCall = next.newCall(method, callOptions);
+ return new SimpleForwardingClientCall(clientCall) {
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ super.start(
+ new SimpleForwardingClientCallListener(responseListener) {
+ @Override
+ public void onHeaders(Metadata headers) {
+ // Check peer IP after connection is established.
+ SocketAddress remoteAddr =
+ clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+ if (!verifyRemoteAddress(remoteAddr)) {
+ throw new RuntimeException(
+ String.format(
+ "Synthetically aborting the current request because it did not adhere"
+ + " to the test environment's requirement for DirectPath."
+ + " Expected test for DirectPath %s scenario,"
+ + " but RPC was destined for %s",
+ directPathTestScenario, remoteAddr.toString()));
+ }
+ super.onHeaders(headers);
+ }
+ },
+ headers);
+ }
+ };
+ }
+
+ private boolean verifyRemoteAddress(SocketAddress remoteAddr) {
+ if (remoteAddr instanceof InetSocketAddress) {
+ InetAddress inetAddress = ((InetSocketAddress) remoteAddr).getAddress();
+ String addr = inetAddress.getHostAddress();
+ if (directPathTestScenario.equals("ipv4")) {
+ // For ipv4-only VM, client should connect to ipv4 DirectPath addresses.
+ return addr.startsWith(DP_IPV4_PREFIX);
+ } else if (directPathTestScenario.equals("ipv6")) {
+ // For ipv6-enabled VM, client could connect to either ipv4 or ipv6 DirectPath addresses.
+ return addr.startsWith(DP_IPV6_PREFIX) || addr.startsWith(DP_IPV4_PREFIX);
+ }
+ }
+ // For all other scenarios(e.g. fallback), we should allow both DirectPath and CFE addresses.
+ return true;
+ }
+ }
}