Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add experimental DirectPath support #396

Merged
merged 2 commits into from Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions google-cloud-spanner/pom.xml
Expand Up @@ -15,8 +15,10 @@
</parent>
<properties>
<site.installationModule>google-cloud-spanner</site.installationModule>
<skipUTs>false</skipUTs>
</properties>


<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -49,6 +51,7 @@
<id>default-test</id>
<configuration>
<excludedGroups>com.google.cloud.spanner.TracerTest,com.google.cloud.spanner.IntegrationTest</excludedGroups>
<skipTests>${skipUTs}</skipTests>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only see this being set to false. Do we need this configuration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Jeff, I added this configuration just for our internal test to be able to use -DskipUTs=true. Currently we are using mvn verify -am -pl google-cloud-spanner -B -Penable-integration-tests,spanner-directpath-it -DskipUTs=true -D... ... for our internal E2E DP tests, and meanwhile I don't want to change the behavior for other tests. Do you think this is the right way for approaching this purpose?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might add a note somewhere so we don't inadvertently break this.

We might also want to add something like this to our shared setup.

</configuration>
</execution>
<execution>
Expand Down Expand Up @@ -363,5 +366,26 @@
</plugins>
</build>
</profile>
<profile>
<id>spanner-directpath-it</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<spanner.testenv.config.class>com.google.cloud.spanner.GceTestEnvConfig</spanner.testenv.config.class>
<spanner.testenv.instance>projects/directpath-prod-manual-testing/instances/spanner-testing</spanner.testenv.instance>
<spanner.gce.config.project_id>directpath-prod-manual-testing</spanner.gce.config.project_id>
<spanner.attempt_directpath>true</spanner.attempt_directpath>
<spanner.directpath_test_scenario>ipv4</spanner.directpath_test_scenario>
</systemPropertyVariables>
<forkedProcessTimeoutInSeconds>3000</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Expand Up @@ -223,6 +223,9 @@ private void awaitTermination() throws InterruptedException {
private static final int DEFAULT_PERIOD_SECONDS = 10;
private static final int GRPC_KEEPALIVE_SECONDS = 2 * 60;

// TODO(weiranf): Remove this temporary endpoint once DirectPath goes to public beta.
private static final String DIRECT_PATH_ENDPOINT = "aa423245250f2bbf.sandbox.googleapis.com:443";

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
Expand Down Expand Up @@ -300,31 +303,37 @@ public GapicSpannerRpc(final SpannerOptions options) {
.build());
// First check if SpannerOptions provides a TransportChannerProvider. Create one
// with information gathered from SpannerOptions if none is provided
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(options.getChannelConfigurator())
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
.setExecutor(executorProvider.getExecutor())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))

// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
.withEncoding(compressorName))
.setHeaderProvider(mergedHeaderProvider);

// TODO(weiranf): Set to true by default once DirectPath goes to public beta.
if (shouldAttemptDirectPath()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that currently we are mostly enabling features / configurations through environment variables (which should be standardised through all of our clients) or through the SpannerOptions class.

I imagine we could add an option like useDirectPath() in the SpannerOptions.Builder class. I guess that even if we do not use an environment variable as well (and go with the property), it would be nice to concentrated the configuration directly in the SpannerOptions class.

Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Thiago, thanks for the comment! So our original idea is, the DirectPath is a network side feature, it should be agnostic to the end users. Currently we add this shouldAttemptDirectPath temporarily so our test can start exercising DirectPath. Once DirectPath is going public, we will remove these temporary checker and set the client to use DirectPath by default. And correct me if I'm wrong, this SpannerOptions seems like a configurable option provided to end users, but in fact we don't actually want users to config DirectPath attempt, which should only be controlled by service owner/SREs via Access Control List. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, thanks for the response.

A couple of questions on the approach:

  1. Aren't we enabling this to a few customers at first (before being the default)? If so, is the expectation that they would have to execute with the property set?
  2. Should we override the endpoint even when the user provides a custom channel provider? I don't know what is best here, but I imagine that it might be confusing if the user executes their program with the property set and a custom channel provider, but do not get the benefits of the direct path.

Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good questions! To answer them:
Actually we would need to remove these temporary checker and enable DirectPath with a client lib release before we engage real customers, because DirectPath should be fully controlled by the ACL config (i.e. initially only a few customers will get ALLOW from ACL check, all others will get DENY). So this property set should only be used by our tests.

This sandbox endpoint is only used for our testing purposes (as right now DirectPath-ipv4 is only available in certain cell in prod) Once we've done enough tests and DirectPath is fully ready, Spanner will make its default endpoint to be capable of handling DirectPath traffic. So by that time, we will use the same endpoint no matter the client is using DirectPath or Not.

defaultChannelProviderBuilder.setEndpoint(DIRECT_PATH_ENDPOINT).setAttemptDirectPath(true);
}

TransportChannelProvider channelProvider =
MoreObjects.firstNonNull(
options.getChannelProvider(),
InstantiatingGrpcChannelProvider.newBuilder()
.setChannelConfigurator(options.getChannelConfigurator())
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())
.setExecutor(executorProvider.getExecutor())

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))

// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
.withEncoding(compressorName))
.setHeaderProvider(mergedHeaderProvider)
.build());
options.getChannelProvider(), defaultChannelProviderBuilder.build());

CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);
Expand Down Expand Up @@ -415,6 +424,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
}
}

// TODO(weiranf): Remove this once DirectPath goes to public beta.
private static boolean shouldAttemptDirectPath() {
return Boolean.getBoolean("spanner.attempt_directpath");
}

private static void checkEmulatorConnection(
SpannerOptions options,
TransportChannelProvider channelProvider,
Expand Down
Expand Up @@ -26,11 +26,15 @@
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -41,6 +45,12 @@ public class GceTestEnvConfig implements TestEnvConfig {
public static final String GCE_CREDENTIALS_FILE = "spanner.gce.config.credentials_file";
public static final String GCE_STREAM_BROKEN_PROBABILITY =
"spanner.gce.config.stream_broken_probability";
public static final String ATTEMPT_DIRECT_PATH = "spanner.attempt_directpath";
public static final String DIRECT_PATH_TEST_SCENARIO = "spanner.directpath_test_scenario";

// IP address prefixes allocated for DirectPath backends.
public static final String DP_IPV6_PREFIX = "2001:4860:8040";
public static final String DP_IPV4_PREFIX = "34.126";
chingor13 marked this conversation as resolved.
Show resolved Hide resolved

private final SpannerOptions options;

Expand All @@ -51,6 +61,8 @@ public GceTestEnvConfig() {
double errorProbability =
Double.parseDouble(System.getProperty(GCE_STREAM_BROKEN_PROBABILITY, "0.0"));
checkState(errorProbability <= 1.0);
boolean attemptDirectPath = Boolean.getBoolean(ATTEMPT_DIRECT_PATH);
chingor13 marked this conversation as resolved.
Show resolved Hide resolved
String directPathTestScenario = System.getProperty(DIRECT_PATH_TEST_SCENARIO, "");
SpannerOptions.Builder builder =
SpannerOptions.newBuilder().setAutoThrottleAdministrativeRequests();
if (!projectId.isEmpty()) {
Expand All @@ -66,12 +78,14 @@ public GceTestEnvConfig() {
throw new RuntimeException(e);
}
}
options =
builder
.setInterceptorProvider(
SpannerInterceptorProvider.createDefault()
.with(new GrpcErrorInjector(errorProbability)))
.build();
SpannerInterceptorProvider interceptorProvider =
SpannerInterceptorProvider.createDefault().with(new GrpcErrorInjector(errorProbability));
if (attemptDirectPath) {
interceptorProvider =
interceptorProvider.with(new DirectPathAddressCheckInterceptor(directPathTestScenario));
}
builder.setInterceptorProvider(interceptorProvider);
options = builder.build();
}

@Override
Expand All @@ -87,6 +101,7 @@ public void tearDown() {}

/** Injects errors in streaming calls to simulate call restarts */
private static class GrpcErrorInjector implements ClientInterceptor {

private final double errorProbability;
private final Random random = new Random();

Expand Down Expand Up @@ -140,4 +155,64 @@ private boolean mayInjectError() {
return random.nextDouble() < errorProbability;
}
}

/**
* Captures the request attributes "Grpc.TRANSPORT_ATTR_REMOTE_ADDR" when connection is
* established and verifies if the remote address is a DirectPath address. This is only used for
* DirectPath testing. {@link ClientCall#getAttributes()}
*/
private static class DirectPathAddressCheckInterceptor implements ClientInterceptor {
private final String directPathTestScenario;

DirectPathAddressCheckInterceptor(String directPathTestScenario) {
this.directPathTestScenario = directPathTestScenario;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
// Check peer IP after connection is established.
SocketAddress remoteAddr =
clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
if (!verifyRemoteAddress(remoteAddr)) {
throw new RuntimeException(
String.format(
"Synthetically aborting the current request because it did not adhere"
+ " to the test environment's requirement for DirectPath."
+ " Expected test for DirectPath %s scenario,"
+ " but RPC was destined for %s",
directPathTestScenario, remoteAddr.toString()));
}
super.onHeaders(headers);
}
},
headers);
}
};
}

private boolean verifyRemoteAddress(SocketAddress remoteAddr) {
if (remoteAddr instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) remoteAddr).getAddress();
String addr = inetAddress.getHostAddress();
if (directPathTestScenario.equals("ipv4")) {
// For ipv4-only VM, client should connect to ipv4 DirectPath addresses.
return addr.startsWith(DP_IPV4_PREFIX);
} else if (directPathTestScenario.equals("ipv6")) {
// For ipv6-enabled VM, client could connect to either ipv4 or ipv6 DirectPath addresses.
return addr.startsWith(DP_IPV6_PREFIX) || addr.startsWith(DP_IPV4_PREFIX);
}
}
// For all other scenarios(e.g. fallback), we should allow both DirectPath and CFE addresses.
return true;
}
}
}