Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: PDML retry settings were not applied for aborted tx #232

Merged
merged 2 commits into from May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -170,5 +170,10 @@
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.retrying.RetrySettings getPartitionedDmlRetrySettings()</method>
</difference>

</differences>
Expand Up @@ -87,7 +87,8 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
}
};
com.google.spanner.v1.ResultSet resultSet =
SpannerRetryHelper.runTxWithRetriesOnAborted(callable);
SpannerRetryHelper.runTxWithRetriesOnAborted(
callable, rpc.getPartitionedDmlRetrySettings());
if (!resultSet.hasStats()) {
throw new IllegalArgumentException(
"Partitioned DML response missing stats possibly due to non-DML statement as input");
Expand Down
Expand Up @@ -53,6 +53,14 @@ class SpannerRetryHelper {

/** Executes the {@link Callable} and retries if it fails with an {@link AbortedException}. */
static <T> T runTxWithRetriesOnAborted(Callable<T> callable) {
return runTxWithRetriesOnAborted(callable, txRetrySettings);
}

/**
* Executes the {@link Callable} and retries if it fails with an {@link AbortedException} using
* the specific {@link RetrySettings}.
*/
static <T> T runTxWithRetriesOnAborted(Callable<T> callable, RetrySettings retrySettings) {
try {
return RetryHelper.runWithRetries(
callable, txRetrySettings, new TxRetryAlgorithm<>(), NanoClock.getDefaultClock());
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
Expand Down Expand Up @@ -217,6 +218,7 @@ private void awaitTermination() throws InterruptedException {
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final DatabaseAdminStub databaseAdminStub;
Expand Down Expand Up @@ -300,7 +302,7 @@ public GapicSpannerRpc(final SpannerOptions options) {

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS * 1000))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you really intend to set the keepalive time to be over a day? This is a value of 120,000 seconds.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. Thanks for noticing. Nope, that's a typical seconds/milliseconds mistake.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Then check if SpannerOptions provides an InterceptorProvider. Create a default
// SpannerInterceptorProvider if none is provided
Expand Down Expand Up @@ -336,21 +338,24 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
partitionedDmlRetrySettings =
options
.getSpannerStubSettings()
.executeSqlSettings()
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(options.getPartitionedDmlTimeout())
.setMaxRpcTimeout(options.getPartitionedDmlTimeout())
.setTotalTimeout(options.getPartitionedDmlTimeout())
.setRpcTimeoutMultiplier(1.0)
.build();
SpannerStubSettings.Builder pdmlSettings = options.getSpannerStubSettings().toBuilder();
pdmlSettings
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.executeSqlSettings()
.setRetrySettings(
options
.getSpannerStubSettings()
.executeSqlSettings()
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(options.getPartitionedDmlTimeout())
.setMaxRpcTimeout(options.getPartitionedDmlTimeout())
.build());
.setRetrySettings(partitionedDmlRetrySettings);
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());

this.instanceAdminStub =
Expand Down Expand Up @@ -1060,6 +1065,11 @@ public ResultSet executePartitionedDml(
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
}

@Override
public RetrySettings getPartitionedDmlRetrySettings() {
return partitionedDmlRetrySettings;
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
Expand Down Expand Up @@ -283,6 +284,8 @@ StreamingCall read(

ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

RetrySettings getPartitionedDmlRetrySettings();

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down