diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 5faa1d6515..8a6ac6f066 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -159,4 +159,16 @@ com.google.longrunning.Operation getOperation(java.lang.String) + + + 7004 + com/google/cloud/spanner/spi/v1/SpannerRpc + com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration) + + + 7004 + com/google/cloud/spanner/spi/v1/GapicSpannerRpc + com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration) + + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java index ded74ce85a..1c67a7d75c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java @@ -29,7 +29,6 @@ import com.google.spanner.v1.TransactionSelector; import java.util.Map; import java.util.concurrent.Callable; -import org.threeten.bp.Duration; /** Partitioned DML transaction for bulk updates and deletes. */ class PartitionedDMLTransaction implements SessionTransaction { @@ -63,7 +62,7 @@ private ByteString initTransaction() { * Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the * transaction was aborted. */ - long executePartitionedUpdate(final Statement statement, final Duration timeout) { + long executePartitionedUpdate(final Statement statement) { checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); Callable callable = new Callable() { @@ -84,7 +83,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception { builder.putParamTypes(param.getKey(), param.getValue().getType().toProto()); } } - return rpc.executePartitionedDml(builder.build(), session.getOptions(), timeout); + return rpc.executePartitionedDml(builder.build(), session.getOptions()); } }; com.google.spanner.v1.ResultSet resultSet = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 015e1862d6..d1de6e204f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -105,7 +105,7 @@ public String getName() { public long executePartitionedUpdate(Statement stmt) { setActive(null); PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc()); - return txn.executePartitionedUpdate(stmt, spanner.getOptions().getPartitionedDmlTimeout()); + return txn.executePartitionedUpdate(stmt); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 181a09d6c7..001ffc1239 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -19,6 +19,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; import com.google.api.core.NanoClock; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; @@ -54,6 +55,7 @@ import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub; import com.google.cloud.spanner.v1.stub.GrpcSpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStub; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.MoreObjects; @@ -153,6 +155,7 @@ import org.threeten.bp.Duration; /** Implementation of Cloud Spanner remote calls using Gapic libraries. */ +@InternalApi public class GapicSpannerRpc implements SpannerRpc { /** * {@link ExecutorProvider} that keeps track of the executors that are created and shuts these @@ -207,6 +210,7 @@ private synchronized void shutdown() { private final ManagedInstantiatingExecutorProvider executorProvider; private boolean rpcIsClosed; private final SpannerStub spannerStub; + private final SpannerStub partitionedDmlStub; private final InstanceAdminStub instanceAdminStub; private final DatabaseAdminStubSettings databaseAdminStubSettings; private final DatabaseAdminStub databaseAdminStub; @@ -326,6 +330,22 @@ public GapicSpannerRpc(final SpannerOptions options) { .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) .build()); + SpannerStubSettings.Builder pdmlSettings = options.getSpannerStubSettings().toBuilder(); + pdmlSettings + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogProvider(watchdogProvider) + .executeSqlSettings() + .setRetrySettings( + options + .getSpannerStubSettings() + .executeSqlSettings() + .getRetrySettings() + .toBuilder() + .setInitialRpcTimeout(options.getPartitionedDmlTimeout()) + .setMaxRpcTimeout(options.getPartitionedDmlTimeout()) + .build()); + this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build()); this.instanceAdminStub = GrpcInstanceAdminStub.create( @@ -1029,9 +1049,9 @@ public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options, Duration timeout) { - GrpcCallContext context = newCallContext(options, request.getSession(), timeout); - return get(spannerStub.executeSqlCallable().futureCall(request, context)); + ExecuteSqlRequest request, @Nullable Map options) { + GrpcCallContext context = newCallContext(options, request.getSession()); + return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context)); } @Override @@ -1191,19 +1211,11 @@ private static T get(final Future future) throws SpannerException { @VisibleForTesting GrpcCallContext newCallContext(@Nullable Map options, String resource) { - return newCallContext(options, resource, null); - } - - private GrpcCallContext newCallContext( - @Nullable Map options, String resource, Duration timeout) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); - if (timeout != null) { - context = context.withTimeout(timeout); - } if (callCredentialsProvider != null) { CallCredentials callCredentials = callCredentialsProvider.getCallCredentials(); if (callCredentials != null) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 497be948cc..753d97b87e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -57,7 +57,6 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.threeten.bp.Duration; /** * Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use @@ -282,8 +281,7 @@ StreamingCall read( ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options); - ResultSet executePartitionedDml( - ExecuteSqlRequest request, @Nullable Map options, Duration timeout); + ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map options); StreamingCall executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 8bfdd305fa..3b61e0cb92 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -17,9 +17,6 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; import com.google.api.gax.grpc.testing.LocalChannelProvider; @@ -154,7 +151,7 @@ public void testExecutePartitionedDml() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); - assertThat(updateCount, is(equalTo(UPDATE_COUNT))); + assertThat(updateCount).isEqualTo(UPDATE_COUNT); } /** {@link AbortedException} should automatically be retried. */ @@ -164,7 +161,7 @@ public void testExecutePartitionedDmlAborted() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); mockSpanner.abortNextTransaction(); long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); - assertThat(updateCount, is(equalTo(UPDATE_COUNT))); + assertThat(updateCount).isEqualTo(UPDATE_COUNT); } /** @@ -207,12 +204,11 @@ public void testPartitionedDmlDoesNotTimeout() throws Exception { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - assertThat( - spanner.getOptions().getPartitionedDmlTimeout(), is(equalTo(Duration.ofHours(2L)))); + assertThat(spanner.getOptions().getPartitionedDmlTimeout()).isEqualTo(Duration.ofHours(2L)); // PDML should not timeout with these settings. long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); - assertThat(updateCount, is(equalTo(UPDATE_COUNT))); + assertThat(updateCount).isEqualTo(UPDATE_COUNT); // Normal DML should timeout. try { @@ -236,7 +232,7 @@ public Void run(TransactionContext transaction) throws Exception { } @Test - public void testPartitionedDmlWithTimeout() throws Exception { + public void testPartitionedDmlWithLowerTimeout() throws Exception { mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0)); SpannerOptions.Builder builder = SpannerOptions.newBuilder() @@ -248,8 +244,8 @@ public void testPartitionedDmlWithTimeout() throws Exception { try (Spanner spanner = builder.build().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - assertThat( - spanner.getOptions().getPartitionedDmlTimeout(), is(equalTo(Duration.ofMillis(100L)))); + assertThat(spanner.getOptions().getPartitionedDmlTimeout()) + .isEqualTo(Duration.ofMillis(100L)); // PDML should timeout with these settings. try { client.executePartitionedUpdate(UPDATE_STATEMENT); @@ -272,7 +268,76 @@ public Long run(TransactionContext transaction) throws Exception { return transaction.executeUpdate(UPDATE_STATEMENT); } }); - assertThat(updateCount, is(equalTo(UPDATE_COUNT))); + assertThat(updateCount).isEqualTo(UPDATE_COUNT); + } + } + + @Test + public void testPartitionedDmlWithHigherTimeout() throws Exception { + mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0)); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()); + // Set PDML timeout value to a value that should allow the statement to be executed. + builder.setPartitionedDmlTimeout(Duration.ofMillis(5000L)); + // Set the ExecuteSql RPC timeout value to a value lower than the time needed to execute the + // statement. The higher timeout value that is set above should be respected, and the value for + // the ExecuteSQL RPC should be ignored specifically for Partitioned DML. + builder + .getSpannerStubSettingsBuilder() + .executeSqlSettings() + .setRetrySettings( + builder + .getSpannerStubSettingsBuilder() + .executeSqlSettings() + .getRetrySettings() + .toBuilder() + .setInitialRpcTimeout(Duration.ofMillis(10L)) + .setMaxRpcTimeout(Duration.ofMillis(10L)) + .setInitialRetryDelay(Duration.ofMillis(1L)) + .setMaxRetryDelay(Duration.ofMillis(1L)) + .build()); + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // PDML should not timeout with these settings. + long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); + + // Normal DML should timeout as it should use the ExecuteSQL RPC settings. + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(UPDATE_STATEMENT); + } + }); + fail("missing expected DEADLINE_EXCEEDED exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED); + } + assertThat(updateCount).isEqualTo(UPDATE_COUNT); + } + } + + @Test + public void testPartitionedDmlRetriesOnUnavailable() throws Exception { + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()); + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); + assertThat(updateCount).isEqualTo(UPDATE_COUNT); } } @@ -312,8 +377,8 @@ public void testDatabaseOrInstanceDoesNotExistOnPrepareSession() throws Exceptio && dbClient.pool.getNumberOfSessionsBeingPrepared() > 0) { Thread.sleep(1L); } - assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); - assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared()).isEqualTo(0); + assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions()).isEqualTo(0); int currentNumRequest = mockSpanner.getRequests().size(); try { dbClient @@ -365,8 +430,8 @@ public void testDatabaseOrInstanceDoesNotExistOnInitialization() throws Exceptio Thread.sleep(1L); } // All session creation should fail and stop trying. - assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0))); - assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsInPool()).isEqualTo(0); + assertThat(dbClient.pool.getNumberOfSessionsBeingCreated()).isEqualTo(0); mockSpanner.reset(); mockSpanner.removeAllExecutionTimes(); } @@ -448,8 +513,8 @@ public void testDatabaseOrInstanceDoesNotExistOnReplenish() throws Exception { Thread.sleep(1L); } // All session creation should fail and stop trying. - assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0))); - assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsInPool()).isEqualTo(0); + assertThat(dbClient.pool.getNumberOfSessionsBeingCreated()).isEqualTo(0); // Force a maintainer run. This should schedule new session creation. dbClient.pool.poolMaintainer.maintainPool(); // Wait until the replenish has finished. @@ -459,8 +524,8 @@ public void testDatabaseOrInstanceDoesNotExistOnReplenish() throws Exception { Thread.sleep(1L); } // All session creation from replenishPool should fail and stop trying. - assertThat(dbClient.pool.getNumberOfSessionsInPool(), is(equalTo(0))); - assertThat(dbClient.pool.getNumberOfSessionsBeingCreated(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsInPool()).isEqualTo(0); + assertThat(dbClient.pool.getNumberOfSessionsBeingCreated()).isEqualTo(0); } mockSpanner.reset(); mockSpanner.removeAllExecutionTimes(); @@ -504,8 +569,8 @@ private void testExceptionOnPrepareSession(StatusRuntimeException exception) && dbClient.pool.getNumberOfSessionsBeingPrepared() > 0) { Thread.sleep(1L); } - assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); - assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions(), is(equalTo(0))); + assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared()).isEqualTo(0); + assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions()).isEqualTo(0); try { dbClient .readWriteTransaction() @@ -518,7 +583,7 @@ public Void run(TransactionContext transaction) throws Exception { }); fail(String.format("missing expected %s exception", exception.getStatus().getCode().name())); } catch (SpannerException e) { - assertThat(e.getErrorCode(), is(equalTo(ErrorCode.fromGrpcStatus(exception.getStatus())))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.fromGrpcStatus(exception.getStatus())); } // Remove the semi-permanent error condition. Getting a read/write transaction should now // succeed, and the automatic preparing of sessions should be restarted. @@ -545,10 +610,9 @@ public Void run(TransactionContext transaction) throws Exception { && dbClient.pool.getNumberOfAvailableWritePreparedSessions() < expectedPreparedSessions) { Thread.sleep(1L); } - assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared(), is(equalTo(0))); - assertThat( - dbClient.pool.getNumberOfAvailableWritePreparedSessions(), - is(equalTo(expectedPreparedSessions))); + assertThat(dbClient.pool.getNumberOfSessionsBeingPrepared()).isEqualTo(0); + assertThat(dbClient.pool.getNumberOfAvailableWritePreparedSessions()) + .isEqualTo(expectedPreparedSessions); } /** @@ -664,7 +728,7 @@ public void testAllowNestedTransactions() throws InterruptedException { && client.pool.getNumberOfSessionsInPool() < minSessions) { Thread.sleep(1L); } - assertThat(client.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions))); + assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); Long res = client .readWriteTransaction() @@ -673,13 +737,12 @@ public void testAllowNestedTransactions() throws InterruptedException { new TransactionCallable() { @Override public Long run(TransactionContext transaction) throws Exception { - assertThat( - client.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions - 1))); + assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); return transaction.executeUpdate(UPDATE_STATEMENT); } }); - assertThat(res, is(equalTo(UPDATE_COUNT))); - assertThat(client.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions))); + assertThat(res).isEqualTo(UPDATE_COUNT); + assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); } @Test @@ -699,8 +762,8 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio || client2.pool.getNumberOfSessionsInPool() < minSessions)) { Thread.sleep(1L); } - assertThat(client1.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions))); - assertThat(client2.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions))); + assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); + assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); Long res = client1 .readWriteTransaction() @@ -711,9 +774,8 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio public Long run(TransactionContext transaction) throws Exception { // Client1 should have 1 session checked out. // Client2 should have 0 sessions checked out. - assertThat( - client1.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions - 1))); - assertThat(client2.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions))); + assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); + assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); Long add = client2 .readWriteTransaction() @@ -722,12 +784,10 @@ public Long run(TransactionContext transaction) throws Exception { @Override public Long run(TransactionContext transaction) throws Exception { // Both clients should now have 1 session checked out. - assertThat( - client1.pool.getNumberOfSessionsInPool(), - is(equalTo(minSessions - 1))); - assertThat( - client2.pool.getNumberOfSessionsInPool(), - is(equalTo(minSessions - 1))); + assertThat(client1.pool.getNumberOfSessionsInPool()) + .isEqualTo(minSessions - 1); + assertThat(client2.pool.getNumberOfSessionsInPool()) + .isEqualTo(minSessions - 1); try (ResultSet rs = transaction.executeQuery(SELECT1)) { if (rs.next()) { return rs.getLong(0); @@ -744,10 +804,10 @@ public Long run(TransactionContext transaction) throws Exception { } } }); - assertThat(res, is(equalTo(2L))); + assertThat(res).isEqualTo(2L); // All sessions should now be checked back in to the pools. - assertThat(client1.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions))); - assertThat(client2.pool.getNumberOfSessionsInPool(), is(equalTo(minSessions))); + assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); + assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); } @Test