Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Partitioned DML timeout was not always respected #203

Merged
merged 3 commits into from May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -159,4 +159,16 @@
<method>com.google.longrunning.Operation getOperation(java.lang.String)</method>
</difference>

<!-- Fix Partitioned DML timeout settings. -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>

</differences>
Expand Up @@ -29,7 +29,6 @@
import com.google.spanner.v1.TransactionSelector;
import java.util.Map;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

/** Partitioned DML transaction for bulk updates and deletes. */
class PartitionedDMLTransaction implements SessionTransaction {
Expand Down Expand Up @@ -63,7 +62,7 @@ private ByteString initTransaction() {
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
* transaction was aborted.
*/
long executePartitionedUpdate(final Statement statement, final Duration timeout) {
long executePartitionedUpdate(final Statement statement) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
Callable<com.google.spanner.v1.ResultSet> callable =
new Callable<com.google.spanner.v1.ResultSet>() {
Expand All @@ -84,7 +83,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
return rpc.executePartitionedDml(builder.build(), session.getOptions(), timeout);
return rpc.executePartitionedDml(builder.build(), session.getOptions());
}
};
com.google.spanner.v1.ResultSet resultSet =
Expand Down
Expand Up @@ -105,7 +105,7 @@ public String getName() {
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
return txn.executePartitionedUpdate(stmt, spanner.getOptions().getPartitionedDmlTimeout());
return txn.executePartitionedUpdate(stmt);
}

@Override
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -54,6 +55,7 @@
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -153,6 +155,7 @@
import org.threeten.bp.Duration;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
@InternalApi
public class GapicSpannerRpc implements SpannerRpc {
/**
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
Expand Down Expand Up @@ -207,6 +210,7 @@ private synchronized void shutdown() {
private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final SpannerStub partitionedDmlStub;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final DatabaseAdminStub databaseAdminStub;
Expand Down Expand Up @@ -326,6 +330,22 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
SpannerStubSettings.Builder pdmlSettings = options.getSpannerStubSettings().toBuilder();
pdmlSettings
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.executeSqlSettings()
.setRetrySettings(
options
.getSpannerStubSettings()
.executeSqlSettings()
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(options.getPartitionedDmlTimeout())
.setMaxRpcTimeout(options.getPartitionedDmlTimeout())
Comment on lines +345 to +346
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are PDML requests not meant to be retried? I'm just curious why the initial and max timeouts are the same.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question.

I would say it's a a matter of probabilities and a little bit opinion:

  1. PDML is designed to accept long running update statements. That means that we should not automatically retry the statement if it takes longer than X time, as it would be impossible to find a good global value for X, as we don't know whether the statement is taking a long time because of a network problem or because the statement itself is taking a long time. Re-sending a long-running statement that is still running on the server would only hurt performance. My opinion is that for a PDML statement the chance that the statement is actually taking a long time is more probable than a network problem.
  2. The PDML documentation also clearly states that the update statement should be idempotent. This means that we should automatically retry it if the server is unavailable.

I think we should consider adding an additional method to the public API which would allow the users to specify a lower timeout for a specific statement, in addition to the current global setting. That would give the user more control over the timeout setting for statements that are known to take less time than the global setting.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added an extra test case to ensure that PDML is retried on UNAVAILABLE.

.build());
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());

this.instanceAdminStub =
GrpcInstanceAdminStub.create(
Expand Down Expand Up @@ -1029,9 +1049,9 @@ public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?

@Override
public ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout) {
GrpcCallContext context = newCallContext(options, request.getSession(), timeout);
return get(spannerStub.executeSqlCallable().futureCall(request, context));
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context = newCallContext(options, request.getSession());
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
}

@Override
Expand Down Expand Up @@ -1191,19 +1211,11 @@ private static <T> T get(final Future<T> future) throws SpannerException {

@VisibleForTesting
GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String resource) {
return newCallContext(options, resource, null);
}

private GrpcCallContext newCallContext(
@Nullable Map<Option, ?> options, String resource, Duration timeout) {
GrpcCallContext context = GrpcCallContext.createDefault();
if (options != null) {
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (timeout != null) {
context = context.withTimeout(timeout);
}
if (callCredentialsProvider != null) {
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
if (callCredentials != null) {
Expand Down
Expand Up @@ -57,7 +57,6 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
Expand Down Expand Up @@ -282,8 +281,7 @@ StreamingCall read(

ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);
ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
Expand Down