Skip to content

Commit

Permalink
fix: Partitioned DML timeout was not always respected (#203)
Browse files Browse the repository at this point in the history
* fix: Partitioned DML timeout was not always respected

Setting a timeout value for Partitioned DML would not be respected if the timeout
value was higher than the timeout value set for the ExecuteSql RPC on the SpannerStub.
Lower timeout values would be respected.

Fixes #199

* fix: add ignored changes + InternalApi

* tests: add test for retry on UNAVAILABLE
  • Loading branch information
olavloite committed May 14, 2020
1 parent 50cb174 commit 13cb37e
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 64 deletions.
12 changes: 12 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -159,4 +159,16 @@
<method>com.google.longrunning.Operation getOperation(java.lang.String)</method>
</difference>

<!-- Fix Partitioned DML timeout settings. -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>

</differences>
Expand Up @@ -29,7 +29,6 @@
import com.google.spanner.v1.TransactionSelector;
import java.util.Map;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

/** Partitioned DML transaction for bulk updates and deletes. */
class PartitionedDMLTransaction implements SessionTransaction {
Expand Down Expand Up @@ -63,7 +62,7 @@ private ByteString initTransaction() {
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
* transaction was aborted.
*/
long executePartitionedUpdate(final Statement statement, final Duration timeout) {
long executePartitionedUpdate(final Statement statement) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
Callable<com.google.spanner.v1.ResultSet> callable =
new Callable<com.google.spanner.v1.ResultSet>() {
Expand All @@ -84,7 +83,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
return rpc.executePartitionedDml(builder.build(), session.getOptions(), timeout);
return rpc.executePartitionedDml(builder.build(), session.getOptions());
}
};
com.google.spanner.v1.ResultSet resultSet =
Expand Down
Expand Up @@ -105,7 +105,7 @@ public String getName() {
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
return txn.executePartitionedUpdate(stmt, spanner.getOptions().getPartitionedDmlTimeout());
return txn.executePartitionedUpdate(stmt);
}

@Override
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -54,6 +55,7 @@
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -153,6 +155,7 @@
import org.threeten.bp.Duration;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
@InternalApi
public class GapicSpannerRpc implements SpannerRpc {
/**
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
Expand Down Expand Up @@ -207,6 +210,7 @@ private synchronized void shutdown() {
private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final SpannerStub partitionedDmlStub;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final DatabaseAdminStub databaseAdminStub;
Expand Down Expand Up @@ -326,6 +330,22 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
SpannerStubSettings.Builder pdmlSettings = options.getSpannerStubSettings().toBuilder();
pdmlSettings
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.executeSqlSettings()
.setRetrySettings(
options
.getSpannerStubSettings()
.executeSqlSettings()
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(options.getPartitionedDmlTimeout())
.setMaxRpcTimeout(options.getPartitionedDmlTimeout())
.build());
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());

this.instanceAdminStub =
GrpcInstanceAdminStub.create(
Expand Down Expand Up @@ -1029,9 +1049,9 @@ public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?

@Override
public ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout) {
GrpcCallContext context = newCallContext(options, request.getSession(), timeout);
return get(spannerStub.executeSqlCallable().futureCall(request, context));
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context = newCallContext(options, request.getSession());
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
}

@Override
Expand Down Expand Up @@ -1191,19 +1211,11 @@ private static <T> T get(final Future<T> future) throws SpannerException {

@VisibleForTesting
GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String resource) {
return newCallContext(options, resource, null);
}

private GrpcCallContext newCallContext(
@Nullable Map<Option, ?> options, String resource, Duration timeout) {
GrpcCallContext context = GrpcCallContext.createDefault();
if (options != null) {
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (timeout != null) {
context = context.withTimeout(timeout);
}
if (callCredentialsProvider != null) {
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
if (callCredentials != null) {
Expand Down
Expand Up @@ -57,7 +57,6 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
Expand Down Expand Up @@ -282,8 +281,7 @@ StreamingCall read(

ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);
ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
Expand Down

0 comments on commit 13cb37e

Please sign in to comment.