Skip to content

Commit

Permalink
fix: PDML retry settings were not applied for aborted tx (#232)
Browse files Browse the repository at this point in the history
* fix: PDML retry settings were not applied for aborted tx

The PartitionedDML retry settings were only applied for the RPC, and not
for the generic retryer that would retry the PDML transaction if it was
aborted by Spanner. This could cause long-running PDML transactions to
fail with an Aborted exception.

Fixes #199

* fix: add ignored diff to clirr
  • Loading branch information
olavloite committed May 20, 2020
1 parent be7d713 commit 308a465
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 11 deletions.
5 changes: 5 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -170,5 +170,10 @@
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.retrying.RetrySettings getPartitionedDmlRetrySettings()</method>
</difference>

</differences>
Expand Up @@ -87,7 +87,8 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
}
};
com.google.spanner.v1.ResultSet resultSet =
SpannerRetryHelper.runTxWithRetriesOnAborted(callable);
SpannerRetryHelper.runTxWithRetriesOnAborted(
callable, rpc.getPartitionedDmlRetrySettings());
if (!resultSet.hasStats()) {
throw new IllegalArgumentException(
"Partitioned DML response missing stats possibly due to non-DML statement as input");
Expand Down
Expand Up @@ -53,6 +53,14 @@ class SpannerRetryHelper {

/**
 * Runs the given {@link Callable}, retrying when it fails with an {@link AbortedException}. This
 * overload delegates to the two-argument variant with the class-level {@code txRetrySettings}.
 */
static <T> T runTxWithRetriesOnAborted(Callable<T> callable) {
final RetrySettings defaultSettings = txRetrySettings;
return runTxWithRetriesOnAborted(callable, defaultSettings);
}

/**
 * Executes the {@link Callable} and retries if it fails with an {@link AbortedException}, using
 * the supplied {@link RetrySettings} rather than the default transaction retry settings.
 *
 * @param callable the work to execute and, if aborted, re-execute
 * @param retrySettings the retry settings handed to the retry helper for this invocation
 */
static <T> T runTxWithRetriesOnAborted(Callable<T> callable, RetrySettings retrySettings) {
try {
// Pass the caller's settings through; using txRetrySettings here would silently ignore
// the retrySettings argument and fall back to the default transaction retry behavior.
return RetryHelper.runWithRetries(
callable, retrySettings, new TxRetryAlgorithm<>(), NanoClock.getDefaultClock());
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
Expand Down Expand Up @@ -217,6 +218,7 @@ private void awaitTermination() throws InterruptedException {
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final DatabaseAdminStub databaseAdminStub;
Expand Down Expand Up @@ -300,7 +302,7 @@ public GapicSpannerRpc(final SpannerOptions options) {

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
// The constant is already expressed in seconds; do not scale it before Duration.ofSeconds.
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))

// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
Expand Down Expand Up @@ -336,21 +338,24 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
partitionedDmlRetrySettings =
options
.getSpannerStubSettings()
.executeSqlSettings()
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(options.getPartitionedDmlTimeout())
.setMaxRpcTimeout(options.getPartitionedDmlTimeout())
.setTotalTimeout(options.getPartitionedDmlTimeout())
.setRpcTimeoutMultiplier(1.0)
.build();
SpannerStubSettings.Builder pdmlSettings = options.getSpannerStubSettings().toBuilder();
pdmlSettings
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.executeSqlSettings()
.setRetrySettings(
options
.getSpannerStubSettings()
.executeSqlSettings()
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(options.getPartitionedDmlTimeout())
.setMaxRpcTimeout(options.getPartitionedDmlTimeout())
.build());
.setRetrySettings(partitionedDmlRetrySettings);
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());

this.instanceAdminStub =
Expand Down Expand Up @@ -1060,6 +1065,11 @@ public ResultSet executePartitionedDml(
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
}

/** Returns the {@link RetrySettings} stored in {@code partitionedDmlRetrySettings}. */
@Override
public RetrySettings getPartitionedDmlRetrySettings() {
return this.partitionedDmlRetrySettings;
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
Expand Down Expand Up @@ -283,6 +284,8 @@ StreamingCall read(

ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

/**
 * Returns the {@link RetrySettings} to use for Partitioned DML. Callers pass these to the
 * transaction retry helper so that a Partitioned DML transaction that is aborted is retried
 * with the same settings.
 */
RetrySettings getPartitionedDmlRetrySettings();

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down

0 comments on commit 308a465

Please sign in to comment.