Skip to content

Commit

Permalink
feat: support setting timeout per RPC
Browse files Browse the repository at this point in the history
The Spanner client allows a user to set custom timeouts while creating a
SpannerOptions instance, but these timeouts are static and are applied to
all invocations of the RPCs. This change introduces the possibility to set
custom timeouts and other call options on a per-RPC basis.

Fixes #378
  • Loading branch information
olavloite committed Aug 10, 2020
1 parent 6f47b8a commit ec4cc30
Show file tree
Hide file tree
Showing 6 changed files with 870 additions and 95 deletions.
Expand Up @@ -18,9 +18,11 @@

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceDefaults;
Expand All @@ -29,6 +31,8 @@
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.InstanceAdminSettings;
Expand All @@ -44,11 +48,15 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
import io.grpc.CallCredentials;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -121,6 +129,324 @@ public static interface CallCredentialsProvider {
CallCredentials getCallCredentials();
}

/** Context key for the {@link CallContextConfigurator} to use. */
public static final Context.Key<CallContextConfigurator> CALL_CONTEXT_CONFIGURATOR_KEY =
Context.key("call-context-configurator");

/**
* {@link CallContextConfigurator} can be used to modify the {@link ApiCallContext} for one or
* more specific RPCs. This can be used to set specific timeout value for RPCs or use specific
* {@link CallCredentials} for an RPC. The {@link CallContextConfigurator} must be set as a value
* on the {@link Context} using the {@link SpannerOptions#CALL_CONTEXT_CONFIGURATOR_KEY} key.
*
* <p>This API is meant for advanced users. Most users should instead use the {@link
* SpannerCallContextTimeoutConfigurator} for setting timeouts per RPC.
*
* <p>Example usage:
*
* <pre>{@code
* CallContextConfigurator configurator =
* new CallContextConfigurator() {
* public <ReqT, RespT> ApiCallContext configure(
* ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
* if (method == SpannerGrpc.getExecuteBatchDmlMethod()) {
* return GrpcCallContext.createDefault()
* .withCallOptions(CallOptions.DEFAULT.withDeadlineAfter(60L, TimeUnit.SECONDS));
* }
* return null;
* }
* };
* Context context =
* Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator);
* context.run(
* new Runnable() {
* public void run() {
* try {
* client
* .readWriteTransaction()
* .run(
* new TransactionCallable<long[]>() {
* public long[] run(TransactionContext transaction) throws Exception {
* return transaction.batchUpdate(
* ImmutableList.of(statement1, statement2));
* }
* });
* } catch (SpannerException e) {
* if (e.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED) {
* // handle timeout exception.
* }
* }
* }
* });
* }</pre>
*/
public static interface CallContextConfigurator {
/**
* Configure a {@link ApiCallContext} for a specific RPC call.
*
* @param context The default context. This can be used to inspect the current values.
* @param request The request that will be sent.
* @param method The method that is being called.
* @return An {@link ApiCallContext} that will be merged with the default {@link
* ApiCallContext}. If <code>null</code> is returned, no changes to the default {@link
* ApiCallContext} will be made.
*/
@Nullable
<ReqT, RespT> ApiCallContext configure(
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method);
}

private enum SpannerMethod {
COMMIT {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
return method == SpannerGrpc.getCommitMethod();
}
},
ROLLBACK {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
return method == SpannerGrpc.getRollbackMethod();
}
},

EXECUTE_QUERY {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
// This also matches with Partitioned DML calls, but that call will override any timeout
// settings anyway.
return method == SpannerGrpc.getExecuteStreamingSqlMethod();
}
},
READ {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
return method == SpannerGrpc.getStreamingReadMethod();
}
},
EXECUTE_UPDATE {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
if (method == SpannerGrpc.getExecuteSqlMethod()) {
ExecuteSqlRequest sqlRequest = (ExecuteSqlRequest) request;
return sqlRequest.getSeqno() != 0L;
}
return false;
}
},
BATCH_UPDATE {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
return method == SpannerGrpc.getExecuteBatchDmlMethod();
}
},

PARTITION_QUERY {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
return method == SpannerGrpc.getPartitionQueryMethod();
}
},
PARTITION_READ {
@Override
<ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method) {
return method == SpannerGrpc.getPartitionReadMethod();
}
};

abstract <ReqT, RespT> boolean isMethod(ReqT request, MethodDescriptor<ReqT, RespT> method);

static <ReqT, RespT> SpannerMethod valueOf(ReqT request, MethodDescriptor<ReqT, RespT> method) {
for (SpannerMethod m : SpannerMethod.values()) {
if (m.isMethod(request, method)) {
return m;
}
}
return null;
}
}

/**
* Helper class to configure timeouts for specific Spanner RPCs. The {@link
* SpannerCallContextTimeoutConfigurator} must be set as a value on the {@link Context} using the
* {@link SpannerOptions#CALL_CONTEXT_CONFIGURATOR_KEY} key.
*
* <p>Example usage:
*
* <pre>{@code
* // Create a context with a ExecuteQuery timeout of 10 seconds.
* Context context =
* Context.current()
* .withValue(
* SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY,
* SpannerCallContextTimeoutConfigurator.create()
* .withExecuteQueryTimeout(Duration.ofSeconds(10L)));
* context.run(
* new Runnable() {
* public void run() {
* try (ResultSet rs =
* client
* .singleUse()
* .executeQuery(
* Statement.of(
* "SELECT SingerId, FirstName, LastName FROM Singers ORDER BY LastName"))) {
* while (rs.next()) {
* System.out.printf("%d %s %s%n", rs.getLong(0), rs.getString(1), rs.getString(2));
* }
* } catch (SpannerException e) {
* if (e.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED) {
* // Handle timeout.
* }
* }
* }
* });
* }</pre>
*/
public static class SpannerCallContextTimeoutConfigurator implements CallContextConfigurator {
private Duration commitTimeout;
private Duration rollbackTimeout;

private Duration executeQueryTimeout;
private Duration executeUpdateTimeout;
private Duration batchUpdateTimeout;
private Duration readTimeout;

private Duration partitionQueryTimeout;
private Duration partitionReadTimeout;

public static SpannerCallContextTimeoutConfigurator create() {
return new SpannerCallContextTimeoutConfigurator();
}

private SpannerCallContextTimeoutConfigurator() {}

@Override
public <ReqT, RespT> ApiCallContext configure(
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
SpannerMethod spannerMethod = SpannerMethod.valueOf(request, method);
if (spannerMethod == null) {
return null;
}
switch (SpannerMethod.valueOf(request, method)) {
case BATCH_UPDATE:
return batchUpdateTimeout == null
? null
: GrpcCallContext.createDefault().withTimeout(batchUpdateTimeout);
case COMMIT:
return commitTimeout == null
? null
: GrpcCallContext.createDefault().withTimeout(commitTimeout);
case EXECUTE_QUERY:
return executeQueryTimeout == null
? null
: GrpcCallContext.createDefault()
.withTimeout(executeQueryTimeout)
.withStreamWaitTimeout(executeQueryTimeout);
case EXECUTE_UPDATE:
return executeUpdateTimeout == null
? null
: GrpcCallContext.createDefault().withTimeout(executeUpdateTimeout);
case PARTITION_QUERY:
return partitionQueryTimeout == null
? null
: GrpcCallContext.createDefault().withTimeout(partitionQueryTimeout);
case PARTITION_READ:
return partitionReadTimeout == null
? null
: GrpcCallContext.createDefault().withTimeout(partitionReadTimeout);
case READ:
return readTimeout == null
? null
: GrpcCallContext.createDefault()
.withTimeout(readTimeout)
.withStreamWaitTimeout(readTimeout);
case ROLLBACK:
return rollbackTimeout == null
? null
: GrpcCallContext.createDefault().withTimeout(rollbackTimeout);
default:
}
return null;
}

public Duration getCommitTimeout() {
return commitTimeout;
}

public SpannerCallContextTimeoutConfigurator withCommitTimeout(Duration commitTimeout) {
this.commitTimeout = commitTimeout;
return this;
}

public Duration getRollbackTimeout() {
return rollbackTimeout;
}

public SpannerCallContextTimeoutConfigurator withRollbackTimeout(Duration rollbackTimeout) {
this.rollbackTimeout = rollbackTimeout;
return this;
}

public Duration getExecuteQueryTimeout() {
return executeQueryTimeout;
}

public SpannerCallContextTimeoutConfigurator withExecuteQueryTimeout(
Duration executeQueryTimeout) {
this.executeQueryTimeout = executeQueryTimeout;
return this;
}

public Duration getExecuteUpdateTimeout() {
return executeUpdateTimeout;
}

public SpannerCallContextTimeoutConfigurator withExecuteUpdateTimeout(
Duration executeUpdateTimeout) {
this.executeUpdateTimeout = executeUpdateTimeout;
return this;
}

public Duration getBatchUpdateTimeout() {
return batchUpdateTimeout;
}

public SpannerCallContextTimeoutConfigurator withBatchUpdateTimeout(
Duration batchUpdateTimeout) {
this.batchUpdateTimeout = batchUpdateTimeout;
return this;
}

public Duration getReadTimeout() {
return readTimeout;
}

public SpannerCallContextTimeoutConfigurator withReadTimeout(Duration readTimeout) {
this.readTimeout = readTimeout;
return this;
}

public Duration getPartitionQueryTimeout() {
return partitionQueryTimeout;
}

public SpannerCallContextTimeoutConfigurator withPartitionQueryTimeout(
Duration partitionQueryTimeout) {
this.partitionQueryTimeout = partitionQueryTimeout;
return this;
}

public Duration getPartitionReadTimeout() {
return partitionReadTimeout;
}

public SpannerCallContextTimeoutConfigurator withPartitionReadTimeout(
Duration partitionReadTimeout) {
this.partitionReadTimeout = partitionReadTimeout;
return this;
}
}

/** Default implementation of {@code SpannerFactory}. */
private static class DefaultSpannerFactory implements SpannerFactory {
private static final DefaultSpannerFactory INSTANCE = new DefaultSpannerFactory();
Expand Down

0 comments on commit ec4cc30

Please sign in to comment.