From 4669c02a24e0f7b1d53c9edf5ab7b146b4116960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 13 Apr 2020 11:00:12 +0200 Subject: [PATCH] fix: retry non-idempotent long-running RPCs (#141) RPCs returning a long-running operation, such as CreateDatabase, CreateBackup and RestoreDatabase, are non-idempotent and cannot be retried automatically by gax. This means that these RPCs sometimes fail with transient errors, such as UNAVAILABLE or DEADLINE_EXCEEDED. This change introduces automatic retries of these RPCs using the following logic: 1. Execute the RPC and wait for the operation to be returned. 2. If a transient error occurs while waiting for the operation, the client library queries the backend for the corresponding operation. If the operation is found, the resumes the tracking of the existing operation and returns that to the user. 3. If no corresponding operation is found in step 2, the client library retries the RPC from step 1. Fixes https://github.com/GoogleCloudPlatform/java-docs-samples/issues/2571 --- .../google/cloud/spanner/SpannerOptions.java | 5 +- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 336 ++++++++++++++- .../spanner/DatabaseAdminClientTest.java | 200 ++++++++- .../cloud/spanner/DatabaseAdminGaxTest.java | 56 --- .../spanner/MockDatabaseAdminServiceImpl.java | 392 ++++++++++++------ .../cloud/spanner/MockSpannerServiceImpl.java | 2 +- .../cloud/spanner/it/ITDatabaseAdminTest.java | 234 +++++++++++ 7 files changed, 1001 insertions(+), 224 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 32dc3b7157..84a1e96049 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -21,6 +21,7 @@ import com.google.api.gax.longrunning.OperationSnapshot; import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.NoCredentials; @@ -291,7 +292,8 @@ private Builder() { .setRetrySettings(longRunningRetrySettings); databaseAdminStubSettingsBuilder .updateBackupSettings() - .setRetrySettings(longRunningRetrySettings); + .setRetrySettings(longRunningRetrySettings) + .setRetryableCodes(StatusCode.Code.DEADLINE_EXCEEDED, StatusCode.Code.UNAVAILABLE); } Builder(SpannerOptions options) { @@ -581,6 +583,7 @@ public Builder setEmulatorHost(String emulatorHost) { return this; } + @SuppressWarnings("rawtypes") @Override public SpannerOptions build() { // Set the host of emulator has been set. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index de1a09158c..2719724b83 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -27,30 +27,38 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.AlreadyExistsException; import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.InstantiatingWatchdogProvider; import com.google.api.gax.rpc.OperationCallable; import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.api.pathtemplate.PathTemplate; +import com.google.cloud.RetryHelper; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; +import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub; import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub; import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub; import com.google.cloud.spanner.v1.stub.GrpcSpannerStub; import com.google.cloud.spanner.v1.stub.SpannerStub; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.iam.v1.GetIamPolicyRequest; @@ -63,6 +71,9 @@ import com.google.longrunning.Operation; import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Timestamp; import com.google.spanner.admin.database.v1.Backup; import com.google.spanner.admin.database.v1.CreateBackupMetadata; import com.google.spanner.admin.database.v1.CreateBackupRequest; @@ -123,9 +134,12 @@ import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -134,6 +148,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -193,6 +208,7 @@ private synchronized void shutdown() { private boolean rpcIsClosed; private final SpannerStub spannerStub; private final InstanceAdminStub instanceAdminStub; + private final DatabaseAdminStubSettings databaseAdminStubSettings; private final DatabaseAdminStub databaseAdminStub; private final String projectId; private final String projectName; @@ -321,20 +337,173 @@ public GapicSpannerRpc(final SpannerOptions options) { .setStreamWatchdogProvider(watchdogProvider) .build()); - this.databaseAdminStub = - GrpcDatabaseAdminStub.create( - options - .getDatabaseAdminStubSettings() - .toBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .setStreamWatchdogProvider(watchdogProvider) - .build()); + this.databaseAdminStubSettings = + options + .getDatabaseAdminStubSettings() + .toBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogProvider(watchdogProvider) + .build(); + this.databaseAdminStub = GrpcDatabaseAdminStub.create(this.databaseAdminStubSettings); } catch (Exception e) { throw newSpannerException(e); } } + private static final class OperationFutureRetryAlgorithm + implements ResultRetryAlgorithm> { + private static final ImmutableList RETRYABLE_CODES = + ImmutableList.of(StatusCode.Code.DEADLINE_EXCEEDED, StatusCode.Code.UNAVAILABLE); + + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, + OperationFuture prevResponse, + TimedAttemptSettings prevSettings) { + // Use default retry settings. + return null; + } + + @Override + public boolean shouldRetry( + Throwable prevThrowable, OperationFuture prevResponse) + throws CancellationException { + if (prevThrowable instanceof ApiException) { + ApiException e = (ApiException) prevThrowable; + return RETRYABLE_CODES.contains(e.getStatusCode().getCode()); + } + if (prevResponse != null) { + try { + prevResponse.getInitialFuture().get(); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof ApiException) { + ApiException e = (ApiException) cause; + return RETRYABLE_CODES.contains(e.getStatusCode().getCode()); + } + } catch (InterruptedException e) { + return false; + } + } + return false; + } + } + + private final class OperationFutureCallable + implements Callable> { + final OperationCallable operationCallable; + final RequestT initialRequest; + final String instanceName; + final OperationsLister lister; + final Function getStartTimeFunction; + Timestamp initialCallTime; + boolean isRetry = false; + + OperationFutureCallable( + OperationCallable operationCallable, + RequestT initialRequest, + String instanceName, + OperationsLister lister, + Function getStartTimeFunction) { + this.operationCallable = operationCallable; + this.initialRequest = initialRequest; + this.instanceName = instanceName; + this.lister = lister; + this.getStartTimeFunction = getStartTimeFunction; + } + + @Override + public OperationFuture call() throws Exception { + acquireAdministrativeRequestsRateLimiter(); + + String operationName = null; + if (isRetry) { + // Query the backend to see if the operation was actually created, and that the + // problem was caused by a network problem or other transient problem client side. + Operation operation = mostRecentOperation(lister, getStartTimeFunction, initialCallTime); + if (operation != null) { + // Operation found, resume tracking that operation. + operationName = operation.getName(); + } + } else { + initialCallTime = + Timestamp.newBuilder() + .setSeconds( + TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)) + .build(); + } + isRetry = true; + + if (operationName == null) { + GrpcCallContext context = newCallContext(null, instanceName); + return operationCallable.futureCall(initialRequest, context); + } else { + return operationCallable.resumeFutureCall(operationName); + } + } + } + + private interface OperationsLister { + Paginated listOperations(String nextPageToken); + } + + private Operation mostRecentOperation( + OperationsLister lister, + Function getStartTimeFunction, + Timestamp initialCallTime) + throws InvalidProtocolBufferException { + Operation res = null; + Timestamp currMaxStartTime = null; + String nextPageToken = null; + Paginated operations; + do { + operations = lister.listOperations(nextPageToken); + for (Operation op : operations.getResults()) { + Timestamp startTime = getStartTimeFunction.apply(op); + if (res == null + || (TimestampComparator.INSTANCE.compare(startTime, currMaxStartTime) > 0 + && TimestampComparator.INSTANCE.compare(startTime, initialCallTime) >= 0)) { + currMaxStartTime = startTime; + res = op; + } + // If the operation does not report any start time, then the operation that is not yet done + // is the one that is the most recent. + if (startTime == null && currMaxStartTime == null && !op.getDone()) { + res = op; + break; + } + } + } while (operations.getNextPageToken() != null); + return res; + } + + private static final class TimestampComparator implements Comparator { + private static final TimestampComparator INSTANCE = new TimestampComparator(); + + @Override + public int compare(Timestamp t1, Timestamp t2) { + if (t1 == null && t2 == null) { + return 0; + } + if (t1 != null && t2 == null) { + return 1; + } + if (t1 == null && t2 != null) { + return -1; + } + if (t1.getSeconds() > t2.getSeconds() + || (t1.getSeconds() == t2.getSeconds() && t1.getNanos() > t2.getNanos())) { + return 1; + } + if (t1.getSeconds() < t2.getSeconds() + || (t1.getSeconds() == t2.getSeconds() && t1.getNanos() < t2.getNanos())) { + return -1; + } + return 0; + } + } + private void acquireAdministrativeRequestsRateLimiter() { if (throttleAdministrativeRequests) { RateLimiter limiter = ADMINISTRATIVE_REQUESTS_RATE_LIMITERS.get(this.projectName); @@ -508,17 +677,66 @@ public Paginated listDatabases( @Override public OperationFuture createDatabase( - String instanceName, String createDatabaseStatement, Iterable additionalStatements) + final String instanceName, + String createDatabaseStatement, + Iterable additionalStatements) throws SpannerException { - acquireAdministrativeRequestsRateLimiter(); + final String databaseId = + createDatabaseStatement.substring( + "CREATE DATABASE `".length(), createDatabaseStatement.length() - 1); CreateDatabaseRequest request = CreateDatabaseRequest.newBuilder() .setParent(instanceName) .setCreateStatement(createDatabaseStatement) .addAllExtraStatements(additionalStatements) .build(); - GrpcCallContext context = newCallContext(null, instanceName); - return databaseAdminStub.createDatabaseOperationCallable().futureCall(request, context); + + OperationFutureCallable callable = + new OperationFutureCallable( + databaseAdminStub.createDatabaseOperationCallable(), + request, + instanceName, + new OperationsLister() { + @Override + public Paginated listOperations(String nextPageToken) { + return listDatabaseOperations( + instanceName, + 0, + String.format( + "(name:%s/operations/) AND (metadata.@type:type.googleapis.com/%s)", + String.format("%s/databases/%s", instanceName, databaseId), + CreateDatabaseMetadata.getDescriptor().getFullName()), + nextPageToken); + } + }, + new Function() { + @Override + public Timestamp apply(Operation input) { + if (input.getDone() && input.hasResponse()) { + try { + Timestamp createTime = + input.getResponse().unpack(Database.class).getCreateTime(); + if (Timestamp.getDefaultInstance().equals(createTime)) { + // Create time was not returned by the server (proto objects never return + // null, instead they return the default instance). Return null from this + // method to indicate that there is no known create time. + return null; + } + } catch (InvalidProtocolBufferException e) { + return null; + } + } + return null; + } + }); + return RetryHelper.runWithRetries( + callable, + databaseAdminStubSettings + .createDatabaseOperationSettings() + .getInitialCallSettings() + .getRetrySettings(), + new OperationFutureRetryAlgorithm<>(), + NanoClock.getDefaultClock()); } @Override @@ -584,30 +802,106 @@ public List getDatabaseDdl(String databaseName) throws SpannerException @Override public OperationFuture createBackup( - String instanceName, String backupId, Backup backup) throws SpannerException { - acquireAdministrativeRequestsRateLimiter(); + final String instanceName, final String backupId, final Backup backup) + throws SpannerException { CreateBackupRequest request = CreateBackupRequest.newBuilder() .setParent(instanceName) .setBackupId(backupId) .setBackup(backup) .build(); - GrpcCallContext context = newCallContext(null, instanceName); - return databaseAdminStub.createBackupOperationCallable().futureCall(request, context); + OperationFutureCallable callable = + new OperationFutureCallable( + databaseAdminStub.createBackupOperationCallable(), + request, + instanceName, + new OperationsLister() { + @Override + public Paginated listOperations(String nextPageToken) { + return listBackupOperations( + instanceName, + 0, + String.format( + "(metadata.name:%s) AND (metadata.@type:type.googleapis.com/%s)", + String.format("%s/backups/%s", instanceName, backupId), + CreateBackupMetadata.getDescriptor().getFullName()), + nextPageToken); + } + }, + new Function() { + @Override + public Timestamp apply(Operation input) { + try { + return input + .getMetadata() + .unpack(CreateBackupMetadata.class) + .getProgress() + .getStartTime(); + } catch (InvalidProtocolBufferException e) { + return null; + } + } + }); + return RetryHelper.runWithRetries( + callable, + databaseAdminStubSettings + .createBackupOperationSettings() + .getInitialCallSettings() + .getRetrySettings(), + new OperationFutureRetryAlgorithm<>(), + NanoClock.getDefaultClock()); } @Override public final OperationFuture restoreDatabase( - String databaseInstanceName, String databaseId, String backupName) { - acquireAdministrativeRequestsRateLimiter(); + final String databaseInstanceName, final String databaseId, String backupName) { RestoreDatabaseRequest request = RestoreDatabaseRequest.newBuilder() .setParent(databaseInstanceName) .setDatabaseId(databaseId) .setBackup(backupName) .build(); - GrpcCallContext context = newCallContext(null, databaseInstanceName); - return databaseAdminStub.restoreDatabaseOperationCallable().futureCall(request, context); + + OperationFutureCallable callable = + new OperationFutureCallable( + databaseAdminStub.restoreDatabaseOperationCallable(), + request, + databaseInstanceName, + new OperationsLister() { + @Override + public Paginated listOperations(String nextPageToken) { + return listDatabaseOperations( + databaseInstanceName, + 0, + String.format( + "(metadata.name:%s) AND (metadata.@type:type.googleapis.com/%s)", + String.format("%s/databases/%s", databaseInstanceName, databaseId), + RestoreDatabaseMetadata.getDescriptor().getFullName()), + nextPageToken); + } + }, + new Function() { + @Override + public Timestamp apply(Operation input) { + try { + return input + .getMetadata() + .unpack(RestoreDatabaseMetadata.class) + .getProgress() + .getStartTime(); + } catch (InvalidProtocolBufferException e) { + return null; + } + } + }); + return RetryHelper.runWithRetries( + callable, + databaseAdminStubSettings + .restoreDatabaseOperationSettings() + .getInitialCallSettings() + .getRetrySettings(), + new OperationFutureRetryAlgorithm<>(), + NanoClock.getDefaultClock()); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java index 38bcd8437d..4d3abc10fd 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientTest.java @@ -19,29 +19,37 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.truth.Truth.assertThat; -import com.google.api.gax.grpc.testing.LocalChannelProvider; -import com.google.api.gax.grpc.testing.MockGrpcService; -import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.core.ApiFunction; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.longrunning.OperationSnapshot; import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; import com.google.api.gax.paging.Page; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.Identity; import com.google.cloud.NoCredentials; import com.google.cloud.Policy; import com.google.cloud.Role; import com.google.cloud.Timestamp; import com.google.cloud.spanner.DatabaseInfo.State; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.longrunning.Operation; import com.google.protobuf.InvalidProtocolBufferException; import com.google.spanner.admin.database.v1.CreateBackupMetadata; +import com.google.spanner.admin.database.v1.CreateBackupRequest; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import com.google.spanner.admin.database.v1.CreateDatabaseRequest; import com.google.spanner.admin.database.v1.OptimizeRestoredDatabaseMetadata; import com.google.spanner.admin.database.v1.RestoreDatabaseMetadata; +import com.google.spanner.admin.database.v1.RestoreDatabaseRequest; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -104,8 +112,9 @@ public void describeTo(Description description) { private static MockOperationsServiceImpl mockOperations; private static MockDatabaseAdminServiceImpl mockDatabaseAdmin; - private static MockServiceHelper serviceHelper; - private LocalChannelProvider channelProvider; + private static Server server; + private static InetSocketAddress address; + private Spanner spanner; private DatabaseAdminClient client; @Rule public ExpectedException exception = ExpectedException.none(); @@ -114,25 +123,48 @@ public void describeTo(Description description) { private OperationFuture restoreDatabaseOperation; @BeforeClass - public static void startStaticServer() { + public static void startStaticServer() throws Exception { mockOperations = new MockOperationsServiceImpl(); mockDatabaseAdmin = new MockDatabaseAdminServiceImpl(mockOperations); - serviceHelper = - new MockServiceHelper( - "in-process-1", Arrays.asList(mockOperations, mockDatabaseAdmin)); - serviceHelper.start(); + // This test uses a NettyServer to properly test network and timeout issues. + address = new InetSocketAddress("localhost", 0); + server = + NettyServerBuilder.forAddress(address) + .addService(mockOperations) + .addService(mockDatabaseAdmin) + .build() + .start(); } @AfterClass - public static void stopServer() { - serviceHelper.stop(); + public static void stopServer() throws Exception { + server.shutdown(); + server.awaitTermination(); } + @SuppressWarnings("rawtypes") @Before public void setUp() throws IOException { - serviceHelper.reset(); - channelProvider = serviceHelper.createChannelProvider(); + mockDatabaseAdmin.reset(); + mockOperations.reset(); SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + RetrySettings longRunningInitialRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(60L)) + .setMaxRpcTimeout(Duration.ofMillis(600L)) + .setInitialRetryDelay(Duration.ofMillis(20L)) + .setMaxRetryDelay(Duration.ofMillis(45L)) + .setRetryDelayMultiplier(1.5) + .setRpcTimeoutMultiplier(1.5) + .setTotalTimeout(Duration.ofMinutes(48L)) + .build(); + builder + .getDatabaseAdminStubSettingsBuilder() + .createBackupOperationSettings() + .setInitialCallSettings( + UnaryCallSettings.newUnaryCallSettingsBuilder() + .setRetrySettings(longRunningInitialRetrySettings) + .build()); builder .getDatabaseAdminStubSettingsBuilder() .createBackupOperationSettings() @@ -148,6 +180,15 @@ public void setUp() throws IOException { .setRetryDelayMultiplier(1.3) .setRpcTimeoutMultiplier(1.3) .build())); + + builder + .getDatabaseAdminStubSettingsBuilder() + .createDatabaseOperationSettings() + .setInitialCallSettings( + UnaryCallSettings + .newUnaryCallSettingsBuilder() + .setRetrySettings(longRunningInitialRetrySettings) + .build()); builder .getDatabaseAdminStubSettingsBuilder() .createDatabaseOperationSettings() @@ -163,6 +204,14 @@ public void setUp() throws IOException { .setRetryDelayMultiplier(1.3) .setRpcTimeoutMultiplier(1.3) .build())); + builder + .getDatabaseAdminStubSettingsBuilder() + .restoreDatabaseOperationSettings() + .setInitialCallSettings( + UnaryCallSettings + .newUnaryCallSettingsBuilder() + .setRetrySettings(longRunningInitialRetrySettings) + .build()); builder .getDatabaseAdminStubSettingsBuilder() .restoreDatabaseOperationSettings() @@ -180,7 +229,14 @@ public void setUp() throws IOException { .build())); spanner = builder - .setChannelProvider(channelProvider) + .setHost("http://localhost:" + server.getPort()) + .setChannelConfigurator( + new ApiFunction() { + @Override + public ManagedChannelBuilder apply(ManagedChannelBuilder input) { + return input.usePlaintext(); + } + }) .setCredentials(NoCredentials.getInstance()) .setProjectId(PROJECT_ID) .build() @@ -193,7 +249,9 @@ public void setUp() throws IOException { @After public void tearDown() throws Exception { - serviceHelper.reset(); + mockDatabaseAdmin.reset(); + mockDatabaseAdmin.removeAllExecutionTimes(); + mockOperations.reset(); spanner.close(); } @@ -762,4 +820,114 @@ private void restoreTestBackup() { throw SpannerExceptionFactory.newSpannerException(e); } } + + @Test + public void retryCreateBackupSlowResponse() throws Exception { + // Throw a DEADLINE_EXCEEDED after the operation has been created. This should cause the retry + // to pick up the existing operation. + mockDatabaseAdmin.setCreateBackupResponseExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String backupId = "other-backup-id"; + OperationFuture op = + client.createBackup(INSTANCE_ID, backupId, DB_ID, after7Days()); + Backup backup = op.get(); + assertThat(backup.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/backups/%s", PROJECT_ID, INSTANCE_ID, backupId)); + assertThat(client.getBackup(INSTANCE_ID, backupId)).isEqualTo(backup); + // There should be at exactly 2 requests. One from this test case and one from the setup of the + // test which also creates a test backup. + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateBackupRequest.class)).isEqualTo(2); + } + + @Test + public void retryCreateBackupSlowStartup() throws Exception { + mockDatabaseAdmin.setCreateBackupStartupExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String backupId = "other-backup-id"; + OperationFuture op = + client.createBackup(INSTANCE_ID, backupId, DB_ID, after7Days()); + Backup backup = op.get(); + assertThat(backup.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/backups/%s", PROJECT_ID, INSTANCE_ID, backupId)); + assertThat(client.getBackup(INSTANCE_ID, backupId)).isEqualTo(backup); + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateBackupRequest.class)).isAtLeast(3); + } + + @Test + public void retryCreateDatabaseSlowResponse() throws Exception { + // Throw a DEADLINE_EXCEEDED after the operation has been created. This should cause the retry + // to pick up the existing operation. + mockDatabaseAdmin.setCreateDatabaseResponseExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.createDatabase(INSTANCE_ID, databaseId, Collections.emptyList()); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + assertThat(client.getDatabase(INSTANCE_ID, databaseId)).isEqualTo(database); + // There should be at exactly 2 requests. One from this test case and one from the setup of the + // test which also creates a test database. + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateDatabaseRequest.class)).isEqualTo(2); + } + + @Test + public void retryCreateDatabaseSlowStartup() throws Exception { + mockDatabaseAdmin.setCreateDatabaseStartupExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.createDatabase(INSTANCE_ID, databaseId, Collections.emptyList()); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + assertThat(client.getDatabase(INSTANCE_ID, databaseId)).isEqualTo(database); + assertThat(mockDatabaseAdmin.countRequestsOfType(CreateDatabaseRequest.class)).isAtLeast(3); + } + + @Test + public void retryRestoreDatabaseSlowResponse() throws Exception { + // Throw a DEADLINE_EXCEEDED after the operation has been created. This should cause the retry + // to pick up the existing operation. + mockDatabaseAdmin.setRestoreDatabaseResponseExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.restoreDatabase(INSTANCE_ID, BCK_ID, INSTANCE_ID, databaseId); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + Database retrieved = client.getDatabase(INSTANCE_ID, databaseId); + assertThat(retrieved.getCreateTime()).isEqualTo(database.getCreateTime()); + // There should be exactly 2 requests. One from this test case and one from the setup of the + // test which also restores a test database. + assertThat(mockDatabaseAdmin.countRequestsOfType(RestoreDatabaseRequest.class)).isEqualTo(2); + } + + @Test + public void retryRestoreDatabaseSlowStartup() throws Exception { + mockDatabaseAdmin.setRestoreDatabaseStartupExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + final String databaseId = "other-database-id"; + OperationFuture op = + client.restoreDatabase(INSTANCE_ID, BCK_ID, INSTANCE_ID, databaseId); + Database database = op.get(); + assertThat(database.getId().getName()) + .isEqualTo( + String.format( + "projects/%s/instances/%s/databases/%s", PROJECT_ID, INSTANCE_ID, databaseId)); + Database retrieved = client.getDatabase(INSTANCE_ID, databaseId); + assertThat(retrieved.getCreateTime()).isEqualTo(database.getCreateTime()); + assertThat(mockDatabaseAdmin.countRequestsOfType(RestoreDatabaseRequest.class)).isAtLeast(3); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java index d24d136448..f635a0757a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java @@ -16,8 +16,6 @@ package com.google.cloud.spanner; -import static org.junit.Assert.fail; - import com.google.api.core.ApiFunction; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.longrunning.OperationFuture; @@ -32,7 +30,6 @@ import com.google.protobuf.AbstractMessage; import com.google.protobuf.Any; import com.google.protobuf.Empty; -import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; import com.google.spanner.admin.database.v1.DatabaseName; import com.google.spanner.admin.database.v1.ListDatabasesRequest; import com.google.spanner.admin.database.v1.ListDatabasesResponse; @@ -381,59 +378,6 @@ public void getDatabaseTest() { Assert.assertEquals(2, actualRequests.size()); } - @Test - public void createDatabaseTest() throws Exception { - Exception exception = setupException(); - DatabaseName name = DatabaseName.of(PROJECT, INSTANCE, "DATABASE"); - com.google.spanner.admin.database.v1.Database expectedResponse = - com.google.spanner.admin.database.v1.Database.newBuilder().setName(name.toString()).build(); - com.google.longrunning.Operation resultOperation = - com.google.longrunning.Operation.newBuilder() - .setName("createDatabaseTest") - .setDone(true) - .setResponse(Any.pack(expectedResponse)) - .build(); - if (exceptionAtCall == 0) { - mockDatabaseAdmin.addException(exception); - } - mockDatabaseAdmin.addResponse(resultOperation); - if (exceptionAtCall == 1) { - mockDatabaseAdmin.addException(exception); - } - mockDatabaseAdmin.addResponse(resultOperation); - - boolean methodIsIdempotent = - !spanner - .getOptions() - .getDatabaseAdminStubSettings() - .createDatabaseOperationSettings() - .getInitialCallSettings() - .getRetryableCodes() - .isEmpty(); - for (int i = 0; i < 2; i++) { - OperationFuture actualResponse = - client.createDatabase(INSTANCE, "DATABASE", Arrays.asList()); - try { - Database returnedInstance = actualResponse.get(); - if (!methodIsIdempotent && i == exceptionAtCall) { - fail("missing expected exception"); - } - Assert.assertEquals(name.toString(), returnedInstance.getId().getName()); - } catch (ExecutionException e) { - if (!exceptionType.isRetryable() || methodIsIdempotent || i != exceptionAtCall) { - Throwables.throwIfUnchecked(e.getCause()); - throw e; - } - } - } - List actualRequests = mockDatabaseAdmin.getRequests(); - if (methodIsIdempotent) { - Assert.assertEquals(2, actualRequests.size()); - } else { - Assert.assertEquals(1, actualRequests.size()); - } - } - @Test public void updateDatabaseDdlTest() throws Exception { Exception exception = setupException(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java index c198bc4f56..67a180e4f9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java @@ -17,7 +17,10 @@ package com.google.cloud.spanner; import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.common.base.Predicate; import com.google.common.base.Strings; +import com.google.common.collect.Collections2; import com.google.iam.v1.GetIamPolicyRequest; import com.google.iam.v1.Policy; import com.google.iam.v1.SetIamPolicyRequest; @@ -66,7 +69,6 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; @@ -77,6 +79,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class MockDatabaseAdminServiceImpl extends DatabaseAdminImplBase implements MockGrpcService { private static final class MockDatabase { @@ -253,9 +259,9 @@ public Backup call() throws Exception { Operation operation = operations.get(operationName); for (int progress = 1; progress <= 100; progress++) { operation = operations.get(operationName); - long sleep = createBackupExecutionTime / 100; + long sleep = createBackupOperationExecutionTime / 100; if (progress == 100) { - sleep += createBackupExecutionTime % 100; + sleep += createBackupOperationExecutionTime % 100; } Thread.sleep(sleep); if (operation != null) { @@ -318,9 +324,9 @@ public Database call() throws Exception { Database proto = db.toProto(); Operation operation = operations.get(operationName); for (int progress = 1; progress <= 100; progress++) { - long sleep = restoreDatabaseExecutionTime / 100; + long sleep = restoreDatabaseOperationExecutionTime / 100; if (progress == 100) { - sleep += restoreDatabaseExecutionTime % 100; + sleep += restoreDatabaseOperationExecutionTime % 100; } Thread.sleep(sleep); if (operation != null) { @@ -391,7 +397,7 @@ public Database call() throws Exception { Thread.sleep(10L); restoreOperation = operations.get(restoreOperationName); } - Thread.sleep(optimizeDatabaseExecutionTime); + Thread.sleep(optimizeDatabaseOperationExecutionTime); db.state = State.READY; Database proto = db.toProto(); if (operation != null) { @@ -423,18 +429,30 @@ private com.google.rpc.Status fromException(Exception e) { return com.google.rpc.Status.newBuilder().setCode(code).setMessage(e.getMessage()).build(); } + private final Queue requests = new ConcurrentLinkedQueue<>(); private ConcurrentMap policies = new ConcurrentHashMap<>(); private static final String EXPIRE_TIME_MASK = "expire_time"; private static final Random RND = new Random(); private final Queue exceptions = new ConcurrentLinkedQueue<>(); + private final ReadWriteLock freezeLock = new ReentrantReadWriteLock(); private final ConcurrentMap databases = new ConcurrentHashMap<>(); private final ConcurrentMap backups = new ConcurrentHashMap<>(); private final ConcurrentMap> filterMatches = new ConcurrentHashMap<>(); private final MockOperationsServiceImpl operations; - private long createBackupExecutionTime; - private long restoreDatabaseExecutionTime; - private long optimizeDatabaseExecutionTime; + private long createBackupOperationExecutionTime; + private long restoreDatabaseOperationExecutionTime; + private long optimizeDatabaseOperationExecutionTime; + + private SimulatedExecutionTime createBackupStartupExecutionTime = SimulatedExecutionTime.none(); + private SimulatedExecutionTime createBackupResponseExecutionTime = SimulatedExecutionTime.none(); + private SimulatedExecutionTime createDatabaseStartupExecutionTime = SimulatedExecutionTime.none(); + private SimulatedExecutionTime createDatabaseResponseExecutionTime = + SimulatedExecutionTime.none(); + private SimulatedExecutionTime restoreDatabaseStartupExecutionTime = + SimulatedExecutionTime.none(); + private SimulatedExecutionTime restoreDatabaseResponseExecutionTime = + SimulatedExecutionTime.none(); public MockDatabaseAdminServiceImpl(MockOperationsServiceImpl operations) { this.operations = operations; @@ -443,36 +461,44 @@ public MockDatabaseAdminServiceImpl(MockOperationsServiceImpl operations) { @Override public void createDatabase( CreateDatabaseRequest request, StreamObserver responseObserver) { - String id = request.getCreateStatement().replace("CREATE DATABASE ", ""); - if (id.startsWith("`") && id.endsWith("`")) { - id = id.substring(1, id.length() - 1); - } - String name = String.format("%s/databases/%s", request.getParent(), id); - MockDatabase db = new MockDatabase(name, request.getExtraStatementsList(), null); - if (databases.putIfAbsent(name, db) == null) { - CreateDatabaseMetadata metadata = - CreateDatabaseMetadata.newBuilder().setDatabase(name).build(); - Database database = Database.newBuilder().setName(name).setState(db.state).build(); - Operation operation = - Operation.newBuilder() - .setMetadata(Any.pack(metadata)) - .setResponse(Any.pack(database)) - .setDone(false) - .setName(operations.generateOperationName(name)) - .build(); - operations.addOperation(operation, new CreateDatabaseCallable(operation.getName(), name)); - responseObserver.onNext(operation); - responseObserver.onCompleted(); - } else { - responseObserver.onError( - Status.ALREADY_EXISTS - .withDescription(String.format("Database with name %s already exists", name)) - .asRuntimeException()); + requests.add(request); + try { + createDatabaseStartupExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + String id = request.getCreateStatement().replace("CREATE DATABASE ", ""); + if (id.startsWith("`") && id.endsWith("`")) { + id = id.substring(1, id.length() - 1); + } + String name = String.format("%s/databases/%s", request.getParent(), id); + MockDatabase db = new MockDatabase(name, request.getExtraStatementsList(), null); + if (databases.putIfAbsent(name, db) == null) { + CreateDatabaseMetadata metadata = + CreateDatabaseMetadata.newBuilder().setDatabase(name).build(); + Database database = Database.newBuilder().setName(name).setState(db.state).build(); + Operation operation = + Operation.newBuilder() + .setMetadata(Any.pack(metadata)) + .setResponse(Any.pack(database)) + .setDone(false) + .setName(operations.generateOperationName(name)) + .build(); + operations.addOperation(operation, new CreateDatabaseCallable(operation.getName(), name)); + createDatabaseResponseExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + responseObserver.onNext(operation); + responseObserver.onCompleted(); + } else { + responseObserver.onError( + Status.ALREADY_EXISTS + .withDescription(String.format("Database with name %s already exists", name)) + .asRuntimeException()); + } + } catch (Throwable t) { + responseObserver.onError(t); } } @Override public void dropDatabase(DropDatabaseRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getDatabase()); if (databases.remove(request.getDatabase(), db)) { responseObserver.onNext(Empty.getDefaultInstance()); @@ -484,6 +510,7 @@ public void dropDatabase(DropDatabaseRequest request, StreamObserver resp @Override public void getDatabase(GetDatabaseRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getName()); if (db != null) { responseObserver.onNext( @@ -501,6 +528,7 @@ public void getDatabase(GetDatabaseRequest request, StreamObserver res @Override public void getDatabaseDdl( GetDatabaseDdlRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getDatabase()); if (db != null) { responseObserver.onNext(GetDatabaseDdlResponse.newBuilder().addAllStatements(db.ddl).build()); @@ -513,6 +541,7 @@ public void getDatabaseDdl( @Override public void listDatabases( ListDatabasesRequest request, StreamObserver responseObserver) { + requests.add(request); List dbs = new ArrayList<>(databases.size()); for (Entry entry : databases.entrySet()) { dbs.add( @@ -530,6 +559,7 @@ public void listDatabases( public void listDatabaseOperations( ListDatabaseOperationsRequest request, StreamObserver responseObserver) { + requests.add(request); ListDatabaseOperationsResponse.Builder builder = ListDatabaseOperationsResponse.newBuilder(); try { for (Operation op : operations.iterable()) { @@ -554,6 +584,36 @@ private boolean matchesFilter(Object obj, String filter) throws Exception { String name = (String) obj.getClass().getMethod("getName").invoke(obj); return matches.contains(name); } + if (obj instanceof Operation) { + Operation operation = (Operation) obj; + Pattern pattern = + Pattern.compile( + "(?:\\(metadata.(?:name|database):(.*)\\)|\\(name:(.*)/operations/\\)) AND \\(metadata.@type:type.googleapis.com/(.*)\\)"); + Matcher matcher = pattern.matcher(filter); + if (matcher.matches()) { + String objectName = matcher.group(1); + if (objectName == null) { + objectName = matcher.group(2); + } + String type = matcher.group(3); + Any anyMetadata = operation.getMetadata(); + if (anyMetadata.getTypeUrl().endsWith(type)) { + if (type.equals(CreateBackupMetadata.getDescriptor().getFullName())) { + CreateBackupMetadata metadata = + operation.getMetadata().unpack(CreateBackupMetadata.class); + return metadata.getName().equals(objectName); + } else if (type.equals(CreateDatabaseMetadata.getDescriptor().getFullName())) { + CreateDatabaseMetadata metadata = + operation.getMetadata().unpack(CreateDatabaseMetadata.class); + return metadata.getDatabase().equals(objectName); + } else if (type.equals(RestoreDatabaseMetadata.getDescriptor().getFullName())) { + RestoreDatabaseMetadata metadata = + operation.getMetadata().unpack(RestoreDatabaseMetadata.class); + return metadata.getName().equals(objectName); + } + } + } + } return false; } return true; @@ -562,6 +622,7 @@ private boolean matchesFilter(Object obj, String filter) throws Exception { @Override public void updateDatabaseDdl( UpdateDatabaseDdlRequest request, StreamObserver responseObserver) { + requests.add(request); MockDatabase db = databases.get(request.getDatabase()); if (db != null) { db.ddl.addAll(request.getStatementsList()); @@ -588,50 +649,59 @@ public void updateDatabaseDdl( @Override public void createBackup( CreateBackupRequest request, StreamObserver responseObserver) { - String name = String.format("%s/backups/%s", request.getParent(), request.getBackupId()); - MockDatabase db = databases.get(request.getBackup().getDatabase()); - if (db == null) { - responseObserver.onError( - Status.NOT_FOUND - .withDescription( - String.format( - "Database with name %s not found", request.getBackup().getDatabase())) - .asRuntimeException()); - return; - } - MockBackup bck = new MockBackup(name, request.getBackup(), db); - if (backups.putIfAbsent(name, bck) == null) { - CreateBackupMetadata metadata = - CreateBackupMetadata.newBuilder() - .setName(name) - .setDatabase(bck.database) - .setProgress( - OperationProgress.newBuilder() - .setStartTime( - Timestamp.newBuilder() - .setSeconds(System.currentTimeMillis() / 1000L) - .build()) - .setProgressPercent(0)) - .build(); - Operation operation = - Operation.newBuilder() - .setMetadata(Any.pack(metadata)) - .setResponse(Any.pack(bck.toProto())) - .setName(operations.generateOperationName(name)) - .build(); - operations.addOperation(operation, new CreateBackupCallable(operation.getName(), name)); - responseObserver.onNext(operation); - responseObserver.onCompleted(); - } else { - responseObserver.onError( - Status.ALREADY_EXISTS - .withDescription(String.format("Backup with name %s already exists", name)) - .asRuntimeException()); + requests.add(request); + try { + createBackupStartupExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + String name = String.format("%s/backups/%s", request.getParent(), request.getBackupId()); + MockDatabase db = databases.get(request.getBackup().getDatabase()); + if (db == null) { + responseObserver.onError( + Status.NOT_FOUND + .withDescription( + String.format( + "Database with name %s not found", request.getBackup().getDatabase())) + .asRuntimeException()); + return; + } + MockBackup bck = new MockBackup(name, request.getBackup(), db); + if (backups.putIfAbsent(name, bck) == null) { + CreateBackupMetadata metadata = + CreateBackupMetadata.newBuilder() + .setName(name) + .setDatabase(bck.database) + .setProgress( + OperationProgress.newBuilder() + .setStartTime( + Timestamp.newBuilder() + .setSeconds(System.currentTimeMillis() / 1000L) + .build()) + .setProgressPercent(0)) + .build(); + Operation operation = + Operation.newBuilder() + .setMetadata(Any.pack(metadata)) + .setResponse(Any.pack(bck.toProto())) + .setName(operations.generateOperationName(name)) + .build(); + operations.addOperation(operation, new CreateBackupCallable(operation.getName(), name)); + + createBackupResponseExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + responseObserver.onNext(operation); + responseObserver.onCompleted(); + } else { + responseObserver.onError( + Status.ALREADY_EXISTS + .withDescription(String.format("Backup with name %s already exists", name)) + .asRuntimeException()); + } + } catch (Throwable t) { + responseObserver.onError(t); } } @Override public void deleteBackup(DeleteBackupRequest request, StreamObserver responseObserver) { + requests.add(request); MockBackup bck = backups.get(request.getName()); if (backups.remove(request.getName(), bck)) { responseObserver.onNext(Empty.getDefaultInstance()); @@ -643,6 +713,7 @@ public void deleteBackup(DeleteBackupRequest request, StreamObserver resp @Override public void getBackup(GetBackupRequest request, StreamObserver responseObserver) { + requests.add(request); MockBackup bck = backups.get(request.getName()); if (bck != null) { responseObserver.onNext( @@ -663,6 +734,7 @@ public void getBackup(GetBackupRequest request, StreamObserver responseO @Override public void listBackups( ListBackupsRequest request, StreamObserver responseObserver) { + requests.add(request); List bcks = new ArrayList<>(backups.size()); try { for (Entry entry : backups.entrySet()) { @@ -689,6 +761,7 @@ public void listBackups( public void listBackupOperations( ListBackupOperationsRequest request, StreamObserver responseObserver) { + requests.add(request); ListBackupOperationsResponse.Builder builder = ListBackupOperationsResponse.newBuilder(); try { for (Operation op : operations.iterable()) { @@ -708,6 +781,7 @@ public void listBackupOperations( @Override public void updateBackup(UpdateBackupRequest request, StreamObserver responseObserver) { + requests.add(request); MockBackup bck = backups.get(request.getBackup().getName()); if (bck != null) { if (request.getUpdateMask().getPathsList().contains(EXPIRE_TIME_MASK)) { @@ -731,70 +805,80 @@ public void updateBackup(UpdateBackupRequest request, StreamObserver res @Override public void restoreDatabase( RestoreDatabaseRequest request, StreamObserver responseObserver) { - MockBackup bck = backups.get(request.getBackup()); - if (bck != null) { - String name = String.format("%s/databases/%s", request.getParent(), request.getDatabaseId()); - MockDatabase db = - new MockDatabase( - name, - bck.ddl, - RestoreInfo.newBuilder() + requests.add(request); + try { + restoreDatabaseStartupExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + MockBackup bck = backups.get(request.getBackup()); + if (bck != null) { + String name = + String.format("%s/databases/%s", request.getParent(), request.getDatabaseId()); + MockDatabase db = + new MockDatabase( + name, + bck.ddl, + RestoreInfo.newBuilder() + .setBackupInfo(bck.toBackupInfo()) + .setSourceType(RestoreSourceType.BACKUP) + .build()); + if (databases.putIfAbsent(name, db) == null) { + bck.referencingDatabases.add(db.name); + Operation optimizeOperation = + Operation.newBuilder() + .setDone(false) + .setName(operations.generateOperationName(name)) + .setMetadata( + Any.pack( + OptimizeRestoredDatabaseMetadata.newBuilder() + .setName(name) + .setProgress( + OperationProgress.newBuilder() + .setStartTime(currentTime()) + .setProgressPercent(0) + .build()) + .build())) + .setResponse(Any.pack(db.toProto())) + .build(); + RestoreDatabaseMetadata metadata = + RestoreDatabaseMetadata.newBuilder() .setBackupInfo(bck.toBackupInfo()) + .setName(name) + .setProgress( + OperationProgress.newBuilder() + .setStartTime(currentTime()) + .setProgressPercent(0) + .build()) + .setOptimizeDatabaseOperationName(optimizeOperation.getName()) .setSourceType(RestoreSourceType.BACKUP) - .build()); - if (databases.putIfAbsent(name, db) == null) { - bck.referencingDatabases.add(db.name); - Operation optimizeOperation = - Operation.newBuilder() - .setDone(false) - .setName(operations.generateOperationName(name)) - .setMetadata( - Any.pack( - OptimizeRestoredDatabaseMetadata.newBuilder() - .setName(name) - .setProgress( - OperationProgress.newBuilder() - .setStartTime(currentTime()) - .setProgressPercent(0) - .build()) - .build())) - .setResponse(Any.pack(db.toProto())) - .build(); - RestoreDatabaseMetadata metadata = - RestoreDatabaseMetadata.newBuilder() - .setBackupInfo(bck.toBackupInfo()) - .setName(name) - .setProgress( - OperationProgress.newBuilder() - .setStartTime(currentTime()) - .setProgressPercent(0) - .build()) - .setOptimizeDatabaseOperationName(optimizeOperation.getName()) - .setSourceType(RestoreSourceType.BACKUP) - .build(); - Operation operation = - Operation.newBuilder() - .setMetadata(Any.pack(metadata)) - .setResponse(Any.pack(db.toProto())) - .setDone(false) - .setName(operations.generateOperationName(name)) - .build(); - operations.addOperation(operation, new RestoreDatabaseCallable(operation.getName(), name)); - operations.addOperation( - optimizeOperation, - new OptimizeDatabaseCallable(optimizeOperation.getName(), operation.getName(), name)); - responseObserver.onNext(operation); - responseObserver.onCompleted(); + .build(); + Operation operation = + Operation.newBuilder() + .setMetadata(Any.pack(metadata)) + .setResponse(Any.pack(db.toProto())) + .setDone(false) + .setName(operations.generateOperationName(name)) + .build(); + operations.addOperation( + operation, new RestoreDatabaseCallable(operation.getName(), name)); + operations.addOperation( + optimizeOperation, + new OptimizeDatabaseCallable(optimizeOperation.getName(), operation.getName(), name)); + restoreDatabaseResponseExecutionTime.simulateExecutionTime(exceptions, false, freezeLock); + responseObserver.onNext(operation); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException()); + } } else { - responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException()); + responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); } - } else { - responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); + } catch (Throwable t) { + responseObserver.onError(t); } } @Override public void getIamPolicy(GetIamPolicyRequest request, StreamObserver responseObserver) { + requests.add(request); Policy policy = policies.get(request.getResource()); if (policy != null) { responseObserver.onNext(policy); @@ -806,6 +890,7 @@ public void getIamPolicy(GetIamPolicyRequest request, StreamObserver res @Override public void setIamPolicy(SetIamPolicyRequest request, StreamObserver responseObserver) { + requests.add(request); policies.put(request.getResource(), request.getPolicy()); responseObserver.onNext(request.getPolicy()); responseObserver.onCompleted(); @@ -815,6 +900,7 @@ public void setIamPolicy(SetIamPolicyRequest request, StreamObserver res public void testIamPermissions( TestIamPermissionsRequest request, StreamObserver responseObserver) { + requests.add(request); // Just return the same permissions as in the request, as we don't have any credentials. responseObserver.onNext( TestIamPermissionsResponse.newBuilder() @@ -825,7 +911,19 @@ public void testIamPermissions( @Override public List getRequests() { - return Collections.emptyList(); + return new ArrayList<>(requests); + } + + public int countRequestsOfType(final Class type) { + return Collections2.filter( + getRequests(), + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input.getClass().equals(type); + } + }) + .size(); } @Override @@ -858,6 +956,7 @@ public ServerServiceDefinition getServiceDefinition() { @Override public void reset() { + requests.clear(); exceptions.clear(); policies.clear(); databases.clear(); @@ -865,7 +964,42 @@ public void reset() { filterMatches.clear(); } + public void removeAllExecutionTimes() { + createBackupStartupExecutionTime = SimulatedExecutionTime.none(); + createBackupResponseExecutionTime = SimulatedExecutionTime.none(); + createBackupOperationExecutionTime = 0L; + createDatabaseStartupExecutionTime = SimulatedExecutionTime.none(); + createDatabaseResponseExecutionTime = SimulatedExecutionTime.none(); + restoreDatabaseStartupExecutionTime = SimulatedExecutionTime.none(); + restoreDatabaseResponseExecutionTime = SimulatedExecutionTime.none(); + restoreDatabaseOperationExecutionTime = 0L; + } + private Timestamp currentTime() { return Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000L).build(); } + + public void setCreateBackupStartupExecutionTime(SimulatedExecutionTime exec) { + this.createBackupStartupExecutionTime = exec; + } + + public void setCreateBackupResponseExecutionTime(SimulatedExecutionTime exec) { + this.createBackupResponseExecutionTime = exec; + } + + public void setCreateDatabaseStartupExecutionTime(SimulatedExecutionTime exec) { + this.createDatabaseStartupExecutionTime = exec; + } + + public void setCreateDatabaseResponseExecutionTime(SimulatedExecutionTime exec) { + this.createDatabaseResponseExecutionTime = exec; + } + + public void setRestoreDatabaseStartupExecutionTime(SimulatedExecutionTime exec) { + this.restoreDatabaseStartupExecutionTime = exec; + } + + public void setRestoreDatabaseResponseExecutionTime(SimulatedExecutionTime exec) { + this.restoreDatabaseResponseExecutionTime = exec; + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index e3d002948e..069242d5a7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -418,7 +418,7 @@ private SimulatedExecutionTime( this.stickyException = stickyException; } - private void simulateExecutionTime( + void simulateExecutionTime( Queue globalExceptions, boolean stickyGlobalExceptions, ReadWriteLock freezeLock) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java index 574d9c9b15..2755123ea0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java @@ -19,22 +19,46 @@ import static com.google.cloud.spanner.SpannerMatchers.isSpannerException; import static com.google.common.truth.Truth.assertThat; +import com.google.api.gax.grpc.GrpcInterceptorProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Backup; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.IntegrationTestEnv; import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.testing.RemoteSpannerHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import com.google.spanner.admin.database.v1.CreateBackupMetadata; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import com.google.spanner.admin.database.v1.RestoreDatabaseMetadata; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientCall.Listener; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -173,4 +197,214 @@ public void listPagination() throws Exception { } assertThat(dbIdsGot).containsAtLeastElementsIn(dbIds); } + + private static final class InjectErrorInterceptorProvider implements GrpcInterceptorProvider { + final AtomicBoolean injectError = new AtomicBoolean(true); + final AtomicInteger getOperationCount = new AtomicInteger(); + final AtomicInteger methodCount = new AtomicInteger(); + final String methodName; + + private InjectErrorInterceptorProvider(String methodName) { + this.methodName = methodName; + } + + @Override + public List getInterceptors() { + ClientInterceptor interceptor = + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + if (method.getFullMethodName().contains("GetOperation")) { + getOperationCount.incrementAndGet(); + } + if (!method.getFullMethodName().contains(methodName)) { + return next.newCall(method, callOptions); + } + + methodCount.incrementAndGet(); + final AtomicBoolean errorInjected = new AtomicBoolean(); + final ClientCall clientCall = next.newCall(method, callOptions); + + return new SimpleForwardingClientCall(clientCall) { + @Override + public void start(Listener responseListener, Metadata headers) { + super.start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + if (injectError.getAndSet(false)) { + errorInjected.set(true); + clientCall.cancel("Cancelling call for injected error", null); + } else { + super.onMessage(message); + } + } + + @Override + public void onClose(Status status, Metadata metadata) { + if (errorInjected.get()) { + status = Status.UNAVAILABLE.augmentDescription("INJECTED BY TEST"); + } + super.onClose(status, metadata); + } + }, + headers); + } + }; + } + }; + return Collections.singletonList(interceptor); + } + } + + @Test + public void testRetryNonIdempotentRpcsReturningLongRunningOperations() throws Exception { + // RPCs that return a long-running operation such as CreateDatabase, CreateBackup and + // RestoreDatabase are non-idempotent and can normally not be automatically retried in case of a + // transient failure. The client library will however automatically query the backend to check + // whether the corresponding operation was started or not, and if it was, it will pick up the + // existing operation. If no operation is found, a new RPC call will be executed to start the + // operation. + + List databases = new ArrayList<>(); + List backups = new ArrayList<>(); + String initialDatabaseId; + Timestamp initialDbCreateTime; + + try { + // CreateDatabase + InjectErrorInterceptorProvider createDbInterceptor = + new InjectErrorInterceptorProvider("CreateDatabase"); + SpannerOptions options = + testHelper.getOptions().toBuilder().setInterceptorProvider(createDbInterceptor).build(); + try (Spanner spanner = options.getService()) { + initialDatabaseId = testHelper.getUniqueDatabaseId(); + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + OperationFuture op = + client.createDatabase( + testHelper.getInstanceId().getInstance(), + initialDatabaseId, + Collections.emptyList()); + databases.add(op.get()); + // Keep track of the original create time of this database, as we will drop this database + // later and create another one with the exact same name. That means that the ListOperations + // call will return at least two CreateDatabase operations. The retry logic should always + // pick the last one. + initialDbCreateTime = op.get().getCreateTime(); + // Assert that the CreateDatabase RPC was called only once, and that the operation tracking + // was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + } + + // CreateBackup + InjectErrorInterceptorProvider createBackupInterceptor = + new InjectErrorInterceptorProvider("CreateBackup"); + options = + testHelper + .getOptions() + .toBuilder() + .setInterceptorProvider(createBackupInterceptor) + .build(); + try (Spanner spanner = options.getService()) { + String databaseId = databases.get(0).getId().getDatabase(); + String backupId = String.format("test-bck-%08d", new Random().nextInt(100000000)); + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + OperationFuture op = + client.createBackup( + testHelper.getInstanceId().getInstance(), + backupId, + databaseId, + Timestamp.ofTimeSecondsAndNanos( + Timestamp.now().getSeconds() + TimeUnit.SECONDS.convert(7L, TimeUnit.DAYS), 0)); + backups.add(op.get()); + // Assert that the CreateBackup RPC was called only once, and that the operation tracking + // was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + } + + // RestoreBackup + int attempts = 0; + while (true) { + InjectErrorInterceptorProvider restoreBackupInterceptor = + new InjectErrorInterceptorProvider("RestoreBackup"); + options = + testHelper + .getOptions() + .toBuilder() + .setInterceptorProvider(restoreBackupInterceptor) + .build(); + try (Spanner spanner = options.getService()) { + String backupId = backups.get(0).getId().getBackup(); + String restoredDbId = testHelper.getUniqueDatabaseId(); + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + OperationFuture op = + client.restoreDatabase( + testHelper.getInstanceId().getInstance(), + backupId, + testHelper.getInstanceId().getInstance(), + restoredDbId); + databases.add(op.get()); + // Assert that the RestoreDatabase RPC was called only once, and that the operation + // tracking was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + break; + } catch (ExecutionException e) { + if (e.getCause() instanceof SpannerException + && ((SpannerException) e.getCause()).getErrorCode() == ErrorCode.FAILED_PRECONDITION + && e.getCause() + .getMessage() + .contains("Please retry the operation once the pending restores complete")) { + attempts++; + if (attempts == 10) { + // Still same error after 10 attempts. Ignore. + break; + } + // wait and then retry. + Thread.sleep(60_000L); + } else { + throw e; + } + } + } + + // Create another database with the exact same name as the first database. + createDbInterceptor = new InjectErrorInterceptorProvider("CreateDatabase"); + options = + testHelper.getOptions().toBuilder().setInterceptorProvider(createDbInterceptor).build(); + try (Spanner spanner = options.getService()) { + DatabaseAdminClient client = spanner.getDatabaseAdminClient(); + // First drop the initial database. + client.dropDatabase(testHelper.getInstanceId().getInstance(), initialDatabaseId); + // Now re-create a database with the exact same name. + OperationFuture op = + client.createDatabase( + testHelper.getInstanceId().getInstance(), + initialDatabaseId, + Collections.emptyList()); + // Check that the second database was created and has a greater creation time than the + // first. + Timestamp secondCreationTime = op.get().getCreateTime(); + // TODO: Change this to greaterThan when the create time of a database is reported back by + // the server. + assertThat(secondCreationTime).isAtLeast(initialDbCreateTime); + // Assert that the CreateDatabase RPC was called only once, and that the operation tracking + // was resumed through a GetOperation call. + assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1); + assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1); + } + } finally { + DatabaseAdminClient client = testHelper.getClient().getDatabaseAdminClient(); + for (Database database : databases) { + client.dropDatabase( + database.getId().getInstanceId().getInstance(), database.getId().getDatabase()); + } + for (Backup backup : backups) { + client.deleteBackup(backup.getInstanceId().getInstance(), backup.getId().getBackup()); + } + } + } }