Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add gRPC-GCP channel pool as an option #1227

Merged
merged 18 commits into from Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-spanner/pom.xml
Expand Up @@ -119,6 +119,12 @@
</build>

<dependencies>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand Down
Expand Up @@ -48,6 +48,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.grpc.gcp.GcpManagedChannelOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
Expand Down Expand Up @@ -103,6 +104,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final InstanceAdminStubSettings instanceAdminStubSettings;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final Duration partitionedDmlTimeout;
private final boolean useGrpcGcpExtension;
private final GcpManagedChannelOptions grpcGcpOptions;
private final boolean autoThrottleAdministrativeRequests;
private final boolean trackTransactionStarter;
/**
Expand Down Expand Up @@ -557,6 +560,8 @@ private SpannerOptions(Builder builder) {
throw SpannerExceptionFactory.newSpannerException(e);
}
partitionedDmlTimeout = builder.partitionedDmlTimeout;
useGrpcGcpExtension = builder.useGrpcGcpExtension;
grpcGcpOptions = builder.grpcGcpOptions;
autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests;
trackTransactionStarter = builder.trackTransactionStarter;
defaultQueryOptions = builder.defaultQueryOptions;
Expand Down Expand Up @@ -636,6 +641,8 @@ public static class Builder
private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder =
DatabaseAdminStubSettings.newBuilder();
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
private boolean useGrpcGcpExtension = false;
private GcpManagedChannelOptions grpcGcpOptions;
private boolean autoThrottleAdministrativeRequests = false;
private boolean trackTransactionStarter = false;
private Map<DatabaseId, QueryOptions> defaultQueryOptions = new HashMap<>();
Expand Down Expand Up @@ -683,6 +690,8 @@ private Builder() {
this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder();
this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder();
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
this.useGrpcGcpExtension = options.useGrpcGcpExtension;
this.grpcGcpOptions = options.grpcGcpOptions;
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.trackTransactionStarter = options.trackTransactionStarter;
this.defaultQueryOptions = options.defaultQueryOptions;
Expand Down Expand Up @@ -999,6 +1008,26 @@ public Builder setHost(String host) {
return this;
}

/**
* Sets the preference for gRPC-GCP extension (default: false). When enabled the gRPC-GCP
* channel pool will be used.
*/
public Builder setUseGrpcGcpExtension(boolean useGrpcGcpExtension) {
nimf marked this conversation as resolved.
Show resolved Hide resolved
this.useGrpcGcpExtension = useGrpcGcpExtension;
return this;
}

/**
* Sets the options for gRPC-GCP extension. The metric registry and default Spanner metric
* labels will be added automatically.
*
* <p>Note that gRPC-GCP extension must be enabled first with {@code setUseGrpcGcpExtension}.
*/
public Builder setGrpcGcpOptions(GcpManagedChannelOptions options) {
this.grpcGcpOptions = options;
return this;
}

/**
* Sets the host of an emulator to use. By default the value is read from an environment
* variable. If the environment variable is not set, this will be <code>null</code>.
Expand Down Expand Up @@ -1099,6 +1128,14 @@ public Duration getPartitionedDmlTimeout() {
return partitionedDmlTimeout;
}

public boolean isUseGrpcGcpExtension() {
return useGrpcGcpExtension;
}

public GcpManagedChannelOptions getGrpcGcpOptions() {
return grpcGcpOptions;
}

public boolean isAutoThrottleAdministrativeRequests() {
return autoThrottleAdministrativeRequests;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
Expand Down Expand Up @@ -82,6 +83,9 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.grpc.gcp.GcpManagedChannelBuilder;
import com.google.grpc.gcp.GcpManagedChannelOptions;
import com.google.grpc.gcp.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.iam.v1.GetIamPolicyRequest;
import com.google.iam.v1.Policy;
import com.google.iam.v1.SetIamPolicyRequest;
Expand Down Expand Up @@ -156,8 +160,13 @@
import com.google.spanner.v1.Transaction;
import io.grpc.CallCredentials;
import io.grpc.Context;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.opencensus.metrics.LabelKey;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.Metrics;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -248,6 +257,7 @@ private void awaitTermination() throws InterruptedException {
private static final String CLIENT_LIBRARY_LANGUAGE = "spanner-java";
public static final String DEFAULT_USER_AGENT =
CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class);
private static final String API_FILE = "grpc-gcp-apiconfig.json";

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
Expand Down Expand Up @@ -360,6 +370,9 @@ public GapicSpannerRpc(final SpannerOptions options) {
// whether the attempt is allowed is totally controlled by service owner.
.setAttemptDirectPath(true);

// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);

TransportChannelProvider channelProvider =
MoreObjects.firstNonNull(
options.getChannelProvider(), defaultChannelProviderBuilder.build());
Expand Down Expand Up @@ -491,6 +504,73 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
}
}

private static String parseGrpcGcpApiConfig() {
InputStream inputStream = GapicSpannerRpc.class.getResourceAsStream(API_FILE);
nimf marked this conversation as resolved.
Show resolved Hide resolved
StringBuilder sb = new StringBuilder();
try {
for (int ch; (ch = inputStream.read()) != -1; ) {
sb.append((char) ch);
}
} catch (IOException e) {
throw newSpannerException(e);
}
return sb.toString();
}

// Enhance metric options for gRPC-GCP extension. Adds metric registry if not specified. Adds
// more labels on top of provided.
private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) {
GcpManagedChannelOptions grpcGcpOptions =
MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions());
GcpMetricsOptions metricsOptions = MoreObjects.firstNonNull(
grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build());
GcpMetricsOptions.Builder metricsOptionsBuilder = GcpMetricsOptions.newBuilder(metricsOptions);
if (metricsOptions.getMetricRegistry() == null) {
metricsOptionsBuilder.withMetricRegistry(Metrics.getMetricRegistry());
}
List<LabelKey> labelKeys = metricsOptions.getLabelKeys();
List<LabelValue> labelValues = metricsOptions.getLabelValues();
// TODO: Add default labels with values: client_id, database, instance_id.
nimf marked this conversation as resolved.
Show resolved Hide resolved
labelKeys.add(LabelKey.create("grpc_gcp_version", "gRPC-GCP library version"));
String gRpcGcpVersion = GcpManagedChannelBuilder.class.getPackage().getImplementationVersion();
if (gRpcGcpVersion == null) {
gRpcGcpVersion = "";
}
labelValues.add(LabelValue.create(gRpcGcpVersion));
metricsOptionsBuilder.withLabels(labelKeys, labelValues);
if (metricsOptions.getNamePrefix().equals("")) {
metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/");
nimf marked this conversation as resolved.
Show resolved Hide resolved
}
return GcpManagedChannelOptions.newBuilder(grpcGcpOptions)
.withMetricsOptions(metricsOptionsBuilder.build())
.build();
}

@SuppressWarnings("rawtypes")
private static void maybeEnableGrpcGcpExtension(InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder, final SpannerOptions options) {
if (!options.isUseGrpcGcpExtension()) {
return;
}

final String jsonApiConfig = parseGrpcGcpApiConfig();
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetrics(options);

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
channelBuilder -> {
if (options.getChannelConfigurator() != null) {
channelBuilder = options.getChannelConfigurator().apply(channelBuilder);
}
return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(grpcGcpOptions)
.setPoolSize(options.getNumChannels());
};

// Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
// Enable gRPC-GCP channel pool via the channel configurator.
defaultChannelProviderBuilder.setPoolSize(1).setChannelConfigurator(apiFunction);
}

private static HeaderProvider headerProviderWithUserAgentFrom(HeaderProvider headerProvider) {
final Map<String, String> headersWithUserAgent = new HashMap<>(headerProvider.getHeaders());
final String userAgent = headersWithUserAgent.get(USER_AGENT_KEY);
Expand Down
@@ -0,0 +1,106 @@
{
"channelPool": {
"maxSize": 3,
nimf marked this conversation as resolved.
Show resolved Hide resolved
"maxConcurrentStreamsLowWatermark": 1
},
"method": [
{
"name": ["google.spanner.v1.Spanner/CreateSession"],
"affinity" : {
"command": "BIND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/BatchCreateSessions"],
nimf marked this conversation as resolved.
Show resolved Hide resolved
nimf marked this conversation as resolved.
Show resolved Hide resolved
"affinity" : {
"command": "BIND",
"affinityKey": "session.name"
}
},
{
"name": ["google.spanner.v1.Spanner/GetSession"],
"affinity": {
"command": "BOUND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/DeleteSession"],
"affinity": {
"command": "UNBIND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteSql"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteBatchDml"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteStreamingSql"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Read"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/StreamingRead"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/BeginTransaction"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Commit"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/PartitionRead"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/PartitionQuery"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Rollback"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
}
]
}