Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use streaming RPC for PDML #287

Merged
merged 14 commits into from Jun 30, 2020
6 changes: 6 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -176,4 +176,10 @@
<method>com.google.api.gax.retrying.RetrySettings getPartitionedDmlRetrySettings()</method>
</difference>

<!-- Streaming PDML -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.rpc.ServerStream executeStreamingPartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>
</differences>
Expand Up @@ -18,20 +18,32 @@

import static com.google.common.base.Preconditions.checkState;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.grpc.Status.Code;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;

/** Partitioned DML transaction for bulk updates and deletes. */
class PartitionedDMLTransaction implements SessionTransaction {
private static final Logger log = Logger.getLogger(PartitionedDMLTransaction.class.getName());

private final SessionImpl session;
private final SpannerRpc rpc;
private volatile boolean isValid = true;
Expand Down Expand Up @@ -60,41 +72,88 @@ private ByteString initTransaction() {

/**
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
* transaction was aborted.
* transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the
* statement, and will retry the stream if an {@link UnavailableException} is thrown, using the
* last seen resume token if the server returns any.
*/
long executePartitionedUpdate(final Statement statement) {
long executeStreamingPartitionedUpdate(final Statement statement, Duration timeout) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
Callable<com.google.spanner.v1.ResultSet> callable =
new Callable<com.google.spanner.v1.ResultSet>() {
@Override
public com.google.spanner.v1.ResultSet call() throws Exception {
ByteString transactionId = initTransaction();
final ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(QueryMode.NORMAL)
.setSession(session.getName())
.setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
log.log(Level.FINER, "Starting PartitionedUpdate statement");
boolean foundStats = false;
long updateCount = 0L;
Duration remainingTimeout = timeout;
Stopwatch stopWatch = Stopwatch.createStarted();
try {
// Loop to catch AbortedExceptions.
while (true) {
ByteString resumeToken = ByteString.EMPTY;
try {
ByteString transactionId = initTransaction();
final ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(QueryMode.NORMAL)
.setSession(session.getName())
.setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
while (true) {
remainingTimeout =
remainingTimeout.minus(stopWatch.elapsed(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS);
try {
builder.setResumeToken(resumeToken);
ServerStream<PartialResultSet> stream =
rpc.executeStreamingPartitionedDml(
builder.build(), session.getOptions(), remainingTimeout);
for (PartialResultSet rs : stream) {
if (rs.getResumeToken() != null && !ByteString.EMPTY.equals(rs.getResumeToken())) {
resumeToken = rs.getResumeToken();
}
if (rs.hasStats()) {
foundStats = true;
updateCount += rs.getStats().getRowCountLowerBound();
}
}
break;
} catch (UnavailableException e) {
// Retry the stream in the same transaction if the stream breaks with
// UnavailableException and we have a resume token. Otherwise, we just retry the
// entire transaction.
if (resumeToken != null && !ByteString.EMPTY.equals(resumeToken)) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
log.log(
Level.FINER,
"Retrying PartitionedDml stream using resume token '"
+ resumeToken.toStringUtf8()
+ "' because of broken stream",
e);
} else {
throw new com.google.api.gax.rpc.AbortedException(
e, GrpcStatusCode.of(Code.ABORTED), true);
}
}
return rpc.executePartitionedDml(builder.build(), session.getOptions());
}
};
com.google.spanner.v1.ResultSet resultSet =
SpannerRetryHelper.runTxWithRetriesOnAborted(
callable, rpc.getPartitionedDmlRetrySettings());
if (!resultSet.hasStats()) {
throw new IllegalArgumentException(
"Partitioned DML response missing stats possibly due to non-DML statement as input");
break;
} catch (com.google.api.gax.rpc.AbortedException e) {
// Retry using a new transaction but with the same session if the transaction is aborted.
log.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
}
}
if (!foundStats) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"Partitioned DML response missing stats possibly due to non-DML statement as input");
}
log.log(Level.FINER, "Finished PartitionedUpdate statement");
return updateCount;
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
// For partitioned DML, using the row count lower bound.
return resultSet.getStats().getRowCountLowerBound();
}

@Override
Expand Down
Expand Up @@ -105,7 +105,8 @@ public String getName() {
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
return txn.executePartitionedUpdate(stmt);
return txn.executeStreamingPartitionedUpdate(
stmt, spanner.getOptions().getPartitionedDmlTimeout());
}

@Override
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.cloud.spanner.SpannerException.DoNotConstructDirectly;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
Expand Down Expand Up @@ -212,7 +213,14 @@ private static SpannerException newSpannerExceptionPreformatted(
}

private static SpannerException fromApiException(ApiException exception) {
Status.Code code = ((GrpcStatusCode) exception.getStatusCode()).getTransportCode();
Status.Code code;
if (exception.getStatusCode() instanceof GrpcStatusCode) {
code = ((GrpcStatusCode) exception.getStatusCode()).getTransportCode();
} else if (exception instanceof WatchdogTimeoutException) {
code = Status.Code.DEADLINE_EXCEEDED;
} else {
code = Status.Code.UNKNOWN;
}
ErrorCode errorCode = ErrorCode.fromGrpcStatus(Status.fromCode(code));
if (exception.getCause() != null) {
return SpannerExceptionFactory.newSpannerException(
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
import com.google.api.gax.rpc.OperationCallable;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
Expand All @@ -54,6 +55,7 @@
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
Expand Down Expand Up @@ -356,6 +358,21 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setStreamWatchdogProvider(watchdogProvider)
.executeSqlSettings()
.setRetrySettings(partitionedDmlRetrySettings);
// The stream watchdog will by default only check for a timeout every 10 seconds, so if the
// timeout is less than 10 seconds, it would be ignored for the first 10 seconds unless we
// also change the StreamWatchdogCheckInterval.
if (options
.getPartitionedDmlTimeout()
.dividedBy(10L)
.compareTo(pdmlSettings.getStreamWatchdogCheckInterval())
< 0) {
pdmlSettings.setStreamWatchdogCheckInterval(
options.getPartitionedDmlTimeout().dividedBy(10));
pdmlSettings.setStreamWatchdogProvider(
pdmlSettings
.getStreamWatchdogProvider()
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
}
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());

this.instanceAdminStub =
Expand Down Expand Up @@ -1070,6 +1087,14 @@ public RetrySettings getPartitionedDmlRetrySettings() {
return partitionedDmlRetrySettings;
}

@Override
public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
GrpcCallContext context = newCallContext(options, request.getSession());
context = context.withStreamWaitTimeout(timeout);
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
}

@Override
public StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
Expand Down Expand Up @@ -286,6 +288,9 @@ StreamingCall read(

RetrySettings getPartitionedDmlRetrySettings();

ServerStream<PartialResultSet> executeStreamingPartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Expand Up @@ -169,7 +169,7 @@ public void testExecutePartitionedDmlAborted() {
* A valid query that returns a {@link ResultSet} should not be accepted by a partitioned dml
* transaction.
*/
@Test(expected = IllegalArgumentException.class)
@Test(expected = SpannerException.class)
public void testExecutePartitionedDmlWithQuery() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
Expand Down Expand Up @@ -234,19 +234,19 @@ public Void run(TransactionContext transaction) throws Exception {

@Test
public void testPartitionedDmlWithLowerTimeout() throws Exception {
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance());
// Set PDML timeout value.
builder.setPartitionedDmlTimeout(Duration.ofMillis(100L));
builder.setPartitionedDmlTimeout(Duration.ofMillis(10L));
try (Spanner spanner = builder.build().getService()) {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
assertThat(spanner.getOptions().getPartitionedDmlTimeout())
.isEqualTo(Duration.ofMillis(100L));
assertThat(spanner.getOptions().getPartitionedDmlTimeout()).isEqualTo(Duration.ofMillis(10L));
// PDML should timeout with these settings.
try {
client.executePartitionedUpdate(UPDATE_STATEMENT);
Expand Down Expand Up @@ -275,7 +275,8 @@ public Long run(TransactionContext transaction) throws Exception {

@Test
public void testPartitionedDmlWithHigherTimeout() throws Exception {
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
Expand Down Expand Up @@ -307,6 +308,7 @@ public void testPartitionedDmlWithHigherTimeout() throws Exception {
long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT);

// Normal DML should timeout as it should use the ExecuteSQL RPC settings.
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
try {
client
.readWriteTransaction()
Expand Down