Skip to content

Commit

Permalink
feat: support setting compression option
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Jun 17, 2020
1 parent b7bac19 commit 2a4c19e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 0 deletions.
Expand Up @@ -49,6 +49,8 @@
import com.google.spanner.admin.database.v1.RestoreDatabaseRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import io.grpc.CallCredentials;
import io.grpc.CompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.net.MalformedURLException;
Expand All @@ -58,6 +60,7 @@
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Options for the Cloud Spanner service. */
Expand Down Expand Up @@ -104,6 +107,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final Map<DatabaseId, QueryOptions> mergedQueryOptions;

private final CallCredentialsProvider callCredentialsProvider;
private final String compressorName;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -174,6 +178,7 @@ private SpannerOptions(Builder builder) {
this.mergedQueryOptions = ImmutableMap.copyOf(merged);
}
callCredentialsProvider = builder.callCredentialsProvider;
compressorName = builder.compressorName;
}

/**
Expand Down Expand Up @@ -238,6 +243,7 @@ public static class Builder
private boolean autoThrottleAdministrativeRequests = false;
private Map<DatabaseId, QueryOptions> defaultQueryOptions = new HashMap<>();
private CallCredentialsProvider callCredentialsProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");

private Builder() {
Expand Down Expand Up @@ -309,6 +315,7 @@ private Builder() {
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.defaultQueryOptions = options.defaultQueryOptions;
this.callCredentialsProvider = options.callCredentialsProvider;
this.compressorName = options.compressorName;
this.channelProvider = options.channelProvider;
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
Expand Down Expand Up @@ -558,6 +565,28 @@ public Builder setCallCredentialsProvider(CallCredentialsProvider callCredential
return this;
}

/**
* Sets the compression to use for all gRPC calls. The compressor must be a valid name known in
* the {@link CompressorRegistry}.
*
* <p>Supported values are:
*
* <ul>
* <li>gzip: Enable gzip compression
* <li>identity: Disable compression
* <li><code>null</code>: Use default comporession
* <ul>
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public Builder setCompressorName(@Nullable String compressorName) {
Preconditions.checkArgument(
compressorName == null
|| CompressorRegistry.getDefaultInstance().lookupCompressor(compressorName) != null,
String.format("%s is not a known compressor", compressorName));
this.compressorName = compressorName;
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -690,6 +719,10 @@ public CallCredentialsProvider getCallCredentialsProvider() {
return callCredentialsProvider;
}

public String getCompressorName() {
return compressorName;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
Expand Up @@ -226,6 +226,7 @@ private void awaitTermination() throws InterruptedException {
private final String projectName;
private final SpannerMetadataProvider metadataProvider;
private final CallCredentialsProvider callCredentialsProvider;
private final String compressorName;
private final Duration waitTimeout =
systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS);
private final Duration idleTimeout =
Expand Down Expand Up @@ -279,6 +280,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
mergedHeaderProvider.getHeaders(),
internalHeaderProviderBuilder.getResourceHeaderKey());
this.callCredentialsProvider = options.getCallCredentialsProvider();
this.compressorName = options.getCompressorName();

// Create a managed executor provider.
this.executorProvider =
Expand Down Expand Up @@ -1239,6 +1241,9 @@ GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String resource
context.withCallOptions(context.getCallOptions().withCallCredentials(callCredentials));
}
}
if (compressorName != null) {
context = context.withCallOptions(context.getCallOptions().withCompression(compressorName));
}
return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(idleTimeout);
}

Expand Down
Expand Up @@ -505,4 +505,21 @@ public String getOptimizerVersion() {
assertThat(options.getDefaultQueryOptions(DatabaseId.of("p", "i", "o")))
.isEqualTo(QueryOptions.newBuilder().setOptimizerVersion("2").build());
}

@Test
public void testCompressorName() {
assertThat(SpannerOptions.newBuilder().setCompressorName("gzip").build().getCompressorName())
.isEqualTo("gzip");
assertThat(
SpannerOptions.newBuilder().setCompressorName("identity").build().getCompressorName())
.isEqualTo("identity");
assertThat(SpannerOptions.newBuilder().setCompressorName(null).build().getCompressorName())
.isNull();
try {
SpannerOptions.newBuilder().setCompressorName("foo");
fail("missing expected exception");
} catch (IllegalArgumentException e) {
// ignore, this is the expected exception.
}
}
}
Expand Up @@ -168,6 +168,27 @@ public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedExcep
assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)).isAtMost(baseThreadCount);
}

@Test
public void testCompression() {
for (String compressorName : new String[] {"gzip", "identity", null}) {
SpannerOptions options =
env.getTestHelper().getOptions().toBuilder().setCompressorName(compressorName).build();
try (Spanner spanner = options.getService()) {
DatabaseClient client = spanner.getDatabaseClient(db.getId());
try (ResultSet rs =
client
.singleUse()
.executeQuery(Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL2"))) {
assertThat(rs.next()).isTrue();
assertThat(rs.getLong(0)).isEqualTo(1L);
assertThat(rs.next()).isTrue();
assertThat(rs.getLong(0)).isEqualTo(2L);
assertThat(rs.next()).isFalse();
}
}
}
}

private void waitForStartup() throws InterruptedException {
// Wait until the IT environment has already started all base worker threads.
int threadCount;
Expand Down

0 comments on commit 2a4c19e

Please sign in to comment.