Skip to content

Commit

Permalink
perf: use streaming RPC for PDML (#287)
Browse files Browse the repository at this point in the history
* perf: use streaming RPC for PDML

* fix: reset resume token for each tx

* cleanup: remove test code

* fix: retry depening on resume token

* fix: remove unused attempt param

* fix: fix check for resume token

* fix: keep track of total timeout

* fix: clirr build failure

* cleanup: add comments + remove unused code

* tests: add missing exec time

* chore: run formatter

* chore: remove unnecessary null check

* tests: add missing exec time
  • Loading branch information
olavloite committed Jun 30, 2020
1 parent 9483925 commit df47c13
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 43 deletions.
6 changes: 6 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -176,4 +176,10 @@
<method>com.google.api.gax.retrying.RetrySettings getPartitionedDmlRetrySettings()</method>
</difference>

<!-- Streaming PDML -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.rpc.ServerStream executeStreamingPartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>
</differences>
Expand Up @@ -18,20 +18,32 @@

import static com.google.common.base.Preconditions.checkState;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status.Code;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;

/** Partitioned DML transaction for bulk updates and deletes. */
class PartitionedDMLTransaction implements SessionTransaction {
private static final Logger log = Logger.getLogger(PartitionedDMLTransaction.class.getName());

private final SessionImpl session;
private final SpannerRpc rpc;
private volatile boolean isValid = true;
Expand Down Expand Up @@ -60,41 +72,88 @@ private ByteString initTransaction() {

/**
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
* transaction was aborted.
* transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the
* statement, and will retry the stream if an {@link UnavailableException} is thrown, using the
* last seen resume token if the server returns any.
*/
long executePartitionedUpdate(final Statement statement) {
long executeStreamingPartitionedUpdate(final Statement statement, Duration timeout) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
Callable<com.google.spanner.v1.ResultSet> callable =
new Callable<com.google.spanner.v1.ResultSet>() {
@Override
public com.google.spanner.v1.ResultSet call() throws Exception {
ByteString transactionId = initTransaction();
final ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(QueryMode.NORMAL)
.setSession(session.getName())
.setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
log.log(Level.FINER, "Starting PartitionedUpdate statement");
boolean foundStats = false;
long updateCount = 0L;
Duration remainingTimeout = timeout;
Stopwatch stopWatch = Stopwatch.createStarted();
try {
// Loop to catch AbortedExceptions.
while (true) {
ByteString resumeToken = ByteString.EMPTY;
try {
ByteString transactionId = initTransaction();
final ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(QueryMode.NORMAL)
.setSession(session.getName())
.setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
while (true) {
remainingTimeout =
remainingTimeout.minus(stopWatch.elapsed(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS);
try {
builder.setResumeToken(resumeToken);
ServerStream<PartialResultSet> stream =
rpc.executeStreamingPartitionedDml(
builder.build(), session.getOptions(), remainingTimeout);
for (PartialResultSet rs : stream) {
if (rs.getResumeToken() != null && !ByteString.EMPTY.equals(rs.getResumeToken())) {
resumeToken = rs.getResumeToken();
}
if (rs.hasStats()) {
foundStats = true;
updateCount += rs.getStats().getRowCountLowerBound();
}
}
break;
} catch (UnavailableException e) {
// Retry the stream in the same transaction if the stream breaks with
// UnavailableException and we have a resume token. Otherwise, we just retry the
// entire transaction.
if (!ByteString.EMPTY.equals(resumeToken)) {
log.log(
Level.FINER,
"Retrying PartitionedDml stream using resume token '"
+ resumeToken.toStringUtf8()
+ "' because of broken stream",
e);
} else {
throw new com.google.api.gax.rpc.AbortedException(
e, GrpcStatusCode.of(Code.ABORTED), true);
}
}
return rpc.executePartitionedDml(builder.build(), session.getOptions());
}
};
com.google.spanner.v1.ResultSet resultSet =
SpannerRetryHelper.runTxWithRetriesOnAborted(
callable, rpc.getPartitionedDmlRetrySettings());
if (!resultSet.hasStats()) {
throw new IllegalArgumentException(
"Partitioned DML response missing stats possibly due to non-DML statement as input");
break;
} catch (com.google.api.gax.rpc.AbortedException e) {
// Retry using a new transaction but with the same session if the transaction is aborted.
log.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
}
}
if (!foundStats) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"Partitioned DML response missing stats possibly due to non-DML statement as input");
}
log.log(Level.FINER, "Finished PartitionedUpdate statement");
return updateCount;
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
// For partitioned DML, using the row count lower bound.
return resultSet.getStats().getRowCountLowerBound();
}

@Override
Expand Down
Expand Up @@ -105,7 +105,8 @@ public String getName() {
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
return txn.executePartitionedUpdate(stmt);
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout());
}

@Override
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.cloud.spanner.SpannerException.DoNotConstructDirectly;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
Expand Down Expand Up @@ -212,7 +213,14 @@ private static SpannerException newSpannerExceptionPreformatted(
}

private static SpannerException fromApiException(ApiException exception) {
Status.Code code = ((GrpcStatusCode) exception.getStatusCode()).getTransportCode();
Status.Code code;
if (exception.getStatusCode() instanceof GrpcStatusCode) {
code = ((GrpcStatusCode) exception.getStatusCode()).getTransportCode();
} else if (exception instanceof WatchdogTimeoutException) {
code = Status.Code.DEADLINE_EXCEEDED;
} else {
code = Status.Code.UNKNOWN;
}
ErrorCode errorCode = ErrorCode.fromGrpcStatus(Status.fromCode(code));
if (exception.getCause() != null) {
return SpannerExceptionFactory.newSpannerException(
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
import com.google.api.gax.rpc.OperationCallable;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
Expand All @@ -54,6 +55,7 @@
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
Expand Down Expand Up @@ -359,6 +361,21 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setStreamWatchdogProvider(watchdogProvider)
.executeSqlSettings()
.setRetrySettings(partitionedDmlRetrySettings);
// The stream watchdog will by default only check for a timeout every 10 seconds, so if the
// timeout is less than 10 seconds, it would be ignored for the first 10 seconds unless we
// also change the StreamWatchdogCheckInterval.
if (options
.getPartitionedDmlTimeout()
.dividedBy(10L)
.compareTo(pdmlSettings.getStreamWatchdogCheckInterval())
< 0) {
pdmlSettings.setStreamWatchdogCheckInterval(
options.getPartitionedDmlTimeout().dividedBy(10));
pdmlSettings.setStreamWatchdogProvider(
pdmlSettings
.getStreamWatchdogProvider()
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
}
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());

this.instanceAdminStub =
Expand Down Expand Up @@ -1073,6 +1090,14 @@ public RetrySettings getPartitionedDmlRetrySettings() {
return partitionedDmlRetrySettings;
}

@Override
public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
GrpcCallContext context = newCallContext(options, request.getSession());
context = context.withStreamWaitTimeout(timeout);
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
Expand Down Expand Up @@ -286,6 +288,9 @@ StreamingCall read(

RetrySettings getPartitionedDmlRetrySettings();

ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Expand Up @@ -169,7 +169,7 @@ public void testExecutePartitionedDmlAborted() {
* A valid query that returns a {@link ResultSet} should not be accepted by a partitioned dml
* transaction.
*/
@Test(expected = IllegalArgumentException.class)
@Test(expected = SpannerException.class)
public void testExecutePartitionedDmlWithQuery() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
Expand Down Expand Up @@ -234,20 +234,22 @@ public Void run(TransactionContext transaction) {

@Test
public void testPartitionedDmlWithLowerTimeout() {
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance());
// Set PDML timeout value.
builder.setPartitionedDmlTimeout(Duration.ofMillis(100L));
builder.setPartitionedDmlTimeout(Duration.ofMillis(10L));
try (Spanner spanner = builder.build().getService()) {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
assertThat(spanner.getOptions().getPartitionedDmlTimeout())
.isEqualTo(Duration.ofMillis(100L));
assertThat(spanner.getOptions().getPartitionedDmlTimeout()).isEqualTo(Duration.ofMillis(10L));
// PDML should timeout with these settings.
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
try {
client.executePartitionedUpdate(UPDATE_STATEMENT);
fail("expected DEADLINE_EXCEEDED");
Expand Down Expand Up @@ -275,7 +277,8 @@ public Long run(TransactionContext transaction) {

@Test
public void testPartitionedDmlWithHigherTimeout() {
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
Expand Down Expand Up @@ -307,6 +310,7 @@ public void testPartitionedDmlWithHigherTimeout() {
long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT);

// Normal DML should timeout as it should use the ExecuteSQL RPC settings.
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
try {
client
.readWriteTransaction()
Expand Down

0 comments on commit df47c13

Please sign in to comment.