Skip to content

Commit

Permalink
feat: support setting compression option (#192)
Browse files Browse the repository at this point in the history
* feat: support setting compression option

* fix: use correct encoding header

* fix: add project id

* docs: fix typo + missing end tag

* chore: resolve merge conflicts

* tests: fix test failure on emulator
  • Loading branch information
olavloite committed Jun 30, 2020
1 parent df47c13 commit 965e95e
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 3 deletions.
Expand Up @@ -49,6 +49,8 @@
import com.google.spanner.admin.database.v1.RestoreDatabaseRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import io.grpc.CallCredentials;
import io.grpc.CompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.net.MalformedURLException;
Expand All @@ -58,6 +60,7 @@
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Options for the Cloud Spanner service. */
Expand Down Expand Up @@ -104,6 +107,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final Map<DatabaseId, QueryOptions> mergedQueryOptions;

private final CallCredentialsProvider callCredentialsProvider;
private final String compressorName;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -174,6 +178,7 @@ private SpannerOptions(Builder builder) {
this.mergedQueryOptions = ImmutableMap.copyOf(merged);
}
callCredentialsProvider = builder.callCredentialsProvider;
compressorName = builder.compressorName;
}

/**
Expand Down Expand Up @@ -238,6 +243,7 @@ public static class Builder
private boolean autoThrottleAdministrativeRequests = false;
private Map<DatabaseId, QueryOptions> defaultQueryOptions = new HashMap<>();
private CallCredentialsProvider callCredentialsProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");

private Builder() {
Expand Down Expand Up @@ -309,6 +315,7 @@ private Builder() {
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.defaultQueryOptions = options.defaultQueryOptions;
this.callCredentialsProvider = options.callCredentialsProvider;
this.compressorName = options.compressorName;
this.channelProvider = options.channelProvider;
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
Expand Down Expand Up @@ -558,6 +565,28 @@ public Builder setCallCredentialsProvider(CallCredentialsProvider callCredential
return this;
}

/**
* Sets the compression to use for all gRPC calls. The compressor must be a valid name known in
* the {@link CompressorRegistry}.
*
* <p>Supported values are:
*
* <ul>
* <li>gzip: Enable gzip compression
* <li>identity: Disable compression
* <li><code>null</code>: Use default compression
* </ul>
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public Builder setCompressorName(@Nullable String compressorName) {
Preconditions.checkArgument(
compressorName == null
|| CompressorRegistry.getDefaultInstance().lookupCompressor(compressorName) != null,
String.format("%s is not a known compressor", compressorName));
this.compressorName = compressorName;
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -690,6 +719,10 @@ public CallCredentialsProvider getCallCredentialsProvider() {
return callCredentialsProvider;
}

public String getCompressorName() {
return compressorName;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.spi.v1;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;

class EncodingInterceptor implements ClientInterceptor {
private static final String RESPONSE_ENCODING_KEY_NAME = "x-response-encoding";
private static final Key<String> RESPONSE_ENCODING_KEY =
Metadata.Key.of(RESPONSE_ENCODING_KEY_NAME, Metadata.ASCII_STRING_MARSHALLER);

private final String encoding;

EncodingInterceptor(String encoding) {
this.encoding = encoding;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(RESPONSE_ENCODING_KEY, encoding);
super.start(responseListener, headers);
}
};
}
}
Expand Up @@ -231,6 +231,7 @@ private void awaitTermination() throws InterruptedException {
private final String projectName;
private final SpannerMetadataProvider metadataProvider;
private final CallCredentialsProvider callCredentialsProvider;
private final String compressorName;
private final Duration waitTimeout =
systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS);
private final Duration idleTimeout =
Expand Down Expand Up @@ -284,6 +285,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
mergedHeaderProvider.getHeaders(),
internalHeaderProviderBuilder.getResourceHeaderKey());
this.callCredentialsProvider = options.getCallCredentialsProvider();
this.compressorName = options.getCompressorName();

// Create a managed executor provider.
this.executorProvider =
Expand Down Expand Up @@ -312,9 +314,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
.setInterceptorProvider(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
.withEncoding(compressorName))
.setHeaderProvider(mergedHeaderProvider)
.build());

Expand Down
Expand Up @@ -45,6 +45,10 @@ public static SpannerInterceptorProvider createDefault() {
return new SpannerInterceptorProvider(defaultInterceptors);
}

static SpannerInterceptorProvider create(GrpcInterceptorProvider provider) {
return new SpannerInterceptorProvider(ImmutableList.copyOf(provider.getInterceptors()));
}

public SpannerInterceptorProvider with(ClientInterceptor clientInterceptor) {
List<ClientInterceptor> interceptors =
ImmutableList.<ClientInterceptor>builder()
Expand All @@ -54,6 +58,13 @@ public SpannerInterceptorProvider with(ClientInterceptor clientInterceptor) {
return new SpannerInterceptorProvider(interceptors);
}

SpannerInterceptorProvider withEncoding(String encoding) {
if (encoding != null) {
return with(new EncodingInterceptor(encoding));
}
return this;
}

@Override
public List<ClientInterceptor> getInterceptors() {
return clientInterceptors;
Expand Down
Expand Up @@ -505,4 +505,35 @@ public String getOptimizerVersion() {
assertThat(options.getDefaultQueryOptions(DatabaseId.of("p", "i", "o")))
.isEqualTo(QueryOptions.newBuilder().setOptimizerVersion("2").build());
}

@Test
public void testCompressorName() {
assertThat(
SpannerOptions.newBuilder()
.setProjectId("p")
.setCompressorName("gzip")
.build()
.getCompressorName())
.isEqualTo("gzip");
assertThat(
SpannerOptions.newBuilder()
.setProjectId("p")
.setCompressorName("identity")
.build()
.getCompressorName())
.isEqualTo("identity");
assertThat(
SpannerOptions.newBuilder()
.setProjectId("p")
.setCompressorName(null)
.build()
.getCompressorName())
.isNull();
try {
SpannerOptions.newBuilder().setCompressorName("foo");
fail("missing expected exception");
} catch (IllegalArgumentException e) {
// ignore, this is the expected exception.
}
}
}
@@ -0,0 +1,68 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.it;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.ParallelIntegrationTest;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@Category(ParallelIntegrationTest.class)
@RunWith(JUnit4.class)
public class ITSpannerOptionsTest {
@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
private static Database db;

@BeforeClass
public static void setUp() throws Exception {
db = env.getTestHelper().createTestDatabase();
}

@AfterClass
public static void tearDown() throws Exception {
db.drop();
}

@Test
public void testCompression() {
for (String compressorName : new String[] {"gzip", "identity", null}) {
SpannerOptions options =
env.getTestHelper().getOptions().toBuilder().setCompressorName(compressorName).build();
try (Spanner spanner = options.getService()) {
DatabaseClient client = spanner.getDatabaseClient(db.getId());
try (ResultSet rs = client.singleUse().executeQuery(Statement.of("SELECT 1 AS COL1"))) {
assertThat(rs.next()).isTrue();
assertThat(rs.getLong(0)).isEqualTo(1L);
assertThat(rs.next()).isFalse();
}
}
}
}
}

0 comments on commit 965e95e

Please sign in to comment.