Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support setting compression option #192

Merged
merged 6 commits into from Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -49,6 +49,8 @@
import com.google.spanner.admin.database.v1.RestoreDatabaseRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import io.grpc.CallCredentials;
import io.grpc.CompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.net.MalformedURLException;
Expand All @@ -58,6 +60,7 @@
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Options for the Cloud Spanner service. */
Expand Down Expand Up @@ -104,6 +107,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final Map<DatabaseId, QueryOptions> mergedQueryOptions;

private final CallCredentialsProvider callCredentialsProvider;
private final String compressorName;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -174,6 +178,7 @@ private SpannerOptions(Builder builder) {
this.mergedQueryOptions = ImmutableMap.copyOf(merged);
}
callCredentialsProvider = builder.callCredentialsProvider;
compressorName = builder.compressorName;
}

/**
Expand Down Expand Up @@ -238,6 +243,7 @@ public static class Builder
private boolean autoThrottleAdministrativeRequests = false;
private Map<DatabaseId, QueryOptions> defaultQueryOptions = new HashMap<>();
private CallCredentialsProvider callCredentialsProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");

private Builder() {
Expand Down Expand Up @@ -309,6 +315,7 @@ private Builder() {
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.defaultQueryOptions = options.defaultQueryOptions;
this.callCredentialsProvider = options.callCredentialsProvider;
this.compressorName = options.compressorName;
this.channelProvider = options.channelProvider;
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
Expand Down Expand Up @@ -558,6 +565,28 @@ public Builder setCallCredentialsProvider(CallCredentialsProvider callCredential
return this;
}

/**
* Sets the compression to use for all gRPC calls. The compressor must be a valid name known in
* the {@link CompressorRegistry}.
*
* <p>Supported values are:
*
* <ul>
* <li>gzip: Enable gzip compression
* <li>identity: Disable compression
* <li><code>null</code>: Use default compression
* </ul>
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public Builder setCompressorName(@Nullable String compressorName) {
Preconditions.checkArgument(
compressorName == null
|| CompressorRegistry.getDefaultInstance().lookupCompressor(compressorName) != null,
String.format("%s is not a known compressor", compressorName));
this.compressorName = compressorName;
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -690,6 +719,10 @@ public CallCredentialsProvider getCallCredentialsProvider() {
return callCredentialsProvider;
}

public String getCompressorName() {
return compressorName;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.spi.v1;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;

class EncodingInterceptor implements ClientInterceptor {
private static final String RESPONSE_ENCODING_KEY_NAME = "x-response-encoding";
private static final Key<String> RESPONSE_ENCODING_KEY =
Metadata.Key.of(RESPONSE_ENCODING_KEY_NAME, Metadata.ASCII_STRING_MARSHALLER);

private final String encoding;

EncodingInterceptor(String encoding) {
this.encoding = encoding;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(RESPONSE_ENCODING_KEY, encoding);
super.start(responseListener, headers);
}
};
}
}
Expand Up @@ -229,6 +229,7 @@ private void awaitTermination() throws InterruptedException {
private final String projectName;
private final SpannerMetadataProvider metadataProvider;
private final CallCredentialsProvider callCredentialsProvider;
private final String compressorName;
private final Duration waitTimeout =
systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS);
private final Duration idleTimeout =
Expand Down Expand Up @@ -282,6 +283,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
mergedHeaderProvider.getHeaders(),
internalHeaderProviderBuilder.getResourceHeaderKey());
this.callCredentialsProvider = options.getCallCredentialsProvider();
this.compressorName = options.getCompressorName();

// Create a managed executor provider.
this.executorProvider =
Expand Down Expand Up @@ -310,9 +312,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
.withEncoding(compressorName))
.setHeaderProvider(mergedHeaderProvider)
.build());

Expand Down
Expand Up @@ -45,6 +45,10 @@ public static SpannerInterceptorProvider createDefault() {
return new SpannerInterceptorProvider(defaultInterceptors);
}

static SpannerInterceptorProvider create(GrpcInterceptorProvider provider) {
return new SpannerInterceptorProvider(ImmutableList.copyOf(provider.getInterceptors()));
}

public SpannerInterceptorProvider with(ClientInterceptor clientInterceptor) {
List<ClientInterceptor> interceptors =
ImmutableList.<ClientInterceptor>builder()
Expand All @@ -54,6 +58,13 @@ public SpannerInterceptorProvider with(ClientInterceptor clientInterceptor) {
return new SpannerInterceptorProvider(interceptors);
}

SpannerInterceptorProvider withEncoding(String encoding) {
if (encoding != null) {
return with(new EncodingInterceptor(encoding));
}
return this;
}

@Override
public List<ClientInterceptor> getInterceptors() {
return clientInterceptors;
Expand Down
Expand Up @@ -505,4 +505,35 @@ public String getOptimizerVersion() {
assertThat(options.getDefaultQueryOptions(DatabaseId.of("p", "i", "o")))
.isEqualTo(QueryOptions.newBuilder().setOptimizerVersion("2").build());
}

@Test
public void testCompressorName() {
assertThat(
SpannerOptions.newBuilder()
.setProjectId("p")
.setCompressorName("gzip")
.build()
.getCompressorName())
.isEqualTo("gzip");
assertThat(
SpannerOptions.newBuilder()
.setProjectId("p")
.setCompressorName("identity")
.build()
.getCompressorName())
.isEqualTo("identity");
assertThat(
SpannerOptions.newBuilder()
.setProjectId("p")
.setCompressorName(null)
.build()
.getCompressorName())
.isNull();
try {
SpannerOptions.newBuilder().setCompressorName("foo");
fail("missing expected exception");
} catch (IllegalArgumentException e) {
// ignore, this is the expected exception.
}
}
}
@@ -0,0 +1,68 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.it;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.ParallelIntegrationTest;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@Category(ParallelIntegrationTest.class)
@RunWith(JUnit4.class)
public class ITSpannerOptionsTest {
@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
private static Database db;

@BeforeClass
public static void setUp() throws Exception {
db = env.getTestHelper().createTestDatabase();
}

@AfterClass
public static void tearDown() throws Exception {
db.drop();
}

@Test
public void testCompression() {
for (String compressorName : new String[] {"gzip", "identity", null}) {
SpannerOptions options =
env.getTestHelper().getOptions().toBuilder().setCompressorName(compressorName).build();
try (Spanner spanner = options.getService()) {
DatabaseClient client = spanner.getDatabaseClient(db.getId());
try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1 AS COL1"))) {
assertThat(rs.next()).isTrue();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to test an end to end compression and verify that the compressor is actually used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes absolutely. Once the support is in the staging environment, we can run this IT against it to test the change. @olavloite I'll give you a heads up when the feature is available in staging.

assertThat(rs.getLong(0)).isEqualTo(1L);
assertThat(rs.next()).isFalse();
}
}
}
}
}